Implement faster RTP frag frame sending for Linux

This implementatation uses thread-local storage + sendmmsg()
to split large frames and send them using only one system call.

This may results in large performance improvements especially when
frame sizes increase as these implementatation doesn't do any extra
copying and sends the frame to remote using one system call (compared
to previous implementatation where each RTP frame was send individually)

Only works on Linux
This commit is contained in:
Aaro Altonen 2019-06-12 11:14:24 +03:00
parent 03542419d1
commit f8d6ec6361
2 changed files with 186 additions and 4 deletions

View File

@ -3,8 +3,8 @@
#include <windows.h>
#else
#include <arpa/inet.h>
#include <sys/types.h>
#endif
#include <stdint.h>
#include <cstring>
#include <iostream>
@ -24,10 +24,11 @@ static rtp_error_t __internal_write(kvz_rtp::connection *conn, uint8_t *buf, siz
kvz_rtp::writer *writer = dynamic_cast<kvz_rtp::writer *>(conn);
#ifdef __linux__
sockaddr_in out_addr = writer->get_out_address();
sockaddr_in out_addr = writer->get_out_address();
if (sendto(conn->get_socket(), buf, buf_len, flags, (struct sockaddr *)&out_addr, sizeof(out_addr)) == -1)
return RTP_SEND_ERROR;
#else
DWORD sent_bytes;
WSABUF data_buf;
@ -81,8 +82,8 @@ rtp_error_t kvz_rtp::sender::write_rtp_header(kvz_rtp::connection *conn, uint32_
return RTP_INVALID_VALUE;
uint8_t header[kvz_rtp::frame::HEADER_SIZE_RTP] = { 0 };
conn->fill_rtp_header(header, timestamp);
return kvz_rtp::sender::write_generic_header(conn, header, kvz_rtp::frame::HEADER_SIZE_RTP);
}
@ -112,6 +113,11 @@ rtp_error_t kvz_rtp::sender::write_frame(
rtp_error_t ret;
#ifdef __linux__
/* TODO: error checking */
(void)kvz_rtp::sender::enqueue_message(conn, header, header_len, payload, payload_len);
ret = kvz_rtp::sender::flush_message_queue(conn);
#else
if ((ret = kvz_rtp::sender::write_generic_header(conn, header, header_len)) != RTP_OK) {
LOG_ERROR("Failed to write generic header, length: %zu", header_len);
return ret;
@ -121,6 +127,151 @@ rtp_error_t kvz_rtp::sender::write_frame(
LOG_ERROR("Failed to write payload, length: %zu", payload_len);
return ret;
}
#endif
return ret;
}
#define MAX_CHUNK_COUNT 30
#define MAX_MSG_COUNT 10
static thread_local struct mmsghdr headers[MAX_MSG_COUNT];
static thread_local struct msghdr messages[MAX_MSG_COUNT];
static thread_local struct iovec chunks[MAX_CHUNK_COUNT];
static thread_local int chunk_ptr = 0;
static thread_local int hdr_ptr = 0;
static thread_local int msg_ptr = 0;
rtp_error_t kvz_rtp::sender::enqueue_message(
kvz_rtp::connection *conn,
uint8_t *header, size_t header_len,
uint8_t *payload, size_t payload_len
)
{
if (!conn || !header || header_len == 0|| !payload || payload_len == 0)
return RTP_INVALID_VALUE;
if (chunk_ptr + 2 >= MAX_CHUNK_COUNT || msg_ptr + 1 >= MAX_MSG_COUNT) {
LOG_ERROR("maximum amount of chunks (%d) or messages (%d) exceeded!", chunk_ptr, msg_ptr);
return RTP_MEMORY_ERROR;
}
sockaddr_in out_addr = dynamic_cast<kvz_rtp::writer *>(conn)->get_out_address();
chunks[chunk_ptr + 0].iov_base = header;
chunks[chunk_ptr + 0].iov_len = header_len;
chunks[chunk_ptr + 1].iov_base = payload;
chunks[chunk_ptr + 1].iov_len = payload_len;
messages[msg_ptr].msg_name = (void *)&out_addr;
messages[msg_ptr].msg_namelen = sizeof(out_addr);
messages[msg_ptr].msg_iov = &chunks[chunk_ptr];
messages[msg_ptr].msg_iovlen = 2;
messages[msg_ptr].msg_control = 0;
messages[msg_ptr].msg_controllen = 0;
headers[hdr_ptr].msg_hdr = messages[msg_ptr];
chunk_ptr += 2;
msg_ptr += 1;
hdr_ptr += 1;
conn->incRTPSequence(1);
return RTP_OK;
}
rtp_error_t kvz_rtp::sender::enqueue_message(
kvz_rtp::connection *conn,
uint8_t *message, size_t message_len
)
{
if (!conn || !message || message_len == 0)
return RTP_INVALID_VALUE;
if (chunk_ptr + 1 >= MAX_CHUNK_COUNT || msg_ptr + 1 >= MAX_MSG_COUNT) {
LOG_ERROR("maximum amount of chunks (%d) or messages (%d) exceeded!", chunk_ptr, msg_ptr);
return RTP_MEMORY_ERROR;
}
sockaddr_in out_addr = dynamic_cast<kvz_rtp::writer *>(conn)->get_out_address();
chunks[chunk_ptr + 0].iov_base = message;
chunks[chunk_ptr + 0].iov_len = message_len;
messages[msg_ptr].msg_name = (void *)&out_addr;
messages[msg_ptr].msg_namelen = sizeof(out_addr);
messages[msg_ptr].msg_iov = &chunks[chunk_ptr];
messages[msg_ptr].msg_iovlen = 1;
messages[msg_ptr].msg_control = 0;
messages[msg_ptr].msg_controllen = 0;
headers[hdr_ptr].msg_hdr = messages[msg_ptr];
chunk_ptr++;
msg_ptr++;
hdr_ptr++;
conn->incRTPSequence(1);
return RTP_OK;
}
rtp_error_t kvz_rtp::sender::enqueue_message(
kvz_rtp::connection *conn,
std::vector<std::pair<size_t, uint8_t *>>& buffers
)
{
if (!conn || buffers.size() == 0)
return RTP_INVALID_VALUE;
if (chunk_ptr + buffers.size() >= MAX_CHUNK_COUNT || msg_ptr + 1 >= MAX_MSG_COUNT) {
LOG_ERROR("maximum amount of chunks (%d) or messages (%d) exceeded!", chunk_ptr, msg_ptr);
return RTP_MEMORY_ERROR;
}
for (size_t i = 0; i < buffers.size(); ++i) {
chunks[chunk_ptr + i].iov_len = buffers.at(i).first;
chunks[chunk_ptr + i].iov_base = buffers.at(i).second;
}
sockaddr_in out_addr = dynamic_cast<kvz_rtp::writer *>(conn)->get_out_address();
messages[msg_ptr].msg_name = (void *)&out_addr;
messages[msg_ptr].msg_namelen = sizeof(out_addr);
messages[msg_ptr].msg_iov = &chunks[chunk_ptr];
messages[msg_ptr].msg_iovlen = buffers.size();
messages[msg_ptr].msg_control = 0;
messages[msg_ptr].msg_controllen = 0;
headers[hdr_ptr].msg_hdr = messages[msg_ptr];
chunk_ptr += buffers.size();
msg_ptr += 1;
hdr_ptr += 1;
conn->incRTPSequence(1);
return RTP_OK;
}
rtp_error_t kvz_rtp::sender::flush_message_queue(kvz_rtp::connection *conn)
{
rtp_error_t ret = RTP_OK;
if (msg_ptr == 0 || hdr_ptr == 0 || chunk_ptr == 0) {
LOG_ERROR("Cannot send 0 messages or messages containing 0 chunks!");
ret = RTP_INVALID_VALUE;
goto end;
}
if (sendmmsg(conn->get_socket(), headers, hdr_ptr, 0) < 0) {
LOG_ERROR("Failed to flush the message queue!");
ret = RTP_SEND_ERROR;
}
end:
chunk_ptr = hdr_ptr = msg_ptr = 0;
return ret;
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <vector>
namespace kvz_rtp {
class connection;
@ -30,11 +32,40 @@ namespace kvz_rtp {
* Return RTP_OK on success and RTP_ERROR on error */
rtp_error_t write_generic_frame(kvz_rtp::connection *conn, kvz_rtp::frame::rtp_frame *frame);
/* TODO: */
/* If the header and payload of RTP messages are separate, the can be combined and sent
* using write_frame()
*
* This function will send the message right away.
*
* return RTP_OK on success and RTP_ERROR on error */
rtp_error_t write_frame(
kvz_rtp::connection *conn,
uint8_t *header, size_t header_len,
uint8_t *payload, size_t payload_len
);
/* linux specific functions*/
/* TODO: */
rtp_error_t enqueue_message(
kvz_rtp::connection *conn,
uint8_t *header, size_t header_len,
uint8_t *payload, size_t payload_len
);
/* TODO: */
rtp_error_t enqueue_message(
kvz_rtp::connection *conn,
std::vector<std::pair<size_t, uint8_t *>>& buffers
);
/* TODO: */
rtp_error_t enqueue_message(
kvz_rtp::connection *conn,
uint8_t *message, size_t message_len
);
/* TODO: */
rtp_error_t flush_message_queue(kvz_rtp::connection *conn);
};
};