Separate packet handlers into primary and auxiliary

Because most packet handlers do not require the raw UDP datagram
received through the socket, it makes little sense to relay those
parameters to them.

Additionally, there's a clear distinction between crafting
an RTP frame and operating on it so having one handler type
for both operations is not the best design choice.

Thus the packet handlers are divided into primary and auxiliary
handlers. Primary handlers are responsible for creating a packet
that the auxiliary handlers can operate on and auxiliary handlers
are responsible for doing all other operations on the packet such as
gathering sessions statistic information or decrypting the packet.
This commit is contained in:
Aaro Altonen 2020-08-06 07:32:28 +03:00
parent 9a59dd0cc8
commit 39e9b07726
10 changed files with 115 additions and 40 deletions

View File

@ -33,7 +33,7 @@ namespace uvg_rtp {
* Return RTP_PKT_NOT_HANDLED if the packet is not handled by this handler
* Return RTP_PKT_MODIFIED if the packet was modified but should be forwarded to other handlers
* Return RTP_GENERIC_ERROR if the packet was corrupted in some way */
static rtp_error_t packet_handler(ssize_t size, void *packet, int flags, frame::rtp_frame **out);
static rtp_error_t packet_handler(int flags, frame::rtp_frame **frame);
protected:
rtp_error_t __push_frame(uint8_t *data, size_t data_len, int flags);

View File

@ -33,7 +33,7 @@ namespace uvg_rtp {
* Return RTP_PKT_NOT_HANDLED if the packet is not handled by this handler
* Return RTP_PKT_MODIFIED if the packet was modified but should be forwarded to other handlers
* Return RTP_GENERIC_ERROR if the packet was corrupted in some way */
static rtp_error_t packet_handler(ssize_t size, void *packet, int flags, frame::rtp_frame **out);
static rtp_error_t packet_handler(int flags, frame::rtp_frame **frame);
protected:
virtual rtp_error_t __push_frame(uint8_t *data, size_t data_len, int flags);

View File

@ -199,6 +199,9 @@ namespace uvg_rtp {
/* media stream type */
enum mstream_type type_;
/* Primary handler's key for the RTP packet dispatcher */
uint32_t rtp_handler_key_;
/* RTP packet dispatcher for the receiver */
uvg_rtp::pkt_dispatcher *pkt_dispatcher_;
std::thread *dispatcher_thread_;

View File

@ -11,17 +11,42 @@
namespace uvg_rtp {
typedef rtp_error_t (*packet_handler)(ssize_t, void *, int, uvg_rtp::frame::rtp_frame **);
typedef rtp_error_t (*packet_handler_aux)(int, uvg_rtp::frame::rtp_frame **);
struct packet_handlers {
packet_handler primary;
std::vector<packet_handler_aux> auxiliary;
};
class pkt_dispatcher : public runner {
public:
pkt_dispatcher();
~pkt_dispatcher();
/* Install a generic handler for an incoming packet
/* Install a primary handler for an incoming UDP datagram
*
* Return RTP_OK on successfully
* Return RTP_INVALID_VALUE if "handler" is nullptr */
rtp_error_t install_handler(packet_handler handler);
* This handler is responsible for creating an operable RTP packet
* that auxiliary handlers can work with.
*
* It is also responsible for validating the packet on a high level
* (ZRTP checksum/RTP version etc) before passing it onto other handlers.
*
* Return a key on success that differentiates primary packet handlers
* Return 0 "handler" is nullptr */
uint32_t install_handler(packet_handler handler);
/* Install auxiliary handler for the packet
*
* This handler is responsible for doing auxiliary operations on the packet
* such as gathering sessions statistics data or decrypting the packet
* It is called only after the primary handler of the auxiliary handler is called
*
* "key" is used to specify for which primary handlers for "handler"
* An auxiliary handler can be installed to multiple primary handlers
*
* Return RTP_OK on success
* Return RTP_INVALID_VALUE if "handler" is nullptr or if "key" is not valid */
rtp_error_t install_aux_handler(uint32_t key, packet_handler_aux handler);
/* Install receive hook for the RTP packet dispatcher
*
@ -52,12 +77,15 @@ namespace uvg_rtp {
uvg_rtp::frame::rtp_frame *pull_frame();
uvg_rtp::frame::rtp_frame *pull_frame(size_t ms);
/* Return reference to the vector that holds all installed handlers */
std::vector<uvg_rtp::packet_handler>& get_handlers();
/* Return reference to the map that holds all installed handlers */
std::unordered_map<uint32_t, uvg_rtp::packet_handlers>& get_handlers();
/* Return a processed RTP frame to user either through frame queue or receive hook */
void return_frame(uvg_rtp::frame::rtp_frame *frame);
/* Call auxiliary handlers of a primary handler */
void call_aux_handlers(uint32_t key, int flags, uvg_rtp::frame::rtp_frame **frame);
/* RTP packet dispatcher thread */
static void runner(
uvg_rtp::pkt_dispatcher *dispatcher,
@ -67,7 +95,7 @@ namespace uvg_rtp {
);
private:
std::vector<packet_handler> packet_handlers_;
std::unordered_map<uint32_t, packet_handlers> packet_handlers_;
/* If receive hook has not been installed, frames are pushed to "frames_"
* and they can be retrieved using pull_frame() */

View File

@ -145,7 +145,7 @@ namespace uvg_rtp {
rtp_error_t install_app_hook(void (*hook)(uvg_rtp::frame::rtcp_app_frame *));
/* Update RTCP-related session statistics */
static rtp_error_t packet_handler(ssize_t size, void *packet, int flags, frame::rtp_frame **out);
static rtp_error_t packet_handler(int flags, frame::rtp_frame **out);
private:
static void rtcp_runner(rtcp *rtcp);

View File

@ -102,10 +102,8 @@ static void __drop_frame(frame_info_t& finfo, uint32_t ts)
finfo.erase(ts);
}
rtp_error_t uvg_rtp::formats::hevc::packet_handler(ssize_t size, void *packet, int flags, uvg_rtp::frame::rtp_frame **out)
rtp_error_t uvg_rtp::formats::hevc::packet_handler(int flags, uvg_rtp::frame::rtp_frame **out)
{
(void)size, (void)packet;
static frame_info_t finfo;
static std::unordered_set<uint32_t> dropped;

View File

@ -92,15 +92,8 @@ rtp_error_t uvg_rtp::formats::media::__push_frame(uint8_t *data, size_t data_len
return socket_->sendto(buffers, 0);
}
rtp_error_t uvg_rtp::formats::media::packet_handler(
ssize_t size,
void *packet,
int flags,
uvg_rtp::frame::rtp_frame **out
)
rtp_error_t uvg_rtp::formats::media::packet_handler(int flags, uvg_rtp::frame::rtp_frame **out)
{
(void)size, (void)packet;
struct frame_info {
uint32_t s_seq;
uint32_t e_seq;

View File

@ -23,6 +23,7 @@ uvg_rtp::media_stream::media_stream(std::string addr, int src_port, int dst_port
ctx_config_(),
media_config_(nullptr),
initialized_(false),
rtp_handler_key_(0),
pkt_dispatcher_(nullptr),
dispatcher_thread_(nullptr),
media_(nullptr)
@ -115,19 +116,22 @@ rtp_error_t uvg_rtp::media_stream::init()
return RTP_MEMORY_ERROR;
}
pkt_dispatcher_->install_handler(rtp_->packet_handler);
pkt_dispatcher_->install_handler(rtcp_->packet_handler);
rtp_handler_key_ = pkt_dispatcher_->install_handler(rtp_->packet_handler);
pkt_dispatcher_->install_aux_handler(rtp_handler_key_, rtcp_->packet_handler);
switch (fmt_) {
case RTP_FORMAT_HEVC:
media_ = new uvg_rtp::formats::hevc(&socket_, rtp_, ctx_config_.flags);
pkt_dispatcher_->install_handler(dynamic_cast<uvg_rtp::formats::hevc *>(media_)->packet_handler);
pkt_dispatcher_->install_aux_handler(
rtp_handler_key_,
dynamic_cast<uvg_rtp::formats::hevc *>(media_)->packet_handler
);
break;
case RTP_FORMAT_OPUS:
case RTP_FORMAT_GENERIC:
media_ = new uvg_rtp::formats::media(&socket_, rtp_, ctx_config_.flags);
pkt_dispatcher_->install_handler(media_->packet_handler);
pkt_dispatcher_->install_aux_handler(rtp_handler_key_, media_->packet_handler);
break;
default:

View File

@ -9,6 +9,7 @@
#include "debug.hh"
#include "pkt_dispatch.hh"
#include "random.hh"
#include "util.hh"
uvg_rtp::pkt_dispatcher::pkt_dispatcher():
@ -88,16 +89,34 @@ uvg_rtp::frame::rtp_frame *uvg_rtp::pkt_dispatcher::pull_frame(size_t timeout)
return frame;
}
rtp_error_t uvg_rtp::pkt_dispatcher::install_handler(uvg_rtp::packet_handler handler)
uint32_t uvg_rtp::pkt_dispatcher::install_handler(uvg_rtp::packet_handler handler)
{
uint32_t key;
if (!handler)
return 0;
do {
key = uvg_rtp::random::generate_32();
} while (!key || (packet_handlers_.find(key) != packet_handlers_.end()));
packet_handlers_[key].primary = handler;
return key;
}
rtp_error_t uvg_rtp::pkt_dispatcher::install_aux_handler(uint32_t key, uvg_rtp::packet_handler_aux handler)
{
if (!handler)
return RTP_INVALID_VALUE;
packet_handlers_.push_back(handler);
if (packet_handlers_.find(key) == packet_handlers_.end())
return RTP_INVALID_VALUE;
packet_handlers_[key].auxiliary.push_back(handler);
return RTP_OK;
}
std::vector<uvg_rtp::packet_handler>& uvg_rtp::pkt_dispatcher::get_handlers()
std::unordered_map<uint32_t, uvg_rtp::packet_handlers>& uvg_rtp::pkt_dispatcher::get_handlers()
{
return packet_handlers_;
}
@ -113,6 +132,37 @@ void uvg_rtp::pkt_dispatcher::return_frame(uvg_rtp::frame::rtp_frame *frame)
}
}
void uvg_rtp::pkt_dispatcher::call_aux_handlers(uint32_t key, int flags, uvg_rtp::frame::rtp_frame **frame)
{
rtp_error_t ret;
for (auto& handler : packet_handlers_[key].auxiliary) {
switch ((ret = (*handler)(flags, frame))) {
/* packet was handled successfully */
case RTP_OK:
break;
case RTP_PKT_READY:
this->return_frame(*frame);
break;
/* packet was not handled or only partially handled by the handler
* proceed to the next handler */
case RTP_PKT_NOT_HANDLED:
case RTP_PKT_MODIFIED:
continue;
case RTP_GENERIC_ERROR:
LOG_DEBUG("Received a corrupted packet!");
break;
default:
LOG_ERROR("Unknown error code from packet handler: %d", ret);
break;
}
}
}
/* The point of packet dispatcher is to provide much-needed isolation between different layers
* of uvgRTP. For example, HEVC handler should not concern itself with RTP packet validation
* because that should be a global operation done for all packets.
@ -169,6 +219,7 @@ void uvg_rtp::pkt_dispatcher::runner(
const size_t recv_buffer_len = 8192;
uint8_t recv_buffer[recv_buffer_len] = { 0 };
auto handlers = dispatcher->get_handlers();
while (!dispatcher->active())
;
@ -193,23 +244,22 @@ void uvg_rtp::pkt_dispatcher::runner(
break;
}
for (auto& handler : dispatcher->get_handlers()) {
switch ((ret = (*handler)(nread, recv_buffer, flags, &frame))) {
for (auto& handler : handlers) {
switch ((ret = (*handler.second.primary)(nread, recv_buffer, flags, &frame))) {
/* packet was handled successfully */
case RTP_OK:
break;
/* "out" contains an RTP packet that can be returned to the user */
case RTP_PKT_READY:
dispatcher->return_frame(frame);
break;
/* the received packet is not handled at all or only partially by the called handler
* proceed to the next handler */
/* packet was not handled by this primary handlers, proceed to the next one */
case RTP_PKT_NOT_HANDLED:
case RTP_PKT_MODIFIED:
continue;
/* packet was handled by the primary handler
* and should be dispatched to the auxiliary handler(s) */
case RTP_PKT_MODIFIED:
dispatcher->call_aux_handlers(handler.first, flags, &frame);
break;
case RTP_GENERIC_ERROR:
LOG_DEBUG("Received a corrupted packet!");
break;
@ -218,7 +268,6 @@ void uvg_rtp::pkt_dispatcher::runner(
LOG_ERROR("Unknown error code from packet handler: %d", ret);
break;
}
}
} while (ret == RTP_OK);
}

View File

@ -1109,7 +1109,7 @@ void uvg_rtp::rtcp::rtcp_runner(uvg_rtp::rtcp *rtcp)
}
}
rtp_error_t uvg_rtp::rtcp::packet_handler(ssize_t size, void *packet, int flags, frame::rtp_frame **out)
rtp_error_t uvg_rtp::rtcp::packet_handler(int flags, frame::rtp_frame **out)
{
return RTP_PKT_NOT_HANDLED;
}