multiplex: Add mutexes to socketfactory

This commit is contained in:
Heikki Tampio 2023-06-27 13:13:36 +03:00
parent 15e2cdd321
commit d145dc04ef
3 changed files with 26 additions and 9 deletions

View File

@ -482,21 +482,28 @@ rtp_error_t uvgrtp::media_stream::start_components()
// 2. Install an RTCP reader into socketfactory
// 3. In RTCP reader, map our RTCP object to the REMOTE ssrc of this stream. If we are not doing
// any socket multiplexing, it will be 0 by default
if (!sfp_->is_port_in_use(rtcp_port)) {
rtcp_socket = sfp_->get_socket_ptr(rtcp_port);
if (!rtcp_socket) {
rtcp_socket = sfp_->create_new_socket(1);
rtcp_reader = sfp_->install_rtcp_reader(rtcp_port);
rtcp_reader->set_socket(rtcp_socket);
rtcp_->set_socket(rtcp_socket);
rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_);
}
/*if (!sfp_->is_port_in_use(rtcp_port)) {
rtcp_socket = sfp_->create_new_socket(1);
rtcp_reader = sfp_->install_rtcp_reader(rtcp_port);
rtcp_reader->set_socket(rtcp_socket);
rtcp_->set_socket(rtcp_socket);
rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_);
}
}*/
// 1. Source port is in use -> fetch the existing socket
// 2. Fetch the existing RTCP reader from socketfactory
// 3. In RTCP reader, map our RTCP object to the REMOTE ssrc of this stream. In this case, the remote
// ssrc should be manually set to allow multiplexing
else {
rtcp_socket = sfp_->get_socket_ptr(rtcp_port);
//rtcp_socket = sfp_->get_socket_ptr(rtcp_port);
if (!rtcp_socket) {
// This should not ever happen. However if it does, you could just create a new socket like above
UVG_LOG_ERROR("No RTCP socket found");

View File

@ -119,6 +119,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket(std::shared_ptr<uvgrtp::socket> s
// streams can bind to the same port
// If it is a regular address and you want to multiplex several streams into a single socket, one
// bind is enough
ports_mutex_.lock();
if (ipv6_) {
bind_addr6 = uvgrtp::socket::create_ip6_sockaddr(local_address_, port);
if (uvgrtp::socket::is_multicast(bind_addr6)) {
@ -141,6 +142,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket(std::shared_ptr<uvgrtp::socket> s
}
if (ret == RTP_OK) {
used_ports_.insert({ port, soc });
ports_mutex_.unlock();
return RTP_OK;
}
@ -160,16 +162,20 @@ rtp_error_t uvgrtp::socketfactory::bind_socket_anyip(std::shared_ptr<uvgrtp::soc
ret = soc->bind(AF_INET, INADDR_ANY, port);
}
if (ret == RTP_OK) {
ports_mutex_.lock();
used_ports_.insert({ port, soc });
ports_mutex_.unlock();
return RTP_OK;
}
}
return ret;
}
std::shared_ptr<uvgrtp::socket> uvgrtp::socketfactory::get_socket_ptr(uint16_t port) const
std::shared_ptr<uvgrtp::socket> uvgrtp::socketfactory::get_socket_ptr(uint16_t port)
{
ports_mutex_.lock();
const auto& ptr = used_ports_.find(port);
ports_mutex_.unlock();
if (ptr != used_ports_.end()) {
return ptr->second;
}
@ -208,7 +214,7 @@ bool uvgrtp::socketfactory::get_ipv6() const
return ipv6_;
}
bool uvgrtp::socketfactory::is_port_in_use(uint16_t port) const
bool uvgrtp::socketfactory::is_port_in_use(uint16_t port)
{
if (used_ports_.find(port) == used_ports_.end()) {
return false;
@ -218,20 +224,23 @@ bool uvgrtp::socketfactory::is_port_in_use(uint16_t port) const
bool uvgrtp::socketfactory::clear_port(uint16_t port, std::shared_ptr<uvgrtp::socket> socket)
{
ports_mutex_.lock();
if (port && used_ports_.find(port) != used_ports_.end()) {
used_ports_.erase(port);
}
ports_mutex_.unlock();
for (auto& p : rtcp_readers_to_ports_) {
if (p.second == port) {
rtcp_readers_to_ports_.erase(p.first);
break;
}
}
socket_mutex_.lock();
auto it = std::find(used_sockets_.begin(), used_sockets_.end(), socket);
if (it != used_sockets_.end()) {
used_sockets_.erase(it);
}
socket_mutex_.unlock();
for (auto& p : reception_flows_) {
if (p.second == socket) {
reception_flows_.erase(p.first);

View File

@ -66,7 +66,7 @@ namespace uvgrtp {
*
* Param port socket with wanted port
* Return pointer to socket on success, nullptr otherwise */
std::shared_ptr<uvgrtp::socket> get_socket_ptr(uint16_t port) const;
std::shared_ptr<uvgrtp::socket> get_socket_ptr(uint16_t port);
/* Get reception flow matching the given socket
*
@ -96,12 +96,13 @@ namespace uvgrtp {
/// \cond DO_NOT_DOCUMENT
bool get_ipv6() const;
bool is_port_in_use(uint16_t port) const;
bool is_port_in_use(uint16_t port);
/// \endcond
private:
std::mutex socket_mutex_;
std::mutex ports_mutex_;
int rce_flags_;
std::string local_address_;
std::map<uint16_t, std::shared_ptr<uvgrtp::socket>> used_ports_;