Implement RTCP packet handler

This commit is contained in:
Aaro Altonen 2020-08-10 09:18:53 +03:00
parent 863d3a825e
commit 034315a013
2 changed files with 57 additions and 61 deletions

View File

@ -131,6 +131,9 @@ namespace uvg_rtp {
* + clock rate for calculating the correct increment */ * + clock rate for calculating the correct increment */
void set_sender_ts_info(uint64_t clock_start, uint32_t clock_rate, uint32_t rtp_ts_start); void set_sender_ts_info(uint64_t clock_start, uint32_t clock_rate, uint32_t rtp_ts_start);
/* Update various session statistics */
void update_session_statistics(uvg_rtp::frame::rtp_frame *frame);
/* Return SSRCs of all participants */ /* Return SSRCs of all participants */
std::vector<uint32_t> get_participants(); std::vector<uint32_t> get_participants();
@ -160,15 +163,15 @@ namespace uvg_rtp {
bool collision_detected(uint32_t ssrc, sockaddr_in& src_addr); bool collision_detected(uint32_t ssrc, sockaddr_in& src_addr);
/* Move participant from initial_peers_ to participants_ */ /* Move participant from initial_peers_ to participants_ */
void add_participant(uint32_t ssrc); rtp_error_t add_participant(uint32_t ssrc);
/* We've got a message from new source (the SSRC of the frame is not known to us) /* We've got a message from new source (the SSRC of the frame is not known to us)
* Initialize statistics for the peer and move it to participants_ */ * Initialize statistics for the peer and move it to participants_ */
void init_new_participant(uvg_rtp::frame::rtp_frame *frame); rtp_error_t init_new_participant(uvg_rtp::frame::rtp_frame *frame);
/* Initialize the RTP Sequence related stuff of peer /* Initialize the RTP Sequence related stuff of peer
* This function assumes that the peer already exists in the participants_ map */ * This function assumes that the peer already exists in the participants_ map */
void init_participant_seq(uint32_t ssrc, uint16_t base_seq); rtp_error_t init_participant_seq(uint32_t ssrc, uint16_t base_seq);
/* Update the SSRC's sequence related data in participants_ map /* Update the SSRC's sequence related data in participants_ map
* *

View File

@ -78,9 +78,9 @@ rtp_error_t uvg_rtp::rtcp::add_participant(std::string dst_addr, uint16_t dst_po
LOG_ERROR("Failed to make the socket non-blocking!"); LOG_ERROR("Failed to make the socket non-blocking!");
#endif #endif
/* Set read timeout (5s for now) /* Set read timeout (5s for now)
* *
* This means that the socket is listened for 5s at a time and after the timeout, * This means that the socket is listened for 5s at a time and after the timeout,
* Send Report is sent to all participants */ * Send Report is sent to all participants */
struct timeval tv; struct timeval tv;
tv.tv_sec = 3; tv.tv_sec = 3;
@ -104,16 +104,26 @@ rtp_error_t uvg_rtp::rtcp::add_participant(std::string dst_addr, uint16_t dst_po
return RTP_OK; return RTP_OK;
} }
void uvg_rtp::rtcp::add_participant(uint32_t ssrc) rtp_error_t uvg_rtp::rtcp::add_participant(uint32_t ssrc)
{ {
participants_[ssrc] = initial_participants_.back(); /* RTCP is not in use for this media stream,
initial_participants_.pop_back(); * create a "fake" participant that is only used for storing statistics information */
if (initial_participants_.empty()) {
if (!(participants_[ssrc] = new struct participant))
return RTP_MEMORY_ERROR;
zero_stats(&participants_[ssrc]->stats);
} else {
participants_[ssrc] = initial_participants_.back();
initial_participants_.pop_back();
}
num_receivers_++; num_receivers_++;
participants_[ssrc]->r_frame = nullptr; participants_[ssrc]->r_frame = nullptr;
participants_[ssrc]->s_frame = nullptr; participants_[ssrc]->s_frame = nullptr;
participants_[ssrc]->sdes_frame = nullptr; participants_[ssrc]->sdes_frame = nullptr;
participants_[ssrc]->app_frame = nullptr; participants_[ssrc]->app_frame = nullptr;
return RTP_OK;
} }
void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size) void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size)
@ -123,7 +133,6 @@ void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size)
avg_rtcp_pkt_pize_ = rtcp_byte_count_ / rtcp_pkt_count_; avg_rtcp_pkt_pize_ = rtcp_byte_count_ / rtcp_pkt_count_;
} }
void uvg_rtp::rtcp::zero_stats(uvg_rtp::rtcp::statistics *stats) void uvg_rtp::rtcp::zero_stats(uvg_rtp::rtcp::statistics *stats)
{ {
stats->received_pkts = 0; stats->received_pkts = 0;
@ -273,12 +282,15 @@ void uvg_rtp::rtcp::receiver_inc_sent_pkts(uint32_t sender_ssrc, size_t n)
LOG_WARN("Got RTP packet from unknown source: 0x%x", sender_ssrc); LOG_WARN("Got RTP packet from unknown source: 0x%x", sender_ssrc);
} }
void uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame) rtp_error_t uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame)
{ {
assert(frame); rtp_error_t ret;
uvg_rtp::rtcp::add_participant(frame->header.ssrc); if ((ret = uvg_rtp::rtcp::add_participant(frame->header.ssrc)) != RTP_OK)
uvg_rtp::rtcp::init_participant_seq(frame->header.ssrc, frame->header.seq); return ret;
if ((ret = uvg_rtp::rtcp::init_participant_seq(frame->header.ssrc, frame->header.seq)) != RTP_OK)
return ret;
/* Set the probation to MIN_SEQUENTIAL (2) /* Set the probation to MIN_SEQUENTIAL (2)
* *
@ -292,16 +304,20 @@ void uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame)
participants_[frame->header.ssrc]->stats.initial_ntp = uvg_rtp::clock::ntp::now(); participants_[frame->header.ssrc]->stats.initial_ntp = uvg_rtp::clock::ntp::now();
senders_++; senders_++;
return ret;
} }
void uvg_rtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq) rtp_error_t uvg_rtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq)
{ {
if (participants_.find(ssrc) == participants_.end()) if (participants_.find(ssrc) == participants_.end())
return; return RTP_NOT_FOUND;
participants_[ssrc]->stats.base_seq = base_seq; participants_[ssrc]->stats.base_seq = base_seq;
participants_[ssrc]->stats.max_seq = base_seq; participants_[ssrc]->stats.max_seq = base_seq;
participants_[ssrc]->stats.bad_seq = (uint16_t)RTP_SEQ_MOD + 1; participants_[ssrc]->stats.bad_seq = (uint16_t)RTP_SEQ_MOD + 1;
return RTP_OK;
} }
rtp_error_t uvg_rtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq) rtp_error_t uvg_rtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq)
@ -404,49 +420,8 @@ bool uvg_rtp::rtcp::collision_detected(uint32_t ssrc, sockaddr_in& src_addr)
return false; return false;
} }
rtp_error_t uvg_rtp::rtcp::receiver_update_stats(uvg_rtp::frame::rtp_frame *frame) void uvg_rtp::rtcp::update_session_statistics(uvg_rtp::frame::rtp_frame *frame)
{ {
if (!frame)
return RTP_OK;
if (uvg_rtp::rtcp::collision_detected(frame->header.ssrc, frame->src_addr)) {
LOG_WARN("collision detected, packet must be dropped");
/* check if the SSRC of remote is ours, we need to send RTCP BYE
* and reinitialize ourselves */
if (frame->header.ssrc == ssrc_) {
terminate_self();
participants_.erase(ssrc_);
return RTP_SSRC_COLLISION;
}
return RTP_INVALID_VALUE;
}
/* RTCP runner is not running so we don't need to do any other checks,
* just create new participant so we can check for SSRC collisions */
if (!runner_) {
if (participants_.find(frame->header.ssrc) == participants_.end()) {
auto participant = new struct participant;
participant->address = frame->src_addr;
participant->socket = nullptr;
participants_[frame->header.ssrc] = participant;
}
return RTP_OK;
}
if (!uvg_rtp::rtcp::is_participant(frame->header.ssrc)) {
LOG_WARN("Got RTP packet from unknown source: 0x%x", frame->header.ssrc);
init_new_participant(frame);
}
if (uvg_rtp::rtcp::update_participant_seq(frame->header.ssrc, frame->header.seq) != RTP_OK) {
LOG_WARN("Invalid packet received from remote!");
return RTP_INVALID_VALUE;
}
auto p = participants_[frame->header.ssrc]; auto p = participants_[frame->header.ssrc];
p->stats.received_pkts += 1; p->stats.received_pkts += 1;
@ -470,8 +445,6 @@ rtp_error_t uvg_rtp::rtcp::receiver_update_stats(uvg_rtp::frame::rtp_frame *fram
p->stats.transit = transit; p->stats.transit = transit;
p->stats.jitter += (1.f / 16.f) * ((double)d - p->stats.jitter); p->stats.jitter += (1.f / 16.f) * ((double)d - p->stats.jitter);
return RTP_OK;
} }
rtp_error_t uvg_rtp::rtcp::generate_report() rtp_error_t uvg_rtp::rtcp::generate_report()
@ -532,7 +505,7 @@ rtp_error_t uvg_rtp::rtcp::handle_incoming_packet(uint8_t *buffer, size_t size)
LOG_ERROR("Cannot handle padded packets!"); LOG_ERROR("Cannot handle padded packets!");
return RTP_INVALID_VALUE; return RTP_INVALID_VALUE;
} }
if (header->pkt_type > uvg_rtp::frame::RTCP_FT_BYE || if (header->pkt_type > uvg_rtp::frame::RTCP_FT_BYE ||
header->pkt_type < uvg_rtp::frame::RTCP_FT_SR) { header->pkt_type < uvg_rtp::frame::RTCP_FT_SR) {
LOG_ERROR("Invalid packet type (%u)!", header->pkt_type); LOG_ERROR("Invalid packet type (%u)!", header->pkt_type);
@ -615,7 +588,27 @@ void uvg_rtp::rtcp::rtcp_runner(uvg_rtp::rtcp *rtcp)
* is detected, the RTP context is updated */ * is detected, the RTP context is updated */
rtp_error_t uvg_rtp::rtcp::packet_handler(void *arg, int flags, frame::rtp_frame **out) rtp_error_t uvg_rtp::rtcp::packet_handler(void *arg, int flags, frame::rtp_frame **out)
{ {
(void)arg, (void)flags, (void)out; rtp_error_t ret;
uvg_rtp::frame::rtp_frame *frame = *out;
uvg_rtp::rtcp *rtcp = (uvg_rtp::rtcp *)arg;
/* If this is the first packet from remote, move the participant from initial_participants_
* to participants_, initialize its state and put it on probation until enough valid
* packets from them have been received
*
* Otherwise update and monitor the received sequence numbers to determine whether something
* has gone awry with the sender's sequence number calculations/delivery of packets */
if (!rtcp->is_participant(frame->header.ssrc)) {
if ((ret = rtcp->init_new_participant(frame)) != RTP_OK)
return RTP_GENERIC_ERROR;
} else if (rtcp->update_participant_seq(frame->header.ssrc, frame->header.seq) != RTP_OK) {
return RTP_GENERIC_ERROR;
}
/* Finally update the jitter/transit/received/dropped bytes/pkts statistics */
rtcp->update_session_statistics(frame);
/* Even though RTCP collects information from the packet, this is not the packet's final destination.
* Thus return RTP_PKT_NOT_HANDLED to indicate that the packet should be passed on to other handlers */
return RTP_PKT_NOT_HANDLED; return RTP_PKT_NOT_HANDLED;
} }