multiplex: Improve thread safety in reception_flow

This commit is contained in:
Heikki Tampio 2023-06-28 11:08:23 +03:00
parent f21ac16d4d
commit 6bba9263f9
2 changed files with 21 additions and 11 deletions

View File

@ -247,9 +247,12 @@ uvgrtp::frame::rtp_frame* uvgrtp::reception_flow::pull_frame(std::shared_ptr<std
frames_mtx_.lock(); frames_mtx_.lock();
if (!frames_.empty()) { if (!frames_.empty()) {
frame = frames_.front(); frame = frames_.front();
if (frame->header.ssrc == remote_ssrc.get()->load()) { if ( frame && frame->header.ssrc == remote_ssrc.get()->load()) {
frames_.erase(frames_.begin()); frames_.erase(frames_.begin());
} }
else {
frame = nullptr;
}
} }
frames_mtx_.unlock(); frames_mtx_.unlock();
return frame; return frame;
@ -269,23 +272,25 @@ uvgrtp::frame::rtp_frame* uvgrtp::reception_flow::pull_frame(ssize_t timeout_ms,
if (should_stop_ || frames_.empty()) if (should_stop_ || frames_.empty())
return nullptr; return nullptr;
// Check if the source ssrc in the frame matches the remote ssrc that we want to pull frames from // Check if the source ssrc in the frame matches the remote ssrc that we want to pull frames from
bool found_frame = false; uvgrtp::frame::rtp_frame* frame = nullptr;
frames_mtx_.lock(); frames_mtx_.lock();
auto frame = frames_.front(); if (!frames_.empty()) {
if (frame->header.ssrc == remote_ssrc.get()->load()) { frame = frames_.front();
if (frame && frame->header.ssrc == remote_ssrc.get()->load()) {
frames_.pop_front(); frames_.pop_front();
found_frame = true; }
else {
frame = nullptr;
}
} }
frames_mtx_.unlock(); frames_mtx_.unlock();
if (found_frame) {
return frame; return frame;
}
return nullptr;
} }
rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc, rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc,
std::function<rtp_error_t(void*, int, uint8_t*, size_t, frame::rtp_frame** out)> handler, void* args) std::function<rtp_error_t(void*, int, uint8_t*, size_t, frame::rtp_frame** out)> handler, void* args)
{ {
handlers_mutex_.lock();
switch (type) { switch (type) {
case 1: { case 1: {
packet_handlers_[remote_ssrc].rtp.handler = handler; packet_handlers_[remote_ssrc].rtp.handler = handler;
@ -322,13 +327,16 @@ rtp_error_t uvgrtp::reception_flow::new_install_handler(int type, std::shared_pt
break; break;
} }
} }
handlers_mutex_.unlock();
return RTP_OK; return RTP_OK;
} }
rtp_error_t uvgrtp::reception_flow::new_install_getter(std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc, rtp_error_t uvgrtp::reception_flow::new_install_getter(std::shared_ptr<std::atomic<std::uint32_t>> remote_ssrc,
std::function<rtp_error_t(uvgrtp::frame::rtp_frame**)> getter) std::function<rtp_error_t(uvgrtp::frame::rtp_frame**)> getter)
{ {
handlers_mutex_.lock();
packet_handlers_[remote_ssrc].getter = getter; packet_handlers_[remote_ssrc].getter = getter;
handlers_mutex_.unlock();
return RTP_OK; return RTP_OK;
} }
@ -702,10 +710,11 @@ int uvgrtp::reception_flow::clear_stream_from_flow(std::shared_ptr<std::atomic<s
if (hooks_.find(remote_ssrc) != hooks_.end()) { if (hooks_.find(remote_ssrc) != hooks_.end()) {
hooks_.erase(remote_ssrc); hooks_.erase(remote_ssrc);
} }
handlers_mutex_.lock();
if (packet_handlers_.find(remote_ssrc) != packet_handlers_.end()) { if (packet_handlers_.find(remote_ssrc) != packet_handlers_.end()) {
packet_handlers_.erase(remote_ssrc); packet_handlers_.erase(remote_ssrc);
} }
handlers_mutex_.unlock();
// If all the data structures are empty, return 1 which means that there is no streams left for this reception_flow // If all the data structures are empty, return 1 which means that there is no streams left for this reception_flow
// and it can be safely deleted // and it can be safely deleted
if (hooks_.empty() && packet_handlers_.empty()) { if (hooks_.empty() && packet_handlers_.empty()) {

View File

@ -205,6 +205,7 @@ namespace uvgrtp {
std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler> packet_handlers_; std::map<std::shared_ptr<std::atomic<std::uint32_t>>, handler> packet_handlers_;
std::vector<Buffer> ring_buffer_; std::vector<Buffer> ring_buffer_;
std::mutex handlers_mutex_;
std::mutex ring_mutex_; std::mutex ring_mutex_;
// these uphold the ring buffer details // these uphold the ring buffer details
std::atomic<ssize_t> ring_read_index_; std::atomic<ssize_t> ring_read_index_;