Spawn thread for RTP packet dispatcher

This commit is contained in:
Aaro Altonen 2020-08-05 08:16:19 +03:00
parent 42aeccfaa1
commit 95ebd4289d
4 changed files with 21 additions and 7 deletions

View File

@ -212,6 +212,7 @@ namespace uvg_rtp {
/* RTP packet dispatcher for the receiver */
uvg_rtp::pkt_dispatcher *pkt_dispatcher_;
std::thread *dispatcher_thread_;
/* Media object associated with this media stream. */
uvg_rtp::formats::media *media_;

View File

@ -43,8 +43,10 @@ namespace uvg_rtp {
/* Return reference to the vector that holds all installed handlers */
std::vector<uvg_rtp::packet_handler>& get_handlers();
/* RTP packet dispatcher thread */
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket *socket, int flags);
private:
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket& socket, int flags);
uvg_rtp::socket socket_;
std::vector<packet_handler> packet_handlers_;

View File

@ -24,6 +24,7 @@ uvg_rtp::media_stream::media_stream(std::string addr, int src_port, int dst_port
media_config_(nullptr),
initialized_(false),
pkt_dispatcher_(nullptr),
dispatcher_thread_(nullptr),
media_(nullptr)
{
fmt_ = fmt;
@ -56,6 +57,7 @@ uvg_rtp::media_stream::~media_stream()
delete rtp_;
delete srtp_;
delete pkt_dispatcher_;
delete dispatcher_thread_;
delete media_;
}
@ -131,10 +133,16 @@ rtp_error_t uvg_rtp::media_stream::init()
delete pkt_dispatcher_;
return RTP_MEMORY_ERROR;
}
pkt_dispatcher_->install_handler(media_->packet_handler);
initialized_ = true;
initialized_ = !!(dispatcher_thread_ = new std::thread(
pkt_dispatcher_->runner,
pkt_dispatcher_,
&socket_,
ctx_config_.flags
)
);
return pkt_dispatcher_->start();
}

View File

@ -116,7 +116,7 @@ std::vector<uvg_rtp::packet_handler>& uvg_rtp::pkt_dispatcher::get_handlers()
*
* If a handler receives a non-null "out", it can safely ignore "packet" and operate just on
* the "out" parameter because at that point it already contains all needed information. */
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket& socket, int flags)
static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket *socket, int flags)
{
int nread;
fd_set read_fds;
@ -132,9 +132,12 @@ static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket& socket,
const size_t recv_buffer_len = 8192;
uint8_t recv_buffer[recv_buffer_len] = { 0 };
while (!dispatcher->active())
;
while (dispatcher->active()) {
FD_SET(socket.get_raw_socket(), &read_fds);
int sret = ::select(socket.get_raw_socket() + 1, &read_fds, nullptr, nullptr, &t_val);
FD_SET(socket->get_raw_socket(), &read_fds);
int sret = ::select(socket->get_raw_socket() + 1, &read_fds, nullptr, nullptr, &t_val);
if (sret < 0) {
log_platform_error("select(2) failed");
@ -142,7 +145,7 @@ static void runner(uvg_rtp::pkt_dispatcher *dispatcher, uvg_rtp::socket& socket,
}
do {
if ((ret = socket.recvfrom(recv_buffer, recv_buffer_len, MSG_DONTWAIT, &nread)) == RTP_INTERRUPTED)
if ((ret = socket->recvfrom(recv_buffer, recv_buffer_len, MSG_DONTWAIT, &nread)) == RTP_INTERRUPTED)
break;
if (ret != RTP_OK) {