From 7f1f3b25d47ed5cb36e8af71db84af9e967b202e Mon Sep 17 00:00:00 2001 From: Martin Pulec Date: Thu, 22 Aug 2013 18:19:33 +0200 Subject: [PATCH] Video decoder: simplify a bit --- src/rtp/video_decoders.cpp | 151 ++++++++---------------------------- src/utils/message_queue.cpp | 12 ++- src/utils/message_queue.h | 3 +- 3 files changed, 44 insertions(+), 122 deletions(-) diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index 48f6919bc..cd2d424a2 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -65,6 +65,8 @@ * ### Encrypted video (with LDGM) ### * After LDGM decoding, the whole block is decompressed. * + * @todo + * This code is very very messy, it needs to be rewritten. */ #ifdef HAVE_CONFIG_H @@ -84,6 +86,7 @@ #include "rtp/rtp_callback.h" #include "rtp/pbuf.h" #include "rtp/video_decoders.h" +#include "utils/message_queue.h" #include "utils/timed_message.h" #include "video.h" #include "video_decompress.h" @@ -160,7 +163,7 @@ struct main_msg; // message definitions typedef char *char_p; typedef struct packet_counter *packet_counter_p; -struct ldgm_msg { +struct ldgm_msg : public msg { ldgm_msg(int count) : substream_count(count) { buffer_len = new int[count]; buffer_num = new int[count]; @@ -186,8 +189,9 @@ struct ldgm_msg { bool poisoned; }; -struct decompress_msg { - decompress_msg(int count) { +struct decompress_msg : public msg { + decompress_msg(int count) : + poisoned(false) { decompress_buffer = new char_p[count]; fec_buffers = new char_p[count]; buffer_len = new int[count]; @@ -206,11 +210,7 @@ struct decompress_msg { bool poisoned; }; -enum main_msg_type { - RECONFIGURE, -}; - -struct main_msg { +struct main_msg : public msg { virtual ~main_msg(){} }; @@ -218,17 +218,16 @@ struct main_msg_reconfigure : public main_msg { main_msg_reconfigure(struct video_desc d) : desc(d) {} struct video_desc desc; }; - -struct video_new_prop : public main_msg { - double set_fps; - codec_t codec; -}; } /** * @brief Decoder state */ -struct state_video_decoder { +struct state_video_decoder +{ + state_video_decoder() : + decompress_queue(1), ldgm_queue(1) + {} virtual ~state_video_decoder() {} struct module *parent; @@ -264,11 +263,7 @@ struct state_video_decoder { * has been processed and we can write to a new one */ pthread_cond_t buffer_swapped_cv; ///< condition variable associated with @ref buffer_swapped - /// @{ - pthread_cond_t decompress_msg_processed_cv; - pthread_cond_t decompress_msg_ready_cv; - struct decompress_msg *decompress_msg; - /// @} + message_queue decompress_queue; codec_t out_codec; // display or postprocessor @@ -282,11 +277,7 @@ struct state_video_decoder { int pp_output_frames_count; /// @} - /// @{ - pthread_cond_t ldgm_worker_cv; - pthread_cond_t ldgm_boss_cv; - struct ldgm_msg *ldgm_msg; - /// @} + message_queue ldgm_queue; enum video_mode video_mode; ///< video mode set for this decoder unsigned merged_fb:1; ///< flag if the display device driver requires tiled video or not @@ -299,7 +290,7 @@ struct state_video_decoder { /// @} timed_message slow_msg; ///< shows warning ony in certain interval - queue msg_queue; + message_queue msg_queue; struct openssl_decrypt *decrypt; ///< decrypt state }; @@ -334,34 +325,12 @@ static void *ldgm_thread(void *args) { while(1) { struct ldgm_msg *data = NULL; - pthread_mutex_lock(&decoder->lock); - { - while (decoder->ldgm_msg == NULL) { - pthread_cond_wait(&decoder->ldgm_worker_cv, &decoder->lock); - } - data = decoder->ldgm_msg; - decoder->ldgm_msg = NULL; - - pthread_cond_signal(&decoder->ldgm_boss_cv); - } - pthread_mutex_unlock(&decoder->lock); + data = dynamic_cast(decoder->ldgm_queue.pop()); if(data->poisoned) { - // signal it also to decompress thread - pthread_mutex_lock(&decoder->lock); - { - while (decoder->decompress_msg) { - pthread_cond_wait(&decoder->decompress_msg_processed_cv, - &decoder->lock); - } - - decoder->decompress_msg = new decompress_msg(0); - decoder->decompress_msg->poisoned = true; - - /* ...and signal the worker */ - pthread_cond_signal(&decoder->decompress_msg_ready_cv); - } - pthread_mutex_unlock(&decoder->lock); + decompress_msg *msg = new decompress_msg(0); + msg->poisoned = true; + decoder->decompress_queue.push(msg); delete data; break; // exit from loop } @@ -467,10 +436,9 @@ static void *ldgm_thread(void *args) { parse_video_hdr(video_hdr, &network_desc); if (!video_desc_eq_excl_param(decoder->received_vid_desc, network_desc, PARAM_TILE_COUNT)) { - pthread_mutex_lock(&decoder->lock); main_msg *msg = new main_msg_reconfigure(network_desc); decoder->msg_queue.push(msg); - pthread_mutex_unlock(&decoder->lock); + ERROR_GOTO_CLEANUP } if(!frame) { @@ -544,18 +512,10 @@ static void *ldgm_thread(void *args) { pthread_mutex_lock(&decoder->lock); { - while (decoder->decompress_msg) { - pthread_cond_wait(&decoder->decompress_msg_processed_cv, &decoder->lock); - } - - decoder->decompress_msg = decompress_msg; - decoder->decompress_msg->poisoned = false; decoder->buffer_swapped = false; - - /* ...and signal the worker */ - pthread_cond_signal(&decoder->decompress_msg_ready_cv); } pthread_mutex_unlock(&decoder->lock); + decoder->decompress_queue.push(decompress_msg); cleanup: if(ret == FALSE) { @@ -581,17 +541,7 @@ static void *decompress_thread(void *args) { struct tile *tile; while(1) { - struct decompress_msg *msg = NULL; - pthread_mutex_lock(&decoder->lock); - { - while (decoder->decompress_msg == NULL) { - pthread_cond_wait(&decoder->decompress_msg_ready_cv, &decoder->lock); - } - msg = decoder->decompress_msg; - decoder->decompress_msg = NULL; - pthread_cond_signal(&decoder->decompress_msg_processed_cv); - } - pthread_mutex_unlock(&decoder->lock); + decompress_msg *msg = dynamic_cast(decoder->decompress_queue.pop()); if(msg->poisoned) { delete msg; @@ -744,13 +694,8 @@ struct state_video_decoder *video_decoder_init(struct module *parent, s->displayed = s->dropped = s->corrupted = 0ul; pthread_mutex_init(&s->lock, NULL); - pthread_cond_init(&s->decompress_msg_processed_cv, NULL); - pthread_cond_init(&s->decompress_msg_ready_cv, NULL); pthread_cond_init(&s->buffer_swapped_cv, NULL); - pthread_cond_init(&s->ldgm_boss_cv, NULL); - pthread_cond_init(&s->ldgm_worker_cv, NULL); - s->ldgm_msg = NULL; s->buffer_swapped = true; if (encryption) { @@ -906,19 +851,9 @@ bool video_decoder_register_display(struct state_video_decoder *decoder, struct void video_decoder_remove_display(struct state_video_decoder *decoder) { if(decoder->display) { - pthread_mutex_lock(&decoder->lock); - { - while (decoder->ldgm_msg) { - pthread_cond_wait(&decoder->ldgm_boss_cv, &decoder->lock); - } - - decoder->ldgm_msg = new ldgm_msg(0); - decoder->ldgm_msg->poisoned = true; - - /* ...and signal the worker */ - pthread_cond_signal(&decoder->ldgm_worker_cv); - } - pthread_mutex_unlock(&decoder->lock); + ldgm_msg *msg = new ldgm_msg(0); + msg->poisoned = true; + decoder->ldgm_queue.push(msg); pthread_join(decoder->ldgm_thread_id, NULL); pthread_join(decoder->decompress_thread_id, NULL); @@ -963,11 +898,6 @@ void video_decoder_destroy(struct state_video_decoder *decoder) video_decoder_remove_display(decoder); pthread_mutex_destroy(&decoder->lock); - pthread_cond_destroy(&decoder->decompress_msg_processed_cv); - pthread_cond_destroy(&decoder->decompress_msg_ready_cv); - - pthread_cond_destroy(&decoder->ldgm_boss_cv); - pthread_cond_destroy(&decoder->ldgm_worker_cv); cleanup(decoder); @@ -1608,20 +1538,14 @@ int decode_video_frame(struct coded_data *cdata, void *decoder_data) int k = 0, m = 0, c = 0, seed = 0; // LDGM int buffer_number, buffer_length; - // first, dispatch messages - pthread_mutex_lock(&decoder->lock); - while (decoder->msg_queue.size() > 0) { - main_msg *msg = decoder->msg_queue.front(); - decoder->msg_queue.pop(); - pthread_mutex_unlock(&decoder->lock); + main_msg *msg; + while ((msg = dynamic_cast(decoder->msg_queue.pop(true /* nonblock */)))) { if (dynamic_cast(msg)) { main_msg_reconfigure *msg_reconf = dynamic_cast(msg); reconfigure_if_needed(decoder, msg_reconf->desc); } delete msg; - pthread_mutex_lock(&decoder->lock); } - pthread_mutex_unlock(&decoder->lock); int pt; int contained_pt; // if packed is encrypted it encapsulates other pt @@ -1855,23 +1779,10 @@ next_packet: memcpy(ldgm_msg->recv_buffers, recv_buffers, sizeof(recv_buffers)); memcpy(ldgm_msg->pckt_list, pckt_list, sizeof(pckt_list)); - pthread_mutex_lock(&decoder->lock); - { - if(decoder->ldgm_msg) { - decoder->slow_msg.print("Your computer may be too SLOW to play this !!!"); - } - - while (decoder->ldgm_msg) { - pthread_cond_wait(&decoder->ldgm_boss_cv, &decoder->lock); - } - - decoder->ldgm_msg = ldgm_msg; - - /* ...and signal the worker */ - pthread_cond_signal(&decoder->ldgm_worker_cv); + if(decoder->ldgm_queue.size() > 0) { + decoder->slow_msg.print("Your computer may be too SLOW to play this !!!"); } - pthread_mutex_unlock(&decoder->lock); - + decoder->ldgm_queue.push(ldgm_msg); cleanup: ; unsigned int frame_size = 0; diff --git a/src/utils/message_queue.cpp b/src/utils/message_queue.cpp index 8ae4772a0..ad4b33347 100644 --- a/src/utils/message_queue.cpp +++ b/src/utils/message_queue.cpp @@ -57,6 +57,12 @@ message_queue::~message_queue() pthread_cond_destroy(&m_queue_decremented); } +int message_queue::size() +{ + lock_guard guard(m_lock); + return m_queue.size(); +} + void message_queue::push(msg *message) { lock_guard guard(m_lock); @@ -69,9 +75,13 @@ void message_queue::push(msg *message) pthread_cond_signal(&m_queue_incremented); } -msg *message_queue::pop() +msg *message_queue::pop(bool nonblocking) { lock_guard guard(m_lock); + if (m_queue.size() == 0 && nonblocking) { + return NULL; + } + while (m_queue.size() == 0) { pthread_cond_wait(&m_queue_incremented, &m_lock); } diff --git a/src/utils/message_queue.h b/src/utils/message_queue.h index 99b3f5ae8..fec883979 100644 --- a/src/utils/message_queue.h +++ b/src/utils/message_queue.h @@ -52,8 +52,9 @@ public: message_queue(int max_len = -1); virtual ~message_queue(); + int size(); void push(msg *); - msg *pop(); + msg *pop(bool nonblocking = false); private: int m_max_len;