Switch from compile-time to runtime configuration of kvzRTP

This commit is contained in:
Aaro Altonen 2020-01-08 10:55:00 +02:00
parent 9472c4142c
commit 3e981ec4ae
19 changed files with 433 additions and 294 deletions

View File

@ -1,9 +1,7 @@
.PHONY: all clean obj install
CXX = g++
CXXFLAGS = -g -Wall -Wextra -Wuninitialized -O2 -std=c++11 -DNDEBUG -Isrc -fPIC \
-D__RTP_USE_SYSCALL_DISPATCHER__ \
-D__RTP_USE_OPTIMISTIC_RECEIVER__
CXXFLAGS = -g -Wall -Wextra -Wuninitialized -O2 -std=c++11 -DNDEBUG -Isrc -fPIC
SOURCES = $(wildcard src/*.cc)
MODULES := src/formats

View File

@ -14,6 +14,8 @@ We provide several simple and thoroughly commented examples on how to use kvzRTP
[How to create a simple RTP receiver (hooking)](examples/simple/rtp/receiving_hook.cc)
[How to configure RTP sender to send high-quality stream](examples/simple/rtp/sending_hq.cc)
NOTE: The hook should **not** be used for media processing. It should be rather used as interface between application and library where the frame handout happens.
[How to create a simple RTP receiver (polling)](examples/simple/rtp/receiving_poll.cc)
@ -164,74 +166,10 @@ _Overwriting shifts_, as the name suggests, overwrite the previous content so th
Overwriting shifts also cause _appending shifts_ because when a fragment is removed, and a valid fragment is shifted on its place, this valid fragment must not be overwritten and all subsequent valid fragments must be appended.
#### Frame Size and Allocation Heuristics (FSAH)
Trying to mitigate both reallocations and internal fragmentation, OFR uses a "tool" called FSAH.
FSAH keeps track of allocations and frame sizes and tries to build a model that resembles the frame sizes to the best of its ability. Usually a stream consist of inter and intra frames and the intra period is user-configurable. OFR tries to guess the intra period from the frame sizes using FSAH.
Using FSAH, the OFR can efficiently update its allocation sizes when f.ex. encoding parameters or resolution of the video is changed.
### Configuring the OFR
NOTE 1: If you wish to use OFR, you need to supply the maximum payload size used by the sender. When using kvzRTP for both sending and receiving you don't need to worry about that but if you're using f.ex. FFmpeg for sending, you need to set the `MAX_PAYLOAD` to 1506.
`__RTP_USE_OPTIMISTIC_RECEIVER__`
* Enable the receiver, if not given the then default receiver is used
`__RTP_USE_PROBATION_ZONE__`
* Linux-only optimization, small area of free-to-use memory for fragments that cannot be relocated at the moment
`__RTP_PROBATION_ZONE_SIZE__`
* How many **packets** (ie. `N * MAX_PAYLOAD` bytes) can the probation zone hold
`__RTP_N_PACKETS_PER_SYSCALL__`
* How many packets should the OFR read per system call
* This is a double-edged sword because on the other you can reduce the overhead caused by system calls but it will in turn increase the amount of processing done by system call and may cause more complex relocations
* Setting this to 1, you can get rid of shifting completely
`__RTP_MAX_PAYLOAD__`
* This defines the maximum payload used by sender (so only the actual HEVC data, all headers excluded).
* NOTE: it's important that most of the fragments are of size `__RTP_MAX_PAYLOAD__`, otherwise the OFR will spent a lot of time shifting the memory
One additional way of improving the performance of OFR is decreasing the VPS period. VPS/SPS/PPS packets are read into the active frame just like fragments which means that they must be copied out and all remaining fragments of current read must be shifted.
For VPS period of 1, that's 3 copies (assuming all three are received) and `MAX_DATAGRAMS - 3` memory shifts.
## Defines
Use `__RTP_SILENT__` to disable all prints
Use `__RTP_USE_OPTIMISTIC_RECEIVER__` to enable the optimistic fragment receiver
* See src/formats/hevc.cc for more details
Use `__RTP_USE_PROBATION_ZONE__` to enable the probation zone allocation for RTP frames
* See src/frame.hh for more details
* NOTE: Probation zone is enabled only if optimistic fragment receiver is enabled
Use `__RTP_MAX_PAYLOAD__` to determine the maximum UDP **payload** size
* If you're unsure about this, don't define it. kvzRTP will default to 1441 bytes
* NOTE: If you're not using kvzRTP for both sending and receiving, it is very much advised to set this (to some default value??)
`__RTP_N_PACKETS_PER_SYSCALL__` (Linux only)
* How many packets should the OFR read per system call
* See "Configuring the OFR" for more details
Use `__RTP_PROBATION_ZONE_SIZE__` to configure the probation zone is
* This should define the number of **packets** that fit into probation zone
Use `__RTP_MAX_QUEUED_MSGS__` to configure how many transaction entries the free queue can hold
* This can be rather low number for small amount of clients
* Default is 20
Use `__RTP_FQUEUE_MAX_SIZE__` to configure how many UDP packets one transaction entry holds
* Default is 500
Use `__RTP_FQUEUE_BUFFERS_PER_PACKET__`
* How many buffers does it take to construct one UDP packet
* Default is 4 (RTP, NAL, FU and payload)
Use `NDEBUG` to disable `LOG_DEBUG` which is the most verbose level of logging
# Adding support for new media types

View File

@ -0,0 +1,48 @@
#include <kvzrtp/lib.hh>
#define PAYLOAD_MAXLEN 1 * 1000 * 1000
int main(int argc, char **argv)
{
/* Enable system call dispatcher to improve sending speed */
kvz_rtp::context ctx(RCE_SYSTEM_CALL_DISPATCHER);
/* Increase the send UDP buffer size to 40 MB */
ctx.configure(RCC_WRITER_UDP_BUF_SIZE, 40 * 1024 * 1024);
/* Cache 30 transactions to prevent constant (de)allocation */
ctx.configure(RCC_MAX_TRANSACTIONS, 30);
/* Set max size for one input frame to 1.4 MB (1441 * 1000)
* (1.4 MB frame would equal 43 MB bitrate for a 30 FPS video which is very large) */
ctx.configure(RCC_MAX_MESSAGES, 1000);
kvz_rtp::writer *writer = ctx.create_writer("127.0.0.1", 5566, 8888, RTP_FORMAT_HEVC);
/* Before the writer can be used, it must be started.
* This initializes the underlying socket and all needed data structures */
(void)writer->start();
for (int i = 0; i < 10; ++i) {
/* We're using System Call Dispatcher so we must adhere to the memory ownership/deallocation
* rules defined in README.md
*
* Easiest way is to use smart pointers (as done here). If this memory was, however, received
* from f.ex. HEVC encoder directly and was not wrapped in a smart pointer, we could either
* install a deallocation hook for the memory or pass RTP_COPY to push_frame() to force kvzRTP
* to make a copy of the memory */
auto buffer = std::unique_ptr<uint8_t[]>(new uint8_t[PAYLOAD_MAXLEN]);
/* We're using */
if (writer->push_frame(std::move(buffer), PAYLOAD_MAXLEN, RTP_NO_FLAGS) != RTP_OK) {
fprintf(stderr, "Failed to send RTP frame!");
}
}
/* Writer must be destroyed manually */
ctx.destroy_writer(writer);
fprintf(stderr, "done\n");
return 0;
}

View File

@ -17,31 +17,30 @@
#include "formats/hevc.hh"
#include "formats/opus.hh"
kvz_rtp::connection::connection(rtp_format_t fmt, bool reader):
kvz_rtp::connection::connection(rtp_format_t fmt, rtp_ctx_conf_t& conf, bool reader):
config_(nullptr),
socket_(),
rtcp_(nullptr),
reader_(reader),
wc_start_(0),
fqueue_(nullptr),
dispatcher_(nullptr)
dispatcher_(nullptr),
conf_(conf)
{
rtp_sequence_ = 1; //kvz_rtp::random::generate_32();
rtp_ssrc_ = kvz_rtp::random::generate_32();
rtp_payload_ = fmt;
if (!reader) {
#ifdef __RTP_USE_SYSCALL_DISPATCHER__
if (fmt == RTP_FORMAT_HEVC) {
if (fmt == RTP_FORMAT_HEVC &&
conf_.flags & RCE_SYSTEM_CALL_DISPATCHER)
{
dispatcher_ = new kvz_rtp::dispatcher(&socket_);
fqueue_ = new kvz_rtp::frame_queue(fmt, dispatcher_);
fqueue_ = new kvz_rtp::frame_queue(fmt, conf_, dispatcher_);
} else {
fqueue_ = new kvz_rtp::frame_queue(fmt);
fqueue_ = new kvz_rtp::frame_queue(fmt, conf_);
dispatcher_ = nullptr;
}
#else
fqueue_ = new kvz_rtp::frame_queue(fmt);
#endif
}
set_payload(fmt);
@ -54,13 +53,11 @@ kvz_rtp::connection::~connection()
delete rtcp_;
}
#ifdef __RTP_USE_SYSCALL_DISPATCHER__
if (!reader_ && rtp_payload_ == RTP_FORMAT_HEVC) {
if (!reader_ && rtp_payload_ == RTP_FORMAT_HEVC && conf_.flags & RCE_SYSTEM_CALL_DISPATCHER) {
while (dispatcher_->stop() != RTP_OK) {
/* fprintf(stderr, "here...\n"); */
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
#endif
}
void kvz_rtp::connection::set_payload(rtp_format_t fmt)
@ -255,3 +252,8 @@ kvz_rtp::rtcp *kvz_rtp::connection::get_rtcp()
{
return rtcp_;
}
rtp_ctx_conf_t& kvz_rtp::connection::get_ctx_conf()
{
return conf_;
}

View File

@ -25,7 +25,7 @@ namespace kvz_rtp {
class connection : public runner {
public:
connection(rtp_format_t fmt, bool reader);
connection(rtp_format_t fmt, rtp_ctx_conf_t& conf, bool reader);
virtual ~connection();
uint16_t get_sequence() const;
@ -98,6 +98,8 @@ namespace kvz_rtp {
* TODO make this const */
kvz_rtp::rtcp *get_rtcp();
rtp_ctx_conf_t& get_ctx_conf();
protected:
void *config_;
uint32_t id_;
@ -120,5 +122,7 @@ namespace kvz_rtp {
kvz_rtp::frame_queue *fqueue_;
kvz_rtp::dispatcher *dispatcher_;
rtp_ctx_conf_t conf_;
};
};

View File

@ -2,10 +2,12 @@
#include "dispatch.hh"
#include "socket.hh"
#ifdef __RTP_USE_SYSCALL_DISPATCHER__
#include <easy/profiler.h>
kvz_rtp::dispatcher::dispatcher(kvz_rtp::socket *socket):
socket_(socket)
{
LOG_ERROR("starting system call dispatcher!");
}
kvz_rtp::dispatcher::~dispatcher()
@ -86,4 +88,3 @@ void kvz_rtp::dispatcher::dispatch_runner(kvz_rtp::dispatcher *dispatcher, kvz_r
dispatcher->get_cvar().notify_one();
}
}
#endif

View File

@ -517,9 +517,7 @@ rtp_error_t kvz_rtp::hevc::push_frame(kvz_rtp::connection *conn, std::unique_ptr
rtp_error_t kvz_rtp::hevc::frame_receiver(kvz_rtp::reader *reader)
{
#ifdef __RTP_USE_OPTIMISTIC_RECEIVER__
if (reader->get_ctx_conf().flags & RCE_OPTIMISTIC_RECEIVER)
return __hevc_receiver_optimistic(reader);
#else
return __hevc_receiver(reader);
#endif
}

View File

@ -287,6 +287,9 @@ struct frames {
size_t total_received;
size_t total_bytes_received;
size_t total_copied;
bool pz_enabled; /* has probation zone been enabled */
size_t pz_size; /* how many packets fit into the frame's probation zone */
};
/* Buffer contains three bytes: 2 byte NAL header and 1 byte FU header */
@ -359,10 +362,15 @@ static inline void __add_relocation_entry(struct frame_info& finfo, uint16_t seq
finfo.rinfo.insert(std::make_pair(seq, reloc));
}
static void __relocate_temporarily(struct frame_info& src, struct frame_info& dst, uint16_t seq, size_t size, size_t offset)
static void __relocate_temporarily(
struct frame_info& src, struct frame_info& dst,
uint16_t seq, size_t size, size_t offset, bool pz_enabled
)
{
#ifdef __RTP_USE_PROBATION_ZONE__
if (dst.frame->probation_off != dst.frame->probation_len) {
if (pz_enabled) {
if (dst.frame->probation_off >= dst.frame->probation_len)
goto alloc_normal;
void *ptr = dst.frame->probation + dst.frame->probation_off;
src.relocs++;
@ -371,22 +379,25 @@ static void __relocate_temporarily(struct frame_info& src, struct frame_info& ds
__add_relocation_entry(dst, seq, ptr, size, RT_PROBATION);
dst.frame->probation_off += size;
} else {
#endif
return;
}
alloc_normal:
auto tmp_frame = kvz_rtp::frame::alloc_rtp_frame(size);
src.relocs++;
std::memcpy(tmp_frame->payload, src.frame->payload + offset, size);
__add_relocation_entry(dst, seq, tmp_frame, size, RT_FRAME);
#ifdef __RTP_USE_PROBATION_ZONE__
}
#endif
}
static void __reallocate_frame(struct frames& frames, uint32_t timestamp)
{
auto tmp_frame = kvz_rtp::frame::alloc_rtp_frame(GET_FRAME(timestamp).total_size + frames.fsah.ras);
kvz_rtp::frame::rtp_frame *tmp_frame = nullptr;
if (frames.pz_enabled)
tmp_frame = kvz_rtp::frame::alloc_rtp_frame(GET_FRAME(timestamp).total_size + frames.fsah.ras, frames.pz_size);
else
tmp_frame = kvz_rtp::frame::alloc_rtp_frame(GET_FRAME(timestamp).total_size + frames.fsah.ras);
std::memcpy(tmp_frame->payload, GET_FRAME(timestamp).frame->payload, GET_FRAME(timestamp).total_size);
(void)kvz_rtp::frame::dealloc_frame(GET_FRAME(timestamp).frame);
@ -402,7 +413,11 @@ static void __create_frame_entry(struct frames& frames, uint32_t timestamp, void
{
(void)payload, (void)size;
if (frames.pz_enabled)
GET_FRAME(timestamp).frame = kvz_rtp::frame::alloc_rtp_frame(frames.fsah.das, frames.pz_size);
else
GET_FRAME(timestamp).frame = kvz_rtp::frame::alloc_rtp_frame(frames.fsah.das);
GET_FRAME(timestamp).total_size = frames.fsah.das;
GET_FRAME(timestamp).next_off = NAL_HDR_SIZE;
GET_FRAME(timestamp).received_size = NAL_HDR_SIZE;
@ -466,7 +481,7 @@ static void __resolve_relocations(struct frames& frames, uint32_t ts)
rtp_error_t __hevc_receiver_optimistic(kvz_rtp::reader *reader)
{
/* LOG_INFO("Starting Optimistic HEVC Fragment Receiver..."); */
LOG_INFO("Starting Optimistic HEVC Fragment Receiver...");
struct frames frames;
@ -490,9 +505,16 @@ rtp_error_t __hevc_receiver_optimistic(kvz_rtp::reader *reader)
std::memset(frames.headers, 0, sizeof(frames.headers));
int nread = 0;
rtp_ctx_conf_t conf = reader->get_ctx_conf();
uint64_t total_proc_time = 0;
uint64_t total_frames_recved = 0;
frames.pz_enabled = !!(conf.flags & RCE_PROBATION_ZONE);
frames.pz_size = conf.ctx_values[RCC_PROBATION_ZONE_SIZE];
if (!frames.pz_enabled) {
LOG_ERROR("probatoin zone dialbed");
} else {
LOG_ERROR("PZ enabled");
}
while (reader->active()) {
@ -692,7 +714,7 @@ rtp_error_t __hevc_receiver_optimistic(kvz_rtp::reader *reader)
}
} else {
/* Relocate the fragment temporarily to probation zone/temporary frame */
__relocate_temporarily(ACTIVE, GET_FRAME(c_ts), c_seq, FRAG_PSIZE(i), off);
__relocate_temporarily(ACTIVE, GET_FRAME(c_ts), c_seq, FRAG_PSIZE(i), off, frames.pz_enabled);
}
/* This is not the first fragment of an inactive frame, copy it to correct frame
@ -715,7 +737,7 @@ rtp_error_t __hevc_receiver_optimistic(kvz_rtp::reader *reader)
} else {
/* Relocate the fragment temporarily to probation zone/temporary frame */
__relocate_temporarily(ACTIVE, GET_FRAME(c_ts), c_seq, FRAG_PSIZE(i), off);
__relocate_temporarily(ACTIVE, GET_FRAME(c_ts), c_seq, FRAG_PSIZE(i), off, frames.pz_enabled);
}
}
@ -797,9 +819,6 @@ rtp_error_t __hevc_receiver_optimistic(kvz_rtp::reader *reader)
/* fprintf(stderr, "%zu missing\n", pkts_received - ACTIVE.pkts_received); */
if (ACTIVE.pkts_received == pkts_received) {
total_proc_time += kvz_rtp::clock::hrc::diff_now_us(ACTIVE.start);
total_frames_recved += 1;
__resolve_relocations(frames, frames.active_ts);
frames.total_received += ACTIVE.pkts_received;

View File

@ -18,10 +18,7 @@ kvz_rtp::frame::rtp_frame *kvz_rtp::frame::alloc_rtp_frame()
std::memset(frame, 0, sizeof(kvz_rtp::frame::rtp_frame));
frame->payload = nullptr;
#if defined(__linux__) && defined(__RTP_USE_PROBATION_ZONE__)
frame->probation = nullptr;
#endif
return frame;
}
@ -33,17 +30,25 @@ kvz_rtp::frame::rtp_frame *kvz_rtp::frame::alloc_rtp_frame(size_t payload_len)
if ((frame = kvz_rtp::frame::alloc_rtp_frame()) == nullptr)
return nullptr;
#if defined(__linux__) && defined(__RTP_USE_PROBATION_ZONE__)
frame->probation = new uint8_t[PROBATION_MAX_PKTS * MAX_PAYLOAD + payload_len];
frame->probation_len = PROBATION_MAX_PKTS * MAX_PAYLOAD;
frame->payload = new uint8_t[payload_len];
frame->payload_len = payload_len;
return frame;
}
kvz_rtp::frame::rtp_frame *kvz_rtp::frame::alloc_rtp_frame(size_t payload_len, size_t pz_size)
{
kvz_rtp::frame::rtp_frame *frame = nullptr;
if ((frame = kvz_rtp::frame::alloc_rtp_frame()) == nullptr)
return nullptr;
frame->probation = new uint8_t[pz_size * MAX_PAYLOAD + payload_len];
frame->probation_len = pz_size * MAX_PAYLOAD;
frame->probation_off = 0;
frame->payload = (uint8_t *)frame->probation + frame->probation_len;
frame->payload_len = payload_len;
#else
frame->payload = new uint8_t[payload_len];
frame->payload_len = payload_len;
#endif
return frame;
}
@ -59,13 +64,11 @@ rtp_error_t kvz_rtp::frame::dealloc_frame(kvz_rtp::frame::rtp_frame *frame)
if (frame->ext)
delete frame->ext;
#if defined(__linux__) && defined(__RTP_USE_PROBATION_ZONE__)
if (frame->probation)
delete[] frame->probation;
#else
if (frame->payload)
else if (frame->payload)
delete[] frame->payload;
#endif
LOG_DEBUG("Deallocating frame, type %u", frame->type);

View File

@ -4,12 +4,6 @@
#include <ws2def.h>
#else
#include <netinet/in.h>
#ifdef __RTP_PROBATION_ZONE_SIZE__
#define PROBATION_MAX_PKTS __RTP_PROBATION_ZONE_SIZE__
#else
#define PROBATION_MAX_PKTS 8
#endif
#endif
#include <string>
@ -67,7 +61,6 @@ namespace kvz_rtp {
size_t padding_len; /* non-zero if frame is padded */
size_t payload_len; /* payload_len: total_len - header_len - padding length (if padded) */
#if defined(__linux__) && defined(__RTP_USE_PROBATION_ZONE__)
/* Probation zone is a small area of free-to-use memory for the frame receiver
* when handling fragments. For example HEVC fragments that belong to future frames
* but cannot be relocated there (start sequence missing) are copied to probation
@ -80,7 +73,6 @@ namespace kvz_rtp {
size_t probation_len;
size_t probation_off;
uint8_t *probation;
#endif
uint8_t *payload;
rtp_format_t format;
@ -158,6 +150,7 @@ namespace kvz_rtp {
rtp_frame *alloc_rtp_frame();
rtp_frame *alloc_rtp_frame(size_t payload_len);
rtp_frame *alloc_rtp_frame(size_t payload_len, size_t pz_size);
rtcp_app_frame *alloc_rtcp_app_frame(std::string name, uint8_t subtype, size_t payload_len);
rtcp_sdes_frame *alloc_rtcp_sdes_frame(size_t ssrc_count, size_t total_len);
rtcp_receiver_frame *alloc_rtcp_receiver_frame(size_t nblocks);

View File

@ -1,4 +1,5 @@
#include <cstdlib>
#include <cstring>
#include <iostream>
#include "debug.hh"
@ -9,10 +10,14 @@
thread_local rtp_error_t rtp_errno;
kvz_rtp::context::context()
kvz_rtp::context::context(rtp_ctx_flags_t flags)
{
cname_ = kvz_rtp::context::generate_cname();
std::memset(&ctx_conf_, 0, sizeof(ctx_conf_));
ctx_conf_.flags = flags;
#ifdef _WIN32
WSADATA wsd;
int rc;
@ -38,9 +43,17 @@ kvz_rtp::context::~context()
#endif
}
void kvz_rtp::context::configure(rtp_ctx_conf_flags_t flag, ssize_t value)
{
if (flag >= RTP_CTX_CONF_LAST)
return;
ctx_conf_.ctx_values[flag] = value;
}
kvz_rtp::reader *kvz_rtp::context::create_reader(std::string srcAddr, int srcPort, rtp_format_t fmt)
{
kvz_rtp::reader *reader = new kvz_rtp::reader(fmt, srcAddr, srcPort);
kvz_rtp::reader *reader = new kvz_rtp::reader(fmt, ctx_conf_, srcAddr, srcPort);
if (!reader) {
std::cerr << "Failed to create kvz_rtp::reader for " << srcAddr << ":" << srcPort << "!" << std::endl;
@ -54,7 +67,7 @@ kvz_rtp::reader *kvz_rtp::context::create_reader(std::string srcAddr, int srcPor
kvz_rtp::writer *kvz_rtp::context::create_writer(std::string dstAddr, int dstPort, rtp_format_t fmt)
{
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, dstAddr, dstPort);
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, ctx_conf_, dstAddr, dstPort);
if (!writer) {
LOG_ERROR("Failed to create writer for %s:%d!", dstAddr.c_str(), dstPort);
@ -68,7 +81,7 @@ kvz_rtp::writer *kvz_rtp::context::create_writer(std::string dstAddr, int dstPor
kvz_rtp::writer *kvz_rtp::context::create_writer(std::string dstAddr, int dstPort, int srcPort, rtp_format_t fmt)
{
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, dstAddr, dstPort, srcPort);
kvz_rtp::writer *writer = new kvz_rtp::writer(fmt, ctx_conf_, dstAddr, dstPort, srcPort);
if (!writer) {
LOG_ERROR("Failed to create writer for %s:%d!", dstAddr.c_str(), dstPort);

View File

@ -6,12 +6,22 @@
#include "writer.hh"
namespace kvz_rtp {
class context {
class context {
public:
context();
context(rtp_ctx_flags_t flags);
~context();
/* Add configuration for a flag. Examples could be:
*
* rtp_ctx.configure(RTP_CTX_CONF_PROBATION_ZONE_SIZE, 10);
* rtp_ctx.configure(RTP_CTX_CONF_MAX_QUEUED_MSGS, 50);
*
* The first one sets the probation zone size to 10 packets and the second
* sets the maximum size of transaction queueu to 50 transactions */
void configure(rtp_ctx_conf_flags_t flag, ssize_t value);
/* Start listening to incoming RTP packets form src_addr:src_port
*
* Read packets are stored in a ring buffer which can be read by
@ -40,6 +50,10 @@ namespace kvz_rtp {
/* CNAME is the same for all connections */
std::string cname_;
/* RTP context configuration, holds both writer and reader configurations
* that are passed to those objects when they're created */
rtp_ctx_conf_t ctx_conf_;
std::map<uint32_t, connection *> conns_;
};
};

View File

@ -16,15 +16,34 @@
#include "formats/hevc.hh"
kvz_rtp::frame_queue::frame_queue(rtp_format_t fmt):
active_(nullptr), fmt_(fmt)
kvz_rtp::frame_queue::frame_queue(rtp_format_t fmt, rtp_ctx_conf_t& conf):
active_(nullptr), fmt_(fmt), dealloc_hook_(nullptr)
{
active_ = nullptr;
free_.reserve(MAX_QUEUED_MSGS);
dispatcher_ = nullptr;
max_queued_ = conf.ctx_values[RCC_MAX_TRANSACTIONS];
max_mcount_ = conf.ctx_values[RCC_MAX_MESSAGES];
max_ccount_ = conf.ctx_values[RCC_MAX_CHUNKS_PER_MSG] * max_mcount_;
if (max_queued_ <= 0)
max_queued_ = MAX_QUEUED_MSGS;
if (max_mcount_ <= 0)
max_mcount_ = MAX_MSG_COUNT;
if (max_ccount_ <= 0)
max_ccount_ = MAX_CHUNK_COUNT * max_mcount_;
LOG_ERROR("max transactions: %u", max_queued_);
LOG_ERROR("max messages: %u", max_mcount_);
LOG_ERROR("max chunk: %u", max_ccount_);
free_.reserve(max_queued_);
}
kvz_rtp::frame_queue::frame_queue(rtp_format_t fmt, kvz_rtp::dispatcher *dispatcher):
frame_queue(fmt)
kvz_rtp::frame_queue::frame_queue(rtp_format_t fmt, rtp_ctx_conf_t& conf, kvz_rtp::dispatcher *dispatcher):
frame_queue(fmt, conf)
{
dispatcher_ = dispatcher;
}
@ -37,15 +56,22 @@ rtp_error_t kvz_rtp::frame_queue::init_transaction(kvz_rtp::connection *conn)
{
std::lock_guard<std::mutex> lock(transaction_mtx_);
if (active_ != nullptr) {
/* LOG_WARN("initializing a new transaction while previous is still active!"); */
if (active_ != nullptr)
active_ = nullptr;
}
if (free_.empty()) {
active_ = new transaction_t;
active_->key = kvz_rtp::random::generate_32();
#ifdef __linux__
active_->headers = new struct mmsghdr[max_mcount_];
active_->chunks = new struct iovec[max_ccount_];
#else
active_->headers = nullptr;
active_->chunks = nullptr;
#endif
active_->rtp_headers = new kvz_rtp::frame::rtp_header[max_mcount_];
switch (fmt_) {
case RTP_FORMAT_HEVC:
active_->media_headers = new kvz_rtp::hevc::media_headers;
@ -133,7 +159,7 @@ rtp_error_t kvz_rtp::frame_queue::deinit_transaction(uint32_t key)
transaction_it->second->data_raw = nullptr;
}
if (free_.size() > MAX_QUEUED_MSGS) {
if (free_.size() > max_queued_) {
switch (fmt_) {
case RTP_FORMAT_HEVC:
delete (kvz_rtp::hevc::media_headers *)transaction_it->second->media_headers;
@ -141,6 +167,9 @@ rtp_error_t kvz_rtp::frame_queue::deinit_transaction(uint32_t key)
break;
}
delete transaction_it->second->headers;
delete transaction_it->second->chunks;
delete transaction_it->second->rtp_headers;
delete transaction_it->second;
} else {
free_.push_back(transaction_it->second);
@ -169,7 +198,7 @@ rtp_error_t kvz_rtp::frame_queue::enqueue_message(
return RTP_INVALID_VALUE;
#ifdef __linux__
if (active_->chunk_ptr + 2 >= MAX_CHUNK_COUNT || active_->hdr_ptr + 1 >= MAX_MSG_COUNT) {
if (active_->chunk_ptr + 2 >= max_ccount_ || active_->hdr_ptr + 1 >= max_mcount_) {
LOG_ERROR("maximum amount of chunks (%zu) or messages (%zu) exceeded!", active_->chunk_ptr, active_->hdr_ptr);
return RTP_MEMORY_ERROR;
}
@ -212,7 +241,7 @@ rtp_error_t kvz_rtp::frame_queue::enqueue_message(
return RTP_INVALID_VALUE;
#ifdef __linux__
if (active_->chunk_ptr + buffers.size() + 1 >= MAX_CHUNK_COUNT || active_->hdr_ptr + 1 >= MAX_MSG_COUNT) {
if (active_->chunk_ptr + buffers.size() + 1 >= max_ccount_ || active_->hdr_ptr + 1 >= max_mcount_) {
LOG_ERROR("maximum amount of chunks (%zu) or messages (%zu) exceeded!", active_->chunk_ptr, active_->hdr_ptr);
return RTP_MEMORY_ERROR;
}
@ -264,16 +293,12 @@ rtp_error_t kvz_rtp::frame_queue::flush_queue(kvz_rtp::connection *conn)
queued_.insert(std::make_pair(active_->key, active_));
transaction_mtx_.unlock();
#ifdef __RTP_USE_SYSCALL_DISPATCHER__
/* Dispatcher may be enabled but it doesn't necessarily
* mean that this frame queue uses it to send messages */
if (dispatcher_) {
dispatcher_->trigger_send(active_);
active_ = nullptr;
return RTP_OK;
}
#endif
if (conn->get_socket().send_vecio(active_->headers, active_->hdr_ptr, 0) != RTP_OK) {
LOG_ERROR("Failed to flush the message queue: %s", strerror(errno));
(void)deinit_transaction();

View File

@ -9,19 +9,9 @@
#include "dispatch.hh"
#include "util.hh"
#ifdef __RTP_FQUEUE_MAX_SIZE__
# define MAX_MSG_COUNT __RTP_FQUEUE_MAX_SIZE__
#else
# define MAX_MSG_COUNT 5000
#endif
#ifdef __RTP_FQUEUE_BUFFERS_PER_PACKET__
# define MAX_CHUNK_COUNT (MAX_MSG_COUNT * __RTP_FQUEUE_MAX_SIZE__)
#else
# define MAX_CHUNK_COUNT (MAX_MSG_COUNT * 4)
#endif
#define MAX_QUEUED_MSGS 50
const int MAX_MSG_COUNT = 500;
const int MAX_QUEUED_MSGS = 10;
const int MAX_CHUNK_COUNT = 4;
namespace kvz_rtp {
@ -48,14 +38,14 @@ namespace kvz_rtp {
* Keeping a separate common RTP header and then just copying this is cleaner than initializing
* RTP header for each packet */
kvz_rtp::frame::rtp_header rtp_common;
kvz_rtp::frame::rtp_header rtp_headers[MAX_MSG_COUNT];
kvz_rtp::frame::rtp_header *rtp_headers;
#ifdef __linux__
struct mmsghdr headers[MAX_MSG_COUNT];
struct iovec chunks[MAX_CHUNK_COUNT];
struct mmsghdr *headers;
struct iovec *chunks;
#else
char headers[MAX_MSG_COUNT];
char chunks[MAX_CHUNK_COUNT];
char *headers;
char *chunks;
#endif
/* Media may need space for additional buffers,
@ -97,8 +87,8 @@ namespace kvz_rtp {
class frame_queue {
public:
frame_queue(rtp_format_t fmt);
frame_queue(rtp_format_t fmt, kvz_rtp::dispatcher *dispatcher);
frame_queue(rtp_format_t fmt, rtp_ctx_conf_t& conf);
frame_queue(rtp_format_t fmt, rtp_ctx_conf_t& conf, kvz_rtp::dispatcher *dispatcher);
~frame_queue();
rtp_error_t init_transaction(kvz_rtp::connection *conn);
@ -206,5 +196,9 @@ namespace kvz_rtp {
/* Deallocation hook is stored here and copied to transaction upon initialization */
void (*dealloc_hook_)(void *);
ssize_t max_queued_; /* number of queued transactions */
ssize_t max_mcount_; /* number of messages per transactions */
ssize_t max_ccount_; /* number of chunks per message */
};
};

View File

@ -10,8 +10,8 @@
#define RTP_HEADER_VERSION 2
kvz_rtp::reader::reader(rtp_format_t fmt, std::string src_addr, int src_port):
connection(fmt, true),
kvz_rtp::reader::reader(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string src_addr, int src_port):
connection(fmt, conf, true),
src_addr_(src_addr),
src_port_(src_port),
recv_hook_arg_(nullptr),
@ -43,12 +43,17 @@ rtp_error_t kvz_rtp::reader::start()
return ret;
int enable = 1;
int udp_buf_size = 40 * 1024 * 1024;
if ((ret = socket_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (const char *)&enable, sizeof(int))) != RTP_OK)
return ret;
if ((ret = socket_.setsockopt(SOL_SOCKET, SO_RCVBUF, (const char *)&udp_buf_size, sizeof(int))) != RTP_OK)
auto ctx_conf = get_ctx_conf();
ssize_t buf_size = ctx_conf.ctx_values[RCC_READER_UDP_BUF_SIZE];
if (buf_size <= 0)
buf_size = 4 * 1000 * 1000;
if ((ret = socket_.setsockopt(SOL_SOCKET, SO_RCVBUF, (const char *)&buf_size, sizeof(int))) != RTP_OK)
return ret;
LOG_DEBUG("Binding to port %d (source port)", src_port_);

View File

@ -7,10 +7,10 @@
#include "socket.hh"
namespace kvz_rtp {
class reader : public connection {
class reader : public connection {
public:
reader(rtp_format_t fmt, std::string src_addr, int src_port);
reader(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string src_addr, int src_port);
~reader();
/* NOTE: this operation is blocking */

View File

@ -69,6 +69,83 @@ typedef enum RTP_FLAGS {
RTP_COPY = 1 << 2,
} rtp_flags_t;
/* These flags are given when kvzRTP context is created */
typedef enum RTP_CTX_ENABLE_FLAGS {
RTP_CTX_NO_FLAGS = 0 << 0,
/* Use optimistic receiver (HEVC only) */
RCE_OPTIMISTIC_RECEIVER = 1 << 0,
/* Enable probation zone (enabled only if optimistic receiver is enabled) */
RCE_PROBATION_ZONE = 1 << 1,
/* Enable system call dispatcher (HEVC only) */
RCE_SYSTEM_CALL_DISPATCHER = 1 << 2,
} rtp_ctx_flags_t;
/* These options are given to configuration() */
typedef enum RTP_CTX_CONFIGURATION_FLAGS {
/* No configuration flags */
RCC_NO_FLAGS = 0,
/* How many packets can be fit into
* probation zone until they overflow to separate frames.
*
* By default, probation zone is disabled
*
* NOTE: how many **packets**, not bytes */
RCC_PROBATION_ZONE_SIZE = 1,
/* How many transactions can be cached for later use
* Caching transactions improves performance by reducing
* the number of (de)allocations but increases the memory
* footprint of the program
*
* By default, 10 transactions are cached */
RCC_MAX_TRANSACTIONS = 2,
/* How many UDP packets does one transaction object hold.
*
* kvzRTP splits one input frame [argument of push_frame()] into
* multiple UDP packets, each of size 1500 bytes. This UDP packets
* are stored into a transaction object.
*
* Video with high bitrate may require large value for "RCC_MAX_MESSAGES"
*
* By default, it is set to 500 (ie. one frame can take up to 500 * 1500 bytes) */
RCC_MAX_MESSAGES = 3,
/* How many chunks each UDP packet can at most contain
*
* By default, this is set to 4 */
RCC_MAX_CHUNKS_PER_MSG = 4,
/* How large is the receive UDP buffer size
*
* Default value is 4 MB
*
* For video with high bitrate, it is advisable to set this
* to a high number to prevent OS from dropping packets */
RCC_READER_UDP_BUF_SIZE = 5,
/* How large is the send UDP buffer size
*
* Default value is 4 MB
*
* For video with high bitrate, it is advisable to set this
* to a high number to prevent OS from dropping packets */
RCC_WRITER_UDP_BUF_SIZE = 6,
RTP_CTX_CONF_LAST
} rtp_ctx_conf_flags_t;
/* see src/util.hh for more information */
typedef struct rtp_ctx_conf {
rtp_ctx_flags_t flags;
ssize_t ctx_values[RTP_CTX_CONF_LAST];
} rtp_ctx_conf_t;
extern thread_local rtp_error_t rtp_errno;
static inline void hex_dump(uint8_t *buf, size_t len)

View File

@ -22,16 +22,16 @@ using namespace mingw;
#include "formats/hevc.hh"
#include "formats/generic.hh"
kvz_rtp::writer::writer(rtp_format_t fmt, std::string dst_addr, int dst_port):
connection(fmt, false),
kvz_rtp::writer::writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port):
connection(fmt, conf, false),
dst_addr_(dst_addr),
dst_port_(dst_port),
src_port_(0)
{
}
kvz_rtp::writer::writer(rtp_format_t fmt, std::string dst_addr, int dst_port, int src_port):
writer(fmt, dst_addr, dst_port)
kvz_rtp::writer::writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port, int src_port):
writer(fmt, conf, dst_addr, dst_port)
{
src_port_ = src_port;
}
@ -47,9 +47,13 @@ rtp_error_t kvz_rtp::writer::start()
if ((ret = socket_.init(AF_INET, SOCK_DGRAM, 0)) != RTP_OK)
return ret;
int udp_buf_size = 40 * 1024 * 1024;
auto ctx_conf = get_ctx_conf();
ssize_t buf_size = ctx_conf.ctx_values[RCC_WRITER_UDP_BUF_SIZE];
if ((ret = socket_.setsockopt(SOL_SOCKET, SO_SNDBUF, (const char *)&udp_buf_size, sizeof(int))) != RTP_OK)
if (buf_size <= 0)
buf_size = 4 * 1000 * 1000;
if ((ret = socket_.setsockopt(SOL_SOCKET, SO_SNDBUF, (const char *)&buf_size, sizeof(int))) != RTP_OK)
return ret;
/* if source port is not 0, writer should be bind to that port so that outgoing packet
@ -69,12 +73,13 @@ rtp_error_t kvz_rtp::writer::start()
addr_out_ = socket_.create_sockaddr(AF_INET, dst_addr_, dst_port_);
socket_.set_sockaddr(addr_out_);
#ifdef __RTP_USE_SYSCALL_DISPATCHER__
if (get_payload() == RTP_FORMAT_HEVC && get_ctx_conf().flags & RCE_SYSTEM_CALL_DISPATCHER) {
LOG_ERROR("get dispatcher");
auto dispatcher = get_dispatcher();
if (dispatcher)
dispatcher->start();
#endif
}
if (rtcp_ == nullptr) {
if ((rtcp_ = new kvz_rtp::rtcp(get_ssrc(), false)) == nullptr) {

View File

@ -8,12 +8,14 @@
#include "socket.hh"
namespace kvz_rtp {
class writer : public connection {
/* typedef struct rtp_ctx_conf rtp_ctx_conf_t; */
class writer : public connection {
public:
/* "src_port" is an optional argument, given if holepunching want to be used */
writer(rtp_format_t fmt, std::string dst_addr, int dst_port);
writer(rtp_format_t fmt, std::string dst_addr, int dst_port, int src_port);
writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port);
writer(rtp_format_t fmt, rtp_ctx_conf_t& conf, std::string dst_addr, int dst_port, int src_port);
~writer();
/* Open socket for sending frames and start SCD if enabled