Make dynamic configuration connection-based

Making configuration global was moronic considering there are
different types of media streams per session (f.ex Opus and HEVC)
which have very different types of needs. For example, setting
receiver's UDP buffer size to 40 MB would make no sense for Opus.

Now each connection can be configured individually which is also
a needed feature for SRTP

This change reverted the changes made earlier to global API
This commit is contained in:
Aaro Altonen 2020-01-13 10:00:02 +02:00
parent d0f18d4864
commit 7cf1c4d36d
11 changed files with 124 additions and 110 deletions

View File

@ -17,35 +17,19 @@
#include "formats/hevc.hh"
#include "formats/opus.hh"
kvz_rtp::connection::connection(rtp_format_t fmt, rtp_ctx_conf_t& conf, bool reader):
kvz_rtp::connection::connection(rtp_format_t fmt, bool reader):
config_(nullptr),
socket_(),
rtcp_(nullptr),
reader_(reader),
wc_start_(0),
fqueue_(nullptr),
dispatcher_(nullptr),
conf_(conf)
fqueue_(nullptr)
{
rtp_sequence_ = 1; //kvz_rtp::random::generate_32();
rtp_ssrc_ = kvz_rtp::random::generate_32();
rtp_payload_ = fmt;
if (!reader) {
#ifndef _WIN32
if (fmt == RTP_FORMAT_HEVC &&
conf_.flags & RCE_SYSTEM_CALL_DISPATCHER)
{
dispatcher_ = new kvz_rtp::dispatcher(&socket_);
fqueue_ = new kvz_rtp::frame_queue(fmt, conf_, dispatcher_);
} else {
#endif
fqueue_ = new kvz_rtp::frame_queue(fmt, conf_);
dispatcher_ = nullptr;
#ifndef _WIN32
}
#endif
}
std::memset(&conf_, 0, sizeof(conf_));
set_payload(fmt);
}
@ -56,12 +40,6 @@ kvz_rtp::connection::~connection()
rtcp_->stop();
delete rtcp_;
}
if (!reader_ && rtp_payload_ == RTP_FORMAT_HEVC && conf_.flags & RCE_SYSTEM_CALL_DISPATCHER) {
while (dispatcher_->stop() != RTP_OK) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
}
void kvz_rtp::connection::set_payload(rtp_format_t fmt)
@ -108,7 +86,7 @@ void kvz_rtp::connection::set_ssrc(uint32_t ssrc)
rtp_ssrc_ = ssrc;
}
uint8_t kvz_rtp::connection::get_payload() const
rtp_format_t kvz_rtp::connection::get_payload() const
{
return rtp_payload_;
}
@ -239,9 +217,9 @@ kvz_rtp::frame_queue *kvz_rtp::connection::get_frame_queue()
return fqueue_;
}
kvz_rtp::dispatcher *kvz_rtp::connection::get_dispatcher()
void kvz_rtp::connection::set_frame_queue(kvz_rtp::frame_queue *fqueue)
{
return dispatcher_;
fqueue_ = fqueue;
}
void kvz_rtp::connection::install_dealloc_hook(void (*dealloc_hook)(void *))
@ -261,3 +239,19 @@ rtp_ctx_conf_t& kvz_rtp::connection::get_ctx_conf()
{
return conf_;
}
rtp_error_t kvz_rtp::connection::configure(int flag, ssize_t value)
{
if (flag < 0 || flag >= RCC_LAST || value < 0)
return RTP_INVALID_VALUE;
conf_.ctx_values[flag] = value;
}
rtp_error_t kvz_rtp::connection::configure(int flag)
{
if (flag < 0 || flag >= RCE_LAST)
return RTP_INVALID_VALUE;
conf_.flags |= flag;
}

View File

@ -25,18 +25,19 @@ namespace kvz_rtp {
class connection : public runner {
public:
connection(rtp_format_t fmt, rtp_ctx_conf_t& conf, bool reader);
connection(rtp_format_t fmt, bool reader);
virtual ~connection();
uint16_t get_sequence() const;
uint32_t get_ssrc() const;
uint8_t get_payload() const;
uint16_t get_sequence() const;
uint32_t get_ssrc() const;
rtp_format_t get_payload() const;
socket& get_socket();
socket_t get_raw_socket();
void set_payload(rtp_format_t fmt);
void set_ssrc(uint32_t ssrc);
void set_frame_queue(kvz_rtp::frame_queue *fqueue);
/* Functions for increasing various RTP statistics
* Overloaded functions without parameters increase the counter by 1
@ -95,11 +96,17 @@ namespace kvz_rtp {
/* Return pointer to RTCP object if RTCP has been enabled
* Otherwise return nullptr
*
* TODO make this const */
* TODO make this const (TODO ???) */
kvz_rtp::rtcp *get_rtcp();
rtp_ctx_conf_t& get_ctx_conf();
/* TODO: */
rtp_error_t configure(int flag, ssize_t value);
/* TODO: */
rtp_error_t configure(int flag);
protected:
void *config_;
uint32_t id_;
@ -112,17 +119,17 @@ namespace kvz_rtp {
/* RTP */
uint16_t rtp_sequence_;
uint8_t rtp_payload_;
uint32_t rtp_ssrc_;
uint32_t rtp_timestamp_;
uint64_t wc_start_;
rtp_format_t rtp_payload_;
kvz_rtp::clock::hrc::hrc_t wc_start_2;
uint32_t clock_rate_;
kvz_rtp::frame_queue *fqueue_;
kvz_rtp::dispatcher *dispatcher_;
rtp_ctx_conf_t conf_;
/* After creation in writer.cc, pointer to frame queue is transferred
* to conn.cc so we can get rid of the dynamic cast in push_fram() */
kvz_rtp::frame_queue *fqueue_;
};
};

View File

@ -18,6 +18,8 @@ rtp_error_t kvz_rtp::dispatcher::start()
if ((runner_ = new std::thread(dispatch_runner, this, socket_)) == nullptr)
return RTP_MEMORY_ERROR;
runner_->detach();
return kvz_rtp::runner::start();
}
@ -26,6 +28,7 @@ rtp_error_t kvz_rtp::dispatcher::stop()
if (tasks_.size() > 0)
return RTP_NOT_READY;
cv_.notify_one();
return kvz_rtp::runner::stop();
}
@ -75,6 +78,9 @@ void kvz_rtp::dispatcher::dispatch_runner(kvz_rtp::dispatcher *dispatcher, kvz_r
if ((t = dispatcher->get_transaction()) == nullptr) {
dispatcher->get_cvar().wait(lk);
t = dispatcher->get_transaction();
if (t == nullptr)
break;
}
do {
@ -86,5 +92,7 @@ void kvz_rtp::dispatcher::dispatch_runner(kvz_rtp::dispatcher *dispatcher, kvz_r
dispatcher->get_cvar().notify_one();
}
std::exit(EXIT_SUCCESS);
}
#endif

View File

@ -504,8 +504,8 @@ rtp_error_t __hevc_receiver_optimistic(kvz_rtp::reader *reader)
int nread = 0;
rtp_ctx_conf_t conf = reader->get_ctx_conf();
frames.pz_enabled = !!(conf.flags & RCE_PROBATION_ZONE);
frames.pz_size = conf.ctx_values[RCC_PROBATION_ZONE_SIZE];
frames.pz_enabled = !!(frames.pz_size > 0);
while (reader->active()) {

View File

@ -10,13 +10,9 @@
thread_local rtp_error_t rtp_errno;
kvz_rtp::context::context(rtp_ctx_flags_t flags)
kvz_rtp::context::context()
{
cname_ = kvz_rtp::context::generate_cname();
std::memset(&ctx_conf_, 0, sizeof(ctx_conf_));
ctx_conf_.flags = flags;
cname_ = kvz_rtp::context::generate_cname();
#ifdef _WIN32
WSADATA wsd;
@ -43,17 +39,9 @@ kvz_rtp::context::~context()
#endif
}
void kvz_rtp::context::configure(rtp_ctx_conf_flags_t flag, ssize_t value)
{
if (flag >= RTP_CTX_CONF_LAST)
return;
ctx_conf_.ctx_values[flag] = value;
}
kvz_rtp::reader *kvz_rtp::context::create_reader(std::string srcAddr, int srcPort, rtp_format_t fmt)
{
kvz_rtp::reader *reader = new kvz_rtp::reader(fmt, ctx_conf_, srcAddr, srcPort);
kvz_rtp::reader *reader = new kvz_rtp::reader(fmt, srcAddr, srcPort);
if (!reader) {
std::cerr << "Failed to create kvz_rtp::reader for " << srcAddr << ":" << srcPort << "!" << std::endl;
@ -67,7 +55,7 @@ kvz_rtp::reader *kvz_rtp::context::create_reader(std::string srcAddr, int srcPor
kvz_rtp::writer *kvz_rtp::context::create_writer(std::string dstAddr, int dstPort, rtp_format_t fmt)
{
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, ctx_conf_, dstAddr, dstPort);
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, dstAddr, dstPort);
if (!writer) {
LOG_ERROR("Failed to create writer for %s:%d!", dstAddr.c_str(), dstPort);
@ -81,7 +69,7 @@ kvz_rtp::writer *kvz_rtp::context::create_writer(std::string dstAddr, int dstPor
kvz_rtp::writer *kvz_rtp::context::create_writer(std::string dstAddr, int dstPort, int srcPort, rtp_format_t fmt)
{
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, ctx_conf_, dstAddr, dstPort, srcPort);
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, dstAddr, dstPort, srcPort);
if (!writer) {
LOG_ERROR("Failed to create writer for %s:%d!", dstAddr.c_str(), dstPort);
@ -97,6 +85,7 @@ rtp_error_t kvz_rtp::context::destroy_writer(kvz_rtp::writer *writer)
{
conns_.erase(writer->get_ssrc());
writer->stop();
delete writer;
return RTP_OK;
}

View File

@ -10,18 +10,9 @@ namespace kvz_rtp {
class context {
public:
context(rtp_ctx_flags_t flags);
context();
~context();
/* Add configuration for a flag. Examples could be:
*
* rtp_ctx.configure(RTP_CTX_CONF_PROBATION_ZONE_SIZE, 10);
* rtp_ctx.configure(RTP_CTX_CONF_MAX_QUEUED_MSGS, 50);
*
* The first one sets the probation zone size to 10 packets and the second
* sets the maximum size of transaction queueu to 50 transactions */
void configure(rtp_ctx_conf_flags_t flag, ssize_t value);
/* Start listening to incoming RTP packets form src_addr:src_port
*
* Read packets are stored in a ring buffer which can be read by
@ -50,10 +41,6 @@ namespace kvz_rtp {
/* CNAME is the same for all connections */
std::string cname_;
/* RTP context configuration, holds both writer and reader configurations
* that are passed to those objects when they're created */
rtp_ctx_conf_t ctx_conf_;
std::map<uint32_t, connection *> conns_;
};
};

View File

@ -10,8 +10,8 @@
#define RTP_HEADER_VERSION 2
kvz_rtp::reader::reader(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string src_addr, int src_port):
connection(fmt, conf, true),
kvz_rtp::reader::reader(rtp_format_t fmt, std::string src_addr, int src_port):
connection(fmt, true),
src_addr_(src_addr),
src_port_(src_port),
recv_hook_arg_(nullptr),
@ -48,7 +48,7 @@ rtp_error_t kvz_rtp::reader::start()
return ret;
auto ctx_conf = get_ctx_conf();
ssize_t buf_size = ctx_conf.ctx_values[RCC_READER_UDP_BUF_SIZE];
ssize_t buf_size = ctx_conf.ctx_values[RCC_UDP_BUF_SIZE];
if (buf_size <= 0)
buf_size = 4 * 1000 * 1000;

View File

@ -10,7 +10,7 @@ namespace kvz_rtp {
class reader : public connection {
public:
reader(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string src_addr, int src_port);
reader(rtp_format_t fmt, std::string src_addr, int src_port);
~reader();
/* NOTE: this operation is blocking */

View File

@ -70,22 +70,20 @@ typedef enum RTP_FLAGS {
} rtp_flags_t;
/* These flags are given when kvzRTP context is created */
typedef enum RTP_CTX_ENABLE_FLAGS {
RTP_CTX_NO_FLAGS = 0 << 0,
enum RTP_CTX_ENABLE_FLAGS {
RTP_CTX_NO_FLAGS = 0 << 0,
/* Use optimistic receiver (HEVC only) */
RCE_OPTIMISTIC_RECEIVER = 1 << 0,
/* Enable probation zone (enabled only if optimistic receiver is enabled) */
RCE_PROBATION_ZONE = 1 << 1,
/* Enable system call dispatcher (HEVC only) */
RCE_SYSTEM_CALL_DISPATCHER = 1 << 2,
} rtp_ctx_flags_t;
RCE_LAST
};
/* These options are given to configuration() */
typedef enum RTP_CTX_CONFIGURATION_FLAGS {
enum RTP_CTX_CONFIGURATION_FLAGS {
/* No configuration flags */
RCC_NO_FLAGS = 0,
@ -121,29 +119,21 @@ typedef enum RTP_CTX_CONFIGURATION_FLAGS {
* By default, this is set to 4 */
RCC_MAX_CHUNKS_PER_MSG = 4,
/* How large is the receive UDP buffer size
/* How large is the receiver/sender UDP buffer size
*
* Default value is 4 MB
*
* For video with high bitrate, it is advisable to set this
* to a high number to prevent OS from dropping packets */
RCC_READER_UDP_BUF_SIZE = 5,
RCC_UDP_BUF_SIZE = 5,
/* How large is the send UDP buffer size
*
* Default value is 4 MB
*
* For video with high bitrate, it is advisable to set this
* to a high number to prevent OS from dropping packets */
RCC_WRITER_UDP_BUF_SIZE = 6,
RTP_CTX_CONF_LAST
} rtp_ctx_conf_flags_t;
RCC_LAST
};
/* see src/util.hh for more information */
typedef struct rtp_ctx_conf {
rtp_ctx_flags_t flags;
ssize_t ctx_values[RTP_CTX_CONF_LAST];
int flags;
ssize_t ctx_values[RCC_LAST];
} rtp_ctx_conf_t;
extern thread_local rtp_error_t rtp_errno;

View File

@ -22,22 +22,38 @@ using namespace mingw;
#include "formats/hevc.hh"
#include "formats/generic.hh"
kvz_rtp::writer::writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port):
connection(fmt, conf, false),
kvz_rtp::writer::writer(rtp_format_t fmt, std::string dst_addr, int dst_port):
connection(fmt, false),
dst_addr_(dst_addr),
dst_port_(dst_port),
src_port_(0)
src_port_(0),
fqueue_(nullptr),
dispatcher_(nullptr)
{
}
kvz_rtp::writer::writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port, int src_port):
writer(fmt, conf, dst_addr, dst_port)
kvz_rtp::writer::writer(rtp_format_t fmt, std::string dst_addr, int dst_port, int src_port):
writer(fmt, dst_addr, dst_port)
{
src_port_ = src_port;
}
kvz_rtp::writer::~writer()
{
fprintf(stderr, "deallocate stuff\n");
delete fqueue_;
delete dispatcher_;
}
rtp_error_t kvz_rtp::writer::stop()
{
if (get_payload() == RTP_FORMAT_HEVC && get_ctx_conf().flags & RCE_SYSTEM_CALL_DISPATCHER) {
while (dispatcher_->stop() != RTP_OK) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
return RTP_OK;
}
rtp_error_t kvz_rtp::writer::start()
@ -48,7 +64,7 @@ rtp_error_t kvz_rtp::writer::start()
return ret;
auto ctx_conf = get_ctx_conf();
ssize_t buf_size = ctx_conf.ctx_values[RCC_WRITER_UDP_BUF_SIZE];
ssize_t buf_size = ctx_conf.ctx_values[RCC_UDP_BUF_SIZE];
if (buf_size <= 0)
buf_size = 4 * 1000 * 1000;
@ -73,13 +89,29 @@ rtp_error_t kvz_rtp::writer::start()
addr_out_ = socket_.create_sockaddr(AF_INET, dst_addr_, dst_port_);
socket_.set_sockaddr(addr_out_);
if (get_payload() == RTP_FORMAT_HEVC && get_ctx_conf().flags & RCE_SYSTEM_CALL_DISPATCHER) {
LOG_ERROR("get dispatcher");
auto dispatcher = get_dispatcher();
auto conf = get_ctx_conf();
auto fmt = get_payload();
if (dispatcher)
dispatcher->start();
#ifndef _WIN32
if (fmt == RTP_FORMAT_HEVC && conf.flags & RCE_SYSTEM_CALL_DISPATCHER) {
dispatcher_ = new kvz_rtp::dispatcher(&socket_);
fqueue_ = new kvz_rtp::frame_queue(fmt, conf, dispatcher_);
if (dispatcher_)
dispatcher_->start();
} else {
#endif
fqueue_ = new kvz_rtp::frame_queue(fmt, conf);
dispatcher_ = nullptr;
#ifndef _WIN32
}
#endif
/* save pointer frame queue pointer to frame queue to conn so push_frame()
* does not have to do a dynamic cast each time it's called
*
* TODO why push_frame() takes conn instead of writer? */
set_frame_queue(fqueue_);
if (rtcp_ == nullptr) {
if ((rtcp_ = new kvz_rtp::rtcp(get_ssrc(), false)) == nullptr) {

View File

@ -9,13 +9,11 @@
namespace kvz_rtp {
/* typedef struct rtp_ctx_conf rtp_ctx_conf_t; */
class writer : public connection {
public:
/* "src_port" is an optional argument, given if holepunching want to be used */
writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port);
writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port, int src_port);
writer(rtp_format_t fmt, std::string dst_addr, int dst_port);
writer(rtp_format_t fmt, std::string dst_addr, int dst_port, int src_port);
~writer();
/* Open socket for sending frames and start SCD if enabled
@ -28,6 +26,12 @@ namespace kvz_rtp {
* Return RTP_GENERIC_ERROR for any other error condition */
rtp_error_t start();
/* Stop and destroy SCD if it's enabled
* stop() will block until SCD is finished to ensure that all packet are sent to remote
*
* Return RTP_OK on success */
rtp_error_t stop();
/* Split "data" into 1500 byte chunks and send them to remote
*
* NOTE: If SCD has been enabled, calling this version of push_frame()
@ -54,5 +58,8 @@ namespace kvz_rtp {
int dst_port_;
int src_port_;
sockaddr_in addr_out_;
kvz_rtp::frame_queue *fqueue_;
kvz_rtp::dispatcher *dispatcher_;
};
};