multiplex: Check sender IP address when receiving packets in reception_flow

This commit is contained in:
Heikki Tampio 2023-05-12 14:56:00 +03:00
parent 478a06408e
commit 2a7526ec26
7 changed files with 47 additions and 24 deletions

View File

@ -376,8 +376,6 @@ namespace uvgrtp {
* an outgoing address */
rtp_error_t init_connection();
static rtp_error_t user_pkt_handler(void* arg, int rce_flags, uint8_t* ptr, uint32_t size);
/* Create the media object for the stream */
rtp_error_t create_media(rtp_format_t fmt);

View File

@ -646,16 +646,6 @@ rtp_error_t uvgrtp::media_stream::send_user_packet(uint8_t* data, uint32_t paylo
return socket_->sendto(addr, addr6, data, payload_size, 0);
}
rtp_error_t uvgrtp::media_stream::user_pkt_handler(void* arg, int rce_flags, uint8_t* ptr, uint32_t size)
{
(void)rce_flags;
// This is the packets final destination
UVG_LOG_DEBUG("media stream pkt handler!");
return RTP_PKT_READY;
}
rtp_error_t uvgrtp::media_stream::install_user_hook(void* arg, void (*hook)(void*, uint8_t* payload))
{
if (!initialized_) {
@ -944,13 +934,13 @@ int uvgrtp::media_stream::get_configuration_value(int rcc_flag)
return (int)rtp_->get_payload_size();
}
case RCC_FPS_NUMERATOR: {
return fps_numerator_;
return (int)fps_numerator_;
}
case RCC_FPS_DENOMINATOR: {
return fps_denominator_;
return (int)fps_denominator_;
}
case RCC_SESSION_BANDWIDTH: {
return bandwidth_;
return (int)bandwidth_;
}
default:
return -1;

View File

@ -20,10 +20,16 @@
#endif
#include <cstring>
#ifdef _WIN32
#include <ws2ipdef.h>
#else
#include <netinet/ip.h>
#include <sys/socket.h>
#endif
constexpr size_t DEFAULT_INITIAL_BUFFER_SIZE = 4194304;
uvgrtp::reception_flow::reception_flow() :
uvgrtp::reception_flow::reception_flow(bool ipv6) :
hooks_({}),
handler_mapping_({}),
should_stop_(true),
@ -36,7 +42,8 @@ uvgrtp::reception_flow::reception_flow() :
socket_(),
buffer_size_kbytes_(DEFAULT_INITIAL_BUFFER_SIZE),
payload_size_(MAX_IPV4_PAYLOAD),
active_(false)
active_(false),
ipv6_(ipv6)
{
create_ring_buffer();
}
@ -373,6 +380,10 @@ rtp_error_t uvgrtp::reception_flow::install_user_hook(void* arg, void (*hook)(vo
void uvgrtp::reception_flow::return_user_pkt(uint8_t* pkt)
{
UVG_LOG_DEBUG("Received user packet");
if (!pkt) {
UVG_LOG_DEBUG("User packet empty");
return;
}
if (user_hook_) {
user_hook_(user_hook_arg_, pkt);
}
@ -527,10 +538,13 @@ void uvgrtp::reception_flow::receiver(std::shared_ptr<uvgrtp::socket> socket)
//increase_buffer_size(next_write_index);
rtp_error_t ret = RTP_OK;
sockaddr_in sender = {};
sockaddr_in6 sender6 = {};
// get the potential packet
ret = socket->recvfrom(ring_buffer_[next_write_index].data, payload_size_,
MSG_DONTWAIT, &ring_buffer_[next_write_index].read);
MSG_DONTWAIT, &sender, &sender6, &ring_buffer_[next_write_index].read);
if (ret == RTP_INTERRUPTED)
{
@ -548,7 +562,8 @@ void uvgrtp::reception_flow::receiver(std::shared_ptr<uvgrtp::socket> socket)
}
++read_packets;
ring_buffer_[next_write_index].from6 = sender6;
ring_buffer_[next_write_index].from = sender;
// finally we update the ring buffer so processing (reading) knows that there is a new frame
last_ring_write_index_ = next_write_index;
}
@ -613,6 +628,9 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
else if (current_ssrc == 0) {
found = true;
}
else {
return_user_pkt(ptr);
}
if (!found) {
// No SSRC match found, skip this handler
continue;
@ -632,6 +650,7 @@ void uvgrtp::reception_flow::process_packet(int rce_flags)
case RTP_PKT_NOT_HANDLED:
{
// packet was not handled by this primary handlers, proceed to the next one
return_user_pkt(ptr);
continue;
/* packet was handled by the primary handler
* and should be dispatched to the auxiliary handler(s) */

View File

@ -13,6 +13,13 @@
#include <deque>
#include <map>
#ifdef _WIN32
#include <ws2ipdef.h>
#else
#include <netinet/ip.h>
#include <sys/socket.h>
#endif
namespace uvgrtp {
namespace frame {
@ -91,7 +98,7 @@ namespace uvgrtp {
class reception_flow{
public:
reception_flow();
reception_flow(bool ipv6);
~reception_flow();
/* Install a primary handler for an incoming UDP datagram
@ -224,10 +231,13 @@ namespace uvgrtp {
std::unique_ptr<std::thread> receiver_;
std::unique_ptr<std::thread> processor_;
// from is the address that this packet came from
struct Buffer
{
uint8_t* data;
int read;
sockaddr_in6 from6;
sockaddr_in from;
};
void* user_hook_arg_;
@ -247,6 +257,7 @@ namespace uvgrtp {
ssize_t buffer_size_kbytes_;
size_t payload_size_;
bool active_;
bool ipv6_;
};
}

View File

@ -940,8 +940,12 @@ rtp_error_t uvgrtp::socket::__recvfrom_ip6(uint8_t* buf, size_t buf_len, int rec
return RTP_OK;
}
rtp_error_t uvgrtp::socket::recvfrom(uint8_t *buf, size_t buf_len, int recv_flags, sockaddr_in *sender, int *bytes_read)
rtp_error_t uvgrtp::socket::recvfrom(uint8_t *buf, size_t buf_len, int recv_flags, sockaddr_in *sender,
sockaddr_in6 *sender6, int *bytes_read)
{
if (ipv6_) {
return __recvfrom_ip6(buf, buf_len, recv_flags, sender6, bytes_read);
}
return __recvfrom(buf, buf_len, recv_flags, sender, bytes_read);
}

View File

@ -139,7 +139,8 @@ namespace uvgrtp {
* Return RTP_OK on success and write the amount of bytes sent to "bytes_sent"
* Return RTP_INTERRUPTED if the call was interrupted due to timeout and set "bytes_sent" to 0
* Return RTP_GENERIC_ERROR on error and set "bytes_sent" to -1 */
rtp_error_t recvfrom(uint8_t *buf, size_t buf_len, int recv_flags, sockaddr_in *sender, int *bytes_read);
rtp_error_t recvfrom(uint8_t *buf, size_t buf_len, int recv_flags, sockaddr_in *sender,
sockaddr_in6 *sender6, int *bytes_read);
rtp_error_t recvfrom(uint8_t *buf, size_t buf_len, int recv_flags, sockaddr_in *sender);
rtp_error_t recvfrom(uint8_t *buf, size_t buf_len, int recv_flags, int *bytes_read);
rtp_error_t recvfrom(uint8_t *buf, size_t buf_len, int recv_flags);

View File

@ -99,7 +99,7 @@ std::shared_ptr<uvgrtp::socket> uvgrtp::socketfactory::create_new_socket(int typ
// If the socket is a type 2 (non-RTCP) socket, install a reception_flow
if (type == 2) {
std::shared_ptr<uvgrtp::reception_flow> flow = std::shared_ptr<uvgrtp::reception_flow>(new uvgrtp::reception_flow());
std::shared_ptr<uvgrtp::reception_flow> flow = std::shared_ptr<uvgrtp::reception_flow>(new uvgrtp::reception_flow(ipv6_));
std::pair pair = std::make_pair(flow, socket);
reception_flows_.insert(pair);
}