multiplex: Add universal hook to socketfactory

This commit is contained in:
Heikki Tampio 2023-04-19 15:19:50 +03:00
parent fbc8931190
commit 7efce9901a
5 changed files with 59 additions and 11 deletions

View File

@ -396,6 +396,7 @@ namespace uvgrtp {
uint16_t dst_port_;
bool ipv6_;
rtp_format_t fmt_;
bool new_socket_;
/* Media context config */
int rce_flags_ = 0;

View File

@ -48,6 +48,7 @@ uvgrtp::media_stream::media_stream(std::string cname, std::string remote_addr,
dst_port_(dst_port),
ipv6_(false),
fmt_(fmt),
new_socket_(false),
rce_flags_(rce_flags),
initialized_(false),
rtp_handler_key_(0),
@ -58,7 +59,7 @@ uvgrtp::media_stream::media_stream(std::string cname, std::string remote_addr,
cname_(cname),
fps_numerator_(30),
fps_denominator_(1),
ssrc_(std::make_shared<std::atomic<std::uint32_t>>(uvgrtp::random::generate_32())),
ssrc_(std::make_shared<std::atomic<std::uint32_t>>(1111)),
remote_ssrc_(std::make_shared<std::atomic<std::uint32_t>>(uvgrtp::random::generate_32()))
{
//socket_ = sfp_->create_new_socket();
@ -98,12 +99,14 @@ rtp_error_t uvgrtp::media_stream::init_connection()
if (src_port_ != 0 && !sfp_->is_port_in_use(src_port_)) {
socket_ = sfp_->create_new_socket();
sfp_->start(socket_, 0);
new_socket_ = true;
//sfp_->start(socket_, 0);
}
else {
if (sfp_->get_socket_ptr() == nullptr) {
socket_ = sfp_->create_new_socket();
sfp_->start(socket_, 0);
new_socket_ = true;
//sfp_->start(socket_, 0);
}
else {
socket_ = sfp_->get_socket_ptr();
@ -306,7 +309,7 @@ rtp_error_t uvgrtp::media_stream::init(std::shared_ptr<uvgrtp::zrtp> zrtp)
return RTP_GENERIC_ERROR;
}
reception_flow_ = std::unique_ptr<uvgrtp::reception_flow> (new uvgrtp::reception_flow());
//reception_flow_ = std::unique_ptr<uvgrtp::reception_flow> (new uvgrtp::reception_flow());
rtp_ = std::shared_ptr<uvgrtp::rtp> (new uvgrtp::rtp(fmt_, ssrc_, ipv6_));
@ -349,12 +352,17 @@ rtp_error_t uvgrtp::media_stream::init(std::shared_ptr<uvgrtp::zrtp> zrtp)
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
socket_->install_handler(srtp_.get(), srtp_->send_packet_handler);
rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler);
zrtp_handler_key_ = reception_flow_->install_handler(zrtp->packet_handler);
//rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler);
//zrtp_handler_key_ = reception_flow_->install_handler(zrtp->packet_handler);
reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr);
reception_flow_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr);
rtp_handler_key_ = sfp_->install_handler(rtp_->packet_handler);
zrtp_handler_key_ = sfp_->install_handler(zrtp->packet_handler);
//reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr);
//reception_flow_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr);
sfp_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr);
sfp_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr);
return start_components();
}
@ -445,6 +453,10 @@ rtp_error_t uvgrtp::media_stream::start_components()
initialized_ = true;
//return reception_flow_->start(socket_, rce_flags_);
//return sfp_->start(rce_flags_);
//return RTP_OK;
if (new_socket_) {
return sfp_->start(socket_, 0);
}
return RTP_OK;
}
@ -655,7 +667,8 @@ rtp_error_t uvgrtp::media_stream::install_receive_hook(void *arg, void (*hook)(v
return RTP_INVALID_VALUE;
//reception_flow_
// PLACEHOLDER -------- IMPLEMENTS THIS: should be the remote ssrc
sfp_->install_receive_hook(arg, hook, remote_ssrc_.get()->load());
//remote_ssrc_.get()->load()
sfp_->install_receive_hook(arg, hook, 1111);
return RTP_OK;
}

View File

@ -5,7 +5,7 @@
static std::mt19937 rng{std::random_device{}()};
static std::uniform_int_distribution<uint32_t> gen32_dist{
std::numeric_limits<uint32_t>::min(), std::numeric_limits<uint32_t>::max()};
1, std::numeric_limits<uint32_t>::max()};
uint32_t uvgrtp::random::generate_32() {
return gen32_dist(rng);

View File

@ -33,6 +33,7 @@ uvgrtp::socketfactory::socketfactory(int rce_flags) :
ipv6_(false),
used_sockets_({}),
hooks_({}),
universal_hook_set_(false),
packet_handlers_({}),
should_stop_(true),
receiver_(nullptr),
@ -212,6 +213,26 @@ rtp_error_t uvgrtp::socketfactory::install_receive_hook(
return RTP_OK;
}
rtp_error_t uvgrtp::socketfactory::install_universal_receive_hook(
void* arg,
void (*hook)(void*, uvgrtp::frame::rtp_frame*)
)
{
if (!hook) {
return RTP_INVALID_VALUE;
}
// tämä ssrc on sitten se meidän eli vastaanottajan pään media streamin ssrc
if (!universal_hook_set_) {
receive_pkt_hook new_hook = { arg, hook };
hooks_[0] = new_hook;
universal_hook_set_ = true;
}
//recv_hook_ = hook;
//recv_hook_arg_ = arg;
return RTP_OK;
}
rtp_error_t uvgrtp::socketfactory::install_aux_handler(
uint32_t key,
void* arg,
@ -651,8 +672,15 @@ void uvgrtp::socketfactory::destroy_ring_buffer()
void uvgrtp::socketfactory::return_frame(uvgrtp::frame::rtp_frame* frame)
{
uint32_t ssrc = frame->header.ssrc;
if (universal_hook_set_) {
// if universal hook is set, first call it
receive_pkt_hook pkt_hook = hooks_[0];
recv_hook hook = pkt_hook.hook;
void* arg = pkt_hook.arg;
hook(arg, frame);
}
if(hooks_.count(ssrc) > 0) {
// then call the hook that is assigned to receive from this remote ssrc
receive_pkt_hook pkt_hook = hooks_[ssrc];
recv_hook hook = pkt_hook.hook;
void* arg = pkt_hook.arg;

View File

@ -41,6 +41,11 @@ namespace uvgrtp {
rtp_error_t install_aux_handler_cpp(uint32_t key,
std::function<rtp_error_t(int, uvgrtp::frame::rtp_frame**)> handler,
std::function<rtp_error_t(uvgrtp::frame::rtp_frame**)> getter);
// SSRC 0 is reserved for universal receive hook
rtp_error_t install_universal_receive_hook(void* arg, void (*hook)(void*, uvgrtp::frame::rtp_frame*));
rtp_error_t install_receive_hook(void* arg, void (*hook)(void*, uvgrtp::frame::rtp_frame*), uint32_t ssrc);
rtp_error_t start(std::shared_ptr<uvgrtp::socket> socket, int rce_flags);
@ -78,6 +83,7 @@ namespace uvgrtp {
//void* recv_hook_arg_;
//void (*recv_hook_)(void* arg, uvgrtp::frame::rtp_frame* frame);
std::map<uint32_t, receive_pkt_hook> hooks_;
bool universal_hook_set_;
std::unordered_map<uint32_t, packet_handlers> packet_handlers_;
bool should_stop_;