From 6bba9263f9e2bea39803470d856fb7a14dac4b43 Mon Sep 17 00:00:00 2001 From: Heikki Tampio Date: Wed, 28 Jun 2023 11:08:23 +0300 Subject: [PATCH] multiplex: Improve thread safety in reception_flow --- src/reception_flow.cc | 31 ++++++++++++++++++++----------- src/reception_flow.hh | 1 + 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/reception_flow.cc b/src/reception_flow.cc index 73dacde..3fd00e4 100644 --- a/src/reception_flow.cc +++ b/src/reception_flow.cc @@ -247,9 +247,12 @@ uvgrtp::frame::rtp_frame* uvgrtp::reception_flow::pull_frame(std::shared_ptrheader.ssrc == remote_ssrc.get()->load()) { + if ( frame && frame->header.ssrc == remote_ssrc.get()->load()) { frames_.erase(frames_.begin()); } + else { + frame = nullptr; + } } frames_mtx_.unlock(); return frame; @@ -269,23 +272,25 @@ uvgrtp::frame::rtp_frame* uvgrtp::reception_flow::pull_frame(ssize_t timeout_ms, if (should_stop_ || frames_.empty()) return nullptr; // Check if the source ssrc in the frame matches the remote ssrc that we want to pull frames from - bool found_frame = false; + uvgrtp::frame::rtp_frame* frame = nullptr; frames_mtx_.lock(); - auto frame = frames_.front(); - if (frame->header.ssrc == remote_ssrc.get()->load()) { - frames_.pop_front(); - found_frame = true; + if (!frames_.empty()) { + frame = frames_.front(); + if (frame && frame->header.ssrc == remote_ssrc.get()->load()) { + frames_.pop_front(); + } + else { + frame = nullptr; + } } frames_mtx_.unlock(); - if (found_frame) { - return frame; - } - return nullptr; + return frame; } rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_ptr> remote_ssrc, std::function handler, void* args) { + handlers_mutex_.lock(); switch (type) { case 1: { packet_handlers_[remote_ssrc].rtp.handler = handler; @@ -322,13 +327,16 @@ rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_pt break; } } + handlers_mutex_.unlock(); return RTP_OK; } rtp_error_t uvgrtp::reception_flow::new_install_getter(std::shared_ptr> remote_ssrc, std::function getter) { + handlers_mutex_.lock(); packet_handlers_[remote_ssrc].getter = getter; + handlers_mutex_.unlock(); return RTP_OK; } @@ -702,10 +710,11 @@ int uvgrtp::reception_flow::clear_stream_from_flow(std::shared_ptr>, handler> packet_handlers_; std::vector ring_buffer_; + std::mutex handlers_mutex_; std::mutex ring_mutex_; // these uphold the ring buffer details std::atomic ring_read_index_;