From c68bc0a963459540e6b3153601e8237e705724bc Mon Sep 17 00:00:00 2001 From: Heikki Tampio Date: Wed, 3 May 2023 10:11:15 +0300 Subject: [PATCH] multiplex: Add function add_zrtp_ctx You can start ZRTP initialization with this function. Used like add_srtp_ctx(). --- include/uvgrtp/media_stream.hh | 3 + src/media_stream.cc | 137 ++++++++++++++++++--------------- src/rtcp.cc | 2 +- 3 files changed, 77 insertions(+), 65 deletions(-) diff --git a/include/uvgrtp/media_stream.hh b/include/uvgrtp/media_stream.hh index d6ba0d8..46af012 100644 --- a/include/uvgrtp/media_stream.hh +++ b/include/uvgrtp/media_stream.hh @@ -372,6 +372,8 @@ namespace uvgrtp { */ uint32_t get_ssrc() const; + bool start_receiving(); + private: /* Initialize the connection by initializing the socket * and binding ourselves to specified interface and creating @@ -403,6 +405,7 @@ namespace uvgrtp { std::shared_ptr socket_; std::shared_ptr rtp_; std::shared_ptr rtcp_; + std::shared_ptr zrtp_; std::shared_ptr sfp_; diff --git a/src/media_stream.cc b/src/media_stream.cc index 1099fd8..b80fa5f 100644 --- a/src/media_stream.cc +++ b/src/media_stream.cc @@ -39,6 +39,7 @@ uvgrtp::media_stream::media_stream(std::string cname, std::string remote_addr, socket_(nullptr), rtp_(nullptr), rtcp_(nullptr), + zrtp_(nullptr), sfp_(sfp), remote_sockaddr_(), remote_sockaddr_ip6_(), @@ -300,68 +301,13 @@ rtp_error_t uvgrtp::media_stream::init() rtp_error_t uvgrtp::media_stream::init(std::shared_ptr zrtp) { - if (init_connection() != RTP_OK) { - log_platform_error("Failed to initialize the underlying socket"); + if (!zrtp) { + UVG_LOG_ERROR("No ZRTP found"); return RTP_GENERIC_ERROR; } - if (!new_socket_) { - reception_flow_ = sfp_->get_reception_flow_ptr(socket_); - } - else { - reception_flow_ = sfp_->install_reception_flow(socket_); - } + zrtp_ = zrtp; - rtp_ = std::shared_ptr (new uvgrtp::rtp(fmt_, ssrc_, ipv6_)); - - bool perform_dh = !(rce_flags_ & RCE_ZRTP_MULTISTREAM_MODE); - if (!perform_dh) - { - UVG_LOG_DEBUG("Sleeping non-DH performing stream until DH has finished"); - std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); - - while (!zrtp->has_dh_finished()) - { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - if (std::chrono::duration_cast(std::chrono::system_clock::now() - tp).count() > 10) - { - UVG_LOG_ERROR("Giving up on DH after 10 seconds"); - return free_resources(RTP_TIMEOUT); - } - } - } - - rtp_error_t ret = RTP_OK; - if ((ret = zrtp->init(rtp_->get_ssrc(), socket_, remote_sockaddr_, remote_sockaddr_ip6_, perform_dh, ipv6_)) != RTP_OK) { - UVG_LOG_WARN("Failed to initialize ZRTP for media stream!"); - // TÄMÄ FAILAA, koska ei handlereita?? korjaus: siirrä handlerit tän eteen - return free_resources(ret); - } - - srtp_ = std::shared_ptr(new uvgrtp::srtp(rce_flags_)); - if ((ret = init_srtp_with_zrtp(rce_flags_, SRTP, srtp_, zrtp)) != RTP_OK) - return free_resources(ret); - - srtcp_ = std::shared_ptr (new uvgrtp::srtcp()); - if ((ret = init_srtp_with_zrtp(rce_flags_, SRTCP, srtcp_, zrtp)) != RTP_OK) - return free_resources(ret); - - zrtp->dh_has_finished(); // only after the DH stream has gotten its keys, do we let non-DH stream perform ZRTP - - rtcp_ = std::shared_ptr (new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, srtcp_, rce_flags_)); - - socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec); - socket_->install_handler(srtp_.get(), srtp_->send_packet_handler); - - rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler); - zrtp_handler_key_ = reception_flow_->install_handler(zrtp->packet_handler); - - reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_); - reception_flow_->map_handler_key(zrtp_handler_key_, remote_ssrc_); - - reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr); - reception_flow_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr); - return start_components(); + return RTP_OK; } rtp_error_t uvgrtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt) @@ -421,6 +367,14 @@ rtp_error_t uvgrtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt) rtp_error_t uvgrtp::media_stream::add_zrtp_ctx() { + if (!zrtp_) { + UVG_LOG_ERROR("No ZRTP found"); + return RTP_GENERIC_ERROR; + } + unsigned int srtp_rce_flags = RCE_SRTP | RCE_SRTP_KMNGMNT_ZRTP; + if ((rce_flags_ & srtp_rce_flags) != srtp_rce_flags) + return free_resources(RTP_NOT_SUPPORTED); + if (init_connection() != RTP_OK) { log_platform_error("Failed to initialize the underlying socket"); return RTP_GENERIC_ERROR; @@ -432,6 +386,56 @@ rtp_error_t uvgrtp::media_stream::add_zrtp_ctx() reception_flow_ = sfp_->install_reception_flow(socket_); } + rtp_ = std::shared_ptr(new uvgrtp::rtp(fmt_, ssrc_, ipv6_)); + + bool perform_dh = !(rce_flags_ & RCE_ZRTP_MULTISTREAM_MODE); + if (!perform_dh) + { + UVG_LOG_DEBUG("Sleeping non-DH performing stream until DH has finished"); + std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); + + while (!zrtp_->has_dh_finished()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + if (std::chrono::duration_cast(std::chrono::system_clock::now() - tp).count() > 10) + { + UVG_LOG_ERROR("Giving up on DH after 10 seconds"); + return free_resources(RTP_TIMEOUT); + } + } + } + rtp_error_t ret = RTP_OK; + if ((ret = zrtp_->init(rtp_->get_ssrc(), socket_, remote_sockaddr_, remote_sockaddr_ip6_, perform_dh, ipv6_)) != RTP_OK) { + UVG_LOG_WARN("Failed to initialize ZRTP for media stream!"); + return free_resources(ret); + } + srtp_ = std::shared_ptr(new uvgrtp::srtp(rce_flags_)); + if ((ret = init_srtp_with_zrtp(rce_flags_, SRTP, srtp_, zrtp_)) != RTP_OK) + return free_resources(ret); + + srtcp_ = std::shared_ptr(new uvgrtp::srtcp()); + if ((ret = init_srtp_with_zrtp(rce_flags_, SRTCP, srtcp_, zrtp_)) != RTP_OK) + return free_resources(ret); + + zrtp_->dh_has_finished(); // only after the DH stream has gotten its keys, do we let non-DH stream perform ZRTP + + UVG_LOG_DEBUG("DH negotiation finished!"); + + rtcp_ = std::shared_ptr(new uvgrtp::rtcp(rtp_, ssrc_, cname_, sfp_, srtcp_, rce_flags_)); + + socket_->install_handler(srtp_.get(), srtp_->send_packet_handler); + socket_->install_handler(rtcp_.get(), rtcp_->send_packet_handler_vec); + + rtp_handler_key_ = reception_flow_->install_handler(rtp_->packet_handler); + zrtp_handler_key_ = reception_flow_->install_handler(zrtp_->packet_handler); + + reception_flow_->map_handler_key(rtp_handler_key_, remote_ssrc_); + reception_flow_->map_handler_key(zrtp_handler_key_, remote_ssrc_); + + reception_flow_->install_aux_handler(rtp_handler_key_, srtp_.get(), srtp_->recv_packet_handler, nullptr); + reception_flow_->install_aux_handler(rtp_handler_key_, rtcp_.get(), rtcp_->recv_packet_handler, nullptr); + return start_components(); } @@ -472,9 +476,7 @@ rtp_error_t uvgrtp::media_stream::start_components() } initialized_ = true; - if (new_socket_) { - return reception_flow_->start(socket_, 0); - } + UVG_LOG_DEBUG("Stream initialized"); return RTP_OK; } @@ -686,12 +688,12 @@ rtp_error_t uvgrtp::media_stream::install_receive_hook(void *arg, void (*hook)(v return RTP_OK; } if (remote_ssrc_.get()->load() == 0) { - return reception_flow_->install_receive_hook(arg, hook, 0); hooked_ = true; + return reception_flow_->install_receive_hook(arg, hook, 0); } else { - return reception_flow_->install_receive_hook(arg, hook, remote_ssrc_.get()->load()); hooked_ = true; + return reception_flow_->install_receive_hook(arg, hook, remote_ssrc_.get()->load()); } } @@ -859,6 +861,13 @@ uint32_t uvgrtp::media_stream::get_ssrc() const return *ssrc_.get(); } +bool uvgrtp::media_stream::start_receiving() +{ + if (new_socket_) { + return reception_flow_->start(socket_, 0); + } +} + rtp_error_t uvgrtp::media_stream::init_srtp_with_zrtp(int rce_flags, int type, std::shared_ptr srtp, std::shared_ptr zrtp) { diff --git a/src/rtcp.cc b/src/rtcp.cc index 6a3e219..98ca055 100644 --- a/src/rtcp.cc +++ b/src/rtcp.cc @@ -131,7 +131,7 @@ uvgrtp::rtcp::~rtcp() void uvgrtp::rtcp::cleanup_participants() { - UVG_LOG_DEBUG("Removing all participants"); + UVG_LOG_DEBUG("Removing all RTCP participants"); participants_mutex_.lock(); /* free all receiver statistic structs */