Separate RTPConnection into RTPReader and RTPWriter

This commit is contained in:
Aaro Altonen 2019-04-01 12:00:37 +03:00
parent e0fe94b13d
commit 008107c33f
11 changed files with 343 additions and 205 deletions

View File

@ -9,88 +9,20 @@
#include "rtp_hevc.hh"
#include "rtp_opus.hh"
RTPConnection::RTPConnection(std::string dstAddr, int dstPort, int srcPort):
dstAddr_(dstAddr),
dstPort_(dstPort),
srcPort_(srcPort),
rtp_payload_(96)
RTPConnection::RTPConnection(bool reader):
reader_(reader)
{
rtp_sequence_ = 45175;
rtp_timestamp_ = 123456;
rtp_ssrc_ = 0x72b644;
rtp_payload_ = RTP_FORMAT_HEVC;
}
RTPConnection::~RTPConnection()
{
delete inPacketBuffer_;
delete runner_;
}
int RTPConnection::open()
{
if ((socket_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("RTPConnection::open");
return RTP_SOCKET_ERROR;
}
int value = 1;
if (setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(int)) < 0)
perror("setsockopt(SO_REUSEADDR) failed");
memset(&addrOut_, 0, sizeof(addrOut_));
addrOut_.sin_family = AF_INET;
inet_pton(AF_INET, dstAddr_.c_str(), &addrOut_.sin_addr);
addrOut_.sin_port = htons(dstPort_);
memset(&addrIn_, 0, sizeof(addrIn_));
addrIn_.sin_family = AF_INET;
addrIn_.sin_addr.s_addr = htonl(INADDR_ANY);
addrIn_.sin_port = htons(srcPort_);
/* if (bind(socket_, (struct sockaddr *) &addrIn_, sizeof(addrIn_)) < 0) { */
/* perror("RTPConnection::open"); */
/* return RTP_BIND_ERROR; */
/* } */
inPacketBuffer_ = new uint8_t[MAX_PACKET];
inPacketBufferLen_ = MAX_PACKET;
/* Start a blocking recv thread */
runner_ = new std::thread(rtpRecvData, this);
id_ = rtpGetUniqueId();
return 0;
}
int RTPConnection::pushFrame(uint8_t *data, uint32_t datalen, rtp_format_t fmt, uint32_t timestamp)
{
switch (fmt) {
case RTP_FORMAT_HEVC:
return RTPHevc::pushHevcFrame(this, data, datalen, timestamp);
case RTP_FORMAT_OPUS:
return RTPOpus::pushOpusFrame(this, data, datalen, timestamp);
default:
return RTPGeneric::pushGenericFrame(this, data, datalen, timestamp);
}
}
RTPGeneric::GenericFrame *RTPConnection::pullFrame()
{
while (framesOut_.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
framesMtx_.lock();
auto nextFrame = framesOut_.front();
framesOut_.erase(framesOut_.begin());
framesMtx_.unlock();
return nextFrame;
}
void RTPConnection::setPayload(rtp_format_t fmt)
void RTPConnection::setPayloadType(rtp_format_t fmt)
{
rtp_payload_ = fmt;
}
@ -125,7 +57,7 @@ uint32_t RTPConnection::getSSRC() const
return rtp_ssrc_;
}
uint8_t RTPConnection::getPayload() const
uint8_t RTPConnection::getPayloadType() const
{
return rtp_payload_;
}
@ -160,21 +92,6 @@ void RTPConnection::incProcessedPackets(uint32_t npackets)
processedPackets_ += npackets;
}
sockaddr_in RTPConnection::getOutAddress() const
{
return addrOut_;
}
uint8_t *RTPConnection::getInPacketBuffer() const
{
return inPacketBuffer_;
}
uint32_t RTPConnection::getInPacketBufferLength() const
{
return inPacketBufferLen_;
}
void RTPConnection::fillFrame(RTPGeneric::GenericFrame *frame)
{
if (!frame)
@ -183,13 +100,4 @@ void RTPConnection::fillFrame(RTPGeneric::GenericFrame *frame)
frame->rtp_sequence = rtp_sequence_;
frame->rtp_timestamp = rtp_timestamp_;
frame->rtp_ssrc = rtp_ssrc_;
/* std::cerr << rtp_sequence_ << " " << rtp_timestamp_ << " " << rtp_ssrc_; */
}
void RTPConnection::addOutgoingFrame(RTPGeneric::GenericFrame *frame)
{
if (!frame)
return;
framesOut_.push_back(frame);
}

View File

@ -17,24 +17,22 @@
class RTPConnection {
public:
RTPConnection(std::string dstAddr, int dstPort, int srcPort);
~RTPConnection();
RTPConnection(bool reader);
virtual ~RTPConnection();
// TODO
int open();
virtual int start() = 0;
// getters
uint32_t getId() const;
uint16_t getSequence() const;
uint32_t getTimestamp() const;
uint32_t getSSRC() const;
uint8_t getPayload() const;
uint8_t getPayloadType() const;
int getSocket() const;
sockaddr_in getOutAddress() const;
uint8_t *getInPacketBuffer() const;
uint32_t getInPacketBufferLength() const;
void setPayload(rtp_format_t fmt);
void setPayloadType(rtp_format_t fmt);
void incRTPSequence(uint16_t seq);
void incProcessedBytes(uint32_t nbytes);
@ -42,39 +40,18 @@ public:
void incTotalBytes(uint32_t nbytes);
void incProcessedPackets(uint32_t npackets);
int pushFrame(uint8_t *data, uint32_t datalen, rtp_format_t fmt, uint32_t timestamp);
RTPGeneric::GenericFrame *pullFrame();
void fillFrame(RTPGeneric::GenericFrame *frame);
void addOutgoingFrame(RTPGeneric::GenericFrame *frame);
void setConfig(uint8_t *config);
uint8_t *getConfig();
private:
std::string dstAddr_;
int dstPort_;
int srcPort_;
protected:
uint8_t *config_;
uint32_t id_;
int socket_;
sockaddr_in addrIn_;
sockaddr_in addrOut_;
/* fRTPFormat inFormat; */
/* fRTPFormat outFormat; */
/* Receiving */
std::thread *runner_;
uint8_t *inPacketBuffer_; /* Buffer for incoming packet (MAX_PACKET) */
uint32_t inPacketBufferLen_;
uint8_t *frameBuffer_; /* Larger buffer for storing incoming frame */
uint32_t frameBufferLen_;
std::vector<RTPGeneric::GenericFrame *> framesOut_;
std::mutex framesMtx_;
private:
bool reader_;
// RTP
uint16_t rtp_sequence_;
@ -87,6 +64,4 @@ private:
uint32_t overheadBytes_;
uint32_t totalBytes_;
uint32_t processedPackets_;
uint8_t *config_;
};

View File

@ -20,36 +20,50 @@ RTPContext::~RTPContext()
std::terminate();
}
RTPConnection *RTPContext::openConnection(std::string dstAddr, int dstPort, int srcPort)
RTPReader *RTPContext::createReader(std::string srcAddr, int srcPort)
{
RTPConnection *conn = new RTPConnection(dstAddr, dstPort, srcPort);
RTPReader *reader = new RTPReader(srcAddr, srcPort);
if (!conn) {
std::cerr << "Failed to create RTP connection!" << std::endl;
if (!reader) {
std::cerr << "Failed to create RTPReader for " << srcAddr << ":" << srcPort << "!" << std::endl;
return nullptr;
}
if (conn->open() < 0) {
std::cerr << "Failed to open RTP connection!" << std::endl;
delete conn;
conns_.insert(std::pair<int, RTPConnection *>(reader->getId(), reader));
return reader;
}
RTPReader *RTPContext::createReader(std::string srcAddr, int srcPort, rtp_format_t fmt)
{
RTPReader *reader = nullptr;
if ((reader = createReader(srcAddr, srcPort)) == nullptr)
return nullptr;
reader->setPayloadType(fmt);
return reader;
}
RTPWriter *RTPContext::createWriter(std::string dstAddr, int dstPort)
{
RTPWriter *writer = new RTPWriter(dstAddr, dstPort);
if (!writer) {
std::cerr << "Failed to create RTPWriter for " << dstAddr << ":" << dstPort << "!" << std::endl;
return nullptr;
}
conns_.insert(std::pair<int, RTPConnection *>(conn->getId(), conn));
return conn;
conns_.insert(std::pair<int, RTPConnection *>(writer->getId(), writer));
return writer;
}
int RTPContext::closeConnection(int id)
RTPWriter *RTPContext::createWriter(std::string dstAddr, int dstPort, rtp_format_t fmt)
{
std::map<uint32_t, RTPConnection *>::iterator it;
RTPWriter *writer = nullptr;
if ((it = conns_.find(id)) == conns_.end()) {
std::cerr << "Connection with id " << id << " does not exist!" << std::endl;
return -1;
}
if ((writer = createWriter(dstAddr, dstPort)) == nullptr)
return nullptr;
delete it->second;
conns_.erase(it);
return 0;
writer->setPayloadType(fmt);
return writer;
}

View File

@ -2,6 +2,8 @@
#include <map>
#include "conn.hh"
#include "reader.hh"
#include "writer.hh"
class RTPContext {
@ -9,6 +11,19 @@ public:
RTPContext();
~RTPContext();
/* Start listening to incoming RTP packets form srcAddr:srcPort
*
* Read packets are stored in a ring buffer which can be read by
* calling RTPReader::pullFrame() */
RTPReader *createReader(std::string srcAddr, int srcPort);
RTPReader *createReader(std::string srcAddr, int srcPort, rtp_format_t fmt);
/* Open connection for writing RTP packets to dstAddr:dstPort
*
* Packets can be sent by calling RTPWriter::pushFrame() */
RTPWriter *createWriter(std::string dstAddr, int dstPort);
RTPWriter *createWriter(std::string dstAddr, int dstPort, rtp_format_t fmt);
RTPConnection *openConnection(std::string dstAddr, int dstPort, int srcPort);
int closeConnection(int id);

144
src/reader.cc Normal file
View File

@ -0,0 +1,144 @@
#include <cstring>
#include <iostream>
#include "reader.hh"
static int frameReceiver(RTPReader *reader)
{
sockaddr_in fromAddr;
std::cerr << "starting to listen to address and port" << std::endl;
while (reader->active()) {
int fromAddrSize = sizeof(fromAddr);
int32_t ret = recvfrom(reader->getSocket(), reader->getInPacketBuffer(), reader->getInPacketBufferLength(),
0, /* no flags */
#ifdef _WIN32
(SOCKADDR *)&fromAddr,
&fromAddrSize
#else
(struct sockaddr *)&fromAddr,
(socklen_t *)&fromAddrSize
#endif
);
if (ret == -1) {
#if _WIN32
int _error = WSAGetLastError();
if (_error != 10035)
std::cerr << "Socket error" << _error << std::endl;
#else
perror("frameReceiver");
#endif
return -1;
} else {
const uint8_t *inBuffer = reader->getInPacketBuffer();
RTPGeneric::GenericFrame *frame = RTPGeneric::createGenericFrame();
frame->marker = (inBuffer[1] & 0x80) ? 1 : 0;
frame->rtp_payload = (inBuffer[1] & 0x7f);
frame->rtp_sequence = ntohs(*(uint16_t *)&inBuffer[2]);
frame->rtp_timestamp = ntohl(*(uint32_t *)&inBuffer[4]);
frame->rtp_ssrc = ntohl(*(uint32_t *)&inBuffer[8]);
frame->data = new uint8_t[ret - 12];
frame->dataLen = ret - 12;
memcpy(frame->data, &inBuffer[12], frame->dataLen);
reader->addOutgoingFrame(frame);
}
}
std::cerr << "THREAD EXITING!!" << std::endl;
}
RTPReader::RTPReader(std::string srcAddr, int srcPort):
RTPConnection(true),
srcAddr_(srcAddr),
srcPort_(srcPort),
active_(false)
{
}
RTPReader::~RTPReader()
{
active_ = false;
runner_->join();
// TODO how to stop thread???
// private global variable set here from true to false and thread exist???
//
/* runner_.std::thread::~thread(); */
delete inPacketBuffer_;
/* delete runner_; */
}
int RTPReader::start()
{
// TODO open socket
if ((socket_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("RTPConnection::open");
return RTP_SOCKET_ERROR;
}
sockaddr_in addrIn_;
memset(&addrIn_, 0, sizeof(addrIn_));
addrIn_.sin_family = AF_INET;
addrIn_.sin_addr.s_addr = htonl(INADDR_ANY);
addrIn_.sin_port = htons(srcPort_);
if (bind(socket_, (struct sockaddr *) &addrIn_, sizeof(addrIn_)) < 0) {
perror("RTPConnection::open");
return RTP_BIND_ERROR;
}
inPacketBuffer_ = new uint8_t[MAX_PACKET];
inPacketBufferLen_ = MAX_PACKET;
active_ = true;
id_ = rtpGetUniqueId();
runner_ = new std::thread(frameReceiver, this);
return 0;
}
RTPGeneric::GenericFrame *RTPReader::pullFrame()
{
while (framesOut_.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
framesMtx_.lock();
auto nextFrame = framesOut_.front();
framesOut_.erase(framesOut_.begin());
framesMtx_.unlock();
return nextFrame;
}
bool RTPReader::active()
{
return active_;
}
uint8_t *RTPReader::getInPacketBuffer() const
{
return inPacketBuffer_;
}
uint32_t RTPReader::getInPacketBufferLength() const
{
return inPacketBufferLen_;
}
void RTPReader::addOutgoingFrame(RTPGeneric::GenericFrame *frame)
{
if (!frame)
return;
framesOut_.push_back(frame);
}

42
src/reader.hh Normal file
View File

@ -0,0 +1,42 @@
#pragma once
#include <thread>
#include "conn.hh"
#include "rtp_generic.hh"
class RTPReader : public RTPConnection {
public:
RTPReader(std::string srcAddr, int srcPort);
~RTPReader();
// TODO
RTPGeneric::GenericFrame *pullFrame();
// open socket and start runner_
int start();
bool active();
uint8_t *getInPacketBuffer() const;
uint32_t getInPacketBufferLength() const;
void addOutgoingFrame(RTPGeneric::GenericFrame *frame);
private:
// TODO implement ring buffer
bool active_;
// connection-related stuff
std::string srcAddr_;
int srcPort_;
// receiver thread related stuff
std::thread *runner_;
uint8_t *inPacketBuffer_; /* Buffer for incoming packet (MAX_PACKET) */
uint32_t inPacketBufferLen_;
std::vector<RTPGeneric::GenericFrame *> framesOut_;
std::mutex framesMtx_;
};

View File

@ -10,6 +10,7 @@
#include "rtp_generic.hh"
#include "conn.hh"
#include "writer.hh"
RTPGeneric::GenericFrame *RTPGeneric::createGenericFrame()
{
@ -76,10 +77,13 @@ int RTPGeneric::pushGenericFrame(RTPConnection *conn, RTPGeneric::GenericFrame *
if (frame->data)
memcpy(&buffer[12], frame->data, frame->dataLen);
sockaddr_in outAddr = conn->getOutAddress();
RTPWriter *writer = dynamic_cast<RTPWriter *>(conn);
sockaddr_in outAddr = writer->getOutAddress();
if (sendto(conn->getSocket(), buffer, frame->dataLen + 12, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1)
if (sendto(conn->getSocket(), buffer, frame->dataLen + 12, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1) {
perror("pushGenericFrame");
return -errno;
}
conn->incRTPSequence(1);
frame->rtp_sequence++;
@ -103,7 +107,7 @@ int RTPGeneric::pushGenericFrame(RTPConnection *conn, uint8_t *data, uint32_t da
uint8_t buffer[MAX_PACKET] = { 0 };
buffer[0] = 2 << 6; // RTP version
buffer[1] = (conn->getPayload() & 0x7f) | (0 << 7);
buffer[1] = (conn->getPayloadType() & 0x7f) | (0 << 7);
*(uint16_t *)&buffer[2] = htons(conn->getSequence());
*(uint32_t *)&buffer[4] = htonl(conn->getTimestamp());
@ -111,10 +115,13 @@ int RTPGeneric::pushGenericFrame(RTPConnection *conn, uint8_t *data, uint32_t da
memcpy(&buffer[12], data, dataLen);
sockaddr_in outAddr = conn->getOutAddress();
RTPWriter *writer = dynamic_cast<RTPWriter *>(conn);
sockaddr_in outAddr = writer->getOutAddress();
if (sendto(conn->getSocket(), buffer, dataLen + 12, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1)
if (sendto(conn->getSocket(), buffer, dataLen + 12, 0, (struct sockaddr *)&outAddr, sizeof(outAddr)) == -1) {
perror("pushGenericFrame");
return -errno;
}
conn->incRTPSequence(1);

View File

@ -14,7 +14,7 @@ int RTPOpus::pushOpusFrame(RTPConnection *conn, uint8_t *data, uint32_t dataLen,
(config->channels > 1 ? (1 << 2) : 0) | 0;
memcpy(&buffer[1], data, dataLen);
conn->setPayload(RTP_FORMAT_OPUS);
conn->setPayloadType(RTP_FORMAT_OPUS);
return RTPGeneric::pushGenericFrame(conn, buffer, dataLen + 1, 0);
}

View File

@ -14,52 +14,3 @@ uint64_t rtpGetUniqueId()
static uint64_t i = 1;
return i++;
}
int rtpRecvData(RTPConnection *conn)
{
sockaddr_in fromAddr;
std::cerr << "starting to listen to address and port" << std::endl;
while (1) {
int fromAddrSize = sizeof(fromAddr);
int32_t ret = recvfrom(conn->getSocket(), conn->getInPacketBuffer(), conn->getInPacketBufferLength(),
0, /* no flags */
#ifdef _WIN32
(SOCKADDR *)&fromAddr,
&fromAddrSize
#else
(struct sockaddr *)&fromAddr,
(socklen_t *)&fromAddrSize
#endif
);
if (ret == -1) {
#if _WIN32
int _error = WSAGetLastError();
if (_error != 10035)
std::cerr << "Socket error" << _error << std::endl;
#else
perror("rtpRecvData:");
#endif
return -1;
} else {
const uint8_t *inBuffer = conn->getInPacketBuffer();
RTPGeneric::GenericFrame *frame = RTPGeneric::createGenericFrame();
frame->marker = (inBuffer[1] & 0x80) ? 1 : 0;
frame->rtp_payload = (inBuffer[1] & 0x7f);
frame->rtp_sequence = ntohs(*(uint16_t *)&inBuffer[2]);
frame->rtp_timestamp = ntohl(*(uint32_t *)&inBuffer[4]);
frame->rtp_ssrc = ntohl(*(uint32_t *)&inBuffer[8]);
frame->data = new uint8_t[ret - 12];
frame->dataLen = ret - 12;
memcpy(frame->data, &inBuffer[12], frame->dataLen);
conn->addOutgoingFrame(frame);
}
}
}

54
src/writer.cc Normal file
View File

@ -0,0 +1,54 @@
#include <arpa/inet.h>
#include <cstring>
#include <iostream>
#include "writer.hh"
#include "rtp_opus.hh"
#include "rtp_hevc.hh"
#include "rtp_generic.hh"
RTPWriter::RTPWriter(std::string dstAddr, int dstPort):
RTPConnection(false),
dstAddr_(dstAddr),
dstPort_(dstPort)
{
}
RTPWriter::~RTPWriter()
{
}
int RTPWriter::start()
{
if ((socket_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("RTPConnection::open");
return RTP_SOCKET_ERROR;
}
memset(&addrOut_, 0, sizeof(addrOut_));
addrOut_.sin_family = AF_INET;
inet_pton(AF_INET, dstAddr_.c_str(), &addrOut_.sin_addr);
addrOut_.sin_port = htons(dstPort_);
id_ = rtpGetUniqueId();
return 0;
}
int RTPWriter::pushFrame(uint8_t *data, uint32_t datalen, rtp_format_t fmt, uint32_t timestamp)
{
switch (fmt) {
case RTP_FORMAT_HEVC:
return RTPHevc::pushHevcFrame(this, data, datalen, timestamp);
case RTP_FORMAT_OPUS:
return RTPOpus::pushOpusFrame(this, data, datalen, timestamp);
default:
return RTPGeneric::pushGenericFrame(this, data, datalen, timestamp);
}
}
sockaddr_in RTPWriter::getOutAddress()
{
return addrOut_;
}

28
src/writer.hh Normal file
View File

@ -0,0 +1,28 @@
#pragma once
#include <stdint.h>
#include "conn.hh"
class RTPWriter : public RTPConnection {
public:
RTPWriter(std::string dstAddr, int dstPort);
~RTPWriter();
// open socket for sending frames
int start();
// TODO
int pushFrame(uint8_t *data, uint32_t datalen, rtp_format_t fmt, uint32_t timestamp);
// TODO
int pushGenericFrameFrame(RTPGeneric::GenericFrame *frame);
sockaddr_in getOutAddress();
private:
std::string dstAddr_;
int dstPort_;
sockaddr_in addrOut_;
};