multiplex: Modify RTCP to use socketfactory for socket management

This commit is contained in:
Heikki Tampio 2023-04-28 09:04:06 +03:00
parent 8f8e58ba01
commit 01d84f303a
5 changed files with 48 additions and 61 deletions

View File

@ -26,6 +26,7 @@ namespace uvgrtp {
class rtp;
class srtcp;
class socket;
class socketfactory;
typedef std::vector<std::pair<size_t, uint8_t*>> buf_vec; // also defined in socket.hh
@ -118,8 +119,10 @@ namespace uvgrtp {
class rtcp {
public:
/// \cond DO_NOT_DOCUMENT
rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic<std::uint32_t>> ssrc, std::string cname, int rce_flags);
rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic<std::uint32_t>> ssrc, std::string cname, std::shared_ptr<uvgrtp::srtcp> srtcp, int rce_flags);
rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic<std::uint32_t>> ssrc, std::string cname,
std::shared_ptr<uvgrtp::socketfactory> sfp, int rce_flags);
rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic<std::uint32_t>> ssrc, std::string cname,
std::shared_ptr<uvgrtp::socketfactory> sfp, std::shared_ptr<uvgrtp::srtcp> srtcp, int rce_flags);
~rtcp();
/* start the RTCP runner thread
@ -649,6 +652,7 @@ namespace uvgrtp {
std::unique_ptr<std::thread> report_generator_;
std::unique_ptr<std::thread> report_reader_;
std::shared_ptr<uvgrtp::socket> rtcp_socket_;
std::shared_ptr<uvgrtp::socketfactory> sfp_;
bool is_active() const
{
@ -672,6 +676,7 @@ namespace uvgrtp {
std::multimap<std::string, std::function <std::unique_ptr<uint8_t[]>(uint8_t& subtype, uint32_t& payload_len)>> outgoing_app_hooks_;
bool hooked_app_;
bool new_socket_;
uvgrtp::frame::rtcp_sdes_item cnameItem_;

View File

@ -285,13 +285,13 @@ rtp_error_t uvgrtp::media_stream::init()
}
rtp_ = std::shared_ptr<uvgrtp::rtp> (new uvgrtp::rtp(fmt_, ssrc_, ipv6_));
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, cname_, rce_flags_));
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, rce_flags_));
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler);
reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_.get()->load());
reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_);
reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr);
@ -348,7 +348,7 @@ rtp_error_t uvgrtp::media_stream::init(std::shared_ptr<uvgrtp::zrtp> zrtp)
zrtp->dh_has_finished(); // only after the DH stream has gotten its keys, do we let non-DH stream perform ZRTP
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, cname_, srtcp_, rce_flags_));
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, srtcp_, rce_flags_));
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
socket_->install_handler(srtp_.get(), srtp_->send_packet_handler);
@ -356,8 +356,8 @@ rtp_error_t uvgrtp::media_stream::init(std::shared_ptr<uvgrtp::zrtp> zrtp)
rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler);
zrtp_handler_key_ = reception_flow_->install_handler(zrtp->packet_handler);
reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_.get()->load());
reception_flow_->map_handler_key(zrtp_handler_key_, remote_ssrc_.get()->load());
reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_);
reception_flow_->map_handler_key(zrtp_handler_key_, remote_ssrc_);
reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr);
reception_flow_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr);
@ -399,7 +399,7 @@ rtp_error_t uvgrtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt)
return free_resources(ret);
}
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, cname_, srtcp_, rce_flags_));
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, srtcp_, rce_flags_));
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
socket_->install_handler(srtp_.get(), srtp_->send_packet_handler);

View File

@ -302,7 +302,7 @@ void uvgrtp::reception_flow::call_aux_handlers(uint32_t key, int rce_flags, uvgr
auto fr = *frame;
uint32_t pkt_ssrc = fr->header.ssrc;
uint32_t current_ssrc = handler_mapping_[key];
uint32_t current_ssrc = handler_mapping_[key].get()->load();
bool auxh = false;
if (current_ssrc == pkt_ssrc) {
auxh = true;
@ -514,13 +514,16 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
uint32_t nhssrc = ntohl(*(uint32_t*)&ptr[8]);
uint32_t hnssrc = (uint32_t)ptr[8];
uint32_t current_ssrc = handler_mapping_[handler.first];
uint32_t current_ssrc = handler_mapping_[handler.first].get()->load();
bool reth = false;
if (current_ssrc == hnssrc || current_ssrc == nhssrc|| current_ssrc == frame->header.ssrc) {
reth = true;
UVG_LOG_INFO("Hook ssrc %d", current_ssrc);
}
else if (current_ssrc == 0) {
reth = true;
UVG_LOG_INFO("Hook ssrc 0");
}
else {
@ -623,7 +626,7 @@ void uvgrtp::reception_flow::increase_buffer_size(ssize_t next_write_index)
}
}
bool uvgrtp::reception_flow::map_handler_key(uint32_t key, uint32_t remote_ssrc)
bool uvgrtp::reception_flow::map_handler_key(uint32_t key, std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc)
{
bool ret = false;
if (handler_mapping_.find(key) == handler_mapping_.end()) {

View File

@ -156,7 +156,7 @@ namespace uvgrtp {
void set_buffer_size(const ssize_t& value);
void set_payload_size(const size_t& value);
bool map_handler_key(uint32_t key, uint32_t remote_ssrc);
bool map_handler_key(uint32_t key, std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc);
private:
/* RTP packet receiver thread */
@ -193,7 +193,7 @@ namespace uvgrtp {
std::map<uint32_t, receive_pkt_hook> hooks_;
// Map handler keys to media streams remote ssrcs
std::map<uint32_t, uint32_t> handler_mapping_;
std::map<uint32_t, std::shared_ptr<std::atomic<std::uint32_t>>> handler_mapping_;
std::mutex flow_mutex_;
bool should_stop_;

View File

@ -10,6 +10,7 @@
#include "debug.hh"
#include "srtp/srtcp.hh"
#include "rtcp_packets.hh"
#include "socketfactory.hh"
#include "global.hh"
@ -43,7 +44,8 @@ constexpr int ESTIMATED_MAX_RECEPTION_TIME_MS = 10;
const uint32_t MAX_SUPPORTED_PARTICIPANTS = 31;
uvgrtp::rtcp::rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic_uint> ssrc, std::string cname, int rce_flags):
uvgrtp::rtcp::rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic_uint> ssrc, std::string cname,
std::shared_ptr<uvgrtp::socketfactory> sfp, int rce_flags) :
rce_flags_(rce_flags), our_role_(RECEIVER),
tp_(0), tc_(0), tn_(0), pmembers_(0),
members_(0), senders_(0), rtcp_bandwidth_(0), reduced_minimum_(0),
@ -66,12 +68,14 @@ uvgrtp::rtcp::rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic
sdes_hook_u_(nullptr),
app_hook_f_(nullptr),
app_hook_u_(nullptr),
sfp_(sfp),
active_(false),
interval_ms_(DEFAULT_RTCP_INTERVAL_MS),
rtp_ptr_(rtp),
ourItems_(),
bye_ssrcs_(false),
hooked_app_(false),
new_socket_(false),
mtu_size_(MAX_IPV4_PAYLOAD)
{
clock_rate_ = rtp->get_clock_rate();
@ -107,8 +111,8 @@ uvgrtp::rtcp::rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic
}
uvgrtp::rtcp::rtcp(std::shared_ptr<uvgrtp::rtp> rtp, std::shared_ptr<std::atomic_uint> ssrc, std::string cname,
std::shared_ptr<uvgrtp::srtcp> srtcp, int rce_flags):
rtcp(rtp, ssrc, cname, rce_flags)
std::shared_ptr<uvgrtp::socketfactory> sfp, std::shared_ptr<uvgrtp::srtcp> srtcp, int rce_flags):
rtcp(rtp, ssrc, cname, sfp, rce_flags)
{
srtcp_ = srtcp;
}
@ -199,17 +203,22 @@ void uvgrtp::rtcp::free_participant(std::unique_ptr<rtcp_participant> participan
rtp_error_t uvgrtp::rtcp::start()
{
active_ = true;
rtcp_socket_ = std::unique_ptr<uvgrtp::socket>(new uvgrtp::socket(0));
rtp_error_t ret = RTP_OK;
if (ipv6_) {
ret = rtcp_socket_->init(AF_INET6, SOCK_DGRAM, 0);
ipv6_ = sfp_->get_ipv6();
// Source port is given and is not in use -> create new socket
if (local_port_ != 0 && !sfp_->is_port_in_use(local_port_)) {
rtcp_socket_ = sfp_->create_new_socket();
new_socket_ = true;
}
// Source port is in use -> fetch the existing socket
else {
ret = rtcp_socket_->init(AF_INET, SOCK_DGRAM, 0);
}
if (ret != RTP_OK) {
return ret;
rtcp_socket_ = sfp_->get_socket_ptr(local_port_);
if (!rtcp_socket_) {
rtcp_socket_ = sfp_->create_new_socket();
new_socket_ = true;
}
}
rtp_error_t ret = RTP_OK;
int enable = 1;
@ -217,17 +226,7 @@ rtp_error_t uvgrtp::rtcp::start()
{
return ret;
}
#ifdef _WIN32
/* Make the socket non-blocking */
int enabled = 1;
if (::ioctlsocket(rtcp_socket_->get_raw_socket(), FIONBIO, (u_long*)&enabled) < 0)
{
UVG_LOG_ERROR("Failed to make the socket non-blocking!");
}
#endif
/* Set read timeout (5s for now)
*
* This means that the socket is listened for 5s at a time and after the timeout,
@ -244,19 +243,9 @@ rtp_error_t uvgrtp::rtcp::start()
if (local_addr_ != "")
{
UVG_LOG_INFO("Binding RTCP to port %s:%d", local_addr_.c_str(), local_port_);
if (ipv6_) {
sockaddr_in6 bind_addr6 = rtcp_socket_->create_ip6_sockaddr(local_addr_, local_port_);
if ((ret = rtcp_socket_->bind_ip6(bind_addr6)) != RTP_OK)
{
return ret;
}
}
else {
sockaddr_in bind_addr = rtcp_socket_->create_sockaddr(AF_INET, local_addr_, local_port_);
if ((ret = rtcp_socket_->bind(bind_addr)) != RTP_OK)
{
return ret;
}
if ((ret = sfp_->bind_socket(rtcp_socket_, local_port_)) != RTP_OK) {
log_platform_error("bind(2) failed");
return ret;
}
}
else
@ -264,19 +253,9 @@ rtp_error_t uvgrtp::rtcp::start()
UVG_LOG_WARN("No local address provided, binding RTCP to INADDR_ANY");
UVG_LOG_INFO("Binding RTCP to port %s:%d", local_addr_.c_str(), local_port_);
if (ipv6_) {
sockaddr_in6 bind_addr6 = rtcp_socket_->create_ip6_sockaddr_any(local_port_);
if ((ret = rtcp_socket_->bind_ip6(bind_addr6)) != RTP_OK)
{
return ret;
}
}
else {
sockaddr_in bind_addr = rtcp_socket_->create_sockaddr(AF_INET, INADDR_ANY, local_port_);
if ((ret = rtcp_socket_->bind(bind_addr)) != RTP_OK)
{
return ret;
}
if ((ret = sfp_->bind_socket_anyip(rtcp_socket_, local_port_)) != RTP_OK) {
log_platform_error("bind(2) to any failed");
return ret;
}
}
if (ipv6_) {