diff --git a/include/uvgrtp/rtcp.hh b/include/uvgrtp/rtcp.hh index db4a20e..aa74ff5 100644 --- a/include/uvgrtp/rtcp.hh +++ b/include/uvgrtp/rtcp.hh @@ -26,6 +26,7 @@ namespace uvgrtp { class rtp; class srtcp; class socket; + class socketfactory; typedef std::vector> buf_vec; // also defined in socket.hh @@ -118,8 +119,10 @@ namespace uvgrtp { class rtcp { public: /// \cond DO_NOT_DOCUMENT - rtcp(std::shared_ptr rtp, std::shared_ptr> ssrc, std::string cname, int rce_flags); - rtcp(std::shared_ptr rtp, std::shared_ptr> ssrc, std::string cname, std::shared_ptr srtcp, int rce_flags); + rtcp(std::shared_ptr rtp, std::shared_ptr> ssrc, std::string cname, + std::shared_ptr sfp, int rce_flags); + rtcp(std::shared_ptr rtp, std::shared_ptr> ssrc, std::string cname, + std::shared_ptr sfp, std::shared_ptr srtcp, int rce_flags); ~rtcp(); /* start the RTCP runner thread @@ -649,6 +652,7 @@ namespace uvgrtp { std::unique_ptr report_generator_; std::unique_ptr report_reader_; std::shared_ptr rtcp_socket_; + std::shared_ptr sfp_; bool is_active() const { @@ -672,6 +676,7 @@ namespace uvgrtp { std::multimap(uint8_t& subtype, uint32_t& payload_len)>> outgoing_app_hooks_; bool hooked_app_; + bool new_socket_; uvgrtp::frame::rtcp_sdes_item cnameItem_; diff --git a/src/media_stream.cc b/src/media_stream.cc index 56f921d..e5d3bf5 100644 --- a/src/media_stream.cc +++ b/src/media_stream.cc @@ -285,13 +285,13 @@ rtp_error_t uvgrtp::media_stream::init() } rtp_ = std::shared_ptr (new uvgrtp::rtp(fmt_, ssrc_, ipv6_)); - rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, rce_flags_)); + rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, rce_flags_)); socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec); rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler); - reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_.get()->load()); + reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_); reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr); @@ -348,7 +348,7 @@ rtp_error_t uvgrtp::media_stream::init(std::shared_ptr zrtp) zrtp->dh_has_finished(); // only after the DH stream has gotten its keys, do we let non-DH stream perform ZRTP - rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, srtcp_, rce_flags_)); + rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, srtcp_, rce_flags_)); socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec); socket_->install_handler(srtp_.get(), srtp_->send_packet_handler); @@ -356,8 +356,8 @@ rtp_error_t uvgrtp::media_stream::init(std::shared_ptr zrtp) rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler); zrtp_handler_key_ = reception_flow_->install_handler(zrtp->packet_handler); - reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_.get()->load()); - reception_flow_->map_handler_key(zrtp_handler_key_, remote_ssrc_.get()->load()); + reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_); + reception_flow_->map_handler_key(zrtp_handler_key_, remote_ssrc_); reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr); reception_flow_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr); @@ -399,7 +399,7 @@ rtp_error_t uvgrtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt) return free_resources(ret); } - rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, srtcp_, rce_flags_)); + rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, srtcp_, rce_flags_)); socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec); socket_->install_handler(srtp_.get(), srtp_->send_packet_handler); diff --git a/src/reception_flow.cc b/src/reception_flow.cc index 2d2e2cf..773a81a 100644 --- a/src/reception_flow.cc +++ b/src/reception_flow.cc @@ -302,7 +302,7 @@ void uvgrtp::reception_flow::call_aux_handlers(uint32_t key, int rce_flags, uvgr auto fr = *frame; uint32_t pkt_ssrc = fr->header.ssrc; - uint32_t current_ssrc = handler_mapping_[key]; + uint32_t current_ssrc = handler_mapping_[key].get()->load(); bool auxh = false; if (current_ssrc == pkt_ssrc) { auxh = true; @@ -514,13 +514,16 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) uint32_t nhssrc = ntohl(*(uint32_t*)&ptr[8]); uint32_t hnssrc = (uint32_t)ptr[8]; - uint32_t current_ssrc = handler_mapping_[handler.first]; + uint32_t current_ssrc = handler_mapping_[handler.first].get()->load(); bool reth = false; if (current_ssrc == hnssrc || current_ssrc == nhssrc|| current_ssrc == frame->header.ssrc) { reth = true; + UVG_LOG_INFO("Hook ssrc %d", current_ssrc); + } else if (current_ssrc == 0) { reth = true; + UVG_LOG_INFO("Hook ssrc 0"); } else { @@ -623,7 +626,7 @@ void uvgrtp::reception_flow::increase_buffer_size(ssize_t next_write_index) } } -bool uvgrtp::reception_flow::map_handler_key(uint32_t key, uint32_t remote_ssrc) +bool uvgrtp::reception_flow::map_handler_key(uint32_t key, std::shared_ptr> remote_ssrc) { bool ret = false; if (handler_mapping_.find(key) == handler_mapping_.end()) { diff --git a/src/reception_flow.hh b/src/reception_flow.hh index 99bda31..cf59610 100644 --- a/src/reception_flow.hh +++ b/src/reception_flow.hh @@ -156,7 +156,7 @@ namespace uvgrtp { void set_buffer_size(const ssize_t& value); void set_payload_size(const size_t& value); - bool map_handler_key(uint32_t key, uint32_t remote_ssrc); + bool map_handler_key(uint32_t key, std::shared_ptr> remote_ssrc); private: /* RTP packet receiver thread */ @@ -193,7 +193,7 @@ namespace uvgrtp { std::map hooks_; // Map handler keys to media streams remote ssrcs - std::map handler_mapping_; + std::map>> handler_mapping_; std::mutex flow_mutex_; bool should_stop_; diff --git a/src/rtcp.cc b/src/rtcp.cc index c73e81c..6a3e219 100644 --- a/src/rtcp.cc +++ b/src/rtcp.cc @@ -10,6 +10,7 @@ #include "debug.hh" #include "srtp/srtcp.hh" #include "rtcp_packets.hh" +#include "socketfactory.hh" #include "global.hh" @@ -43,7 +44,8 @@ constexpr int ESTIMATED_MAX_RECEPTION_TIME_MS = 10; const uint32_t MAX_SUPPORTED_PARTICIPANTS = 31; -uvgrtp::rtcp::rtcp(std::shared_ptr rtp, std::shared_ptr ssrc, std::string cname, int rce_flags): +uvgrtp::rtcp::rtcp(std::shared_ptr rtp, std::shared_ptr ssrc, std::string cname, + std::shared_ptr sfp, int rce_flags) : rce_flags_(rce_flags), our_role_(RECEIVER), tp_(0), tc_(0), tn_(0), pmembers_(0), members_(0), senders_(0), rtcp_bandwidth_(0), reduced_minimum_(0), @@ -66,12 +68,14 @@ uvgrtp::rtcp::rtcp(std::shared_ptr rtp, std::shared_ptrget_clock_rate(); @@ -107,8 +111,8 @@ uvgrtp::rtcp::rtcp(std::shared_ptr rtp, std::shared_ptr rtp, std::shared_ptr ssrc, std::string cname, - std::shared_ptr srtcp, int rce_flags): - rtcp(rtp, ssrc, cname, rce_flags) + std::shared_ptr sfp, std::shared_ptr srtcp, int rce_flags): + rtcp(rtp, ssrc, cname, sfp, rce_flags) { srtcp_ = srtcp; } @@ -199,17 +203,22 @@ void uvgrtp::rtcp::free_participant(std::unique_ptr participan rtp_error_t uvgrtp::rtcp::start() { active_ = true; - rtcp_socket_ = std::unique_ptr(new uvgrtp::socket(0)); - rtp_error_t ret = RTP_OK; - if (ipv6_) { - ret = rtcp_socket_->init(AF_INET6, SOCK_DGRAM, 0); + ipv6_ = sfp_->get_ipv6(); + + // Source port is given and is not in use -> create new socket + if (local_port_ != 0 && !sfp_->is_port_in_use(local_port_)) { + rtcp_socket_ = sfp_->create_new_socket(); + new_socket_ = true; } + // Source port is in use -> fetch the existing socket else { - ret = rtcp_socket_->init(AF_INET, SOCK_DGRAM, 0); - } - if (ret != RTP_OK) { - return ret; + rtcp_socket_ = sfp_->get_socket_ptr(local_port_); + if (!rtcp_socket_) { + rtcp_socket_ = sfp_->create_new_socket(); + new_socket_ = true; + } } + rtp_error_t ret = RTP_OK; int enable = 1; @@ -217,17 +226,7 @@ rtp_error_t uvgrtp::rtcp::start() { return ret; } - -#ifdef _WIN32 - /* Make the socket non-blocking */ - int enabled = 1; - - if (::ioctlsocket(rtcp_socket_->get_raw_socket(), FIONBIO, (u_long*)&enabled) < 0) - { - UVG_LOG_ERROR("Failed to make the socket non-blocking!"); - } -#endif - + /* Set read timeout (5s for now) * * This means that the socket is listened for 5s at a time and after the timeout, @@ -244,19 +243,9 @@ rtp_error_t uvgrtp::rtcp::start() if (local_addr_ != "") { UVG_LOG_INFO("Binding RTCP to port %s:%d", local_addr_.c_str(), local_port_); - if (ipv6_) { - sockaddr_in6 bind_addr6 = rtcp_socket_->create_ip6_sockaddr(local_addr_, local_port_); - if ((ret = rtcp_socket_->bind_ip6(bind_addr6)) != RTP_OK) - { - return ret; - } - } - else { - sockaddr_in bind_addr = rtcp_socket_->create_sockaddr(AF_INET, local_addr_, local_port_); - if ((ret = rtcp_socket_->bind(bind_addr)) != RTP_OK) - { - return ret; - } + if ((ret = sfp_->bind_socket(rtcp_socket_, local_port_)) != RTP_OK) { + log_platform_error("bind(2) failed"); + return ret; } } else @@ -264,19 +253,9 @@ rtp_error_t uvgrtp::rtcp::start() UVG_LOG_WARN("No local address provided, binding RTCP to INADDR_ANY"); UVG_LOG_INFO("Binding RTCP to port %s:%d", local_addr_.c_str(), local_port_); - if (ipv6_) { - sockaddr_in6 bind_addr6 = rtcp_socket_->create_ip6_sockaddr_any(local_port_); - if ((ret = rtcp_socket_->bind_ip6(bind_addr6)) != RTP_OK) - { - return ret; - } - } - else { - sockaddr_in bind_addr = rtcp_socket_->create_sockaddr(AF_INET, INADDR_ANY, local_port_); - if ((ret = rtcp_socket_->bind(bind_addr)) != RTP_OK) - { - return ret; - } + if ((ret = sfp_->bind_socket_anyip(rtcp_socket_, local_port_)) != RTP_OK) { + log_platform_error("bind(2) to any failed"); + return ret; } } if (ipv6_) {