diff --git a/src/formats/h26x.cc b/src/formats/h26x.cc index 0557576..c43224a 100644 --- a/src/formats/h26x.cc +++ b/src/formats/h26x.cc @@ -84,6 +84,7 @@ uvgrtp::formats::h26x::h26x(std::shared_ptr socket, std::shared_ media(socket, rtp, flags), queued_(), frames_(), + fragments_(UINT16_MAX, nullptr), dropped_(), rtp_ctx_(rtp), last_garbage_collection_(uvgrtp::clock::hrc::now()) @@ -525,46 +526,26 @@ bool uvgrtp::formats::h26x::is_frame_late(uvgrtp::formats::h26x_info_t& hinfo, s uint32_t uvgrtp::formats::h26x::drop_frame(uint32_t ts) { - uint16_t s_seq = frames_.at(ts).s_seq; - uint16_t e_seq = frames_.at(ts).e_seq; - - if (s_seq == INVALID_SEQ) - { - s_seq = 0; - } - if (e_seq == INVALID_SEQ) - { - e_seq = 0; - } - - LOG_INFO("Dropping frame. Ts: %lu, Seq: %u - %u, expected/received: %li/%li", - ts, s_seq, e_seq, calculate_expected_fus(ts), frames_[ts].pkts_received); - + uint32_t total_cleaned = 0; if (frames_.find(ts) == frames_.end()) { LOG_ERROR("Tried to drop a non-existing frame"); - return 0; + return total_cleaned; } - uint32_t total_cleaned = 0; + uint16_t s_seq = frames_.at(ts).s_seq; + uint16_t e_seq = frames_.at(ts).e_seq; - // clean fragments - for (auto& fragment : frames_[ts].fragments) { - total_cleaned += fragment.second->payload_len + sizeof(uvgrtp::frame::rtp_frame); - (void)uvgrtp::frame::dealloc_frame(fragment.second); - } - frames_[ts].fragments.clear(); + LOG_INFO("Dropping frame. Ts: %lu, Seq: %u <-> %u, received/expected: %lli/%lli", + ts, s_seq, e_seq, frames_[ts].received_packet_seqs.size(), calculate_expected_fus(ts)); - // clean fragments that have no place - for (auto& temporary : frames_[ts].temporary) { - total_cleaned += temporary->payload_len + sizeof(uvgrtp::frame::rtp_frame);; - (void)uvgrtp::frame::dealloc_frame(temporary); + for (auto& fragment_seq : frames_[ts].received_packet_seqs) + { + total_cleaned += fragments_[fragment_seq]->payload_len + sizeof(uvgrtp::frame::rtp_frame); + free_fragment(fragment_seq); } - // lastly, remove the frame structure from map - frames_[ts].temporary.clear(); frames_.erase(ts); - dropped_.insert(ts); return total_cleaned; @@ -643,10 +624,10 @@ rtp_error_t uvgrtp::formats::h26x::packet_handler(int flags, uvgrtp::frame::rtp_ // We have received a fragment. Rest of the function deals with fragmented frames // Fragment timestamp, all fragments of the same frame have the same timestamp - uint32_t fragment_ts = frame->header.timestamp; + uint16_t fragment_ts = frame->header.timestamp; // Fragment sequence number, determines the order of the fragments within frame - uint32_t fragment_seq = frame->header.seq; + uint16_t fragment_seq = frame->header.seq; uvgrtp::formats::NAL_TYPE nal_type = get_nal_type(frame); // Intra, inter or some other type of frame @@ -655,81 +636,61 @@ rtp_error_t uvgrtp::formats::h26x::packet_handler(int flags, uvgrtp::frame::rtp_ // Make sure we haven't discarded the frame corresponding to the fragment timestamp before if (dropped_.find(fragment_ts) != dropped_.end()) { - LOG_WARN("packet belonging to a dropped frame was received!"); + LOG_WARN("Fragment belonging to a dropped frame was received! Timestamp: %lu", + fragment_ts); return RTP_GENERIC_ERROR; } initialize_new_fragmented_frame(fragment_ts); } + else if (frames_[fragment_ts].received_packet_seqs.find(fragment_seq) != + frames_[fragment_ts].received_packet_seqs.end()) { + + // we have already received this seq + LOG_WARN("Detected duplicate fragment, dropping! Seq: %u", fragment_seq); + (void)uvgrtp::frame::dealloc_frame(frame); // free fragment memory + *out = nullptr; + return RTP_GENERIC_ERROR; + } const uint8_t sizeof_fu_headers = (uint8_t)get_payload_header_size() + get_fu_header_size(); - frames_[fragment_ts].pkts_received += 1; + // keep track of fragments belonging to this frame in case we need to delete them + frames_[fragment_ts].received_packet_seqs.insert(fragment_seq); frames_[fragment_ts].total_size += (frame->payload_len - sizeof_fu_headers); - if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_START) { - frames_[fragment_ts].s_seq = fragment_seq; // set the first sequence number of the frame - frames_[fragment_ts].fragments[fragment_seq] = frame; - - // move all fragments that arrived before the start fragment to correct places - for (auto& temp_fragment : frames_[fragment_ts].temporary) { - uint16_t temp_seq = temp_fragment->header.seq; - uint32_t seq = (fragment_seq > temp_seq) ? 0x10000 + temp_seq : temp_seq; - - frames_[fragment_ts].fragments[seq] = temp_fragment; - } - - frames_[fragment_ts].temporary.clear(); - } - else + if (fragments_[fragment_seq] != nullptr) { - if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_END) - { - frames_[fragment_ts].e_seq = fragment_seq; - } + LOG_WARN("Found an existing fragment with same sequence number %u! Fragment ts: %lu, current ts: %lu", + fragment_seq, fragments_[fragment_seq]->header.timestamp, fragment_ts); - // find a place to store the fragment - if (frames_[fragment_ts].s_seq != INVALID_SEQ) { // has the start fragment arrived yet? + free_fragment(fragment_seq); + } - /* Out-of-order nature poses an interesting problem when reconstructing the frame: - * How to store the fragments such that we don't have to shuffle them around when - * frame reconstruction takes place? - * - * std::map is an option but the overflow of 16-bit sequence number counter makes - * that a little harder because if the first few fragments of a frame are near - * 16-bit maximum (65535), the rest of the fragments are going to have sequence - * numbers less than that and thus our frame reconstruction breaks. - * - * This can be solved by checking if current fragment's sequence is less than start fragment's sequence number - * (overflow has occurred) and correcting the current sequence by adding 0x10000 to its value so it appears - * in order with other fragments + // save the fragment for later reconstruction + fragments_[fragment_seq] = frame; - * Here the overflow has occurred, adjust the sequence number of current - * fragment so it appears in order with other fragments of the frame - * - * Note: if the frame is huge (~94 MB), this will not work */ - frames_[fragment_ts].fragments[((frames_[fragment_ts].s_seq > fragment_seq) ? 0x10000 + fragment_seq : fragment_seq)] = frame; - } - else { - // position for the fragment cannot be calculated so move the fragment to a temporary storage - frames_[fragment_ts].temporary.push_back(frame); - } + // if this is first or last, save it to help with reconstruction + if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_START) { + frames_[fragment_ts].s_seq = fragment_seq; + frames_[fragment_ts].start_received = true; + } + else if (frag_type == uvgrtp::formats::FRAG_TYPE::FT_END) { + frames_[fragment_ts].e_seq = fragment_seq; + frames_[fragment_ts].end_received = true; } // have the first and last fragment arrived so we can possibly start reconstructing the frame? - if (frames_[fragment_ts].s_seq != INVALID_SEQ && frames_[fragment_ts].e_seq != INVALID_SEQ) { + if (frames_[fragment_ts].start_received && frames_[fragment_ts].end_received) { size_t received = calculate_expected_fus(fragment_ts); // have we received every fragment and can the frame can be reconstructed? - if (received == frames_[fragment_ts].pkts_received) { - - + if (received == frames_[fragment_ts].received_packet_seqs.size()) { // TODO: check here if previous dependencies have been sent forward - // Finally, reconstruct and return the completed frame - + // Reconstruction of frame from fragments size_t fptr = 0; // allocating the frame with start code ready saves a copy operation for the frame @@ -740,16 +701,24 @@ rtp_error_t uvgrtp::formats::h26x::packet_handler(int flags, uvgrtp::frame::rtp_ get_nal_header_from_fu_headers(fptr, frame->payload, complete->payload); // NAL header fptr += get_nal_header_size(); - // reconstruct rest of the frame data from fragments - for (auto& fragment : frames_.at(fragment_ts).fragments) { + uint16_t next_from_last = frames_.at(fragment_ts).e_seq + 1; + for (uint16_t i = frames_.at(fragment_ts).s_seq; i != next_from_last; ++i) + { + if (fragments_[i] == nullptr) + { + LOG_ERROR("Missing fragment in reconstruction. Seq range: %u - %u. Missing seq %u", + frames_.at(fragment_ts).s_seq, frames_.at(fragment_ts).e_seq, i); + return RTP_GENERIC_ERROR; + } + // copy everything expect fu headers (which repeat for every fu) std::memcpy( &complete->payload[fptr], - &fragment.second->payload[sizeof_fu_headers], - fragment.second->payload_len - sizeof_fu_headers + &fragments_[i]->payload[sizeof_fu_headers], + fragments_[i]->payload_len - sizeof_fu_headers ); - fptr += fragment.second->payload_len - sizeof_fu_headers; - (void)uvgrtp::frame::dealloc_frame(fragment.second); // free fragment memory + fptr += fragments_[i]->payload_len - sizeof_fu_headers; + free_fragment(i); } *out = complete; // save result to output @@ -813,32 +782,46 @@ void uvgrtp::formats::h26x::garbage_collect_lost_frames() void uvgrtp::formats::h26x::initialize_new_fragmented_frame(uint32_t ts) { - frames_[ts].s_seq = INVALID_SEQ; - frames_[ts].e_seq = INVALID_SEQ; + frames_[ts].s_seq = 0; + frames_[ts].start_received = false; + frames_[ts].e_seq = 0; + frames_[ts].end_received = false; frames_[ts].sframe_time = uvgrtp::clock::hrc::now(); frames_[ts].total_size = 0; - frames_[ts].pkts_received = 0; } size_t uvgrtp::formats::h26x::calculate_expected_fus(uint32_t ts) { - if (frames_[ts].s_seq == INVALID_SEQ || frames_[ts].e_seq == INVALID_SEQ) - { - return 0; - } + size_t expected = 0; size_t s_seq = frames_[ts].s_seq; size_t e_seq = frames_[ts].e_seq; - size_t expected = 0; - if (s_seq > e_seq) - expected = 0xffff - s_seq + e_seq + 2; - else - expected = e_seq - s_seq + 1; + if (frames_[ts].start_received && frames_[ts].end_received) + { + if (s_seq > e_seq) { + expected = (UINT16_MAX - s_seq) + e_seq + 2; + } + else { + expected = e_seq - s_seq + 1; + } + } return expected; } +void uvgrtp::formats::h26x::free_fragment(uint16_t sequence_number) +{ + if (fragments_[sequence_number] == nullptr) + { + LOG_ERROR("Tried to free an already freed fragment with seq: %u", sequence_number); + return; + } + + (void)uvgrtp::frame::dealloc_frame(fragments_[sequence_number]); // free fragment memory + fragments_[sequence_number] = nullptr; +} + void uvgrtp::formats::h26x::scl(uint8_t* data, size_t data_len, size_t packet_size, std::vector& nals, bool& can_be_aggregated) { diff --git a/src/formats/h26x.hh b/src/formats/h26x.hh index eeca7bf..b9c92f1 100644 --- a/src/formats/h26x.hh +++ b/src/formats/h26x.hh @@ -8,6 +8,7 @@ #include #include +#include namespace uvgrtp { @@ -16,7 +17,6 @@ namespace uvgrtp { namespace formats { - #define INVALID_SEQ 0x13371338 #define RTP_HDR_SIZE 12 enum class FRAG_TYPE { @@ -38,24 +38,20 @@ namespace uvgrtp { /* clock reading when the first fragment is received */ uvgrtp::clock::hrc::hrc_t sframe_time; + bool start_received = false; + bool end_received = false; + /* sequence number of the fragment with s-bit (start) */ - uint32_t s_seq = 0; + uint16_t s_seq = 0; /* sequence number of the fragment with e-bit (end) */ - uint32_t e_seq = 0; - - /* how many fragments have been received */ - size_t pkts_received = 0; + uint16_t e_seq = 0; /* total size of all fragments */ size_t total_size = 0; - /* map of frame's fragments, - * allows out-of-order insertion and loop-through in order */ - std::map fragments; - - /* storage for fragments that require relocation */ - std::vector temporary; + // needed for cleaning fragments in case the frame is dropped + std::set received_packet_seqs; } h26x_info_t; struct nal_info @@ -96,8 +92,7 @@ namespace uvgrtp { /* Packet handler for RTP frames that transport HEVC bitstream * - * If "frame" is not a fragmentation unit, packet handler checks - * if "frame" is SPS/VPS/PPS packet and if so, returns the packet + * If "frame" is not a fragmentation unit, packet handler returns the packet * to user immediately. * * If "frame" is a fragmentation unit, packet handler checks if @@ -162,6 +157,8 @@ namespace uvgrtp { inline size_t calculate_expected_fus(uint32_t ts); inline void initialize_new_fragmented_frame(uint32_t ts); + void free_fragment(uint16_t sequence_number); + void scl(uint8_t* data, size_t data_len, size_t packet_size, std::vector& nals, bool& can_be_aggregated); @@ -172,6 +169,10 @@ namespace uvgrtp { std::deque queued_; std::unordered_map frames_; + + // Holds all possible fragments in sequence number order + std::vector fragments_; + std::unordered_set dropped_; std::shared_ptr rtp_ctx_;