From 5cedd36641c641436b983a1b30a70c7601406866 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Fri, 24 Apr 2020 21:20:27 +0300 Subject: [PATCH] Add ability to fragment/reconstruct generic frames --- include/util.hh | 16 +++- src/formats/generic.cc | 179 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 180 insertions(+), 15 deletions(-) diff --git a/include/util.hh b/include/util.hh index 2fa2e1f..cffd705 100644 --- a/include/util.hh +++ b/include/util.hh @@ -130,7 +130,21 @@ enum RTP_CTX_ENABLE_FLAGS { * equally and drops all frames that are late */ RCE_HEVC_NO_INTRA_DELAY = 1 << 5, - RCE_LAST = 1 << 6, + /* Fragment generic frames into RTP packets of 1500 bytes. + * + * If RCE_FRAGMENT_GENERIC is given to create_stream(), kvzRTP will split frames + * of type RTP_FORMAT_GENERIC into packets of 1500 bytes automatically and reconstruct + * the full frame from the fragments in the receiver + * + * This behavior is not from any specification and only supported by kvzRTP so it + * will break interoperability between libraries if enabled. + * + * RCE_FRAGMENT_GENERIC can be used, for example, when you're using kvzRTP for + * both sender and receiver and the media stream you wish to stream is not supported + * by kvzRTP but requires packetization because MEDIA_FRAME_SIZE > MTU */ + RCE_FRAGMENT_GENERIC = 1 << 6, + + RCE_LAST = 1 << 7, }; /* These options are given to configuration() */ diff --git a/src/formats/generic.cc b/src/formats/generic.cc index 581fd4b..3cbb083 100644 --- a/src/formats/generic.cc +++ b/src/formats/generic.cc @@ -17,39 +17,190 @@ #include "formats/generic.hh" -// TODO implement frame splitting if data_len > MTU +#define INVALID_SEQ 0xffffffff + +/* The generic frames are fragmented using the marker bit of the RTP header. + * First and last fragment of a larger frame have marker bits set and middle fragments don't. + * All fragments have the same timestamp so the receiver knows which fragments are part of a larger frame. */ rtp_error_t kvz_rtp::generic::push_frame(kvz_rtp::sender *sender, uint8_t *data, size_t data_len, int flags) { (void)flags; - if (data_len > MAX_PAYLOAD) { - LOG_WARN("packet is larger (%zu bytes) than MAX_PAYLOAD (%u bytes)", data_len, MAX_PAYLOAD); - } - uint8_t header[kvz_rtp::frame::HEADER_SIZE_RTP]; sender->get_rtp_ctx()->fill_header(header); + if (data_len > MAX_PAYLOAD) { + if (sender->get_conf().flags & RCE_FRAGMENT_GENERIC) { + + rtp_error_t ret = RTP_OK; + ssize_t data_left = data_len; + ssize_t data_pos = 0; + + /* set marker bit for the first fragment */ + header[1] |= (1 << 7); + + while (data_left > MAX_PAYLOAD) { + ret = kvz_rtp::send::send_frame(sender, header, sizeof(header), data + data_pos, MAX_PAYLOAD); + + if (ret != RTP_OK) + return ret; + + sender->get_rtp_ctx()->update_sequence(header); + + data_pos += MAX_PAYLOAD; + data_left -= MAX_PAYLOAD; + + /* clear marker bit for middle fragments */ + header[1] &= 0x7f; + } + + /* set marker bit for the last frame */ + header[1] |= (1 << 7); + return kvz_rtp::send::send_frame(sender, header, sizeof(header), data + data_pos, data_left); + + } else { + LOG_WARN("packet is larger (%zu bytes) than MAX_PAYLOAD (%u bytes)", data_len, MAX_PAYLOAD); + } + } + return kvz_rtp::send::send_frame(sender, header, sizeof(header), data, data_len); } rtp_error_t kvz_rtp::generic::push_frame(kvz_rtp::sender *sender, std::unique_ptr data, size_t data_len, int flags) { - (void)flags; + return kvz_rtp::generic::push_frame(sender, data.get(), data_len, flags); +} - if (data_len > MAX_PAYLOAD) { - LOG_WARN("packet is larger (%zu bytes) than MAX_PAYLOAD (%u bytes)", data_len, MAX_PAYLOAD); +static rtp_error_t __fragment_receiver(kvz_rtp::receiver *receiver) +{ + LOG_INFO("use fragment receiver"); + + int nread = 0; + sockaddr_in sender_addr; + rtp_error_t ret = RTP_OK; + kvz_rtp::socket socket = receiver->get_socket(); + kvz_rtp::frame::rtp_frame *frame; + + struct frame_info { + uint32_t s_seq; + uint32_t e_seq; + size_t npkts; + std::map fragments; + }; + std::unordered_map frames; + + fd_set read_fds; + struct timeval t_val; + + FD_ZERO(&read_fds); + + t_val.tv_sec = 0; + t_val.tv_usec = 1500; + + while (!receiver->active()) + ; + + while (receiver->active()) { + FD_SET(socket.get_raw_socket(), &read_fds); + int sret = ::select(socket.get_raw_socket() + 1, &read_fds, nullptr, nullptr, &t_val); + + if (sret < 0) { +#ifdef __linux__ + LOG_ERROR("select failed: %s!", strerror(errno)); +#else + win_get_last_error(); +#endif + return RTP_GENERIC_ERROR; + } + + do { +#ifdef __linux__ + ret = socket.recvfrom(receiver->get_recv_buffer(), receiver->get_recv_buffer_len(), MSG_DONTWAIT, &sender_addr, &nread); + + if (ret != RTP_OK) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + + LOG_ERROR("recvfrom failed! FrameReceiver cannot continue %s!", strerror(errno)); + return RTP_GENERIC_ERROR; + } +#else + ret = socket.recvfrom(receiver->get_recv_buffer(), receiver->get_recv_buffer_len(), 0, &sender_addr, &nread); + + if (ret != RTP_OK) { + if (WSAGetLastError() == WSAEWOULDBLOCK) + break; + + LOG_ERROR("recvfrom failed! FrameReceiver cannot continue %s!", strerror(errno)); + return RTP_GENERIC_ERROR; + } +#endif + + if ((frame = receiver->validate_rtp_frame(receiver->get_recv_buffer(), nread)) == nullptr) { + LOG_DEBUG("received an invalid frame, discarding"); + continue; + } + memcpy(&frame->src_addr, &sender_addr, sizeof(sockaddr_in)); + + uint32_t ts = frame->header.timestamp; + uint32_t seq = frame->header.seq; + size_t recv = 0; + + if (frames.find(ts) != frames.end()) { + frames[ts].npkts++; + frames[ts].fragments[seq] = frame; + + if (frame->header.marker) + frames[ts].e_seq = seq; + + if (frames[ts].e_seq != INVALID_SEQ && frames[ts].s_seq != INVALID_SEQ) { + if (frames[ts].s_seq > frames[ts].e_seq) + recv = 0xffff - frames[ts].s_seq + frames[ts].e_seq + 2; + else + recv = frames[ts].e_seq - frames[ts].s_seq + 1; + + if (recv == frames[ts].npkts) { + auto retframe = kvz_rtp::frame::alloc_rtp_frame(recv * MAX_PAYLOAD); + size_t ptr = 0; + + std::memcpy(&retframe->header, &frame->header, sizeof(frame->header)); + + for (auto& frag : frames[ts].fragments) { + std::memcpy( + retframe->payload + ptr, + frag.second->payload, + frag.second->payload_len + ); + ptr += frag.second->payload_len; + (void)kvz_rtp::frame::dealloc_frame(frag.second); + } + + frames.erase(ts); + receiver->return_frame(retframe); + } + } + } else { + if (frame->header.marker) { + frames[ts].npkts = 1; + frames[ts].s_seq = seq; + frames[ts].e_seq = INVALID_SEQ; + frames[ts].fragments[seq] = frame; + } else { + receiver->return_frame(frame); + } + } + } while (ret == RTP_OK); } - uint8_t header[kvz_rtp::frame::HEADER_SIZE_RTP]; - sender->get_rtp_ctx()->fill_header(header); - - /* We don't need to transfer the ownership of of "data" to socket because send_frame() executes immediately */ - return kvz_rtp::send::send_frame(sender, header, sizeof(header), data.get(), data_len); + receiver->get_mutex().unlock(); + return ret; } rtp_error_t kvz_rtp::generic::frame_receiver(kvz_rtp::receiver *receiver) { - LOG_INFO("Generic frame receiver starting..."); + /* If user uses fragmented generic frames, start the fragment receiver instead */ + if (receiver->get_conf().flags & RCE_FRAGMENT_GENERIC) + return __fragment_receiver(receiver); int nread = 0; sockaddr_in sender_addr;