From ad4edc9747d13a5e93d21ff41abe49bdac9935ee Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Tue, 11 Aug 2020 05:16:19 +0300 Subject: [PATCH] Refactor RTCP code Move RTCP runner related code to a separate file for clarity and reorder code in src/rtcp.cc to move related code close to each other --- include/rtcp.hh | 123 ++++++------ src/rtcp.cc | 445 +++++++++++++++---------------------------- src/rtcp/app.cc | 9 + src/rtcp/module.mk | 3 +- src/rtcp/receiver.cc | 9 + src/rtcp/runner.cc | 62 ++++++ src/rtcp/sdes.cc | 9 + src/rtcp/sender.cc | 15 +- 8 files changed, 328 insertions(+), 347 deletions(-) create mode 100644 src/rtcp/runner.cc diff --git a/include/rtcp.hh b/include/rtcp.hh index 0fdafcd..87a296b 100644 --- a/include/rtcp.hh +++ b/include/rtcp.hh @@ -15,6 +15,11 @@ namespace uvg_rtp { class connection; + enum ROLE { + RECEIVER, + SENDER + }; + /* TODO: explain these constants */ const int RTP_SEQ_MOD = 1 << 16; const int MIN_SEQUENTIAL = 2; @@ -22,6 +27,49 @@ namespace uvg_rtp { const int MAX_MISORDER = 100; const int MIN_TIMEOUT = 5000; + struct rtcp_statistics { + /* receiver stats */ + uint32_t received_pkts; /* Number of packets received */ + uint32_t dropped_pkts; /* Number of dropped RTP packets */ + uint32_t received_bytes; /* Number of bytes received excluding RTP Header */ + + /* sender stats */ + uint32_t sent_pkts; /* Number of sent RTP packets */ + uint32_t sent_bytes; /* Number of sent bytes excluding RTP Header */ + + uint32_t jitter; /* TODO: */ + uint32_t transit; /* TODO: */ + + /* Receiver clock related stuff */ + uint64_t initial_ntp; /* Wallclock reading when the first RTP packet was received */ + uint32_t initial_rtp; /* RTP timestamp of the first RTP packet received */ + uint32_t clock_rate; /* Rate of the clock (used for jitter calculations) */ + + uint32_t lsr; /* Middle 32 bits of the 64-bit NTP timestamp of previous SR */ + uvg_rtp::clock::hrc::hrc_t sr_ts; /* When the last SR was received (used to calculate delay) */ + + uint16_t max_seq; /* Highest sequence number received */ + uint16_t base_seq; /* First sequence number received */ + uint16_t bad_seq; /* TODO: */ + uint16_t cycles; /* Number of sequence cycles */ + }; + + struct rtcp_participant { + uvg_rtp::socket *socket; /* socket associated with this participant */ + sockaddr_in address; /* address of the participant */ + struct rtcp_statistics stats; /* RTCP session statistics of the participant */ + + int probation; /* has the participant been fully accepted to the session */ + int role; /* is the participant a sender or a receiver */ + + /* Save the latest RTCP packets received from this participant + * Users can query these packets using the SSRC of participant */ + uvg_rtp::frame::rtcp_sender_frame *s_frame; + uvg_rtp::frame::rtcp_receiver_frame *r_frame; + uvg_rtp::frame::rtcp_sdes_frame *sdes_frame; + uvg_rtp::frame::rtcp_app_frame *app_frame; + }; + class rtcp : public runner { public: rtcp(uint32_t ssrc, bool receiver); @@ -87,7 +135,7 @@ namespace uvg_rtp { /* create RTCP BYE packet using our own SSRC and send it to all participants */ rtp_error_t terminate_self(); - /* TODO: */ + /* Return a reference to vector that contains the sockets of all participants */ std::vector& get_sockets(); /* Somebody joined the multicast group the owner of this RTCP instance is part of @@ -137,7 +185,9 @@ namespace uvg_rtp { /* Return SSRCs of all participants */ std::vector get_participants(); - /* TODO: */ + /* Alternate way to get RTCP packets is to install a hook for them. So instead of + * polling an RTCP packet, user can install a function that is called when + * a specific RTCP packet is received. */ rtp_error_t install_sender_hook(void (*hook)(uvg_rtp::frame::rtcp_sender_frame *)); rtp_error_t install_receiver_hook(void (*hook)(uvg_rtp::frame::rtcp_receiver_frame *)); rtp_error_t install_sdes_hook(void (*hook)(uvg_rtp::frame::rtcp_sdes_frame *)); @@ -193,7 +243,17 @@ namespace uvg_rtp { rtp_error_t generate_sender_report(); rtp_error_t generate_receiver_report(); - bool receiver_; + /* Because struct statistics contains uvgRTP clock object we cannot + * zero it out without compiler complaining about it so all the fields + * must be set to zero manually */ + void zero_stats(uvg_rtp::rtcp_statistics *stats); + + /* Pointer to RTP context from which clock rate etc. info is collected and which is + * used to change SSRC if a collision is detected */ + uvg_rtp::rtp *rtp_; + + /* are we a sender or a receiver */ + int our_role_; /* TODO: time_t?? */ size_t tp_; /* the last time an RTCP packet was transmitted */ @@ -238,57 +298,15 @@ namespace uvg_rtp { /* The first value of RTP timestamp (aka t = 0) */ uint32_t rtp_ts_start_; - struct statistics { - /* receiver stats */ - uint32_t received_pkts; /* Number of packets received */ - uint32_t dropped_pkts; /* Number of dropped RTP packets */ - uint32_t received_bytes; /* Number of bytes received excluding RTP Header */ - - /* sender stats */ - uint32_t sent_pkts; /* Number of sent RTP packets */ - uint32_t sent_bytes; /* Number of sent bytes excluding RTP Header */ - - uint32_t jitter; /* TODO: */ - uint32_t transit; /* TODO: */ - - /* Receiver clock related stuff */ - uint64_t initial_ntp; /* Wallclock reading when the first RTP packet was received */ - uint32_t initial_rtp; /* RTP timestamp of the first RTP packet received */ - uint32_t clock_rate; /* Rate of the clock (used for jitter calculations) */ - - uint32_t lsr; /* Middle 32 bits of the 64-bit NTP timestamp of previous SR */ - uvg_rtp::clock::hrc::hrc_t sr_ts; /* When the last SR was received (used to calculate delay) */ - - uint16_t max_seq; /* Highest sequence number received */ - uint16_t base_seq; /* First sequence number received */ - uint16_t bad_seq; /* TODO: */ - uint16_t cycles; /* Number of sequence cycles */ - }; - - struct participant { - uvg_rtp::socket *socket; /* socket associated with this participant */ - sockaddr_in address; /* address of the participant */ - struct statistics stats; /* RTCP session statistics of the participant */ - - int probation; /* TODO: */ - bool sender; /* Sender will create report block for other sender only */ - - /* Save the latest RTCP packets received from this participant - * Users can query these packets using the SSRC of participant */ - uvg_rtp::frame::rtcp_sender_frame *s_frame; - uvg_rtp::frame::rtcp_receiver_frame *r_frame; - uvg_rtp::frame::rtcp_sdes_frame *sdes_frame; - uvg_rtp::frame::rtcp_app_frame *app_frame; - }; - - std::map participants_; + std::map participants_; size_t num_receivers_; /* statistics for RTCP Sender and Receiver Reports */ - struct statistics sender_stats; + struct rtcp_statistics our_stats; - /* TODO: */ - std::vector initial_participants_; + /* If we expect frames from remote but haven't received anything from remote yet, + * the participant resides in this vector until he's moved to participants_ */ + std::vector initial_participants_; /* Vector of sockets the RTCP runner is listening to * @@ -300,10 +318,5 @@ namespace uvg_rtp { void (*receiver_hook_)(uvg_rtp::frame::rtcp_receiver_frame *); void (*sdes_hook_)(uvg_rtp::frame::rtcp_sdes_frame *); void (*app_hook_)(uvg_rtp::frame::rtcp_app_frame *); - - /* Because struct statistics contains uvgRTP clock object we cannot - * zero it out without compiler complaining about it so all the fields - * must be set to zero manually */ - void zero_stats(uvg_rtp::rtcp::statistics *stats); }; }; diff --git a/src/rtcp.cc b/src/rtcp.cc index c8921ea..3dce610 100644 --- a/src/rtcp.cc +++ b/src/rtcp.cc @@ -14,155 +14,36 @@ #include "rtcp.hh" #include "util.hh" -/* TODO: Find the actual used sizes somehow? */ #define UDP_HEADER_SIZE 8 #define IP_HEADER_SIZE 20 -uvg_rtp::rtcp::rtcp(uint32_t ssrc, bool receiver): - receiver_(receiver), +uvg_rtp::rtcp::rtcp(uvg_rtp::rtp *rtp): + rtp_(rtp), our_role_(RECEIVER), tp_(0), tc_(0), tn_(0), pmembers_(0), members_(0), senders_(0), rtcp_bandwidth_(0), we_sent_(0), avg_rtcp_pkt_pize_(0), rtcp_pkt_count_(0), initial_(true), num_receivers_(0) { - ssrc_ = ssrc; + ssrc_ = rtp->get_ssrc(); + clock_rate_ = rtp->get_clock_rate(); clock_start_ = 0; - clock_rate_ = 0; rtp_ts_start_ = 0; runner_ = nullptr; - zero_stats(&sender_stats); -} - -uvg_rtp::rtcp::rtcp(uvg_rtp::rtp *rtp) -{ + zero_stats(&our_stats); } uvg_rtp::rtcp::~rtcp() { } -rtp_error_t uvg_rtp::rtcp::add_participant(std::string dst_addr, uint16_t dst_port, uint16_t src_port, uint32_t clock_rate) -{ - if (dst_addr == "" || !dst_port || !src_port) { - LOG_ERROR("Invalid values given (%s, %d, %d), cannot create RTCP instance", - dst_addr.c_str(), dst_port, src_port); - return RTP_INVALID_VALUE; - } - - rtp_error_t ret; - struct participant *p; - - if (!(p = new struct participant)) - return RTP_MEMORY_ERROR; - - zero_stats(&p->stats); - - if (!(p->socket = new uvg_rtp::socket(RTP_CTX_NO_FLAGS))) - return RTP_MEMORY_ERROR; - - if ((ret = p->socket->init(AF_INET, SOCK_DGRAM, 0)) != RTP_OK) - return ret; - - int enable = 1; - - if ((ret = p->socket->setsockopt(SOL_SOCKET, SO_REUSEADDR, (const char *)&enable, sizeof(int))) != RTP_OK) - return ret; - -#ifdef _WIN32 - /* Make the socket non-blocking */ - int enabled = 1; - - if (::ioctlsocket(p->socket->get_raw_socket(), FIONBIO, (u_long *)&enabled) < 0) - LOG_ERROR("Failed to make the socket non-blocking!"); -#endif - - /* Set read timeout (5s for now) - * - * This means that the socket is listened for 5s at a time and after the timeout, - * Send Report is sent to all participants */ - struct timeval tv; - tv.tv_sec = 3; - tv.tv_usec = 0; - - if ((ret = p->socket->setsockopt(SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) != RTP_OK) - return ret; - - LOG_WARN("Binding to port %d (source port)", src_port); - - if ((ret = p->socket->bind(AF_INET, INADDR_ANY, src_port)) != RTP_OK) - return ret; - - p->sender = false; - p->address = p->socket->create_sockaddr(AF_INET, dst_addr, dst_port); - p->stats.clock_rate = clock_rate; - - initial_participants_.push_back(p); - sockets_.push_back(*p->socket); - - return RTP_OK; -} - -rtp_error_t uvg_rtp::rtcp::add_participant(uint32_t ssrc) -{ - /* RTCP is not in use for this media stream, - * create a "fake" participant that is only used for storing statistics information */ - if (initial_participants_.empty()) { - if (!(participants_[ssrc] = new struct participant)) - return RTP_MEMORY_ERROR; - zero_stats(&participants_[ssrc]->stats); - } else { - participants_[ssrc] = initial_participants_.back(); - initial_participants_.pop_back(); - } - num_receivers_++; - - participants_[ssrc]->r_frame = nullptr; - participants_[ssrc]->s_frame = nullptr; - participants_[ssrc]->sdes_frame = nullptr; - participants_[ssrc]->app_frame = nullptr; - - return RTP_OK; -} - -void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size) -{ - rtcp_pkt_count_++; - rtcp_byte_count_ += pkt_size + UDP_HEADER_SIZE + IP_HEADER_SIZE; - avg_rtcp_pkt_pize_ = rtcp_byte_count_ / rtcp_pkt_count_; -} - -void uvg_rtp::rtcp::zero_stats(uvg_rtp::rtcp::statistics *stats) -{ - stats->received_pkts = 0; - stats->dropped_pkts = 0; - stats->received_bytes = 0; - - stats->sent_pkts = 0; - stats->sent_bytes = 0; - - stats->jitter = 0; - stats->transit = 0; - - stats->initial_ntp = 0; - stats->initial_rtp = 0; - stats->clock_rate = 0; - stats->lsr = 0; - - stats->max_seq = 0; - stats->base_seq = 0; - stats->bad_seq = 0; - stats->cycles = 0; -} - rtp_error_t uvg_rtp::rtcp::start() { if (sockets_.empty()) { LOG_ERROR("Cannot start RTCP Runner because no connections have been initialized"); return RTP_INVALID_VALUE; } - active_ = true; if (!(runner_ = new std::thread(rtcp_runner, this))) { @@ -209,20 +90,117 @@ free_mem: return RTP_OK; } -std::vector& uvg_rtp::rtcp::get_sockets() +rtp_error_t uvg_rtp::rtcp::add_participant(std::string dst_addr, uint16_t dst_port, uint16_t src_port, uint32_t clock_rate) { - return sockets_; -} - -std::vector uvg_rtp::rtcp::get_participants() -{ - std::vector ssrcs; - - for (auto& i : participants_) { - ssrcs.push_back(i.first); + if (dst_addr == "" || !dst_port || !src_port) { + LOG_ERROR("Invalid values given (%s, %d, %d), cannot create RTCP instance", + dst_addr.c_str(), dst_port, src_port); + return RTP_INVALID_VALUE; } - return ssrcs; + rtp_error_t ret; + rtcp_participant *p; + + if (!(p = new rtcp_participant)) + return RTP_MEMORY_ERROR; + + zero_stats(&p->stats); + + if (!(p->socket = new uvg_rtp::socket(RTP_CTX_NO_FLAGS))) + return RTP_MEMORY_ERROR; + + if ((ret = p->socket->init(AF_INET, SOCK_DGRAM, 0)) != RTP_OK) + return ret; + + int enable = 1; + + if ((ret = p->socket->setsockopt(SOL_SOCKET, SO_REUSEADDR, (const char *)&enable, sizeof(int))) != RTP_OK) + return ret; + +#ifdef _WIN32 + /* Make the socket non-blocking */ + int enabled = 1; + + if (::ioctlsocket(p->socket->get_raw_socket(), FIONBIO, (u_long *)&enabled) < 0) + LOG_ERROR("Failed to make the socket non-blocking!"); +#endif + + /* Set read timeout (5s for now) + * + * This means that the socket is listened for 5s at a time and after the timeout, + * Send Report is sent to all participants */ + struct timeval tv; + tv.tv_sec = 3; + tv.tv_usec = 0; + + if ((ret = p->socket->setsockopt(SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) != RTP_OK) + return ret; + + LOG_WARN("Binding to port %d (source port)", src_port); + + if ((ret = p->socket->bind(AF_INET, INADDR_ANY, src_port)) != RTP_OK) + return ret; + + p->role = RECEIVER; + p->address = p->socket->create_sockaddr(AF_INET, dst_addr, dst_port); + p->stats.clock_rate = clock_rate; + + initial_participants_.push_back(p); + sockets_.push_back(*p->socket); + + return RTP_OK; +} + +rtp_error_t uvg_rtp::rtcp::add_participant(uint32_t ssrc) +{ + /* RTCP is not in use for this media stream, + * create a "fake" participant that is only used for storing statistics information */ + if (initial_participants_.empty()) { + if (!(participants_[ssrc] = new rtcp_participant)) + return RTP_MEMORY_ERROR; + zero_stats(&participants_[ssrc]->stats); + } else { + participants_[ssrc] = initial_participants_.back(); + initial_participants_.pop_back(); + } + num_receivers_++; + + participants_[ssrc]->r_frame = nullptr; + participants_[ssrc]->s_frame = nullptr; + participants_[ssrc]->sdes_frame = nullptr; + participants_[ssrc]->app_frame = nullptr; + + return RTP_OK; +} + +void uvg_rtp::rtcp::update_rtcp_bandwidth(size_t pkt_size) +{ + rtcp_pkt_count_ += 1; + rtcp_byte_count_ += pkt_size + UDP_HEADER_SIZE + IP_HEADER_SIZE; + avg_rtcp_pkt_pize_ = rtcp_byte_count_ / rtcp_pkt_count_; +} + +void uvg_rtp::rtcp::zero_stats(uvg_rtp::rtcp_statistics *stats) +{ + stats->received_pkts = 0; + stats->dropped_pkts = 0; + stats->received_bytes = 0; + + stats->sent_pkts = 0; + stats->sent_bytes = 0; + + stats->jitter = 0; + stats->transit = 0; + + stats->initial_ntp = 0; + stats->initial_rtp = 0; + stats->clock_rate = 0; + stats->lsr = 0; + + stats->max_seq = 0; + stats->base_seq = 0; + stats->bad_seq = 0; + stats->cycles = 0; } bool uvg_rtp::rtcp::is_participant(uint32_t ssrc) @@ -230,21 +208,6 @@ bool uvg_rtp::rtcp::is_participant(uint32_t ssrc) return participants_.find(ssrc) != participants_.end(); } -void uvg_rtp::rtcp::sender_inc_sent_bytes(size_t n) -{ - sender_stats.sent_bytes += n; -} - -void uvg_rtp::rtcp::sender_inc_sent_pkts(size_t n) -{ - sender_stats.sent_pkts += n; -} - -void uvg_rtp::rtcp::sender_inc_seq_cycle_count() -{ - sender_stats.cycles++; -} - void uvg_rtp::rtcp::set_sender_ts_info(uint64_t clock_start, uint32_t clock_rate, uint32_t rtp_ts_start) { clock_start_ = clock_start; @@ -257,29 +220,9 @@ void uvg_rtp::rtcp::sender_update_stats(uvg_rtp::frame::rtp_frame *frame) if (!frame) return; - sender_stats.sent_pkts += 1; - sender_stats.sent_bytes += frame->payload_len; - sender_stats.max_seq = frame->header.seq; -} - -void uvg_rtp::rtcp::receiver_inc_sent_bytes(uint32_t sender_ssrc, size_t n) -{ - if (is_participant(sender_ssrc)) { - participants_[sender_ssrc]->stats.sent_bytes += n; - return; - } - - LOG_WARN("Got RTP packet from unknown source: 0x%x", sender_ssrc); -} - -void uvg_rtp::rtcp::receiver_inc_sent_pkts(uint32_t sender_ssrc, size_t n) -{ - if (is_participant(sender_ssrc)) { - participants_[sender_ssrc]->stats.sent_pkts += n; - return; - } - - LOG_WARN("Got RTP packet from unknown source: 0x%x", sender_ssrc); + our_stats.sent_pkts += 1; + our_stats.sent_bytes += frame->payload_len; + our_stats.max_seq = frame->header.seq; } rtp_error_t uvg_rtp::rtcp::init_new_participant(uvg_rtp::frame::rtp_frame *frame) @@ -391,17 +334,17 @@ rtp_error_t uvg_rtp::rtcp::reset_rtcp_state(uint32_t ssrc) if (participants_.find(ssrc) != participants_.end()) return RTP_SSRC_COLLISION; - sender_stats.received_pkts = 0; - sender_stats.dropped_pkts = 0; - sender_stats.received_bytes = 0; - sender_stats.sent_pkts = 0; - sender_stats.sent_bytes = 0; - sender_stats.jitter = 0; - sender_stats.transit = 0; - sender_stats.max_seq = 0; - sender_stats.base_seq = 0; - sender_stats.bad_seq = 0; - sender_stats.cycles = 0; + our_stats.received_pkts = 0; + our_stats.dropped_pkts = 0; + our_stats.received_bytes = 0; + our_stats.sent_pkts = 0; + our_stats.sent_bytes = 0; + our_stats.jitter = 0; + our_stats.transit = 0; + our_stats.max_seq = 0; + our_stats.base_seq = 0; + our_stats.bad_seq = 0; + our_stats.cycles = 0; return RTP_OK; } @@ -447,47 +390,41 @@ void uvg_rtp::rtcp::update_session_statistics(uvg_rtp::frame::rtp_frame *frame) p->stats.jitter += (1.f / 16.f) * ((double)d - p->stats.jitter); } -rtp_error_t uvg_rtp::rtcp::generate_report() +/* RTCP packet handler is responsible for doing two things: + * + * - it checks whether the packet is coming from an existing user and if so, + * updates that user's session statistics. If the packet is coming from a user, + * the user is put on probation where they will stay until enough valid packets + * have been received. + * - it keeps track of participants' SSRCs and if a collision + * is detected, the RTP context is updated */ +rtp_error_t uvg_rtp::rtcp::packet_handler(void *arg, int flags, frame::rtp_frame **out) { - if (receiver_) - return generate_receiver_report(); - return generate_sender_report(); -} + (void)flags; -rtp_error_t uvg_rtp::rtcp::install_sender_hook(void (*hook)(uvg_rtp::frame::rtcp_sender_frame *)) -{ - if (!hook) - return RTP_INVALID_VALUE; + rtp_error_t ret; + uvg_rtp::frame::rtp_frame *frame = *out; + uvg_rtp::rtcp *rtcp = (uvg_rtp::rtcp *)arg; - sender_hook_ = hook; - return RTP_OK; -} + /* If this is the first packet from remote, move the participant from initial_participants_ + * to participants_, initialize its state and put it on probation until enough valid + * packets from them have been received + * + * Otherwise update and monitor the received sequence numbers to determine whether something + * has gone awry with the sender's sequence number calculations/delivery of packets */ + if (!rtcp->is_participant(frame->header.ssrc)) { + if ((ret = rtcp->init_new_participant(frame)) != RTP_OK) + return RTP_GENERIC_ERROR; + } else if (rtcp->update_participant_seq(frame->header.ssrc, frame->header.seq) != RTP_OK) { + return RTP_GENERIC_ERROR; + } -rtp_error_t uvg_rtp::rtcp::install_receiver_hook(void (*hook)(uvg_rtp::frame::rtcp_receiver_frame *)) -{ - if (!hook) - return RTP_INVALID_VALUE; + /* Finally update the jitter/transit/received/dropped bytes/pkts statistics */ + rtcp->update_session_statistics(frame); - receiver_hook_ = hook; - return RTP_OK; -} - -rtp_error_t uvg_rtp::rtcp::install_sdes_hook(void (*hook)(uvg_rtp::frame::rtcp_sdes_frame *)) -{ - if (!hook) - return RTP_INVALID_VALUE; - - sdes_hook_ = hook; - return RTP_OK; -} - -rtp_error_t uvg_rtp::rtcp::install_app_hook(void (*hook)(uvg_rtp::frame::rtcp_app_frame *)) -{ - if (!hook) - return RTP_INVALID_VALUE; - - app_hook_ = hook; - return RTP_OK; + /* Even though RTCP collects information from the packet, this is not the packet's final destination. + * Thus return RTP_PKT_NOT_HANDLED to indicate that the packet should be passed on to other handlers */ + return RTP_PKT_NOT_HANDLED; } rtp_error_t uvg_rtp::rtcp::handle_incoming_packet(uint8_t *buffer, size_t size) @@ -544,71 +481,3 @@ rtp_error_t uvg_rtp::rtcp::handle_incoming_packet(uint8_t *buffer, size_t size) return ret; } - -void uvg_rtp::rtcp::rtcp_runner(uvg_rtp::rtcp *rtcp) -{ - LOG_INFO("RTCP instance created!"); - - uvg_rtp::clock::hrc::hrc_t start, end; - int nread, diff, timeout = MIN_TIMEOUT; - uint8_t buffer[MAX_PACKET]; - rtp_error_t ret; - - while (rtcp->active()) { - start = uvg_rtp::clock::hrc::now(); - ret = uvg_rtp::poll::poll(rtcp->get_sockets(), buffer, MAX_PACKET, timeout, &nread); - - if (ret == RTP_OK && nread > 0) { - (void)rtcp->handle_incoming_packet(buffer, (size_t)nread); - } else if (ret == RTP_INTERRUPTED) { - /* do nothing */ - } else { - LOG_ERROR("recvfrom failed, %d", ret); - } - - diff = uvg_rtp::clock::hrc::diff_now(start); - - if (diff >= MIN_TIMEOUT) { - if ((ret = rtcp->generate_report()) != RTP_OK && ret != RTP_NOT_READY) { - LOG_ERROR("Failed to send RTCP status report!"); - } - - timeout = MIN_TIMEOUT; - } - } -} - -/* RTCP packet handler is responsible for doing two things: - * - * - it checks whether the packet is coming from an existing user and if so, - * updates that user's session statistics. If the packet is coming from a user, - * the user is put on probation where they will stay until enough valid packets - * have been received. - * - it keeps track of participants' SSRCs and if a collision - * is detected, the RTP context is updated */ -rtp_error_t uvg_rtp::rtcp::packet_handler(void *arg, int flags, frame::rtp_frame **out) -{ - rtp_error_t ret; - uvg_rtp::frame::rtp_frame *frame = *out; - uvg_rtp::rtcp *rtcp = (uvg_rtp::rtcp *)arg; - - /* If this is the first packet from remote, move the participant from initial_participants_ - * to participants_, initialize its state and put it on probation until enough valid - * packets from them have been received - * - * Otherwise update and monitor the received sequence numbers to determine whether something - * has gone awry with the sender's sequence number calculations/delivery of packets */ - if (!rtcp->is_participant(frame->header.ssrc)) { - if ((ret = rtcp->init_new_participant(frame)) != RTP_OK) - return RTP_GENERIC_ERROR; - } else if (rtcp->update_participant_seq(frame->header.ssrc, frame->header.seq) != RTP_OK) { - return RTP_GENERIC_ERROR; - } - - /* Finally update the jitter/transit/received/dropped bytes/pkts statistics */ - rtcp->update_session_statistics(frame); - - /* Even though RTCP collects information from the packet, this is not the packet's final destination. - * Thus return RTP_PKT_NOT_HANDLED to indicate that the packet should be passed on to other handlers */ - return RTP_PKT_NOT_HANDLED; -} diff --git a/src/rtcp/app.cc b/src/rtcp/app.cc index d44f253..bc8b0b7 100644 --- a/src/rtcp/app.cc +++ b/src/rtcp/app.cc @@ -15,6 +15,15 @@ uvg_rtp::frame::rtcp_app_frame *uvg_rtp::rtcp::get_app_packet(uint32_t ssrc) return frame; } +rtp_error_t uvg_rtp::rtcp::install_app_hook(void (*hook)(uvg_rtp::frame::rtcp_app_frame *)) +{ + if (!hook) + return RTP_INVALID_VALUE; + + app_hook_ = hook; + return RTP_OK; +} + rtp_error_t uvg_rtp::rtcp::handle_app_packet(uvg_rtp::frame::rtcp_app_frame *frame, size_t size) { if (!frame) diff --git a/src/rtcp/module.mk b/src/rtcp/module.mk index 0930b2c..5b332bb 100644 --- a/src/rtcp/module.mk +++ b/src/rtcp/module.mk @@ -3,4 +3,5 @@ SOURCES += \ src/rtcp/sdes.cc \ src/rtcp/bye.cc \ src/rtcp/receiver.cc \ - src/rtcp/sender.cc + src/rtcp/sender.cc \ + src/rtcp/runner.cc diff --git a/src/rtcp/receiver.cc b/src/rtcp/receiver.cc index 57d340c..9b1a298 100644 --- a/src/rtcp/receiver.cc +++ b/src/rtcp/receiver.cc @@ -15,6 +15,15 @@ uvg_rtp::frame::rtcp_receiver_frame *uvg_rtp::rtcp::get_receiver_packet(uint32_t return frame; } +rtp_error_t uvg_rtp::rtcp::install_receiver_hook(void (*hook)(uvg_rtp::frame::rtcp_receiver_frame *)) +{ + if (!hook) + return RTP_INVALID_VALUE; + + receiver_hook_ = hook; + return RTP_OK; +} + rtp_error_t uvg_rtp::rtcp::handle_receiver_report_packet(uvg_rtp::frame::rtcp_receiver_frame *frame, size_t size) { (void)size; diff --git a/src/rtcp/runner.cc b/src/rtcp/runner.cc new file mode 100644 index 0000000..43f2dd1 --- /dev/null +++ b/src/rtcp/runner.cc @@ -0,0 +1,62 @@ +#ifdef _WIN32 +#else +#endif + +#include "rtcp.hh" +#include "poll.hh" + +std::vector& uvg_rtp::rtcp::get_sockets() +{ + return sockets_; +} + +std::vector uvg_rtp::rtcp::get_participants() +{ + std::vector ssrcs; + + for (auto& i : participants_) { + ssrcs.push_back(i.first); + } + + return ssrcs; +} + +rtp_error_t uvg_rtp::rtcp::generate_report() +{ + if (our_role_ == RECEIVER) + return generate_receiver_report(); + return generate_sender_report(); +} + +void uvg_rtp::rtcp::rtcp_runner(uvg_rtp::rtcp *rtcp) +{ + LOG_INFO("RTCP instance created!"); + + uvg_rtp::clock::hrc::hrc_t start, end; + int nread, diff, timeout = MIN_TIMEOUT; + uint8_t buffer[MAX_PACKET]; + rtp_error_t ret; + + while (rtcp->active()) { + start = uvg_rtp::clock::hrc::now(); + ret = uvg_rtp::poll::poll(rtcp->get_sockets(), buffer, MAX_PACKET, timeout, &nread); + + if (ret == RTP_OK && nread > 0) { + (void)rtcp->handle_incoming_packet(buffer, (size_t)nread); + } else if (ret == RTP_INTERRUPTED) { + /* do nothing */ + } else { + LOG_ERROR("recvfrom failed, %d", ret); + } + + diff = uvg_rtp::clock::hrc::diff_now(start); + + if (diff >= MIN_TIMEOUT) { + if ((ret = rtcp->generate_report()) != RTP_OK && ret != RTP_NOT_READY) { + LOG_ERROR("Failed to send RTCP status report!"); + } + + timeout = MIN_TIMEOUT; + } + } +} diff --git a/src/rtcp/sdes.cc b/src/rtcp/sdes.cc index c5a3ae4..08b2805 100644 --- a/src/rtcp/sdes.cc +++ b/src/rtcp/sdes.cc @@ -15,6 +15,15 @@ uvg_rtp::frame::rtcp_sdes_frame *uvg_rtp::rtcp::get_sdes_packet(uint32_t ssrc) return frame; } +rtp_error_t uvg_rtp::rtcp::install_sdes_hook(void (*hook)(uvg_rtp::frame::rtcp_sdes_frame *)) +{ + if (!hook) + return RTP_INVALID_VALUE; + + sdes_hook_ = hook; + return RTP_OK; +} + rtp_error_t uvg_rtp::rtcp::handle_sdes_packet(uvg_rtp::frame::rtcp_sdes_frame *frame, size_t size) { if (!frame) diff --git a/src/rtcp/sender.cc b/src/rtcp/sender.cc index 4ef5abd..c20d365 100644 --- a/src/rtcp/sender.cc +++ b/src/rtcp/sender.cc @@ -15,6 +15,15 @@ uvg_rtp::frame::rtcp_sender_frame *uvg_rtp::rtcp::get_sender_packet(uint32_t ssr return frame; } +rtp_error_t uvg_rtp::rtcp::install_sender_hook(void (*hook)(uvg_rtp::frame::rtcp_sender_frame *)) +{ + if (!hook) + return RTP_INVALID_VALUE; + + sender_hook_ = hook; + return RTP_OK; +} + rtp_error_t uvg_rtp::rtcp::handle_sender_report_packet(uvg_rtp::frame::rtcp_sender_frame *frame, size_t size) { (void)size; @@ -132,13 +141,13 @@ rtp_error_t uvg_rtp::rtcp::generate_sender_report() frame->s_info.ntp_msw = timestamp >> 32; frame->s_info.ntp_lsw = timestamp & 0xffffffff; frame->s_info.rtp_ts = rtp_ts_start_ + (uvg_rtp::clock::ntp::diff(timestamp, clock_start_)) * clock_rate_ / 1000; - frame->s_info.pkt_cnt = sender_stats.sent_pkts; - frame->s_info.byte_cnt = sender_stats.sent_bytes; + frame->s_info.pkt_cnt = our_stats.sent_pkts; + frame->s_info.byte_cnt = our_stats.sent_bytes; LOG_DEBUG("Sender Report from 0x%x has %zu blocks", ssrc_, senders_); for (auto& participant : participants_) { - if (!participant.second->sender) + if (participant.second->role == RECEIVER) continue; frame->blocks[ptr].ssrc = participant.first;