Implement RTP packet dispatcher
Packet dispatcher is responsible for doing receive-related socket operations and dispatching the received UDP datagram to the installed packet handlers. Packet dispatcher goes through all installed handlers until a suitable handler for the packet is found.
This commit is contained in:
parent
421ad4a549
commit
ff583c3cdd
|
@ -0,0 +1,33 @@
|
|||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include "runner.hh"
|
||||
#include "socket.hh"
|
||||
#include "util.hh"
|
||||
|
||||
namespace uvg_rtp {
|
||||
|
||||
typedef rtp_error_t (*packet_handler)(ssize_t, void *);
|
||||
|
||||
class pkt_dispatcher : public runner {
|
||||
public:
|
||||
pkt_dispatcher(uvg_rtp::socket socket);
|
||||
~pkt_dispatcher();
|
||||
|
||||
/* Install a generic handler for an incoming packet
|
||||
*
|
||||
* Return RTP_OK on successfully
|
||||
* Return RTP_INVALID_VALUE if "handler" is nullptr */
|
||||
rtp_error_t install_handler(packet_handler handler);
|
||||
|
||||
/* Return reference to the vector that holds all installed handlers */
|
||||
std::vector<uvg_rtp::packet_handler>& get_handlers();
|
||||
|
||||
private:
|
||||
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket& socket);
|
||||
|
||||
uvg_rtp::socket socket_;
|
||||
std::vector<packet_handler> packet_handlers_;
|
||||
};
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
#ifdef __linux__
|
||||
#include <errno.h>
|
||||
|
||||
#else
|
||||
#define MSG_DONTWAIT 0
|
||||
#endif
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#include "debug.hh"
|
||||
#include "pkt_dispatch.hh"
|
||||
#include "util.hh"
|
||||
|
||||
uvg_rtp::pkt_dispatcher::pkt_dispatcher(uvg_rtp::socket socket):
|
||||
socket_(socket)
|
||||
{
|
||||
}
|
||||
|
||||
uvg_rtp::pkt_dispatcher::~pkt_dispatcher()
|
||||
{
|
||||
}
|
||||
|
||||
rtp_error_t uvg_rtp::pkt_dispatcher::install_handler(uvg_rtp::packet_handler handler)
|
||||
{
|
||||
if (!handler)
|
||||
return RTP_INVALID_VALUE;
|
||||
|
||||
packet_handlers_.push_back(handler);
|
||||
return RTP_OK;
|
||||
}
|
||||
|
||||
std::vector<uvg_rtp::packet_handler>& uvg_rtp::pkt_dispatcher::get_handlers()
|
||||
{
|
||||
return packet_handlers_;
|
||||
}
|
||||
|
||||
/* The point of packet dispatcher is to provide much-needed isolation between different layers
|
||||
* of uvgRTP. For example, HEVC handler should not concern itself with RTP packet validation
|
||||
* because that should be a global operation done for all packets.
|
||||
*
|
||||
* Neither should Opus handler take SRTP-provided authentication tag into account when it is
|
||||
* performing operations on the packet.
|
||||
* And ZRTP packetshould not be relayed from media handler to ZRTP handler et cetera.
|
||||
*
|
||||
* This can be achieved by having a global UDP packet handler for any packet type that validates
|
||||
* all common stuff it can and then dispatches the validated packet to the correct layer using
|
||||
* one of the installed handlers.
|
||||
*
|
||||
* If it's unclear as to which handler should be called, the packet is dispatcher to all relevant
|
||||
* handlers and a handler then returns RTP_OK/RTP_PKT_NOT_HANDLED based on whether the packet was handled.
|
||||
*
|
||||
* For example, if runner detects an incoming ZRTP packet, that packet is immediately dispatched to the
|
||||
* installed ZRTP handler if ZRTP has been enabled.
|
||||
* Likewise, if RTP packet authentication has been enabled, runner validates the packet before passing
|
||||
* it onto any other layer so all future work on the packet is not done in vain due to invalid data */
|
||||
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket& socket)
|
||||
{
|
||||
int nread;
|
||||
fd_set read_fds;
|
||||
rtp_error_t ret;
|
||||
struct timeval t_val;
|
||||
|
||||
FD_ZERO(&read_fds);
|
||||
|
||||
t_val.tv_sec = 0;
|
||||
t_val.tv_usec = 1500;
|
||||
|
||||
const size_t recv_buffer_len = 8192;
|
||||
uint8_t recv_buffer[recv_buffer_len] = { 0 };
|
||||
|
||||
while (dispatcher->active()) {
|
||||
FD_SET(socket.get_raw_socket(), &read_fds);
|
||||
int sret = ::select(socket.get_raw_socket() + 1, &read_fds, nullptr, nullptr, &t_val);
|
||||
|
||||
if (sret < 0) {
|
||||
log_platform_error("select(2) failed");
|
||||
break;
|
||||
}
|
||||
|
||||
do {
|
||||
if ((ret = socket.recvfrom(recv_buffer, recv_buffer_len, MSG_DONTWAIT, &nread)) == RTP_INTERRUPTED)
|
||||
break;
|
||||
|
||||
if (ret != RTP_OK) {
|
||||
LOG_ERROR("recvfrom(2) failed! Packet dispatcher cannot continue %d!", ret);
|
||||
break;
|
||||
}
|
||||
|
||||
for (auto& handler : dispatcher->get_handlers()) {
|
||||
switch ((ret = (*handler)(nread, recv_buffer))) {
|
||||
/* packet was handled successfully or the packet was in some way corrupted */
|
||||
case RTP_OK:
|
||||
case RTP_GENERIC_ERROR:
|
||||
break;
|
||||
|
||||
/* the received packet is not handled by the called handler */
|
||||
case RTP_PKT_NOT_HANDLED:
|
||||
continue;
|
||||
|
||||
default:
|
||||
LOG_ERROR("Unknown error code from packet handler: %d", ret);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
} while (ret == RTP_OK);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue