From 519b535ecb3cf9f5534b82fab56a69d7132d0718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20R=C3=A4s=C3=A4nen?= Date: Fri, 13 May 2022 14:32:42 +0300 Subject: [PATCH] rtcp: Improve thread safety in rtcp --- include/uvgrtp/rtcp.hh | 21 +++-- src/rtcp.cc | 178 ++++++++++++++++++++++++++++++----------- 2 files changed, 146 insertions(+), 53 deletions(-) diff --git a/include/uvgrtp/rtcp.hh b/include/uvgrtp/rtcp.hh index 8c8f61a..c9e559f 100644 --- a/include/uvgrtp/rtcp.hh +++ b/include/uvgrtp/rtcp.hh @@ -12,6 +12,7 @@ #include #include #include +#include namespace uvgrtp { @@ -283,6 +284,9 @@ namespace uvgrtp { rtp_error_t install_app_hook(std::function)> app_handler); rtp_error_t install_app_hook(std::function)> app_handler); + + rtp_error_t remove_all_hooks(); + /// \cond DO_NOT_DOCUMENT /* Update RTCP-related sender statistics */ rtp_error_t update_sender_stats(size_t pkt_size); @@ -450,14 +454,19 @@ namespace uvgrtp { void (*sdes_hook_)(uvgrtp::frame::rtcp_sdes_packet *); void (*app_hook_)(uvgrtp::frame::rtcp_app_packet *); - std::function)> sr_hook_f_; - std::function)> sr_hook_u_; + std::function)> sr_hook_f_; + std::function)> sr_hook_u_; std::function)> rr_hook_f_; std::function)> rr_hook_u_; - std::function)> sdes_hook_f_; - std::function)> sdes_hook_u_; - std::function)> app_hook_f_; - std::function)> app_hook_u_; + std::function)> sdes_hook_f_; + std::function)> sdes_hook_u_; + std::function)> app_hook_f_; + std::function)> app_hook_u_; + + std::mutex sr_mutex_; + std::mutex rr_mutex_; + std::mutex sdes_mutex_; + std::mutex app_mutex_; std::unique_ptr report_generator_; diff --git a/src/rtcp.cc b/src/rtcp.cc index 2ade2cd..25e8a5d 100644 --- a/src/rtcp.cc +++ b/src/rtcp.cc @@ -46,9 +46,13 @@ uvgrtp::rtcp::rtcp(std::shared_ptr rtp, int flags): sdes_hook_(nullptr), app_hook_(nullptr), sr_hook_f_(nullptr), + sr_hook_u_(nullptr), rr_hook_f_(nullptr), + rr_hook_u_(nullptr), sdes_hook_f_(nullptr), + sdes_hook_u_(nullptr), app_hook_f_(nullptr), + app_hook_u_(nullptr), active_(false) { ssrc_ = rtp->get_ssrc(); @@ -306,6 +310,34 @@ rtp_error_t uvgrtp::rtcp::add_participant(uint32_t ssrc) return RTP_OK; } +rtp_error_t uvgrtp::rtcp::remove_all_hooks() +{ + sr_mutex_.lock(); + sender_hook_ = nullptr; + sr_hook_f_ = nullptr; + sr_hook_u_ = nullptr; + sr_mutex_.unlock(); + + rr_mutex_.lock(); + receiver_hook_ = nullptr; + rr_hook_f_ = nullptr; + rr_hook_u_ = nullptr; + rr_mutex_.unlock(); + + sdes_mutex_.lock(); + sdes_hook_ = nullptr; + sdes_hook_f_ = nullptr; + sdes_hook_u_ = nullptr; + sdes_mutex_.unlock(); + + app_mutex_.lock(); + app_hook_ = nullptr; + app_hook_f_ = nullptr; + app_hook_u_ = nullptr; + app_mutex_.unlock(); + return RTP_OK; +} + rtp_error_t uvgrtp::rtcp::install_sender_hook(void (*hook)(uvgrtp::frame::rtcp_sender_report*)) { if (!hook) @@ -313,9 +345,12 @@ rtp_error_t uvgrtp::rtcp::install_sender_hook(void (*hook)(uvgrtp::frame::rtcp_s return RTP_INVALID_VALUE; } + sr_mutex_.lock(); sender_hook_ = hook; - sr_hook_f_ = nullptr; - sr_hook_u_ = nullptr; + sr_hook_f_ = nullptr; + sr_hook_u_ = nullptr; + sr_mutex_.unlock(); + return RTP_OK; } @@ -326,9 +361,12 @@ rtp_error_t uvgrtp::rtcp::install_sender_hook(std::functionsr_frame; participants_[ssrc]->sr_frame = nullptr; + sr_mutex_.unlock(); return frame; } @@ -482,8 +552,10 @@ uvgrtp::frame::rtcp_receiver_report* uvgrtp::rtcp::get_receiver_packet(uint32_t return nullptr; } + rr_mutex_.lock(); auto frame = participants_[ssrc]->rr_frame; participants_[ssrc]->rr_frame = nullptr; + rr_mutex_.unlock(); return frame; } @@ -495,8 +567,10 @@ uvgrtp::frame::rtcp_sdes_packet* uvgrtp::rtcp::get_sdes_packet(uint32_t ssrc) return nullptr; } + sdes_mutex_.lock(); auto frame = participants_[ssrc]->sdes_frame; participants_[ssrc]->sdes_frame = nullptr; + sdes_mutex_.unlock(); return frame; } @@ -508,8 +582,10 @@ uvgrtp::frame::rtcp_app_packet* uvgrtp::rtcp::get_app_packet(uint32_t ssrc) return nullptr; } + app_mutex_.lock(); auto frame = participants_[ssrc]->app_frame; participants_[ssrc]->app_frame = nullptr; + app_mutex_.unlock(); return frame; } @@ -945,16 +1021,6 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t size, return ret; } - /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ - if (participants_[frame->ssrc]->sdes_frame) - { - for (auto& item : participants_[frame->ssrc]->sdes_frame->items) - { - delete[](uint8_t*)item.data; - } - delete participants_[frame->ssrc]->sdes_frame; - } - for (int ptr = 8; ptr < frame->header.length; ) { uvgrtp::frame::rtcp_sdes_item item; @@ -967,6 +1033,7 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t size, ptr += item.length; // TODO: Clang warning here } + sdes_mutex_.lock(); if (sdes_hook_) { sdes_hook_(frame); } else if (sdes_hook_f_) { @@ -974,8 +1041,19 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t size, } else if (sdes_hook_u_) { sdes_hook_u_(std::unique_ptr(frame)); } else { + /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ + if (participants_[frame->ssrc]->sdes_frame) + { + for (auto& item : participants_[frame->ssrc]->sdes_frame->items) + { + delete[](uint8_t*)item.data; + } + delete participants_[frame->ssrc]->sdes_frame; + } + participants_[frame->ssrc]->sdes_frame = frame; } + sdes_mutex_.unlock(); return RTP_OK; } @@ -1031,18 +1109,13 @@ rtp_error_t uvgrtp::rtcp::handle_app_packet(uint8_t* packet, size_t size, add_participant(frame->ssrc); } - if (participants_[frame->ssrc]->app_frame) - { - delete[] participants_[frame->ssrc]->app_frame->payload; - delete participants_[frame->ssrc]->app_frame; - } - frame->payload = new uint8_t[frame->header.length]; memcpy(frame->name, &packet[RTCP_HEADER_SIZE + SSRC_CSRC_SIZE], APP_NAME_SIZE); memcpy(frame->payload, &packet[RTCP_HEADER_SIZE + SSRC_CSRC_SIZE + APP_NAME_SIZE], frame->header.length - RTCP_HEADER_SIZE + SSRC_CSRC_SIZE + APP_NAME_SIZE); + app_mutex_.lock(); if (app_hook_) { app_hook_(frame); } else if (app_hook_f_) { @@ -1050,8 +1123,16 @@ rtp_error_t uvgrtp::rtcp::handle_app_packet(uint8_t* packet, size_t size, } else if (app_hook_u_) { app_hook_u_(std::unique_ptr(frame)); } else { + + if (participants_[frame->ssrc]->app_frame) + { + delete[] participants_[frame->ssrc]->app_frame->payload; + delete participants_[frame->ssrc]->app_frame; + } + participants_[frame->ssrc]->app_frame = frame; } + app_mutex_.unlock(); return RTP_OK; } @@ -1092,15 +1173,9 @@ rtp_error_t uvgrtp::rtcp::handle_receiver_report_packet(uint8_t* packet, size_t return RTP_INVALID_VALUE; } - /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ - if (participants_[frame->ssrc]->rr_frame) - { - delete participants_[frame->ssrc]->rr_frame; - participants_[frame->ssrc]->rr_frame = nullptr; - } - read_reports(packet, size, frame->header.count, false, frame->report_blocks); + rr_mutex_.lock(); if (receiver_hook_) { receiver_hook_(frame); } else if (rr_hook_f_) { @@ -1108,8 +1183,15 @@ rtp_error_t uvgrtp::rtcp::handle_receiver_report_packet(uint8_t* packet, size_t } else if (rr_hook_u_) { rr_hook_u_(std::unique_ptr(frame)); } else { + /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ + if (participants_[frame->ssrc]->rr_frame) + { + delete participants_[frame->ssrc]->rr_frame; + } + participants_[frame->ssrc]->rr_frame = frame; } + rr_mutex_.unlock(); return RTP_OK; } @@ -1139,12 +1221,6 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* packet, size_t si add_participant(frame->ssrc); } - /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ - if (participants_[frame->ssrc]->sr_frame) - { - delete participants_[frame->ssrc]->sr_frame; - } - frame->sender_info.ntp_msw = ntohl(*(uint32_t*)&packet[8]); frame->sender_info.ntp_lsw = ntohl(*(uint32_t*)&packet[12]); frame->sender_info.rtp_ts = ntohl(*(uint32_t*)&packet[16]); @@ -1158,6 +1234,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* packet, size_t si read_reports(packet, size, frame->header.count, true, frame->report_blocks); + sr_mutex_.lock(); if (sender_hook_) { sender_hook_(frame); } else if (sr_hook_f_) { @@ -1165,8 +1242,15 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* packet, size_t si } else if (sr_hook_u_) { sr_hook_u_(std::unique_ptr(frame)); } else { + /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ + if (participants_[frame->ssrc]->sr_frame) + { + delete participants_[frame->ssrc]->sr_frame; + } + participants_[frame->ssrc]->sr_frame = frame; } + sr_mutex_.unlock(); return RTP_OK; }