Rewrite kvzRTP's HEVC receiver
Overall code cleanup and removal of unnecessarily complex logic. Now the RTP frame delays are more adaptive to Kvazzup's needs: frame delay for intra frames is the intra period so intra frame can be late f.ex. 2s seconds whereas inter frames must be received within the 100ms limit or they are dropped. This change should remove the gray screens completely Also remove the global frame array for HEVC fragments and use frame-specific map to store them. I noticed that with high-quality streams the number of packets used by one frame was starting to be close to UINT16_MAX, which caused kvzRTP to overwrite previous frames' fragments with new fragments and thus dropping frames by itself. Now each frame has its own fragment buffer so the frame size no longer matters
This commit is contained in:
parent
74a7207f90
commit
9d73b81659
|
|
@ -2,6 +2,7 @@
|
|||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <unordered_set>
|
||||
|
||||
#include "debug.hh"
|
||||
#include "queue.hh"
|
||||
|
|
@ -10,9 +11,14 @@
|
|||
|
||||
#define RTP_FRAME_MAX_DELAY 100
|
||||
#define INVALID_SEQ 0x13371338
|
||||
#define INVALID_TS 0xffffffff
|
||||
|
||||
#define TS(x) ((x)->header.timestamp)
|
||||
#define SEQ(x) ((x)->header.seq)
|
||||
#define RTP_HDR_SIZE 12
|
||||
#define NAL_HDR_SIZE 2
|
||||
|
||||
#define TS(x) ((x)->header.timestamp)
|
||||
#define SEQ(x) ((x)->header.seq)
|
||||
#define FRAME(t, s) (finfo[(t)].fragments[(s)])
|
||||
|
||||
enum FRAG_TYPES {
|
||||
FT_INVALID = -2, /* invalid combination of S and E bits */
|
||||
|
|
@ -22,15 +28,36 @@ enum FRAG_TYPES {
|
|||
FT_END = 3, /* frame contains a fragment with E bit set */
|
||||
};
|
||||
|
||||
struct hevc_fu_info {
|
||||
kvz_rtp::clock::hrc::hrc_t sframe_time; /* clock reading when the first fragment is received */
|
||||
uint32_t sframe_seq; /* sequence number of the frame with s-bit */
|
||||
uint32_t eframe_seq; /* sequence number of the frame with e-bit */
|
||||
size_t pkts_received; /* how many fragments have been received */
|
||||
size_t total_size; /* total size of all fragments */
|
||||
enum NAL_TYPES {
|
||||
NT_INTRA = 0x00,
|
||||
NT_INTER = 0x01,
|
||||
NT_OTHER = 0xff
|
||||
};
|
||||
|
||||
static int __check_frame(kvz_rtp::frame::rtp_frame *frame)
|
||||
typedef std::unordered_map<uint32_t, struct hevc_info> frame_info_t;
|
||||
|
||||
struct hevc_info {
|
||||
/* clock reading when the first fragment is received */
|
||||
kvz_rtp::clock::hrc::hrc_t sframe_time;
|
||||
|
||||
/* sequence number of the frame with s-bit */
|
||||
uint32_t s_seq;
|
||||
|
||||
/* sequence number of the frame with e-bit */
|
||||
uint32_t e_seq;
|
||||
|
||||
/* how many fragments have been received */
|
||||
size_t pkts_received;
|
||||
|
||||
/* total size of all fragments */
|
||||
size_t total_size;
|
||||
|
||||
/* map of frame's fragments,
|
||||
* allows out-of-order insertion and loop-through in order */
|
||||
std::map<uint16_t, kvz_rtp::frame::rtp_frame *> fragments;
|
||||
};
|
||||
|
||||
static int FRAG(kvz_rtp::frame::rtp_frame *frame)
|
||||
{
|
||||
bool first_frag = frame->payload[2] & 0x80;
|
||||
bool last_frag = frame->payload[2] & 0x40;
|
||||
|
|
@ -50,29 +77,54 @@ static int __check_frame(kvz_rtp::frame::rtp_frame *frame)
|
|||
return FT_MIDDLE;
|
||||
}
|
||||
|
||||
static inline uint8_t NAL(kvz_rtp::frame::rtp_frame *frame)
|
||||
{
|
||||
switch (frame->payload[2] & 0x3f) {
|
||||
case 19: return NT_INTRA;
|
||||
case 1: return NT_INTER;
|
||||
default: break;
|
||||
}
|
||||
|
||||
return NT_OTHER;
|
||||
}
|
||||
|
||||
static inline bool __frame_late(hevc_info& hinfo)
|
||||
{
|
||||
return (kvz_rtp::clock::hrc::diff_now(hinfo.sframe_time) >= RTP_FRAME_MAX_DELAY);
|
||||
}
|
||||
|
||||
static void __drop_frame(frame_info_t& finfo, uint32_t ts)
|
||||
{
|
||||
uint16_t s_seq = finfo.at(ts).s_seq;
|
||||
uint16_t e_seq = finfo.at(ts).e_seq;
|
||||
|
||||
LOG_INFO("Dropping frame %u, %u - %u", ts, s_seq, e_seq);
|
||||
|
||||
for (auto& fragment : finfo.at(ts).fragments)
|
||||
(void)kvz_rtp::frame::dealloc_frame(fragment.second);
|
||||
|
||||
finfo.erase(ts);
|
||||
}
|
||||
|
||||
rtp_error_t __hevc_receiver(kvz_rtp::receiver *receiver)
|
||||
{
|
||||
LOG_INFO("frameReceiver starting listening...");
|
||||
|
||||
int nread = 0;
|
||||
sockaddr_in sender_addr;
|
||||
frame_info_t finfo;
|
||||
rtp_error_t ret = RTP_OK;
|
||||
uint32_t intra = INVALID_TS;
|
||||
kvz_rtp::socket socket = receiver->get_socket();
|
||||
kvz_rtp::frame::rtp_frame *frame, *frames[0xffff + 1] = { 0 };
|
||||
bool enable_idelay = !(receiver->get_conf().flags & RCE_HEVC_NO_INTRA_DELAY);
|
||||
std::unordered_set<uint32_t> dropped;
|
||||
|
||||
fd_set read_fds;
|
||||
struct timeval t_val;
|
||||
FD_ZERO(&read_fds);
|
||||
|
||||
uint8_t nal_header[2] = { 0 };
|
||||
std::map<uint32_t, hevc_fu_info> s_timers;
|
||||
std::map<uint32_t, size_t> dropped_frames;
|
||||
|
||||
while (!receiver->active())
|
||||
;
|
||||
|
||||
while (receiver->active()) {
|
||||
|
||||
/* Reset select() parameters.
|
||||
*
|
||||
* FD_SET() must be called every time before calling select() at least on Windows
|
||||
|
|
@ -94,7 +146,7 @@ rtp_error_t __hevc_receiver(kvz_rtp::receiver *receiver)
|
|||
|
||||
do {
|
||||
#ifdef __linux__
|
||||
ret = socket.recvfrom(receiver->get_recv_buffer(), receiver->get_recv_buffer_len(), MSG_DONTWAIT, &sender_addr, &nread);
|
||||
ret = socket.recvfrom(receiver->get_recv_buffer(), receiver->get_recv_buffer_len(), MSG_DONTWAIT, nullptr, &nread);
|
||||
|
||||
if (ret != RTP_OK) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||
|
|
@ -104,7 +156,7 @@ rtp_error_t __hevc_receiver(kvz_rtp::receiver *receiver)
|
|||
return RTP_GENERIC_ERROR;
|
||||
}
|
||||
#else
|
||||
ret = socket.recvfrom(receiver->get_recv_buffer(), receiver->get_recv_buffer_len(), 0, &sender_addr, &nread);
|
||||
ret = socket.recvfrom(receiver->get_recv_buffer(), receiver->get_recv_buffer_len(), 0, nullptr, &nread);
|
||||
|
||||
if (ret != RTP_OK) {
|
||||
if (WSAGetLastError() == WSAEWOULDBLOCK)
|
||||
|
|
@ -119,8 +171,6 @@ rtp_error_t __hevc_receiver(kvz_rtp::receiver *receiver)
|
|||
if ((frame = receiver->validate_rtp_frame(receiver->get_recv_buffer(), nread)) == nullptr)
|
||||
continue;
|
||||
|
||||
memcpy(&frame->src_addr, &sender_addr, sizeof(sockaddr_in));
|
||||
|
||||
/* Update session related statistics
|
||||
* If this is a new peer, RTCP will take care of initializing necessary stuff
|
||||
*
|
||||
|
|
@ -164,130 +214,72 @@ rtp_error_t __hevc_receiver(kvz_rtp::receiver *receiver)
|
|||
kvz_rtp::frame::HEADER_SIZE_HEVC_NAL +
|
||||
kvz_rtp::frame::HEADER_SIZE_HEVC_FU;
|
||||
|
||||
int type = __check_frame(frame);
|
||||
uint32_t c_ts = TS(frame);
|
||||
uint32_t c_seq = SEQ(frame);
|
||||
int frag_type = FRAG(frame);
|
||||
uint8_t nal_type = NAL(frame);
|
||||
|
||||
if (type == FT_NOT_FRAG) {
|
||||
if (frag_type == FT_NOT_FRAG) {
|
||||
receiver->return_frame(frame);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == FT_INVALID) {
|
||||
if (frag_type == FT_INVALID) {
|
||||
LOG_WARN("invalid frame received!");
|
||||
(void)kvz_rtp::frame::dealloc_frame(frame);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool duplicate = true;
|
||||
/* initialize new frame */
|
||||
if (finfo.find(c_ts) == finfo.end()) {
|
||||
|
||||
/* Save the received frame to "frames" array where frames are indexed using
|
||||
* their sequence number. This way when all fragments of a frame are received,
|
||||
* we can loop through the range sframe_seq - eframe_seq and merge all fragments */
|
||||
if (frames[SEQ(frame)] == nullptr) {
|
||||
frames[SEQ(frame)] = frame;
|
||||
duplicate = false;
|
||||
/* when initializing a new frame, we need to keep twothings in mind:
|
||||
* 1) new intra frame will supersede older intra frame
|
||||
* 2) new inter fame is accepted only if current intra has been returned
|
||||
* to user or if it's still in progress, its timer has not run out */
|
||||
|
||||
} else if (TS(frame) != frames[SEQ(frame)]->header.timestamp) {
|
||||
(void)kvz_rtp::frame::dealloc_frame(frames[SEQ(frame)]);
|
||||
frames[SEQ(frame)] = frame;
|
||||
duplicate = false;
|
||||
} else {
|
||||
(void)kvz_rtp::frame::dealloc_frame(frame);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* If frames[SEQ(frame)] is not nullptr, there's actually two possibilites:
|
||||
* - The RTP frame is actually duplicate (quite rare)
|
||||
* - Previous HEVC was never returned to user and the RTP frames are still in the array
|
||||
*
|
||||
* Because this normal receiver does not have a notion of active frames (because all frames are active)
|
||||
* it is very much possible that an RTP frame is dropped but the receiver never notices is because it's
|
||||
* constructing multiple frames at once.
|
||||
*
|
||||
* This actually results in quite unintended behaviour where no data is returned to user.
|
||||
* This problem can be averted by checking the timestamp of frames[SEQ(frame)].
|
||||
*
|
||||
* If the entry is more than RTP_FRAME_MAX_DELAY milliseconds old, it can be released and
|
||||
* the entry is replaced with this current RTP frame */
|
||||
if (duplicate) {
|
||||
uint64_t diff = kvz_rtp::clock::hrc::diff_now(
|
||||
s_timers[frames[SEQ(frame)]->header.timestamp].sframe_time);
|
||||
|
||||
if (diff >= RTP_FRAME_MAX_DELAY) {
|
||||
LOG_ERROR("duplicate frame must be dropped");
|
||||
kvz_rtp::frame::dealloc_frame(frames[SEQ(frame)]);
|
||||
frames[SEQ(frame)] = frame;
|
||||
duplicate = false;
|
||||
} else {
|
||||
fprintf(stderr, "not old enough\n");
|
||||
}
|
||||
}
|
||||
|
||||
/* If this is the first packet received with this timestamp, create new entry
|
||||
* to s_timers and save current time.
|
||||
*
|
||||
* This timestamp is used to keep track of how long we've been receiving chunks
|
||||
* and if the time exceeds RTP_FRAME_MAX_DELAY, we drop the frame */
|
||||
if (s_timers.find(TS(frame)) == s_timers.end()) {
|
||||
/* UDP being unreliable, we can't know for sure in what order the packets are arriving.
|
||||
* Especially on linux where the fragment frames are batched and sent together it possible
|
||||
* that the first fragment we receive is the fragment containing the E-bit which sounds weird
|
||||
*
|
||||
* When the first fragment is received (regardless of its type), the timer is started and if the
|
||||
* fragment is special (S or E bit set), the sequence number is saved so we know the range of complete
|
||||
* full HEVC frame if/when all fragments have been received */
|
||||
|
||||
if (type == FT_START) {
|
||||
s_timers[TS(frame)].sframe_seq = SEQ(frame);
|
||||
s_timers[TS(frame)].eframe_seq = INVALID_SEQ;
|
||||
} else if (type == FT_END) {
|
||||
s_timers[TS(frame)].eframe_seq = SEQ(frame);
|
||||
s_timers[TS(frame)].sframe_seq = INVALID_SEQ;
|
||||
} else {
|
||||
s_timers[TS(frame)].sframe_seq = INVALID_SEQ;
|
||||
s_timers[TS(frame)].eframe_seq = INVALID_SEQ;
|
||||
/* if intra is incomplete and late, drop all inters */
|
||||
if (intra != INVALID_TS && nal_type == NT_INTER) {
|
||||
if (__frame_late(finfo.at(intra)))
|
||||
continue;
|
||||
}
|
||||
|
||||
s_timers[TS(frame)].sframe_time = kvz_rtp::clock::hrc::now();
|
||||
s_timers[TS(frame)].total_size = frame->payload_len - HEVC_HDR_SIZE;
|
||||
s_timers[TS(frame)].pkts_received = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
uint64_t diff = kvz_rtp::clock::hrc::diff_now(s_timers[TS(frame)].sframe_time);
|
||||
|
||||
if (diff > RTP_FRAME_MAX_DELAY) {
|
||||
LOG_ERROR("frame must be dropped, max delay reached: %lu!", diff);
|
||||
|
||||
if (dropped_frames.find(TS(frame)) == dropped_frames.end()) {
|
||||
dropped_frames[TS(frame)] = 1;
|
||||
} else {
|
||||
dropped_frames[TS(frame)]++;
|
||||
/* drop old intra if a new one is received */
|
||||
if (nal_type == NT_INTRA) {
|
||||
if (intra != INVALID_TS)
|
||||
__drop_frame(finfo, intra);
|
||||
intra = c_ts;
|
||||
}
|
||||
|
||||
frames[SEQ(frame)] = nullptr;
|
||||
(void)kvz_rtp::frame::dealloc_frame(frame);
|
||||
finfo[c_ts].s_seq = INVALID_SEQ;
|
||||
finfo[c_ts].e_seq = INVALID_SEQ;
|
||||
|
||||
if (frag_type == FT_START) finfo[c_ts].s_seq = c_seq;
|
||||
if (frag_type == FT_END) finfo[c_ts].e_seq = c_seq;
|
||||
|
||||
finfo[c_ts].sframe_time = kvz_rtp::clock::hrc::now();
|
||||
finfo[c_ts].total_size = frame->payload_len - HEVC_HDR_SIZE;
|
||||
finfo[c_ts].pkts_received = 1;
|
||||
|
||||
finfo[c_ts].fragments[c_seq] = frame;
|
||||
continue;
|
||||
}
|
||||
finfo[c_ts].fragments[c_seq] = frame;
|
||||
|
||||
if (!duplicate) {
|
||||
s_timers[TS(frame)].pkts_received++;
|
||||
s_timers[TS(frame)].total_size += (frame->payload_len - HEVC_HDR_SIZE);
|
||||
}
|
||||
finfo[TS(frame)].pkts_received += 1;
|
||||
finfo[TS(frame)].total_size += (frame->payload_len - HEVC_HDR_SIZE);
|
||||
|
||||
if (type == FT_START)
|
||||
s_timers[TS(frame)].sframe_seq = SEQ(frame);
|
||||
if (frag_type == FT_START)
|
||||
finfo[c_ts].s_seq = c_seq;
|
||||
|
||||
if (type == FT_END)
|
||||
s_timers[TS(frame)].eframe_seq = SEQ(frame);
|
||||
if (frag_type == FT_END)
|
||||
finfo[c_ts].e_seq = c_seq;
|
||||
|
||||
if (s_timers[TS(frame)].sframe_seq != INVALID_SEQ &&
|
||||
s_timers[TS(frame)].eframe_seq != INVALID_SEQ)
|
||||
{
|
||||
uint32_t ts = TS(frame);
|
||||
uint16_t s_seq = s_timers[ts].sframe_seq;
|
||||
uint16_t e_seq = s_timers[ts].eframe_seq;
|
||||
size_t ptr = 0;
|
||||
if (finfo[c_ts].s_seq != INVALID_SEQ && finfo[c_ts].e_seq != INVALID_SEQ) {
|
||||
size_t received = 0;
|
||||
size_t fptr = 0;
|
||||
size_t s_seq = finfo[c_ts].s_seq;
|
||||
size_t e_seq = finfo[c_ts].e_seq;
|
||||
|
||||
if (s_seq > e_seq)
|
||||
received = 0xffff - s_seq + e_seq + 2;
|
||||
|
|
@ -295,35 +287,45 @@ rtp_error_t __hevc_receiver(kvz_rtp::receiver *receiver)
|
|||
received = e_seq - s_seq + 1;
|
||||
|
||||
/* we've received every fragment and the frame can be reconstructed */
|
||||
if (received == s_timers[TS(frame)].pkts_received) {
|
||||
nal_header[0] = (frames[s_seq]->payload[0] & 0x81) | ((frame->payload[2] & 0x3f) << 1);
|
||||
nal_header[1] = frames[s_seq]->payload[1];
|
||||
if (received == finfo[c_ts].pkts_received) {
|
||||
uint8_t nal_header[2] = {
|
||||
(uint8_t)((FRAME(c_ts, s_seq)->payload[0] & 0x81) | ((frame->payload[2] & 0x3f) << 1)),
|
||||
(uint8_t)FRAME(c_ts, s_seq)->payload[1]
|
||||
};
|
||||
|
||||
kvz_rtp::frame::rtp_frame *out = kvz_rtp::frame::alloc_rtp_frame();
|
||||
|
||||
out->payload_len = s_timers[TS(frame)].total_size + kvz_rtp::frame::HEADER_SIZE_HEVC_NAL;
|
||||
out->payload_len = finfo[c_ts].total_size + kvz_rtp::frame::HEADER_SIZE_HEVC_NAL;
|
||||
out->payload = new uint8_t[out->payload_len];
|
||||
|
||||
std::memcpy(&out->header, &frames[s_seq]->header, kvz_rtp::frame::HEADER_SIZE_RTP);
|
||||
std::memcpy(out->payload, nal_header, kvz_rtp::frame::HEADER_SIZE_HEVC_NAL);
|
||||
std::memcpy(&out->header, &FRAME(c_ts, s_seq)->header, RTP_HDR_SIZE);
|
||||
std::memcpy(out->payload, nal_header, NAL_HDR_SIZE);
|
||||
|
||||
ptr += kvz_rtp::frame::HEADER_SIZE_HEVC_NAL;
|
||||
fptr += kvz_rtp::frame::HEADER_SIZE_HEVC_NAL;
|
||||
|
||||
for (size_t i = s_seq; i <= e_seq; ++i) {
|
||||
for (auto& fragment : finfo.at(c_ts).fragments) {
|
||||
std::memcpy(
|
||||
&out->payload[ptr],
|
||||
&frames[i]->payload[HEVC_HDR_SIZE],
|
||||
frames[i]->payload_len - HEVC_HDR_SIZE
|
||||
&out->payload[fptr],
|
||||
&fragment.second->payload[HEVC_HDR_SIZE],
|
||||
fragment.second->payload_len - HEVC_HDR_SIZE
|
||||
);
|
||||
ptr += frames[i]->payload_len - HEVC_HDR_SIZE;
|
||||
(void)kvz_rtp::frame::dealloc_frame(frames[i]);
|
||||
frames[i] = nullptr;
|
||||
fptr += fragment.second->payload_len - HEVC_HDR_SIZE;
|
||||
(void)kvz_rtp::frame::dealloc_frame(fragment.second);
|
||||
}
|
||||
|
||||
if (nal_type == NT_INTRA)
|
||||
intra = INVALID_TS;
|
||||
|
||||
receiver->return_frame(out);
|
||||
s_timers.erase(ts);
|
||||
finfo.erase(c_ts);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (__frame_late(finfo.at(c_ts))) {
|
||||
if (nal_type != NT_INTRA || (nal_type == NT_INTRA && !enable_idelay))
|
||||
__drop_frame(finfo, c_ts);
|
||||
}
|
||||
} while (ret == RTP_OK);
|
||||
}
|
||||
|
||||
|
|
|
|||
26
src/util.hh
26
src/util.hh
|
|
@ -111,6 +111,32 @@ enum RTP_CTX_ENABLE_FLAGS {
|
|||
RCE_LAST = 1 << 5,
|
||||
};
|
||||
|
||||
enum RTP_CTX_ENABLE_MEDIA_FLAGS {
|
||||
RCE_MEDIA_NO_FLAGS = 0 << 0,
|
||||
|
||||
/* When kvzRTP is receiving HEVC stream, as an attempt to improve
|
||||
* QoS, it will set frame delay for intra frames to be the same
|
||||
* as intra period.
|
||||
*
|
||||
* What this means is that if the regular timer expires for frame
|
||||
* (100 ms) and the frame type is intra, kvzRTP will not drop the
|
||||
* frame but will continue receiving packets in hopes that all the
|
||||
* packets of the intra frame will be received and the frame can be
|
||||
* returned to user. During this period, when the intra frame is deemed
|
||||
* to be late and incomplete, kvzRTP will drop all inter frames until
|
||||
* a) all the packets of late intra frame are received or
|
||||
* b) a new intra frame is received
|
||||
*
|
||||
* This behaviour should reduce the number of gray screens during
|
||||
* HEVC decoding but might cause the video stream to freeze for a while
|
||||
* which is subjectively lesser of two evils
|
||||
*
|
||||
* This behavior can be disabled with RCE_HEVC_NO_INTRA_DELAY
|
||||
* If this flag is given, kvzRTP treats all frame types
|
||||
* equally and drops all frames that are late */
|
||||
RCE_HEVC_NO_INTRA_DELAY = 1 << 0
|
||||
};
|
||||
|
||||
/* These options are given to configuration() */
|
||||
enum RTP_CTX_CONFIGURATION_FLAGS {
|
||||
/* No configuration flags */
|
||||
|
|
|
|||
Loading…
Reference in New Issue