Implement RTCP packet handler

This commit is contained in:
Aaro Altonen 2020-08-10 09:18:53 +03:00
parent d32b2b82b9
commit c05eadbced
2 changed files with 57 additions and 61 deletions

View File

@ -131,6 +131,9 @@ namespace uvg_rtp {
* + clock rate for calculating the correct increment */
void set_sender_ts_info(uint64_t clock_start, uint32_t clock_rate, uint32_t rtp_ts_start);
/* Update various session statistics */
void update_session_statistics(uvg_rtp::frame::rtp_frame *frame);
/* Return SSRCs of all participants */
std::vector<uint32_t> get_participants();
@ -160,15 +163,15 @@ namespace uvg_rtp {
bool collision_detected(uint32_t ssrc, sockaddr_in& src_addr);
/* Move participant from initial_peers_ to participants_ */
void add_participant(uint32_t ssrc);
rtp_error_t add_participant(uint32_t ssrc);
/* We've got a message from new source (the SSRC of the frame is not known to us)
* Initialize statistics for the peer and move it to participants_ */
void init_new_participant(uvg_rtp::frame::rtp_frame *frame);
rtp_error_t init_new_participant(uvg_rtp::frame::rtp_frame *frame);
/* Initialize the RTP Sequence related stuff of peer
* This function assumes that the peer already exists in the participants_ map */
void init_participant_seq(uint32_t ssrc, uint16_t base_seq);
rtp_error_t init_participant_seq(uint32_t ssrc, uint16_t base_seq);
/* Update the SSRC's sequence related data in participants_ map
*

View File

@ -78,9 +78,9 @@ rtp_error_t uvg_rtp::rtcp::add_participant(std::string dst_addr, uint16_t dst_po
LOG_ERROR("Failed to make the socket non-blocking!");
#endif
/* Set read timeout (5s for now)
/* Set read timeout (5s for now)
*
* This means that the socket is listened for 5s at a time and after the timeout,
* This means that the socket is listened for 5s at a time and after the timeout,
* Send Report is sent to all participants */
struct timeval tv;
tv.tv_sec = 3;
@ -104,16 +104,26 @@ rtp_error_t uvg_rtp::rtcp::add_participant(std::string dst_addr, uint16_t dst_po
return RTP_OK;
}
void uvg_rtp::rtcp::add_participant(uint32_t ssrc)
rtp_error_t uvg_rtp::rtcp::add_participant(uint32_t ssrc)
{
participants_[ssrc] = initial_participants_.back();
initial_participants_.pop_back();
/* RTCP is not in use for this media stream,
* create a "fake" participant that is only used for storing statistics information */
if (initial_participants_.empty()) {
if (!(participants_[ssrc] = new struct participant))
return RTP_MEMORY_ERROR;
zero_stats(&participants_[ssrc]->stats);
} else {
participants_[ssrc] = initial_participants_.back();
initial_participants_.pop_back();
}
num_receivers_++;
participants_[ssrc]->r_frame = nullptr;
participants_[ssrc]->s_frame = nullptr;
participants_[ssrc]->sdes_frame = nullptr;
participants_[ssrc]->app_frame = nullptr;
return RTP_OK;
}
void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size)
@ -123,7 +133,6 @@ void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size)
avg_rtcp_pkt_pize_ = rtcp_byte_count_ / rtcp_pkt_count_;
}
void uvg_rtp::rtcp::zero_stats(uvg_rtp::rtcp::statistics *stats)
{
stats->received_pkts = 0;
@ -273,12 +282,15 @@ void uvg_rtp::rtcp::receiver_inc_sent_pkts(uint32_t sender_ssrc, size_t n)
LOG_WARN("Got RTP packet from unknown source: 0x%x", sender_ssrc);
}
void uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame)
rtp_error_t uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame)
{
assert(frame);
rtp_error_t ret;
uvg_rtp::rtcp::add_participant(frame->header.ssrc);
uvg_rtp::rtcp::init_participant_seq(frame->header.ssrc, frame->header.seq);
if ((ret = uvg_rtp::rtcp::add_participant(frame->header.ssrc)) != RTP_OK)
return ret;
if ((ret = uvg_rtp::rtcp::init_participant_seq(frame->header.ssrc, frame->header.seq)) != RTP_OK)
return ret;
/* Set the probation to MIN_SEQUENTIAL (2)
*
@ -292,16 +304,20 @@ void uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame)
participants_[frame->header.ssrc]->stats.initial_ntp = uvg_rtp::clock::ntp::now();
senders_++;
return ret;
}
void uvg_rtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq)
rtp_error_t uvg_rtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq)
{
if (participants_.find(ssrc) == participants_.end())
return;
return RTP_NOT_FOUND;
participants_[ssrc]->stats.base_seq = base_seq;
participants_[ssrc]->stats.max_seq = base_seq;
participants_[ssrc]->stats.bad_seq = (uint16_t)RTP_SEQ_MOD + 1;
return RTP_OK;
}
rtp_error_t uvg_rtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq)
@ -404,49 +420,8 @@ bool uvg_rtp::rtcp::collision_detected(uint32_t ssrc, sockaddr_in& src_addr)
return false;
}
rtp_error_t uvg_rtp::rtcp::receiver_update_stats(uvg_rtp::frame::rtp_frame *frame)
void uvg_rtp::rtcp::update_session_statistics(uvg_rtp::frame::rtp_frame *frame)
{
if (!frame)
return RTP_OK;
if (uvg_rtp::rtcp::collision_detected(frame->header.ssrc, frame->src_addr)) {
LOG_WARN("collision detected, packet must be dropped");
/* check if the SSRC of remote is ours, we need to send RTCP BYE
* and reinitialize ourselves */
if (frame->header.ssrc == ssrc_) {
terminate_self();
participants_.erase(ssrc_);
return RTP_SSRC_COLLISION;
}
return RTP_INVALID_VALUE;
}
/* RTCP runner is not running so we don't need to do any other checks,
* just create new participant so we can check for SSRC collisions */
if (!runner_) {
if (participants_.find(frame->header.ssrc) == participants_.end()) {
auto participant = new struct participant;
participant->address = frame->src_addr;
participant->socket = nullptr;
participants_[frame->header.ssrc] = participant;
}
return RTP_OK;
}
if (!uvg_rtp::rtcp::is_participant(frame->header.ssrc)) {
LOG_WARN("Got RTP packet from unknown source: 0x%x", frame->header.ssrc);
init_new_participant(frame);
}
if (uvg_rtp::rtcp::update_participant_seq(frame->header.ssrc, frame->header.seq) != RTP_OK) {
LOG_WARN("Invalid packet received from remote!");
return RTP_INVALID_VALUE;
}
auto p = participants_[frame->header.ssrc];
p->stats.received_pkts += 1;
@ -470,8 +445,6 @@ rtp_error_t uvg_rtp::rtcp::receiver_update_stats(uvg_rtp::frame::rtp_frame *fram
p->stats.transit = transit;
p->stats.jitter += (1.f / 16.f) * ((double)d - p->stats.jitter);
return RTP_OK;
}
rtp_error_t uvg_rtp::rtcp::generate_report()
@ -532,7 +505,7 @@ rtp_error_t uvg_rtp::rtcp::handle_incoming_packet(uint8_t *buffer, size_t size)
LOG_ERROR("Cannot handle padded packets!");
return RTP_INVALID_VALUE;
}
if (header->pkt_type > uvg_rtp::frame::RTCP_FT_BYE ||
header->pkt_type < uvg_rtp::frame::RTCP_FT_SR) {
LOG_ERROR("Invalid packet type (%u)!", header->pkt_type);
@ -615,7 +588,27 @@ void uvg_rtp::rtcp::rtcp_runner(uvg_rtp::rtcp *rtcp)
* is detected, the RTP context is updated */
rtp_error_t uvg_rtp::rtcp::packet_handler(void *arg, int flags, frame::rtp_frame **out)
{
(void)arg, (void)flags, (void)out;
rtp_error_t ret;
uvg_rtp::frame::rtp_frame *frame = *out;
uvg_rtp::rtcp *rtcp = (uvg_rtp::rtcp *)arg;
/* If this is the first packet from remote, move the participant from initial_participants_
* to participants_, initialize its state and put it on probation until enough valid
* packets from them have been received
*
* Otherwise update and monitor the received sequence numbers to determine whether something
* has gone awry with the sender's sequence number calculations/delivery of packets */
if (!rtcp->is_participant(frame->header.ssrc)) {
if ((ret = rtcp->init_new_participant(frame)) != RTP_OK)
return RTP_GENERIC_ERROR;
} else if (rtcp->update_participant_seq(frame->header.ssrc, frame->header.seq) != RTP_OK) {
return RTP_GENERIC_ERROR;
}
/* Finally update the jitter/transit/received/dropped bytes/pkts statistics */
rtcp->update_session_statistics(frame);
/* Even though RTCP collects information from the packet, this is not the packet's final destination.
* Thus return RTP_PKT_NOT_HANDLED to indicate that the packet should be passed on to other handlers */
return RTP_PKT_NOT_HANDLED;
}