multiplex: Fix bug when closing RTCP report reader

This commit is contained in:
Heikki Tampio 2023-05-10 14:22:15 +03:00
parent 6679eb4719
commit 0dde713b63
10 changed files with 135 additions and 52 deletions

View File

@ -28,6 +28,7 @@ namespace uvgrtp {
class holepuncher;
class socket;
class socketfactory;
class rtcp_reader;
namespace frame {
struct rtp_frame;

View File

@ -412,6 +412,8 @@ namespace uvgrtp {
/* Update RTCP-related sender statistics */
rtp_error_t update_sender_stats(size_t pkt_size);
void set_socket(std::shared_ptr<uvgrtp::socket> socket);
/* Update RTCP-related receiver statistics */
static rtp_error_t recv_packet_handler(void *arg, int rce_flags, frame::rtp_frame **out);

View File

@ -10,6 +10,7 @@
#include "rtp.hh"
#include "zrtp.hh"
#include "socket.hh"
#include "rtcp_reader.hh"
#include "holepuncher.hh"
#include "reception_flow.hh"
@ -451,7 +452,34 @@ rtp_error_t uvgrtp::media_stream::start_components()
rtcp_->add_initial_participant(rtp_->get_clock_rate());
bandwidth_ = get_default_bandwidth_kbps(fmt_);
rtcp_->set_session_bandwidth(bandwidth_);
// Source port is given and is not in use -> create new socket
uint16_t rtcp_port = src_port_ + 1;
std::shared_ptr<uvgrtp::socket> rtcp_socket;
if (!sfp_->is_port_in_use(rtcp_port)) {
rtcp_socket = sfp_->create_new_socket();
auto rtcp_reader = sfp_->install_rtcp_reader(rtcp_port);
//new_socket_ = true;
rtcp_reader->set_socket(rtcp_socket, rtcp_port);
rtcp_->set_socket(rtcp_socket);
rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_);
}
// Source port is in use -> fetch the existing socket
else {
rtcp_socket = sfp_->get_socket_ptr(rtcp_port);
if (!rtcp_socket) {
// This should not ever happen. However if it does, you could just create a new socket like above
UVG_LOG_ERROR("No RTCP socket found");
return RTP_GENERIC_ERROR;
}
rtcp_->set_socket(rtcp_socket);
auto rtcp_reader = sfp_->get_rtcp_reader(rtcp_port);
//rtcp_socket = sfp_->get_socket_ptr(rtcp_port);
rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_);
}
rtcp_->start();
}
}
@ -465,11 +493,7 @@ rtp_error_t uvgrtp::media_stream::start_components()
}
initialized_ = true;
if (new_socket_) {
return reception_flow_->start(socket_, rce_flags_);
}
return RTP_OK;
return reception_flow_->start(socket_, rce_flags_);
}
rtp_error_t uvgrtp::media_stream::push_frame(uint8_t *data, size_t data_len, int rtp_flags)

View File

@ -33,7 +33,8 @@ uvgrtp::reception_flow::reception_flow() :
last_ring_write_index_(-1),
socket_(),
buffer_size_kbytes_(DEFAULT_INITIAL_BUFFER_SIZE),
payload_size_(MAX_IPV4_PAYLOAD)
payload_size_(MAX_IPV4_PAYLOAD),
active_(false)
{
create_ring_buffer();
}
@ -102,6 +103,9 @@ void uvgrtp::reception_flow::set_payload_size(const size_t& value)
rtp_error_t uvgrtp::reception_flow::start(std::shared_ptr<uvgrtp::socket> socket, int rce_flags)
{
if (active_) {
return RTP_OK;
}
should_stop_ = false;
UVG_LOG_DEBUG("Creating receiving threads and setting priorities");
@ -121,12 +125,15 @@ rtp_error_t uvgrtp::reception_flow::start(std::shared_ptr<uvgrtp::socket> socket
SetThreadPriority(processor_->native_handle(), ABOVE_NORMAL_PRIORITY_CLASS);
#endif
active_ = true;
return RTP_ERROR::RTP_OK;
}
rtp_error_t uvgrtp::reception_flow::stop()
{
if (!active_) {
return RTP_OK;
}
should_stop_ = true;
process_cond_.notify_all();
@ -141,6 +148,7 @@ rtp_error_t uvgrtp::reception_flow::stop()
}
clear_frames();
active_ = false;
return RTP_OK;
}

View File

@ -223,6 +223,7 @@ namespace uvgrtp {
ssize_t buffer_size_kbytes_;
size_t payload_size_;
bool active_;
};
}

View File

@ -204,16 +204,14 @@ rtp_error_t uvgrtp::rtcp::start()
{
active_ = true;
ipv6_ = sfp_->get_ipv6();
/*
// Source port is given and is not in use -> create new socket
if (local_port_ != 0 && !sfp_->is_port_in_use(local_port_)) {
rtcp_socket_ = sfp_->create_new_socket();
new_socket_ = true;
rtcp_reader_ = std::shared_ptr<uvgrtp::rtcp_reader>(new uvgrtp::rtcp_reader(sfp_));
rtcp_reader_ = sfp_->install_rtcp_reader(local_port_);
rtcp_reader_->set_socket(rtcp_socket_, local_port_);
rtcp_reader_->map_ssrc_to_rtcp(remote_ssrc_, std::shared_ptr<uvgrtp::rtcp>(this));
sfp_->map_port_to_rtcp_reader(local_port_, rtcp_reader_);
}
// Source port is in use -> fetch the existing socket
else {
@ -225,10 +223,10 @@ rtp_error_t uvgrtp::rtcp::start()
}
rtcp_reader_ = sfp_->get_rtcp_reader(local_port_);
rtcp_socket_ = sfp_->get_socket_ptr(local_port_);
//rtcp_reader_->set_socket(rtcp_socket_);
rtcp_reader_->map_ssrc_to_rtcp(remote_ssrc_, std::shared_ptr<uvgrtp::rtcp>(this));
}
*/
rtcp_reader_ = sfp_->get_rtcp_reader(local_port_);
rtp_error_t ret = RTP_OK;
@ -270,10 +268,7 @@ rtp_error_t uvgrtp::rtcp::start()
socket_address_ = uvgrtp::socket::create_sockaddr(AF_INET, remote_addr_, dst_port_);
}
report_generator_.reset(new std::thread(rtcp_runner, this));
//report_reader_.reset(new std::thread(rtcp_report_reader, this));
if (new_socket_) {
rtcp_reader_->start();
}
rtcp_reader_->start();
return RTP_OK;
}
@ -299,7 +294,6 @@ rtp_error_t uvgrtp::rtcp::stop()
cleanup_participants();
return RTP_OK;
}
active_ = false;
if (report_generator_ && report_generator_->joinable())
{
@ -307,12 +301,8 @@ rtp_error_t uvgrtp::rtcp::stop()
report_generator_->join();
}
if ((rtcp_reader_->clear_rtcp_from_reader(remote_ssrc_)) == 1) {
rtcp_reader_->stop();
if (sfp_ && local_port_ != 0) {
sfp_->clear_port(local_port_, rtcp_socket_, nullptr);
rtcp_socket_.reset();
}
if (rtcp_reader_ && rtcp_reader_->clear_rtcp_from_reader(remote_ssrc_, local_port_ == 1)) {
sfp_->clear_port(local_port_, rtcp_socket_, nullptr);
}
return ret;
}
@ -2132,3 +2122,8 @@ void uvgrtp::rtcp::set_payload_size(size_t mtu_size)
{
mtu_size_ = mtu_size;
}
void uvgrtp::rtcp::set_socket(std::shared_ptr<uvgrtp::socket> socket)
{
rtcp_socket_ = socket;
}

View File

@ -19,9 +19,8 @@
const int MAX_PACKET = 65536;
uvgrtp::rtcp_reader::rtcp_reader(std::shared_ptr<uvgrtp::socketfactory> sfp) :
uvgrtp::rtcp_reader::rtcp_reader() :
active_(false),
sfp_(sfp),
socket_(nullptr),
rtcps_map_({})
{
@ -37,6 +36,9 @@ uvgrtp::rtcp_reader::~rtcp_reader()
rtp_error_t uvgrtp::rtcp_reader::start()
{
if (active_) {
return RTP_OK;
}
report_reader_.reset(new std::thread(&uvgrtp::rtcp_reader::rtcp_report_reader, this));
active_ = true;
return RTP_OK;
@ -103,24 +105,22 @@ bool uvgrtp::rtcp_reader::set_socket(std::shared_ptr<uvgrtp::socket> socket, uin
bool uvgrtp::rtcp_reader::map_ssrc_to_rtcp(std::shared_ptr<std::atomic<uint32_t>> ssrc, std::shared_ptr<uvgrtp::rtcp> rtcp)
{
map_mutex_.lock();
rtcps_map_[ssrc] = rtcp;
map_mutex_.unlock();
return true;
}
int uvgrtp::rtcp_reader::clear_rtcp_from_reader(std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc)
{
if (rtcps_map_.find(remote_ssrc) != rtcps_map_.end() && rtcps_map_.size() == 1) {
int uvgrtp::rtcp_reader::clear_rtcp_from_reader(std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc, uint16_t port)
{
map_mutex_.lock();
if (rtcps_map_.find(remote_ssrc) != rtcps_map_.end()) {
rtcps_map_.erase(remote_ssrc);
}
map_mutex_.unlock();
if (rtcps_map_.empty()) {
stop();
return 1;
}
return 0;
/*
map_mutex_.lock();
if (rtcps_map_.find(remote_ssrc) != rtcps_map_.end()) {
//rtcps_map_.erase(remote_ssrc);
}
map_mutex_.unlock();/
if (rtcps_map_.empty()) {
return 1;
}
return 0;*/
}

View File

@ -27,7 +27,7 @@ namespace uvgrtp {
class rtcp_reader {
public:
rtcp_reader(std::shared_ptr<uvgrtp::socketfactory> sfp);
rtcp_reader();
~rtcp_reader();
rtp_error_t start();
rtp_error_t stop();
@ -36,11 +36,10 @@ namespace uvgrtp {
bool set_socket(std::shared_ptr<uvgrtp::socket> socket, uint16_t port);
// Map REMOTE ssrc to rtcp
bool map_ssrc_to_rtcp(std::shared_ptr<std::atomic<uint32_t>> ssrc, std::shared_ptr<uvgrtp::rtcp> rtcp);
int clear_rtcp_from_reader(std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc);
int clear_rtcp_from_reader(std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc, uint16_t port);
private:
bool active_;
std::shared_ptr<uvgrtp::socketfactory> sfp_;
std::shared_ptr<uvgrtp::socket> socket_;
std::map<std::shared_ptr<std::atomic<uint32_t>>, std::shared_ptr<uvgrtp::rtcp>> rtcps_map_;
std::unique_ptr<std::thread> report_reader_;

View File

@ -1,6 +1,7 @@
#include "socketfactory.hh"
#include "socket.hh"
#include "uvgrtp/frame.hh"
#include "rtcp_reader.hh"
#include "random.hh"
#include "global.hh"
#include "debug.hh"
@ -49,7 +50,6 @@ rtp_error_t uvgrtp::socketfactory::stop()
rtp_error_t uvgrtp::socketfactory::set_local_interface(std::string local_addr)
{
//rtp_error_t ret;
local_address_ = local_addr;
// check IP address family
@ -116,7 +116,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket(std::shared_ptr<uvgrtp::socket> s
// If it is a regular address and you want to multiplex several streams into a single socket, one
// bind is enough
if (ipv6_) {
bind_addr6 = soc->create_ip6_sockaddr(local_address_, port);
bind_addr6 = uvgrtp::socket::create_ip6_sockaddr(local_address_, port);
if (uvgrtp::socket::is_multicast(bind_addr6)) {
UVG_LOG_INFO("The used address %s is a multicast address", local_address_.c_str());
ret = soc->bind_ip6(bind_addr6);
@ -126,7 +126,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket(std::shared_ptr<uvgrtp::socket> s
}
}
else {
bind_addr = soc->create_sockaddr(AF_INET, local_address_, port);
bind_addr = uvgrtp::socket::create_sockaddr(AF_INET, local_address_, port);
if (uvgrtp::socket::is_multicast(bind_addr)) {
UVG_LOG_INFO("The used address %s is a multicast address", local_address_.c_str());
ret = soc->bind(bind_addr);
@ -149,7 +149,7 @@ rtp_error_t uvgrtp::socketfactory::bind_socket_anyip(std::shared_ptr<uvgrtp::soc
if (!is_port_in_use(port)) {
if (ipv6_) {
sockaddr_in6 bind_addr6 = soc->create_ip6_sockaddr_any(port);
sockaddr_in6 bind_addr6 = uvgrtp::socket::create_ip6_sockaddr_any(port);
ret = soc->bind_ip6(bind_addr6);
}
else {
@ -182,10 +182,11 @@ std::shared_ptr<uvgrtp::reception_flow> uvgrtp::socketfactory::get_reception_flo
return nullptr;
}
rtp_error_t uvgrtp::socketfactory::map_port_to_rtcp_reader(uint16_t port, std::shared_ptr <uvgrtp::rtcp_reader> reader)
std::shared_ptr<uvgrtp::rtcp_reader> uvgrtp::socketfactory::install_rtcp_reader(uint16_t port)
{
std::shared_ptr<uvgrtp::rtcp_reader> reader = std::shared_ptr<uvgrtp::rtcp_reader>(new uvgrtp::rtcp_reader());
rtcp_readers_to_ports_[reader] = port;
return RTP_OK;
return reader;
}
std::shared_ptr <uvgrtp::rtcp_reader> uvgrtp::socketfactory::get_rtcp_reader(uint16_t port)
@ -213,16 +214,25 @@ bool uvgrtp::socketfactory::is_port_in_use(uint16_t port) const
bool uvgrtp::socketfactory::clear_port(uint16_t port, std::shared_ptr<uvgrtp::socket> socket, std::shared_ptr<uvgrtp::reception_flow> flow)
{
if (used_ports_.find(port) != used_ports_.end()) {
if (port && used_ports_.find(port) != used_ports_.end()) {
used_ports_.erase(port);
}
for (auto& p : rtcp_readers_to_ports_) {
if (p.second == port) {
rtcp_readers_to_ports_.erase(p.first);
break;
}
}
auto it = std::find(used_sockets_.begin(), used_sockets_.end(), socket);
if (it != used_sockets_.end()) {
used_sockets_.erase(it);
}
if (reception_flows_.find(flow) != reception_flows_.end()) {
reception_flows_.erase(flow);
for (auto& p : reception_flows_) {
if (p.second == socket) {
reception_flows_.erase(p.first);
break;
}
}
return true;
}

View File

@ -14,6 +14,9 @@ namespace uvgrtp {
class reception_flow;
class rtcp_reader;
/* This class keeps track of all the sockets that uvgRTP is using.
* the "out" parameter because at that point it already contains all needed information. */
class socketfactory {
public:
@ -21,14 +24,54 @@ namespace uvgrtp {
~socketfactory();
rtp_error_t stop();
/* Set the local addres for socketfactory.
*
* Param local_addr local IPv4 or IPv6 address
* Return RTP OK on success */
rtp_error_t set_local_interface(std::string local_addr);
/* Create a new socket. Depending on if the local address was IPv4 or IPv6, the socket
* will use the correct IP version
*
* Return the created socket on success, nullptr otherwise */
std::shared_ptr<uvgrtp::socket> create_new_socket();
/* Bind socket to the local IP address and given port
*
* Param soc pointer to socket
* Param port port to bind into
* Return RTP OK on success */
rtp_error_t bind_socket(std::shared_ptr<uvgrtp::socket> soc, uint16_t port);
/* Bind socket any address and given port
*
* Param soc pointer to socket
* Param port port to bind into
* Return RTP OK on success */
rtp_error_t bind_socket_anyip(std::shared_ptr<uvgrtp::socket> soc, uint16_t port);
/* Get the socket bound to the given port
*
* Param port socket with wanted port
* Return pointer to socket on success, nullptr otherwise */
std::shared_ptr<uvgrtp::socket> get_socket_ptr(uint16_t port) const;
/* Get reception flow matching the given socket
*
* Param socket socket matching the wanted reception_flow
* Return pointer to reception_flow on success, nullptr otherwise */
std::shared_ptr<uvgrtp::reception_flow> get_reception_flow_ptr(std::shared_ptr<uvgrtp::socket> socket) const;
rtp_error_t map_port_to_rtcp_reader(uint16_t port, std::shared_ptr <uvgrtp::rtcp_reader> reader);
/* Install an RTCP reader and map it to the given port
*
* Param port port to map the created RTCP reader into
* Return pointer to created RTCP reader */
std::shared_ptr<uvgrtp::rtcp_reader> install_rtcp_reader(uint16_t port);
/* Get a pointer to the RTCP reader matching the given port
*
* Param port into which the wanted RTCP reader is mapped into
* Return pointer to RTCP reader */
std::shared_ptr <uvgrtp::rtcp_reader> get_rtcp_reader(uint16_t port);
/// \cond DO_NOT_DOCUMENT