rtcp: Improve thread safety in rtcp

This commit is contained in:
Joni Räsänen 2022-05-13 14:32:42 +03:00
parent a4b45ecd5f
commit 519b535ecb
2 changed files with 146 additions and 53 deletions

View File

@ -12,6 +12,7 @@
#include <vector>
#include <functional>
#include <memory>
#include <mutex>
namespace uvgrtp {
@ -283,6 +284,9 @@ namespace uvgrtp {
rtp_error_t install_app_hook(std::function<void(std::shared_ptr<uvgrtp::frame::rtcp_app_packet>)> app_handler);
rtp_error_t install_app_hook(std::function<void(std::unique_ptr<uvgrtp::frame::rtcp_app_packet>)> app_handler);
rtp_error_t remove_all_hooks();
/// \cond DO_NOT_DOCUMENT
/* Update RTCP-related sender statistics */
rtp_error_t update_sender_stats(size_t pkt_size);
@ -459,6 +463,11 @@ namespace uvgrtp {
std::function<void(std::shared_ptr<uvgrtp::frame::rtcp_app_packet>)> app_hook_f_;
std::function<void(std::unique_ptr<uvgrtp::frame::rtcp_app_packet>)> app_hook_u_;
std::mutex sr_mutex_;
std::mutex rr_mutex_;
std::mutex sdes_mutex_;
std::mutex app_mutex_;
std::unique_ptr<std::thread> report_generator_;
bool is_active() const

View File

@ -46,9 +46,13 @@ uvgrtp::rtcp::rtcp(std::shared_ptr<uvgrtp::rtp> rtp, int flags):
sdes_hook_(nullptr),
app_hook_(nullptr),
sr_hook_f_(nullptr),
sr_hook_u_(nullptr),
rr_hook_f_(nullptr),
rr_hook_u_(nullptr),
sdes_hook_f_(nullptr),
sdes_hook_u_(nullptr),
app_hook_f_(nullptr),
app_hook_u_(nullptr),
active_(false)
{
ssrc_ = rtp->get_ssrc();
@ -306,6 +310,34 @@ rtp_error_t uvgrtp::rtcp::add_participant(uint32_t ssrc)
return RTP_OK;
}
rtp_error_t uvgrtp::rtcp::remove_all_hooks()
{
sr_mutex_.lock();
sender_hook_ = nullptr;
sr_hook_f_ = nullptr;
sr_hook_u_ = nullptr;
sr_mutex_.unlock();
rr_mutex_.lock();
receiver_hook_ = nullptr;
rr_hook_f_ = nullptr;
rr_hook_u_ = nullptr;
rr_mutex_.unlock();
sdes_mutex_.lock();
sdes_hook_ = nullptr;
sdes_hook_f_ = nullptr;
sdes_hook_u_ = nullptr;
sdes_mutex_.unlock();
app_mutex_.lock();
app_hook_ = nullptr;
app_hook_f_ = nullptr;
app_hook_u_ = nullptr;
app_mutex_.unlock();
return RTP_OK;
}
rtp_error_t uvgrtp::rtcp::install_sender_hook(void (*hook)(uvgrtp::frame::rtcp_sender_report*))
{
if (!hook)
@ -313,9 +345,12 @@ rtp_error_t uvgrtp::rtcp::install_sender_hook(void (*hook)(uvgrtp::frame::rtcp_s
return RTP_INVALID_VALUE;
}
sr_mutex_.lock();
sender_hook_ = hook;
sr_hook_f_ = nullptr;
sr_hook_u_ = nullptr;
sr_mutex_.unlock();
return RTP_OK;
}
@ -326,9 +361,12 @@ rtp_error_t uvgrtp::rtcp::install_sender_hook(std::function<void(std::shared_ptr
return RTP_INVALID_VALUE;
}
sr_mutex_.lock();
sender_hook_ = nullptr;
sr_hook_f_ = sr_handler;
sr_hook_u_ = nullptr;
sr_mutex_.unlock();
return RTP_OK;
}
@ -339,9 +377,12 @@ rtp_error_t uvgrtp::rtcp::install_sender_hook(std::function<void(std::unique_ptr
return RTP_INVALID_VALUE;
}
sr_mutex_.lock();
sender_hook_ = nullptr;
sr_hook_f_ = nullptr;
sr_hook_u_ = sr_handler;
sr_mutex_.unlock();
return RTP_OK;
}
@ -352,9 +393,12 @@ rtp_error_t uvgrtp::rtcp::install_receiver_hook(void (*hook)(uvgrtp::frame::rtcp
return RTP_INVALID_VALUE;
}
rr_mutex_.lock();
receiver_hook_ = hook;
rr_hook_f_ = nullptr;
rr_hook_u_ = nullptr;
rr_mutex_.unlock();
return RTP_OK;
}
@ -365,9 +409,12 @@ rtp_error_t uvgrtp::rtcp::install_receiver_hook(std::function<void(std::shared_p
return RTP_INVALID_VALUE;
}
rr_mutex_.lock();
receiver_hook_ = nullptr;
rr_hook_f_ = rr_handler;
rr_hook_u_ = nullptr;
rr_mutex_.unlock();
return RTP_OK;
}
@ -378,9 +425,12 @@ rtp_error_t uvgrtp::rtcp::install_receiver_hook(std::function<void(std::unique_p
return RTP_INVALID_VALUE;
}
rr_mutex_.lock();
receiver_hook_ = nullptr;
rr_hook_f_ = nullptr;
rr_hook_u_ = rr_handler;
rr_mutex_.unlock();
return RTP_OK;
}
@ -391,9 +441,12 @@ rtp_error_t uvgrtp::rtcp::install_sdes_hook(void (*hook)(uvgrtp::frame::rtcp_sde
return RTP_INVALID_VALUE;
}
sdes_mutex_.lock();
sdes_hook_ = hook;
sdes_hook_f_ = nullptr;
sdes_hook_u_ = nullptr;
sdes_mutex_.unlock();
return RTP_OK;
}
@ -404,9 +457,12 @@ rtp_error_t uvgrtp::rtcp::install_sdes_hook(std::function<void(std::shared_ptr<u
return RTP_INVALID_VALUE;
}
sdes_mutex_.lock();
sdes_hook_ = nullptr;
sdes_hook_f_ = sdes_handler;
sdes_hook_u_ = nullptr;
sdes_mutex_.unlock();
return RTP_OK;
}
@ -417,9 +473,12 @@ rtp_error_t uvgrtp::rtcp::install_sdes_hook(std::function<void(std::unique_ptr<u
return RTP_INVALID_VALUE;
}
sdes_mutex_.lock();
sdes_hook_ = nullptr;
sdes_hook_f_ = nullptr;
sdes_hook_u_ = sdes_handler;
sdes_mutex_.unlock();
return RTP_OK;
}
@ -430,9 +489,12 @@ rtp_error_t uvgrtp::rtcp::install_app_hook(void (*hook)(uvgrtp::frame::rtcp_app_
return RTP_INVALID_VALUE;
}
app_mutex_.lock();
app_hook_ = hook;
app_hook_f_ = nullptr;
app_hook_u_ = nullptr;
app_mutex_.unlock();
return RTP_OK;
}
@ -443,9 +505,12 @@ rtp_error_t uvgrtp::rtcp::install_app_hook(std::function<void(std::shared_ptr<uv
return RTP_INVALID_VALUE;
}
app_mutex_.lock();
app_hook_ = nullptr;
app_hook_f_ = app_handler;
app_hook_u_ = nullptr;
app_mutex_.unlock();
return RTP_OK;
}
@ -456,9 +521,12 @@ rtp_error_t uvgrtp::rtcp::install_app_hook(std::function<void(std::unique_ptr<uv
return RTP_INVALID_VALUE;
}
app_mutex_.lock();
app_hook_ = nullptr;
app_hook_f_ = nullptr;
app_hook_u_ = app_handler;
app_mutex_.unlock();
return RTP_OK;
}
@ -469,8 +537,10 @@ uvgrtp::frame::rtcp_sender_report* uvgrtp::rtcp::get_sender_packet(uint32_t ssrc
return nullptr;
}
sr_mutex_.lock();
auto frame = participants_[ssrc]->sr_frame;
participants_[ssrc]->sr_frame = nullptr;
sr_mutex_.unlock();
return frame;
}
@ -482,8 +552,10 @@ uvgrtp::frame::rtcp_receiver_report* uvgrtp::rtcp::get_receiver_packet(uint32_t
return nullptr;
}
rr_mutex_.lock();
auto frame = participants_[ssrc]->rr_frame;
participants_[ssrc]->rr_frame = nullptr;
rr_mutex_.unlock();
return frame;
}
@ -495,8 +567,10 @@ uvgrtp::frame::rtcp_sdes_packet* uvgrtp::rtcp::get_sdes_packet(uint32_t ssrc)
return nullptr;
}
sdes_mutex_.lock();
auto frame = participants_[ssrc]->sdes_frame;
participants_[ssrc]->sdes_frame = nullptr;
sdes_mutex_.unlock();
return frame;
}
@ -508,8 +582,10 @@ uvgrtp::frame::rtcp_app_packet* uvgrtp::rtcp::get_app_packet(uint32_t ssrc)
return nullptr;
}
app_mutex_.lock();
auto frame = participants_[ssrc]->app_frame;
participants_[ssrc]->app_frame = nullptr;
app_mutex_.unlock();
return frame;
}
@ -945,16 +1021,6 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t size,
return ret;
}
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->sdes_frame)
{
for (auto& item : participants_[frame->ssrc]->sdes_frame->items)
{
delete[](uint8_t*)item.data;
}
delete participants_[frame->ssrc]->sdes_frame;
}
for (int ptr = 8; ptr < frame->header.length; )
{
uvgrtp::frame::rtcp_sdes_item item;
@ -967,6 +1033,7 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t size,
ptr += item.length; // TODO: Clang warning here
}
sdes_mutex_.lock();
if (sdes_hook_) {
sdes_hook_(frame);
} else if (sdes_hook_f_) {
@ -974,8 +1041,19 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t size,
} else if (sdes_hook_u_) {
sdes_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_sdes_packet>(frame));
} else {
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->sdes_frame)
{
for (auto& item : participants_[frame->ssrc]->sdes_frame->items)
{
delete[](uint8_t*)item.data;
}
delete participants_[frame->ssrc]->sdes_frame;
}
participants_[frame->ssrc]->sdes_frame = frame;
}
sdes_mutex_.unlock();
return RTP_OK;
}
@ -1031,18 +1109,13 @@ rtp_error_t uvgrtp::rtcp::handle_app_packet(uint8_t* packet, size_t size,
add_participant(frame->ssrc);
}
if (participants_[frame->ssrc]->app_frame)
{
delete[] participants_[frame->ssrc]->app_frame->payload;
delete participants_[frame->ssrc]->app_frame;
}
frame->payload = new uint8_t[frame->header.length];
memcpy(frame->name, &packet[RTCP_HEADER_SIZE + SSRC_CSRC_SIZE], APP_NAME_SIZE);
memcpy(frame->payload, &packet[RTCP_HEADER_SIZE + SSRC_CSRC_SIZE + APP_NAME_SIZE],
frame->header.length - RTCP_HEADER_SIZE + SSRC_CSRC_SIZE + APP_NAME_SIZE);
app_mutex_.lock();
if (app_hook_) {
app_hook_(frame);
} else if (app_hook_f_) {
@ -1050,8 +1123,16 @@ rtp_error_t uvgrtp::rtcp::handle_app_packet(uint8_t* packet, size_t size,
} else if (app_hook_u_) {
app_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_app_packet>(frame));
} else {
if (participants_[frame->ssrc]->app_frame)
{
delete[] participants_[frame->ssrc]->app_frame->payload;
delete participants_[frame->ssrc]->app_frame;
}
participants_[frame->ssrc]->app_frame = frame;
}
app_mutex_.unlock();
return RTP_OK;
}
@ -1092,15 +1173,9 @@ rtp_error_t uvgrtp::rtcp::handle_receiver_report_packet(uint8_t* packet, size_t
return RTP_INVALID_VALUE;
}
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->rr_frame)
{
delete participants_[frame->ssrc]->rr_frame;
participants_[frame->ssrc]->rr_frame = nullptr;
}
read_reports(packet, size, frame->header.count, false, frame->report_blocks);
rr_mutex_.lock();
if (receiver_hook_) {
receiver_hook_(frame);
} else if (rr_hook_f_) {
@ -1108,8 +1183,15 @@ rtp_error_t uvgrtp::rtcp::handle_receiver_report_packet(uint8_t* packet, size_t
} else if (rr_hook_u_) {
rr_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_receiver_report>(frame));
} else {
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->rr_frame)
{
delete participants_[frame->ssrc]->rr_frame;
}
participants_[frame->ssrc]->rr_frame = frame;
}
rr_mutex_.unlock();
return RTP_OK;
}
@ -1139,12 +1221,6 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* packet, size_t si
add_participant(frame->ssrc);
}
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->sr_frame)
{
delete participants_[frame->ssrc]->sr_frame;
}
frame->sender_info.ntp_msw = ntohl(*(uint32_t*)&packet[8]);
frame->sender_info.ntp_lsw = ntohl(*(uint32_t*)&packet[12]);
frame->sender_info.rtp_ts = ntohl(*(uint32_t*)&packet[16]);
@ -1158,6 +1234,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* packet, size_t si
read_reports(packet, size, frame->header.count, true, frame->report_blocks);
sr_mutex_.lock();
if (sender_hook_) {
sender_hook_(frame);
} else if (sr_hook_f_) {
@ -1165,8 +1242,15 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* packet, size_t si
} else if (sr_hook_u_) {
sr_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_sender_report>(frame));
} else {
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->sr_frame)
{
delete participants_[frame->ssrc]->sr_frame;
}
participants_[frame->ssrc]->sr_frame = frame;
}
sr_mutex_.unlock();
return RTP_OK;
}