Create optimistic HEVC fragment receiver
Small frames work, larger frames are sometimes dropped. This is supposed to reduce the amount of copying but the performance will degrade as network load increases
This commit is contained in:
parent
78401d6fa8
commit
3577b87379
|
@ -24,7 +24,7 @@ kvz_rtp::connection::connection(bool reader):
|
|||
wc_start_(0),
|
||||
fqueue_(nullptr)
|
||||
{
|
||||
rtp_sequence_ = kvz_rtp::random::generate_32();
|
||||
rtp_sequence_ = 1; //kvz_rtp::random::generate_32();
|
||||
rtp_ssrc_ = kvz_rtp::random::generate_32();
|
||||
rtp_payload_ = RTP_FORMAT_GENERIC;
|
||||
|
||||
|
|
|
@ -94,6 +94,7 @@ namespace kvz_rtp {
|
|||
uint32_t rtp_ssrc_;
|
||||
uint32_t rtp_timestamp_;
|
||||
uint64_t wc_start_;
|
||||
kvz_rtp::clock::hrc::hrc_t wc_start_2;
|
||||
uint32_t clock_rate_;
|
||||
|
||||
kvz_rtp::frame_queue *fqueue_;
|
||||
|
|
|
@ -1,6 +1,13 @@
|
|||
#ifdef _WIN32
|
||||
#else
|
||||
#include <sys/socket.h>
|
||||
#endif
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <unordered_map>
|
||||
#include <queue>
|
||||
|
||||
#include "conn.hh"
|
||||
#include "debug.hh"
|
||||
|
@ -13,8 +20,20 @@
|
|||
|
||||
#define PTR_DIFF(a, b) ((ptrdiff_t)((char *)(a) - (char *)(b)))
|
||||
|
||||
#define RTP_FRAME_MAX_DELAY 50
|
||||
#define INVALID_SEQ 0x13371338
|
||||
#define INVALID_TS 0xffffffff
|
||||
|
||||
#define MAX_DATAGRAMS 5
|
||||
#define RTP_FRAME_MAX_DELAY 50
|
||||
|
||||
#define RTP_HDR_SIZE 12
|
||||
#define NAL_HDR_SIZE 2
|
||||
#define FU_HDR_SIZE 1
|
||||
|
||||
#define INACTIVE(ts) (buff.inactive[(ts)])
|
||||
|
||||
const int DEFAULT_ALLOC_SIZE = 200 * MAX_PAYLOAD;
|
||||
const int REALLOC_SIZE = 50 * MAX_PAYLOAD; /* TODO: realloc size should be calculated dynamically */
|
||||
|
||||
enum FRAG_TYPES {
|
||||
FT_INVALID = -2, /* invalid combination of S and E bits */
|
||||
|
@ -419,12 +438,13 @@ error:
|
|||
#endif
|
||||
}
|
||||
|
||||
static int __check_frame(kvz_rtp::frame::rtp_frame *frame)
|
||||
/* Buffer contains three bytes: 2 byte NAL header and 1 byte FU header */
|
||||
static int __get_frame_type(uint8_t *buffer)
|
||||
{
|
||||
bool first_frag = frame->payload[2] & 0x80;
|
||||
bool last_frag = frame->payload[2] & 0x40;
|
||||
bool first_frag = buffer[2] & 0x80;
|
||||
bool last_frag = buffer[2] & 0x40;
|
||||
|
||||
if ((frame->payload[0] >> 1) != 49)
|
||||
if ((buffer[0] >> 1) != 49)
|
||||
return FT_NOT_FRAG;
|
||||
|
||||
if (first_frag && last_frag)
|
||||
|
@ -439,196 +459,610 @@ static int __check_frame(kvz_rtp::frame::rtp_frame *frame)
|
|||
return FT_MIDDLE;
|
||||
}
|
||||
|
||||
static inline uint32_t __get_next_ts(std::queue<uint32_t>& ts)
|
||||
{
|
||||
uint32_t n_ts = ts.front();
|
||||
ts.pop();
|
||||
|
||||
return n_ts;
|
||||
}
|
||||
|
||||
/* Calculate the absolute offset within frame at "index"
|
||||
*
|
||||
* For empty frames the start offet ("start") might be 0 so we need
|
||||
* to take that into consideration when calculating the offset */
|
||||
static inline size_t __calculate_offset(size_t start, size_t index)
|
||||
{
|
||||
size_t off = index * MAX_PAYLOAD;
|
||||
|
||||
if (start > NAL_HDR_SIZE)
|
||||
off = start - MAX_DATAGRAMS * MAX_PAYLOAD + off;
|
||||
else
|
||||
off += NAL_HDR_SIZE;
|
||||
|
||||
return off;
|
||||
}
|
||||
|
||||
/* TODO: tämä koodi pitää kommentoida todella hyvin! */
|
||||
|
||||
enum RELOC_TYPES {
|
||||
|
||||
/* Fragment is already in frame,
|
||||
* most likely in correct place too (verification needed) */
|
||||
RT_PAYLOAD = 0,
|
||||
|
||||
/* Fragment has been copied to probation zone and it should be
|
||||
* relocated to frame */
|
||||
RT_PROBATION = 1,
|
||||
|
||||
/* Probation zone has been disabled or it has run out of space and
|
||||
* the fragment has been copied to separate RTP frame and stored
|
||||
* to "probation" vector */
|
||||
RT_FRAME = 2
|
||||
};
|
||||
|
||||
struct reloc_info {
|
||||
size_t c_off; /* current offset in the frame */
|
||||
size_t d_off; /* destination offset */
|
||||
void *ptr; /* pointer to memory */
|
||||
int reloc_type; /* relocation type (see RELOC_TYPES) */
|
||||
};
|
||||
|
||||
struct inactive_info {
|
||||
size_t pkts_received;
|
||||
size_t total_size;
|
||||
size_t received_size;
|
||||
|
||||
kvz_rtp::frame::rtp_frame *frame;
|
||||
|
||||
size_t next_off;
|
||||
|
||||
/* TODO: relocs? */
|
||||
std::map<uint16_t, struct reloc_info> rinfo;
|
||||
|
||||
uint32_t s_seq;
|
||||
uint32_t e_seq;
|
||||
|
||||
kvz_rtp::clock::hrc::hrc_t start; /* clock reading when the first fragment is received */
|
||||
};
|
||||
|
||||
/* TODO: muuta recv_buffer sisältämään kaiken tämän turhan datan sijaan
|
||||
* vain inactive_infoja (future: frame_info) ja frame_info UNORDERED_MAPIN lisäksi
|
||||
* säilytä tieto aktiivisen framen timestampista! */
|
||||
|
||||
struct recv_buffer {
|
||||
/* One large block of contiguos memory
|
||||
* This may be much larger than the actual frame (large internal fragmentation)
|
||||
* but the algorithm tries to learn from TODO
|
||||
*
|
||||
* Henceforth this is the "receiver frame" */
|
||||
kvz_rtp::frame::rtp_frame *frame;
|
||||
|
||||
/* TODO: */
|
||||
kvz_rtp::frame::rtp_header rtp_headers[MAX_DATAGRAMS];
|
||||
uint8_t hevc_ext_buf[MAX_DATAGRAMS][NAL_HDR_SIZE + FU_HDR_SIZE];
|
||||
|
||||
/* TODO: */
|
||||
struct mmsghdr headers[MAX_DATAGRAMS];
|
||||
struct iovec iov[MAX_DATAGRAMS * 3];
|
||||
|
||||
/* Because UDP packets don't come in order (but we read them as if they are in order)
|
||||
* we need to some relocations when all fragments have been received.
|
||||
*
|
||||
* Keep a separate buffer of the relocation information which holds the current position
|
||||
* and the correct position of the frame within the full frame */
|
||||
std::vector<std::pair<size_t, size_t>> reloc_needed;
|
||||
|
||||
/* How many packets we've received */
|
||||
size_t pkts_received;
|
||||
|
||||
/* */
|
||||
size_t total_size;
|
||||
size_t received_size;
|
||||
|
||||
/* */
|
||||
size_t next_off;
|
||||
|
||||
/* Timestamp of the active frame */
|
||||
uint32_t active_ts;
|
||||
|
||||
/* Sequence number of previous frame's last fragment.
|
||||
*
|
||||
* This number is used to check the placement of new fragments within the frame
|
||||
* and calculate new position for the fragments if they're out of order */
|
||||
uint32_t prev_f_seq;
|
||||
|
||||
std::unordered_map<uint32_t, bool> seqs;
|
||||
|
||||
std::unordered_map<uint32_t, inactive_info> inactive;
|
||||
std::queue<uint32_t> tss; /* TODO: this is beyond ugly */
|
||||
|
||||
kvz_rtp::clock::hrc::hrc_t start; /* clock reading when the first fragment is received */
|
||||
uint32_t s_seq; /* sequence number of the frame with s-bit */
|
||||
uint32_t e_seq; /* sequence number of the frame with e-bit */
|
||||
};
|
||||
|
||||
struct frame_info {
|
||||
/* One big frame for all fragments, this is resized if all space is consumed */
|
||||
kvz_rtp::frame::rtp_frame *frame;
|
||||
|
||||
size_t total_size; /* allocated size */
|
||||
size_t received_size; /* used size */
|
||||
size_t pkts_received; /* # packets received (used to detect when all fragments have been received) */
|
||||
|
||||
/* next fragment slot in the frame */
|
||||
size_t next_off;
|
||||
|
||||
/* Fragments that require relocation within frame */
|
||||
std::map<uint16_t, struct reloc_info> rinfo;
|
||||
|
||||
/* If probation zone is disabled or all its memory has been used
|
||||
* fragments that cannot be relocated are pushed here and when all
|
||||
* fragments have been received, the fragments are copied from probation to the frame */
|
||||
/* TODO: käyät rinfoa tämän soktun sijaan */
|
||||
std::vector<kvz_rtp::frame::rtp_frame *> probation;
|
||||
|
||||
/* Store all received sequence numbers here so we can detect duplicate packets */
|
||||
std::unordered_map<uint32_t, bool> seqs;
|
||||
|
||||
/* start and end sequences of the frame (frames with S/E bit set) */
|
||||
uint32_t s_seq;
|
||||
uint32_t e_seq;
|
||||
|
||||
/* clock reading when the first fragment is received */
|
||||
kvz_rtp::clock::hrc::hrc_t start;
|
||||
};
|
||||
|
||||
struct frames {
|
||||
/* Global (and overwritable) buffers used for fragment receiving */
|
||||
kvz_rtp::frame::rtp_header rtp_headers[MAX_DATAGRAMS];
|
||||
uint8_t hevc_ext_buf[MAX_DATAGRAMS][NAL_HDR_SIZE + FU_HDR_SIZE];
|
||||
struct mmsghdr headers[MAX_DATAGRAMS];
|
||||
struct iovec iov[MAX_DATAGRAMS * 3];
|
||||
|
||||
/* timestamp of the old (still active frame), used to index finfo */
|
||||
uint32_t active_ts;
|
||||
|
||||
/* Frames are handled FIFO style so keep the timestamps in a queue */
|
||||
std::queue<uint32_t> tss;
|
||||
|
||||
/* All active and inactive frames */
|
||||
std::unordered_map<uint32_t, struct frame_info> finfo;
|
||||
};
|
||||
|
||||
rtp_error_t kvz_rtp::hevc::frame_receiver(kvz_rtp::reader *reader)
|
||||
{
|
||||
LOG_INFO("frameReceiver starting listening...");
|
||||
int socket = reader->get_raw_socket();
|
||||
int pkts_read = 0;
|
||||
|
||||
int nread = 0;
|
||||
rtp_error_t ret;
|
||||
sockaddr_in sender_addr;
|
||||
kvz_rtp::socket socket = reader->get_socket();
|
||||
kvz_rtp::frame::rtp_frame *frame, *frames[0xffff + 1] = { 0 };
|
||||
recv_buffer buff;
|
||||
buff.total_size = 2 * DEFAULT_ALLOC_SIZE + NAL_HDR_SIZE;
|
||||
buff.frame = kvz_rtp::frame::alloc_rtp_frame(buff.total_size);
|
||||
buff.next_off = NAL_HDR_SIZE;
|
||||
buff.pkts_received = 0;
|
||||
|
||||
uint8_t nal_header[2] = { 0 };
|
||||
std::map<uint32_t, hevc_fu_info> s_timers;
|
||||
std::map<uint32_t, size_t> dropped_frames;
|
||||
buff.active_ts = INVALID_TS;
|
||||
buff.prev_f_seq = INVALID_SEQ;
|
||||
buff.s_seq = INVALID_SEQ;
|
||||
buff.e_seq = INVALID_SEQ;
|
||||
|
||||
std::memset(buff.headers, 0, sizeof(buff.headers));
|
||||
|
||||
uint64_t avg_us = 0, avg_us_total = 0;
|
||||
uint64_t avg_fs = 0, avg_fs_total = 0;
|
||||
|
||||
while (reader->active()) {
|
||||
ret = socket.recvfrom(reader->get_recv_buffer(), reader->get_recv_buffer_len(), 0, &sender_addr, &nread);
|
||||
for (size_t i = 0, k = 0; i < MAX_DATAGRAMS; ++i, k += 3) {
|
||||
|
||||
if (ret != RTP_OK) {
|
||||
LOG_ERROR("recvfrom failed! FrameReceiver cannot continue!");
|
||||
return RTP_GENERIC_ERROR;;
|
||||
if (buff.next_off + MAX_DATAGRAMS * MAX_PAYLOAD > buff.total_size) {
|
||||
LOG_ERROR("Reallocate RTP frame from %u to %u!", buff.total_size, buff.total_size + REALLOC_SIZE);
|
||||
for (;;);
|
||||
#if 0
|
||||
auto tmp_frame = kvz_rtp::frame::alloc_rtp_frame(buff.total_size + REALLOC_SIZE);
|
||||
|
||||
std::memcpy(
|
||||
tmp_frame->payload,
|
||||
buff.frame->payload,
|
||||
buff.total_size
|
||||
);
|
||||
|
||||
(void)kvz_rtp::frame::dealloc_frame(buff.frame);
|
||||
buff.frame = tmp_frame;
|
||||
buff.total_size += REALLOC_SIZE;
|
||||
|
||||
/* for (;;); */
|
||||
#endif
|
||||
}
|
||||
|
||||
buff.iov[k + 0].iov_base = &buff.rtp_headers[i];
|
||||
buff.iov[k + 0].iov_len = sizeof(buff.rtp_headers[i]);
|
||||
|
||||
buff.iov[k + 1].iov_base = &buff.hevc_ext_buf[i];
|
||||
buff.iov[k + 1].iov_len = 3;
|
||||
|
||||
buff.iov[k + 2].iov_base = buff.frame->payload + buff.next_off;
|
||||
buff.iov[k + 2].iov_len = MAX_PAYLOAD;
|
||||
buff.next_off += MAX_PAYLOAD;
|
||||
|
||||
buff.headers[i].msg_hdr.msg_iov = &buff.iov[k];
|
||||
buff.headers[i].msg_hdr.msg_iovlen = 3;
|
||||
}
|
||||
|
||||
if ((frame = reader->validate_rtp_frame(reader->get_recv_buffer(), nread)) == nullptr) {
|
||||
LOG_DEBUG("received an invalid frame, discarding");
|
||||
continue;
|
||||
}
|
||||
memcpy(&frame->src_addr, &sender_addr, sizeof(sockaddr_in));
|
||||
|
||||
/* Update session related statistics
|
||||
* If this is a new peer, RTCP will take care of initializing necessary stuff
|
||||
*
|
||||
* Skip processing the packet if it was invalid. This is mostly likely caused
|
||||
* by an SSRC collision */
|
||||
if (reader->update_receiver_stats(frame) != RTP_OK)
|
||||
continue;
|
||||
|
||||
/* How to the frame is handled is based what its type is. Generic and Opus frames
|
||||
* don't require any extra processing so they can be returned to the user as soon as
|
||||
* they're received without any buffering.
|
||||
*
|
||||
* Frames that can be fragmented (only HEVC for now) require some preprocessing before they're returned.
|
||||
*
|
||||
* When a frame with an S-bit set is received, the frame is saved to "frames" array and an NTP timestamp
|
||||
* is saved. All subsequent frag frames are also saved in the "frames" array and they may arrive in
|
||||
* any order they want because they're saved in the array using the sequence number.
|
||||
*
|
||||
* Each time a new frame is received, the initial timestamp is compared with current time to see
|
||||
* how much time this frame still has left until it's discarded. Each frame is given N milliseconds
|
||||
* and if all its fragments are not received within that time window, the frame is considered invalid
|
||||
* and all fragments are discarded.
|
||||
*
|
||||
* When the last fragment is received (ie. the frame with E-bit set) AND if all previous fragments
|
||||
* very received too, frame_receiver() will call post-processing function to merge all the fragments
|
||||
* into one complete frame.
|
||||
*
|
||||
* If all previous fragments have not been received (ie. some frame is late), the code will wait
|
||||
* until the time windows closes. When that happens, the array is inspected once more to see if
|
||||
* all fragments were received and if so, the fragments are merged and returned to user.
|
||||
*
|
||||
* If some fragments were dropped, the whole HEVC frame is discarded
|
||||
*
|
||||
* Due to the nature of UDP, it's possible that during a fragment reception,
|
||||
* a stray RTP packet from earlier fragment might be received and that might corrupt
|
||||
* the reception process. To mitigate this, the sequence number and RTP timestamp of each
|
||||
* incoming packet is matched and checked against our own clock to get sense whether this packet valid
|
||||
*
|
||||
* Invalid packets (such as very late packets) are discarded automatically without further processing */
|
||||
const size_t HEVC_HDR_SIZE =
|
||||
kvz_rtp::frame::HEADER_SIZE_HEVC_NAL +
|
||||
kvz_rtp::frame::HEADER_SIZE_HEVC_FU;
|
||||
|
||||
int type = __check_frame(frame);
|
||||
|
||||
if (type == FT_NOT_FRAG) {
|
||||
reader->return_frame(frame);
|
||||
if ((pkts_read = recvmmsg(socket, buff.headers, MAX_DATAGRAMS, 0, nullptr)) < 0) {
|
||||
LOG_ERROR("recvmmsg() failed, %s", strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == FT_INVALID) {
|
||||
LOG_WARN("invalid frame received!");
|
||||
(void)kvz_rtp::frame::dealloc_frame(frame);
|
||||
break;
|
||||
}
|
||||
uint32_t p_seq = INVALID_SEQ;
|
||||
|
||||
/* TODO: this is ugly */
|
||||
bool duplicate = true;
|
||||
struct shift_info {
|
||||
bool shift_needed;
|
||||
size_t shift_size;
|
||||
size_t shift_offset;
|
||||
} sinfo;
|
||||
|
||||
/* Save the received frame to "frames" array where frames are indexed using
|
||||
* their sequence number. This way when all fragments of a frame are received,
|
||||
* we can loop through the range sframe_seq - eframe_seq and merge all fragments */
|
||||
if (frames[frame->header.seq] == nullptr) {
|
||||
frames[frame->header.seq] = frame;
|
||||
duplicate = false;
|
||||
}
|
||||
sinfo.shift_needed = false;
|
||||
sinfo.shift_size = 0;
|
||||
sinfo.shift_offset = 0;
|
||||
|
||||
/* If this is the first packet received with this timestamp, create new entry
|
||||
* to s_timers and save current time.
|
||||
*
|
||||
* This timestamp is used to keep track of how long we've been receiving chunks
|
||||
* and if the time exceeds RTP_FRAME_MAX_DELAY, we drop the frame */
|
||||
if (s_timers.find(frame->header.timestamp) == s_timers.end()) {
|
||||
/* UDP being unreliable, we can't know for sure in what order the packets are arriving.
|
||||
* Especially on linux where the fragment frames are batched and sent together it possible
|
||||
* that the first fragment we receive is the fragment containing the E-bit which sounds weird
|
||||
for (size_t i = 0; i < MAX_DATAGRAMS; ++i) {
|
||||
int type = __get_frame_type(buff.hevc_ext_buf[i]);
|
||||
|
||||
buff.rtp_headers[i].timestamp = ntohl(buff.rtp_headers[i].timestamp);
|
||||
buff.rtp_headers[i].seq = ntohs(buff.rtp_headers[i].seq);
|
||||
|
||||
uint32_t c_ts = buff.rtp_headers[i].timestamp;
|
||||
uint32_t c_seq = buff.rtp_headers[i].seq;
|
||||
|
||||
/* Previous fragment was returned to user/moved to other frame and now
|
||||
* it's considered garbage memory as far as active frame is concerned
|
||||
*
|
||||
* When the first fragment is received (regardless of its type), the timer is started and if the
|
||||
* fragment is special (S or E bit set), the sequence number is saved so we know the range of complete
|
||||
* full HEVC frame if/when all fragments have been received */
|
||||
* We must clean this "garbage" by shifting current fragment
|
||||
* (and all subsequent fragments) so the full HEVC frame can be decoded
|
||||
* successfully
|
||||
*
|
||||
* There are two kinds of shifts: overwriting and appending shifts.
|
||||
*
|
||||
* Overwriting shirts, as the name suggests, overwrite the previous content
|
||||
* so the shift offset is not updated between shifts. Non-fragments and fragments
|
||||
* that belong to other frames cause overwriting shifts.
|
||||
*
|
||||
* Overwriting shifts also cause appending shifts because when a fragment is removed,
|
||||
* and a valid fragment is shifted on its place, this valid fragment must not be overwritten
|
||||
* and all subsequent valid fragments must be appended.*/
|
||||
if (sinfo.shift_needed) {
|
||||
size_t c_off = __calculate_offset(buff.next_off, i);
|
||||
|
||||
if (type == FT_START) {
|
||||
s_timers[frame->header.timestamp].sframe_seq = frame->header.seq;
|
||||
s_timers[frame->header.timestamp].eframe_seq = INVALID_SEQ;
|
||||
} else if (type == FT_END) {
|
||||
s_timers[frame->header.timestamp].eframe_seq = frame->header.seq;
|
||||
s_timers[frame->header.timestamp].sframe_seq = INVALID_SEQ;
|
||||
} else {
|
||||
s_timers[frame->header.timestamp].sframe_seq = INVALID_SEQ;
|
||||
s_timers[frame->header.timestamp].eframe_seq = INVALID_SEQ;
|
||||
}
|
||||
|
||||
s_timers[frame->header.timestamp].sframe_time = kvz_rtp::clock::hrc::now();
|
||||
s_timers[frame->header.timestamp].total_size = frame->payload_len - HEVC_HDR_SIZE;
|
||||
s_timers[frame->header.timestamp].pkts_received = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
uint64_t diff = kvz_rtp::clock::hrc::diff_now(s_timers[frame->header.timestamp].sframe_time);
|
||||
|
||||
if (diff > RTP_FRAME_MAX_DELAY) {
|
||||
if (dropped_frames.find(frame->header.timestamp) == dropped_frames.end()) {
|
||||
dropped_frames[frame->header.timestamp] = 1;
|
||||
} else {
|
||||
dropped_frames[frame->header.timestamp]++;
|
||||
}
|
||||
|
||||
frames[frame->header.seq] = nullptr;
|
||||
(void)kvz_rtp::frame::dealloc_frame(frame);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!duplicate) {
|
||||
s_timers[frame->header.timestamp].pkts_received++;
|
||||
s_timers[frame->header.timestamp].total_size += (frame->payload_len - HEVC_HDR_SIZE);
|
||||
}
|
||||
|
||||
if (type == FT_START)
|
||||
s_timers[frame->header.timestamp].sframe_seq = frame->header.seq;
|
||||
|
||||
if (type == FT_END)
|
||||
s_timers[frame->header.timestamp].eframe_seq = frame->header.seq;
|
||||
|
||||
if (s_timers[frame->header.timestamp].sframe_seq != INVALID_SEQ &&
|
||||
s_timers[frame->header.timestamp].eframe_seq != INVALID_SEQ)
|
||||
{
|
||||
uint32_t ts = frame->header.timestamp;
|
||||
uint16_t s_seq = s_timers[ts].sframe_seq;
|
||||
uint16_t e_seq = s_timers[ts].eframe_seq;
|
||||
size_t ptr = 0;
|
||||
|
||||
/* we've received every fragment and the frame can be reconstructed */
|
||||
if (e_seq - s_seq + 1 == (ssize_t)s_timers[frame->header.timestamp].pkts_received) {
|
||||
nal_header[0] = (frames[s_seq]->payload[0] & 0x81) | ((frame->payload[2] & 0x3f) << 1);
|
||||
nal_header[1] = frames[s_seq]->payload[1];
|
||||
|
||||
kvz_rtp::frame::rtp_frame *out = kvz_rtp::frame::alloc_rtp_frame();
|
||||
|
||||
out->payload_len = s_timers[frame->header.timestamp].total_size + kvz_rtp::frame::HEADER_SIZE_HEVC_NAL;
|
||||
out->payload = new uint8_t[out->payload_len];
|
||||
|
||||
std::memcpy(&out->header, &frames[s_seq]->header, kvz_rtp::frame::HEADER_SIZE_RTP);
|
||||
std::memcpy(out->payload, nal_header, kvz_rtp::frame::HEADER_SIZE_HEVC_NAL);
|
||||
|
||||
ptr += kvz_rtp::frame::HEADER_SIZE_HEVC_NAL;
|
||||
|
||||
for (size_t i = s_seq; i <= e_seq; ++i) {
|
||||
/* We don't need to shift non-fragments and invalid data
|
||||
* because non-fragments will be copied to other frame from their currenct position
|
||||
* in the frame and both non-fragments and invalid packets will be overwritten */
|
||||
if (type != FT_NOT_FRAG && type != FT_INVALID) {
|
||||
std::memcpy(
|
||||
&out->payload[ptr],
|
||||
&frames[i]->payload[HEVC_HDR_SIZE],
|
||||
frames[i]->payload_len - HEVC_HDR_SIZE
|
||||
buff.frame->payload + sinfo.shift_offset,
|
||||
buff.frame->payload + c_off,
|
||||
MAX_PAYLOAD
|
||||
);
|
||||
ptr += frames[i]->payload_len - HEVC_HDR_SIZE;
|
||||
(void)kvz_rtp::frame::dealloc_frame(frames[i]);
|
||||
frames[i] = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (type == FT_NOT_FRAG) {
|
||||
size_t len = buff.headers[i].msg_len - RTP_HDR_SIZE;
|
||||
auto frame = kvz_rtp::frame::alloc_rtp_frame(len);
|
||||
size_t off = __calculate_offset(buff.next_off, i);
|
||||
|
||||
/* LOG_WARN("not fragment %u", len); */
|
||||
/* TODO: add good comment */
|
||||
if (!sinfo.shift_needed) {
|
||||
sinfo.shift_needed = true;
|
||||
sinfo.shift_offset = off;
|
||||
}
|
||||
|
||||
reader->return_frame(out);
|
||||
s_timers.erase(ts);
|
||||
std::memcpy(frame->payload + 0, &buff.hevc_ext_buf[i], 3);
|
||||
std::memcpy(frame->payload + 3, buff.frame->payload + off, len - 3);
|
||||
|
||||
buff.prev_f_seq = c_seq;
|
||||
|
||||
reader->return_frame(frame);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == FT_INVALID) {
|
||||
/* TODO: update shift info */
|
||||
LOG_WARN("Invalid frame received!");
|
||||
for (;;);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* this is for first frame only TODO: ei pidä paikkaansa enää */
|
||||
if (buff.active_ts == INVALID_TS)
|
||||
buff.active_ts = c_ts;
|
||||
|
||||
if (buff.active_ts == c_ts) {
|
||||
buff.pkts_received++;
|
||||
buff.received_size += (buff.headers[i].msg_len - RTP_HDR_SIZE - NAL_HDR_SIZE - FU_HDR_SIZE);
|
||||
|
||||
if (sinfo.shift_needed) {
|
||||
/* we have shifted the memory to correct place above but we need to
|
||||
* update the offset. Because this fragment belongs to this frame,
|
||||
* next shift (if there are fragments left) must be appending instead of overwriting shift */
|
||||
/* sinfo.shift_offset += MAX_PAYLOAD; */
|
||||
|
||||
/* TODO: käytä tätä kokoa, sillä saadaan framesta pienempi */
|
||||
sinfo.shift_offset += buff.headers[i].msg_len - RTP_HDR_SIZE - NAL_HDR_SIZE - FU_HDR_SIZE;
|
||||
}
|
||||
} else {
|
||||
/* seuraavan framen fragmentti tuli keskellä vielä keskeneräistä frame,
|
||||
* joten kaikkia fragmentteja jotka seuraavat tätä fragmenttia joudutaan shiftaamaan */
|
||||
|
||||
/* TODO: selitä tämä koko else höskä */
|
||||
|
||||
if (i + 1 < MAX_DATAGRAMS) {
|
||||
sinfo.shift_needed = true;
|
||||
sinfo.shift_offset = buff.next_off - MAX_DATAGRAMS * MAX_PAYLOAD + i * MAX_PAYLOAD;
|
||||
sinfo.shift_size = buff.headers[i].msg_len - RTP_HDR_SIZE - NAL_HDR_SIZE - FU_HDR_SIZE;
|
||||
}
|
||||
|
||||
/* TODO: selitä */
|
||||
size_t len = buff.headers[i].msg_len - RTP_HDR_SIZE - NAL_HDR_SIZE - FU_HDR_SIZE;
|
||||
size_t off = __calculate_offset(buff.next_off, i);
|
||||
|
||||
if (buff.inactive.find(c_ts) == buff.inactive.end()) {
|
||||
buff.tss.push(c_ts);
|
||||
|
||||
INACTIVE(c_ts).pkts_received = 0;
|
||||
INACTIVE(c_ts).frame = kvz_rtp::frame::alloc_rtp_frame(DEFAULT_ALLOC_SIZE + NAL_HDR_SIZE);
|
||||
INACTIVE(c_ts).total_size = DEFAULT_ALLOC_SIZE + NAL_HDR_SIZE;
|
||||
INACTIVE(c_ts).next_off = MAX_PAYLOAD + NAL_HDR_SIZE;
|
||||
INACTIVE(c_ts).received_size = MAX_PAYLOAD + NAL_HDR_SIZE;
|
||||
|
||||
if (type == FT_START) {
|
||||
INACTIVE(c_ts).s_seq = c_seq;
|
||||
INACTIVE(c_ts).start = kvz_rtp::clock::hrc::now();
|
||||
|
||||
fprintf(stderr, "start %u: copy %u bytes from %u to %u\n", c_ts, MAX_PAYLOAD, off, NAL_HDR_SIZE);
|
||||
std::memcpy(
|
||||
INACTIVE(c_ts).frame->payload + NAL_HDR_SIZE,
|
||||
buff.frame->payload + off,
|
||||
MAX_PAYLOAD
|
||||
);
|
||||
} else {
|
||||
LOG_WARN("frame must be copied to probation area");
|
||||
}
|
||||
|
||||
/* This is not the first fragment of an inactive frame, copy it to correct frame
|
||||
* or to probation area if its place cannot be determined (s_seq must be known) */
|
||||
} else {
|
||||
if (buff.inactive[c_ts].s_seq != INVALID_SEQ) {
|
||||
fprintf(stderr, "not start %u: copy %u bytes from %u to %u\n", c_ts, MAX_PAYLOAD, off, INACTIVE(c_ts).next_off);
|
||||
std::memcpy(
|
||||
INACTIVE(c_ts).frame->payload + INACTIVE(c_ts).next_off,
|
||||
buff.frame->payload + off,
|
||||
MAX_PAYLOAD
|
||||
);
|
||||
|
||||
INACTIVE(c_ts).rinfo.insert(
|
||||
std::make_pair<uint16_t, struct reloc_info>(
|
||||
(uint16_t)c_seq,
|
||||
{ INACTIVE(c_ts).next_off, 0 }
|
||||
)
|
||||
);
|
||||
|
||||
INACTIVE(c_ts).next_off += MAX_PAYLOAD;
|
||||
} else {
|
||||
LOG_WARN("frame must be copied to probation area");
|
||||
/* #ifdef __RTP_NO_PROBATION_ZONE__ */
|
||||
/* auto tmp_frame = kvz_rtp::frame::alloc_rtp_frame(MAX_PAYLOAD); */
|
||||
|
||||
/* std::memcpy( */
|
||||
/* tmp_frame->payload, */
|
||||
/* buff.frame->payload + off, */
|
||||
/* MAX_PAYLOAD */
|
||||
/* ); */
|
||||
|
||||
/* /1* TODO: *1/ */
|
||||
/* /1* INACTIVE(c_ts).probation.push_back(tmp_frame); *1/ */
|
||||
/* #else */
|
||||
/* if (buff.frame->probation_off != buff.frame->probation_len) { */
|
||||
/* std::memcpy( */
|
||||
/* buff.frame->probation + buff.frame->probation_off, */
|
||||
/* buff.frame->payload + off, */
|
||||
/* MAX_PAYLOAD */
|
||||
/* ); */
|
||||
|
||||
/* buff.frame->probation_off += MAX_PAYLOAD; */
|
||||
/* } else { */
|
||||
/* auto tmp_frame = kvz_rtp::frame::alloc_rtp_frame(MAX_PAYLOAD); */
|
||||
|
||||
/* std::memcpy( */
|
||||
/* tmp_frame->payload, */
|
||||
/* buff.frame->payload + off, */
|
||||
/* MAX_PAYLOAD */
|
||||
/* ); */
|
||||
|
||||
/* /1* TODO: *1/ */
|
||||
/* /1* INACTIVE(c_ts).probation.push_back(tmp_frame); *1/ */
|
||||
/* } */
|
||||
/* #endif */
|
||||
}
|
||||
}
|
||||
|
||||
INACTIVE(c_ts).pkts_received++;
|
||||
INACTIVE(c_ts).received_size += MAX_PAYLOAD;
|
||||
}
|
||||
|
||||
/* Create NAL header for the full frame using start fragments information */
|
||||
if (type == FT_START) {
|
||||
if (buff.active_ts == c_ts) {
|
||||
buff.s_seq = c_seq;
|
||||
|
||||
buff.frame->payload[0] = (buff.hevc_ext_buf[i][0] & 0x80) | ((buff.hevc_ext_buf[i][2] & 0x3f) << 1);
|
||||
buff.frame->payload[1] = (buff.hevc_ext_buf[i][1]);
|
||||
} else {
|
||||
INACTIVE(c_ts).s_seq = c_seq;
|
||||
|
||||
INACTIVE(c_ts).frame->payload[1] = (buff.hevc_ext_buf[i][1]);
|
||||
INACTIVE(c_ts).frame->payload[0] = (buff.hevc_ext_buf[i][0] & 0x80) | ((buff.hevc_ext_buf[i][2] & 0x3f) << 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (type == FT_END) {
|
||||
if (buff.active_ts == c_ts)
|
||||
buff.e_seq = c_seq;
|
||||
else
|
||||
buff.inactive[c_ts].e_seq = c_seq;
|
||||
}
|
||||
|
||||
/* TODO: onko parempi hoitaa tämä? */
|
||||
if (buff.prev_f_seq == INVALID_SEQ)
|
||||
goto end;
|
||||
|
||||
/* There are three types of relocations:
|
||||
*
|
||||
* 1) Relocation within read:
|
||||
* This relocation can be done efficiently as we only need to
|
||||
* shuffle memory around and the shuffled objects are spatially
|
||||
* very close (at most MAX_DATAGRAMS * MAX_PAYLOAD bytes apart)
|
||||
*
|
||||
* Relocation within read is also called shifting (defined above)
|
||||
* and this special-case relocation has been taken care of at this
|
||||
* point in the execution
|
||||
*
|
||||
* 2) Relocation to other frame:
|
||||
* This relocation must be done because the received fragment is not part
|
||||
* of this frame
|
||||
*
|
||||
* This relocation has also been taken care of earlier (when the timestamp mismatch
|
||||
* with active and current frame was detected)
|
||||
*
|
||||
* 3) Relocation within frame:
|
||||
* This is the most complex type of relocation. We need to make a note in
|
||||
* the "reloc_needed" vector about this relocations and when TODO */
|
||||
if (buff.s_seq == INVALID_SEQ) {
|
||||
LOG_WARN("Relocation cannot be calculated");
|
||||
} else {
|
||||
|
||||
/* Fragments with S-bit set don't require relocation and
|
||||
* fragments that not part of this frame have already been
|
||||
* copied to correct frame (needing no further relocation for now) */
|
||||
if (type == FT_START || c_ts != buff.active_ts)
|
||||
goto end;
|
||||
|
||||
/* TODO: selitä */
|
||||
size_t off = __calculate_offset(buff.next_off, i);
|
||||
size_t block_off = (off - 2) / MAX_PAYLOAD;
|
||||
|
||||
if (block_off != c_seq - buff.s_seq) {
|
||||
if (!sinfo.shift_needed) {
|
||||
LOG_INFO("relocation needed for fragment: off %zu (real off %zu) index %zu", off, buff.next_off, i);
|
||||
LOG_INFO("s_seq %u c_seq %u\n", buff.s_seq, c_seq);
|
||||
}
|
||||
}
|
||||
}
|
||||
end:
|
||||
p_seq = c_seq;
|
||||
}
|
||||
|
||||
bool frame_changed = false;
|
||||
|
||||
/* If we have received all fragments, the frame can be returned. */
|
||||
if (buff.s_seq != INVALID_SEQ && buff.e_seq != INVALID_SEQ) {
|
||||
if ((ssize_t)buff.pkts_received == (buff.e_seq - buff.s_seq + 1)) {
|
||||
|
||||
/* TODO: resolve all relocations */
|
||||
|
||||
#if 0
|
||||
uint64_t diff = (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::high_resolution_clock::now() - buff.start
|
||||
).count();
|
||||
|
||||
avg_us += diff;
|
||||
avg_us_total++;
|
||||
avg_fs += buff.frame->payload_len;
|
||||
avg_fs_total++;
|
||||
|
||||
LOG_WARN("frame took %u us, %u ms, %u s (avg %u, fsize avg %u)",
|
||||
diff,
|
||||
diff / 1000,
|
||||
diff / 1000 / 1000,
|
||||
avg_us / avg_us_total,
|
||||
avg_fs / avg_fs_total
|
||||
);
|
||||
#endif
|
||||
|
||||
buff.frame->payload_len = buff.received_size;
|
||||
reader->return_frame(buff.frame);
|
||||
|
||||
fprintf(stderr, "------------------------------\n");
|
||||
|
||||
/* TODO: tyhjennä seqs-map / swappaa mappia! */
|
||||
|
||||
frame_changed = true;
|
||||
|
||||
/* Frames are resolved in order meaning that the next oldest frame is resolved next */
|
||||
if (buff.tss.size() != 0) {
|
||||
uint32_t n_ts = __get_next_ts(buff.tss);
|
||||
|
||||
buff.active_ts = n_ts;
|
||||
buff.frame = INACTIVE(n_ts).frame;
|
||||
buff.s_seq = INACTIVE(n_ts).s_seq;
|
||||
buff.e_seq = INACTIVE(n_ts).e_seq;
|
||||
buff.pkts_received = INACTIVE(n_ts).pkts_received;
|
||||
buff.total_size = INACTIVE(n_ts).total_size;
|
||||
buff.received_size = INACTIVE(n_ts).received_size;
|
||||
buff.next_off = INACTIVE(n_ts).next_off;
|
||||
buff.start = INACTIVE(n_ts).start;
|
||||
|
||||
/* Resolve all relocations (if any)
|
||||
* Record in relocation info vector doesn't necessarily mean that the
|
||||
* fragment is in wrong place/its place cannot be determined
|
||||
*
|
||||
* Relocating at this point in the frame's lifetime is a delicate issue IF the fragments
|
||||
* already received aren't contigous: TODO. selitä miksi ei voi välttämättä relokoida */
|
||||
if (INACTIVE(n_ts).rinfo.size() != 0) {
|
||||
/* LOG_ERROR("%zu relocations must be resolved!", INACTIVE(n_ts).rinfo.size()); */
|
||||
|
||||
uint32_t prev_seq = INACTIVE(n_ts).s_seq;
|
||||
|
||||
for (auto& i : INACTIVE(n_ts).rinfo) {
|
||||
if (i.first - 1 != prev_seq) {
|
||||
LOG_WARN("relocation cannot be performed, informatin missing!");
|
||||
}
|
||||
|
||||
/* TODO: varmista että fragmentin tämänhetkinen offset on oikea (saa laskemalla prev_seqin avulla) */
|
||||
|
||||
/* fprintf(stderr, "\t%u at %zu to %zu\n", */
|
||||
/* i.first, */
|
||||
/* i.second.c_off, */
|
||||
/* i.second.d_off */
|
||||
/* ); */
|
||||
prev_seq = i.first;
|
||||
}
|
||||
}
|
||||
|
||||
buff.inactive.erase(n_ts);
|
||||
|
||||
} else {
|
||||
buff.total_size = DEFAULT_ALLOC_SIZE + NAL_HDR_SIZE;
|
||||
buff.received_size = NAL_HDR_SIZE;
|
||||
buff.frame = kvz_rtp::frame::alloc_rtp_frame(DEFAULT_ALLOC_SIZE + NAL_HDR_SIZE);
|
||||
buff.s_seq = INVALID_SEQ;
|
||||
buff.e_seq = INVALID_SEQ;
|
||||
buff.pkts_received = 0;
|
||||
buff.next_off = 2;
|
||||
|
||||
/* TODO: selitä */
|
||||
buff.active_ts = INVALID_TS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (sinfo.shift_needed && !frame_changed) {
|
||||
buff.next_off = sinfo.shift_offset;
|
||||
sinfo.shift_needed = false;
|
||||
}
|
||||
|
||||
buff.prev_f_seq = p_seq;
|
||||
}
|
||||
|
||||
return ret;
|
||||
return RTP_OK;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue