diff --git a/src/formats/h26x.cc b/src/formats/h26x.cc index ad5a278..a2add7d 100644 --- a/src/formats/h26x.cc +++ b/src/formats/h26x.cc @@ -633,6 +633,10 @@ bool uvgrtp::formats::h26x::is_duplicate_frame(uint32_t timestamp, uint16_t seq_ } return false; } +rtp_error_t uvgrtp::formats::h26x::new_packet_handler(int rce_flags, uint8_t* read_ptr, size_t size, uvgrtp::frame::rtp_frame** out) +{ + return packet_handler(rce_flags, out); +} rtp_error_t uvgrtp::formats::h26x::packet_handler(int rce_flags, uvgrtp::frame::rtp_frame** out) { diff --git a/src/formats/h26x.hh b/src/formats/h26x.hh index cdd3616..0821518 100644 --- a/src/formats/h26x.hh +++ b/src/formats/h26x.hh @@ -116,6 +116,7 @@ namespace uvgrtp { * Return RTP_PKT_MODIFIED if the packet was modified but should be forwarded to other handlers * Return RTP_GENERIC_ERROR if the packet was corrupted in some way */ rtp_error_t packet_handler(int rce_flags, frame::rtp_frame** frame); + rtp_error_t new_packet_handler(int rce_flags, uint8_t* read_ptr, size_t size, uvgrtp::frame::rtp_frame** out); protected: diff --git a/src/media_stream.cc b/src/media_stream.cc index df0a444..db26064 100644 --- a/src/media_stream.cc +++ b/src/media_stream.cc @@ -206,9 +206,9 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt) case RTP_FORMAT_H264: { uvgrtp::formats::h264* format_264 = new uvgrtp::formats::h264(socket_, rtp_, rce_flags_); - reception_flow_->install_aux_handler_cpp( - rtp_handler_key_, - std::bind(&uvgrtp::formats::h264::packet_handler, format_264, std::placeholders::_1, std::placeholders::_2), + reception_flow_->new_install_handler( + 5, remote_ssrc_, + std::bind(&uvgrtp::formats::h264::new_packet_handler, format_264, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), std::bind(&uvgrtp::formats::h264::frame_getter, format_264, std::placeholders::_1)); media_.reset(format_264); break; @@ -216,9 +216,9 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt) case RTP_FORMAT_H265: { uvgrtp::formats::h265* format_265 = new uvgrtp::formats::h265(socket_, rtp_, rce_flags_); - reception_flow_->install_aux_handler_cpp( - rtp_handler_key_, - std::bind(&uvgrtp::formats::h265::packet_handler, format_265, std::placeholders::_1, std::placeholders::_2), + reception_flow_->new_install_handler( + 5, remote_ssrc_, + std::bind(&uvgrtp::formats::h265::new_packet_handler, format_265, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), std::bind(&uvgrtp::formats::h265::frame_getter, format_265, std::placeholders::_1)); media_.reset(format_265); break; @@ -226,9 +226,9 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt) case RTP_FORMAT_H266: { uvgrtp::formats::h266* format_266 = new uvgrtp::formats::h266(socket_, rtp_, rce_flags_); - reception_flow_->install_aux_handler_cpp( - rtp_handler_key_, - std::bind(&uvgrtp::formats::h266::packet_handler, format_266, std::placeholders::_1, std::placeholders::_2), + reception_flow_->new_install_handler( + 5, remote_ssrc_, + std::bind(&uvgrtp::formats::h266::new_packet_handler, format_266, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), std::bind(&uvgrtp::formats::h266::frame_getter, format_266, std::placeholders::_1)); media_.reset(format_266); break; @@ -259,6 +259,7 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt) case RTP_FORMAT_VDVI: { media_ = std::unique_ptr(new uvgrtp::formats::media(socket_, rtp_, rce_flags_)); + // TODO _______________________________ reception_flow_->install_aux_handler( rtp_handler_key_, media_->get_media_frame_info(), @@ -322,12 +323,12 @@ rtp_error_t uvgrtp::media_stream::init() reception_flow_->map_rtcp_to_rec(remote_ssrc_, rtcp_); rtcp_->set_socket(socket_); } - /* + reception_flow_->new_install_handler( 1, remote_ssrc_, std::bind(&uvgrtp::rtp::new_packet_handler, rtp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), - nullptr); + nullptr);/* reception_flow_->new_install_handler( 2, remote_ssrc_, diff --git a/src/reception_flow.cc b/src/reception_flow.cc index ae2cf22..6f24f4d 100644 --- a/src/reception_flow.cc +++ b/src/reception_flow.cc @@ -37,10 +37,7 @@ uvgrtp::reception_flow::reception_flow(bool ipv6) : receiver_(nullptr), //user_hook_arg_(nullptr), //user_hook_(nullptr), - rtp_handlers_({}), - rtcp_handlers_({}), - zrtp_handlers_({}), - srtp_handlers_({}), + NEW_packet_handlers_({}), ring_buffer_(), ring_read_index_(-1), // invalid first index that will increase to a valid one last_ring_write_index_(-1), @@ -329,22 +326,25 @@ rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_pt std::function handler, std::function getter) { - handler_new pair = {handler, getter}; switch (type) { case 1: { - rtp_handlers_[remote_ssrc] = { handler, getter }; + NEW_packet_handlers_[remote_ssrc].handler_rtp = handler; break; } case 2: { - rtcp_handlers_[remote_ssrc] = { handler, getter }; + NEW_packet_handlers_[remote_ssrc].handler_rtcp = handler; break; } case 3: { - zrtp_handlers_[remote_ssrc] = { handler, getter }; + NEW_packet_handlers_[remote_ssrc].handler_zrtp = handler; break; } case 4: { - srtp_handlers_[remote_ssrc] = { handler, getter }; + NEW_packet_handlers_[remote_ssrc].handler_srtp = handler; + break; + } + case 5: { + NEW_packet_handlers_[remote_ssrc].handler_media = handler; break; } default: { @@ -352,6 +352,7 @@ rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_pt break; } } + NEW_packet_handlers_[remote_ssrc].getter = getter; return RTP_OK; } @@ -656,7 +657,9 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) // zrtp headerit network byte orderissa, 32 bittiä pitkiä. rtp myös // process the ring buffer location through all the handlers - for (auto& handler : packet_handlers_) { + + for (auto& p : NEW_packet_handlers_) { + uvgrtp::frame::rtp_frame* frame = nullptr; //sockaddr_in from = ring_buffer_[ring_read_index_].from; @@ -666,7 +669,7 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) /* -------------------- SSRC checks -------------------- */ uint32_t packet_ssrc = ntohl(*(uint32_t*)&ptr[8]); - uint32_t current_ssrc = handler_mapping_[handler.first].get()->load(); + uint32_t current_ssrc = p.first.get()->load(); bool found = false; if (current_ssrc == packet_ssrc) { // Socket multiplexing, this handler is the correct one @@ -681,6 +684,8 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) // User pkt???? continue; } + // Handler set is found + handler_new* handlers = &p.second; /* -------------------- Protocol checks -------------------- */ /* Checks in the following order: * 1. If RCE_RTCP_MUX && packet type is 200 - 204 -> RTCP packet (or SRTCP) @@ -688,16 +693,14 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) * 3. Version is 2 -> RTP packet (or SRTP) * 4. Version is 00 -> Keep-Alive/Holepuncher */ rtp_error_t retval; + size_t size = (size_t)ring_buffer_[ring_read_index_].read; + /* -------------------- RTCP check -------------------- */ if (rce_flags & RCE_RTCP_MUX) { uint8_t pt = (uint8_t)ptr[1]; //UVG_LOG_DEBUG("Received frame with pt %u", pt); if (pt >= 200 && pt <= 204) { - for (auto& p : rtcp_handlers_) { - if (p.first.get()->load() == current_ssrc) { - retval = p.second.handler(rce_flags, &ptr[0], (size_t)ring_buffer_[ring_read_index_].read, &frame); - } - } + retval = handlers->handler_rtcp(rce_flags, &ptr[0], size, &frame); break; } } @@ -714,15 +717,20 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) /* -------------------- RTP check ---------------------------------- */ else if (version == 0x2) { - for (auto& p : rtp_handlers_) { - if (p.first.get()->load() == current_ssrc) { - retval = p.second.handler(rce_flags, &ptr[0], (size_t)ring_buffer_[ring_read_index_].read, &frame); - if (retval == RTP_PKT_MODIFIED) { - // next handler??? - } - else { - //error?? + retval = handlers->handler_rtp(rce_flags, &ptr[0], size, &frame); + + if (retval == RTP_PKT_MODIFIED) { + retval = handlers->handler_media(rce_flags, &ptr[0], size, &frame); + if (retval == RTP_PKT_READY) { + return_frame(frame); + break; + } + else if (retval == RTP_MULTIPLE_PKTS_READY) { + UVG_LOG_INFO("TODO:is this correct???"); + while (handlers->getter(&frame) == RTP_PKT_READY) { + return_frame(frame); } + break; } } break; @@ -734,52 +742,8 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) UVG_LOG_INFO("Holepuncher packet"); //break; } - - // Here we don't lock ring mutex because the chaging is only done above. - // NOTE: If there is a need for multiple processing threads, the read should be guarded - switch ((ret = (*handler.second.primary)(ring_buffer_[ring_read_index_].read, - ring_buffer_[ring_read_index_].data, rce_flags, &frame))) { - case RTP_OK: - { - // packet was handled successfully - break; - } - case RTP_PKT_NOT_HANDLED: - { - /* ----------- User packets not yet supported ----------- - std::string from_str; - if (ipv6_) { - from_str = uvgrtp::socket::sockaddr_ip6_to_string(from6); - } - else { - from_str = uvgrtp::socket::sockaddr_to_string(from); - } - UVG_LOG_DEBUG("User packet from ip: %s", from_str.c_str()); - return_user_pkt(ptr); - */ - - // packet was not handled by this primary handlers, proceed to the next one - //continue; - /* packet was handled by the primary handler - * and should be dispatched to the auxiliary handler(s) - } - case RTP_PKT_MODIFIED: - { - call_aux_handlers(handler.first, rce_flags, &frame); - break; - } - case RTP_GENERIC_ERROR: - { - UVG_LOG_DEBUG("Error in handling of received packet!"); - break; - } - default: - { - UVG_LOG_ERROR("Unknown error code from packet handler: %d", ret); - break; - } - }*/ - } + + } // to make sure we don't process this packet again ring_buffer_[ring_read_index_].read = 0; diff --git a/src/reception_flow.hh b/src/reception_flow.hh index 379f081..bbe2eb3 100644 --- a/src/reception_flow.hh +++ b/src/reception_flow.hh @@ -64,7 +64,11 @@ namespace uvgrtp { //typedef rtp_error_t(*packet_handler_new)(void*, int, uint8_t*, size_t); struct handler_new { - std::function handler; + std::function handler_rtp; + std::function handler_rtcp; + std::function handler_zrtp; + std::function handler_srtp; + std::function handler_media; std::function getter; }; @@ -130,6 +134,7 @@ namespace uvgrtp { 2 rtcp 3 zrtp 4 srtp + 5 media getter can be nullptr if there is no getter (for media handlers mostly) */ rtp_error_t new_install_handler(int type, std::shared_ptr> remote_ssrc, @@ -282,12 +287,8 @@ namespace uvgrtp { //void* user_hook_arg_; //void (*user_hook_)(void* arg, uint8_t* payload); - // Map different types of handlers by remote SSRCs - std::map>, handler_new> rtp_handlers_; - std::map>, handler_new> rtcp_handlers_; - std::map>, handler_new> zrtp_handlers_; - - std::map>, handler_new> srtp_handlers_; + // Map different types of handlers by remote SSRC + std::map>, handler_new> NEW_packet_handlers_; std::vector ring_buffer_; std::mutex ring_mutex_; diff --git a/src/rtp.cc b/src/rtp.cc index 4aaf3fe..7bc2a70 100644 --- a/src/rtp.cc +++ b/src/rtp.cc @@ -235,93 +235,7 @@ uint32_t uvgrtp::rtp::get_rtp_ts() const { rtp_error_t uvgrtp::rtp::new_packet_handler(int rce_flags, uint8_t* read_ptr, size_t size, frame::rtp_frame** out) { - /* not an RTP frame */ - if (size < 12) - { - UVG_LOG_DEBUG("Received RTP packet is too small to contain header"); - return RTP_PKT_NOT_HANDLED; - } - uint8_t* ptr = read_ptr; - - /* invalid version */ - if (((ptr[0] >> 6) & 0x03) != 0x2) - { - UVG_LOG_DEBUG("Received RTP packet with invalid version"); - return RTP_PKT_NOT_HANDLED; - } - *out = uvgrtp::frame::alloc_rtp_frame(); - - if (*out == nullptr) - { - return RTP_GENERIC_ERROR; - } - (*out)->header.version = (ptr[0] >> 6) & 0x03; - (*out)->header.padding = (ptr[0] >> 5) & 0x01; - (*out)->header.ext = (ptr[0] >> 4) & 0x01; - (*out)->header.cc = (ptr[0] >> 0) & 0x0f; - (*out)->header.marker = (ptr[1] & 0x80) ? 1 : 0; - (*out)->header.payload = (ptr[1] & 0x7f); - (*out)->header.seq = ntohs(*(uint16_t*)&ptr[2]); - (*out)->header.timestamp = ntohl(*(uint32_t*)&ptr[4]); - (*out)->header.ssrc = ntohl(*(uint32_t*)&ptr[8]); - - (*out)->payload_len = (size_t)size - sizeof(uvgrtp::frame::rtp_header); - - /* Skip the generic RTP header - * There may be 0..N CSRC entries after the header, so check those */ - ptr += sizeof(uvgrtp::frame::rtp_header); - - if ((*out)->header.cc > 0) { - UVG_LOG_DEBUG("frame contains csrc entries"); - - if ((ssize_t)((*out)->payload_len - (*out)->header.cc * sizeof(uint32_t)) < 0) { - UVG_LOG_DEBUG("Invalid frame length, %d CSRC entries, total length %zu", (*out)->header.cc, (*out)->payload_len); - (void)uvgrtp::frame::dealloc_frame(*out); - return RTP_GENERIC_ERROR; - } - UVG_LOG_DEBUG("Allocating %u CSRC entries", (*out)->header.cc); - - (*out)->csrc = new uint32_t[(*out)->header.cc]; - (*out)->payload_len -= (*out)->header.cc * sizeof(uint32_t); - - for (size_t i = 0; i < (*out)->header.cc; ++i) { - (*out)->csrc[i] = *(uint32_t*)ptr; - ptr += sizeof(uint32_t); - } - } - - if ((*out)->header.ext) { - UVG_LOG_DEBUG("Frame contains extension information"); - (*out)->ext = new uvgrtp::frame::ext_header; - - (*out)->ext->type = ntohs(*(uint16_t*)&ptr[0]); - (*out)->ext->len = ntohs(*(uint16_t*)&ptr[2]) * sizeof(uint32_t); - (*out)->ext->data = (uint8_t*)memdup(ptr + 2 * sizeof(uint16_t), (*out)->ext->len); - (*out)->payload_len -= 2 * sizeof(uint16_t) + (*out)->ext->len; - ptr += 2 * sizeof(uint16_t) + (*out)->ext->len; - } - - /* If padding is set to 1, the last byte of the payload indicates - * how many padding bytes was used. Make sure the padding length is - * valid and subtract the amount of padding bytes from payload length */ - if ((*out)->header.padding) { - UVG_LOG_DEBUG("Frame contains padding"); - uint8_t padding_len = (*out)->payload[(*out)->payload_len - 1]; - - if (!padding_len || (*out)->payload_len <= padding_len) { - uvgrtp::frame::dealloc_frame(*out); - return RTP_GENERIC_ERROR; - } - - (*out)->payload_len -= padding_len; - (*out)->padding_len = padding_len; - } - - (*out)->payload = (uint8_t*)memdup(ptr, (*out)->payload_len); - (*out)->dgram = read_ptr; - (*out)->dgram_size = size; - - return RTP_PKT_MODIFIED; + return packet_handler(size, (void*)read_ptr, rce_flags, out); } rtp_error_t uvgrtp::rtp::packet_handler(ssize_t size, void *packet, int rce_flags, uvgrtp::frame::rtp_frame **out)