uvgrtp-base/src/media_stream.cc

1151 lines
37 KiB
C++

#include "uvgrtp/media_stream.hh"
#include "uvgrtp/rtcp.hh"
#include "formats/h264.hh"
#include "formats/h265.hh"
#include "formats/h266.hh"
#include "debug.hh"
#include "random.hh"
#include "rtp.hh"
#include "zrtp.hh"
#include "socket.hh"
#include "rtcp_reader.hh"
#include "holepuncher.hh"
#include "reception_flow.hh"
#include "srtp/srtcp.hh"
#include "srtp/srtp.hh"
#include "formats/media.hh"
#include "global.hh"
#include "socketfactory.hh"
#ifdef _WIN32
#include <Ws2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#endif
#include <cstring>
#include <errno.h>
uvgrtp::media_stream::media_stream(std::string cname, std::string remote_addr,
std::string local_addr, uint16_t src_port, uint16_t dst_port, rtp_format_t fmt,
std::shared_ptr<uvgrtp::socketfactory> sfp, int rce_flags) :
key_(uvgrtp::random::generate_32()),
srtp_(nullptr),
srtcp_(nullptr),
socket_(nullptr),
rtp_(nullptr),
rtcp_(nullptr),
sfp_(sfp),
remote_sockaddr_(),
remote_sockaddr_ip6_(),
remote_address_(remote_addr),
local_address_(local_addr),
src_port_(src_port),
dst_port_(dst_port),
ipv6_(false),
fmt_(fmt),
new_socket_(false),
rce_flags_(rce_flags),
initialized_(false),
rtp_handler_key_(0),
zrtp_handler_key_(0),
reception_flow_(nullptr),
media_(nullptr),
holepuncher_(nullptr),
cname_(cname),
fps_numerator_(30),
fps_denominator_(1),
ssrc_(std::make_shared<std::atomic<std::uint32_t>>(uvgrtp::random::generate_32())),
remote_ssrc_(std::make_shared<std::atomic<std::uint32_t>>(0)),
snd_buf_size_(-1),
rcv_buf_size_(-1)
{
}
uvgrtp::media_stream::~media_stream()
{
// TODO: I would take a close look at what happens when pull_frame is called
// and media stream is destroyed. Note that this is the only way to stop pull
// frame without waiting
if ((rce_flags_ & RCE_RTCP) && rtcp_)
{
rtcp_->stop();
}
// Clear this media stream from the reception_flow
if ( reception_flow_ && (reception_flow_->clear_stream_from_flow(remote_ssrc_)) == 1) {
reception_flow_->stop();
if (sfp_) {
sfp_->clear_port(src_port_, socket_);
}
}
(void)free_resources(RTP_OK);
}
rtp_error_t uvgrtp::media_stream::init_connection()
{
rtp_error_t ret = RTP_GENERIC_ERROR;
ipv6_ = sfp_->get_ipv6();
// First check if the given local address is a multicast address. If it is, the streams automatically gets its own socket,
// regardless of any socket multiplexing measures
sockaddr_in6 multicast_sockaddr6_;
sockaddr_in multicast_sockaddr_;
bool multicast = false;
if (ipv6_ && src_port_ != 0 && local_address_ != "") {
multicast_sockaddr6_ = uvgrtp::socket::create_ip6_sockaddr(local_address_, src_port_);
if (uvgrtp::socket::is_multicast(multicast_sockaddr6_)) {
socket_ = sfp_->create_new_socket(2);
new_socket_ = true;
multicast = true;
}
}
else if (src_port_ != 0 && local_address_ != "") {
multicast_sockaddr_ = uvgrtp::socket::create_sockaddr(AF_INET, local_address_, src_port_);
if (uvgrtp::socket::is_multicast(multicast_sockaddr_)) {
socket_ = sfp_->create_new_socket(2);
new_socket_ = true;
multicast = true;
}
}
// If the given local address is not a multicast address, either create a new socket or fetch the existing
// socket if socket multiplexing is used
// Source port is given and is not in use -> create new socket
if (!multicast && src_port_ != 0 && !sfp_->is_port_in_use(src_port_)) {
socket_ = sfp_->create_new_socket(2);
new_socket_ = true;
}
// Source port is in use -> fetch the existing socket
else if (!multicast) {
socket_ = sfp_->get_socket_ptr(src_port_);
if (!socket_) {
socket_ = sfp_->create_new_socket(2);
new_socket_ = true;
}
}
//if (!(rce_flags_ & RCE_RECEIVE_ONLY) && remote_address_ != "" && dst_port_ != 0)
if (remote_address_ != "" && dst_port_ != 0)
{
// no reason to fail sending even if binding fails so we set remote address first
if (ipv6_) {
remote_sockaddr_ip6_ = uvgrtp::socket::create_ip6_sockaddr(remote_address_, dst_port_);
}
else {
remote_sockaddr_ = uvgrtp::socket::create_sockaddr(AF_INET, remote_address_, dst_port_);
}
holepuncher_ = std::unique_ptr<uvgrtp::holepuncher>(new uvgrtp::holepuncher(socket_));
holepuncher_->set_remote_address(remote_sockaddr_, remote_sockaddr_ip6_);
}
if (rce_flags_ & RCE_RECEIVE_ONLY) {
UVG_LOG_INFO("Sending disabled for this stream");
}
if (!(rce_flags_ & RCE_SEND_ONLY))
{
if (local_address_ != "" && src_port_ != 0) {
if ((ret = sfp_->bind_socket(socket_, src_port_)) != RTP_OK)
{
log_platform_error("bind(2) failed");
return ret;
}
}
else if (src_port_ != 0)
{
if ((ret = sfp_->bind_socket_anyip(socket_, src_port_)) != RTP_OK)
{
log_platform_error("bind(2) to any failed");
return ret;
}
}
else
{
UVG_LOG_INFO("Not binding, receiving is not possible");
}
}
else
{
UVG_LOG_INFO("Not binding, receiving is not possible");
}
/* Set the default UDP send/recv buffer sizes to 4MB as on Windows
* the default size is way too small for a larger video conference */
int buf_size = 4 * 1024 * 1024;
if ((ret = socket_->setsockopt(SOL_SOCKET, SO_SNDBUF, (const char*)&buf_size, sizeof(int))) != RTP_OK)
{
return ret;
}
if ((ret = socket_->setsockopt(SOL_SOCKET, SO_RCVBUF, (const char*)&buf_size, sizeof(int))) != RTP_OK)
{
return ret;
}
return ret;
}
rtp_error_t uvgrtp::media_stream::create_media(rtp_format_t fmt)
{
switch (fmt) {
case RTP_FORMAT_H264:
{
uvgrtp::formats::h264* format_264 = new uvgrtp::formats::h264(socket_, rtp_, rce_flags_);
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::h264::new_packet_handler, format_264, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), nullptr
);
reception_flow_->new_install_getter(remote_ssrc_,
std::bind(&uvgrtp::formats::h264::frame_getter, format_264, std::placeholders::_1));
media_.reset(format_264);
break;
}
case RTP_FORMAT_H265:
{
uvgrtp::formats::h265* format_265 = new uvgrtp::formats::h265(socket_, rtp_, rce_flags_);
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::h265::new_packet_handler, format_265, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), nullptr
);
reception_flow_->new_install_getter(remote_ssrc_,
std::bind(&uvgrtp::formats::h265::frame_getter, format_265, std::placeholders::_1));
media_.reset(format_265);
break;
}
case RTP_FORMAT_H266:
{
uvgrtp::formats::h266* format_266 = new uvgrtp::formats::h266(socket_, rtp_, rce_flags_);
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::h266::new_packet_handler, format_266, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), nullptr
);
reception_flow_->new_install_getter(remote_ssrc_,
std::bind(&uvgrtp::formats::h266::frame_getter, format_266, std::placeholders::_1));
media_.reset(format_266);
break;
}
case RTP_FORMAT_OPUS:
case RTP_FORMAT_PCMU:
case RTP_FORMAT_GSM:
case RTP_FORMAT_G723:
case RTP_FORMAT_DVI4_32:
case RTP_FORMAT_DVI4_64:
case RTP_FORMAT_LPC:
case RTP_FORMAT_PCMA:
case RTP_FORMAT_G722:
case RTP_FORMAT_L16_STEREO:
case RTP_FORMAT_L16_MONO:
case RTP_FORMAT_G728:
case RTP_FORMAT_DVI4_441:
case RTP_FORMAT_DVI4_882:
case RTP_FORMAT_G729:
case RTP_FORMAT_G726_40:
case RTP_FORMAT_G726_32:
case RTP_FORMAT_G726_24:
case RTP_FORMAT_G726_16:
case RTP_FORMAT_G729D:
case RTP_FORMAT_G729E:
case RTP_FORMAT_GSM_EFR:
case RTP_FORMAT_L8:
case RTP_FORMAT_VDVI:
{
media_ = std::unique_ptr<uvgrtp::formats::media>(new uvgrtp::formats::media(socket_, rtp_, rce_flags_));
reception_flow_->new_install_handler(
5, remote_ssrc_,
std::bind(&uvgrtp::formats::media::new_packet_handler, media_.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), media_->get_media_frame_info());
break;
}
default:
{
UVG_LOG_ERROR("Unknown payload format %u\n", fmt_);
media_ = nullptr;
return RTP_NOT_SUPPORTED;
}
}
// set default values for fps
media_->set_fps(fps_numerator_, fps_denominator_);
return RTP_OK;
}
rtp_error_t uvgrtp::media_stream::free_resources(rtp_error_t ret)
{
if ((rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE) && holepuncher_)
{
holepuncher_->stop();
}
rtcp_ = nullptr;
rtp_ = nullptr;
srtp_ = nullptr;
srtcp_ = nullptr;
reception_flow_ = nullptr;
holepuncher_ = nullptr;
media_ = nullptr;
socket_ = nullptr;
return ret;
}
rtp_error_t uvgrtp::media_stream::init()
{
if (init_connection() != RTP_OK) {
UVG_LOG_ERROR("Failed to initialize the underlying socket");
return free_resources(RTP_GENERIC_ERROR);
}
reception_flow_ = sfp_->get_reception_flow_ptr(socket_);
rtp_ = std::shared_ptr<uvgrtp::rtp>(new uvgrtp::rtp(fmt_, ssrc_, ipv6_));
rtcp_ = std::shared_ptr<uvgrtp::rtcp>(new uvgrtp::rtcp(rtp_, ssrc_, remote_ssrc_, cname_, sfp_, rce_flags_));
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
reception_flow_->new_install_handler(
1, remote_ssrc_,
std::bind(&uvgrtp::rtp::new_packet_handler, rtp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5),
nullptr);
if (rce_flags_ & RCE_RTCP) {
reception_flow_->new_install_handler(
6, remote_ssrc_,
std::bind(&uvgrtp::rtcp::new_recv_packet_handler_common, rtcp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), rtcp_.get());
}
if (rce_flags_ & RCE_RTCP_MUX) {
rtcp_->set_socket(socket_);
reception_flow_->new_install_handler(
2, remote_ssrc_,
std::bind(&uvgrtp::rtcp::new_recv_packet_handler, rtcp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), nullptr);
}
return start_components();
}
rtp_error_t uvgrtp::media_stream::init(std::shared_ptr<uvgrtp::zrtp> zrtp)
{
if (init_connection() != RTP_OK) {
log_platform_error("Failed to initialize the underlying socket");
return RTP_GENERIC_ERROR;
}
reception_flow_ = sfp_->get_reception_flow_ptr(socket_);
rtp_ = std::shared_ptr<uvgrtp::rtp>(new uvgrtp::rtp(fmt_, ssrc_, ipv6_));
bool perform_dh = !(rce_flags_ & RCE_ZRTP_MULTISTREAM_MODE);
if (!perform_dh)
{
UVG_LOG_DEBUG("Sleeping non-DH performing stream until DH has finished");
std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
while (!zrtp->has_dh_finished())
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - tp).count() > 10)
{
UVG_LOG_ERROR("Giving up on DH after 10 seconds");
return free_resources(RTP_TIMEOUT);
}
}
}
rtp_error_t ret = RTP_OK;
if ((ret = zrtp->init(rtp_->get_ssrc(), socket_, remote_sockaddr_, remote_sockaddr_ip6_, perform_dh, ipv6_)) != RTP_OK) {
UVG_LOG_WARN("Failed to initialize ZRTP for media stream!");
return free_resources(ret);
}
srtp_ = std::shared_ptr<uvgrtp::srtp>(new uvgrtp::srtp(rce_flags_));
if ((ret = init_srtp_with_zrtp(rce_flags_, SRTP, srtp_, zrtp)) != RTP_OK)
return free_resources(ret);
srtcp_ = std::shared_ptr<uvgrtp::srtcp>(new uvgrtp::srtcp());
if ((ret = init_srtp_with_zrtp(rce_flags_, SRTCP, srtcp_, zrtp)) != RTP_OK)
return free_resources(ret);
zrtp->dh_has_finished(); // only after the DH stream has gotten its keys, do we let non-DH stream perform ZRTP
rtcp_ = std::shared_ptr<uvgrtp::rtcp>(new uvgrtp::rtcp(rtp_, ssrc_, remote_ssrc_, cname_, sfp_, srtcp_, rce_flags_));
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
socket_->install_handler(srtp_.get(), srtp_->send_packet_handler);
reception_flow_->new_install_handler(
1, remote_ssrc_,
std::bind(&uvgrtp::rtp::new_packet_handler, rtp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5),
nullptr);
if (rce_flags_ & RCE_RTCP) {
reception_flow_->new_install_handler(
6, remote_ssrc_,
std::bind(&uvgrtp::rtcp::new_recv_packet_handler_common, rtcp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), rtcp_.get());
}
if (rce_flags_ & RCE_RTCP_MUX) {
rtcp_->set_socket(socket_);
reception_flow_->new_install_handler(
2, remote_ssrc_,
std::bind(&uvgrtp::rtcp::new_recv_packet_handler, rtcp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), nullptr);
}
return start_components();
}
rtp_error_t uvgrtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt)
{
if (!key || !salt)
return RTP_INVALID_VALUE;
unsigned int srtp_rce_flags = RCE_SRTP | RCE_SRTP_KMNGMNT_USER;
rtp_error_t ret = RTP_OK;
if (init_connection() != RTP_OK) {
UVG_LOG_ERROR("Failed to initialize the underlying socket");
return free_resources(RTP_GENERIC_ERROR);
}
reception_flow_ = sfp_->get_reception_flow_ptr(socket_);
if ((rce_flags_ & srtp_rce_flags) != srtp_rce_flags)
return free_resources(RTP_NOT_SUPPORTED);
rtp_ = std::shared_ptr<uvgrtp::rtp> (new uvgrtp::rtp(fmt_, ssrc_, ipv6_));
srtp_ = std::shared_ptr<uvgrtp::srtp> (new uvgrtp::srtp(rce_flags_));
// why are they local and remote key/salt the same?
if ((ret = srtp_->init(SRTP, rce_flags_, key, key, salt, salt)) != RTP_OK) {
UVG_LOG_WARN("Failed to initialize SRTP for media stream!");
return free_resources(ret);
}
srtcp_ = std::shared_ptr<uvgrtp::srtcp> (new uvgrtp::srtcp());
if ((ret = srtcp_->init(SRTCP, rce_flags_, key, key, salt, salt)) != RTP_OK) {
UVG_LOG_WARN("Failed to initialize SRTCP for media stream!");
return free_resources(ret);
}
rtcp_ = std::shared_ptr<uvgrtp::rtcp> (new uvgrtp::rtcp(rtp_, ssrc_, remote_ssrc_, cname_, sfp_, srtcp_, rce_flags_));
socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec);
socket_->install_handler(srtp_.get(), srtp_->send_packet_handler);
reception_flow_->new_install_handler(
1, remote_ssrc_,
std::bind(&uvgrtp::rtp::new_packet_handler, rtp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5),
nullptr);
if (rce_flags_ & RCE_RTCP) {
reception_flow_->new_install_handler(
6, remote_ssrc_,
std::bind(&uvgrtp::rtcp::new_recv_packet_handler_common, rtcp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), rtcp_.get());
}
if (rce_flags_ & RCE_RTCP_MUX) {
rtcp_->set_socket(socket_);
reception_flow_->new_install_handler(
2, remote_ssrc_,
std::bind(&uvgrtp::rtcp::new_recv_packet_handler, rtcp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), nullptr);
}
reception_flow_->new_install_handler(
4, remote_ssrc_,
std::bind(&uvgrtp::srtp::new_recv_packet_handler, srtp_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5), srtp_.get());
return start_components();
}
rtp_error_t uvgrtp::media_stream::start_components()
{
if (create_media(fmt_) != RTP_OK)
return free_resources(RTP_MEMORY_ERROR);
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE) {
holepuncher_->start();
}
if (rce_flags_ & RCE_RTCP) {
if (remote_address_ == "" ||
src_port_ == 0 ||
dst_port_ == 0)
{
UVG_LOG_ERROR("Using RTCP requires setting at least remote address, local port and remote port");
}
else
{
if (!(rce_flags_ & RCE_RTCP_MUX)) {
rtcp_->set_network_addresses(local_address_, remote_address_, src_port_ + 1, dst_port_ + 1, ipv6_);
}
else {
rtcp_->set_network_addresses(local_address_, remote_address_, src_port_, dst_port_, ipv6_);
}
rtcp_->add_initial_participant(rtp_->get_clock_rate());
bandwidth_ = get_default_bandwidth_kbps(fmt_);
rtcp_->set_session_bandwidth(bandwidth_);
uint16_t rtcp_port = src_port_ + 1;
std::shared_ptr<uvgrtp::socket> rtcp_socket;
std::shared_ptr<uvgrtp::rtcp_reader> rtcp_reader;
if (!(rce_flags_ & RCE_RTCP_MUX)) {
// If RTCP is not multiplexed with RTP, configure the socket for RTCP:
// 1. If RTCP port is not in use -> create new socket
// 2. Install an RTCP reader into socketfactory
// 3. In RTCP reader, map our RTCP object to the REMOTE ssrc of this stream. If we are not doing
// any socket multiplexing, it will be 0 by default
if (!sfp_->is_port_in_use(rtcp_port)) {
rtcp_socket = sfp_->create_new_socket(1);
rtcp_reader = sfp_->install_rtcp_reader(rtcp_port);
rtcp_reader->set_socket(rtcp_socket);
rtcp_->set_socket(rtcp_socket);
rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_);
}
// 1. Source port is in use -> fetch the existing socket
// 2. Fetch the existing RTCP reader from socketfactory
// 3. In RTCP reader, map our RTCP object to the REMOTE ssrc of this stream. In this case, the remote
// ssrc should be manually set to allow multiplexing
else {
rtcp_socket = sfp_->get_socket_ptr(rtcp_port);
if (!rtcp_socket) {
// This should not ever happen. However if it does, you could just create a new socket like above
UVG_LOG_ERROR("No RTCP socket found");
return RTP_GENERIC_ERROR;
}
rtcp_->set_socket(rtcp_socket);
rtcp_reader = sfp_->get_rtcp_reader(rtcp_port);
rtcp_reader->map_ssrc_to_rtcp(remote_ssrc_, rtcp_);
}
}
rtcp_->start();
}
}
if (rce_flags_ & RCE_SRTP_AUTHENTICATE_RTP) {
if (ipv6_) {
rtp_->set_payload_size(MAX_IPV6_MEDIA_PAYLOAD - UVG_AUTH_TAG_LENGTH);
}
else {
rtp_->set_payload_size(MAX_IPV4_MEDIA_PAYLOAD - UVG_AUTH_TAG_LENGTH);
}
}
initialized_ = true;
return reception_flow_->start(socket_, rce_flags_);
}
rtp_error_t uvgrtp::media_stream::push_frame(uint8_t *data, size_t data_len, int rtp_flags)
{
rtp_error_t ret = check_push_preconditions(rtp_flags, false);
if (ret == RTP_OK)
{
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE)
holepuncher_->notify();
if (rtp_flags & RTP_COPY)
{
data = copy_frame(data, data_len);
std::unique_ptr<uint8_t[]> data_copy(data);
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, std::move(data_copy), data_len, rtp_flags);
}
else
{
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, data, data_len, rtp_flags);
}
}
return ret;
}
rtp_error_t uvgrtp::media_stream::push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, int rtp_flags)
{
rtp_error_t ret = check_push_preconditions(rtp_flags, true);
if (ret == RTP_OK)
{
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE)
holepuncher_->notify();
// making a copy of a smart pointer does not make sense
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, std::move(data), data_len, rtp_flags);
}
return ret;
}
rtp_error_t uvgrtp::media_stream::push_frame(uint8_t *data, size_t data_len, uint32_t ts, int rtp_flags)
{
rtp_error_t ret = check_push_preconditions(rtp_flags, false);
if (ret == RTP_OK)
{
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE)
holepuncher_->notify();
rtp_->set_timestamp(ts);
if (rtp_flags & RTP_COPY)
{
data = copy_frame(data, data_len);
std::unique_ptr<uint8_t[]> data_copy(data);
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, std::move(data_copy), data_len, rtp_flags);
}
else
{
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, data, data_len, rtp_flags);
}
rtp_->set_timestamp(INVALID_TS);
}
return ret;
}
rtp_error_t uvgrtp::media_stream::push_frame(uint8_t* data, size_t data_len, uint32_t ts, uint64_t ntp_ts, int rtp_flags)
{
rtp_error_t ret = check_push_preconditions(rtp_flags, false);
if (ret == RTP_OK)
{
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE)
holepuncher_->notify();
rtp_->set_timestamp(ts);
rtp_->set_sampling_ntp(ntp_ts);
if (rtp_flags & RTP_COPY)
{
data = copy_frame(data, data_len);
std::unique_ptr<uint8_t[]> data_copy(data);
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, std::move(data_copy), data_len, rtp_flags);
}
else
{
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, data, data_len, rtp_flags);
}
rtp_->set_timestamp(INVALID_TS);
}
return ret;
}
rtp_error_t uvgrtp::media_stream::push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, uint32_t ts, int rtp_flags)
{
rtp_error_t ret = check_push_preconditions(rtp_flags, true);
if (ret == RTP_OK)
{
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE)
holepuncher_->notify();
// making a copy of a smart pointer does not make sense
rtp_->set_timestamp(ts);
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, std::move(data), data_len, rtp_flags);
rtp_->set_timestamp(INVALID_TS);
}
return ret;
}
rtp_error_t uvgrtp::media_stream::push_frame(std::unique_ptr<uint8_t[]> data, size_t data_len, uint32_t ts, uint64_t ntp_ts, int rtp_flags)
{
rtp_error_t ret = check_push_preconditions(rtp_flags, true);
if (ret == RTP_OK)
{
if (rce_flags_ & RCE_HOLEPUNCH_KEEPALIVE)
holepuncher_->notify();
// making a copy of a smart pointer does not make sense
rtp_->set_timestamp(ts);
rtp_->set_sampling_ntp(ntp_ts);
ret = media_->push_frame(remote_sockaddr_, remote_sockaddr_ip6_, std::move(data), data_len, rtp_flags);
rtp_->set_timestamp(INVALID_TS);
}
return ret;
}
/* ----------- User packets not yet supported -----------
rtp_error_t uvgrtp::media_stream::send_user_packet(uint8_t* data, uint32_t payload_size,
std::string remote_address, uint16_t port)
{
sockaddr_in6 addr6;
sockaddr_in addr;
if (ipv6_) {
addr6 = uvgrtp::socket::create_ip6_sockaddr(remote_address, port);
}
else {
addr = uvgrtp::socket::create_sockaddr(AF_INET, remote_address, port);
}
UVG_LOG_DEBUG("Sending user packet");
return socket_->sendto(addr, addr6, data, payload_size, 0);
}
rtp_error_t uvgrtp::media_stream::install_user_hook(void* arg, void (*hook)(void*, uint8_t* payload))
{
if (!initialized_) {
UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!");
return RTP_NOT_INITIALIZED;
}
if (!hook)
return RTP_INVALID_VALUE;
return reception_flow_->install_user_hook(arg, hook);;
}*/
uvgrtp::frame::rtp_frame *uvgrtp::media_stream::pull_frame()
{
if (!check_pull_preconditions()) {
return nullptr;
}
// If the remote_ssrc is set, only pull frames that come from this ssrc
if (remote_ssrc_.get()->load() != 0) {
return reception_flow_->pull_frame(remote_ssrc_);
}
return reception_flow_->pull_frame();
}
uvgrtp::frame::rtp_frame *uvgrtp::media_stream::pull_frame(size_t timeout_ms)
{
if (!check_pull_preconditions()) {
return nullptr;
}
// If the remote_ssrc is set, only pull frames that come from this ssrc
if (remote_ssrc_.get()->load() != 0) {
return reception_flow_->pull_frame(timeout_ms, remote_ssrc_);
}
return reception_flow_->pull_frame(timeout_ms);
}
bool uvgrtp::media_stream::check_pull_preconditions()
{
if (!initialized_) {
UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!");
rtp_errno = RTP_NOT_INITIALIZED;
return false;
}
return true;
}
rtp_error_t uvgrtp::media_stream::check_push_preconditions(int rtp_flags, bool smart_pointer)
{
if (!initialized_) {
UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!");
return RTP_NOT_INITIALIZED;
}
if (remote_address_ == "" && dst_port_ != 0)
{
UVG_LOG_ERROR("Cannot push frame if remote address and port have not been provided!");
return RTP_INVALID_VALUE;
}
if (rtp_flags & RTP_OBSOLETE)
{
UVG_LOG_WARN("Detected an obsolete RTP flag, consider updating your flags");
}
if (rce_flags_ & RCE_RECEIVE_ONLY)
{
UVG_LOG_WARN("Stream is designated as RECEIVE ONLY");
return RTP_GENERIC_ERROR;
}
if (smart_pointer && (rtp_flags & RTP_COPY))
{
UVG_LOG_ERROR("Copying a smart pointer does not make sense since the original would be lost");
}
return RTP_OK;
}
uint8_t* uvgrtp::media_stream::copy_frame(uint8_t* original, size_t data_len)
{
uint8_t* copy = new uint8_t[data_len];
memcpy(copy, original, data_len);
return copy;
}
rtp_error_t uvgrtp::media_stream::install_receive_hook(void *arg, void (*hook)(void *, uvgrtp::frame::rtp_frame *))
{
if (!initialized_) {
UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!");
return RTP_NOT_INITIALIZED;
}
if (!hook) {
return RTP_INVALID_VALUE;
}
return reception_flow_->install_receive_hook(arg, hook, remote_ssrc_);
}
rtp_error_t uvgrtp::media_stream::configure_ctx(int rcc_flag, ssize_t value)
{
rtp_error_t ret = RTP_OK;
if (rcc_flag == RCC_SSRC) {
if (value <= 0 || value > (ssize_t)UINT32_MAX)
return RTP_INVALID_VALUE;
*ssrc_ = (uint32_t)value;
return ret;
}
else if (rcc_flag == RCC_REMOTE_SSRC) {
if (value <= 0 || value > (ssize_t)UINT32_MAX)
return RTP_INVALID_VALUE;
*remote_ssrc_ = (uint32_t)value;
return ret;
}
if (!initialized_) {
UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!");
return RTP_NOT_INITIALIZED;
}
switch (rcc_flag) {
case RCC_UDP_SND_BUF_SIZE: {
if (value <= 0)
return RTP_INVALID_VALUE;
int buf_size = (int)value;
snd_buf_size_ = buf_size;
ret = socket_->setsockopt(SOL_SOCKET, SO_SNDBUF, (const char*)&buf_size, sizeof(int));
break;
}
case RCC_UDP_RCV_BUF_SIZE: {
if (value <= 0)
return RTP_INVALID_VALUE;
int buf_size = (int)value;
rcv_buf_size_ = buf_size;
ret = socket_->setsockopt(SOL_SOCKET, SO_RCVBUF, (const char*)&buf_size, sizeof(int));
break;
}
case RCC_RING_BUFFER_SIZE: {
if (value <= 0)
return RTP_INVALID_VALUE;
reception_flow_->set_buffer_size(value);
break;
}
case RCC_PKT_MAX_DELAY: {
if (value <= 0)
return RTP_INVALID_VALUE;
rtp_->set_pkt_max_delay(value);
break;
}
case RCC_DYN_PAYLOAD_TYPE: {
if (value <= 0 || (ssize_t)UINT8_MAX < value)
return RTP_INVALID_VALUE;
rtp_->set_dynamic_payload((uint8_t)value);
break;
}
case RCC_CLOCK_RATE: {
if (value <= 0 || (ssize_t)UINT32_MAX < value)
return RTP_INVALID_VALUE;
rtp_->set_clock_rate((uint32_t)value);
break;
}
case RCC_MTU_SIZE: {
ssize_t hdr = IPV4_HDR_SIZE + UDP_HDR_SIZE + RTP_HDR_SIZE;
if (rce_flags_ & RCE_SRTP_AUTHENTICATE_RTP)
hdr += UVG_AUTH_TAG_LENGTH;
if (value <= hdr)
return RTP_INVALID_VALUE;
if (value > (ssize_t)UINT16_MAX) {
UVG_LOG_ERROR("Payload size (%zd) is larger than maximum UDP datagram size (%u)",
value, UINT16_MAX);
return RTP_INVALID_VALUE;
}
rtp_->set_payload_size(value - hdr);
// auth tag is always included with SRTP and RTCP has a header for each packet within a compound frame
rtcp_->set_payload_size( value - (IPV4_HDR_SIZE + UDP_HDR_SIZE));
reception_flow_->set_payload_size(value - (IPV4_HDR_SIZE + UDP_HDR_SIZE)); // largest packet we can get from socket
break;
}
case RCC_FPS_NUMERATOR: {
fps_numerator_ = value;
if (value > 0 && (rce_flags_ & RCE_SYSTEM_CALL_CLUSTERING)) {
UVG_LOG_WARN("Setting FPS numerator will disable System Call Clustering (SCC)");
}
media_->set_fps(fps_numerator_, fps_denominator_);
break;
}
case RCC_FPS_DENOMINATOR: {
fps_denominator_ = value;
if (value > 0 && (rce_flags_ & RCE_SYSTEM_CALL_CLUSTERING)) {
UVG_LOG_WARN("Setting FPS denominator will disable System Call Clustering (SCC)");
}
media_->set_fps(fps_numerator_, fps_denominator_);
break;
}
case RCC_SESSION_BANDWIDTH: {
bandwidth_ = (uint32_t)value;
// TODO: Is there a max value for bandwidth?
if (value <= 0) {
UVG_LOG_WARN("Bandwidth cannot be negative");
return RTP_INVALID_VALUE;
}
if (rtcp_) {
rtcp_->set_session_bandwidth(bandwidth_);
}
break;
}
case RCC_SSRC: {
if (value <= 0 || value > (ssize_t)UINT32_MAX)
return RTP_INVALID_VALUE;
*ssrc_ = (uint32_t)value;
break;
}
case RCC_REMOTE_SSRC: {
if (value <= 0 || value > (ssize_t)UINT32_MAX)
return RTP_INVALID_VALUE;
*remote_ssrc_ = (uint32_t)value;
break;
}
default:
return RTP_INVALID_VALUE;
}
return ret;
}
int uvgrtp::media_stream::get_configuration_value(int rcc_flag)
{
int ret = -1;
if (rcc_flag == RCC_SSRC) {
return ssrc_.get()->load();
}
else if (rcc_flag == RCC_REMOTE_SSRC) {
return remote_ssrc_.get()->load();
}
if (!initialized_) {
UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!");
return RTP_NOT_INITIALIZED;
}
switch (rcc_flag) {
case RCC_UDP_SND_BUF_SIZE: {
return snd_buf_size_;
}
case RCC_UDP_RCV_BUF_SIZE: {
return rcv_buf_size_;
}
case RCC_RING_BUFFER_SIZE: {
return (int)reception_flow_->get_buffer_size();
}
case RCC_PKT_MAX_DELAY: {
return (int)rtp_->get_pkt_max_delay();
}
case RCC_DYN_PAYLOAD_TYPE: {
return (int)rtp_->get_dynamic_payload();
}
case RCC_CLOCK_RATE: {
return (int)rtp_->get_clock_rate();
}
case RCC_MTU_SIZE: {
return (int)rtp_->get_payload_size();
}
case RCC_FPS_NUMERATOR: {
return (int)fps_numerator_;
}
case RCC_FPS_DENOMINATOR: {
return (int)fps_denominator_;
}
case RCC_SESSION_BANDWIDTH: {
return (int)bandwidth_;
}
default:
ret = -1;
}
}
uint32_t uvgrtp::media_stream::get_key() const
{
return key_;
}
uvgrtp::rtcp *uvgrtp::media_stream::get_rtcp()
{
return rtcp_.get();
}
uint32_t uvgrtp::media_stream::get_ssrc() const
{
if (!initialized_ || rtp_ == nullptr) {
UVG_LOG_ERROR("RTP context has not been initialized, please call init before asking ssrc!");
return 0;
}
return *ssrc_.get();
}
rtp_error_t uvgrtp::media_stream::init_srtp_with_zrtp(int rce_flags, int type, std::shared_ptr<uvgrtp::base_srtp> srtp,
std::shared_ptr<uvgrtp::zrtp> zrtp)
{
uint32_t key_size = srtp->get_key_size(rce_flags);
uint8_t* local_key = new uint8_t[key_size];
uint8_t* remote_key = new uint8_t[key_size];
uint8_t local_salt[UVG_SALT_LENGTH];
uint8_t remote_salt[UVG_SALT_LENGTH];
rtp_error_t ret = zrtp->get_srtp_keys(
local_key, key_size * 8,
remote_key, key_size * 8,
local_salt, UVG_SALT_LENGTH * 8,
remote_salt, UVG_SALT_LENGTH * 8
);
if (ret == RTP_OK)
{
ret = srtp->init(type, rce_flags, local_key, remote_key,
local_salt, remote_salt);
}
else
{
UVG_LOG_WARN("Failed to initialize SRTP for media stream!");
}
delete[] local_key;
delete[] remote_key;
return ret;
}
uint32_t uvgrtp::media_stream::get_default_bandwidth_kbps(rtp_format_t fmt)
{
int bandwidth = 50;
switch (fmt) {
case RTP_FORMAT_PCMU:
case RTP_FORMAT_PCMA:
case RTP_FORMAT_L8: // L8 bitrate depends on sampling rate
bandwidth = 64;
break;
case RTP_FORMAT_G723:
bandwidth = 6;
break;
case RTP_FORMAT_DVI4_32:
bandwidth = 32;
break;
case RTP_FORMAT_DVI4_64:
bandwidth = 64;
break;
case RTP_FORMAT_LPC:
bandwidth = 6;
break;
case RTP_FORMAT_GSM:
bandwidth = 13;
break;
case RTP_FORMAT_G722:
bandwidth = 64;
break;
case RTP_FORMAT_L16_STEREO:
bandwidth = 1411;
break;
case RTP_FORMAT_L16_MONO:
bandwidth = 706;
break;
case RTP_FORMAT_G728:
bandwidth = 16;
break;
case RTP_FORMAT_DVI4_441:
bandwidth = 44;
break;
case RTP_FORMAT_DVI4_882:
bandwidth = 88;
break;
case RTP_FORMAT_G729:
bandwidth = 8;
break;
case RTP_FORMAT_G726_40:
bandwidth = 40;
break;
case RTP_FORMAT_G726_32:
bandwidth = 32;
break;
case RTP_FORMAT_G726_24:
bandwidth = 24;
break;
case RTP_FORMAT_G726_16:
bandwidth = 16;
break;
case RTP_FORMAT_G729D:
bandwidth = 6;
break;
case RTP_FORMAT_G729E:
bandwidth = 11;
break;
case RTP_FORMAT_GSM_EFR:
bandwidth = 12;
break;
case RTP_FORMAT_VDVI:
bandwidth = 25;
break;
case RTP_FORMAT_H264:
bandwidth = 6000;
break;
case RTP_FORMAT_H265:
bandwidth = 3000;
break;
case RTP_FORMAT_H266:
bandwidth = 2000;
break;
case RTP_FORMAT_OPUS:
bandwidth = 24;
break;
default:
UVG_LOG_WARN("Unknown RTP format, setting session bandwidth to 64 kbps");
bandwidth = 64;
break;
}
return bandwidth;
}