common: Using the MTU size doesn't work with small streams

The socket seems to really want the full size buffer even though the
packet size is mostly just 1458. This fixes the small streams working.
This commit is contained in:
Joni Räsänen 2022-02-02 13:15:18 +02:00
parent 314eec11bf
commit 2db5061291
3 changed files with 69 additions and 64 deletions

View File

@ -596,7 +596,6 @@ rtp_error_t uvgrtp::media_stream::configure_ctx(int flag, ssize_t value)
}
rtp_->set_payload_size(value - hdr);
pkt_dispatcher_->set_mtu_size(value);
}
break;

View File

@ -18,9 +18,9 @@
#include <cstring>
constexpr size_t RECV_BUFFER_SIZE = 0xffff - IPV4_HDR_SIZE - UDP_HDR_SIZE;
constexpr size_t DEFAULT_MTU_SIZE = 1458;
constexpr size_t DEFAULT_BUFFER_SIZE = 4194304;
constexpr size_t DEFAULT_INITIAL_BUFFER_SIZE = 4194304;
uvgrtp::pkt_dispatcher::pkt_dispatcher() :
@ -31,8 +31,7 @@ uvgrtp::pkt_dispatcher::pkt_dispatcher() :
ring_buffer_(),
ring_read_index_(-1), // invalid first index that will increase to a valid one
last_ring_write_index_(0),
mtu_size_(DEFAULT_MTU_SIZE),
buffer_size_kbytes_(DEFAULT_BUFFER_SIZE)
buffer_size_kbytes_(DEFAULT_INITIAL_BUFFER_SIZE)
{
create_ring_buffer();
}
@ -47,11 +46,11 @@ uvgrtp::pkt_dispatcher::~pkt_dispatcher()
void uvgrtp::pkt_dispatcher::create_ring_buffer()
{
destroy_ring_buffer();
size_t elements = buffer_size_kbytes_ / mtu_size_;
size_t elements = buffer_size_kbytes_ / RECV_BUFFER_SIZE;
for (int i = 0; i < elements; ++i)
{
ring_buffer_.push_back({ new uint8_t[mtu_size_] , 0 });
ring_buffer_.push_back({ new uint8_t[RECV_BUFFER_SIZE] , 0 });
}
}
@ -64,13 +63,6 @@ void uvgrtp::pkt_dispatcher::destroy_ring_buffer()
ring_buffer_.clear();
}
void uvgrtp::pkt_dispatcher::set_mtu_size(ssize_t& value)
{
mtu_size_ = value;
create_ring_buffer();
}
void uvgrtp::pkt_dispatcher::set_buffer_size(ssize_t& value)
{
buffer_size_kbytes_ = value;
@ -320,42 +312,6 @@ void uvgrtp::pkt_dispatcher::call_aux_handlers(uint32_t key, int flags, uvgrtp::
}
}
/* The point of packet dispatcher is to provide much-needed isolation between different layers
* of uvgRTP. For example, HEVC handler should not concern itself with RTP packet validation
* because that should be a global operation done for all packets.
*
* Neither should Opus handler take SRTP-provided authentication tag into account when it is
* performing operations on the packet.
* And ZRTP packets should not be relayed from media handler to ZRTP handler et cetera.
*
* This can be achieved by having a global UDP packet handler for any packet type that validates
* all common stuff it can and then dispatches the validated packet to the correct layer using
* one of the installed handlers.
*
* If it's unclear as to which handler should be called, the packet is dispatched to all relevant
* handlers and a handler then returns RTP_OK/RTP_PKT_NOT_HANDLED based on whether the packet was handled.
*
* For example, if runner detects an incoming ZRTP packet, that packet is immediately dispatched to the
* installed ZRTP handler if ZRTP has been enabled.
* Likewise, if RTP packet authentication has been enabled, runner validates the packet before passing
* it onto any other layer so all future work on the packet is not done in vain due to invalid data
*
* One piece of design choice that complicates the design of packet dispatcher a little is that the order
* of handlers is important. First handler must be ZRTP and then follows SRTP, RTP and finally media handlers.
* This requirement gives packet handler a clean and generic interface while giving a possibility to modify
* the packet in each of the called handlers if needed. For example SRTP handler verifies RTP authentication
* tag and decrypts the packet and RTP handler verifies the fields of the RTP packet and processes it into
* a more easily modifiable format for the media handler.
*
* If packet is modified by the handler but the frame is not ready to be returned to user,
* handler returns RTP_PKT_MODIFIED to indicate that it has modified the input buffer and that
* the packet should be passed onto other handlers.
*
* When packet is ready to be returned to user, "out" parameter of packet handler is set to point to
* the allocated frame that can be returned and return value of the packet handler is RTP_PKT_READY.
*
* If a handler receives a non-null "out", it can safely ignore "packet" and operate just on
* the "out" parameter because at that point it already contains all needed information. */
void uvgrtp::pkt_dispatcher::receiver(uvgrtp::socket *socket, int flags)
{
LOG_DEBUG("Start reception loop");
@ -393,28 +349,40 @@ void uvgrtp::pkt_dispatcher::receiver(uvgrtp::socket *socket, int flags)
if (pfds->revents & POLLIN) {
// we write as many packets as socket has in the buffer
rtp_error_t ret = RTP_OK;
while (ret == RTP_OK && !should_stop_)
while (!should_stop_)
{
// get the potential next write. Poll makes sure we already have data in the buffer, but
// to make sure processing doesn't start reading incomplete/old packets, we update index
// after we have the data
// get the potential next write. To make sure processing doesn't start reading
// incomplete/old packets, we only update the index after buffer has the data
int next_write_index = next_buffer_location(last_ring_write_index_);
// wait if the process/read hasn't freed any spots on the ring buffer
if (next_write_index == ring_read_index_)
{
LOG_WARN("Reception processing too slow, dropping oldest packet!");
++ring_read_index_;
LOG_DEBUG("Reception buffer ran out, doubling the buffer size ...");
// double the size every time we run out
int increase = ring_buffer_.size();
ring_mutex_.lock();
for (unsigned int i = 0; i < increase; ++i)
{
ring_buffer_.insert(ring_buffer_.begin() + next_write_index, { new uint8_t[RECV_BUFFER_SIZE] , 0 });
++ring_read_index_;
}
ring_mutex_.unlock();
}
// get potential packet (there should be because of poll())
if ((ret = socket->recvfrom(ring_buffer_[next_write_index].data,
mtu_size_, MSG_DONTWAIT, &ring_buffer_[next_write_index].read)) == RTP_INTERRUPTED)
rtp_error_t ret = RTP_OK;
// get the potential packet
if ((ret = socket->recvfrom(ring_buffer_[next_write_index].data, RECV_BUFFER_SIZE,
MSG_DONTWAIT, &ring_buffer_[next_write_index].read)) == RTP_INTERRUPTED ||
ring_buffer_[next_write_index].read == 0)
{
break;
if (ret != RTP_OK) {
}
else if (ret != RTP_OK) {
LOG_ERROR("recvfrom(2) failed! Packet dispatcher cannot continue %d!", ret);
should_stop_ = true;
break;
}
@ -433,6 +401,41 @@ void uvgrtp::pkt_dispatcher::receiver(uvgrtp::socket *socket, int flags)
}
}
/* The point of packet dispatcher is to provide isolation between different layers
* of uvgRTP. For example, HEVC handler should not concern itself with RTP packet validation
* because that should be a global operation done for all packets. Neither should Opus handler
* take SRTP-provided authentication tag into account when it is performing operations on
* the packet and ZRTP packets should not be relayed from media handler
* to ZRTP handler et cetera.
*
* This can be achieved by having a global UDP packet handler for any packet type that validates
* all common stuff it can and then dispatches the validated packet to the correct layer using
* one of the installed handlers.
*
* If it's unclear as to which handler should be called, the packet is dispatched to all relevant
* handlers and a handler then returns RTP_OK/RTP_PKT_NOT_HANDLED based on whether the packet was handled.
*
* For example, if runner detects an incoming ZRTP packet, that packet is immediately dispatched to the
* installed ZRTP handler if ZRTP has been enabled.
* Likewise, if RTP packet authentication has been enabled, runner validates the packet before passing
* it onto any other layer so all future work on the packet is not done in vain due to invalid data
*
* One piece of design choice that complicates the design of packet dispatcher a little is that the order
* of handlers is important. First handler must be ZRTP and then follows SRTP, RTP and finally media handlers.
* This requirement gives packet handler a clean and generic interface while giving a possibility to modify
* the packet in each of the called handlers if needed. For example SRTP handler verifies RTP authentication
* tag and decrypts the packet and RTP handler verifies the fields of the RTP packet and processes it into
* a more easily modifiable format for the media handler.
*
* If packet is modified by the handler but the frame is not ready to be returned to user,
* handler returns RTP_PKT_MODIFIED to indicate that it has modified the input buffer and that
* the packet should be passed onto other handlers.
*
* When packet is ready to be returned to user, "out" parameter of packet handler is set to point to
* the allocated frame that can be returned and return value of the packet handler is RTP_PKT_READY.
*
* If a handler receives a non-null "out", it can safely ignore "packet" and operate just on
* the "out" parameter because at that point it already contains all needed information. */
void uvgrtp::pkt_dispatcher::process_packet(int flags)
{
LOG_DEBUG("Start processing loop");
@ -448,6 +451,8 @@ void uvgrtp::pkt_dispatcher::process_packet(int flags)
break;
}
ring_mutex_.lock();
// process all available reads in one go
while (next_buffer_location(ring_read_index_) != last_ring_write_index_)
{
@ -488,6 +493,8 @@ void uvgrtp::pkt_dispatcher::process_packet(int flags)
}
}
}
ring_mutex_.unlock();
}
}

View File

@ -106,7 +106,6 @@ namespace uvgrtp {
uvgrtp::frame::rtp_frame *pull_frame();
uvgrtp::frame::rtp_frame *pull_frame(size_t timeout_ms);
void set_mtu_size(ssize_t& value);
void set_buffer_size(ssize_t& value);
private:
@ -156,10 +155,10 @@ namespace uvgrtp {
std::atomic<int> last_ring_write_index_;
std::mutex wait_mtx_;
std::mutex ring_mutex_;
std::condition_variable process_cond_;
ssize_t mtu_size_;
ssize_t buffer_size_kbytes_;
};
}