Initial commit

This commit is contained in:
Aaro Altonen 2020-06-10 08:45:42 +03:00
commit 7151dbab95
48 changed files with 4362 additions and 0 deletions

25
COPYING Normal file
View File

@ -0,0 +1,25 @@
BSD 2-Clause License
Copyright (c) 2020, Tampere University
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

73
Makefile Normal file
View File

@ -0,0 +1,73 @@
# Build rules for the RTP benchmark binaries (uvgRTP, FFmpeg, LIVE555).
# benchmark.pl invokes these targets on demand, e.g. `make uvgrtp_sender`.
# NOTE(review): .PHONY declares `all` but no `all` target exists.
.PHONY: all clean

CXX = g++
CXXFLAGS = -Wall -Wextra -O2 -std=c++11 -g

uvgrtp_receiver:
	$(CXX) $(CXXFLAGS) -o uvgrtp/receiver uvgrtp/receiver.cc util/util.cc -luvgrtp -lkvazaar -lpthread

uvgrtp_sender:
	$(CXX) $(CXXFLAGS) -o uvgrtp/sender uvgrtp/sender.cc util/util.cc -luvgrtp -lkvazaar -lpthread

uvgrtp_latency_sender:
	$(CXX) $(CXXFLAGS) -o uvgrtp/latency_sender uvgrtp/latency_sender.cc util/util.cc -luvgrtp -lkvazaar -lpthread

uvgrtp_latency_receiver:
	$(CXX) $(CXXFLAGS) -o uvgrtp/latency_receiver uvgrtp/latency_receiver.cc util/util.cc -luvgrtp -lkvazaar -lpthread

ffmpeg_sender:
	$(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/sender \
		ffmpeg/sender.cc util/util.cc -lkvazaar `pkg-config --libs libavformat` -lpthread

ffmpeg_receiver:
	$(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/receiver \
		ffmpeg/receiver.cc util/util.cc -lkvazaar -lavformat -lavcodec -lswscale -lz -lavutil -lpthread

ffmpeg_latency_sender:
	$(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/latency_sender \
		ffmpeg/latency_sender.cc util/util.cc -lkvazaar `pkg-config --libs libavformat` -lpthread

ffmpeg_latency_receiver:
	$(CXX) $(CXXFLAGS) -Wno-unused -Wno-deprecated-declarations -Wno-unused-result -o ffmpeg/latency_receiver \
		ffmpeg/latency_receiver.cc util/util.cc -lkvazaar -lavformat -lavcodec -lswscale -lz -lavutil -lpthread

live555_sender:
	$(CXX) $(CXXFLAGS) live555/sender.cc live555/source.cc util/util.cc -o live555/sender \
		-I /usr/local/include/liveMedia \
		-I /usr/local/include/groupsock \
		-I /usr/local/include/BasicUsageEnvironment \
		-I /usr/local/include/UsageEnvironment \
		-lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \
		-lUsageEnvironment -lcrypto -lssl

live555_receiver:
	$(CXX) $(CXXFLAGS) live555/receiver.cc live555/sink.cc util/util.cc -o live555/receiver \
		-I /usr/local/include/liveMedia \
		-I /usr/local/include/groupsock \
		-I /usr/local/include/BasicUsageEnvironment \
		-I /usr/local/include/UsageEnvironment \
		-lkvazaar -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \
		-lUsageEnvironment -lcrypto -lssl

live555_latency_sender:
	$(CXX) $(CXXFLAGS) live555/latency_sender.cc util/util.cc -o live555/latency_sender \
		-I /usr/local/include/liveMedia \
		-I /usr/local/include/groupsock \
		-I /usr/local/include/BasicUsageEnvironment \
		-I /usr/local/include/UsageEnvironment \
		-lkvazaar -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \
		-lUsageEnvironment -lcrypto -lssl

live555_latency_receiver:
	$(CXX) $(CXXFLAGS) live555/latency_receiver.cc -o live555/latency_receiver \
		-I /usr/local/include/liveMedia \
		-I /usr/local/include/groupsock \
		-I /usr/local/include/BasicUsageEnvironment \
		-I /usr/local/include/UsageEnvironment \
		-lkvazaar -lpthread -lliveMedia -lgroupsock -lBasicUsageEnvironment \
		-lUsageEnvironment -lcrypto -lssl

# Fixed: the old list removed a non-existent `live555/latency` and left the
# live555 latency binaries behind.
clean:
	rm -f uvgrtp/receiver uvgrtp/sender uvgrtp/latency_sender uvgrtp/latency_receiver \
		ffmpeg/receiver ffmpeg/sender ffmpeg/latency_sender ffmpeg/latency_receiver \
		live555/receiver live555/sender live555/latency_sender live555/latency_receiver

126
README.md Normal file
View File

@ -0,0 +1,126 @@
# RTP Benchmarks
This repository contains all benchmarking code related to the benchmark of uvgRTP against LIVE555 and FFmpeg.
Directories [uvgrtp](uvgrtp), [ffmpeg](ffmpeg), and [live555](live555) contain the C++ implementations for RTP (latency) senders and receivers.
Script benchmark.pl can be used to automate the benchmark runs and its usage is described below.
Script parse.pl can be used to parse the output of benchmark runs.
This repository also contains file [udperf.c](udperf.c) which can be used to test the throughput of a network and it was used for the paper to determine the upper limit for UDP traffic using 1500-byte Ethernet frames.
## Running the benchmarks
### Example 1
Benchmark uvgRTP's send goodput. Run the benchmark configuration with 8 different thread settings
so first starting with 8 threads, then 7 threads then 6 etc.
Each thread configuration will test all FPS values in the range 30 - 480, and each FPS
is tested 20 times. FPS is doubled each step, so the tested values are: 30, 60, 120, 240, 480
Each FPS value for each thread configuration provides one log file
Sender
```
./benchmark.pl \
--lib uvgrtp \
--role send \
--addr 127.0.0.1 \
--port 9999 \
--threads 3 \
--start 30 \
--end 480 \
--iter 20
```
Receiver
```
./benchmark.pl \
--lib uvgrtp \
--role recv \
--use-nc \
--addr 127.0.0.1 \
--port 9999 \
--threads 3 \
--start 30 \
--end 480 \
--iter 20
```
### Example 2
Benchmark uvgRTP's send goodput using netcat
Sender
```
./benchmark.pl \
--lib uvgrtp \
--role send \
--use-nc \
--addr 127.0.0.1 \
--port 9999 \
--threads 3 \
--start 30 \
--end 60
```
Receiver
```
./benchmark.pl \
--lib uvgrtp \
--role recv \
--use-nc \
--addr 127.0.0.1 \
--port 9999 \
--threads 3 \
--start 30 \
--end 60
```
## Parsing the benchmark results
If the log file matches the pattern `.*(send|recv).*(\d+)threads.*(\d+)fps.*(\d+)iter.*` you don't
have to provide `--role` `--threads` or `--iter`
### Parsing one output file
#### Parse one benchmark, generic file name
```
./parse.pl \
--lib ffmpeg \
--role recv \
--path log_file \
--threads 3 \
--iter 20
```
#### Parse one benchmark, output file generated by benchmark.pl
```
./parse.pl --path results/uvgrtp/send_results_4threads_240fps_10iter
```
### Parsing multiple output files
NB: path must point to a directory!
NB2: `--iter` needn't be provided if file names follow the pattern defined above
#### Find best configuration
Find the best configurations for maximizing single-thread performance and total performance
where frame loss is less than 2%
```
./parse.pl --path results/uvgrtp/all --iter 10 --parse=best --frame-loss=2
```
#### Output goodput/frame loss values to a CSV file
```
./parse.pl --path results/uvgrtp/all --parse=csv
```

242
benchmark.pl Executable file
View File

@ -0,0 +1,242 @@
#!/usr/bin/env perl
# benchmark.pl — orchestrates goodput and latency benchmark runs for
# uvgRTP, FFmpeg and LIVE555. Sender and receiver instances coordinate
# over a small TCP control socket; the actual work is done by the C++
# binaries built via the Makefile.
use warnings;
use strict;
use IO::Socket;
use IO::Socket::INET;
use Getopt::Long;
$| = 1; # autoflush
# Defaults for the control connection when --addr/--port are not given.
my $DEFAULT_ADDR = "10.21.25.200";
my $DEFAULT_PORT = 9999;
# Clamp the FPS sweep range: the start value may not go below 30 and the
# end value may not exceed 5000. Returns the clamped (start, end) pair.
sub clamp {
    my ($lo, $hi) = @_;
    $lo = 30   if $lo < 30;
    $hi = 5000 if $hi > 5000;
    return ($lo, $hi);
}
# Create a listening TCP control socket bound to the given local address
# and port. The send side uses this to wait for the receiver's connection.
# Dies when the socket cannot be created/bound.
sub mk_ssock {
    my ($addr, $port) = @_;
    my $s = IO::Socket::INET->new(
        LocalAddr => $addr,
        LocalPort => $port,
        Proto     => "tcp",
        Type      => SOCK_STREAM,
        Listen    => 1,
    # Fixed: the error message claimed a failed *connect*, but this socket
    # listens — report the bind/listen failure accurately.
    ) or die "Couldn't listen on $addr:$port: $@\n";
    return $s;
}
# Connect to the remote control socket at the given address and port.
# Dies when the connection cannot be established within the timeout.
sub mk_rsock {
    my ($addr, $port) = @_;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $addr,
        PeerPort => $port,
        Proto    => "tcp",
        Type     => SOCK_STREAM,
        Timeout  => 1,
    ) or die "Couldn't connect to $addr:$port: $@\n";
    return $sock;
}
# Run the send-side goodput benchmark.
# For every executable in comma-separated $e, every thread count in
# 1..$threads and every FPS value in @fps_vals, run $iter iterations.
# Each iteration waits for a message from the receiver on the control
# socket, launches the sender binary under time(1) with its output
# appended to a per-configuration log file, and — when the receiver is
# the generic netcat one ($gen_recv) — sends back "end" so the receiver
# knows the transmission finished.
sub send_benchmark {
my ($lib, $addr, $port, $iter, $threads, $gen_recv, $mode, $e, @fps_vals) = @_;
my ($socket, $remote, $data);
my @execs = split ",", $e;
# act as the control-channel server; wait for the receiver to connect
$socket = mk_ssock($addr, $port);
$remote = $socket->accept();
foreach (@execs) {
my $exec = $_;
foreach ((1 .. $threads)) {
my $thread = $_;
foreach (@fps_vals) {
my $fps = $_;
# one log file per (threads, fps, iterations, executable) combination
my $logname = "send_results_$thread" . "threads_$fps". "fps_$iter" . "iter_$exec";
for ((1 .. $iter)) {
# block until the receiver signals it is ready for this iteration
$remote->recv($data, 16);
system ("time ./$lib/$exec $addr $thread $fps $mode >> $lib/results/$logname 2>&1");
$remote->send("end") if $gen_recv;
}
}
}
}
}
# Run the receive-side goodput benchmark using the library's own receiver
# binary. Mirrors send_benchmark: for each (executable, thread count, fps)
# combination, signal "start" to the sender over the control socket and
# run the receiver under time(1), appending output to a per-configuration
# log file.
sub recv_benchmark {
my ($lib, $addr, $port, $iter, $threads, $e, @fps_vals) = @_;
# connect to the sender's control-channel server
my $socket = mk_rsock($addr, $port);
my @execs = split ",", $e;
foreach (@execs) {
my $exec = $_;
foreach ((1 .. $threads)) {
my $thread = $_;
foreach (@fps_vals) {
# $_ here is the current FPS value from @fps_vals
my $logname = "recv_results_$thread" . "threads_$_". "fps_$iter" . "iter_$exec";
for ((1 .. $iter)) {
# tell the sender to start this iteration, then capture it
$socket->send("start");
system ("time ./$lib/receiver $addr $thread >> $lib/results/$logname 2>&1");
}
}
}
}
}
# Generic receiver: use netcat (via GNU parallel) to capture the stream
# instead of a library-specific receiver binary. Counts down from
# $threads to 1, and for each (threads, fps) combination records the
# number of bytes each netcat captured into a per-configuration log file.
sub recv_generic {
my ($lib, $addr, $port, $iter, $threads, @fps_vals) = @_;
# my ($sfps, $efps) = clamp($start, $end);
my $socket = mk_rsock($addr, $port);
my $ports = "";
# spawn N netcats using gnu parallel, send message to sender to start sending,
# wait for message from sender that all the packets have been sent, sleep a tiny bit
# move receiver output from separate files to one common file and proceed to next iteration
# every stream uses its own even port starting at 8888 (8888, 8890, ...)
$ports .= (8888 + $_ * 2) . " " for ((0 .. $threads - 1));
# NOTE(review): numeric countdown compared with the string operator `ne`;
# works for these values but `!= 0` would state the intent.
while ($threads ne 0) {
foreach (@fps_vals) {
my $logname = "recv_results_$threads" . "threads_$_". "fps";
# one UDP-listening netcat per port, in the background
system "parallel --files nc -kluvw 0 $addr ::: $ports &";
$socket->send("start");
# wait for the sender's "end" message, give nc time to drain, then stop it
$socket->recv(my $data, 16);
sleep 1;
system "killall nc";
# record the size of each parallel output file (bytes received per
# stream), one number per line, then remove the temporary files
open my $fhz, '>>', "$lib/results/$logname";
opendir my $dir, "/tmp";
foreach my $of (grep (/par.+\.par/i, readdir $dir)) {
print $fhz -s "/tmp/$of";
print $fhz "\n";
unlink "/tmp/$of";
}
closedir $dir;
}
$threads--;
}
}
# Latency benchmark, send side: act as the control server and run the
# library's latency_sender binary 100 times, once per "start" message
# from the receiver, appending results to $lib/results/latencies.
sub lat_send {
my ($lib, $addr, $port) = @_;
my ($socket, $remote, $data);
$socket = mk_ssock($addr, $port);
$remote = $socket->accept();
for ((1 .. 100)) {
# wait for the receiver to be ready before each run
$remote->recv($data, 16);
system ("./$lib/latency_sender >> $lib/results/latencies 2>&1");
}
}
# Latency benchmark, receive side: connect to the sender's control server
# and run the library's latency_receiver binary 100 times, signalling
# "start" before each run. The receiver's stdout is discarded; note the
# redirection order sends stderr to the terminal, not to /dev/null.
sub lat_recv {
my ($lib, $addr, $port) = @_;
my $socket = mk_rsock($addr, $port);
for ((1 .. 100)) {
$socket->send("start");
system ("./$lib/latency_receiver 2>&1 >/dev/null");
# pause between runs so sender and receiver stay in lockstep
sleep 2;
}
}
# Print usage information for both the throughput and the latency modes,
# then terminate the script (this sub never returns).
sub print_help {
    my $benchmark_usage =
          "usage (benchmark):\n ./benchmark.pl \n"
        . "\t--lib <uvgrtp|ffmpeg|live555>\n"
        . "\t--role <send|recv>\n"
        . "\t--addr <server address>\n"
        . "\t--port <server port>\n"
        . "\t--threads <# of threads>\n"
        . "\t--mode <strict|best-effort>\n"
        . "\t--start <start fps>\n"
        . "\t--end <end fps>\n\n";
    my $latency_usage =
          "usage (latency):\n ./benchmark.pl \n"
        . "\t--latency\n"
        . "\t--role <send|recv>\n"
        . "\t--addr <server address>\n"
        . "\t--port <server port>\n"
        . "\t--lib <uvgrtp|ffmpeg|live555>\n\n";
    print $benchmark_usage;
    print $latency_usage;
    exit;
}
# Parse command-line options; every option has a default so only the
# sweep range (or --fps / --latency) and --lib are truly required.
GetOptions(
"lib|l=s" => \(my $lib = ""),
"role|r=s" => \(my $role = ""),
"addr|a=s" => \(my $addr = ""),
"port|p=i" => \(my $port = 0),
"iter|i=i" => \(my $iter = 10),
"threads|t=i" => \(my $threads = 1),
"start|s=f" => \(my $start = 0),
"end|e=f" => \(my $end = 0),
"step=i" => \(my $step = 0),
"use-nc" => \(my $nc = 0),
"fps=s" => \(my $fps = ""),
"latency" => \(my $lat = 0),
"mode=s" => \(my $mode = "best-effort"),
"exec=s" => \(my $exec = "default"),
"help" => \(my $help = 0)
) or die "failed to parse command line!\n";
$port = $DEFAULT_PORT if !$port;
$addr = $DEFAULT_ADDR if !$addr;
# Validate inputs; print_help() exits the script.
print_help() if $help or !$lib;
print_help() if ((!$start or !$end) and !$fps) and !$lat;
# NOTE(review): $mode is used as a regex against the valid values, so a
# partial mode like "strict" matching works but so would e.g. "s" — confirm.
print_help() if not grep /$mode/, ("strict", "best-effort");
die "not implemented\n" if !grep (/$lib/, ("uvgrtp", "ffmpeg", "live555"));
# Build the list of FPS values to sweep: either the explicit --fps list,
# or the clamped [start, end] range advanced by --step (or doubled).
my @fps_vals = ();
if (!$lat) {
if ($fps) {
@fps_vals = split ",", $fps;
} else {
($start, $end) = clamp($start, $end);
for (my $i = $start; $i <= $end; ) {
push @fps_vals, $i;
if ($step) { $i += $step; }
else { $i *= 2; }
}
}
}
# Dispatch: build the required binary with make, then run the matching
# benchmark routine for this role/mode combination.
if ($role eq "send") {
if ($lat) {
system "make $lib" . "_latency_sender";
lat_send($lib, $addr, $port);
} else {
if ($exec eq "default") {
system "make $lib" . "_sender";
$exec = "sender";
}
send_benchmark($lib, $addr, $port, $iter, $threads, $nc, $mode, $exec, @fps_vals);
}
} elsif ($role eq "recv" ) {
if ($lat) {
system "make $lib" . "_latency_receiver";
lat_recv($lib, $addr, $port);
} elsif (!$nc) {
if ($exec eq "default") {
system "make $lib" . "_receiver";
$exec = "receiver";
}
recv_benchmark($lib, $addr, $port, $iter, $threads, $exec, @fps_vals);
} else {
# --use-nc: capture the stream with netcat instead of a receiver binary
recv_generic($lib, $addr, $port, $iter, $threads, @fps_vals);
}
} else {
print "invalid role: '$role'\n" and exit;
}

208
ffmpeg/latency_receiver.cc Normal file
View File

@ -0,0 +1,208 @@
extern "C" {
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/channel_layout.h>
#include <libavutil/common.h>
#include <libavutil/imgutils.h>
#include <libavutil/mathematics.h>
#include <libavutil/samplefmt.h>
#include <stdbool.h>
}
#include <chrono>
#include <thread>
#include <string>
#include <atomic>
/* Loads the benchmark input into memory and returns a pointer to it,
 * storing the size in `len`. Implemented in util/util.cc (linked by the
 * Makefile) — confirm exact semantics there. */
extern void *get_mem(int argc, char **argv, size_t& len);
/* Benchmark stream parameters: 4K resolution, 200 fps pacing. */
#define WIDTH 3840
#define HEIGHT 2160
#define FPS 200
#define SLEEP 8
std::chrono::high_resolution_clock::time_point fs, fe; /* NOTE(review): unused in this file */
std::atomic<bool> received(false); /* NOTE(review): never written in this file */
/* Pair of libav contexts: the echo output ("sender") and the RTP input. */
struct ffmpeg_ctx {
AVFormatContext *sender;
AVFormatContext *receiver;
};
/* Initialize the FFmpeg contexts used by the latency receiver.
 *
 * Two contexts are created:
 *   - ctx->sender:   an RTP output that echoes every received packet back
 *                    to the sender at rtp://10.21.25.200:8889
 *   - ctx->receiver: an RTP input reading the incoming HEVC stream,
 *                    described by an SDP file chosen based on `ip`
 *
 * Returns a heap-allocated ffmpeg_ctx, or NULL when the input stream
 * cannot be opened or probed. */
static ffmpeg_ctx *init_ffmpeg(const char *ip)
{
    avcodec_register_all();
    av_register_all();
    avformat_network_init();
    av_log_set_level(AV_LOG_PANIC);

    ffmpeg_ctx *ctx = new ffmpeg_ctx;
    enum AVCodecID codec_id = AV_CODEC_ID_H265;
    int ret;
    AVCodecContext *c = NULL;
    AVCodec *codec;
    AVFrame *frame;

    codec = avcodec_find_encoder(codec_id);
    c = avcodec_alloc_context3(codec);
    /* Fixed: width and height were swapped (c->width was HEIGHT and
     * c->height was WIDTH), contradicting the codecpar setup below. */
    c->width = WIDTH;
    c->height = HEIGHT;
    c->time_base.num = 1;
    c->time_base.den = FPS;
    c->pix_fmt = AV_PIX_FMT_YUV420P;
    c->codec_type = AVMEDIA_TYPE_VIDEO;
    c->flags = AV_CODEC_FLAG_GLOBAL_HEADER;
    avcodec_open2(c, codec, NULL);

    frame = av_frame_alloc();
    frame->format = c->pix_fmt;
    frame->width = c->width;
    frame->height = c->height;
    ret = av_image_alloc(frame->data, frame->linesize, c->width, c->height,
        c->pix_fmt, 32);
    (void)ret;

    AVOutputFormat *fmt = av_guess_format("rtp", NULL, NULL);
    ret = avformat_alloc_output_context2(&ctx->sender, fmt, fmt->name, "rtp://10.21.25.200:8889");
    avio_open(&ctx->sender->pb, ctx->sender->filename, AVIO_FLAG_WRITE);

    struct AVStream* stream = avformat_new_stream(ctx->sender, codec);
    stream->codecpar->width = WIDTH;
    stream->codecpar->height = HEIGHT;
    stream->codecpar->codec_id = AV_CODEC_ID_HEVC;
    stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
    stream->time_base.num = 1;
    stream->time_base.den = FPS;

    char buf[256];
    AVDictionary *d_s = NULL;
    AVDictionary *d_r = NULL;

    /* output UDP buffer size */
    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d_s, "buffer_size", buf, 32);

    /* Flush the underlying I/O stream after each packet: 1 enables it and
     * reduces latency. Fixed: the value was previously passed as NULL, so
     * the option was silently never set despite the snprintf above it. */
    snprintf(buf, sizeof(buf), "%d", 1);
    av_dict_set(&d_s, "flush_packets", buf, 32);

    /* Maximum buffering duration for interleaving, in microseconds; keep
     * it tiny so libavformat does not hold packets back waiting for other
     * streams. */
    snprintf(buf, sizeof(buf), "%d", 1000);
    av_dict_set(&d_s, "max_interleave_delta", buf, 32);

    /* avioflags "direct" reduces AVIO buffering. */
    snprintf(buf, sizeof(buf), "direct");
    av_dict_set(&d_s, "avioflags", buf, 32);

    (void)avformat_write_header(ctx->sender, &d_s);

    /* When sender has been initialized, initialize receiver. */
    ctx->receiver = avformat_alloc_context();
    int video_stream_index;

    av_dict_set(&d_r, "protocol_whitelist", "file,udp,rtp", 0);

    /* input buffer size */
    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d_r, "buffer_size", buf, 32);

    /* avioflags "direct" reduces AVIO buffering. */
    snprintf(buf, sizeof(buf), "direct");
    av_dict_set(&d_r, "avioflags", buf, 32);

    /* Reduce the latency introduced by buffering during initial input
     * streams analysis. NOTE(review): "nobuffer" is normally a value of
     * the "fflags" option; passing it as a key with a NULL value looks
     * ineffective — confirm intent before changing. */
    av_dict_set(&d_r, "nobuffer", NULL, 32);

    /* Probing size in bytes; 32 is the allowed minimum and keeps startup
     * latency low. */
    snprintf(buf, sizeof(buf), "%d", 32);
    av_dict_set(&d_r, "probesize", buf, 32);

    /* Number of frames used to probe fps. */
    snprintf(buf, sizeof(buf), "%d", 2);
    av_dict_set(&d_r, "fpsprobesize", buf, 32);

    ctx->receiver->flags = AVFMT_FLAG_NONBLOCK;

    /* Pick the SDP file matching the benchmark network setup. */
    if (!strcmp(ip, "127.0.0.1"))
        snprintf(buf, sizeof(buf), "ffmpeg/sdp/localhost/hevc_0.sdp");
    else
        snprintf(buf, sizeof(buf), "ffmpeg/sdp/lan/hevc_0.sdp");

    if (avformat_open_input(&ctx->receiver, buf, NULL, &d_r) != 0) {
        fprintf(stderr, "nothing found!\n");
        return NULL;
    }

    if (avformat_find_stream_info(ctx->receiver, NULL) < 0) {
        fprintf(stderr, "stream info not found!\n");
        return NULL;
    }

    /* search video stream */
    for (size_t i = 0; i < ctx->receiver->nb_streams; i++) {
        if (ctx->receiver->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO)
            video_stream_index = i;
    }
    (void)video_stream_index;

    return ctx;
}
/* Echo loop: initialize both RTP contexts, then forward every packet read
 * from the incoming stream straight back out through the sender context.
 * Returns EXIT_FAILURE if initialization fails, EXIT_SUCCESS when the
 * input stream ends. */
static int receiver(void)
{
AVPacket pkt;
ffmpeg_ctx *ctx;
/* hard-coded benchmark machine address; selects the LAN SDP file */
std::string addr("10.21.25.200");
if (!(ctx = init_ffmpeg(addr.c_str())))
return EXIT_FAILURE;
av_init_packet(&pkt);
/* start reading packets and bounce each one back to the sender */
av_read_play(ctx->receiver);
while (av_read_frame(ctx->receiver, &pkt) >= 0) {
av_write_frame(ctx->sender, &pkt);
}
return EXIT_SUCCESS;
}
/* Entry point: command-line arguments are accepted but unused; the
 * addresses are hard-coded in receiver()/init_ffmpeg(). */
int main(int argc, char **argv)
{
(void)argc, (void)argv;
return receiver();
}

319
ffmpeg/latency_sender.cc Normal file
View File

@ -0,0 +1,319 @@
extern "C" {
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/channel_layout.h>
#include <libavutil/common.h>
#include <libavutil/imgutils.h>
#include <libavutil/mathematics.h>
#include <libavutil/samplefmt.h>
#include <stdbool.h>
}
#include <atomic>
#include <deque>
#include <chrono>
#include <string>
#include <thread>
#include <unordered_map>
using namespace std::chrono;
/* Loads the benchmark input into memory and returns a pointer to it,
 * storing the size in `len`. Implemented in util/util.cc (linked by the
 * Makefile) — confirm exact semantics there. */
extern void *get_mem(int argc, char **argv, size_t& len);
/* Benchmark stream parameters: 4K resolution, 30 fps pacing. */
#define WIDTH 3840
#define HEIGHT 2160
#define FPS 30
#define SLEEP 8
std::chrono::high_resolution_clock::time_point fs, fe; /* NOTE(review): unused in this file */
/* Set by the receiver thread when it has measured enough frames;
 * sender() spins on this before returning. */
std::atomic<bool> ready(false);
/* Key (chunk size) of the very first frame sent; used by the receiver
 * thread for its first packet. */
uint64_t ff_key = 0;
/* Send timestamps, keyed by chunk size (map) and in FIFO order (deque);
 * shared between the sender loop and the receiver thread. */
static std::unordered_map<uint64_t, high_resolution_clock::time_point> timestamps;
static std::deque<high_resolution_clock::time_point> timestamps2;
high_resolution_clock::time_point start2; /* NOTE(review): unused in this file */
/* Pair of libav contexts: the RTP output and the echoed-stream input. */
struct ffmpeg_ctx {
AVFormatContext *sender;
AVFormatContext *receiver;
};
/* Initialize the FFmpeg contexts used by the latency sender.
 *
 * Two contexts are created:
 *   - ctx->sender:   an RTP output streaming HEVC to the remote echo
 *                    endpoint at rtp://10.21.25.2:8888
 *   - ctx->receiver: an RTP input reading the echoed stream back,
 *                    described by an SDP file chosen based on `ip`
 *
 * Returns a heap-allocated ffmpeg_ctx, or NULL when the echoed input
 * stream cannot be opened. */
static ffmpeg_ctx *init_ffmpeg(const char *ip)
{
    avcodec_register_all();
    av_register_all();
    avformat_network_init();
    av_log_set_level(AV_LOG_PANIC);

    ffmpeg_ctx *ctx = new ffmpeg_ctx;
    enum AVCodecID codec_id = AV_CODEC_ID_H265;
    int ret;
    AVCodecContext *c = NULL;
    AVCodec *codec;
    AVFrame *frame;

    codec = avcodec_find_encoder(codec_id);
    c = avcodec_alloc_context3(codec);
    /* Fixed: width and height were swapped (c->width was HEIGHT and
     * c->height was WIDTH), contradicting the codecpar setup below. */
    c->width = WIDTH;
    c->height = HEIGHT;
    c->time_base.num = 1;
    c->time_base.den = FPS;
    c->pix_fmt = AV_PIX_FMT_YUV420P;
    c->codec_type = AVMEDIA_TYPE_VIDEO;
    c->flags = AV_CODEC_FLAG_GLOBAL_HEADER;
    avcodec_open2(c, codec, NULL);

    frame = av_frame_alloc();
    frame->format = c->pix_fmt;
    frame->width = c->width;
    frame->height = c->height;
    ret = av_image_alloc(frame->data, frame->linesize, c->width, c->height,
        c->pix_fmt, 32);
    (void)ret;

    AVOutputFormat *fmt = av_guess_format("rtp", NULL, NULL);
    ret = avformat_alloc_output_context2(&ctx->sender, fmt, fmt->name, "rtp://10.21.25.2:8888");
    avio_open(&ctx->sender->pb, ctx->sender->filename, AVIO_FLAG_WRITE);

    struct AVStream* stream = avformat_new_stream(ctx->sender, codec);
    stream->codecpar->width = WIDTH;
    stream->codecpar->height = HEIGHT;
    stream->codecpar->codec_id = AV_CODEC_ID_HEVC;
    stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
    stream->time_base.num = 1;
    stream->time_base.den = FPS;

    char buf[256];
    AVDictionary *d_s = NULL;
    AVDictionary *d_r = NULL;

    /* output UDP buffer size */
    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d_s, "buffer_size", buf, 32);

    /* Flush the underlying I/O stream after each packet: 1 enables it and
     * reduces latency. Fixed: the value was previously passed as NULL, so
     * the option was silently never set despite the snprintf above it. */
    snprintf(buf, sizeof(buf), "%d", 1);
    av_dict_set(&d_s, "flush_packets", buf, 32);

    /* Maximum buffering duration for interleaving, in microseconds; keep
     * it tiny so libavformat does not hold packets back waiting for other
     * streams. */
    snprintf(buf, sizeof(buf), "%d", 1000);
    av_dict_set(&d_s, "max_interleave_delta", buf, 32);

    /* avioflags "direct" reduces AVIO buffering. */
    snprintf(buf, sizeof(buf), "direct");
    av_dict_set(&d_s, "avioflags", buf, 32);

    (void)avformat_write_header(ctx->sender, &d_s);

    /* When sender has been initialized, initialize receiver. */
    ctx->receiver = avformat_alloc_context();
    int video_stream_index;

    av_dict_set(&d_r, "protocol_whitelist", "file,udp,rtp", 0);

    /* input buffer size */
    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d_r, "buffer_size", buf, 32);

    /* avioflags "direct" reduces AVIO buffering. */
    snprintf(buf, sizeof(buf), "direct");
    av_dict_set(&d_r, "avioflags", buf, 32);

    /* Reduce the latency introduced by buffering during initial input
     * streams analysis. NOTE(review): "nobuffer" is normally a value of
     * the "fflags" option; passing it as a key with a NULL value looks
     * ineffective — confirm intent before changing. */
    av_dict_set(&d_r, "nobuffer", NULL, 32);

    /* Probing size in bytes; 32 is the allowed minimum and keeps startup
     * latency low. */
    snprintf(buf, sizeof(buf), "%d", 32);
    av_dict_set(&d_r, "probesize", buf, 32);

    /* Number of frames used to probe fps. */
    snprintf(buf, sizeof(buf), "%d", 2);
    av_dict_set(&d_r, "fpsprobesize", buf, 32);

    ctx->receiver->flags = AVFMT_FLAG_NONBLOCK;

    /* Pick the SDP file matching the benchmark network setup. */
    if (!strcmp(ip, "127.0.0.1"))
        snprintf(buf, sizeof(buf), "ffmpeg/sdp/localhost/lat_hevc.sdp");
    else
        snprintf(buf, sizeof(buf), "ffmpeg/sdp/lan/lat_hevc.sdp");

    if (avformat_open_input(&ctx->receiver, buf, NULL, &d_r) != 0) {
        fprintf(stderr, "nothing found!\n");
        return NULL;
    }

    /* search video stream */
    for (size_t i = 0; i < ctx->receiver->nb_streams; i++) {
        if (ctx->receiver->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO)
            video_stream_index = i;
    }
    (void)video_stream_index;

    return ctx;
}
/* Receiver thread of the latency benchmark: reads the echoed stream back,
 * measures the round-trip time of each frame against the FIFO of send
 * timestamps, accumulates per-frame-type totals, and prints the averages
 * (milliseconds) after 596 frames. Sets the global `ready` flag so the
 * sender can exit. */
static void receiver(ffmpeg_ctx *ctx)
{
uint64_t key = 0;
uint64_t diff = 0;
uint64_t frame_total = 0;
uint64_t intra_total = 0;
uint64_t inter_total = 0;
uint64_t frames = 0;
uint64_t intras = 0;
uint64_t inters = 0;
AVPacket packet;
av_init_packet(&packet);
std::chrono::high_resolution_clock::time_point start;
start = std::chrono::high_resolution_clock::now();
/* start reading packets from stream */
av_read_play(ctx->receiver);
while (av_read_frame(ctx->receiver, &packet) >= 0) {
/* NOTE(review): the sender keys `timestamps` by chunk size, but here
 * the key is packet.size - 1 — verify these actually match. */
key = packet.size - 1;
if (!frames)
key = ff_key;
/* round-trip time in microseconds, matched FIFO against send times;
 * NOTE(review): this `auto diff` shadows the uint64_t declared above. */
auto diff = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - timestamps2.front()
).count();
timestamps2.pop_front();
/* Classify by the NAL-unit-type field of the first NAL header byte:
 * 19 = intra (IDR), 1 = inter (trailing) per the HEVC numbering. */
if (((packet.data[3] >> 1) & 0x3f) == 19)
intra_total += (diff / 1000), intras++;
else if (((packet.data[3] >> 1) & 0x3f) == 1)
inter_total += (diff / 1000), inters++;
/* stop after 596 measured frames */
if (++frames < 596)
frame_total += (diff / 1000);
else
break;
timestamps.erase(key);
av_free_packet(&packet);
av_init_packet(&packet);
}
fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n",
frames,
intra_total / (float)intras,
inter_total / (float)inters,
frame_total / (float)frames
);
/* unblock the sender's busy-wait */
ready = true;
}
/* Sender side of the latency benchmark: stream the pre-encoded HEVC
 * chunks from get_mem() over RTP at FPS pacing, recording a send
 * timestamp per chunk; a spawned receiver() thread measures round-trip
 * times of the echoed stream. Returns 0 when the receiver has finished. */
static int sender(void)
{
size_t len = 0;
void *mem = get_mem(0, NULL, len);
/* remote echo machine; selects the LAN SDP file in init_ffmpeg() */
std::string addr("10.21.25.2");
ffmpeg_ctx *ctx = init_ffmpeg(addr.c_str());
/* detached-by-leak receiver thread; sender waits on `ready` instead */
(void)new std::thread(receiver, ctx);
uint64_t chunk_size = 0;
uint64_t diff = 0;
uint64_t counter = 0;
uint64_t total = 0;
uint64_t current = 0;
uint64_t key = 0;
/* frame period in microseconds for FPS pacing */
uint64_t period = (uint64_t)((1000 / (float)FPS) * 1000);
AVPacket pkt;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
/* input memory layout: [u64 chunk_size][chunk bytes] repeated */
for (size_t i = 0; i < len; ) {
memcpy(&chunk_size, (uint8_t *)mem + i, sizeof(uint64_t));
/* Start code lookup/merging of small packets causes the incoming frame size
* to differ quite significantly from "chunk_size" */
if (!i)
ff_key = chunk_size;
i += sizeof(uint64_t);
/* chunk size doubles as the timestamp key, so it must be unique;
 * NOTE(review): on a collision the chunk is skipped, not sent */
if (timestamps.find(chunk_size) != timestamps.end()) {
fprintf(stderr, "cannot use %zu for key!\n", chunk_size);
continue;
}
timestamps[chunk_size] = std::chrono::high_resolution_clock::now();
timestamps2.push_back(std::chrono::high_resolution_clock::now());
av_init_packet(&pkt);
pkt.data = (uint8_t *)mem + i;
pkt.size = chunk_size;
av_interleaved_write_frame(ctx->sender, &pkt);
av_packet_unref(&pkt);
/* sleep the remainder of this frame's period to hold the FPS target */
auto runtime = (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start
).count();
if (runtime < current * period)
std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));
current++;
i += chunk_size;
}
/* busy-wait until the receiver thread has printed its results */
while (!ready.load())
;
return 0;
}
/* Entry point: command-line arguments are accepted but unused; the
 * addresses are hard-coded in sender()/init_ffmpeg(). */
int main(int argc, char **argv)
{
(void)argc, (void)argv;
return sender();
}

151
ffmpeg/receiver.cc Normal file
View File

@ -0,0 +1,151 @@
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavformat/avio.h>
#include <libswscale/swscale.h>
}
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>
/* Per-thread bookkeeping, allocated in main() for every receiver thread.
 * NOTE(review): the array is allocated but never written by thread_func —
 * it appears to be unused. */
struct thread_info {
size_t pkts;
size_t bytes;
std::chrono::high_resolution_clock::time_point start;
} *thread_info;
/* Number of threads that have finished (or failed); main() polls this. */
std::atomic<int> nready(0);
/* Per-thread receive loop: open the SDP-described RTP input for this
 * thread and count received video packets/bytes until 598 packets have
 * arrived, then report "<bytes> <packets> <milliseconds>" on stderr
 * ("discard ..." when the stream ended early). Each thread signals
 * completion — success or failure — by incrementing the global `nready`
 * counter that main() polls. */
void thread_func(char *addr, int thread_num)
{
    AVFormatContext *format_ctx = avformat_alloc_context();
    int video_stream_index = 0;

    /* register everything */
    av_register_all();
    avformat_network_init();
    av_log_set_level(AV_LOG_PANIC);

    /* receive options: large buffers, minimal probing for low latency */
    AVDictionary *d = NULL;
    av_dict_set(&d, "protocol_whitelist", "file,udp,rtp", 0);

    char buf[256];

    /* input buffer size */
    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d, "buffer_size", buf, 32);

    snprintf(buf, sizeof(buf), "%d", 10000000);
    av_dict_set(&d, "max_delay", buf, 32);

    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d, "recv_buffer_size", buf, 32);

    snprintf(buf, sizeof(buf), "%d", 40 * 1000 * 1000);
    av_dict_set(&d, "rcvbuf", buf, 32);

    /* avioflags "direct" reduces AVIO buffering. */
    snprintf(buf, sizeof(buf), "direct");
    av_dict_set(&d, "avioflags", buf, 32);

    /* Reduce the latency introduced by buffering during initial input
     * streams analysis. NOTE(review): "nobuffer" is normally a value of
     * the "fflags" option; passed as a key it looks ineffective. */
    av_dict_set(&d, "nobuffer", NULL, 32);

    /* Probing size in bytes; 32 is the allowed minimum. */
    snprintf(buf, sizeof(buf), "%d", 32);
    av_dict_set(&d, "probesize", buf, 32);

    /* Number of frames used to probe fps. */
    snprintf(buf, sizeof(buf), "%d", 2);
    av_dict_set(&d, "fpsprobesize", buf, 32);

    /* Each thread listens on its own port pair, hence its own SDP file;
     * thread_num arrives pre-multiplied by 2 from main(). */
    if (!strcmp(addr, "127.0.0.1"))
        snprintf(buf, sizeof(buf), "ffmpeg/sdp/localhost/hevc_%d.sdp", thread_num / 2);
    else
        snprintf(buf, sizeof(buf), "ffmpeg/sdp/lan/hevc_%d.sdp", thread_num / 2);

    if (avformat_open_input(&format_ctx, buf, NULL, &d)) {
        fprintf(stderr, "failed to open input file\n");
        nready++;
        return;
    }

    if (avformat_find_stream_info(format_ctx, NULL) < 0) {
        fprintf(stderr, "failed to find stream info!\n");
        nready++;
        return;
    }

    for (size_t i = 0; i < format_ctx->nb_streams; i++) {
        if (format_ctx->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO)
            video_stream_index = i;
    }

    size_t pkts = 0;
    size_t size = 0;
    AVPacket packet;
    av_init_packet(&packet);

    std::chrono::high_resolution_clock::time_point start, last;
    start = std::chrono::high_resolution_clock::now();
    /* Fixed: `last` was read uninitialized when the loop body never reached
     * the else-branch (e.g. the stream produced no packets at all). */
    last = start;

    /* start reading packets from stream */
    av_read_play(format_ctx);
    while (av_read_frame(format_ctx, &packet) >= 0) {
        if (packet.stream_index == video_stream_index)
            size += packet.size;
        av_free_packet(&packet);
        av_init_packet(&packet);
        if (++pkts == 598)
            break;
        else
            last = std::chrono::high_resolution_clock::now();
    }

    /* Fixed: duration_cast count() is not size_t — cast explicitly so the
     * "%zu" format specifier is correct on all platforms. */
    if (pkts == 598) {
        fprintf(stderr, "%zu %zu %zu\n", size, pkts,
            (size_t)std::chrono::duration_cast<std::chrono::milliseconds>(last - start).count()
        );
    } else {
        fprintf(stderr, "discard %zu %zu %zu\n", size, pkts,
            (size_t)std::chrono::duration_cast<std::chrono::milliseconds>(last - start).count()
        );
    }

    av_read_pause(format_ctx);
    nready++;
}
/* Entry point: spawn one receiver thread per requested stream (each gets
 * an even thread number, matching its SDP/port pair), wait until every
 * thread has reported completion via `nready`, then join them.
 * Fixed: threads were previously created with a naked `new std::thread`
 * and leaked without ever being joined. */
int main(int argc, char **argv)
{
    if (argc != 3) {
        fprintf(stderr, "usage: ./%s <remote address> <number of threads>\n", __FILE__);
        return -1;
    }

    int nthreads = atoi(argv[2]);
    thread_info = (struct thread_info *)calloc(nthreads, sizeof(*thread_info));

    std::vector<std::thread> workers;
    workers.reserve(nthreads);
    for (int i = 0; i < nthreads; ++i)
        workers.emplace_back(thread_func, argv[1], i * 2);

    /* poll until every thread has incremented nready (success or failure) */
    while (nready.load() != nthreads)
        std::this_thread::sleep_for(std::chrono::milliseconds(20));

    for (auto& t : workers)
        t.join();

    free(thread_info);
}

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8888 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8890 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8892 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8894 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8896 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8898 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8900 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8902 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 10.21.25.200
s=No Name
c=IN IP4 10.21.25.200
t=0 0
m=video 8889 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8888 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8890 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8892 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8894 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8896 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8898 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8900 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8902 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

View File

@ -0,0 +1,8 @@
v=0
o=user 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
m=video 8889 RTP/AVP 96
a=rtpmap:96 H265/90000
a=recvonly

159
ffmpeg/sender.cc Normal file
View File

@ -0,0 +1,159 @@
extern "C" {
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/channel_layout.h>
#include <libavutil/common.h>
#include <libavutil/imgutils.h>
#include <libavutil/mathematics.h>
#include <libavutil/samplefmt.h>
#include <stdbool.h>
}
#include <atomic>
#include <chrono>
#include <thread>
extern void *get_mem(int argc, char **argv, size_t& len);
#define WIDTH 3840
#define HEIGHT 2160
std::atomic<int> nready(0);
void thread_func(void *mem, size_t len, char *addr_, int thread_num, double fps, bool strict)
{
char addr[64] = { 0 };
enum AVCodecID codec_id = AV_CODEC_ID_H265;
AVCodec *codec;
AVCodecContext *c = NULL;
int i, ret, x, y, got_output;
AVFrame *frame;
AVPacket pkt;
codec = avcodec_find_encoder(codec_id);
c = avcodec_alloc_context3(codec);
av_log_set_level(AV_LOG_PANIC);
c->width = HEIGHT;
c->height = WIDTH;
c->time_base.num = 1;
c->time_base.den = fps;
c->pix_fmt = AV_PIX_FMT_YUV420P;
c->codec_type = AVMEDIA_TYPE_VIDEO;
c->flags = AV_CODEC_FLAG_GLOBAL_HEADER;
avcodec_open2(c, codec, NULL);
frame = av_frame_alloc();
frame->format = c->pix_fmt;
frame->width = c->width;
frame->height = c->height;
ret = av_image_alloc(frame->data, frame->linesize, c->width, c->height,
c->pix_fmt, 32);
AVFormatContext* avfctx;
AVOutputFormat* fmt = av_guess_format("rtp", NULL, NULL);
snprintf(addr, 64, "rtp://10.21.25.2:%d", 8888 + thread_num);
ret = avformat_alloc_output_context2(&avfctx, fmt, fmt->name, addr);
avio_open(&avfctx->pb, avfctx->filename, AVIO_FLAG_WRITE);
struct AVStream* stream = avformat_new_stream(avfctx, codec);
/* stream->codecpar->bit_rate = 400000; */
stream->codecpar->width = WIDTH;
stream->codecpar->height = HEIGHT;
stream->codecpar->codec_id = AV_CODEC_ID_HEVC;
stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
stream->time_base.num = 1;
stream->time_base.den = fps;
(void)avformat_write_header(avfctx, NULL);
uint64_t chunk_size, total_size;
uint64_t fpt_ms = 0;
uint64_t fsize = 0;
uint32_t frames = 0;
uint64_t diff = 0;
uint64_t current = 0;
uint64_t period = (uint64_t)((1000 / (float)fps) * 1000);
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
for (size_t rounds = 0; rounds < 1; ++rounds) {
for (size_t i = 0; i < len; ) {
memcpy(&chunk_size, (uint8_t *)mem + i, sizeof(uint64_t));
i += sizeof(uint64_t);
total_size += chunk_size;
av_init_packet(&pkt);
pkt.data = (uint8_t *)mem + i;
pkt.size = chunk_size;
av_interleaved_write_frame(avfctx, &pkt);
av_packet_unref(&pkt);
auto runtime = (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start
).count();
if (runtime < current * period)
std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));
frames++;
current++;
i += chunk_size;
fsize += chunk_size;
}
}
diff = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();
fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n",
fsize, fsize / 1000, fsize / 1000 / 1000,
diff, diff / 1000
);
end:
nready++;
avcodec_close(c);
av_free(c);
av_freep(&frame->data[0]);
av_frame_free(&frame);
}
int main(int argc, char **argv)
{
    /* Load the encoded clip into memory, launch one sender thread per
     * stream and wait for every worker to finish before tearing down. */
    if (argc != 5) {
        fprintf(stderr, "usage: ./%s <remote address> <number of threads> <fps> <mode>\n", __FILE__);
        return -1;
    }

    avcodec_register_all();
    av_register_all();
    avformat_network_init();

    size_t len = 0;
    void *mem = get_mem(0, NULL, len);

    const int nthreads = atoi(argv[2]);
    const double fps = atof(argv[3]);
    const bool strict = !strcmp(argv[4], "strict");

    std::thread **workers = (std::thread **)malloc(sizeof(std::thread *) * nthreads);

    for (int tid = 0; tid < nthreads; ++tid)
        workers[tid] = new std::thread(thread_func, mem, len, argv[1], tid * 2, fps, strict);

    /* every worker bumps nready once it has streamed the whole clip */
    while (nready.load() != nthreads)
        std::this_thread::sleep_for(std::chrono::milliseconds(20));

    for (int tid = 0; tid < nthreads; ++tid) {
        workers[tid]->join();
        delete workers[tid];
    }
    free(workers);
}

88
live555/latency.cc Normal file
View File

@ -0,0 +1,88 @@
#include <liveMedia/liveMedia.hh>
#include <BasicUsageEnvironment.hh>
#include <GroupsockHelper.hh>
#include <thread>
#include <mutex>
#include "latsink.hh"
#include "latsource.hh"
#include "sink.hh"
#include "source.hh"
static int receiver(char *addr)
{
    /* Receive-only end of the latency benchmark: listen for H265 RTP on
     * 0.0.0.0:8888 and feed every frame into RTPLatencySink.
     * Never returns (runs the live555 event loop forever). */
    (void)addr;

    TaskScheduler *scheduler = BasicTaskScheduler::createNew();
    UsageEnvironment *env = BasicUsageEnvironment::createNew(*scheduler);

    Port rtpPort(8888);
    struct in_addr dst_addr;
    dst_addr.s_addr = our_inet_addr("0.0.0.0");

    Groupsock rtpGroupsock(*env, dst_addr, rtpPort, 255);

    /* large packet buffer so big HEVC frames are not truncated */
    OutPacketBuffer::maxSize = 40 * 1000 * 1000;

    RTPSource *source = H265VideoRTPSource::createNew(*env, &rtpGroupsock, 96);
    RTPLatencySink *sink = new RTPLatencySink(*env);

    sink->startPlaying(*source, nullptr, nullptr);
    env->taskScheduler().doEventLoop();

    return 0;
}
static int sender(char *addr)
{
    /* Loopback latency sender: streams frames to 127.0.0.1:8889 while
     * simultaneously receiving RTP back on 127.0.0.1:8888.
     * Never returns (runs the live555 event loop forever). */
    (void)addr;

    /* handed to the framed source, which uses it to gate frame delivery */
    std::mutex lat_mtx;

    H265VideoStreamDiscreteFramer *framer;
    H265LatencyFramedSource *framedSource;
    TaskScheduler *scheduler;
    UsageEnvironment *env;
    RTPSink *videoSink;
    RTPSource *source;
    RTPSink_ *sink;

    scheduler = BasicTaskScheduler::createNew();
    env = BasicUsageEnvironment::createNew(*scheduler);

    OutPacketBuffer::maxSize = 40 * 1000 * 1000;

    Port send_port(8889);
    struct in_addr dst_addr;
    dst_addr.s_addr = our_inet_addr("127.0.0.1");

    Port recv_port(8888);
    struct in_addr src_addr;
    src_addr.s_addr = our_inet_addr("127.0.0.1");

    Groupsock send_socket(*env, dst_addr, send_port, 255);
    Groupsock recv_socket(*env, src_addr, recv_port, 255);

    /* sender */
    videoSink = H265VideoRTPSink::createNew(*env, &send_socket, 96);
    framedSource = H265LatencyFramedSource::createNew(*env, lat_mtx);
    framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource);

    /* receiver */
    source = H265VideoRTPSource::createNew(*env, &recv_socket, 96);
    sink = new RTPSink_(*env);

    /* give the other end a moment to come up before streaming */
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    videoSink->startPlaying(*framer, NULL, videoSink);
    sink->startPlaying(*source, nullptr, nullptr);

    env->taskScheduler().doEventLoop();

    return 0;
}
int main(int argc, char **argv)
{
    /* Dispatch to the sender or the receiver half of the latency test. */
    if (argc != 3) {
        /* fix: print the program name instead of the source file name */
        fprintf(stderr, "usage: %s <send|recv> <ip>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    if (!strcmp(argv[1], "send"))
        return sender(argv[2]);
    if (!strcmp(argv[1], "recv"))
        return receiver(argv[2]);

    /* fix: anything other than "send" used to fall through silently to the
     * receiver; reject unknown modes explicitly instead */
    fprintf(stderr, "usage: %s <send|recv> <ip>\n", argv[0]);
    exit(EXIT_FAILURE);
}

255
live555/latency_receiver.cc Normal file
View File

@ -0,0 +1,255 @@
#include <BasicUsageEnvironment.hh>
#include <FramedSource.hh>
#include <GroupsockHelper.hh>
#include <liveMedia/liveMedia.hh>
#include <RTPInterface.hh>
#include <chrono>
#include <climits>
#include <mutex>
#include <thread>
#include "latsink.hh"
#include "source.hh"
#define BUFFER_SIZE 40 * 1000 * 1000
EventTriggerId H265FramedSource::eventTriggerId = 0;
unsigned H265FramedSource::referenceCount = 0;
H265FramedSource *framedSource;
static size_t frames = 0;
static size_t bytes = 0;
static uint8_t *nal_ptr = nullptr;
static size_t nal_size = 0;
static std::mutex lat_mtx;
static std::chrono::high_resolution_clock::time_point start;
static std::chrono::high_resolution_clock::time_point last;
static uint8_t *buf;
static size_t offset = 0;
/* static size_t bytes = 0; */
static uint64_t current = 0;
static uint64_t period = 0;
std::chrono::high_resolution_clock::time_point s_tmr, e_tmr;
static void thread_func(void)
{
    /* Watchdog: wake up every two seconds and, if no new frame has arrived
     * since the previous check, dump the stats and abort the benchmark. */
    unsigned seen = UINT_MAX;

    for (;;) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        if (seen == frames)
            break;
        seen = frames;
    }

    fprintf(stderr, "%zu %zu %lu\n", bytes, frames,
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::high_resolution_clock::now() - start
        ).count()
    );
    exit(EXIT_FAILURE);
}
RTPLatencySink::RTPLatencySink(UsageEnvironment& env):
    MediaSink(env)
{
    /* one large reassembly buffer shared by all received frames */
    fReceiveBuffer = new uint8_t[BUFFER_SIZE];
}
RTPLatencySink::~RTPLatencySink()
{
    /* NOTE(review): fReceiveBuffer is released in uninit(), not here --
     * the buffer leaks if uninit() is never called; confirm intent */
}
void RTPLatencySink::uninit()
{
    /* Detach from the source and release the receive buffer. */
    stopPlaying();
    /* fix: the buffer is allocated with new[], so it must be freed
     * with delete[] (plain delete is undefined behavior here) */
    delete[] fReceiveBuffer;
}
void RTPLatencySink::afterGettingFrame(
    void *clientData,
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    /* live555 C-style callback trampoline: clientData is the sink instance */
    ((RTPLatencySink *)clientData)->afterGettingFrame(
        frameSize,
        numTruncatedBytes,
        presentationTime,
        durationInMicroseconds
    );
}
void RTPLatencySink::afterGettingFrame(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    /* Per-frame relay step: publish the received NAL unit to the forwarding
     * source, then re-arm the fetch loop; terminates after 602 frames. */
    (void)frameSize, (void)numTruncatedBytes;
    (void)presentationTime, (void)durationInMicroseconds;

    /* start loop that monitors activity and if there has been
     * no activity for 2s (same as uvgRTP) the receiver is stopped) */
    if (!frames)
        (void)new std::thread(thread_func);

    //fprintf(stderr, "got frame %zu %zu\n", frames + 1, frameSize);

    /* hand the frame to the framed source and wake it up by releasing
     * lat_mtx (initially locked by receiver()) */
    nal_ptr = fReceiveBuffer;
    nal_size = frameSize;

    lat_mtx.unlock();
    framedSource->deliver_frame();

    /* NOTE(review): `start` is never assigned in this file, so the printed
     * duration is measured from the clock epoch -- confirm */
    if (++frames == 602) {
        fprintf(stderr, "%zu %zu %lu\n", bytes, frames,
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::high_resolution_clock::now() - start
            ).count()
        );
        exit(EXIT_SUCCESS);
    }

    continuePlaying();
}
void RTPLatencySink::process()
{
    /* intentionally empty: declared in the class but unused by this sink */
}
Boolean RTPLatencySink::continuePlaying()
{
    /* Request the next frame from the source; afterGettingFrame() re-arms
     * this loop after every delivery. */
    if (!fSource)
        return False;

    fSource->getNextFrame(
        fReceiveBuffer,
        BUFFER_SIZE,
        afterGettingFrame,
        this,
        onSourceClosure,
        this
    );
    return True;
}
H265FramedSource *H265FramedSource::createNew(UsageEnvironment& env, unsigned fps)
{
    /* factory wrapper following the live555 createNew() convention */
    return new H265FramedSource(env, fps);
}
H265FramedSource::H265FramedSource(UsageEnvironment& env, unsigned fps):
    FramedSource(env),
    fps_(fps)
{
    /* fix: 1000 / fps was integer division (33'000us instead of 33'333us at
     * 30 fps); use floating point like the other framed sources do */
    period = (uint64_t)((1000 / (float)fps) * 1000);

    /* fix: count this instance -- the destructor does --referenceCount, so
     * without this increment the counter underflowed and the shared event
     * trigger was never released */
    ++referenceCount;

    if (!eventTriggerId)
        eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0);
}
void H265FramedSource::deliver_frame()
{
    /* public entry point so the receiving sink can push the frame it just
     * got into this source from outside the class */
    deliverFrame();
}
H265FramedSource::~H265FramedSource()
{
    /* last live instance tears down the shared event trigger */
    if (!--referenceCount) {
        envir().taskScheduler().deleteEventTrigger(eventTriggerId);
        eventTriggerId = 0;
    }
}
void H265FramedSource::doGetNextFrame()
{
    /* called by live555 when the downstream sink wants the next frame */
    deliverFrame();
}
void H265FramedSource::deliverFrame0(void *clientData)
{
    /* event-trigger trampoline: clientData is the source instance */
    ((H265FramedSource *)clientData)->deliverFrame();
}
void H265FramedSource::deliverFrame()
{
    /* Copy the most recently received NAL unit (published via nal_ptr /
     * nal_size by the sink) into live555's destination buffer. */
    if (!isCurrentlyAwaitingData())
        return;

    /* lat_mtx is released by the sink when a fresh NAL unit is available;
     * failing to take it means there is nothing new to forward yet.
     * NOTE(review): on success the lock is left held and released again by
     * the sink before the next delivery -- confirm the locking protocol */
    if (!lat_mtx.try_lock())
        return;

    uint8_t *newFrameDataStart = nal_ptr;
    unsigned newFrameSize = nal_size;

    bytes += newFrameSize;

    /* truncate if the frame does not fit into the buffer live555 gave us */
    if (newFrameSize > fMaxSize) {
        fFrameSize = fMaxSize;
        fNumTruncatedBytes = newFrameSize - fMaxSize;
    } else {
        fFrameSize = newFrameSize;
    }

    fDurationInMicroseconds = 0;
    memmove(fTo, newFrameDataStart, fFrameSize);
    FramedSource::afterGetting(this);
}
static int receiver(void)
{
    /* Relay node of the latency benchmark: receives H265 RTP on
     * 0.0.0.0:8888 and re-sends every received frame to 10.21.25.200:8889.
     * Never returns (runs the live555 event loop forever). */
    H265VideoStreamDiscreteFramer *framer;
    TaskScheduler *scheduler;
    UsageEnvironment *env;
    RTPLatencySink *sink;
    struct in_addr addr;
    RTPSink *videoSink;
    RTPSource *source;

    scheduler = BasicTaskScheduler::createNew();
    env = BasicUsageEnvironment::createNew(*scheduler);

    OutPacketBuffer::maxSize = 40 * 1000 * 1000;

    /* deliverFrame() blocks on this mutex until the sink publishes a frame */
    lat_mtx.lock();

    /* receiver */
    addr.s_addr = our_inet_addr("0.0.0.0");
    Groupsock recv_sock(*env, addr, Port(8888), 255);
    source = H265VideoRTPSource::createNew(*env, &recv_sock, 96);
    sink = new RTPLatencySink(*env);

    /* sender */
    addr.s_addr = our_inet_addr("10.21.25.200");
    Groupsock send_socket(*env, addr, Port(8889), 255);
    framedSource = H265FramedSource::createNew(*env, 30);
    framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource);
    videoSink = H265VideoRTPSink::createNew(*env, &send_socket, 96);

    videoSink->startPlaying(*framer, NULL, videoSink);
    sink->startPlaying(*source, nullptr, nullptr);

    env->taskScheduler().doEventLoop();

    return 0;
}
int main(int argc, char **argv)
{
    /* No CLI arguments: addresses and ports are hard-coded in receiver(). */
    (void)argc, (void)argv;
    return receiver();
}

392
live555/latency_sender.cc Normal file
View File

@ -0,0 +1,392 @@
#include <BasicUsageEnvironment.hh>
#include <FramedSource.hh>
#include <GroupsockHelper.hh>
#include <liveMedia/liveMedia.hh>
#include <RTPInterface.hh>
#include <chrono>
#include <climits>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include "latsource.hh"
#include "sink.hh"
using namespace std::chrono;
#define BUFFER_SIZE 40 * 1000 * 1000
#define KEY(frame, size) ((((frame) % 64) << 26) + (size))
EventTriggerId H265LatencyFramedSource::eventTriggerId = 0;
unsigned H265LatencyFramedSource::referenceCount = 0;
extern void *get_mem(int, char **, size_t&);
extern int get_next_frame_start(uint8_t *, uint32_t, uint32_t, uint8_t&);
static uint64_t current = 0;
static uint64_t period = 0;
static bool initialized = false;
static size_t frames = 0;
static size_t nintras = 0;
static size_t ninters = 0;
static size_t intra_total = 0;
static size_t inter_total = 0;
static size_t frame_total = 0;
static std::mutex lat_mtx;
static std::queue<std::pair<size_t, uint8_t *>> nals;
static high_resolution_clock::time_point s_tmr, start;
typedef std::pair<high_resolution_clock::time_point, size_t> finfo;
static std::unordered_map<uint64_t, finfo> timestamps;
static const uint8_t *ff_avc_find_startcode_internal(const uint8_t *p, const uint8_t *end)
{
    /* Find the next Annex-B start code (00 00 01) in [p, end).
     * Taken from FFmpeg: scans byte-wise up to the next 4-byte boundary,
     * then one 32-bit word at a time using a SWAR zero-byte test, then
     * byte-wise for the tail. Returns a pointer past `end` if none found. */
    const uint8_t *a = p + 4 - ((intptr_t)p & 3);

    for (end -= 3; p < a && p < end; p++) {
        if (p[0] == 0 && p[1] == 0 && p[2] == 1)
            return p;
    }

    for (end -= 3; p < end; p += 4) {
        uint32_t x = *(const uint32_t*)p;
        // if ((x - 0x01000100) & (~x) & 0x80008000) // little endian
        // if ((x - 0x00010001) & (~x) & 0x00800080) // big endian
        if ((x - 0x01010101) & (~x) & 0x80808080) { // generic
            /* the word contains at least one zero byte: test the start-code
             * positions it could belong to */
            if (p[1] == 0) {
                if (p[0] == 0 && p[2] == 1)
                    return p;
                if (p[2] == 0 && p[3] == 1)
                    return p+1;
            }
            if (p[3] == 0) {
                if (p[2] == 0 && p[4] == 1)
                    return p+2;
                if (p[4] == 0 && p[5] == 1)
                    return p+3;
            }
        }
    }

    for (end += 3; p < end; p++) {
        if (p[0] == 0 && p[1] == 0 && p[2] == 1)
            return p;
    }

    return end + 3;
}
const uint8_t *ff_avc_find_startcode(const uint8_t *p, const uint8_t *end)
{
    const uint8_t *out= ff_avc_find_startcode_internal(p, end);
    /* widen a 3-byte start code to 4 bytes when it is preceded by a zero */
    if (p < out && out < end && !out[-1]) out--;
    return out;
}
static std::pair<size_t, uint8_t *> find_next_nal(void)
{
    /* Iterate over the in-memory Annex-B bitstream one NAL unit at a time.
     * State lives in function-local statics: the first call maps the input
     * and locates the first start code, later calls resume where the last
     * one stopped. Returns {0, nullptr} once the input is exhausted.
     * NOTE: not reentrant/thread-safe because of the static state. */
    static size_t len = 0;
    static uint8_t *p = NULL;
    static uint8_t *end = NULL;
    static uint8_t *nal_start = NULL;
    static uint8_t *nal_end = NULL;

    if (!p) {
        p = (uint8_t *)get_mem(0, NULL, len);
        end = p + len;
        len = 0;
        nal_start = (uint8_t *)ff_avc_find_startcode(p, end);
    }

    /* skip the zero bytes of the start code itself */
    while (nal_start < end && !*(nal_start++))
        ;

    if (nal_start == end)
        return std::make_pair(0, nullptr);

    nal_end = (uint8_t *)ff_avc_find_startcode(nal_start, end);
    auto ret = std::make_pair((size_t)(nal_end - nal_start), (uint8_t *)nal_start);

    len += 4 + nal_end - nal_start;
    nal_start = nal_end;

    return ret;
}
H265LatencyFramedSource *H265LatencyFramedSource::createNew(UsageEnvironment& env)
{
    /* factory wrapper following the live555 createNew() convention */
    return new H265LatencyFramedSource(env);
}
H265LatencyFramedSource::H265LatencyFramedSource(UsageEnvironment& env):
    FramedSource(env)
{
    /* pacing period for a hard-coded 30 fps, in microseconds */
    period = (uint64_t)((1000 / (float)30) * 1000);

    /* fix: count this instance -- the destructor does --referenceCount, so
     * without this increment the counter underflowed and the shared event
     * trigger was never released */
    ++referenceCount;

    if (!eventTriggerId)
        eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0);
}
H265LatencyFramedSource::~H265LatencyFramedSource()
{
    /* last live instance tears down the shared event trigger */
    if (!--referenceCount) {
        envir().taskScheduler().deleteEventTrigger(eventTriggerId);
        eventTriggerId = 0;
    }
}
void H265LatencyFramedSource::doGetNextFrame()
{
    /* start the pacing clock on the very first request */
    if (!initialized) {
        s_tmr = std::chrono::high_resolution_clock::now();
        initialized = true;
    }
    deliverFrame();
}
void H265LatencyFramedSource::deliverFrame0(void *clientData)
{
    /* event-trigger trampoline: clientData is the source instance */
    ((H265LatencyFramedSource *)clientData)->deliverFrame();
}
void H265LatencyFramedSource::deliverFrame()
{
    /* Hand the next NAL unit of the clip to live555, pacing large NAL units
     * to ~30 fps and recording a send timestamp keyed by NAL size so the
     * return path can compute per-frame latency. */
    if (!isCurrentlyAwaitingData())
        return;

    auto nal = find_next_nal();

    if (!nal.first || !nal.second)
        return;

    uint64_t runtime = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::high_resolution_clock::now() - s_tmr
    ).count();

    /* sleep until this frame's transmission slot */
    if (runtime < current * period)
        std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));

    /* try to hold fps for intra/inter frames only */
    if (nal.first > 1500)
        ++current;

    /* Start timer for the frame
     * RTP sink will calculate the time difference once the frame is received */
    /* printf("send frame %u, size %u\n", current, nal.first); */
    uint64_t key = nal.first;

    /* the NAL size doubles as the lookup key, so duplicate sizes are fatal */
    if (timestamps.find(key) != timestamps.end()) {
        fprintf(stderr, "cannot use size as timestamp for this frame!\n");
        exit(EXIT_FAILURE);
    }

    timestamps[key].first = std::chrono::high_resolution_clock::now();
    timestamps[key].second = nal.first;

    uint8_t *newFrameDataStart = nal.second;
    unsigned newFrameSize = nal.first;

    /* truncate if the NAL unit does not fit into live555's buffer */
    if (newFrameSize > fMaxSize) {
        fFrameSize = fMaxSize;
        fNumTruncatedBytes = newFrameSize - fMaxSize;
    } else {
        fFrameSize = newFrameSize;
    }

    fDurationInMicroseconds = 0;
    memmove(fTo, newFrameDataStart, fFrameSize);
    FramedSource::afterGetting(this);
}
static void thread_func(void)
{
    /* Watchdog: if no frame has been received for a whole 5 s interval,
     * print the latency averages gathered so far and abort. */
    unsigned prev_frames = UINT_MAX;

    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(5000));

        if (prev_frames == frames) {
            /* fprintf(stderr, "frame lost\n"); */
            break;
        }
        prev_frames = frames;
    }

    fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n",
        frames,
        intra_total / (float)nintras,
        inter_total / (float)ninters,
        frame_total / (float)frames
    );
    exit(EXIT_FAILURE);
}
RTPSink_::RTPSink_(UsageEnvironment& env):
    MediaSink(env)
{
    /* one large reassembly buffer shared by all received frames */
    fReceiveBuffer = new uint8_t[BUFFER_SIZE];
}
RTPSink_::~RTPSink_()
{
    /* NOTE(review): fReceiveBuffer is released in uninit(), not here --
     * the buffer leaks if uninit() is never called; confirm intent */
}
void RTPSink_::uninit()
{
    /* Detach from the source and release the receive buffer. */
    stopPlaying();
    /* fix: the buffer is allocated with new[], so it must be freed
     * with delete[] (plain delete is undefined behavior here) */
    delete[] fReceiveBuffer;
}
void RTPSink_::afterGettingFrame(
    void *clientData,
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    /* live555 C-style callback trampoline: clientData is the sink instance */
    ((RTPSink_ *)clientData)->afterGettingFrame(
        frameSize,
        numTruncatedBytes,
        presentationTime,
        durationInMicroseconds
    );
}
void RTPSink_::afterGettingFrame(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    /* Per-frame RTT bookkeeping: look up the send timestamp keyed by frame
     * size, accumulate intra/inter latency totals, stop after 601 frames. */
    (void)frameSize, (void)numTruncatedBytes;
    (void)presentationTime, (void)durationInMicroseconds;

    /* start loop that monitors activity and if there has been
     * no activity for 2s (same as uvgRTP) the receiver is stopped) */
    if (!frames)
        (void)new std::thread(thread_func);

    uint64_t diff;
    uint8_t nal_type;
    uint64_t key = frameSize;

    /* printf("recv frame %zu, size %u\n", frames + 1, frameSize); */

    if (timestamps.find(key) == timestamps.end()) {
        /* fix: %zu does not match uint64_t portably; print via
         * unsigned long long instead */
        printf("frame %zu,%llu not found from set!\n", frames + 1, (unsigned long long)key);
        exit(EXIT_FAILURE);
    }

    if (timestamps[key].second != frameSize) {
        /* fix: frameSize is unsigned, so its specifier must be %u (was %zu,
         * which reads the wrong number of bytes from the varargs) */
        printf("frame size mismatch (%zu vs %u)\n", timestamps[key].second, frameSize);
        exit(EXIT_FAILURE);
    }

    diff = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::high_resolution_clock::now() - timestamps[key].first
    ).count();
    timestamps.erase(key);

    /* first payload byte: HEVC NAL header, 6-bit type after the forbidden bit */
    nal_type = (fReceiveBuffer[0] >> 1) & 0x3f;

    if (nal_type == 19 || nal_type == 1) {
        if (nal_type == 19)
            nintras++, intra_total += (diff / 1000);
        else
            ninters++, inter_total += (diff / 1000);
        frame_total += (diff / 1000);
    }

    if (++frames == 601) {
        fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n",
            frames,
            intra_total / (float)nintras,
            inter_total / (float)ninters,
            frame_total / (float)frames
        );
        exit(EXIT_SUCCESS);
    }

    continuePlaying();
}
Boolean RTPSink_::continuePlaying()
{
    /* Request the next frame from the source; afterGettingFrame() re-arms
     * this loop after every delivery. */
    if (!fSource)
        return False;

    fSource->getNextFrame(
        fReceiveBuffer,
        BUFFER_SIZE,
        afterGettingFrame,
        this,
        onSourceClosure,
        this
    );
    return True;
}
static int sender(char *addr)
{
    /* Sender side of the latency benchmark: streams the clip to
     * 10.21.25.2:8888 and receives the relayed frames back on 0.0.0.0:8889
     * to measure per-frame round-trip time. Never returns. */
    (void)addr;

    H265VideoStreamDiscreteFramer *framer;
    H265LatencyFramedSource *framedSource;
    TaskScheduler *scheduler;
    UsageEnvironment *env;
    RTPSink *videoSink;
    RTPSource *source;
    RTPSink_ *sink;

    scheduler = BasicTaskScheduler::createNew();
    env = BasicUsageEnvironment::createNew(*scheduler);

    OutPacketBuffer::maxSize = 40 * 1000 * 1000;

    Port send_port(8888);
    struct in_addr dst_addr;
    dst_addr.s_addr = our_inet_addr("10.21.25.2");

    Port recv_port(8889);
    struct in_addr src_addr;
    src_addr.s_addr = our_inet_addr("0.0.0.0");

    Groupsock send_socket(*env, dst_addr, send_port, 255);
    Groupsock recv_socket(*env, src_addr, recv_port, 255);

    /* sender */
    videoSink = H265VideoRTPSink::createNew(*env, &send_socket, 96);
    framedSource = H265LatencyFramedSource::createNew(*env);
    framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource);

    /* receiver */
    source = H265VideoRTPSource::createNew(*env, &recv_socket, 96);
    sink = new RTPSink_(*env);

    /* give the relay a moment to come up before streaming */
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    videoSink->startPlaying(*framer, NULL, videoSink);
    sink->startPlaying(*source, nullptr, nullptr);

    env->taskScheduler().doEventLoop();

    return 0;
}
int main(int argc, char **argv)
{
    /* Entry point: run the sender side of the latency benchmark.
     * sender() currently ignores the address argument, but argv[2] is still
     * passed through for symmetry with the other benchmark binaries. */
    if (argc < 3) {
        /* fix: argv[2] was dereferenced without any argc check, which
         * crashes when the program is started with too few arguments */
        fprintf(stderr, "usage: %s <mode> <address>\n", argv[0]);
        return EXIT_FAILURE;
    }
    return sender(argv[2]);
}

116
live555/latsink.cc Normal file
View File

@ -0,0 +1,116 @@
#include <chrono>
#include <climits>
#include <thread>
#include <RTPInterface.hh>
#include "latsink.hh"
#define BUFFER_SIZE 40 * 1000 * 1000
static size_t frames = 0;
static size_t bytes = 0;
static std::chrono::high_resolution_clock::time_point start;
static std::chrono::high_resolution_clock::time_point last;
static void thread_func(void)
{
    /* Watchdog: abort with the current stats after 2 s of inactivity. */
    unsigned prev_frames = UINT_MAX;

    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));

        if (prev_frames == frames)
            break;
        prev_frames = frames;
    }

    fprintf(stderr, "%zu %zu %lu\n", bytes, frames,
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::high_resolution_clock::now() - start
        ).count()
    );
    exit(EXIT_FAILURE);
}
RTPLatencySink::RTPLatencySink(UsageEnvironment& env):
    MediaSink(env)
{
    /* one large reassembly buffer shared by all received frames */
    fReceiveBuffer = new uint8_t[BUFFER_SIZE];
}
RTPLatencySink::~RTPLatencySink()
{
    /* NOTE(review): fReceiveBuffer is released in uninit(), not here --
     * the buffer leaks if uninit() is never called; confirm intent */
}
void RTPLatencySink::uninit()
{
    /* Detach from the source and release the receive buffer. */
    stopPlaying();
    /* fix: the buffer is allocated with new[], so it must be freed
     * with delete[] (plain delete is undefined behavior here) */
    delete[] fReceiveBuffer;
}
void RTPLatencySink::afterGettingFrame(
    void *clientData,
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    /* live555 C-style callback trampoline: clientData is the sink instance */
    ((RTPLatencySink *)clientData)->afterGettingFrame(
        frameSize,
        numTruncatedBytes,
        presentationTime,
        durationInMicroseconds
    );
}
void RTPLatencySink::afterGettingFrame(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    /* Count received frames and terminate the benchmark after 601 of them. */
    (void)frameSize, (void)numTruncatedBytes;
    (void)presentationTime, (void)durationInMicroseconds;

    /* start loop that monitors activity and if there has been
     * no activity for 2s (same as uvgRTP) the receiver is stopped) */
    if (!frames)
        (void)new std::thread(thread_func);

    fprintf(stderr, "got frame\n");

    /* NOTE(review): `bytes` is never incremented and `start` is never
     * assigned anywhere in this file, so the first and third columns of the
     * report look suspect -- compare with the other benchmark sinks */
    if (++frames == 601) {
        fprintf(stderr, "%zu %zu %lu\n", bytes, frames,
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::high_resolution_clock::now() - start
            ).count()
        );
        exit(EXIT_SUCCESS);
    }

    continuePlaying();
}
void RTPLatencySink::process()
{
    /* intentionally empty: declared in the class but unused by this sink */
}
Boolean RTPLatencySink::continuePlaying()
{
    /* Request the next frame from the source; afterGettingFrame() re-arms
     * this loop after every delivery. */
    if (!fSource)
        return False;

    fSource->getNextFrame(
        fReceiveBuffer,
        BUFFER_SIZE,
        afterGettingFrame,
        this,
        onSourceClosure,
        this
    );
    return True;
}

32
live555/latsink.hh Normal file
View File

@ -0,0 +1,32 @@
/* fix: this header had no include guard (latsource.hh has one); multiple
 * inclusion redefined the class */
#pragma once

#include <H265VideoRTPSink.hh>

/* Media sink used by the latency benchmark: counts received frames,
 * re-arms the fetch loop after every delivery and terminates the process
 * once the expected number of frames has arrived (see latsink.cc). */
class RTPLatencySink : public MediaSink
{
    public:
        RTPLatencySink(UsageEnvironment& env);
        virtual ~RTPLatencySink();

        /* stop playing and release the receive buffer */
        void uninit();

        /* live555 C-style callback; forwards to the member overload */
        static void afterGettingFrame(
            void *clientData,
            unsigned frameSize,
            unsigned numTruncatedBytes,
            struct timeval presentationTime,
            unsigned durationInMicroseconds
        );

        void afterGettingFrame(
            unsigned frameSize,
            unsigned numTruncatedBytes,
            struct timeval presentationTime,
            unsigned durationInMicroseconds
        );

    protected:
        void process();

    private:
        virtual Boolean continuePlaying();

        /* reassembly buffer handed to getNextFrame() */
        uint8_t *fReceiveBuffer;
};

185
live555/latsource.cc Normal file
View File

@ -0,0 +1,185 @@
#include <GroupsockHelper.hh>
#include <FramedSource.hh>
#include "latsource.hh"
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>
EventTriggerId H265LatencyFramedSource::eventTriggerId = 0;
unsigned H265LatencyFramedSource::referenceCount = 0;
extern void *get_mem(int, char **, size_t&);
extern int get_next_frame_start(uint8_t *, uint32_t, uint32_t, uint8_t&);
static uint8_t *buf;
static size_t offset = 0;
static size_t bytes = 0;
static uint64_t current = 0;
static uint64_t period = 0;
static bool initialized = false;
std::mutex delivery_mtx;
std::queue<std::pair<size_t, uint8_t *>> nals;
std::chrono::high_resolution_clock::time_point s_tmr, e_tmr;
/* Locate the next Annex-B start code (00 00 01) in [p, end).
 * Adapted from FFmpeg: a byte-wise scan up to the next 4-byte boundary,
 * a word-at-a-time SWAR scan over the aligned middle, and a byte-wise
 * sweep over the tail. Returns a pointer past `end` when nothing matches. */
static const uint8_t *ff_avc_find_startcode_internal(const uint8_t *p, const uint8_t *end)
{
    /* first address at which `p` is 32-bit aligned */
    const uint8_t *aligned = p + 4 - ((intptr_t)p & 3);

    for (end -= 3; p < aligned && p < end; p++) {
        if (p[0] == 0 && p[1] == 0 && p[2] == 1)
            return p;
    }

    /* aligned middle: the SWAR expression is non-zero iff the word holds
     * at least one zero byte, which every start code must contain */
    for (end -= 3; p < end; p += 4) {
        uint32_t word = *(const uint32_t *)p;

        if ((word - 0x01010101) & (~word) & 0x80808080) {
            if (p[1] == 0) {
                if (p[0] == 0 && p[2] == 1)
                    return p;
                if (p[2] == 0 && p[3] == 1)
                    return p + 1;
            }
            if (p[3] == 0) {
                if (p[2] == 0 && p[4] == 1)
                    return p + 2;
                if (p[4] == 0 && p[5] == 1)
                    return p + 3;
            }
        }
    }

    /* trailing bytes the word scan could not cover */
    for (end += 3; p < end; p++) {
        if (p[0] == 0 && p[1] == 0 && p[2] == 1)
            return p;
    }

    return end + 3;
}

const uint8_t *ff_avc_find_startcode(const uint8_t *p, const uint8_t *end)
{
    const uint8_t *hit = ff_avc_find_startcode_internal(p, end);

    /* widen a 3-byte start code to 4 bytes when preceded by a zero byte */
    if (p < hit && hit < end && !hit[-1])
        --hit;

    return hit;
}
static std::pair<size_t, uint8_t *> find_next_nal(void)
{
    /* Iterate over the in-memory Annex-B bitstream one NAL unit at a time.
     * State lives in function-local statics: the first call maps the input
     * and locates the first start code, later calls resume where the last
     * one stopped. Returns {0, nullptr} once the input is exhausted.
     * NOTE: not reentrant/thread-safe because of the static state. */
    static size_t len = 0;
    static uint8_t *p = NULL;
    static uint8_t *end = NULL;
    static uint8_t *nal_start = NULL;
    static uint8_t *nal_end = NULL;

    if (!p) {
        p = (uint8_t *)get_mem(0, NULL, len);
        end = p + len;
        len = 0;
        nal_start = (uint8_t *)ff_avc_find_startcode(p, end);
    }

    /* skip the zero bytes of the start code itself */
    while (nal_start < end && !*(nal_start++))
        ;

    if (nal_start == end)
        return std::make_pair(0, nullptr);

    nal_end = (uint8_t *)ff_avc_find_startcode(nal_start, end);
    auto ret = std::make_pair((size_t)(nal_end - nal_start), (uint8_t *)nal_start);

    len += 4 + nal_end - nal_start;
    nal_start = nal_end;

    return ret;
}
H265LatencyFramedSource *H265LatencyFramedSource::createNew(UsageEnvironment& env, std::mutex& lat_mtx)
{
    /* factory wrapper following the live555 createNew() convention */
    return new H265LatencyFramedSource(env, lat_mtx);
}
H265LatencyFramedSource::H265LatencyFramedSource(UsageEnvironment& env, std::mutex& lat_mtx):
    FramedSource(env),
    mtx_(lat_mtx)
{
    /* pacing period for a hard-coded 30 fps, in microseconds */
    period = (uint64_t)((1000 / (float)30) * 1000);

    /* fix: count this instance -- the destructor does --referenceCount, so
     * without this increment the counter underflowed and the shared event
     * trigger was never released */
    ++referenceCount;

    if (!eventTriggerId)
        eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0);
}
H265LatencyFramedSource::~H265LatencyFramedSource()
{
    /* last live instance tears down the shared event trigger */
    if (!--referenceCount) {
        envir().taskScheduler().deleteEventTrigger(eventTriggerId);
        eventTriggerId = 0;
    }
}
void H265LatencyFramedSource::doGetNextFrame()
{
    /* start the pacing clock on the very first request */
    if (!initialized) {
        s_tmr = std::chrono::high_resolution_clock::now();
        initialized = true;
    }
    deliverFrame();
}
void H265LatencyFramedSource::deliverFrame0(void *clientData)
{
    /* event-trigger trampoline: clientData is the source instance */
    ((H265LatencyFramedSource *)clientData)->deliverFrame();
}
void H265LatencyFramedSource::deliverFrame()
{
    /* Feed the next NAL unit of the clip to live555, pacing large NAL
     * units to ~30 fps; once the input is exhausted the totals are printed
     * and the process exits. */
    if (!isCurrentlyAwaitingData())
        return;

    /* NOTE(review): mtx_ is locked here but never unlocked in this file --
     * presumably released elsewhere; confirm the locking protocol */
    mtx_.lock();

    fprintf(stderr, "send frame\n");
    auto nal = find_next_nal();

    /* end of input: report throughput and terminate */
    if (!nal.first || !nal.second) {
        e_tmr = std::chrono::high_resolution_clock::now();
        uint64_t diff = (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(e_tmr - s_tmr).count();

        fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n",
            bytes, bytes / 1000, bytes / 1000 / 1000,
            diff, diff / 1000
        );
        exit(EXIT_SUCCESS);
    }

    uint64_t runtime = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::high_resolution_clock::now() - s_tmr
    ).count();

    /* sleep until this frame's transmission slot */
    if (runtime < current * period)
        std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));

    /* try to hold fps for intra/inter frames only */
    if (nal.first > 1500)
        ++current;

    uint8_t *newFrameDataStart = nal.second;
    unsigned newFrameSize = nal.first;

    bytes += newFrameSize;

    /* truncate if the NAL unit does not fit into live555's buffer */
    if (newFrameSize > fMaxSize) {
        fFrameSize = fMaxSize;
        fNumTruncatedBytes = newFrameSize - fMaxSize;
    } else {
        fFrameSize = newFrameSize;
    }

    fDurationInMicroseconds = 0;
    memmove(fTo, newFrameDataStart, fFrameSize);
    FramedSource::afterGetting(this);
}

23
live555/latsource.hh Normal file
View File

@ -0,0 +1,23 @@
#ifndef __h265_framed_source_h__
#define __h265_framed_source_h__

#include <FramedSource.hh>

/* Framed source that feeds H265 NAL units into a live555 pipeline for the
 * latency benchmark.
 *
 * NOTE(review): latsource.cc defines a constructor/createNew overload that
 * also takes a std::mutex& (and uses an mtx_ member) which is not declared
 * here, while latency_sender.cc provides definitions matching this
 * declaration -- confirm which source file this header is meant to pair
 * with. */
class H265LatencyFramedSource : public FramedSource {
    public:
        static H265LatencyFramedSource *createNew(UsageEnvironment& env);

        /* shared trigger used to inject frames from outside the event loop */
        static EventTriggerId eventTriggerId;

    protected:
        H265LatencyFramedSource(UsageEnvironment& env);
        virtual ~H265LatencyFramedSource();

    private:
        void deliverFrame();
        virtual void doGetNextFrame();
        static void deliverFrame0(void *clientData);

        /* number of live instances; guards event-trigger teardown */
        static unsigned referenceCount;
};
#endif /* __h265_framed_source_h__ */

25
live555/receiver.cc Normal file
View File

@ -0,0 +1,25 @@
#include <liveMedia/liveMedia.hh>
#include <BasicUsageEnvironment.hh>
#include <GroupsockHelper.hh>
#include "sink.hh"
int main(int argc, char **argv)
{
    /* Standalone throughput receiver: listen for H265 RTP on 0.0.0.0:8888
     * and let RTPSink_ count frames until the stream goes idle. */
    (void)argc, (void)argv;

    TaskScheduler *scheduler = BasicTaskScheduler::createNew();
    UsageEnvironment *env = BasicUsageEnvironment::createNew(*scheduler);

    Port rtpPort(8888);
    struct in_addr dst_addr;
    dst_addr.s_addr = our_inet_addr("0.0.0.0");

    Groupsock rtpGroupsock(*env, dst_addr, rtpPort, 255);

    /* large enough for big HEVC frames */
    OutPacketBuffer::maxSize = 40 * 1000 * 1000;

    RTPSource *source = H265VideoRTPSource::createNew(*env, &rtpGroupsock, 96);
    RTPSink_ *sink = new RTPSink_(*env);

    sink->startPlaying(*source, nullptr, nullptr);
    env->taskScheduler().doEventLoop();
}

37
live555/sender.cc Normal file
View File

@ -0,0 +1,37 @@
#include <liveMedia/liveMedia.hh>
#include <BasicUsageEnvironment.hh>
#include <GroupsockHelper.hh>
#include "source.hh"
#include "H265VideoStreamDiscreteFramer.hh"
int main(int argc, char **argv)
{
    /* Standalone throughput sender: stream the test clip as H265 RTP to the
     * hard-coded destination 10.21.25.2:8888.
     * NOTE(review): only argv[3] (fps) is used; the address, thread-count
     * and mode arguments are accepted but ignored -- confirm intent. */
    if (argc != 5) {
        fprintf(stderr, "usage: ./%s <addr> <# of threads> <fps> <mode>\n", __FILE__);
        return -1;
    }

    H265VideoStreamDiscreteFramer *framer;
    H265FramedSource *framedSource;
    TaskScheduler *scheduler;
    UsageEnvironment *env;
    RTPSink *videoSink;

    scheduler = BasicTaskScheduler::createNew();
    env = BasicUsageEnvironment::createNew(*scheduler);

    framedSource = H265FramedSource::createNew(*env, atoi(argv[3]));
    framer = H265VideoStreamDiscreteFramer::createNew(*env, framedSource);

    Port rtpPort(8888);
    struct in_addr dst_addr;
    dst_addr.s_addr = our_inet_addr("10.21.25.2");

    Groupsock rtpGroupsock(*env, dst_addr, rtpPort, 255);

    /* large enough for big HEVC frames */
    OutPacketBuffer::maxSize = 40 * 1000 * 1000;

    videoSink = H265VideoRTPSink::createNew(*env, &rtpGroupsock, 96);

    videoSink->startPlaying(*framer, NULL, videoSink);
    env->taskScheduler().doEventLoop();
}

114
live555/sink.cc Normal file
View File

@ -0,0 +1,114 @@
#include <chrono>
#include <climits>
#include <thread>
#include <RTPInterface.hh>
#include "sink.hh"
#define BUFFER_SIZE 1600000
size_t frames = 0;
size_t bytes = 0;
std::chrono::high_resolution_clock::time_point start;
std::chrono::high_resolution_clock::time_point last;
/* Watchdog: wakes every 2 s and, if no new frame arrived during the
 * interval, prints "<bytes> <frames> <elapsed ms>" to stderr and aborts.
 * Started lazily by afterGettingFrame() on the first received frame. */
static void thread_func(void)
{
    /* UINT_MAX guarantees the first comparison can never match `frames` */
    unsigned prev_frames = UINT_MAX;

    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));

        /* no progress since the last check -> stream considered dead */
        if (prev_frames == frames)
            break;
        prev_frames = frames;
    }

    fprintf(stderr, "%zu %zu %lu\n", bytes, frames,
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::high_resolution_clock::now() - start
        ).count()
    );
    /* EXIT_FAILURE tells the benchmark driver the run timed out */
    exit(EXIT_FAILURE);
}
/* Allocates the fixed receive buffer that continuePlaying() hands to the
 * upstream RTP source. */
RTPSink_::RTPSink_(UsageEnvironment& env):
    MediaSink(env)
{
    fReceiveBuffer = new uint8_t[BUFFER_SIZE];
}
/* Intentionally empty: the receive buffer is released in uninit(), not here.
 * NOTE(review): if uninit() is never called the buffer leaks — confirm. */
RTPSink_::~RTPSink_()
{
}
/* Stop consuming from the source and release the receive buffer.
 * Call at most once; fReceiveBuffer is not nulled afterwards. */
void RTPSink_::uninit()
{
    stopPlaying();
    /* BUGFIX: the buffer was allocated with new[], so it must be released
     * with delete[] — plain `delete` here was undefined behavior */
    delete[] fReceiveBuffer;
}
void RTPSink_::afterGettingFrame(
void *clientData,
unsigned frameSize,
unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds
)
{
((RTPSink_ *)clientData)->afterGettingFrame(
frameSize,
numTruncatedBytes,
presentationTime,
durationInMicroseconds
);
}
/* Per-frame bookkeeping: counts delivered frames, stops the benchmark after
 * frame 601 (the live555 sequence length, see parse.pl), and keeps the
 * read loop going via continuePlaying(). */
void RTPSink_::afterGettingFrame(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds
)
{
    (void)frameSize, (void)numTruncatedBytes;
    (void)presentationTime, (void)durationInMicroseconds;

    /* start loop that monitors activity and if there has been
     * no activity for 2s (same as uvgRTP) the receiver is stopped) */
    if (!frames)
        (void)new std::thread(thread_func);  /* thread object leaked on purpose
                                              * so its dtor never runs */

    if (++frames == 601) {
        fprintf(stderr, "%zu %zu %lu\n", bytes, frames,
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::high_resolution_clock::now() - start
            ).count()
        );
        exit(EXIT_SUCCESS);
    }

    continuePlaying();
}
/* No-op. NOTE(review): declared in sink.hh but never called in this file —
 * possibly a leftover; confirm before removing. */
void RTPSink_::process()
{
}
/* Request the next frame from the upstream RTP source into fReceiveBuffer.
 * Returns False once the source has been detached (fSource == NULL). */
Boolean RTPSink_::continuePlaying()
{
    if (!fSource)
        return False;

    fSource->getNextFrame(
        fReceiveBuffer,
        BUFFER_SIZE,
        afterGettingFrame,  /* static trampoline above */
        this,
        onSourceClosure,
        this
    );
    return True;
}

32
live555/sink.hh Normal file
View File

@ -0,0 +1,32 @@
#pragma once  /* was missing: double inclusion would redefine the class */

#include <cstdint>  /* uint8_t */
#include <H265VideoRTPSink.hh>

/* Measurement sink for the live555 receive benchmark: pulls frames from an
 * RTP source into a fixed buffer and counts them (implementation in sink.cc). */
class RTPSink_ : public MediaSink
{
public:
    explicit RTPSink_(UsageEnvironment& env);
    virtual ~RTPSink_();

    /* Stop playing and release the receive buffer. */
    void uninit();

    /* live555-style static callback; forwards to the member overload. */
    static void afterGettingFrame(
        void *clientData,
        unsigned frameSize,
        unsigned numTruncatedBytes,
        struct timeval presentationTime,
        unsigned durationInMicroseconds
    );
    void afterGettingFrame(
        unsigned frameSize,
        unsigned numTruncatedBytes,
        struct timeval presentationTime,
        unsigned durationInMicroseconds
    );

protected:
    void process();

private:
    /* MediaSink hook: schedules the next getNextFrame() on the source. */
    virtual Boolean continuePlaying();

    uint8_t *fReceiveBuffer;  /* fixed receive buffer, allocated in the ctor */
};

186
live555/source.cc Normal file
View File

@ -0,0 +1,186 @@
#include <GroupsockHelper.hh>
#include <FramedSource.hh>
#include "source.hh"
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>
/* Shared state for the sender-side framed source (single instance assumed). */
EventTriggerId H265FramedSource::eventTriggerId = 0;
unsigned H265FramedSource::referenceCount = 0;

/* implemented in util/util.cc */
extern void *get_mem(int, char **, size_t&);
extern int get_next_frame_start(uint8_t *, uint32_t, uint32_t, uint8_t&);

uint8_t *buf;             /* NOTE(review): never used in this file — confirm */
size_t offset = 0;        /* NOTE(review): never used in this file — confirm */
size_t bytes = 0;         /* total payload bytes handed to live555 */
uint64_t current = 0;     /* count of large (intra/inter) NALs sent so far */
uint64_t period = 0;      /* frame period in microseconds, set in the ctor */
bool initialized = false; /* set once the send timer has started */

std::mutex delivery_mtx;  /* guards NAL delivery in deliverFrame() */
std::queue<std::pair<size_t, uint8_t *>> nals;  /* NOTE(review): unused here —
                                                 * find_next_nal() keeps its own
                                                 * static state; confirm */
std::chrono::high_resolution_clock::time_point s_tmr, e_tmr;
/* Word-at-a-time scan for the next Annex-B start code (00 00 01), taken
 * from FFmpeg's avc.c. Returns a pointer to the start code, or past-the-end
 * when none is found (the caller normalizes this). The middle loop examines
 * four bytes per iteration using the classic "word has a zero byte" trick. */
static const uint8_t *ff_avc_find_startcode_internal(const uint8_t *p, const uint8_t *end)
{
    /* byte-wise scan up to the next 4-byte boundary so the word loop
     * below only does aligned 32-bit loads */
    const uint8_t *a = p + 4 - ((intptr_t)p & 3);

    for (end -= 3; p < a && p < end; p++) {
        if (p[0] == 0 && p[1] == 0 && p[2] == 1)
            return p;
    }

    for (end -= 3; p < end; p += 4) {
        uint32_t x = *(const uint32_t*)p;
//      if ((x - 0x01000100) & (~x) & 0x80008000) // little endian
//      if ((x - 0x00010001) & (~x) & 0x00800080) // big endian
        if ((x - 0x01010101) & (~x) & 0x80808080) { // generic: some byte of x is zero
            if (p[1] == 0) {
                if (p[0] == 0 && p[2] == 1)
                    return p;
                if (p[2] == 0 && p[3] == 1)
                    return p+1;
            }
            if (p[3] == 0) {
                if (p[2] == 0 && p[4] == 1)
                    return p+2;
                if (p[4] == 0 && p[5] == 1)
                    return p+3;
            }
        }
    }

    /* tail: re-extend end and finish byte-wise */
    for (end += 3; p < end; p++) {
        if (p[0] == 0 && p[1] == 0 && p[2] == 1)
            return p;
    }

    return end + 3;
}
/* Find the next Annex-B start code in [p, end); widens a 3-byte hit
 * (00 00 01) to the 4-byte form (00 00 00 01) when the preceding byte
 * is also zero. */
const uint8_t *ff_avc_find_startcode(const uint8_t *p, const uint8_t *end)
{
    const uint8_t *hit = ff_avc_find_startcode_internal(p, end);
    if (hit > p && hit < end && hit[-1] == 0)
        --hit;
    return hit;
}
/* Iterator over the NAL units of the in-memory HEVC bitstream.
 * On the first call it maps the whole stream via get_mem(), then each call
 * returns the next (length, pointer) pair with the start code skipped.
 * Returns (0, nullptr) once the stream is exhausted.
 * NOTE: all state is static — single stream only, not thread-safe. */
static std::pair<size_t, uint8_t *> find_next_nal(void)
{
    static size_t len = 0;
    static uint8_t *p = NULL;         /* start of the mapped bitstream */
    static uint8_t *end = NULL;
    static uint8_t *nal_start = NULL;
    static uint8_t *nal_end = NULL;

    if (!p) {
        /* first call: map the encoded file; get_mem fills in len */
        p = (uint8_t *)get_mem(0, NULL, len);
        end = p + len;
        len = 0;

        nal_start = (uint8_t *)ff_avc_find_startcode(p, end);
    }

    /* skip the zero bytes and the trailing 1 of the start code
     * (the post-increment also steps past the terminating 1) */
    while (nal_start < end && !*(nal_start++))
        ;

    if (nal_start == end)
        return std::make_pair(0, nullptr);

    nal_end = (uint8_t *)ff_avc_find_startcode(nal_start, end);

    auto ret = std::make_pair((size_t)(nal_end - nal_start), (uint8_t *)nal_start);

    /* NOTE(review): len is reused as a running byte counter after being
     * zeroed above but is never read again in this file — confirm */
    len += 4 + nal_end - nal_start;
    nal_start = nal_end;

    return ret;
}
/* Factory following the live555 createNew() convention; heap-allocates the
 * source for the given target fps. */
H265FramedSource *H265FramedSource::createNew(UsageEnvironment& env, unsigned fps)
{
    return new H265FramedSource(env, fps);
}
/* Computes the frame period from the target fps and registers the shared
 * deliverFrame0 event trigger on first construction. */
H265FramedSource::H265FramedSource(UsageEnvironment& env, unsigned fps):
    FramedSource(env),
    fps_(fps)
{
    /* frame period in microseconds */
    period = (uint64_t)((1000 / (float)fps) * 1000);

    /* BUGFIX: referenceCount must mirror the number of live instances.
     * The destructor runs `--referenceCount`, so without this increment
     * the counter underflowed on the first destruction and the event
     * trigger was never released. */
    ++referenceCount;

    if (!eventTriggerId)
        eventTriggerId = envir().taskScheduler().createEventTrigger(deliverFrame0);
}
/* Releases the shared event trigger when the last instance goes away.
 * Relies on the constructor incrementing referenceCount once per instance;
 * if it does not, this pre-decrement underflows and the trigger is never
 * deleted — verify the constructor upholds that contract. */
H265FramedSource::~H265FramedSource()
{
    if (!--referenceCount) {
        envir().taskScheduler().deleteEventTrigger(eventTriggerId);
        eventTriggerId = 0;
    }
}
/* live555 entry point, called whenever the framer wants the next frame.
 * Starts the benchmark timer on the first call, then delivers synchronously. */
void H265FramedSource::doGetNextFrame()
{
    if (!initialized) {
        s_tmr = std::chrono::high_resolution_clock::now();
        initialized = true;
    }
    deliverFrame();
}
void H265FramedSource::deliverFrame0(void *clientData)
{
((H265FramedSource *)clientData)->deliverFrame();
}
/* Deliver the next NAL unit into fTo, pacing large NALs to the target fps.
 * Prints final statistics and exits the process when the stream runs out. */
void H265FramedSource::deliverFrame()
{
    if (!isCurrentlyAwaitingData())
        return;

    delivery_mtx.lock();

    auto nal = find_next_nal();

    /* end of stream: report totals and terminate the benchmark */
    if (!nal.first || !nal.second) {
        e_tmr = std::chrono::high_resolution_clock::now();
        uint64_t diff = (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(e_tmr - s_tmr).count();

        fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n",
            bytes, bytes / 1000, bytes / 1000 / 1000,
            diff, diff / 1000
        );
        exit(EXIT_SUCCESS);
    }

    /* sleep until this frame's deadline so the sender holds the target fps */
    uint64_t runtime = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::high_resolution_clock::now() - s_tmr
    ).count();

    if (runtime < current * period)
        std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));

    /* try to hold fps for intra/inter frames only (small NALs such as
     * parameter sets do not advance the frame clock) */
    if (nal.first > 1500)
        ++current;

    uint8_t *newFrameDataStart = nal.second;
    unsigned newFrameSize = nal.first;

    bytes += newFrameSize;

    if (newFrameSize > fMaxSize) {
        fFrameSize = fMaxSize;
        fNumTruncatedBytes = newFrameSize - fMaxSize;
    } else {
        fFrameSize = newFrameSize;
        /* BUGFIX: reset the truncation counter; previously a stale value
         * from an earlier oversized NAL could be reported for this frame */
        fNumTruncatedBytes = 0;
    }

    fDurationInMicroseconds = 0;
    memmove(fTo, newFrameDataStart, fFrameSize);

    delivery_mtx.unlock();

    /* hand the frame to live555; may synchronously re-enter doGetNextFrame,
     * which is why the (non-recursive) mutex must be released first */
    FramedSource::afterGetting(this);
}

36
live555/source.hh Normal file
View File

@ -0,0 +1,36 @@
#ifndef __h265_framed_source_h__
#define __h265_framed_source_h__

#include <FramedSource.hh>

/* live555 FramedSource that serves HEVC NAL units from a pre-encoded,
 * memory-mapped bitstream at a fixed target fps (see source.cc). */
class H265FramedSource: public FramedSource {
public:
    /* Factory following the live555 createNew() convention. */
    static H265FramedSource *createNew(UsageEnvironment& env, unsigned fps);

public:
    static EventTriggerId eventTriggerId;
    // Note that this is defined here to be a static class variable, because this code is intended to illustrate how to
    // encapsulate a *single* device - not a set of devices.
    // You can, however, redefine this to be a non-static member variable.

    /* NOTE(review): declared but no definition appears in source.cc
     * (deliverFrame() is the spelling actually used) — confirm/remove. */
    void deliver_frame();

protected:
    H265FramedSource(UsageEnvironment& env, unsigned fps);
    // called only by createNew(), or by subclass constructors
    virtual ~H265FramedSource();

private:
    // redefined virtual functions:
    virtual void doGetNextFrame();
    //virtual void doStopGettingFrames(); // optional

private:
    static void deliverFrame0(void* clientData);
    void deliverFrame();

private:
    static unsigned referenceCount; // used to count how many instances of this class currently exist
    unsigned fps_;                  // target frames per second
};

#endif /* __h265_framed_source_h__ */

427
parse.pl Executable file
View File

@ -0,0 +1,427 @@
#!/usr/bin/env perl
use warnings;
use strict;
use Getopt::Long;
use Cwd qw(realpath);
# Frames produced by the test sequence as observed by each library's
# receiver, plus the total payload size in bytes; used for loss percentages.
my $TOTAL_FRAMES_UVGRTP  = 602;
my $TOTAL_FRAMES_LIVE555 = 601;
my $TOTAL_FRAMES_FFMPEG  = 598;
my $TOTAL_BYTES = 411410113;
# open the file, validate it and return file handle to caller
# (rewound to the start after counting lines).
# NOTE(review): the $expect line-count validation is commented out, so the
# parameter is currently unused — confirm whether it should be re-enabled.
sub open_file {
    my ($path, $expect) = @_;
    my $lines = 0;

    open(my $fh, '<', $path) or die "failed to open file: $path";
    $lines++ while (<$fh>);

    # if ($lines != $expect) {
    #     return undef;
    # }

    seek $fh, 0, 0;
    return $fh;
}
# goodput(bytes, elapsed_ms, unit) -> throughput in the requested unit:
#   "mbit" -> megabits/s, "mb" -> megabytes/s, anything else -> gigabits/s
sub goodput {
    my ($nbytes, $ms, $unit) = @_;
    my $mb = $nbytes / 1000 / 1000;
    return $mb / $ms * 8 * 1000 if $unit eq "mbit";
    return $mb / $ms * 1000     if $unit eq "mb";
    return $mb / $ms * 8;
}
# Expected frame count of the test sequence for the given library
# (live555 is the fallback for any other value).
sub get_frame_count {
    my ($lib) = @_;
    return $TOTAL_FRAMES_UVGRTP if $lib eq "uvgrtp";
    return $TOTAL_FRAMES_FFMPEG if $lib eq "ffmpeg";
    return $TOTAL_FRAMES_LIVE555;
}
# Parse a sender-side benchmark log.
# Arguments: library, iteration count, thread count, log path, unit.
# Returns ($path, avg user, avg sys, avg cpu%, avg wall time,
#          avg single-thread goodput, total goodput); returns nothing when
# a uvgrtp log fails open_file() validation.
sub parse_send {
    my ($lib, $iter, $threads, $path, $unit) = @_;
    my ($t_usr, $t_sys, $t_cpu, $t_total, $t_time);
    my ($t_sgp, $t_tgp, $fh);

    if ($lib eq "uvgrtp") {
        # uvgrtp logs have a fixed shape: one line per thread plus two
        # /usr/bin/time lines per iteration
        my $e = ($iter * ($threads + 2));
        $fh = open_file($path, $e);
        return if not defined $fh;
    } else {
        open $fh, '<', $path or die "failed to open file\n";
    }

    # each iteration parses one benchmark run
    # and each benchmark run can have 1..N entries, one for each thread
    START: while (my $line = <$fh>) {
        my $rt_avg = 0;
        my $rb_avg = 0;

        # skip until a per-thread result line ("... kB ... MB ...")
        next if index ($line, "kB") == -1 or index ($line, "MB") == -1;

        # for multiple threads there are two numbers:
        # - single thread performance
        #   -> for each thread, calculate the speed at which the data was sent,
        #      sum all those together and divide by the number of threads
        #
        # - total performance
        #   -> (amount of data * number of threads) / total time spent
        for (my $i = 0; $i < $threads; $i++) {
            next START if grep /terminated|corrupt/, $line;
            my @nums = $line =~ /(\d+)/g;
            # 4th number on the line is the per-thread runtime in ms
            # (matches the "... took <ms> ms <s> s" sender output format)
            $rt_avg += $nums[3];
            $line = <$fh>;
        }
        $rt_avg /= $threads;

        next START if grep /terminated|corrupt/, $line;
        $line = <$fh> if grep /flush|Command/, $line;

        # /usr/bin/time summary: "Xuser Ysystem 0:Zelapsed W%CPU"
        my ($usr, $sys, $total, $cpu) = ($line =~ m/(\d+\.\d+)user\s(\d+\.\d+)system\s0:(\d+.\d+)elapsed\s(\d+)%CPU/);

        # discard line about inputs, outputs and pagefaults
        $line = <$fh>;

        # update total
        $t_usr   += $usr;
        $t_sys   += $sys;
        $t_cpu   += $cpu;
        $t_total += $total;
        $t_sgp   += goodput($TOTAL_BYTES, $rt_avg, $unit);
    }

    $t_sgp = $t_sgp / $iter;
    $t_tgp = ($threads > 1) ? goodput($TOTAL_BYTES * $threads, $t_total / $iter, $unit) : $t_sgp;

    close $fh;
    return ($path, $t_usr / $iter, $t_sys / $iter, $t_cpu / $iter, $t_total / $iter, $t_sgp, $t_tgp);
}
# Parse a receiver-side benchmark log.
# Returns ($path, avg user, avg sys, avg cpu%, avg wall time,
#          % of frames received, % of bytes received, receive goodput).
sub parse_recv {
    my ($lib, $iter, $threads, $path, $unit) = @_;
    my ($t_usr, $t_sys, $t_cpu, $t_total, $tb_avg, $tf_avg, $tt_avg, $fh);
    my $tf = get_frame_count($lib);

    if ($lib eq "uvgrtp") {
        my $e = ($iter * ($threads + 2));
        $fh = open_file($path, $e);
    } else {
        open $fh, '<', $path or die "failed to open file $path\n";
    }

    # each iteration parses one benchmark run
    while (my $line = <$fh>) {
        my ($a_f, $a_b, $a_t) = (0) x 3;

        # make sure this is a line produced by the benchmarking script before proceeding
        if ($lib eq "ffmpeg") {
            my @nums = $line =~ /(\d+)/g;
            next if $#nums != 2 or grep /jitter/, $line;
        }

        # calculate avg bytes/frames/time over the run's threads;
        # receiver result lines are "<bytes> <frames> <time ms>"
        for (my $i = 0; $i < $threads; $i++) {
            my @nums = $line =~ /(\d+)/g;
            $a_b += $nums[0];
            $a_f += $nums[1];
            $a_t += $nums[2];
            $line = <$fh>;
        }
        $tf_avg += ($a_f / $threads);
        $tb_avg += ($a_b / $threads);
        $tt_avg += ($a_t / $threads);

        my ($usr, $sys, $total, $cpu) = ($line =~ m/(\d+\.\d+)user\s(\d+\.\d+)system\s0:(\d+.\d+)elapsed\s(\d+)%CPU/);

        # discard line about inputs, outputs and pagefaults
        $line = <$fh>;

        # update total
        $t_usr   += $usr;
        $t_sys   += $sys;
        $t_cpu   += $cpu;
        $t_total += $total;
    }

    my $bytes  = 100 * (($tb_avg / $iter) / $TOTAL_BYTES);
    my $frames = 100 * (($tf_avg / $iter) / $tf);
    # NOTE(review): the inner parentheses make this look like two arguments,
    # but Perl flattens the list so goodput() still receives
    # (bytes, time, unit) — consider rewriting for clarity
    my $gp = goodput(($TOTAL_BYTES * ($bytes / 100), ($tt_avg / $iter)), $unit);

    close $fh;
    return ($path, $t_usr / $iter, $t_sys / $iter, $t_cpu / $iter, $t_total / $iter, $frames, $bytes, $gp);
}
# Pretty-print the results of parse_recv() for one log file.
# Arguments are forwarded to parse_recv() unchanged.
sub print_recv {
    my ($path, $usr, $sys, $cpu, $total, $a_f, $a_b, $a_t) = parse_recv(@_);

    if (defined $path) {
        print "$path: \n";
        print "\tuser: $usr \n";
        print "\tsystem: $sys \n";
        print "\tcpu: $cpu \n";
        print "\ttotal: $total\n";
        print "\tavg frames: $a_f\n";
        print "\tavg bytes: $a_b\n";
        print "\trecv goodput: $a_t\n";
    }
}
# Pretty-print the results of parse_send() for one log file.
# Arguments are forwarded to parse_send() unchanged.
sub print_send {
    my ($path, $usr, $sys, $cpu, $total, $sgp, $tgp) = parse_send(@_);

    if (defined $path) {
        print "$path: \n";
        print "\tuser: $usr\n";
        print "\tsystem: $sys\n";
        print "\tcpu: $cpu\n";
        print "\ttotal: $total\n";
        print "\tgoodput, single: $sgp\n";
        print "\tgoodput, total: $tgp\n";
    }
}
# Aggregate every send/recv log in a directory into "<lib>.csv".
# Rows are grouped into sections by thread count; columns are the fps
# configurations found in the file names.
sub parse_csv {
    my ($lib, $iter, $path, $unit) = @_;
    my ($threads, $fps, $ofps, $fiter, %a) = (0) x 4;

    opendir my $dir, realpath($path);

    # collect one space-separated value string per "<threads> <fps>" key
    foreach my $fh (grep /(recv|send)/, readdir $dir) {
        ($threads, $ofps, $fiter) = ($fh =~ /(\d+)threads_(\d+)fps_(\d+)iter/g);
        $iter = $fiter if $fiter;
        print "unable to determine iter, skipping file $fh\n" and next if !$iter;

        # zero-pad fps so the lexical sort below orders numerically
        $fps = sprintf("%05d", $ofps);
        my @values;

        if (grep /recv/, $fh) {
            @values = parse_recv($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit);
            shift @values;  # drop the leading path element

            # recv values are always stored in front of send values
            if (not exists $a{"$threads $fps"}) {
                $a{"$threads $fps"} = join(" ", @values);
            } else {
                $a{"$threads $fps"} = join(" ", @values) . " " . $a{"$threads $fps"};
            }
        } else {
            @values = parse_send($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit);
            shift @values;

            if (not exists $a{"$threads $fps"}) {
                $a{"$threads $fps"} = join(" ", @values) . " $ofps";
            } else {
                $a{"$threads $fps"} = $a{"$threads $fps"} . " " . join(" ", @values) . " $ofps";
            }
        }
    }

    my $c_key = 0;
    open my $cfh, '>', "$lib.csv" or die "failed to open file: $lib.csv";

    # column accumulators for the current thread-count section
    my (@r_u, @r_s, @r_c, @r_t, @r_f, @r_b, @r_m) = () x 7;
    my (@s_u, @s_s, @s_c, @s_t, @s_sg, @s_tg, @s_f) = () x 7;

    # emit one section per thread count, flushing the previous section's
    # accumulated columns whenever the thread count changes
    foreach my $key (sort(keys %a)) {
        my $spz = (split " ", $key)[0];
        if ($spz != $c_key){
            if ($spz ne 1) {
                print $cfh "recv usr;" . join(";", @r_u) . "\n";
                print $cfh "recv sys;" . join(";", @r_s) . "\n";
                print $cfh "recv cpu;" . join(";", @r_c) . "\n";
                print $cfh "recv total;" . join(";", @r_t) . "\n";
                print $cfh "frames received;". join(";", @r_f) . "\n";
                print $cfh "bytes received;" . join(";", @r_b) . "\n";
                print $cfh "time estimate;" . join(";", @r_m) . "\n";
                print $cfh "send usr;" . join(";", @s_u) . "\n";
                print $cfh "send sys;" . join(";", @s_s) . "\n";
                print $cfh "send cpu;" . join(";", @s_c) . "\n";
                print $cfh "send total;" . join(";", @s_t) . "\n";
                print $cfh "single goodput;" . join(";", @s_sg) . "\n";
                print $cfh "total goodput;" . join(";", @s_tg) . "\n";
                print $cfh "fps;" . join(";", @s_f) . "\n\n";
            }
            print $cfh "$spz threads;\n";
            $c_key = $spz;

            (@r_f, @r_b, @r_m, @r_c, @r_u, @r_s, @r_t) = () x 7;
            (@s_c, @s_u, @s_s, @s_t, @s_sg, @s_tg, @s_f) = () x 7;
        }

        # value string layout: 7 recv fields followed by 6 send fields + fps
        my @comp = split " ", $a{$key};
        push @r_u, $comp[0]; push @r_s, $comp[1]; push @r_c, $comp[2];
        push @r_t, $comp[3]; push @r_f, $comp[4]; push @r_b, $comp[5];
        push @r_m, $comp[6]; push @s_u, $comp[7]; push @s_s, $comp[8];
        push @s_c, $comp[9]; push @s_t, $comp[10]; push @s_sg, $comp[11];
        push @s_tg, $comp[12]; push @s_f, $comp[13];
    }

    # flush the final section
    print $cfh "recv usr;" . join(";", @r_u) . "\n";
    print $cfh "recv sys;" . join(";", @r_s) . "\n";
    print $cfh "recv cpu;" . join(";", @r_c) . "\n";
    print $cfh "recv total;" . join(";", @r_t) . "\n";
    print $cfh "frames received;". join(";", @r_f) . "\n";
    print $cfh "bytes received;" . join(";", @r_b) . "\n";
    print $cfh "recv goodput;" . join(";", @r_m) . "\n";
    print $cfh "send usr;" . join(";", @s_u) . "\n";
    print $cfh "send sys;" . join(";", @s_s) . "\n";
    print $cfh "send cpu;" . join(";", @s_c) . "\n";
    print $cfh "send total;" . join(";", @s_t) . "\n";
    print $cfh "single goodput;" . join(";", @s_sg) . "\n";
    print $cfh "total goodput;" . join(";", @s_tg) . "\n";
    print $cfh "fps;" . join(";", @s_f) . "\n";
    close $cfh;
}
# Scan a results directory and either print every run's goodput ("all") or
# report the runs with the best single-thread / total goodput ("best"),
# considering only configurations whose packet/frame loss stays within the
# given limits.
sub parse {
    my ($lib, $iter, $path, $pkt_loss, $frame_loss, $type, $unit) = @_;
    my ($tgp, $tgp_k, $sgp, $sgp_k, $threads, $fps, $fiter, %a) = (0) x 7;

    opendir my $dir, realpath($path);

    # pass 1: remember which configs pass the loss thresholds (recv logs)
    foreach my $fh (grep /recv/, readdir $dir) {
        ($threads, $fps, $fiter) = ($fh =~ /(\d+)threads_(\d+)fps_(\d+)iter/g);
        $iter = $fiter if $fiter;
        print "unable to determine iter, skipping file $fh\n" and next if !$iter;

        my @values = parse_recv($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit);

        # values[5] = frames received %, values[6] = bytes received %
        if (100.0 - $values[5] <= $frame_loss and 100.0 - $values[6] <= $pkt_loss) {
            $a{"$threads $fps"} = $path;
        }
    }
    rewinddir $dir;

    # pass 2: evaluate the send logs of the configs that passed pass 1
    foreach my $fh (grep /send/, readdir $dir) {
        ($threads, $fps, $fiter) = ($fh =~ /(\d+)threads_(\d+)fps_(\d+)iter/g);
        $iter = $fiter if $fiter;
        print "unable to determine iter, skipping file $fh\n" and next if !$iter;

        my @values = parse_send($lib, $iter, $threads, realpath($path) . "/" . $fh, $unit);

        if (exists $a{"$threads $fps"}) {
            if ($type eq "best") {
                # values[5] = single-thread goodput, values[6] = total goodput
                if ($values[5] > $sgp) {
                    $sgp = $values[5];
                    $sgp_k = $fh;
                }
                if ($values[6] > $tgp) {
                    $tgp = $values[6];
                    $tgp_k = $fh;
                }
            } else {
                print "$fh: $values[5] $values[6]\n" if exists $a{"$threads $fps"};
            }
        }
    }
    closedir $dir;

    exit if $type eq "all";

    if ($sgp_k) {
        print "best goodput, single thread: $sgp_k\n";
        ($threads, $fps) = ($sgp_k =~ /(\d+)threads_(\d+)/g);
        print_send($lib, $iter, $threads, realpath($path) . "/" . $sgp_k, $unit);
    } else {
        print "nothing found for single best goodput\n";
    }

    if ($tgp_k) {
        print "\nbest goodput, total: $tgp_k\n";
        ($threads, $fps) = ($tgp_k =~ /(\d+)threads_(\d+)/g);
        print_send($lib, $iter, $threads, realpath($path) . "/" . $tgp_k, $unit);
    } else {
        print "nothing found for total best goodput\n";
    }
}
# Parse a latency benchmark log and print the averaged intra/inter/overall
# frame latencies plus the fraction of frames that made it through.
# Each log line looks like: "<frames> ... intra <f> ... inter <f> ... avg <f>".
sub parse_latency {
    my ($lib, $iter, $path, $unit) = @_;
    # BUGFIX: $frames was used below without being declared, which is a
    # compile-time error under `use strict` (the declared $ts was unused)
    my ($frames, $avg, $intra, $inter, $cnt) = (0) x 5;

    open my $fh, '<', $path or die "failed to open file $path\n";

    # each iteration parses one benchmark run
    while (my $line = <$fh>) {
        my @nums = ($line =~ m/(\d+).*intra\s(\d+\.\d+).*inter\s(\d+\.\d+).*avg\s(\d+\.\d+)/);
        $frames += $nums[0];
        $intra  += $nums[1];
        $inter  += $nums[2];
        $avg    += $nums[3];
        $cnt    += 1;
    }
    close $fh;

    # avoid a division-by-zero crash on an empty/unparseable log
    die "no latency samples found in $path\n" unless $cnt;

    $intra  /= $cnt;
    $inter  /= $cnt;
    $avg    /= $cnt;
    $frames /= get_frame_count($lib);

    print "$frames: intra $intra, inter $inter, avg $avg\n";
}
# Print usage for the three invocation modes (single file, latency,
# directory) and exit via the trailing `and exit` on the last print.
sub print_help {
    print "usage (one file, send/recv):\n  ./parse.pl \n"
        . "\t--lib <uvgrtp|ffmpeg|live555>\n"
        . "\t--role <send|recv>\n"
        . "\t--unit <mb|mbit|gbit> (defaults to mb)\n"
        . "\t--path <path to log file>\n"
        . "\t--iter <# of iterations>)\n"
        . "\t--threads <# of threads used in the benchmark> (defaults to 1)\n\n";

    print "usage (latency):\n  ./parse.pl \n"
        . "\t--unit <mb|mbit|gbit> (defaults to mb)\n"
        . "\t--path <path to log file>\n"
        . "\t--parse latency\n\n";

    print "usage (directory):\n  ./parse.pl \n"
        . "\t--parse <best|all|csv>\n"
        . "\t--lib <uvgrtp|ffmpeg|live555>\n"
        . "\t--iter <# of iterations>)\n"
        . "\t--unit <mb|mbit|gbit> (defaults to mb)\n"
        . "\t--packet-loss <allowed percentage of dropped packets> (optional)\n"
        . "\t--frame-loss <allowed percentage of dropped frames> (optional)\n"
        . "\t--path <path to folder with send and recv output files>\n" and exit;
}
# ---- command line parsing and dispatch ---------------------------------
GetOptions(
    "lib|l=s"       => \(my $lib = ""),
    "role|r=s"      => \(my $role = ""),
    "path|p=s"      => \(my $path = ""),
    # BUGFIX: was "threadst|=i" — an invalid spec (empty alias after '|')
    # that made Getopt::Long reject the whole option table at startup
    "threads|t=i"   => \(my $threads = 0),
    "iter|i=i"      => \(my $iter = 0),
    "parse|s=s"     => \(my $parse = ""),
    # BUGFIX: short alias dropped — 'p' was already taken by --path
    "packet-loss=f" => \(my $pkt_loss = 100.0),
    "frame-loss|f=f" => \(my $frame_loss = 100.0),
    "unit=s"        => \(my $unit = "mb"),
    "help"          => \(my $help = 0)
) or die "failed to parse command line!\n";

# derive missing parameters from the log file's path where possible
$lib     = $1 if (!$lib and $path =~ m/.*(uvgrtp|ffmpeg|live555).*/i);
$role    = $1 if (!$role and $path =~ m/.*(recv|send).*/i);
$threads = $1 if (!$threads and $path =~ m/.*_(\d+)threads.*/i);
$iter    = $1 if (!$iter and $path =~ m/.*_(\d+)iter.*/i);

print_help() if $help or (!$lib and $parse ne "latency");
print_help() if !$iter and !$parse;
print_help() if !$parse and (!$role or !$threads);
print_help() if !grep /$unit/, ("mb", "mbit", "gbit");

die "not implemented\n" if !grep (/$lib/, ("uvgrtp", "ffmpeg", "live555"));

# dispatch to the requested parsing mode
if ($parse eq "best" or $parse eq "all") {
    parse($lib, $iter, $path, $pkt_loss, $frame_loss, $parse, $unit);
} elsif ($parse eq "csv") {
    parse_csv($lib, $iter, $path, $unit);
} elsif ($parse eq "latency") {
    parse_latency($lib, $iter, $path, $unit);
} elsif ($role eq "send") {
    print_send($lib, $iter, $threads, $path, $unit);
} elsif ($role eq "recv") {
    print_recv($lib, $iter, $threads, $path, $unit);
} else {
    die "unknown option!\n";
}

181
udperf.c Normal file
View File

@ -0,0 +1,181 @@
/*
* compile: gcc udperf.c
* start server: ./a.out -s
* start client: ./a.out -a 127.0.0.1
*/
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#ifndef PORT
# define PORT 8888
#endif
#ifndef PKT_LEN
# define PKT_LEN 1458
#endif
#ifndef MAX_ROUNDS
# define MAX_ROUNDS 10
#endif
#ifndef MAX_PKTS
# define MAX_PKTS 350000
#endif
/* Milliseconds elapsed between two clock_gettime() samples. */
static inline float diff_ms(struct timespec s, struct timespec e)
{
    long ns = (e.tv_sec - s.tv_sec) * (long)1e9 + (e.tv_nsec - s.tv_nsec);
    return (float)ns / 1000 / 1000;
}
/* Print invocation help and return -1 so callers can `return usage();`.
 * Note: __FILE__ prints the source file name ("udperf.c"), not the
 * compiled binary name. */
static int usage(void)
{
    fprintf(stderr, "server: ./%s -s\n", __FILE__);
    fprintf(stderr, "client: ./%s -a <ip of server>\n", __FILE__);
    return -1;
}
/* Receive side: bind a UDP socket on PORT and accept one TCP control
 * connection on PORT+1. For each of MAX_ROUNDS rounds, drain incoming UDP
 * packets until MAX_PKTS arrive or select() times out after 2 s, then
 * report the per-round packet count back over the TCP channel.
 * NOTE(review): socket/bind/listen/accept results are deliberately
 * discarded — failures only surface later as recv/send errors. */
static int server(void)
{
    int s_u, s_t, s_n;
    struct sockaddr_in sa_u, sa_t;

    /* initialize server udp socket */
    memset(&sa_u, 0, sizeof(sa_u));
    sa_u.sin_family = AF_INET;
    sa_u.sin_port = htons(PORT);
    sa_u.sin_addr.s_addr = htonl(INADDR_ANY);
    s_u = socket(AF_INET, SOCK_DGRAM, 0);
    (void)bind(s_u, (struct sockaddr *)&sa_u, sizeof(sa_u));

    /* initialize server tcp socket (control channel on PORT + 1) */
    memset(&sa_t, 0, sizeof(sa_t));
    sa_t.sin_family = AF_INET;
    sa_t.sin_port = htons(PORT + 1);
    sa_t.sin_addr.s_addr = htonl(INADDR_ANY);
    s_t = socket(AF_INET, SOCK_STREAM, 0);
    (void)setsockopt(s_t, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int));
    (void)bind(s_t, (struct sockaddr *)&sa_t, sizeof(sa_t));
    (void)listen(s_t, 1);
    s_n = accept(s_t, (struct sockaddr *)&sa_t, &(socklen_t){ sizeof(sa_t) });

    /* receive packets from remote and once select() timeouts,
     * send how many packets were received */
    uint8_t buffer[PKT_LEN];
    fd_set read_fs;
    FD_ZERO(&read_fs);

    for (int i = 0, npkts = 0; i < MAX_ROUNDS; ++i, npkts = 0) {
        while (npkts != MAX_PKTS) {
            FD_SET(s_u, &read_fs);

            /* 2-second inactivity window ends the round early */
            if (!select(s_u + 1, &read_fs, NULL, NULL, &(struct timeval){ 2, 0 }))
                break;

            /* drain everything currently queued without blocking */
            while (recv(s_u, buffer, PKT_LEN, MSG_DONTWAIT) > 0)
                ++npkts;
        }
        (void)send(s_n, &npkts, sizeof(int), 0);
    }

    return 0;
}
/* Send side: blast MAX_ROUNDS batches of MAX_PKTS UDP packets at the
 * server and print per-round and average goodput. The server reports its
 * receive count back over a side TCP channel on PORT + 1.
 * Returns 0 on completion, or usage() when no server address was given. */
static int client(char *server_addr)
{
    float total = 0;
    struct timespec start, end;
    uint8_t data[PKT_LEN] = { 0 };
    struct sockaddr_in sa_t, sa_u;
    /* BUGFIX: runtime was `int`, truncating diff_ms()'s float result to
     * whole milliseconds and skewing every rate below; pkts is now
     * initialized so a failed recv() cannot leave it indeterminate */
    int s_u, s_t, pkts = 0, bytes;
    float runtime;

    if (!server_addr)
        return usage();

    /* initialize client udp socket */
    memset(&sa_u, 0, sizeof(sa_u));
    sa_u.sin_family = AF_INET;
    sa_u.sin_port = htons(PORT);
    (void)inet_pton(AF_INET, server_addr, &sa_u.sin_addr);
    s_u = socket(AF_INET, SOCK_DGRAM, 0);

    /* initialize client tcp socket (control channel on PORT + 1) */
    memset(&sa_t, 0, sizeof(sa_t));
    sa_t.sin_family = AF_INET;
    sa_t.sin_port = htons(PORT + 1);
    (void)inet_pton(AF_INET, server_addr, &sa_t.sin_addr);
    s_t = socket(AF_INET, SOCK_STREAM, 0);
    (void)connect(s_t, (struct sockaddr *)&sa_t, sizeof(sa_t));

    for (int i = 0; i < MAX_ROUNDS; ++i) {
        clock_gettime(CLOCK_MONOTONIC, &start);
        for (int k = 0; k < MAX_PKTS; ++k)
            (void)sendto(s_u, data, PKT_LEN, 0, (struct sockaddr *)&sa_u, sizeof(sa_u));
        clock_gettime(CLOCK_MONOTONIC, &end);

        runtime = diff_ms(start, end);
        bytes = MAX_PKTS * PKT_LEN;

        /* read how many packets the server received */
        (void)recv(s_t, &pkts, sizeof(int), 0);

        fprintf(stderr, "%.2lf Gb/s, %.2lf Mb/s, %d MB transferred, %.2f%% packets received\n",
            (float)bytes * 8 / 1000 / 1000 / 1000 / runtime * 1000,
            (float)bytes * 8 / 1000 / 1000 / runtime * 1000,
            bytes / 1000 / 1000,
            ((float)pkts / MAX_PKTS) * 100
        );
        total += (float)bytes * 8 / 1000 / 1000 / 1000 / runtime * 1000;
    }

    fprintf(stderr, "\naverage goodput: %.2lf Gb/s\n", (float)total / MAX_ROUNDS);
    return 0;
}
/* Entry point: -s runs the server, -a <addr> runs the client against the
 * given server address; anything else prints usage. */
int main(int argc, char **argv)
{
    char *addr = NULL;
    int run_server = 0;

    opterr = 0;
    for (int opt; (opt = getopt(argc, argv, "sa:")) != -1;) {
        if (opt == 's')
            run_server = 1;
        else if (opt == 'a')
            addr = optarg;
        else
            return usage();
    }

    return run_server ? server() : client(addr);
}

199
util/live555_util.cc Normal file
View File

@ -0,0 +1,199 @@
#include <chrono>
#include <mutex>
#include <thread>
#include "live555_util.hh"
#define MAX_WRITE_SIZE 1444
extern int get_next_frame_start(uint8_t *data, uint32_t offset, uint32_t data_len, uint8_t& start_len);
/* Sets up the delivery counters and registers FramedSource::afterGetting
 * as an event trigger so empty frames can be completed via the scheduler.
 * NOTE(review): c_nal_, c_chunk_, noMoreTasks_ and several other members
 * are not initialized here or in startFramedSource() — confirm nothing
 * reads them before they are first assigned. */
FramedSourceCustom::FramedSourceCustom(UsageEnvironment *env)
    :FramedSource(*env)
{
    len_ = 0;
    off_ = 0;
    chunk_ptr_ = 0;

    afterEvent_ = envir().taskScheduler().createEventTrigger((TaskFunc*)FramedSource::afterGetting);
}
/* NOTE(review): afterEvent_ is never released with deleteEventTrigger() —
 * harmless for a single long-lived source, but confirm. */
FramedSourceCustom::~FramedSourceCustom()
{
}
/* live555 hook: deliver a frame immediately if the sink is waiting. */
void FramedSourceCustom::doGetNextFrame()
{
    if (isCurrentlyAwaitingData())
        sendFrame();
}
/* live555 hook: record that no further deliveries should be scheduled. */
void FramedSourceCustom::doStopGettingFrames()
{
    noMoreTasks_ = true;
}
/* Split the current chunk (c_chunk_) into NAL units by scanning for
 * Annex-B start codes with get_next_frame_start() (util/util.cc) and push
 * (length, pointer) pairs into nals_. Offsets returned by the scanner
 * point just past each start code, so the pushed lengths exclude the
 * following start code itself. */
void FramedSourceCustom::splitIntoNals()
{
    uint8_t start_len;
    int32_t prev_offset = 0;

    /* offset of the byte following the first start code, or -1 */
    int offset = get_next_frame_start((uint8_t *)c_chunk_, 0, c_chunk_len_, start_len);
    prev_offset = offset;

    while (offset != -1) {
        offset = get_next_frame_start((uint8_t *)c_chunk_, offset, c_chunk_len_, start_len);
        if (offset > 4 && offset != -1) {
            nals_.push(std::make_pair(offset - prev_offset - start_len, &c_chunk_[prev_offset]));
            prev_offset = offset;
        }
    }

    /* no start code at all: treat the whole chunk as one NAL */
    if (prev_offset == -1)
        prev_offset = 0;

    /* last NAL: everything from the final start code to the chunk's end */
    nals_.push(std::make_pair(c_chunk_len_ - prev_offset, &c_chunk_[prev_offset]));
}
/* Deliver the next piece of the current NAL unit (at most MAX_WRITE_SIZE
 * bytes) into fTo, pulling a new chunk from chunks_ and splitting it into
 * NALs when the previous one is exhausted. */
void FramedSourceCustom::sendFrame()
{
    /* initialization is not ready but scheduler
     * is already asking for frames, send empty frame */
    if (len_ == 0 && off_ == 0) {
        fFrameSize = 0;
        envir().taskScheduler().triggerEvent(afterEvent_, this);
        return;
    }

    if (c_chunk_ == nullptr) {
        fpt_start_ = std::chrono::high_resolution_clock::now();

        /* NOTE(review): printStats() returns after setting the stop flag,
         * and execution then falls through to chunks_.front() on an empty
         * queue — undefined behavior; confirm teardown happens first */
        if (chunks_.empty()) {
            printStats();
        }

        /* TODO: framer */
        auto cinfo = chunks_.front();
        chunks_.pop();

        c_chunk_ = cinfo.second;
        c_chunk_len_ = cinfo.first;
        c_chunk_off_ = 0;

        splitIntoNals();
    }

    if (c_nal_ == nullptr) {
        auto ninfo = nals_.front();
        nals_.pop();

        c_nal_ = ninfo.second;
        c_nal_len_ = ninfo.first;
        c_nal_off_ = 0;
    }

    /* pick the slice of the current NAL to copy this round */
    void *send_ptr = nullptr;
    size_t send_len = 0;
    size_t send_off = 0;

    if (c_nal_len_ < MAX_WRITE_SIZE) {
        /* whole NAL fits in one write */
        send_len = c_nal_len_;
        send_ptr = c_nal_;
        send_off = 0;
    } else {
        int left = c_nal_len_ - c_nal_off_;

        if (left < MAX_WRITE_SIZE)
            send_len = left;
        else
            send_len = MAX_WRITE_SIZE;

        send_ptr = c_nal_;
        send_off = c_nal_off_;
    }

    memcpy(fTo, (uint8_t *)send_ptr + send_off, send_len);
    fFrameSize = send_len;

    afterGetting(this);

    /* check if we need to change chunk or nal unit */
    bool nal_written_fully = (c_nal_len_ <= c_nal_off_ + send_len);

    if (nal_written_fully && nals_.empty()) {
        /* chunk finished: account its processing time */
        c_chunk_ = nullptr;
        n_calls_++;
        fpt_end_ = std::chrono::high_resolution_clock::now();
        diff_total_ += std::chrono::duration_cast<std::chrono::microseconds>(fpt_end_ - fpt_start_).count();
    } else {
        if (!nal_written_fully) {
            c_nal_off_ += send_len;
        } else {
            c_nal_ = nullptr;
        }
    }
}
/* Print the final throughput report and raise the shared stop flag so the
 * RTP loop can end. NOTE(review): divides by n_calls_, which is zero if no
 * chunk was ever completed — confirm that cannot happen. */
void FramedSourceCustom::printStats()
{
    *stop_ = 1;

    end_ = std::chrono::high_resolution_clock::now();
    uint64_t diff = (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(end_ - start_).count();

    fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n",
        total_size_, total_size_ / 1000, total_size_ / 1000 / 1000,
        diff, diff / 1000
    );
    fprintf(stderr, "n calls %u\n", n_calls_);
    fprintf(stderr, "avg processing time of frame: %lu\n", diff_total_ / n_calls_);
}
/* Prime the source with a memory-mapped bitstream laid out as repeated
 * [uint64 chunk_size][chunk bytes] records (the framing written by
 * util/util.cc) and start the benchmark timer. Queues at most 3000 chunks.
 * `stop_rtp` is a shared flag raised by printStats() when the stream ends. */
void FramedSourceCustom::startFramedSource(void *mem, size_t len, char *stop_rtp)
{
    mem_ = mem;
    len_ = len;
    off_ = 0;

    n_calls_ = 0;
    diff_total_ = 0;
    stop_ = stop_rtp;

    c_chunk_ = nullptr;
    c_chunk_len_ = 0;
    c_chunk_off_ = 0;
    total_size_ = 0;

    uint64_t chunk_size = 0;

    /* walk the length-prefixed records: i is the byte offset, k the count */
    for (size_t i = 0, k = 0; i < len && k < 3000; k++) {
        memcpy(&chunk_size, (uint8_t *)mem + i, sizeof(uint64_t));
        i += sizeof(uint64_t);

        chunks_.push(std::make_pair(chunk_size, (uint8_t *)mem_ + i));
        i += chunk_size;
        total_size_ += chunk_size;
    }

    start_ = std::chrono::high_resolution_clock::now();
}
/* Allocate a Groupsock for an RTP session bound to sess_addr and retarget
 * it at ip_addr:portNum with TTL 255. Fills connection.rtpPort and
 * connection.rtpGroupsock (heap-allocated; caller owns them).
 * NOTE(review): addr/sess are only partially initialized (just sin_addr);
 * the remaining sockaddr_in fields are never read here, but zeroing the
 * structs first would be safer. */
void createConnection(
    UsageEnvironment *env,
    Connection& connection,
    std::string sess_addr,
    std::string ip_addr,
    uint16_t portNum
)
{
    sockaddr_in addr, sess;

    inet_pton(AF_INET, ip_addr.c_str(), &addr.sin_addr);
    inet_pton(AF_INET, sess_addr.c_str(), &sess.sin_addr);

    /* Port(0) lets the OS pick the local port; the real destination is
     * set via changeDestinationParameters below */
    connection.rtpPort = new Port(0);
    connection.rtpGroupsock = new Groupsock(*env, sess.sin_addr, addr.sin_addr, *connection.rtpPort);
    connection.rtpGroupsock->changeDestinationParameters(addr.sin_addr, portNum, 255);
}

78
util/live555_util.hh Normal file
View File

@ -0,0 +1,78 @@
#pragma once
#include <liveMedia.hh>
#include <FramedSource.hh>
#include <UsageEnvironment.hh>
#include <GroupsockHelper.hh>
#include <BasicUsageEnvironment.hh>
#include <Groupsock.hh>
#include <GroupsockHelper.hh>
#include <cstring>
#include <string>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <queue>
#include <chrono>
#include <mutex>
/* FramedSource that serves a chunked HEVC bitstream (see startFramedSource)
 * to live555, splitting each chunk into NAL units and each NAL into
 * MAX_WRITE_SIZE pieces. Implemented in live555_util.cc. */
class FramedSourceCustom : public FramedSource
{
public:
    FramedSourceCustom(UsageEnvironment *env);
    ~FramedSourceCustom();

    /* Prime the source with the mapped bitstream and start the timer;
     * stop_rtp is a shared flag raised when the stream is exhausted. */
    void startFramedSource(void *mem, size_t len, char *stop_rtp);
    virtual void doGetNextFrame();

protected:
    virtual void doStopGettingFrames();

private:
    void sendFrame();
    void printStats();
    void splitIntoNals();

    EventTriggerId afterEvent_;  /* triggers FramedSource::afterGetting */

    /* NOTE(review): separateInput_/ending_/removeStartCodes_ are never
     * referenced in live555_util.cc — confirm they are still needed. */
    bool separateInput_;
    bool ending_;
    bool removeStartCodes_;

    char *stop_;            /* shared stop flag (set in printStats) */

    int n_calls_;           /* completed chunk deliveries */
    uint64_t diff_total_;   /* summed per-chunk processing time (us) */
    uint64_t total_size_;   /* total payload bytes queued */

    void *mem_;             /* mapped bitstream */
    int len_;
    int off_;

    /* NAL unit currently being sent */
    uint8_t *c_nal_;
    size_t c_nal_len_;
    size_t c_nal_off_;
    std::queue<std::pair<size_t, uint8_t *>> nals_;

    /* chunk currently being split and sent */
    uint8_t *c_chunk_;
    size_t c_chunk_len_;
    size_t c_chunk_off_;
    int chunk_ptr_;
    std::queue<std::pair<size_t, uint8_t *>> chunks_;

    std::mutex mutex_;
    TaskToken currentTask_;

    /* benchmark timer and per-chunk processing-time probes */
    std::chrono::high_resolution_clock::time_point start_, end_, fpt_end_, fpt_start_;
    bool noMoreTasks_;
};
/* Bundle of the live555 socket objects for one RTP/RTCP session.
 * NOTE(review): only the RTP members are filled in by createConnection();
 * the RTCP members stay uninitialized — confirm before using them. */
struct Connection {
    Port *rtpPort;
    Port *rtcpPort;
    Groupsock *rtpGroupsock;
    Groupsock *rtcpGroupsock;
};

/* Implemented in live555_util.cc. */
void createConnection(UsageEnvironment *env, Connection& conn, std::string sess, std::string ip, uint16_t port);

172
util/util.cc Normal file
View File

@ -0,0 +1,172 @@
#include <iostream>
#include <cstdio>
#include <thread>
#include <cstdlib>
#include <cstring>
#include <kvazaar.h>
#include <stdint.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <assert.h>
int kvazaar_encode(char *input, char *output);
/* Memory-map the encoded HEVC file, encoding it with Kvazaar first if it
 * does not exist yet. With argc != 3 the default util/ paths are used,
 * otherwise argv[1]/argv[2] name the input and output files.
 * On success returns the writable private mapping and stores its size in
 * `len`; returns NULL on any failure (previously open/stat/mmap results
 * were unchecked and the descriptor was leaked). */
void *get_mem(int argc, char **argv, size_t& len)
{
    char *input = NULL;
    char *output = NULL;

    if (argc != 3) {
        input = (char *)"util/video.raw";
        output = (char *)"util/out.hevc";
    } else {
        input = argv[1];
        output = argv[2];
    }

    /* encode on first use only; subsequent runs reuse the cached file */
    if (access(output, F_OK) == -1) {
        (void)kvazaar_encode(input, output);
    }

    int fd = open(output, O_RDONLY, 0);
    if (fd < 0)
        return NULL;

    struct stat st;
    if (fstat(fd, &st) < 0) {
        close(fd);
        return NULL;
    }
    len = st.st_size;

    void *mem = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_POPULATE, fd, 0);
    close(fd);  /* the mapping keeps its own reference to the file */
    if (mem == MAP_FAILED) {
        len = 0;
        return NULL;
    }

    madvise(mem, len, MADV_SEQUENTIAL | MADV_WILLNEED);
    return mem;
}
/* Locate the next Annex-B start code (two-or-more zero bytes followed by
 * 0x01) at or after `offset`. On a hit, stores the full start-code length
 * (zero run + the 1) into `start_len` and returns the index of the first
 * payload byte; returns -1 when no further start code exists. */
int get_next_frame_start(uint8_t *data, uint32_t offset, uint32_t data_len, uint8_t& start_len)
{
    uint8_t run = 0;  /* consecutive zero bytes seen so far */

    for (uint32_t i = offset; i < data_len; ++i) {
        uint8_t b = data[i];

        if (run >= 2 && b == 1) {
            start_len = run + 1;
            return (int)(i + 1);
        }

        run = (b == 0) ? (uint8_t)(run + 1) : (uint8_t)0;
    }

    return -1;
}
/* Encode a raw 3840x2160 YUV420 file with Kvazaar (ultrafast preset,
 * QP 21, intra period 64, 120 fps) and write the result to `output`,
 * framed as repeated [uint64 chunk_size][chunk bytes] records that the
 * streaming code later parses. Returns 0 on success, EXIT_FAILURE when
 * the encoder cannot be opened or a frame fails to encode. */
int kvazaar_encode(char *input, char *output)
{
    FILE *inputFile = fopen(input, "r");
    FILE *outputFile = fopen(output, "w");

    int width = 3840;
    int height = 2160;

    kvz_encoder* enc = NULL;
    const kvz_api * const api = kvz_api_get(8);  /* 8-bit pipeline */
    kvz_config* config = api->config_alloc();

    api->config_init(config);
    api->config_parse(config, "preset", "ultrafast");
    config->width = width;
    config->height = height;
    config->hash = kvz_hash::KVZ_HASH_NONE;
    config->intra_period = 64;
    config->qp = 21;
    config->framerate_num = 120;
    config->framerate_denom = 1;

    enc = api->encoder_open(config);
    if (!enc) {
        fprintf(stderr, "Failed to open encoder.\n");
        return EXIT_FAILURE;
    }

    /* ring of 16 input pictures so the encoder can pipeline */
    kvz_picture *img_in[16];
    for (uint32_t i = 0; i < 16; ++i) {
        img_in[i] = api->picture_alloc_csp(KVZ_CSP_420, width, height);
    }

    uint8_t inputCounter = 0;
    uint8_t outputCounter = 0;
    bool done = false;

    /* int r = 0; */

    while (!done) {
        kvz_data_chunk* chunks_out = NULL;
        kvz_picture *img_rec = NULL;
        kvz_picture *img_src = NULL;
        uint32_t len_out = 0;
        kvz_frame_info info_out;

        /* read one YUV420 frame: full-size Y, then quarter-size U and V */
        if (!fread(img_in[inputCounter]->y, width*height, 1, inputFile)) {
            done = true;
            continue;
        }
        if (!fread(img_in[inputCounter]->u, width*height>>2, 1, inputFile)) {
            done = true;
            continue;
        }
        if (!fread(img_in[inputCounter]->v, width*height>>2, 1, inputFile)) {
            done = true;
            continue;
        }

        if (!api->encoder_encode(enc,
            img_in[inputCounter],
            &chunks_out, &len_out, &img_rec, &img_src, &info_out))
        {
            fprintf(stderr, "Failed to encode image.\n");
            for (uint32_t i = 0; i < 16; i++) {
                api->picture_free(img_in[i]);
            }
            return EXIT_FAILURE;
        }

        inputCounter = (inputCounter + 1) % 16;

        /* NOTE(review): img_in is an array and can never compare equal to
         * NULL, so this branch is dead; the loop only ends via `done` and
         * frames still buffered inside the encoder are never drained —
         * confirm whether that loss is acceptable */
        if (chunks_out == NULL && img_in == NULL) {
            // We are done since there is no more input and output left.
            goto cleanup;
        }

        if (chunks_out != NULL) {
            uint64_t written = 0;

            // Write data into the output file.
            for (kvz_data_chunk *chunk = chunks_out; chunk != NULL; chunk = chunk->next) {
                written += chunk->len;
            }
            fprintf(stderr, "write chunk size: %lu\n", written);
            /* chunk-size framing header consumed by the streaming side */
            fwrite(&written, sizeof(uint64_t), 1, outputFile);

            for (kvz_data_chunk *chunk = chunks_out; chunk != NULL; chunk = chunk->next) {
                fwrite(chunk->data, chunk->len, 1, outputFile);
            }

            outputCounter = (outputCounter + 1) % 16;

            /* if (++r > 5) */
            /*     goto cleanup; */
        }
    }

cleanup:
    /* NOTE(review): the encoder, config and the 16 pictures are not freed
     * on this path — acceptable for a one-shot tool, but confirm */
    fclose(inputFile);
    fclose(outputFile);
    return 0;
}

View File

@ -0,0 +1,41 @@
#include <uvgrtp/lib.hh>
#include <uvgrtp/clock.hh>
#include <cstring>
#include <algorithm>
size_t nframes = 0;
void hook_receiver(void *arg, uvg_rtp::frame::rtp_frame *frame)
{
auto hevc = (uvg_rtp::media_stream *)arg;
hevc->push_frame(frame->payload, frame->payload_len, 0);
nframes++;
}
/* Latency benchmark receiver: loop every incoming frame back to the sender
 * (see hook_receiver) and exit once all 602 frames of the test clip have
 * been echoed.
 *
 * Returns 0 on success, -1 if the RTP session or stream cannot be created. */
int receiver(void)
{
    uvg_rtp::context rtp_ctx;
    std::string addr("10.21.25.200");

    auto sess = rtp_ctx.create_session(addr);
    if (!sess) {
        fprintf(stderr, "Failed to create RTP session.\n");
        return -1;
    }

    auto hevc = sess->create_stream(
        8889,
        8888,
        RTP_FORMAT_HEVC,
        RCE_SYSTEM_CALL_DISPATCHER
    );
    if (!hevc) {
        fprintf(stderr, "Failed to create media stream.\n");
        rtp_ctx.destroy_session(sess);
        return -1;
    }

    /* Pass the stream itself as the hook context so the hook can echo
     * frames back on it. */
    hevc->install_receive_hook(hevc, hook_receiver);

    /* 602 == number of frames in the pre-encoded test clip. */
    while (nframes != 602)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));

    /* The original leaked the session here. */
    rtp_ctx.destroy_session(sess);
    return 0;
}
/* Entry point: command-line arguments are unused. */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    return receiver();
}

110
uvgrtp/latency_sender.cc Normal file
View File

@ -0,0 +1,110 @@
#include <uvgrtp/lib.hh>
#include <uvgrtp/clock.hh>
#include <cstring>
#include <algorithm>
/* Defined in util/util.cc: memory-maps the encoded test file. */
extern void *get_mem(int argc, char **argv, size_t& len);
/* Timestamp taken just before each push_frame(); hook_sender() measures
 * the round-trip latency of the echoed frame against it. */
std::chrono::high_resolution_clock::time_point start2;
/* Running totals for the latency report printed by sender():
 *   frames / ninters / nintras — counts of all / inter / intra frames echoed
 *   total / total_intra / total_inter — accumulated round-trip times in ms */
size_t frames = 0;
size_t ninters = 0;
size_t nintras = 0;
size_t total = 0;
size_t total_intra = 0;
size_t total_inter = 0;
/* Receive hook for the latency benchmark sender: the receiver echoes our
 * frames back, so the time between push_frame() (recorded in start2) and
 * this callback is one round trip. Latencies are bucketed by HEVC NAL unit
 * type and accumulated into the globals that sender() prints. */
static void hook_sender(void *arg, uvg_rtp::frame::rtp_frame *frame)
{
    (void)arg;

    if (frame) {
        uint64_t diff = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::high_resolution_clock::now() - start2
        ).count();

        /* The NAL unit type lives in bits 1..6 of the first payload byte. */
        switch ((frame->payload[0] >> 1) & 0x3f) {
            case 19: /* IDR_W_RADL: intra frame */
                total       += (diff / 1000);
                total_intra += (diff / 1000);
                nintras++, frames++;
                break;

            case 1: /* TRAIL_R: inter frame */
                total       += (diff / 1000);
                total_inter += (diff / 1000);
                ninters++, frames++;
                break;
        }

        /* Hooked frames are owned by the application; the original leaked
         * every one. Freed the same way receiver.cc's hook does. */
        (void)uvg_rtp::frame::dealloc_frame(frame);
    }
}
/* Latency benchmark sender: push the pre-encoded test clip frame by frame
 * at a fixed 30 fps, timestamping each push so hook_sender() can measure
 * the round-trip time of the echoed frames, then print per-NAL-type
 * average latencies.
 *
 * Returns 0 on success, -1 on mapping or stream-setup failure. */
static int sender(void)
{
    size_t len = 0;
    void *mem  = get_mem(0, NULL, len);
    if (!mem) {
        fprintf(stderr, "Failed to map input file!\n");
        return -1;
    }

    uint64_t current    = 0;
    uint64_t chunk_size = 0;

    /* Frame period in microseconds for a fixed 30 fps send rate. */
    uint64_t period = (uint64_t)((1000 / (float)30) * 1000);
    rtp_error_t ret = RTP_OK;

    std::string addr("10.21.25.2");
    uvg_rtp::context rtp_ctx;

    auto sess = rtp_ctx.create_session(addr);
    if (!sess) {
        fprintf(stderr, "Failed to create RTP session.\n");
        return -1;
    }

    auto hevc = sess->create_stream(
        8888,
        8889,
        RTP_FORMAT_HEVC,
        RCE_SYSTEM_CALL_DISPATCHER
    );
    if (!hevc) {
        fprintf(stderr, "Failed to create media stream.\n");
        rtp_ctx.destroy_session(sess);
        return -1;
    }
    hevc->install_receive_hook(nullptr, hook_sender);

    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();

    for (int rounds = 0; rounds < 1; ++rounds) {
        for (size_t offset = 0; offset < len; ) {
            /* Each frame is prefixed with its size as a raw uint64_t
             * (written by kvazaar_encode() in util/util.cc). */
            memcpy(&chunk_size, (uint8_t *)mem + offset, sizeof(uint64_t));
            offset += sizeof(uint64_t);

            start2 = std::chrono::high_resolution_clock::now();

            if ((ret = hevc->push_frame((uint8_t *)mem + offset, chunk_size, 0)) != RTP_OK) {
                /* The original busy-spun forever here (`for (;;);`),
                 * pegging a core; fail the benchmark instead. */
                fprintf(stderr, "push_frame() failed!\n");
                rtp_ctx.destroy_session(sess);
                return -1;
            }

            /* Pace the stream: sleep until this frame's send deadline. */
            auto runtime = (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::high_resolution_clock::now() - start
            ).count();

            if (runtime < current * period)
                std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));

            current += 1;
            offset  += chunk_size;
        }
    }

    rtp_ctx.destroy_session(sess);

    /* NOTE(review): the averages are NaN/inf when a counter is zero
     * (e.g. no frames were echoed back) — acceptable for a benchmark
     * printout, preserved from the original. */
    fprintf(stderr, "%zu: intra %lf, inter %lf, avg %lf\n",
        frames,
        total_intra / (float)nintras,
        total_inter / (float)ninters,
        total / (float)frames
    );
    return 0;
}
/* Entry point: command-line arguments are unused. */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    return sender();
}

89
uvgrtp/receiver.cc Normal file
View File

@ -0,0 +1,89 @@
#include <uvgrtp/lib.hh>
#include <uvgrtp/clock.hh>
#include <cstring>
#include <algorithm>
/* Per-receiver-thread statistics, indexed by thread id (thread_num / 2).
 * The array is calloc'd in main() with one slot per thread. */
struct thread_info {
    size_t pkts;   /* RTP frames received so far */
    size_t bytes;  /* payload bytes received so far */
    std::chrono::high_resolution_clock::time_point start;  /* time of first frame */
    std::chrono::high_resolution_clock::time_point last;   /* time of latest frame */
} *thread_info;
/* Number of threads that have finished (or given up); polled by main(). */
std::atomic<int> nready(0);
/* Receive hook for one receiver thread. `arg` points at the thread's id
 * (index into the thread_info array). Accumulates byte/packet counts and
 * prints a throughput line once the whole test clip (602 frames) has
 * arrived, then signals completion through nready. */
void hook(void *arg, uvg_rtp::frame::rtp_frame *frame)
{
    int tid = *(int *)arg;

    /* First frame for this thread: start the throughput clock. */
    if (thread_info[tid].pkts == 0)
        thread_info[tid].start = std::chrono::high_resolution_clock::now();

    /* The receiver passes NULL to indicate that nothing has arrived in 10 s;
     * the sender most likely stopped long ago, so report the run as
     * discarded, signal completion, and park this thread forever (main()
     * exits the whole process once every thread has reported). */
    if (!frame) {
        fprintf(stderr, "discard %zu %zu %lu\n", thread_info[tid].bytes, thread_info[tid].pkts,
            std::chrono::duration_cast<std::chrono::milliseconds>(
                thread_info[tid].last - thread_info[tid].start
            ).count()
        );
        nready++;

        while (1)
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    thread_info[tid].last   = std::chrono::high_resolution_clock::now();
    thread_info[tid].bytes += frame->payload_len;

    /* Hooked frames are owned by the application and must be freed here. */
    (void)uvg_rtp::frame::dealloc_frame(frame);

    /* 602 == number of frames in the test clip; print the final stats and
     * signal completion once they have all arrived. */
    if (++thread_info[tid].pkts == 602) {
        fprintf(stderr, "%zu %zu %lu\n", thread_info[tid].bytes, thread_info[tid].pkts,
            std::chrono::duration_cast<std::chrono::milliseconds>(
                thread_info[tid].last - thread_info[tid].start
            ).count()
        );
        nready++;
    }
}
/* One receiver thread: listen on a dedicated port pair (offset by
 * thread_num) and let hook() collect the statistics.
 *
 * addr:       remote address given on the command line
 * thread_num: even per-thread offset (the ports and thread id derive from it)
 *
 * The thread never returns; main() exits the process once every thread has
 * reported through nready. */
void thread_func(char *addr, int thread_num)
{
    /* BUG FIX: the address from the command line was previously ignored in
     * favor of a hard-coded "10.21.25.200". */
    std::string addr_(addr);
    uvg_rtp::context rtp_ctx;

    auto sess = rtp_ctx.create_session(addr_);
    auto hevc = sess->create_stream(
        8888 + thread_num,
        8889 + thread_num,
        RTP_FORMAT_HEVC,
        0
    );

    /* tid must outlive the hook; it does, because this function never
     * returns, so the pointer stays valid for the thread's lifetime. */
    int tid = thread_num / 2;
    hevc->install_receive_hook(&tid, hook);

    /* Park forever: hook() does all the work and signals completion through
     * nready. (The original had an unreachable destroy_session() after
     * this loop — dead code, removed.) */
    for (;;)
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
/* Entry point: spawn one receiver thread per requested stream and wait
 * until every thread has reported through nready. */
int main(int argc, char **argv)
{
    if (argc != 3) {
        /* BUG FIX: the usage line printed __FILE__ (the source path)
         * instead of the program name. */
        fprintf(stderr, "usage: %s <remote address> <number of threads>\n", argv[0]);
        return -1;
    }

    int nthreads = atoi(argv[2]);
    if (nthreads <= 0) {
        fprintf(stderr, "invalid number of threads: %s\n", argv[2]);
        return -1;
    }

    thread_info = (struct thread_info *)calloc(nthreads, sizeof(*thread_info));
    if (!thread_info) {
        fprintf(stderr, "calloc() failed\n");
        return -1;
    }

    /* The thread objects are intentionally leaked: the threads loop
     * forever and the process exits as soon as all of them have reported
     * through nready. */
    for (int i = 0; i < nthreads; ++i)
        new std::thread(thread_func, argv[1], i * 2);

    while (nready.load() != nthreads)
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

97
uvgrtp/sender.cc Normal file
View File

@ -0,0 +1,97 @@
#include <uvgrtp/lib.hh>
#include <uvgrtp/clock.hh>
#include <cstring>
#include <algorithm>
/* NOTE(review): MAX appears unused in this file — candidate for removal. */
#define MAX(a, b) (((int)(a) > (int)(b)) ? (int)(a) : (int)(b))
/* Defined in util/util.cc: memory-maps the encoded test file. */
extern void *get_mem(int argc, char **argv, size_t& len);
/* Number of sender threads that have finished; polled by main(). */
std::atomic<int> nready(0);
/* One sender thread: stream the memory-mapped test clip once over RTP at
 * `fps` frames per second and print a throughput summary.
 *
 * mem/len:    mapping produced by get_mem() (frame-size-prefixed HEVC)
 * addr:       remote address given on the command line
 * thread_num: even per-thread offset for the port numbers
 * fps:        target send rate
 * strict:     currently unused (reserved for a strict pacing mode)
 *
 * Signals completion through the global nready counter so main() can join. */
void thread_func(void *mem, size_t len, char *addr, int thread_num, double fps, bool strict)
{
    (void)strict; /* reserved: strict pacing mode is not implemented yet */

    size_t bytes_sent   = 0;
    uint64_t chunk_size = 0;
    uint64_t total_size = 0;
    uint64_t diff       = 0;
    uint64_t current    = 0;

    /* Frame period in microseconds derived from the target fps. */
    uint64_t period = (uint64_t)((1000 / (float)fps) * 1000);
    rtp_error_t ret = RTP_OK;

    /* BUG FIX: the address from the command line was previously ignored in
     * favor of a hard-coded "10.21.25.2". */
    std::string addr_(addr);
    uvg_rtp::context rtp_ctx;

    auto sess = rtp_ctx.create_session(addr_);
    auto hevc = sess->create_stream(
        8889 + thread_num,
        8888 + thread_num,
        RTP_FORMAT_HEVC,
        RCE_SYSTEM_CALL_DISPATCHER
    );

    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();

    for (int rounds = 0; rounds < 1; ++rounds) {
        for (size_t offset = 0; offset < len; ) {
            /* Each frame is prefixed with its size as a raw uint64_t
             * (written by kvazaar_encode() in util/util.cc). */
            memcpy(&chunk_size, (uint8_t *)mem + offset, sizeof(uint64_t));
            offset     += sizeof(uint64_t);
            total_size += chunk_size;

            if ((ret = hevc->push_frame((uint8_t *)mem + offset, chunk_size, 0)) != RTP_OK) {
                /* The original busy-spun forever here (`for (;;);`), which
                 * pegged a core AND kept main() from ever joining; abort
                 * the run instead. */
                fprintf(stderr, "push_frame() failed!\n");
                break;
            }

            /* Pace the stream: sleep until this frame's send deadline. */
            auto runtime = (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::high_resolution_clock::now() - start
            ).count();

            if (runtime < current * period)
                std::this_thread::sleep_for(std::chrono::microseconds(current * period - runtime));

            current    += 1;
            offset     += chunk_size;
            bytes_sent += chunk_size;
        }
    }

    rtp_ctx.destroy_session(sess);

    auto end = std::chrono::high_resolution_clock::now();
    diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

    fprintf(stderr, "%lu bytes, %lu kB, %lu MB took %lu ms %lu s\n",
        bytes_sent, bytes_sent / 1000, bytes_sent / 1000 / 1000,
        diff, diff / 1000
    );

    /* (The original had an unused `end:` label before this line.) */
    nready++;
}
/* Entry point: map the test clip, spawn one sender thread per requested
 * stream, wait until all of them have signalled completion, then join. */
int main(int argc, char **argv)
{
    if (argc != 5) {
        /* BUG FIX: the usage line printed __FILE__ (the source path)
         * instead of the program name. */
        fprintf(stderr, "usage: %s <remote address> <number of threads> <fps> <mode>\n", argv[0]);
        return -1;
    }

    size_t len = 0;
    void *mem  = get_mem(0, NULL, len);
    if (!mem) {
        fprintf(stderr, "failed to map input file\n");
        return -1;
    }

    int nthreads = atoi(argv[2]);
    if (nthreads <= 0) {
        fprintf(stderr, "invalid number of threads: %s\n", argv[2]);
        return -1;
    }
    bool strict = !strcmp(argv[4], "strict");

    std::thread **threads = (std::thread **)malloc(sizeof(std::thread *) * nthreads);
    if (!threads) {
        fprintf(stderr, "malloc() failed\n");
        return -1;
    }

    for (int i = 0; i < nthreads; ++i)
        threads[i] = new std::thread(thread_func, mem, len, argv[1], i * 2, atof(argv[3]), strict);

    /* Wait until every thread has reported completion before joining so
     * the joins do not block on a stuck thread. */
    while (nready.load() != nthreads)
        std::this_thread::sleep_for(std::chrono::milliseconds(20));

    for (int i = 0; i < nthreads; ++i) {
        threads[i]->join();
        delete threads[i];
    }
    free(threads);
}