Start using the new generic frame everywhere

This commit is contained in:
Aaro Altonen 2019-05-22 09:24:29 +03:00
parent 509d096b1d
commit bd8b9119db
11 changed files with 121 additions and 239 deletions

View File

@ -5,9 +5,9 @@
#include <iostream>
#include "conn.hh"
#include "util.hh"
#include "rtp_hevc.hh"
#include "rtp_opus.hh"
#include "util.hh"
RTPConnection::RTPConnection(bool reader):
reader_(reader)

View File

@ -6,13 +6,13 @@
#include <netinet/ip.h>
#endif
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "frame.hh"
#include "util.hh"
#include "rtp_generic.hh"
class RTPConnection {
@ -40,8 +40,6 @@ public:
void incTotalBytes(uint32_t nbytes);
void incProcessedPackets(uint32_t npackets);
void fillFrame(RTPGeneric::GenericFrame *frame);
void setConfig(void *config);
void *getConfig();

View File

@ -5,7 +5,7 @@
RTPFrame::Frame *RTPFrame::allocFrame(size_t payloadLen, frame_type_t type)
{
if (payloadLen == 0 || !(type <= FRAME_TYPE_HEVC_FU && type >= FRAME_TYPE_GENERIC)) {
if (payloadLen == 0 || INVALID_FRAME_TYPE(type)) {
LOG_ERROR("Invalid parameter!");
return nullptr;
}
@ -33,14 +33,14 @@ RTPFrame::Frame *RTPFrame::allocFrame(size_t payloadLen, frame_type_t type)
if (!frame)
return nullptr;
if ((frame->start = new uint8_t[payloadLen + headerLen]) == nullptr) {
if ((frame->header = new uint8_t[payloadLen + headerLen]) == nullptr) {
delete frame;
return nullptr;
}
frame->headerLen = headerLen;
frame->dataLen = payloadLen;
frame->data = frame->start + headerLen;
frame->data = frame->header + headerLen;
return frame;
}
@ -50,8 +50,8 @@ int RTPFrame::deallocFrame(RTPFrame::Frame *frame)
if (!frame)
return RTP_INVALID_VALUE;
if (frame->start)
delete frame->start;
if (frame->header)
delete frame->header;
delete frame;
return RTP_OK;
@ -64,7 +64,7 @@ int RTPFrame::sendFrame(RTPConnection *conn, RTPFrame::Frame *frame)
int ret;
if ((ret = RTPSender::writeGenericHeader(conn, frame->start, frame->headerLen)) != RTP_OK) {
if ((ret = RTPSender::writeGenericHeader(conn, frame->header, frame->headerLen)) != RTP_OK) {
LOG_ERROR("Failed to send header! Size %zu, Type %d", frame->headerLen, frame->frameType);
return ret;
}

View File

@ -2,7 +2,17 @@
#include "util.hh"
#define INVALID_FRAME_TYPE(ft) (ft < FRAME_TYPE_GENERIC || ft > FRAME_TYPE_HEVC_FU)
namespace RTPFrame {
enum HEADER_SIZES {
HEADER_SIZE_RTP = 12,
HEADER_SIZE_OPUS = 1,
HEADER_SIZE_HEVC_RTP = 2,
HEADER_SIZE_HEVC_FU = 1,
};
typedef enum FRAME_TYPE {
FRAME_TYPE_GENERIC = 0, // payload length + RTP Header size (N + 12)
FRAME_TYPE_OPUS = 1, // payload length + RTP Header size + Opus header (N + 12 + 0 [for now])
@ -10,16 +20,17 @@ namespace RTPFrame {
} frame_type_t;
struct Frame {
uint32_t rtpTimestamp;
uint32_t rtpSsrc;
uint16_t rtpSequence;
uint8_t rtpPayload;
uint32_t timestamp;
uint32_t ssrc;
uint16_t seq;
uint8_t payload;
uint8_t marker;
uint8_t *start;
uint8_t *header;
size_t headerLen;
uint8_t *data;
size_t dataLen;
size_t headerLen;
rtp_format_t rtpFormat;
frame_type_t frameType;
@ -33,4 +44,38 @@ namespace RTPFrame {
/* TODO: */
int sendFrame(RTPConnection *conn, RTPFrame::Frame *frame);
/* get pointer to RTP Header or nullptr if frame is invalid */
uint8_t *getRTPHeader(RTPFrame::Frame *frame)
{
if (!frame || INVALID_FRAME_TYPE(frame->frameType))
return nullptr;
return frame->header;
}
/* */
uint8_t *getOpusHeader(RTPFrame::Frame *frame)
{
if (!frame || !frame->header || frame->frameType != FRAME_TYPE_OPUS)
return nullptr;
return frame->header + HEADER_SIZE_RTP;
}
uint8_t *getHEVCRTPHeader(RTPFrame::Frame *frame)
{
if (!frame || !frame->header || frame->frameType != FRAME_TYPE_HEVC_FU)
return nullptr;
return frame->header + HEADER_SIZE_RTP;
}
uint8_t *getHEVCFUHeader(RTPFrame::Frame *frame)
{
if (!frame || !frame->header || frame->frameType != FRAME_TYPE_HEVC_FU)
return nullptr;
return frame->header + HEADER_SIZE_RTP + HEADER_SIZE_HEVC_RTP;
}
};

View File

@ -1,9 +1,9 @@
#include <cstring>
#include <iostream>
#include "reader.hh"
#include "debug.hh"
#include "frame.hh"
#include "reader.hh"
RTPReader::RTPReader(std::string srcAddr, int srcPort):
RTPConnection(true),
@ -18,14 +18,7 @@ RTPReader::RTPReader(std::string srcAddr, int srcPort):
RTPReader::~RTPReader()
{
active_ = false;
// TODO how to stop thread???
// private global variable set here from true to false and thread exist???
//
/* runner_.std::thread::~thread(); */
delete inPacketBuffer_;
/* delete runner_; */
}
int RTPReader::start()
@ -69,7 +62,7 @@ int RTPReader::start()
return 0;
}
RTPGeneric::GenericFrame *RTPReader::pullFrame()
RTPFrame::Frame *RTPReader::pullFrame()
{
while (framesOut_.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
@ -98,7 +91,7 @@ uint32_t RTPReader::getInPacketBufferLength() const
return inPacketBufferLen_;
}
void RTPReader::addOutgoingFrame(RTPGeneric::GenericFrame *frame)
void RTPReader::addOutgoingFrame(RTPFrame::Frame *frame)
{
if (!frame)
return;
@ -111,7 +104,7 @@ bool RTPReader::receiveHookInstalled()
return receiveHook_ != nullptr;
}
void RTPReader::installReceiveHook(void *arg, void (*hook)(void *arg, RTPGeneric::GenericFrame *))
void RTPReader::installReceiveHook(void *arg, void (*hook)(void *arg, RTPFrame::Frame *))
{
if (hook == nullptr)
{
@ -123,7 +116,7 @@ void RTPReader::installReceiveHook(void *arg, void (*hook)(void *arg, RTPGeneric
receiveHookArg_ = arg;
}
void RTPReader::receiveHook(RTPGeneric::GenericFrame *frame)
void RTPReader::receiveHook(RTPFrame::Frame *frame)
{
if (receiveHook_)
return receiveHook_(receiveHookArg_, frame);
@ -158,43 +151,41 @@ int RTPReader::frameReceiver(RTPReader *reader)
#endif
return -1;
} else {
/* LOG_INFO("Got %d bytes", ret); */
LOG_DEBUG("got %d bytes", ret);
const uint8_t *inBuffer = reader->getInPacketBuffer();
RTPGeneric::GenericFrame *frame = RTPGeneric::createGenericFrame();
RTPFrame::Frame *frame = RTPFrame::allocFrame(ret - RTP_HEADER_SIZE, RTPFrame::FRAME_TYPE_GENERIC);
if (!frame) {
LOG_ERROR("Failed to allocate GenericFrame!");
LOG_ERROR("Failed to allocate RTP Frame!");
continue;
}
frame->marker = (inBuffer[1] & 0x80) ? 1 : 0;
frame->rtp_payload = (inBuffer[1] & 0x7f);
frame->rtp_sequence = ntohs(*(uint16_t *)&inBuffer[2]);
frame->rtp_timestamp = ntohl(*(uint32_t *)&inBuffer[4]);
frame->rtp_ssrc = ntohl(*(uint32_t *)&inBuffer[8]);
frame->marker = (inBuffer[1] & 0x80) ? 1 : 0;
frame->payload = (inBuffer[1] & 0x7f);
frame->seq = ntohs(*(uint16_t *)&inBuffer[2]);
frame->timestamp = ntohl(*(uint32_t *)&inBuffer[4]);
frame->ssrc = ntohl(*(uint32_t *)&inBuffer[8]);
if (ret - 12 <= 0) {
LOG_WARN("Got an invalid payload of size %d", ret);
continue;
}
frame->data = new uint8_t[ret - 12];
frame->dataLen = ret - 12;
frame->header = new uint8_t[ret];
frame->headerLen = ret;
if (!frame->data) {
LOG_ERROR("Failed to allocate buffer for GenericFrame!");
LOG_ERROR("Failed to allocate buffer for RTP frame!");
continue;
}
memcpy(frame->data, &inBuffer[12], frame->dataLen);
reader->addOutgoingFrame(frame);
memcpy(frame->header, inBuffer, ret);
if (reader->receiveHookInstalled())
reader->receiveHook(frame);
else
reader->addOutgoingFrame(frame);
}
}
LOG_INFO("FrameReceiver thread exiting...");

View File

@ -3,7 +3,7 @@
#include <thread>
#include "conn.hh"
#include "rtp_generic.hh"
#include "frame.hh"
class RTPReader : public RTPConnection {
@ -11,8 +11,10 @@ public:
RTPReader(std::string srcAddr, int srcPort);
~RTPReader();
// TODO
RTPGeneric::GenericFrame *pullFrame();
/*
*
* NOTE: this operation is blocking */
RTPFrame::Frame *pullFrame();
// open socket and start runner_
int start();
@ -20,13 +22,13 @@ public:
bool active();
bool receiveHookInstalled();
void receiveHook(RTPGeneric::GenericFrame *frame);
void installReceiveHook(void *arg, void (*hook)(void *arg, RTPGeneric::GenericFrame *));
void receiveHook(RTPFrame::Frame *frame);
void installReceiveHook(void *arg, void (*hook)(void *arg, RTPFrame::Frame *));
uint8_t *getInPacketBuffer() const;
uint32_t getInPacketBufferLength() const;
void addOutgoingFrame(RTPGeneric::GenericFrame *frame);
void addOutgoingFrame(RTPFrame::Frame *frame);
private:
static int frameReceiver(RTPReader *reader);
@ -43,10 +45,10 @@ private:
uint8_t *inPacketBuffer_; /* Buffer for incoming packet (MAX_PACKET) */
uint32_t inPacketBufferLen_;
std::vector<RTPGeneric::GenericFrame *> framesOut_;
std::vector<RTPFrame::Frame *> framesOut_;
std::mutex framesMtx_;
// TODO
void *receiveHookArg_;
void (*receiveHook_)(void *arg, RTPGeneric::GenericFrame *frame);
void (*receiveHook_)(void *arg, RTPFrame::Frame *frame);
};

View File

@ -11,158 +11,20 @@
#include "debug.hh"
#include "conn.hh"
#include "rtp_generic.hh"
#include "send.hh"
#include "util.hh"
#include "writer.hh"
RTPGeneric::GenericFrame *RTPGeneric::createGenericFrame()
// TODO implement frame splitting if dataLen > MTU
// TODO write timestamp to RTP header
int RTPGeneric::pushGenericFrame(RTPConnection *conn, uint8_t *data, size_t dataLen, uint32_t timestamp)
{
RTPGeneric::GenericFrame *frame = new RTPGeneric::GenericFrame();
int ret;
if (!frame)
return nullptr;
frame->data = nullptr;
frame->dataLen = 0;
frame->rtp_timestamp = 0;
frame->marker = 0;
frame->rtp_ssrc = 0;
frame->rtp_sequence = 0;
frame->rtp_payload = 0;
frame->rtp_format = RTP_FORMAT_GENERIC;
return frame;
}
RTPGeneric::GenericFrame *RTPGeneric::createGenericFrame(uint32_t dataLen)
{
RTPGeneric::GenericFrame *frame = RTPGeneric::createGenericFrame();
if (!frame)
return nullptr;
if ((frame->data = new uint8_t[dataLen + RTP_HEADER_SIZE]) == nullptr) {
delete frame;
return nullptr;
if ((ret = RTPSender::writeRTPHeader(conn)) != RTP_OK) {
LOG_ERROR("Failed to write RTP Header for Opus frame!");
return ret;
}
frame->dataLen = dataLen + RTP_HEADER_SIZE;
frame->data += RTP_HEADER_SIZE;
return frame;
}
void RTPGeneric::destroyGenericFrame(RTPGeneric::GenericFrame *frame)
{
if (!frame)
return;
if (frame->data)
delete (frame->data - RTP_HEADER_SIZE);
delete frame;
}
int RTPGeneric::pushGenericFrame(RTPConnection *conn, RTPGeneric::GenericFrame *frame)
{
if (!conn || !frame)
return RTP_INVALID_VALUE;
uint8_t *buffer = frame->data - RTP_HEADER_SIZE;
buffer[0] = 2 << 6; /* RTP version */
buffer[1] = (frame->rtp_format & 0x7f) | (0 << 7);
*(uint16_t *)&buffer[2] = htons(frame->rtp_sequence);
*(uint32_t *)&buffer[4] = htonl(frame->rtp_timestamp);
*(uint32_t *)&buffer[8] = htonl(frame->rtp_ssrc);
RTPWriter *writer = dynamic_cast<RTPWriter *>(conn);
sockaddr_in outAddr = writer->getOutAddress();
if (sendto(conn->getSocket(), buffer, frame->dataLen, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1) {
perror("pushGenericFrame");
return RTP_GENERIC_ERROR;
}
conn->incRTPSequence(1);
frame->rtp_sequence++;
/* Update statistics */
#ifdef __RTP_STATS__
conn->incProcessedBytes(frame->dataLen - RTP_HEADER_SIZE);
conn->incOverheadBytes(RTP_HEADER_SIZE);
conn->incTotalBytes(frame->dataLen);
conn->incProcessedPackets(1);
#endif
return RTP_OK;
}
int RTPGeneric::pushGenericFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen, uint32_t timestamp)
{
(void)timestamp;
if (!conn || !data)
return RTP_INVALID_VALUE;
RTPWriter *writer = dynamic_cast<RTPWriter *>(conn);
sockaddr_in outAddr = writer->getOutAddress();
#ifdef __linux__
uint8_t header[RTP_HEADER_SIZE] = { 0 };
header[0] = 2 << 6; // RTP version
header[1] = (conn->getPayloadType() & 0x7f) | (0 << 7);
*(uint16_t *)&header[2] = htons(conn->getSequence());
*(uint32_t *)&header[4] = htonl(conn->getTimestamp());
*(uint32_t *)&header[8] = htonl(conn->getSSRC());
if (sendto(conn->getSocket(), header, RTP_HEADER_SIZE, MSG_MORE, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1) {
perror("pushGenericFrame");
return RTP_GENERIC_ERROR;
}
if (sendto(conn->getSocket(), data, dataLen, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1) {
perror("pushGenericFrame");
return RTP_GENERIC_ERROR;
}
#else
/* TODO:
*
*
* winsock API supports MSG_PARTIAL with WSASend, use that!
*
* */
uint8_t buffer[MAX_PACKET] = { 0 };
buffer[0] = 2 << 6; // RTP version
buffer[1] = (conn->getPayloadType() & 0x7f) | (0 << 7);
*(uint16_t *)&buffer[2] = htons(conn->getSequence());
*(uint32_t *)&buffer[4] = htonl(conn->getTimestamp());
*(uint32_t *)&buffer[8] = htonl(conn->getSSRC());
memcpy(&buffer[12], data, dataLen);
if (sendto(conn->getSocket(), buffer, dataLen + 12, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1) {
perror("pushGenericFrame");
return RTP_GENERIC_ERROR;
}
#endif
conn->incRTPSequence(1);
#ifdef __RTP_STATS__
/* Update statistics */
conn->incProcessedBytes(dataLen);
conn->incOverheadBytes(12);
conn->incTotalBytes(dataLen + 12);
conn->incProcessedPackets(1);
#endif
return RTP_OK;
return RTPSender::writePayload(conn, data, dataLen);
}

View File

@ -5,22 +5,5 @@
class RTPConnection;
namespace RTPGeneric {
struct GenericFrame {
uint32_t rtp_timestamp;
uint32_t dataLen;
uint32_t rtp_ssrc;
uint16_t rtp_sequence;
uint8_t *data;
uint8_t rtp_payload;
uint8_t marker;
rtp_format_t rtp_format;
};
RTPGeneric::GenericFrame *createGenericFrame();
RTPGeneric::GenericFrame *createGenericFrame(uint32_t dataLen);
void destroyGenericFrame(RTPGeneric::GenericFrame *frame);
int pushGenericFrame(RTPConnection *conn, RTPGeneric::GenericFrame *frame);
int pushGenericFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen, uint32_t timestamp);
int pushGenericFrame(RTPConnection *conn, uint8_t *data, size_t dataLen, uint32_t timestamp);
};

View File

@ -6,7 +6,7 @@
#include "rtp_hevc.hh"
#include "conn.hh"
using RTPGeneric::GenericFrame;
/* using RTPGeneric::GenericFrame; */
static int nextFrameStart(uint8_t *data, uint32_t offset, uint32_t dataLen, uint8_t& startLen)
{
@ -32,22 +32,22 @@ static int nextFrameStart(uint8_t *data, uint32_t offset, uint32_t dataLen, uint
static int __pushHevcFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen, uint32_t timestamp)
{
int ret = 0;
uint32_t bufferlen = 0;
uint32_t dataPos = 0;
uint32_t dataLeft = dataLen;
GenericFrame *frame = nullptr;
int ret = 0;
uint32_t bufferlen = 0;
uint32_t dataPos = 0;
uint32_t dataLeft = dataLen;
RTPFrame::Frame *frame = nullptr;
if (dataLen <= MAX_PAYLOAD) {
LOG_DEBUG("send unfrag size %u, type %u", dataLen, (uint32_t)((data[0] >> 1) & 0x3F));
return RTPGeneric::pushGenericFrame(conn, data, dataLen, 0);
/* return RTPGeneric::pushGenericFrame(conn, data, dataLen, 0); */
}
if ((frame = RTPGeneric::createGenericFrame(MAX_PAYLOAD)) == nullptr)
return -1;
/* if ((frame = RTPFrame::allocFrame(MAX_PAYLOAD, FRAME_TYPE_HEVC_FU)) == nullptr) */
/* return RTP_GENERIC_ERROR; */
conn->fillFrame(frame);
frame->rtp_format = RTP_FORMAT_HEVC;
/* conn->fillFrame(frame); */
frame->rtpFormat = RTP_FORMAT_HEVC;
LOG_DEBUG("send frag size: %u, type %u", dataLen, (data[0] >> 1) & 0x3F);
uint8_t nalType = (data[0] >> 1) & 0x3F;
@ -64,8 +64,8 @@ static int __pushHevcFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen,
while (dataLeft + 3 > MAX_PAYLOAD) {
memcpy(&frame->data[3], &data[dataPos], MAX_PAYLOAD - 3);
if ((ret = RTPGeneric::pushGenericFrame(conn, frame)))
goto end;
/* if ((ret = RTPGeneric::pushGenericFrame(conn, frame))) */
/* goto end; */
dataPos += (MAX_PAYLOAD - 3);
dataLeft -= (MAX_PAYLOAD - 3);
@ -78,14 +78,14 @@ static int __pushHevcFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen,
frame->data[2] |= 1 << 6;
memcpy(&frame->data[3], &data[dataPos], dataLeft);
ret = RTPGeneric::pushGenericFrame(conn, frame->data, dataLeft + 3, 1);
/* ret = RTPGeneric::pushGenericFrame(conn, frame->data, dataLeft + 3, 1); */
end:
RTPGeneric::destroyGenericFrame(frame);
RTPFrame::deallocFrame(frame);
return ret;
}
int RTPHevc::pushHevcFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen, uint32_t timestamp)
int RTPHevc::pushHevcFrame(RTPConnection *conn, uint8_t *data, size_t dataLen, uint32_t timestamp)
{
uint8_t startLen;
uint32_t previousOffset = 0;

View File

@ -3,5 +3,5 @@
#include "rtp_generic.hh"
namespace RTPHevc {
int pushHevcFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen, uint32_t timestamp);
int pushHevcFrame(RTPConnection *conn, uint8_t *data, size_t dataLen, uint32_t timestamp);
};

View File

@ -3,6 +3,7 @@
#include <stdint.h>
#include "conn.hh"
#include "frame.hh"
class RTPWriter : public RTPConnection {
@ -18,7 +19,7 @@ public:
int pushFrame(uint8_t *data, uint32_t datalen, rtp_format_t fmt, uint32_t timestamp);
// TODO
int pushGenericFrameFrame(RTPGeneric::GenericFrame *frame);
int pushFrame(RTPFrame::Frame *frame);
sockaddr_in getOutAddress();