Remove RTP sender

Media-specific object is going to handle all send operations in the near
future, thus deprecating sender
This commit is contained in:
Aaro Altonen 2020-08-03 10:07:45 +03:00
parent 95f103eac1
commit 86837e82d5
6 changed files with 12 additions and 261 deletions

View File

@ -1,76 +0,0 @@
#pragma once
#include "dispatch.hh"
#include "queue.hh"
#include "rtp.hh"
#include "socket.hh"
namespace uvg_rtp {
class frame_queue;
class dispatcher;
class sender {
public:
sender(uvg_rtp::socket& socket, rtp_ctx_conf& conf, rtp_format_t fmt, uvg_rtp::rtp *rtp);
~sender();
/* Initialize the RTP sender by adjusting UDP buffer size,
* creating a frame queue and possibly creating a dispatcher
*
* Return RTP_OK on success
* Return RTP_MEMORY_ERROR if allocation failed */
rtp_error_t init();
/* TODO: */
rtp_error_t destroy();
/* Split "data" into 1500 byte chunks and send them to remote
*
* NOTE: If SCD has been enabled, calling this version of push_frame()
* requires either that the caller has given a deallocation callback to
* SCD OR that "flags" contains flags "RTP_COPY"
*
* Return RTP_OK success
* Return RTP_INVALID_VALUE if one of the parameters are invalid
* Return RTP_MEMORY_ERROR if the data chunk is too large to be processed
* Return RTP_SEND_ERROR if uvgRTP failed to send the data to remote
* Return RTP_GENERIC_ERROR for any other error condition */
rtp_error_t push_frame(uint8_t *data, size_t data_len, int flags);
/* Same as push_frame() defined above but no callback nor RTP_COPY must be provided
* One must call this like: push_frame(std::move(data), ...) to give ownership of the
* memory to uvgRTP */
rtp_error_t push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, int flags);
/* Get pointer to the frame queue */
uvg_rtp::frame_queue *get_frame_queue();
/* Install deallocation hook to frame queue */
void install_dealloc_hook(void (*dealloc_hook)(void *));
/* Get reference to the underlying socket object */
uvg_rtp::socket& get_socket();
/* Get pointer to RTP context where all clocking information,
* SSRC, sequence number etc. are stored */
uvg_rtp::rtp *get_rtp_ctx();
/* Get reference to the media stream's config structure */
rtp_ctx_conf& get_conf();
private:
rtp_error_t __push_frame(uint8_t *data, size_t data_len, int flags);
rtp_error_t __push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, int flags);
uvg_rtp::socket socket_;
uvg_rtp::rtp *rtp_;
rtp_ctx_conf conf_;
rtp_format_t fmt_;
sockaddr_in addr_out_;
uvg_rtp::frame_queue *fqueue_;
uvg_rtp::dispatcher *dispatcher_;
};
};

View File

@ -12,13 +12,14 @@
#include "debug.hh"
#include "receiver.hh"
#include "send.hh"
#include "sender.hh"
/* #include "sender.hh" */
#include "util.hh"
#include "formats/generic.hh"
#define INVALID_SEQ 0xffffffff
#if 0
/* The generic frames are fragmented using the marker bit of the RTP header.
* First and last fragment of a larger frame have marker bits set and middle fragments don't.
* All fragments have the same timestamp so the receiver knows which fragments are part of a larger frame. */
@ -271,3 +272,4 @@ rtp_error_t uvg_rtp::generic::frame_receiver(uvg_rtp::receiver *receiver)
receiver->get_mutex().unlock();
return ret;
}
#endif

View File

@ -16,7 +16,6 @@
uvg_rtp::media_stream::media_stream(std::string addr, int src_port, int dst_port, rtp_format_t fmt, int flags):
srtp_(nullptr),
socket_(flags),
sender_(nullptr),
receiver_(nullptr),
rtp_(nullptr),
rtcp_(nullptr),
@ -48,13 +47,10 @@ uvg_rtp::media_stream::media_stream(
uvg_rtp::media_stream::~media_stream()
{
if (initialized_) {
if (sender_)
sender_->destroy();
if (receiver_)
receiver_->stop();
}
delete sender_;
delete receiver_;
delete rtcp_;
delete rtp_;
@ -178,10 +174,10 @@ rtp_error_t uvg_rtp::media_stream::init(uvg_rtp::zrtp *zrtp)
socket_.set_srtp(srtp_);
sender_ = new uvg_rtp::sender(socket_, ctx_config_, fmt_, rtp_);
/* TODO: install srtp packet handler */
receiver_ = new uvg_rtp::receiver(socket_, ctx_config_, fmt_, rtp_);
sender_->init();
receiver_->start();
initialized_ = true;
@ -221,10 +217,8 @@ rtp_error_t uvg_rtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt)
socket_.set_srtp(srtp_);
sender_ = new uvg_rtp::sender(socket_, ctx_config_, fmt_, rtp_);
receiver_ = new uvg_rtp::receiver(socket_, ctx_config_, fmt_, rtp_);
sender_->init();
receiver_->start();
initialized_ = true;
@ -240,10 +234,7 @@ rtp_error_t uvg_rtp::media_stream::push_frame(uint8_t *data, size_t data_len, in
return RTP_NOT_INITIALIZED;
}
if (!sender_)
return RTP_NOT_SUPPORTED;
return sender_->push_frame(data, data_len, flags);
return media_->push_frame(data, data_len, flags);
}
rtp_error_t uvg_rtp::media_stream::push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, int flags)
@ -253,10 +244,7 @@ rtp_error_t uvg_rtp::media_stream::push_frame(std::unique_ptr<uint8_t[]> data, s
return RTP_NOT_INITIALIZED;
}
if (!sender_)
return RTP_NOT_SUPPORTED;
return sender_->push_frame(std::move(data), data_len, flags);
return media_->push_frame(std::move(data), data_len, flags);
}
rtp_error_t uvg_rtp::media_stream::push_frame(uint8_t *data, size_t data_len, uint32_t ts, int flags)
@ -268,11 +256,8 @@ rtp_error_t uvg_rtp::media_stream::push_frame(uint8_t *data, size_t data_len, ui
return RTP_NOT_INITIALIZED;
}
if (!sender_)
return RTP_NOT_SUPPORTED;
rtp_->set_timestamp(ts);
ret = sender_->push_frame(data, data_len, flags);
ret = media_->push_frame(data, data_len, flags);
rtp_->set_timestamp(INVALID_TS);
return ret;
@ -287,11 +272,8 @@ rtp_error_t uvg_rtp::media_stream::push_frame(std::unique_ptr<uint8_t[]> data, s
return RTP_NOT_INITIALIZED;
}
if (!sender_)
return RTP_NOT_SUPPORTED;
rtp_->set_timestamp(ts);
ret = sender_->push_frame(std::move(data), data_len, flags);
ret = media_->push_frame(std::move(data), data_len, flags);
rtp_->set_timestamp(INVALID_TS);
return ret;
@ -360,8 +342,6 @@ rtp_error_t uvg_rtp::media_stream::install_deallocation_hook(void (*hook)(void *
if (!hook)
return RTP_INVALID_VALUE;
sender_->install_dealloc_hook(hook);
return RTP_OK;
}

View File

@ -11,7 +11,6 @@
#include "debug.hh"
#include "queue.hh"
#include "random.hh"
#include "sender.hh"
#include "formats/hevc.hh"

View File

@ -13,10 +13,11 @@
/* #include "debug.hh" */
/* #include "formats/generic.hh" */
#include "send.hh"
#include "sender.hh"
/* #include "sender.hh" */
/* #include "util.hh" */
/* #include "sender.hh" */
#if 0
rtp_error_t uvg_rtp::send::send_frame(
uvg_rtp::sender *sender,
uint8_t *frame, size_t frame_len
@ -71,3 +72,4 @@ rtp_error_t uvg_rtp::send::send_frame(
return sender->get_socket().sendto(buffers, 0);
}
#endif

View File

@ -1,156 +0,0 @@
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#endif
#if defined(__MINGW32__) || defined(__MINGW64__)
#include "mingw_inet.hh"
using namespace uvg_rtp;
using namespace mingw;
#endif
#include <cstring>
#include <iostream>
#include "debug.hh"
#include "dispatch.hh"
#include "sender.hh"
#include "formats/opus.hh"
#include "formats/hevc.hh"
#include "formats/generic.hh"
uvg_rtp::sender::sender(uvg_rtp::socket& socket, rtp_ctx_conf& conf, rtp_format_t fmt, uvg_rtp::rtp *rtp):
socket_(socket),
rtp_(rtp),
conf_(conf),
fmt_(fmt)
{
}
uvg_rtp::sender::~sender()
{
delete dispatcher_;
delete fqueue_;
}
rtp_error_t uvg_rtp::sender::destroy()
{
if (fmt_ == RTP_FORMAT_HEVC && conf_.flags & RCE_SYSTEM_CALL_DISPATCHER) {
while (dispatcher_->stop() != RTP_OK) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
return RTP_OK;
}
rtp_error_t uvg_rtp::sender::init()
{
rtp_error_t ret = RTP_OK;
ssize_t buf_size = 4 * 1000 * 1000;
if ((ret = socket_.setsockopt(SOL_SOCKET, SO_SNDBUF, (const char *)&buf_size, sizeof(int))) != RTP_OK)
return ret;
#ifndef _WIN32
if (fmt_ == RTP_FORMAT_HEVC && conf_.flags & RCE_SYSTEM_CALL_DISPATCHER) {
dispatcher_ = new uvg_rtp::dispatcher(&socket_);
fqueue_ = new uvg_rtp::frame_queue(fmt_, dispatcher_);
if (dispatcher_)
dispatcher_->start();
} else {
#endif
fqueue_ = new uvg_rtp::frame_queue(fmt_);
dispatcher_ = nullptr;
#ifndef _WIN32
}
#endif
return ret;
}
rtp_error_t uvg_rtp::sender::__push_frame(uint8_t *data, size_t data_len, int flags)
{
switch (fmt_) {
case RTP_FORMAT_HEVC:
return uvg_rtp::hevc::push_frame(this, data, data_len, flags);
case RTP_FORMAT_OPUS:
return uvg_rtp::opus::push_frame(this, data, data_len, flags);
default:
LOG_DEBUG("Format not recognized, pushing the frame as generic");
return uvg_rtp::generic::push_frame(this, data, data_len, flags);
}
}
rtp_error_t uvg_rtp::sender::__push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, int flags)
{
switch (fmt_) {
case RTP_FORMAT_HEVC:
return uvg_rtp::hevc::push_frame(this, std::move(data), data_len, flags);
case RTP_FORMAT_OPUS:
return uvg_rtp::opus::push_frame(this, std::move(data), data_len, flags);
default:
LOG_DEBUG("Format not recognized, pushing the frame as generic");
return uvg_rtp::generic::push_frame(this, std::move(data), data_len, flags);
}
}
rtp_error_t uvg_rtp::sender::push_frame(uint8_t *data, size_t data_len, int flags)
{
if (flags & RTP_COPY || (conf_.flags & RCE_SRTP && !(conf_.flags & RCE_INPLACE_ENCRYPTION))) {
std::unique_ptr<uint8_t[]> data_ptr = std::unique_ptr<uint8_t[]>(new uint8_t[data_len]);
std::memcpy(data_ptr.get(), data, data_len);
return __push_frame(std::move(data_ptr), data_len, flags);
}
return __push_frame(data, data_len, flags);
}
rtp_error_t uvg_rtp::sender::push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, int flags)
{
std::unique_ptr<uint8_t[]> data_ptr = std::move(data);
if (flags & RTP_COPY || (conf_.flags & RCE_SRTP && !(conf_.flags & RCE_INPLACE_ENCRYPTION))) {
data_ptr = std::unique_ptr<uint8_t[]>(new uint8_t[data_len]);
std::memcpy(data_ptr.get(), data.get(), data_len);
}
return __push_frame(std::move(data_ptr), data_len, flags);
}
uvg_rtp::frame_queue *uvg_rtp::sender::get_frame_queue()
{
return fqueue_;
}
uvg_rtp::socket& uvg_rtp::sender::get_socket()
{
return socket_;
}
uvg_rtp::rtp *uvg_rtp::sender::get_rtp_ctx()
{
return rtp_;
}
void uvg_rtp::sender::install_dealloc_hook(void (*dealloc_hook)(void *))
{
if (!fqueue_)
return;
fqueue_->install_dealloc_hook(dealloc_hook);
}
rtp_ctx_conf& uvg_rtp::sender::get_conf()
{
return conf_;
}