diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index 61f3ad226..b73f70653 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -181,9 +181,9 @@ struct frame_msg { }; struct main_msg_reconfigure { - inline main_msg_reconfigure(struct video_desc d, struct frame_msg *lf) : desc(d), last_frame(lf) {} + inline main_msg_reconfigure(struct video_desc d, unique_ptr &&f) : desc(d), last_frame(move(f)) {} struct video_desc desc; - struct frame_msg *last_frame; + unique_ptr last_frame; }; } @@ -225,7 +225,7 @@ struct state_video_decoder * has been processed and we can write to a new one */ pthread_cond_t buffer_swapped_cv; ///< condition variable associated with @ref buffer_swapped - synchronized_queue decompress_queue; + synchronized_queue, 1> decompress_queue; codec_t out_codec; // display or postprocessor @@ -239,7 +239,7 @@ struct state_video_decoder int pp_output_frames_count; /// @} - synchronized_queue fec_queue; + synchronized_queue, 1> fec_queue; enum video_mode video_mode; ///< video mode set for this decoder unsigned merged_fb:1; ///< flag if the display device driver requires tiled video or not @@ -289,10 +289,10 @@ static void *fec_thread(void *args) { struct fec_desc desc(FEC_NONE); while(1) { - struct frame_msg *data = decoder->fec_queue.pop(); + unique_ptr data = decoder->fec_queue.pop(); if (data->poisoned) { - decoder->decompress_queue.push(data); + decoder->decompress_queue.push(move(data)); break; // exit from loop } @@ -353,8 +353,7 @@ static void *fec_thread(void *args) { parse_video_hdr(video_hdr, &network_desc); if (!video_desc_eq_excl_param(decoder->received_vid_desc, network_desc, PARAM_TILE_COUNT)) { - decoder->msg_queue.push(new main_msg_reconfigure(network_desc, data)); - data = nullptr; + decoder->msg_queue.push(new main_msg_reconfigure(network_desc, move(data))); ERROR_GOTO_CLEANUP } @@ -432,11 +431,10 @@ static void *fec_thread(void *args) { decoder->buffer_swapped = false; } pthread_mutex_unlock(&decoder->lock); - decoder->decompress_queue.push(data); + decoder->decompress_queue.push(move(data)); cleanup: if(ret == FALSE) { - delete data; decoder->corrupted++; decoder->dropped++; } @@ -454,10 +452,9 @@ static void *decompress_thread(void *args) { struct tile *tile; while(1) { - frame_msg *msg = decoder->decompress_queue.pop(); + unique_ptr msg = decoder->decompress_queue.pop(); if(msg->poisoned) { - delete msg; break; } @@ -554,8 +551,6 @@ static void *decompress_thread(void *args) { } skip_frame: - delete msg; - pthread_mutex_lock(&decoder->lock); { // we have put the video frame and requested another one which is @@ -761,9 +756,9 @@ bool video_decoder_register_display(struct state_video_decoder *decoder, struct void video_decoder_remove_display(struct state_video_decoder *decoder) { if(decoder->display) { - frame_msg *msg = new frame_msg(0, {}); + unique_ptr msg(new frame_msg(0, {})); msg->poisoned = true; - decoder->fec_queue.push(msg); + decoder->fec_queue.push(move(msg)); pthread_join(decoder->fec_thread_id, NULL); pthread_join(decoder->decompress_thread_id, NULL); @@ -1486,14 +1481,13 @@ int decode_video_frame(struct coded_data *cdata, void *decoder_data) #endif } if (msg_reconf->last_frame) { - decoder->fec_queue.push(msg_reconf->last_frame); + decoder->fec_queue.push(move(msg_reconf->last_frame)); } delete msg_reconf; } int pt; bool buffer_swapped = false; - struct frame_msg *fec_msg = NULL; while (cdata != NULL) { uint32_t *hdr; @@ -1738,18 +1732,20 @@ next_packet: assert(ret == TRUE); // format message - fec_msg = new struct frame_msg(max_substreams, fec_desc(fec::fec_type_from_pt(pt), k, m, c, seed)); - std::copy(buffer_len, buffer_len + sizeof buffer_len / sizeof buffer_len[0], - fec_msg->recv_buffer_len.begin()); - std::copy(buffer_num, buffer_num + sizeof buffer_num / sizeof buffer_num[0], fec_msg->buffer_num.begin()); - std::copy(recv_buffer, recv_buffer + sizeof recv_buffer / sizeof recv_buffer[0], fec_msg->recv_buffer.begin()); - fec_msg->pckt_list = std::move(pckt_list); - fec_msg->ssrc = ssrc; + { + unique_ptr fec_msg (new frame_msg(max_substreams, fec_desc(fec::fec_type_from_pt(pt), k, m, c, seed))); + std::copy(buffer_len, buffer_len + sizeof buffer_len / sizeof buffer_len[0], + fec_msg->recv_buffer_len.begin()); + std::copy(buffer_num, buffer_num + sizeof buffer_num / sizeof buffer_num[0], fec_msg->buffer_num.begin()); + std::copy(recv_buffer, recv_buffer + sizeof recv_buffer / sizeof recv_buffer[0], fec_msg->recv_buffer.begin()); + fec_msg->pckt_list = std::move(pckt_list); + fec_msg->ssrc = ssrc; - if(decoder->fec_queue.size() > 0) { - decoder->slow_msg.print("Your computer may be too SLOW to play this !!!"); + if(decoder->fec_queue.size() > 0) { + decoder->slow_msg.print("Your computer may be too SLOW to play this !!!"); + } + decoder->fec_queue.push(move(fec_msg)); } - decoder->fec_queue.push(fec_msg); cleanup: ; unsigned int frame_size = 0; diff --git a/src/utils/synchronized_queue.h b/src/utils/synchronized_queue.h index e6ea24a39..7e613eedb 100644 --- a/src/utils/synchronized_queue.h +++ b/src/utils/synchronized_queue.h @@ -41,6 +41,7 @@ #include #include #include +#include struct msg { virtual ~msg() {} @@ -77,6 +78,17 @@ public: m_queue_incremented.notify_one(); } + void push(T && message) + { + std::unique_lock l(m_lock); + if (max_len != -1) { + m_queue_decremented.wait(l, [this]{return m_queue.size() < (unsigned int) max_len;}); + } + m_queue.push(std::move(message)); + l.unlock(); + m_queue_incremented.notify_one(); + } + T pop(bool nonblocking = false) { std::unique_lock l(m_lock); @@ -85,7 +97,7 @@ public: } m_queue_incremented.wait(l, [this]{return m_queue.size() > 0;}); - T ret = m_queue.front(); + T ret = std::move(m_queue.front()); m_queue.pop(); l.unlock();