From 478a06408e9d9f57fcc3ab622f7777cfd5d5c855 Mon Sep 17 00:00:00 2001 From: Heikki Tampio Date: Fri, 12 May 2023 13:25:26 +0300 Subject: [PATCH] multiplex: Add ability to send user packets --- include/uvgrtp/media_stream.hh | 6 ++++++ src/media_stream.cc | 39 ++++++++++++++++++++++++++++++++++ src/reception_flow.cc | 28 ++++++++++++++++++++++-- src/reception_flow.hh | 9 +++++++- 4 files changed, 79 insertions(+), 3 deletions(-) diff --git a/include/uvgrtp/media_stream.hh b/include/uvgrtp/media_stream.hh index 08c0ea5..b31fbcf 100644 --- a/include/uvgrtp/media_stream.hh +++ b/include/uvgrtp/media_stream.hh @@ -273,6 +273,10 @@ namespace uvgrtp { */ rtp_error_t push_frame(std::unique_ptr data, size_t data_len, uint32_t ts, uint64_t ntp_ts, int rtp_flags); + rtp_error_t send_user_packet(uint8_t* data, uint32_t payload_size, + std::string remote_address, uint16_t port); + rtp_error_t install_user_hook(void* arg, void (*hook)(void*, uint8_t* payload)); + /** * \brief Poll a frame indefinitely from the media stream object * @@ -372,6 +376,8 @@ namespace uvgrtp { * an outgoing address */ rtp_error_t init_connection(); + static rtp_error_t user_pkt_handler(void* arg, int rce_flags, uint8_t* ptr, uint32_t size); + /* Create the media object for the stream */ rtp_error_t create_media(rtp_format_t fmt); diff --git a/src/media_stream.cc b/src/media_stream.cc index 988b2ca..6a9e02c 100644 --- a/src/media_stream.cc +++ b/src/media_stream.cc @@ -631,6 +631,45 @@ rtp_error_t uvgrtp::media_stream::push_frame(std::unique_ptr data, si return ret; } +rtp_error_t uvgrtp::media_stream::send_user_packet(uint8_t* data, uint32_t payload_size, + std::string remote_address, uint16_t port) +{ + sockaddr_in6 addr6; + sockaddr_in addr; + if (ipv6_) { + addr6 = uvgrtp::socket::create_ip6_sockaddr(remote_address, port); + } + else { + addr = uvgrtp::socket::create_sockaddr(AF_INET, remote_address, port); + } + UVG_LOG_DEBUG("Sending user packet"); + return socket_->sendto(addr, addr6, data, payload_size, 0); +} + +rtp_error_t uvgrtp::media_stream::user_pkt_handler(void* arg, int rce_flags, uint8_t* ptr, uint32_t size) +{ + (void)rce_flags; + + // This is the packets final destination + UVG_LOG_DEBUG("media stream pkt handler!"); + + return RTP_PKT_READY; +} + +rtp_error_t uvgrtp::media_stream::install_user_hook(void* arg, void (*hook)(void*, uint8_t* payload)) +{ + if (!initialized_) { + UVG_LOG_ERROR("RTP context has not been initialized fully, cannot continue!"); + return RTP_NOT_INITIALIZED; + } + + if (!hook) + return RTP_INVALID_VALUE; + + return reception_flow_->install_user_hook(arg, hook);; + +} + uvgrtp::frame::rtp_frame *uvgrtp::media_stream::pull_frame() { if (!check_pull_preconditions()) { diff --git a/src/reception_flow.cc b/src/reception_flow.cc index d66f02d..c2abd13 100644 --- a/src/reception_flow.cc +++ b/src/reception_flow.cc @@ -28,6 +28,8 @@ uvgrtp::reception_flow::reception_flow() : handler_mapping_({}), should_stop_(true), receiver_(nullptr), + user_hook_arg_(nullptr), + user_hook_(nullptr), ring_buffer_(), ring_read_index_(-1), // invalid first index that will increase to a valid one last_ring_write_index_(-1), @@ -357,7 +359,29 @@ void uvgrtp::reception_flow::return_frame(uvgrtp::frame::rtp_frame *frame) }*/ } -void uvgrtp::reception_flow::call_aux_handlers(uint32_t key, int rce_flags, uvgrtp::frame::rtp_frame **frame) +rtp_error_t uvgrtp::reception_flow::install_user_hook(void* arg, void (*hook)(void*, uint8_t* payload)) +{ + if (!hook) + return RTP_INVALID_VALUE; + + user_hook_ = hook; + user_hook_arg_ = arg; + + return RTP_OK; +} + +void uvgrtp::reception_flow::return_user_pkt(uint8_t* pkt) +{ + UVG_LOG_DEBUG("Received user packet"); + if (user_hook_) { + user_hook_(user_hook_arg_, pkt); + } + else { + UVG_LOG_DEBUG("No user hook installed"); + } +} + +void uvgrtp::reception_flow::call_aux_handlers(uint32_t key, int rce_flags, uvgrtp::frame::rtp_frame **frame, uint8_t* ptr) { rtp_error_t ret; @@ -614,7 +638,7 @@ void uvgrtp::reception_flow::process_packet(int rce_flags) } case RTP_PKT_MODIFIED: { - call_aux_handlers(handler.first, rce_flags, &frame); + call_aux_handlers(handler.first, rce_flags, &frame, ptr); break; } case RTP_GENERIC_ERROR: diff --git a/src/reception_flow.hh b/src/reception_flow.hh index d297a71..16fd6c3 100644 --- a/src/reception_flow.hh +++ b/src/reception_flow.hh @@ -22,6 +22,7 @@ namespace uvgrtp { class socket; typedef void (*recv_hook)(void* arg, uvgrtp::frame::rtp_frame* frame); + typedef void (*user_hook)(void* arg, uint8_t* payload, uint32_t payload_size); struct receive_pkt_hook { void* arg = nullptr; @@ -175,6 +176,7 @@ namespace uvgrtp { void set_buffer_size(const ssize_t& value); ssize_t get_buffer_size() const; void set_payload_size(const size_t& value); + rtp_error_t install_user_hook(void* arg, void (*hook)(void*, uint8_t* payload)); /// \endcond private: @@ -187,8 +189,10 @@ namespace uvgrtp { /* Return a processed RTP frame to user either through frame queue or receive hook */ void return_frame(uvgrtp::frame::rtp_frame *frame); + void return_user_pkt(uint8_t* pkt); + /* Call auxiliary handlers of a primary handler */ - void call_aux_handlers(uint32_t key, int rce_flags, uvgrtp::frame::rtp_frame **frame); + void call_aux_handlers(uint32_t key, int rce_flags, uvgrtp::frame::rtp_frame **frame, uint8_t* ptr); inline void increase_buffer_size(ssize_t next_write_index); @@ -226,6 +230,9 @@ namespace uvgrtp { int read; }; + void* user_hook_arg_; + void (*user_hook_)(void* arg, uint8_t* payload); + std::vector ring_buffer_; std::mutex ring_mutex_; // these uphold the ring buffer details