Pass socket and RTP context to frame queue

Only RTP context and socketa are needed for the operations performed
by the frame queue so passing them to a constructor is much simpler
than passing sender to every function call and accessing the
context/socket through that.
This commit is contained in:
Aaro Altonen 2020-08-02 13:19:07 +03:00
parent 3312a0b559
commit e2c4006972
2 changed files with 58 additions and 55 deletions

View File

@ -7,7 +7,8 @@
#include "dispatch.hh"
#include "frame.hh"
#include "sender.hh"
#include "rtp.hh"
#include "socket.hh"
#include "util.hh"
#if defined(_MSC_VER)
@ -20,7 +21,6 @@ const int MAX_CHUNK_COUNT = 4;
namespace uvg_rtp {
class sender;
class dispatcher;
class frame_queue;
@ -96,13 +96,14 @@ namespace uvg_rtp {
class frame_queue {
public:
frame_queue(rtp_format_t fmt);
frame_queue(rtp_format_t fmt, uvg_rtp::dispatcher *dispatcher);
/* frame_queue(rtp_format_t fmt); */
/* frame_queue(rtp_format_t fmt, uvg_rtp::dispatcher *dispatcher); */
frame_queue(uvg_rtp::socket *socket, uvg_rtp::rtp *rtp);
~frame_queue();
rtp_error_t init_transaction(uvg_rtp::sender *sender);
rtp_error_t init_transaction(uvg_rtp::sender *sender, uint8_t *data);
rtp_error_t init_transaction(uvg_rtp::sender *sender, std::unique_ptr<uint8_t[]> data);
rtp_error_t init_transaction();
rtp_error_t init_transaction(uint8_t *data);
rtp_error_t init_transaction(std::unique_ptr<uint8_t[]> data);
/* If there are less than "MAX_QUEUED_MSGS" in the "free_" vector,
* the transaction is moved there, otherwise it's destroyed
@ -126,27 +127,21 @@ namespace uvg_rtp {
* Return RTP_OK on success
* Return RTP_INVALID_VALUE if one of the parameters is invalid
* Return RTP_MEMORY_ERROR if the maximum amount of chunks/messages is exceeded */
rtp_error_t enqueue_message(
uvg_rtp::sender *sender,
uint8_t *message, size_t message_len
);
rtp_error_t enqueue_message(uint8_t *message, size_t message_len);
/* Cache all messages in "buffers" in order to frame queue
*
* Return RTP_OK on success
* Return RTP_INVALID_VALUE if one of the parameters is invalid
* Return RTP_MEMORY_ERROR if the maximum amount of chunks/messages is exceeded */
rtp_error_t enqueue_message(
uvg_rtp::sender *sender,
buff_vec& buffers
);
rtp_error_t enqueue_message(buff_vec& buffers);
/* Flush the message queue
*
* Return RTP_OK on success
* Return RTP_INVALID_VALUE if "sender" is nullptr or message buffer is empty
* return RTP_SEND_ERROR if send fails */
rtp_error_t flush_queue(uvg_rtp::sender *sender);
rtp_error_t flush_queue();
/* Media may have extra headers (f.ex. NAL and FU headers for HEVC).
* These headers must be valid until the message is sent (ie. they cannot be saved to
@ -168,7 +163,7 @@ namespace uvg_rtp {
void *get_media_headers();
/* Update the active task's current packet's sequence number */
void update_rtp_header(uvg_rtp::sender *sender);
void update_rtp_header();
/* Because frame queue supports both raw and smart pointers and the smart pointer ownership
* is transferred to active transaction, the code that created the transaction must query
@ -215,5 +210,8 @@ namespace uvg_rtp {
ssize_t max_queued_; /* number of queued transactions */
ssize_t max_mcount_; /* number of messages per transactions */
ssize_t max_ccount_; /* number of chunks per message */
uvg_rtp::rtp *rtp_;
uvg_rtp::socket *socket_;
};
};

View File

@ -15,8 +15,25 @@
#include "formats/hevc.hh"
uvg_rtp::frame_queue::frame_queue(rtp_format_t fmt):
active_(nullptr), fmt_(fmt), dealloc_hook_(nullptr)
/* uvg_rtp::frame_queue::frame_queue(rtp_format_t fmt): */
/* active_(nullptr), fmt_(fmt), dealloc_hook_(nullptr) */
/* { */
/* active_ = nullptr; */
/* dispatcher_ = nullptr; */
/* max_queued_ = MAX_QUEUED_MSGS; */
/* max_mcount_ = MAX_MSG_COUNT; */
/* max_ccount_ = MAX_CHUNK_COUNT * max_mcount_; */
/* } */
/* uvg_rtp::frame_queue::frame_queue(rtp_format_t fmt, uvg_rtp::dispatcher *dispatcher): */
/* frame_queue(fmt) */
/* { */
/* dispatcher_ = dispatcher; */
/* } */
uvg_rtp::frame_queue::frame_queue(uvg_rtp::socket *socket, uvg_rtp::rtp *rtp):
rtp_(rtp), socket_(socket)
{
active_ = nullptr;
dispatcher_ = nullptr;
@ -26,12 +43,6 @@ uvg_rtp::frame_queue::frame_queue(rtp_format_t fmt):
max_ccount_ = MAX_CHUNK_COUNT * max_mcount_;
}
uvg_rtp::frame_queue::frame_queue(rtp_format_t fmt, uvg_rtp::dispatcher *dispatcher):
frame_queue(fmt)
{
dispatcher_ = dispatcher;
}
uvg_rtp::frame_queue::~frame_queue()
{
for (auto& i : free_) {
@ -43,7 +54,7 @@ uvg_rtp::frame_queue::~frame_queue()
(void)destroy_transaction(active_);
}
rtp_error_t uvg_rtp::frame_queue::init_transaction(uvg_rtp::sender *sender)
rtp_error_t uvg_rtp::frame_queue::init_transaction()
{
std::lock_guard<std::mutex> lock(transaction_mtx_);
@ -85,19 +96,19 @@ rtp_error_t uvg_rtp::frame_queue::init_transaction(uvg_rtp::sender *sender)
active_->data_smart = nullptr;
active_->dealloc_hook = dealloc_hook_;
active_->out_addr = sender->get_socket().get_out_address();
sender->get_rtp_ctx()->fill_header((uint8_t *)&active_->rtp_common);
active_->out_addr = socket_->get_out_address();
rtp_->fill_header((uint8_t *)&active_->rtp_common);
active_->buffers.clear();
return RTP_OK;
}
rtp_error_t uvg_rtp::frame_queue::init_transaction(uvg_rtp::sender *sender, uint8_t *data)
rtp_error_t uvg_rtp::frame_queue::init_transaction(uint8_t *data)
{
if (!sender || !data)
if (!data)
return RTP_INVALID_VALUE;
if (init_transaction(sender) != RTP_OK) {
if (init_transaction() != RTP_OK) {
LOG_ERROR("Failed to initialize transaction");
return RTP_GENERIC_ERROR;
}
@ -108,12 +119,12 @@ rtp_error_t uvg_rtp::frame_queue::init_transaction(uvg_rtp::sender *sender, uint
return RTP_OK;
}
rtp_error_t uvg_rtp::frame_queue::init_transaction(uvg_rtp::sender *sender, std::unique_ptr<uint8_t[]> data)
rtp_error_t uvg_rtp::frame_queue::init_transaction(std::unique_ptr<uint8_t[]> data)
{
if (!sender || !data)
if (!data)
return RTP_INVALID_VALUE;
if (init_transaction(sender) != RTP_OK) {
if (init_transaction() != RTP_OK) {
LOG_ERROR("Failed to initialize transaction");
return RTP_GENERIC_ERROR;
}
@ -214,12 +225,9 @@ rtp_error_t uvg_rtp::frame_queue::deinit_transaction()
return uvg_rtp::frame_queue::deinit_transaction(active_->key);
}
rtp_error_t uvg_rtp::frame_queue::enqueue_message(
uvg_rtp::sender *sender,
uint8_t *message, size_t message_len
)
rtp_error_t uvg_rtp::frame_queue::enqueue_message(uint8_t *message, size_t message_len)
{
if (!sender || !message || message_len == 0)
if (!message || !message_len)
return RTP_INVALID_VALUE;
#ifdef __linux__
@ -229,7 +237,7 @@ rtp_error_t uvg_rtp::frame_queue::enqueue_message(
}
/* update the RTP header at "rtpheaders_ptr_" */
uvg_rtp::frame_queue::update_rtp_header(sender);
uvg_rtp::frame_queue::update_rtp_header();
active_->chunks[active_->chunk_ptr + 0].iov_base = &active_->rtp_headers[active_->rtphdr_ptr];
active_->chunks[active_->chunk_ptr + 0].iov_len = sizeof(active_->rtp_headers[active_->rtphdr_ptr]);
@ -251,18 +259,15 @@ rtp_error_t uvg_rtp::frame_queue::enqueue_message(
/* TODO: winsock stuff */
#endif
sender->get_rtp_ctx()->inc_sequence();
sender->get_rtp_ctx()->inc_sent_pkts();
rtp_->inc_sequence();
rtp_->inc_sent_pkts();
return RTP_OK;
}
rtp_error_t uvg_rtp::frame_queue::enqueue_message(
uvg_rtp::sender *sender,
std::vector<std::pair<size_t, uint8_t *>>& buffers
)
rtp_error_t uvg_rtp::frame_queue::enqueue_message(std::vector<std::pair<size_t, uint8_t *>>& buffers)
{
if (!sender || buffers.size() == 0)
if (!buffers.size())
return RTP_INVALID_VALUE;
#ifdef __linux__
@ -272,7 +277,7 @@ rtp_error_t uvg_rtp::frame_queue::enqueue_message(
}
/* update the RTP header at "rtpheaders_ptr_" */
uvg_rtp::frame_queue::update_rtp_header(sender);
uvg_rtp::frame_queue::update_rtp_header();
active_->chunks[active_->chunk_ptr].iov_len = sizeof(active_->rtp_headers[active_->rtphdr_ptr]);
active_->chunks[active_->chunk_ptr].iov_base = &active_->rtp_headers[active_->rtphdr_ptr];
@ -296,15 +301,15 @@ rtp_error_t uvg_rtp::frame_queue::enqueue_message(
/* TODO: winsock stuff */
#endif
sender->get_rtp_ctx()->inc_sequence();
sender->get_rtp_ctx()->inc_sent_pkts();
rtp_->inc_sequence();
rtp_->inc_sent_pkts();
return RTP_OK;
}
rtp_error_t uvg_rtp::frame_queue::flush_queue(uvg_rtp::sender *sender)
rtp_error_t uvg_rtp::frame_queue::flush_queue()
{
if (!sender || active_->hdr_ptr == 0 || active_->chunk_ptr == 0) {
if (!active_->hdr_ptr || !active_->chunk_ptr) {
LOG_ERROR("Cannot send 0 messages or messages containing 0 chunks!");
(void)deinit_transaction();
return RTP_INVALID_VALUE;
@ -324,7 +329,7 @@ rtp_error_t uvg_rtp::frame_queue::flush_queue(uvg_rtp::sender *sender)
return RTP_OK;
}
if (sender->get_socket().send_vecio(active_->headers, active_->hdr_ptr, 0) != RTP_OK) {
if (socket_->send_vecio(active_->headers, active_->hdr_ptr, 0) != RTP_OK) {
LOG_ERROR("Failed to flush the message queue: %s", strerror(errno));
(void)deinit_transaction();
return RTP_SEND_ERROR;
@ -336,10 +341,10 @@ rtp_error_t uvg_rtp::frame_queue::flush_queue(uvg_rtp::sender *sender)
#endif
}
void uvg_rtp::frame_queue::update_rtp_header(uvg_rtp::sender *sender)
void uvg_rtp::frame_queue::update_rtp_header()
{
memcpy(&active_->rtp_headers[active_->rtphdr_ptr], &active_->rtp_common, sizeof(active_->rtp_common));
sender->get_rtp_ctx()->update_sequence((uint8_t *)(&active_->rtp_headers[active_->rtphdr_ptr]));
rtp_->update_sequence((uint8_t *)(&active_->rtp_headers[active_->rtphdr_ptr]));
}
uvg_rtp::buff_vec& uvg_rtp::frame_queue::get_buffer_vector()