formats: Streamline the h26x fragment reception
This commit is contained in:
parent
58d494853f
commit
ea00db4d12
|
|
@ -84,6 +84,7 @@ uvgrtp::formats::h26x::h26x(std::shared_ptr<uvgrtp::socket> socket, std::shared_
|
|||
media(socket, rtp, flags),
|
||||
queued_(),
|
||||
frames_(),
|
||||
fragments_(UINT16_MAX, nullptr),
|
||||
dropped_(),
|
||||
rtp_ctx_(rtp),
|
||||
last_garbage_collection_(uvgrtp::clock::hrc::now())
|
||||
|
|
@ -525,46 +526,26 @@ bool uvgrtp::formats::h26x::is_frame_late(uvgrtp::formats::h26x_info_t& hinfo, s
|
|||
|
||||
uint32_t uvgrtp::formats::h26x::drop_frame(uint32_t ts)
|
||||
{
|
||||
uint16_t s_seq = frames_.at(ts).s_seq;
|
||||
uint16_t e_seq = frames_.at(ts).e_seq;
|
||||
|
||||
if (s_seq == INVALID_SEQ)
|
||||
{
|
||||
s_seq = 0;
|
||||
}
|
||||
if (e_seq == INVALID_SEQ)
|
||||
{
|
||||
e_seq = 0;
|
||||
}
|
||||
|
||||
LOG_INFO("Dropping frame. Ts: %lu, Seq: %u - %u, expected/received: %li/%li",
|
||||
ts, s_seq, e_seq, calculate_expected_fus(ts), frames_[ts].pkts_received);
|
||||
|
||||
uint32_t total_cleaned = 0;
|
||||
if (frames_.find(ts) == frames_.end())
|
||||
{
|
||||
LOG_ERROR("Tried to drop a non-existing frame");
|
||||
return 0;
|
||||
return total_cleaned;
|
||||
}
|
||||
|
||||
uint32_t total_cleaned = 0;
|
||||
uint16_t s_seq = frames_.at(ts).s_seq;
|
||||
uint16_t e_seq = frames_.at(ts).e_seq;
|
||||
|
||||
// clean fragments
|
||||
for (auto& fragment : frames_[ts].fragments) {
|
||||
total_cleaned += fragment.second->payload_len + sizeof(uvgrtp::frame::rtp_frame);
|
||||
(void)uvgrtp::frame::dealloc_frame(fragment.second);
|
||||
}
|
||||
frames_[ts].fragments.clear();
|
||||
LOG_INFO("Dropping frame. Ts: %lu, Seq: %u <-> %u, received/expected: %lli/%lli",
|
||||
ts, s_seq, e_seq, frames_[ts].received_packet_seqs.size(), calculate_expected_fus(ts));
|
||||
|
||||
// clean fragments that have no place
|
||||
for (auto& temporary : frames_[ts].temporary) {
|
||||
total_cleaned += temporary->payload_len + sizeof(uvgrtp::frame::rtp_frame);;
|
||||
(void)uvgrtp::frame::dealloc_frame(temporary);
|
||||
for (auto& fragment_seq : frames_[ts].received_packet_seqs)
|
||||
{
|
||||
total_cleaned += fragments_[fragment_seq]->payload_len + sizeof(uvgrtp::frame::rtp_frame);
|
||||
free_fragment(fragment_seq);
|
||||
}
|
||||
|
||||
// lastly, remove the frame structure from map
|
||||
frames_[ts].temporary.clear();
|
||||
frames_.erase(ts);
|
||||
|
||||
dropped_.insert(ts);
|
||||
|
||||
return total_cleaned;
|
||||
|
|
@ -643,10 +624,10 @@ rtp_error_t uvgrtp::formats::h26x::packet_handler(int flags, uvgrtp::frame::rtp_
|
|||
// We have received a fragment. Rest of the function deals with fragmented frames
|
||||
|
||||
// Fragment timestamp, all fragments of the same frame have the same timestamp
|
||||
uint32_t fragment_ts = frame->header.timestamp;
|
||||
uint16_t fragment_ts = frame->header.timestamp;
|
||||
|
||||
// Fragment sequence number, determines the order of the fragments within frame
|
||||
uint32_t fragment_seq = frame->header.seq;
|
||||
uint16_t fragment_seq = frame->header.seq;
|
||||
|
||||
uvgrtp::formats::NAL_TYPE nal_type = get_nal_type(frame); // Intra, inter or some other type of frame
|
||||
|
||||
|
|
@ -655,81 +636,61 @@ rtp_error_t uvgrtp::formats::h26x::packet_handler(int flags, uvgrtp::frame::rtp_
|
|||
|
||||
// Make sure we haven't discarded the frame corresponding to the fragment timestamp before
|
||||
if (dropped_.find(fragment_ts) != dropped_.end()) {
|
||||
LOG_WARN("packet belonging to a dropped frame was received!");
|
||||
LOG_WARN("Fragment belonging to a dropped frame was received! Timestamp: %lu",
|
||||
fragment_ts);
|
||||
return RTP_GENERIC_ERROR;
|
||||
}
|
||||
|
||||
initialize_new_fragmented_frame(fragment_ts);
|
||||
}
|
||||
else if (frames_[fragment_ts].received_packet_seqs.find(fragment_seq) !=
|
||||
frames_[fragment_ts].received_packet_seqs.end()) {
|
||||
|
||||
// we have already received this seq
|
||||
LOG_WARN("Detected duplicate fragment, dropping! Seq: %u", fragment_seq);
|
||||
(void)uvgrtp::frame::dealloc_frame(frame); // free fragment memory
|
||||
*out = nullptr;
|
||||
return RTP_GENERIC_ERROR;
|
||||
}
|
||||
|
||||
const uint8_t sizeof_fu_headers = (uint8_t)get_payload_header_size() +
|
||||
get_fu_header_size();
|
||||
|
||||
frames_[fragment_ts].pkts_received += 1;
|
||||
// keep track of fragments belonging to this frame in case we need to delete them
|
||||
frames_[fragment_ts].received_packet_seqs.insert(fragment_seq);
|
||||
frames_[fragment_ts].total_size += (frame->payload_len - sizeof_fu_headers);
|
||||
|
||||
if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_START) {
|
||||
frames_[fragment_ts].s_seq = fragment_seq; // set the first sequence number of the frame
|
||||
frames_[fragment_ts].fragments[fragment_seq] = frame;
|
||||
|
||||
// move all fragments that arrived before the start fragment to correct places
|
||||
for (auto& temp_fragment : frames_[fragment_ts].temporary) {
|
||||
uint16_t temp_seq = temp_fragment->header.seq;
|
||||
uint32_t seq = (fragment_seq > temp_seq) ? 0x10000 + temp_seq : temp_seq;
|
||||
|
||||
frames_[fragment_ts].fragments[seq] = temp_fragment;
|
||||
}
|
||||
|
||||
frames_[fragment_ts].temporary.clear();
|
||||
}
|
||||
else
|
||||
if (fragments_[fragment_seq] != nullptr)
|
||||
{
|
||||
if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_END)
|
||||
{
|
||||
frames_[fragment_ts].e_seq = fragment_seq;
|
||||
}
|
||||
LOG_WARN("Found an existing fragment with same sequence number %u! Fragment ts: %lu, current ts: %lu",
|
||||
fragment_seq, fragments_[fragment_seq]->header.timestamp, fragment_ts);
|
||||
|
||||
// find a place to store the fragment
|
||||
if (frames_[fragment_ts].s_seq != INVALID_SEQ) { // has the start fragment arrived yet?
|
||||
free_fragment(fragment_seq);
|
||||
}
|
||||
|
||||
/* Out-of-order nature poses an interesting problem when reconstructing the frame:
|
||||
* How to store the fragments such that we don't have to shuffle them around when
|
||||
* frame reconstruction takes place?
|
||||
*
|
||||
* std::map is an option but the overflow of 16-bit sequence number counter makes
|
||||
* that a little harder because if the first few fragments of a frame are near
|
||||
* 16-bit maximum (65535), the rest of the fragments are going to have sequence
|
||||
* numbers less than that and thus our frame reconstruction breaks.
|
||||
*
|
||||
* This can be solved by checking if current fragment's sequence is less than start fragment's sequence number
|
||||
* (overflow has occurred) and correcting the current sequence by adding 0x10000 to its value so it appears
|
||||
* in order with other fragments
|
||||
// save the fragment for later reconstruction
|
||||
fragments_[fragment_seq] = frame;
|
||||
|
||||
* Here the overflow has occurred, adjust the sequence number of current
|
||||
* fragment so it appears in order with other fragments of the frame
|
||||
*
|
||||
* Note: if the frame is huge (~94 MB), this will not work */
|
||||
frames_[fragment_ts].fragments[((frames_[fragment_ts].s_seq > fragment_seq) ? 0x10000 + fragment_seq : fragment_seq)] = frame;
|
||||
}
|
||||
else {
|
||||
// position for the fragment cannot be calculated so move the fragment to a temporary storage
|
||||
frames_[fragment_ts].temporary.push_back(frame);
|
||||
}
|
||||
// if this is first or last, save it to help with reconstruction
|
||||
if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_START) {
|
||||
frames_[fragment_ts].s_seq = fragment_seq;
|
||||
frames_[fragment_ts].start_received = true;
|
||||
}
|
||||
else if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_END) {
|
||||
frames_[fragment_ts].e_seq = fragment_seq;
|
||||
frames_[fragment_ts].end_received = true;
|
||||
}
|
||||
|
||||
// have the first and last fragment arrived so we can possibly start reconstructing the frame?
|
||||
if (frames_[fragment_ts].s_seq != INVALID_SEQ && frames_[fragment_ts].e_seq != INVALID_SEQ) {
|
||||
if (frames_[fragment_ts].start_received && frames_[fragment_ts].end_received) {
|
||||
size_t received = calculate_expected_fus(fragment_ts);
|
||||
|
||||
// have we received every fragment and can the frame can be reconstructed?
|
||||
if (received == frames_[fragment_ts].pkts_received) {
|
||||
|
||||
|
||||
if (received == frames_[fragment_ts].received_packet_seqs.size()) {
|
||||
// TODO: check here if previous dependencies have been sent forward
|
||||
|
||||
|
||||
// Finally, reconstruct and return the completed frame
|
||||
|
||||
// Reconstruction of frame from fragments
|
||||
size_t fptr = 0;
|
||||
|
||||
// allocating the frame with start code ready saves a copy operation for the frame
|
||||
|
|
@ -740,16 +701,24 @@ rtp_error_t uvgrtp::formats::h26x::packet_handler(int flags, uvgrtp::frame::rtp_
|
|||
get_nal_header_from_fu_headers(fptr, frame->payload, complete->payload); // NAL header
|
||||
fptr += get_nal_header_size();
|
||||
|
||||
// reconstruct rest of the frame data from fragments
|
||||
for (auto& fragment : frames_.at(fragment_ts).fragments) {
|
||||
uint16_t next_from_last = frames_.at(fragment_ts).e_seq + 1;
|
||||
for (uint16_t i = frames_.at(fragment_ts).s_seq; i != next_from_last; ++i)
|
||||
{
|
||||
if (fragments_[i] == nullptr)
|
||||
{
|
||||
LOG_ERROR("Missing fragment in reconstruction. Seq range: %u - %u. Missing seq %u",
|
||||
frames_.at(fragment_ts).s_seq, frames_.at(fragment_ts).e_seq, i);
|
||||
return RTP_GENERIC_ERROR;
|
||||
}
|
||||
|
||||
// copy everything expect fu headers (which repeat for every fu)
|
||||
std::memcpy(
|
||||
&complete->payload[fptr],
|
||||
&fragment.second->payload[sizeof_fu_headers],
|
||||
fragment.second->payload_len - sizeof_fu_headers
|
||||
&fragments_[i]->payload[sizeof_fu_headers],
|
||||
fragments_[i]->payload_len - sizeof_fu_headers
|
||||
);
|
||||
fptr += fragment.second->payload_len - sizeof_fu_headers;
|
||||
(void)uvgrtp::frame::dealloc_frame(fragment.second); // free fragment memory
|
||||
fptr += fragments_[i]->payload_len - sizeof_fu_headers;
|
||||
free_fragment(i);
|
||||
}
|
||||
|
||||
*out = complete; // save result to output
|
||||
|
|
@ -813,32 +782,46 @@ void uvgrtp::formats::h26x::garbage_collect_lost_frames()
|
|||
|
||||
void uvgrtp::formats::h26x::initialize_new_fragmented_frame(uint32_t ts)
|
||||
{
|
||||
frames_[ts].s_seq = INVALID_SEQ;
|
||||
frames_[ts].e_seq = INVALID_SEQ;
|
||||
frames_[ts].s_seq = 0;
|
||||
frames_[ts].start_received = false;
|
||||
frames_[ts].e_seq = 0;
|
||||
frames_[ts].end_received = false;
|
||||
|
||||
frames_[ts].sframe_time = uvgrtp::clock::hrc::now();
|
||||
frames_[ts].total_size = 0;
|
||||
frames_[ts].pkts_received = 0;
|
||||
}
|
||||
|
||||
size_t uvgrtp::formats::h26x::calculate_expected_fus(uint32_t ts)
|
||||
{
|
||||
if (frames_[ts].s_seq == INVALID_SEQ || frames_[ts].e_seq == INVALID_SEQ)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
size_t expected = 0;
|
||||
size_t s_seq = frames_[ts].s_seq;
|
||||
size_t e_seq = frames_[ts].e_seq;
|
||||
size_t expected = 0;
|
||||
|
||||
if (s_seq > e_seq)
|
||||
expected = 0xffff - s_seq + e_seq + 2;
|
||||
else
|
||||
expected = e_seq - s_seq + 1;
|
||||
if (frames_[ts].start_received && frames_[ts].end_received)
|
||||
{
|
||||
if (s_seq > e_seq) {
|
||||
expected = (UINT16_MAX - s_seq) + e_seq + 2;
|
||||
}
|
||||
else {
|
||||
expected = e_seq - s_seq + 1;
|
||||
}
|
||||
}
|
||||
|
||||
return expected;
|
||||
}
|
||||
|
||||
void uvgrtp::formats::h26x::free_fragment(uint16_t sequence_number)
|
||||
{
|
||||
if (fragments_[sequence_number] == nullptr)
|
||||
{
|
||||
LOG_ERROR("Tried to free an already freed fragment with seq: %u", sequence_number);
|
||||
return;
|
||||
}
|
||||
|
||||
(void)uvgrtp::frame::dealloc_frame(fragments_[sequence_number]); // free fragment memory
|
||||
fragments_[sequence_number] = nullptr;
|
||||
}
|
||||
|
||||
void uvgrtp::formats::h26x::scl(uint8_t* data, size_t data_len, size_t packet_size,
|
||||
std::vector<nal_info>& nals, bool& can_be_aggregated)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
|
||||
namespace uvgrtp {
|
||||
|
||||
|
|
@ -16,7 +17,6 @@ namespace uvgrtp {
|
|||
|
||||
namespace formats {
|
||||
|
||||
#define INVALID_SEQ 0x13371338
|
||||
#define RTP_HDR_SIZE 12
|
||||
|
||||
enum class FRAG_TYPE {
|
||||
|
|
@ -38,24 +38,20 @@ namespace uvgrtp {
|
|||
/* clock reading when the first fragment is received */
|
||||
uvgrtp::clock::hrc::hrc_t sframe_time;
|
||||
|
||||
bool start_received = false;
|
||||
bool end_received = false;
|
||||
|
||||
/* sequence number of the fragment with s-bit (start) */
|
||||
uint32_t s_seq = 0;
|
||||
uint16_t s_seq = 0;
|
||||
|
||||
/* sequence number of the fragment with e-bit (end) */
|
||||
uint32_t e_seq = 0;
|
||||
|
||||
/* how many fragments have been received */
|
||||
size_t pkts_received = 0;
|
||||
uint16_t e_seq = 0;
|
||||
|
||||
/* total size of all fragments */
|
||||
size_t total_size = 0;
|
||||
|
||||
/* map of frame's fragments,
|
||||
* allows out-of-order insertion and loop-through in order */
|
||||
std::map<uint32_t, uvgrtp::frame::rtp_frame*> fragments;
|
||||
|
||||
/* storage for fragments that require relocation */
|
||||
std::vector<uvgrtp::frame::rtp_frame*> temporary;
|
||||
// needed for cleaning fragments in case the frame is dropped
|
||||
std::set<uint16_t> received_packet_seqs;
|
||||
} h26x_info_t;
|
||||
|
||||
struct nal_info
|
||||
|
|
@ -96,8 +92,7 @@ namespace uvgrtp {
|
|||
|
||||
/* Packet handler for RTP frames that transport HEVC bitstream
|
||||
*
|
||||
* If "frame" is not a fragmentation unit, packet handler checks
|
||||
* if "frame" is SPS/VPS/PPS packet and if so, returns the packet
|
||||
* If "frame" is not a fragmentation unit, packet handler returns the packet
|
||||
* to user immediately.
|
||||
*
|
||||
* If "frame" is a fragmentation unit, packet handler checks if
|
||||
|
|
@ -162,6 +157,8 @@ namespace uvgrtp {
|
|||
inline size_t calculate_expected_fus(uint32_t ts);
|
||||
inline void initialize_new_fragmented_frame(uint32_t ts);
|
||||
|
||||
void free_fragment(uint16_t sequence_number);
|
||||
|
||||
void scl(uint8_t* data, size_t data_len, size_t packet_size,
|
||||
std::vector<nal_info>& nals, bool& can_be_aggregated);
|
||||
|
||||
|
|
@ -172,6 +169,10 @@ namespace uvgrtp {
|
|||
|
||||
std::deque<uvgrtp::frame::rtp_frame*> queued_;
|
||||
std::unordered_map<uint32_t, h26x_info_t> frames_;
|
||||
|
||||
// Holds all possible fragments in sequence number order
|
||||
std::vector<uvgrtp::frame::rtp_frame*> fragments_;
|
||||
|
||||
std::unordered_set<uint32_t> dropped_;
|
||||
std::shared_ptr<uvgrtp::rtp> rtp_ctx_;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue