From d145dc04ef5fc53cd0c1dd00399bf5da74241f6c Mon Sep 17 00:00:00 2001 From: Heikki Tampio Date: Tue, 27 Jun 2023 13:13:36 +0300 Subject: [PATCH] multiplex: Add mutexes to socketfactory --- src/media_stream.cc | 15 +++++++++++---- src/socketfactory.cc | 15 ++++++++++++--- src/socketfactory.hh | 5 +++-- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/media_stream.cc b/src/media_stream.cc index 1f34803..cd54d6d 100644 --- a/src/media_stream.cc +++ b/src/media_stream.cc @@ -482,21 +482,28 @@ rtp_error_t uvgrtp::media_stream::start_components() // 2. Install an RTCP reader into socketfactory // 3. In RTCP reader, map our RTCP object to the REMOTE ssrc of this stream. If we are not doing // any socket multiplexing, it will be 0 by default - - if (!sfp_->is_port_in_use(rtcp_port)) { + rtcp_socket = sfp_->get_socket_ptr(rtcp_port); + if (!rtcp_socket) { + rtcp_socket = sfp_->create_new_socket(1); + rtcp_reader = sfp_->install_rtcp_reader(rtcp_port); + rtcp_reader->set_socket(rtcp_socket); + rtcp_->set_socket(rtcp_socket); + rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_); + } + /*if (!sfp_->is_port_in_use(rtcp_port)) { rtcp_socket = sfp_->create_new_socket(1); rtcp_reader = sfp_->install_rtcp_reader(rtcp_port); rtcp_reader->set_socket(rtcp_socket); rtcp_->set_socket(rtcp_socket); rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_); - } + }*/ // 1. Source port is in use -> fetch the existing socket // 2. Fetch the existing RTCP reader from socketfactory // 3. In RTCP reader, map our RTCP object to the REMOTE ssrc of this stream. In this case, the remote // ssrc should be manually set to allow multiplexing else { - rtcp_socket = sfp_->get_socket_ptr(rtcp_port); + //rtcp_socket = sfp_->get_socket_ptr(rtcp_port); if (!rtcp_socket) { // This should not ever happen. However if it does, you could just create a new socket like above UVG_LOG_ERROR("No RTCP socket found"); diff --git a/src/socketfactory.cc b/src/socketfactory.cc index 6df3be6..032d623 100644 --- a/src/socketfactory.cc +++ b/src/socketfactory.cc @@ -119,6 +119,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket(std::shared_ptr s // streams can bind to the same port // If it is a regular address and you want to multiplex several streams into a single socket, one // bind is enough + ports_mutex_.lock(); if (ipv6_) { bind_addr6 = uvgrtp::socket::create_ip6_sockaddr(local_address_, port); if (uvgrtp::socket::is_multicast(bind_addr6)) { @@ -141,6 +142,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket(std::shared_ptr s } if (ret == RTP_OK) { used_ports_.insert({ port, soc }); + ports_mutex_.unlock(); return RTP_OK; } @@ -160,16 +162,20 @@ rtp_error_t uvgrtp::socketfactory::bind_socket_anyip(std::shared_ptrbind(AF_INET, INADDR_ANY, port); } if (ret == RTP_OK) { + ports_mutex_.lock(); used_ports_.insert({ port, soc }); + ports_mutex_.unlock(); return RTP_OK; } } return ret; } -std::shared_ptr uvgrtp::socketfactory::get_socket_ptr(uint16_t port) const +std::shared_ptr uvgrtp::socketfactory::get_socket_ptr(uint16_t port) { + ports_mutex_.lock(); const auto& ptr = used_ports_.find(port); + ports_mutex_.unlock(); if (ptr != used_ports_.end()) { return ptr->second; } @@ -208,7 +214,7 @@ bool uvgrtp::socketfactory::get_ipv6() const return ipv6_; } -bool uvgrtp::socketfactory::is_port_in_use(uint16_t port) const +bool uvgrtp::socketfactory::is_port_in_use(uint16_t port) { if (used_ports_.find(port) == used_ports_.end()) { return false; @@ -218,20 +224,23 @@ bool uvgrtp::socketfactory::is_port_in_use(uint16_t port) const bool uvgrtp::socketfactory::clear_port(uint16_t port, std::shared_ptr socket) { + ports_mutex_.lock(); if (port && used_ports_.find(port) != used_ports_.end()) { used_ports_.erase(port); } + ports_mutex_.unlock(); for (auto& p : rtcp_readers_to_ports_) { if (p.second == port) { rtcp_readers_to_ports_.erase(p.first); break; } } + socket_mutex_.lock(); auto it = std::find(used_sockets_.begin(), used_sockets_.end(), socket); if (it != used_sockets_.end()) { used_sockets_.erase(it); } - + socket_mutex_.unlock(); for (auto& p : reception_flows_) { if (p.second == socket) { reception_flows_.erase(p.first); diff --git a/src/socketfactory.hh b/src/socketfactory.hh index 27afbf2..6e34d75 100644 --- a/src/socketfactory.hh +++ b/src/socketfactory.hh @@ -66,7 +66,7 @@ namespace uvgrtp { * * Param port socket with wanted port * Return pointer to socket on success, nullptr otherwise */ - std::shared_ptr get_socket_ptr(uint16_t port) const; + std::shared_ptr get_socket_ptr(uint16_t port); /* Get reception flow matching the given socket * @@ -96,12 +96,13 @@ namespace uvgrtp { /// \cond DO_NOT_DOCUMENT bool get_ipv6() const; - bool is_port_in_use(uint16_t port) const; + bool is_port_in_use(uint16_t port); /// \endcond private: std::mutex socket_mutex_; + std::mutex ports_mutex_; int rce_flags_; std::string local_address_; std::map> used_ports_;