multiplex: Handle RTP packets in reception_flow

This commit is contained in:
Heikki Tampio 2023-06-15 09:36:28 +03:00
parent 533285b9ba
commit 4ff8d1998e
6 changed files with 60 additions and 175 deletions

View File

@ -633,6 +633,10 @@ bool uvgrtp::formats::h26x::is_duplicate_frame(uint32_t timestamp, uint16_t seq_
}
return false;
}
rtp_error_t uvgrtp::formats::h26x::new_packet_handler(int rce_flags, uint8_t* read_ptr, size_t size, uvgrtp::frame::rtp_frame** out)
{
return packet_handler(rce_flags, out);
}
rtp_error_t uvgrtp::formats::h26x::packet_handler(int rce_flags, uvgrtp::frame::rtp_frame** out)
{

View File

@ -116,6 +116,7 @@ namespace uvgrtp {
* Return RTP_PKT_MODIFIED if the packet was modified but should be forwarded to other handlers
* Return RTP_GENERIC_ERROR if the packet was corrupted in some way */
rtp_error_t packet_handler(int rce_flags, frame::rtp_frame** frame);
rtp_error_t new_packet_handler(int rce_flags, uint8_t* read_ptr, size_t size, uvgrtp::frame::rtp_frame** out);
protected:

View File

@ -206,9 +206,9 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt)
case RTP_FORMAT_H264:
{
uvgrtp::formats::h264* format_264 = new uvgrtp::formats::h264(socket_, rtp_, rce_flags_);
reception_flow_->install_aux_handler_cpp(
rtp_handler_key_,
std::bind(&uvgrtp::formats::h264::packet_handler, format_264, std::placeholders::_1, std::placeholders::_2),
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::h264::new_packet_handler, format_264, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
std::bind(&uvgrtp::formats::h264::frame_getter, format_264, std::placeholders::_1));
media_.reset(format_264);
break;
@ -216,9 +216,9 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt)
case RTP_FORMAT_H265:
{
uvgrtp::formats::h265* format_265 = new uvgrtp::formats::h265(socket_, rtp_, rce_flags_);
reception_flow_->install_aux_handler_cpp(
rtp_handler_key_,
std::bind(&uvgrtp::formats::h265::packet_handler, format_265, std::placeholders::_1, std::placeholders::_2),
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::h265::new_packet_handler, format_265, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
std::bind(&uvgrtp::formats::h265::frame_getter, format_265, std::placeholders::_1));
media_.reset(format_265);
break;
@ -226,9 +226,9 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt)
case RTP_FORMAT_H266:
{
uvgrtp::formats::h266* format_266 = new uvgrtp::formats::h266(socket_, rtp_, rce_flags_);
reception_flow_->install_aux_handler_cpp(
rtp_handler_key_,
std::bind(&uvgrtp::formats::h266::packet_handler, format_266, std::placeholders::_1, std::placeholders::_2),
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::h266::new_packet_handler, format_266, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
std::bind(&uvgrtp::formats::h266::frame_getter, format_266, std::placeholders::_1));
media_.reset(format_266);
break;
@ -259,6 +259,7 @@ rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt)
case RTP_FORMAT_VDVI:
{
media_ = std::unique_ptr<uvgrtp::formats::media>(new uvgrtp::formats::media(socket_, rtp_, rce_flags_));
// TODO _______________________________
reception_flow_->install_aux_handler(
rtp_handler_key_,
media_->get_media_frame_info(),
@ -322,12 +323,12 @@ rtp_error_t uvgrtp::media_stream::init()
reception_flow_->map_rtcp_to_rec(remote_ssrc_, rtcp_);
rtcp_->set_socket(socket_);
}
/*
reception_flow_->new_install_handler(
1,
remote_ssrc_,
std::bind(&uvgrtp::rtp::new_packet_handler, rtp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
nullptr);
nullptr);/*
reception_flow_->new_install_handler(
2,
remote_ssrc_,

View File

@ -37,10 +37,7 @@ uvgrtp::reception_flow::reception_flow(bool ipv6) :
receiver_(nullptr),
//user_hook_arg_(nullptr),
//user_hook_(nullptr),
rtp_handlers_({}),
rtcp_handlers_({}),
zrtp_handlers_({}),
srtp_handlers_({}),
NEW_packet_handlers_({}),
ring_buffer_(),
ring_read_index_(-1), // invalid first index that will increase to a valid one
last_ring_write_index_(-1),
@ -329,22 +326,25 @@ rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_pt
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler,
std::function<rtp_error_t(uvgrtp::frame::rtp_frame**)> getter)
{
handler_new pair = {handler, getter};
switch (type) {
case 1: {
rtp_handlers_[remote_ssrc] = { handler, getter };
NEW_packet_handlers_[remote_ssrc].handler_rtp = handler;
break;
}
case 2: {
rtcp_handlers_[remote_ssrc] = { handler, getter };
NEW_packet_handlers_[remote_ssrc].handler_rtcp = handler;
break;
}
case 3: {
zrtp_handlers_[remote_ssrc] = { handler, getter };
NEW_packet_handlers_[remote_ssrc].handler_zrtp = handler;
break;
}
case 4: {
srtp_handlers_[remote_ssrc] = { handler, getter };
NEW_packet_handlers_[remote_ssrc].handler_srtp = handler;
break;
}
case 5: {
NEW_packet_handlers_[remote_ssrc].handler_media = handler;
break;
}
default: {
@ -352,6 +352,7 @@ rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_pt
break;
}
}
NEW_packet_handlers_[remote_ssrc].getter = getter;
return RTP_OK;
}
@ -656,7 +657,9 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
// zrtp headerit network byte orderissa, 32 bittiä pitkiä. rtp myös
// process the ring buffer location through all the handlers
for (auto& handler : packet_handlers_) {
for (auto& p : NEW_packet_handlers_) {
uvgrtp::frame::rtp_frame* frame = nullptr;
//sockaddr_in from = ring_buffer_[ring_read_index_].from;
@ -666,7 +669,7 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
/* -------------------- SSRC checks -------------------- */
uint32_t packet_ssrc = ntohl(*(uint32_t*)&ptr[8]);
uint32_t current_ssrc = handler_mapping_[handler.first].get()->load();
uint32_t current_ssrc = p.first.get()->load();
bool found = false;
if (current_ssrc == packet_ssrc) {
// Socket multiplexing, this handler is the correct one
@ -681,6 +684,8 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
// User pkt????
continue;
}
// Handler set is found
handler_new* handlers = &p.second;
/* -------------------- Protocol checks -------------------- */
/* Checks in the following order:
* 1. If RCE_RTCP_MUX && packet type is 200 - 204 -> RTCP packet (or SRTCP)
@ -688,16 +693,14 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
* 3. Version is 2 -> RTP packet (or SRTP)
* 4. Version is 00 -> Keep-Alive/Holepuncher */
rtp_error_t retval;
size_t size = (size_t)ring_buffer_[ring_read_index_].read;
/* -------------------- RTCP check -------------------- */
if (rce_flags & RCE_RTCP_MUX) {
uint8_t pt = (uint8_t)ptr[1];
//UVG_LOG_DEBUG("Received frame with pt %u", pt);
if (pt >= 200 && pt <= 204) {
for (auto& p : rtcp_handlers_) {
if (p.first.get()->load() == current_ssrc) {
retval = p.second.handler(rce_flags, &ptr[0], (size_t)ring_buffer_[ring_read_index_].read, &frame);
}
}
retval = handlers->handler_rtcp(rce_flags, &ptr[0], size, &frame);
break;
}
}
@ -714,15 +717,20 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
/* -------------------- RTP check ---------------------------------- */
else if (version == 0x2) {
for (auto& p : rtp_handlers_) {
if (p.first.get()->load() == current_ssrc) {
retval = p.second.handler(rce_flags, &ptr[0], (size_t)ring_buffer_[ring_read_index_].read, &frame);
if (retval == RTP_PKT_MODIFIED) {
// next handler???
}
else {
//error??
retval = handlers->handler_rtp(rce_flags, &ptr[0], size, &frame);
if (retval == RTP_PKT_MODIFIED) {
retval = handlers->handler_media(rce_flags, &ptr[0], size, &frame);
if (retval == RTP_PKT_READY) {
return_frame(frame);
break;
}
else if (retval == RTP_MULTIPLE_PKTS_READY) {
UVG_LOG_INFO("TODO:is this correct???");
while (handlers->getter(&frame) == RTP_PKT_READY) {
return_frame(frame);
}
break;
}
}
break;
@ -734,52 +742,8 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
UVG_LOG_INFO("Holepuncher packet");
//break;
}
// Here we don't lock ring mutex because the chaging is only done above.
// NOTE: If there is a need for multiple processing threads, the read should be guarded
switch ((ret = (*handler.second.primary)(ring_buffer_[ring_read_index_].read,
ring_buffer_[ring_read_index_].data, rce_flags, &frame))) {
case RTP_OK:
{
// packet was handled successfully
break;
}
case RTP_PKT_NOT_HANDLED:
{
/* ----------- User packets not yet supported -----------
std::string from_str;
if (ipv6_) {
from_str = uvgrtp::socket::sockaddr_ip6_to_string(from6);
}
else {
from_str = uvgrtp::socket::sockaddr_to_string(from);
}
UVG_LOG_DEBUG("User packet from ip: %s", from_str.c_str());
return_user_pkt(ptr);
*/
// packet was not handled by this primary handlers, proceed to the next one
//continue;
/* packet was handled by the primary handler
* and should be dispatched to the auxiliary handler(s)
}
case RTP_PKT_MODIFIED:
{
call_aux_handlers(handler.first, rce_flags, &frame);
break;
}
case RTP_GENERIC_ERROR:
{
UVG_LOG_DEBUG("Error in handling of received packet!");
break;
}
default:
{
UVG_LOG_ERROR("Unknown error code from packet handler: %d", ret);
break;
}
}*/
}
}
// to make sure we don't process this packet again
ring_buffer_[ring_read_index_].read = 0;

View File

@ -64,7 +64,11 @@ namespace uvgrtp {
//typedef rtp_error_t(*packet_handler_new)(void*, int, uint8_t*, size_t);
struct handler_new {
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler;
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler_rtp;
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler_rtcp;
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler_zrtp;
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler_srtp;
std::function<rtp_error_t(int, uint8_t*, size_t, frame::rtp_frame** out)> handler_media;
std::function<rtp_error_t(uvgrtp::frame::rtp_frame ** out)> getter;
};
@ -130,6 +134,7 @@ namespace uvgrtp {
2 rtcp
3 zrtp
4 srtp
5 media
getter can be nullptr if there is no getter (for media handlers mostly)
*/
rtp_error_t new_install_handler(int type, std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc,
@ -282,12 +287,8 @@ namespace uvgrtp {
//void* user_hook_arg_;
//void (*user_hook_)(void* arg, uint8_t* payload);
// Map different types of handlers by remote SSRCs
std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler_new> rtp_handlers_;
std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler_new> rtcp_handlers_;
std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler_new> zrtp_handlers_;
std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler_new> srtp_handlers_;
// Map different types of handlers by remote SSRC
std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler_new> NEW_packet_handlers_;
std::vector<Buffer> ring_buffer_;
std::mutex ring_mutex_;

View File

@ -235,93 +235,7 @@ uint32_t uvgrtp::rtp::get_rtp_ts() const {
rtp_error_t uvgrtp::rtp::new_packet_handler(int rce_flags, uint8_t* read_ptr, size_t size, frame::rtp_frame** out)
{
/* not an RTP frame */
if (size < 12)
{
UVG_LOG_DEBUG("Received RTP packet is too small to contain header");
return RTP_PKT_NOT_HANDLED;
}
uint8_t* ptr = read_ptr;
/* invalid version */
if (((ptr[0] >> 6) & 0x03) != 0x2)
{
UVG_LOG_DEBUG("Received RTP packet with invalid version");
return RTP_PKT_NOT_HANDLED;
}
*out = uvgrtp::frame::alloc_rtp_frame();
if (*out == nullptr)
{
return RTP_GENERIC_ERROR;
}
(*out)->header.version = (ptr[0] >> 6) & 0x03;
(*out)->header.padding = (ptr[0] >> 5) & 0x01;
(*out)->header.ext = (ptr[0] >> 4) & 0x01;
(*out)->header.cc = (ptr[0] >> 0) & 0x0f;
(*out)->header.marker = (ptr[1] & 0x80) ? 1 : 0;
(*out)->header.payload = (ptr[1] & 0x7f);
(*out)->header.seq = ntohs(*(uint16_t*)&ptr[2]);
(*out)->header.timestamp = ntohl(*(uint32_t*)&ptr[4]);
(*out)->header.ssrc = ntohl(*(uint32_t*)&ptr[8]);
(*out)->payload_len = (size_t)size - sizeof(uvgrtp::frame::rtp_header);
/* Skip the generic RTP header
* There may be 0..N CSRC entries after the header, so check those */
ptr += sizeof(uvgrtp::frame::rtp_header);
if ((*out)->header.cc > 0) {
UVG_LOG_DEBUG("frame contains csrc entries");
if ((ssize_t)((*out)->payload_len - (*out)->header.cc * sizeof(uint32_t)) < 0) {
UVG_LOG_DEBUG("Invalid frame length, %d CSRC entries, total length %zu", (*out)->header.cc, (*out)->payload_len);
(void)uvgrtp::frame::dealloc_frame(*out);
return RTP_GENERIC_ERROR;
}
UVG_LOG_DEBUG("Allocating %u CSRC entries", (*out)->header.cc);
(*out)->csrc = new uint32_t[(*out)->header.cc];
(*out)->payload_len -= (*out)->header.cc * sizeof(uint32_t);
for (size_t i = 0; i < (*out)->header.cc; ++i) {
(*out)->csrc[i] = *(uint32_t*)ptr;
ptr += sizeof(uint32_t);
}
}
if ((*out)->header.ext) {
UVG_LOG_DEBUG("Frame contains extension information");
(*out)->ext = new uvgrtp::frame::ext_header;
(*out)->ext->type = ntohs(*(uint16_t*)&ptr[0]);
(*out)->ext->len = ntohs(*(uint16_t*)&ptr[2]) * sizeof(uint32_t);
(*out)->ext->data = (uint8_t*)memdup(ptr + 2 * sizeof(uint16_t), (*out)->ext->len);
(*out)->payload_len -= 2 * sizeof(uint16_t) + (*out)->ext->len;
ptr += 2 * sizeof(uint16_t) + (*out)->ext->len;
}
/* If padding is set to 1, the last byte of the payload indicates
* how many padding bytes was used. Make sure the padding length is
* valid and subtract the amount of padding bytes from payload length */
if ((*out)->header.padding) {
UVG_LOG_DEBUG("Frame contains padding");
uint8_t padding_len = (*out)->payload[(*out)->payload_len - 1];
if (!padding_len || (*out)->payload_len <= padding_len) {
uvgrtp::frame::dealloc_frame(*out);
return RTP_GENERIC_ERROR;
}
(*out)->payload_len -= padding_len;
(*out)->padding_len = padding_len;
}
(*out)->payload = (uint8_t*)memdup(ptr, (*out)->payload_len);
(*out)->dgram = read_ptr;
(*out)->dgram_size = size;
return RTP_PKT_MODIFIED;
return packet_handler(size, (void*)read_ptr, rce_flags, out);
}
rtp_error_t uvgrtp::rtp::packet_handler(ssize_t size, void *packet, int rce_flags, uvgrtp::frame::rtp_frame **out)