Exit from RTP packet dispatcher gracefully
Indicate to the main thread through a mutex that the packet dispatcher object can be destroyed. If this is not done, there's race condition that can result into a segmentation fault as the dispatcher object could be destroyed before it stops running.
This commit is contained in:
parent
bdfe027352
commit
eb33b7edf7
|
@ -35,7 +35,13 @@ namespace uvg_rtp {
|
||||||
* Return RTP_MEMORY_ERROR if allocation of a thread object fails */
|
* Return RTP_MEMORY_ERROR if allocation of a thread object fails */
|
||||||
rtp_error_t start(uvg_rtp::socket *socket, int flags);
|
rtp_error_t start(uvg_rtp::socket *socket, int flags);
|
||||||
|
|
||||||
/* Fetchj frame from the frame queue that contains all received frame.
|
/* Stop the RTP packet dispatcher and wait until the receive loop is exited
|
||||||
|
* to make sure that destroying the object in media_stream.cc is safe
|
||||||
|
*
|
||||||
|
* Return RTP_OK on success */
|
||||||
|
rtp_error_t stop();
|
||||||
|
|
||||||
|
/* Fetch frame from the frame queue that contains all received frame.
|
||||||
* pull_frame() will block until there is a frame that can be returned.
|
* pull_frame() will block until there is a frame that can be returned.
|
||||||
* If "timeout" is given, pull_frame() will block only for however long
|
* If "timeout" is given, pull_frame() will block only for however long
|
||||||
* that value tells it to.
|
* that value tells it to.
|
||||||
|
@ -53,7 +59,12 @@ namespace uvg_rtp {
|
||||||
void return_frame(uvg_rtp::frame::rtp_frame *frame);
|
void return_frame(uvg_rtp::frame::rtp_frame *frame);
|
||||||
|
|
||||||
/* RTP packet dispatcher thread */
|
/* RTP packet dispatcher thread */
|
||||||
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket *socket, int flags);
|
static void runner(
|
||||||
|
uvg_rtp::pkt_dispatcher *dispatcher,
|
||||||
|
uvg_rtp::socket *socket,
|
||||||
|
int flags,
|
||||||
|
std::mutex *exit_mtx
|
||||||
|
);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<packet_handler> packet_handlers_;
|
std::vector<packet_handler> packet_handlers_;
|
||||||
|
@ -62,6 +73,7 @@ namespace uvg_rtp {
|
||||||
* and they can be retrieved using pull_frame() */
|
* and they can be retrieved using pull_frame() */
|
||||||
std::vector<uvg_rtp::frame::rtp_frame *> frames_;
|
std::vector<uvg_rtp::frame::rtp_frame *> frames_;
|
||||||
std::mutex frames_mtx_;
|
std::mutex frames_mtx_;
|
||||||
|
std::mutex exit_mtx_;
|
||||||
|
|
||||||
void *recv_hook_arg_;
|
void *recv_hook_arg_;
|
||||||
void (*recv_hook_)(void *arg, uvg_rtp::frame::rtp_frame *frame);
|
void (*recv_hook_)(void *arg, uvg_rtp::frame::rtp_frame *frame);
|
||||||
|
|
|
@ -50,8 +50,7 @@ uvg_rtp::media_stream::media_stream(
|
||||||
|
|
||||||
uvg_rtp::media_stream::~media_stream()
|
uvg_rtp::media_stream::~media_stream()
|
||||||
{
|
{
|
||||||
if (initialized_) {
|
pkt_dispatcher_->stop();
|
||||||
}
|
|
||||||
|
|
||||||
delete rtcp_;
|
delete rtcp_;
|
||||||
delete rtp_;
|
delete rtp_;
|
||||||
|
|
|
@ -23,13 +23,23 @@ uvg_rtp::pkt_dispatcher::~pkt_dispatcher()
|
||||||
|
|
||||||
rtp_error_t uvg_rtp::pkt_dispatcher::start(uvg_rtp::socket *socket, int flags)
|
rtp_error_t uvg_rtp::pkt_dispatcher::start(uvg_rtp::socket *socket, int flags)
|
||||||
{
|
{
|
||||||
if (!(runner_ = new std::thread(runner, this, socket, flags)))
|
if (!(runner_ = new std::thread(runner, this, socket, flags, &exit_mtx_)))
|
||||||
return RTP_MEMORY_ERROR;
|
return RTP_MEMORY_ERROR;
|
||||||
|
|
||||||
runner_->detach();
|
runner_->detach();
|
||||||
return uvg_rtp::runner::start();
|
return uvg_rtp::runner::start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rtp_error_t uvg_rtp::pkt_dispatcher::stop()
|
||||||
|
{
|
||||||
|
active_ = false;
|
||||||
|
|
||||||
|
while (!exit_mtx_.try_lock())
|
||||||
|
;
|
||||||
|
|
||||||
|
return RTP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
rtp_error_t uvg_rtp::pkt_dispatcher::install_receive_hook(
|
rtp_error_t uvg_rtp::pkt_dispatcher::install_receive_hook(
|
||||||
void *arg,
|
void *arg,
|
||||||
void (*hook)(void *, uvg_rtp::frame::rtp_frame *)
|
void (*hook)(void *, uvg_rtp::frame::rtp_frame *)
|
||||||
|
@ -139,7 +149,12 @@ void uvg_rtp::pkt_dispatcher::return_frame(uvg_rtp::frame::rtp_frame *frame)
|
||||||
*
|
*
|
||||||
* If a handler receives a non-null "out", it can safely ignore "packet" and operate just on
|
* If a handler receives a non-null "out", it can safely ignore "packet" and operate just on
|
||||||
* the "out" parameter because at that point it already contains all needed information. */
|
* the "out" parameter because at that point it already contains all needed information. */
|
||||||
void uvg_rtp::pkt_dispatcher::runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket *socket, int flags)
|
void uvg_rtp::pkt_dispatcher::runner(
|
||||||
|
uvg_rtp::pkt_dispatcher *dispatcher,
|
||||||
|
uvg_rtp::socket *socket,
|
||||||
|
int flags,
|
||||||
|
std::mutex *exit_mtx
|
||||||
|
)
|
||||||
{
|
{
|
||||||
int nread;
|
int nread;
|
||||||
fd_set read_fds;
|
fd_set read_fds;
|
||||||
|
@ -158,6 +173,8 @@ void uvg_rtp::pkt_dispatcher::runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rt
|
||||||
while (!dispatcher->active())
|
while (!dispatcher->active())
|
||||||
;
|
;
|
||||||
|
|
||||||
|
exit_mtx->lock();
|
||||||
|
|
||||||
while (dispatcher->active()) {
|
while (dispatcher->active()) {
|
||||||
FD_SET(socket->get_raw_socket(), &read_fds);
|
FD_SET(socket->get_raw_socket(), &read_fds);
|
||||||
int sret = ::select(socket->get_raw_socket() + 1, &read_fds, nullptr, nullptr, &t_val);
|
int sret = ::select(socket->get_raw_socket() + 1, &read_fds, nullptr, nullptr, &t_val);
|
||||||
|
@ -205,4 +222,6 @@ void uvg_rtp::pkt_dispatcher::runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rt
|
||||||
}
|
}
|
||||||
} while (ret == RTP_OK);
|
} while (ret == RTP_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exit_mtx->unlock();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue