Implement RCE_FRAGMENT_GENERIC for the new architecture

This commit is contained in:
Aaro Altonen 2020-10-12 18:44:59 +03:00
parent 4253f636bf
commit c74d6a75a7
5 changed files with 60 additions and 72 deletions

View File

@ -53,7 +53,7 @@ namespace uvg_rtp {
static rtp_error_t packet_handler(void *arg, int flags, frame::rtp_frame **frame);
/* Return pointer to the internal frame info structure which is relayed to packet handler */
media_frame_info_t *get_hevc_frame_info();
media_frame_info_t *get_media_frame_info();
protected:
virtual rtp_error_t push_media_frame(uint8_t *data, size_t data_len, int flags);

View File

@ -133,6 +133,7 @@ namespace uvg_rtp {
* Return RTP_INVALID_VALUE if one of the parameters is invalid
* Return RTP_MEMORY_ERROR if the maximum amount of chunks/messages is exceeded */
rtp_error_t enqueue_message(uint8_t *message, size_t message_len);
rtp_error_t enqueue_message(uint8_t *message, size_t message_len, bool set_marker);
/* Cache all messages in "buffers" in order to frame queue
*

View File

@ -47,82 +47,46 @@ rtp_error_t uvg_rtp::formats::media::push_media_frame(uint8_t *data, size_t data
return ret;
}
/* TODO: Bring back the support for RCE_FRAGMENT_GENERIC
* It requires support for modifying the active packet's RTP header,
* functionality currently not provided by the frame queue */
if (data_len > rtp_ctx_->get_payload_size()) {
if (flags_ & RCE_FRAGMENT_GENERIC) {
LOG_ERROR("Generic frame fragmentation currently not supported!");
return RTP_NOT_SUPPORTED;
if (!(flags_ & RCE_FRAGMENT_GENERIC)) {
if (data_len > rtp_ctx_->get_payload_size()) {
LOG_WARN("Packet is larger (%zu bytes) than maximum payload size (%zu bytes)",
data_len, rtp_ctx_->get_payload_size());
}
LOG_WARN("Payload is too large and will be truncated (%zu vs %zu)",
data_len, rtp_ctx_->get_payload_size()
);
if ((ret = fqueue_->enqueue_message(data, data_len)) != RTP_OK) {
LOG_ERROR("Failed to enqueue message: %d", ret);
(void)fqueue_->deinit_transaction();
return ret;
}
return fqueue_->flush_queue();
}
if ((ret = fqueue_->enqueue_message(data, data_len)) != RTP_OK) {
LOG_ERROR("Failed to enqueue message: %d", ret);
(void)fqueue_->deinit_transaction();
size_t payload_size = rtp_ctx_->get_payload_size();
ssize_t data_left = data_len;
ssize_t data_pos = 0;
bool set_marker = true;
while (data_left > (ssize_t)payload_size) {
if ((ret = fqueue_->enqueue_message(data + data_pos, payload_size, set_marker)) != RTP_OK) {
LOG_ERROR("Failed to enqueue packet when fragmenting generic frame");
return ret;
}
data_pos += payload_size;
data_left -= payload_size;
set_marker = false;
}
if ((ret = fqueue_->enqueue_message(data + data_pos, data_left, true)) != RTP_OK) {
LOG_ERROR("Failed to enqueue packet when fragmenting generic frame");
return ret;
}
return fqueue_->flush_queue();
#if 0
std::vector<std::pair<size_t, uint8_t *>> buffers;
size_t payload_size = rtp_ctx_->get_payload_size();
uint8_t header[uvg_rtp::frame::HEADER_SIZE_RTP];
/* fill RTP header with our session's values
* and push the header to the buffer vector to use vectored I/O */
rtp_ctx_->fill_header(header);
buffers.push_back(std::make_pair(sizeof(header), header));
buffers.push_back(std::make_pair(data_len, data));
if (data_len > payload_size) {
if (flags_ & RCE_FRAGMENT_GENERIC) {
rtp_error_t ret = RTP_OK;
ssize_t data_left = data_len;
ssize_t data_pos = 0;
/* set marker bit for the first fragment */
header[1] |= (1 << 7);
while (data_left > (ssize_t)payload_size) {
buffers.at(1).first = payload_size;
buffers.at(1).second = data + data_pos;
if ((ret = socket_->sendto(buffers, 0)) != RTP_OK)
return ret;
rtp_ctx_->inc_sequence();
rtp_ctx_->update_sequence(header);
data_pos += payload_size;
data_left -= payload_size;
/* clear marker bit for middle fragments */
header[1] &= 0x7f;
}
/* set marker bit for the last frame */
header[1] |= (1 << 7);
buffers.at(1).first = data_left;
buffers.at(1).second = data + data_pos;
return socket_->sendto(buffers, 0);
} else {
LOG_WARN("Packet is larger (%zu bytes) than payload_size (%zu bytes)", data_len, payload_size);
}
}
#endif
}
uvg_rtp::formats::media_frame_info_t *uvg_rtp::formats::media::get_hevc_frame_info()
uvg_rtp::formats::media_frame_info_t *uvg_rtp::formats::media::get_media_frame_info()
{
return &minfo_;
}
@ -184,7 +148,7 @@ rtp_error_t uvg_rtp::formats::media::packet_handler(void *arg, int flags, uvg_rt
minfo->frames[ts].e_seq = INVALID_SEQ;
minfo->frames[ts].fragments[seq] = frame;
minfo->frames[ts].size = frame->payload_len;
*out = nullptr;
*out = nullptr;
} else {
return RTP_PKT_READY;
}

View File

@ -159,7 +159,12 @@ rtp_error_t uvg_rtp::media_stream::init()
case RTP_FORMAT_OPUS:
case RTP_FORMAT_GENERIC:
media_ = new uvg_rtp::formats::media(socket_, rtp_, ctx_config_.flags);
pkt_dispatcher_->install_aux_handler(rtp_handler_key_, nullptr, media_->packet_handler, nullptr);
pkt_dispatcher_->install_aux_handler(
rtp_handler_key_,
media_->get_media_frame_info(),
media_->packet_handler,
nullptr
);
break;
default:
@ -278,7 +283,12 @@ rtp_error_t uvg_rtp::media_stream::init(uvg_rtp::zrtp *zrtp)
case RTP_FORMAT_OPUS:
case RTP_FORMAT_GENERIC:
media_ = new uvg_rtp::formats::media(socket_, rtp_, ctx_config_.flags);
pkt_dispatcher_->install_aux_handler(rtp_handler_key_, nullptr, media_->packet_handler, nullptr);
pkt_dispatcher_->install_aux_handler(
rtp_handler_key_,
media_->get_media_frame_info(),
media_->packet_handler,
nullptr
);
break;
default:
@ -393,7 +403,12 @@ rtp_error_t uvg_rtp::media_stream::add_srtp_ctx(uint8_t *key, uint8_t *salt)
case RTP_FORMAT_OPUS:
case RTP_FORMAT_GENERIC:
media_ = new uvg_rtp::formats::media(socket_, rtp_, ctx_config_.flags);
pkt_dispatcher_->install_aux_handler(rtp_handler_key_, nullptr, media_->packet_handler, nullptr);
pkt_dispatcher_->install_aux_handler(
rtp_handler_key_,
media_->get_media_frame_info(),
media_->packet_handler,
nullptr
);
break;
default:

View File

@ -238,7 +238,7 @@ rtp_error_t uvg_rtp::frame_queue::deinit_transaction()
return uvg_rtp::frame_queue::deinit_transaction(active_->key);
}
rtp_error_t uvg_rtp::frame_queue::enqueue_message(uint8_t *message, size_t message_len)
rtp_error_t uvg_rtp::frame_queue::enqueue_message(uint8_t *message, size_t message_len, bool set_marker)
{
if (!message || !message_len)
return RTP_INVALID_VALUE;
@ -250,6 +250,9 @@ rtp_error_t uvg_rtp::frame_queue::enqueue_message(uint8_t *message, size_t messa
/* update the RTP header at "rtpheaders_ptr_" */
uvg_rtp::frame_queue::update_rtp_header();
if (set_marker)
((uint8_t *)&active_->rtp_headers[active_->rtphdr_ptr])[1] |= (1 << 7);
/* Push RTP header first and then push all payload buffers */
tmp.push_back({
sizeof(active_->rtp_headers[active_->rtphdr_ptr]),
@ -277,6 +280,11 @@ rtp_error_t uvg_rtp::frame_queue::enqueue_message(uint8_t *message, size_t messa
return RTP_OK;
}
rtp_error_t uvg_rtp::frame_queue::enqueue_message(uint8_t *message, size_t message_len)
{
return enqueue_message(message, message_len, false);
}
rtp_error_t uvg_rtp::frame_queue::enqueue_message(std::vector<std::pair<size_t, uint8_t *>>& buffers)
{
if (!buffers.size())