diff --git a/Makefile.in b/Makefile.in index ab6b4f0a8..27e0a92e7 100644 --- a/Makefile.in +++ b/Makefile.in @@ -117,7 +117,7 @@ OBJS = @OBJS@ \ src/utils/fs_lock.o \ src/utils/list.o \ src/lib_common.o \ - src/utils/message_queue.o \ + src/utils/synchronized_queue.o \ src/utils/misc.o \ src/utils/packet_counter.o \ src/utils/resource_manager.o \ diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index 44183839c..36b5062cc 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -85,7 +85,7 @@ #include "rtp/rtp_callback.h" #include "rtp/pbuf.h" #include "rtp/video_decoders.h" -#include "utils/message_queue.h" +#include "utils/synchronized_queue.h" #include "utils/timed_message.h" #include "video.h" #include "video_decompress.h" @@ -245,7 +245,7 @@ struct state_video_decoder * has been processed and we can write to a new one */ pthread_cond_t buffer_swapped_cv; ///< condition variable associated with @ref buffer_swapped - unique_ptr> decompress_queue; + synchronized_queue decompress_queue; codec_t out_codec; // display or postprocessor @@ -259,7 +259,7 @@ struct state_video_decoder int pp_output_frames_count; /// @} - unique_ptr> fec_queue; + synchronized_queue fec_queue; enum video_mode video_mode; ///< video mode set for this decoder unsigned merged_fb:1; ///< flag if the display device driver requires tiled video or not @@ -272,7 +272,7 @@ struct state_video_decoder /// @} timed_message slow_msg; ///< shows warning ony in certain interval - message_queue msg_queue; + synchronized_queue msg_queue; struct openssl_decrypt *decrypt; ///< decrypt state @@ -309,12 +309,12 @@ static void *fec_thread(void *args) { struct fec_desc desc(FEC_NONE); while(1) { - struct fec_msg *data = decoder->fec_queue->pop(); + struct fec_msg *data = decoder->fec_queue.pop(); if(data->poisoned) { decompress_msg *msg = new decompress_msg(0); msg->poisoned = true; - decoder->decompress_queue->push(msg); + decoder->decompress_queue.push(msg); delete data; break; // exit from loop } @@ -458,7 +458,7 @@ static void *fec_thread(void *args) { decoder->buffer_swapped = false; } pthread_mutex_unlock(&decoder->lock); - decoder->decompress_queue->push(decompress_msg); + decoder->decompress_queue.push(decompress_msg); cleanup: if(ret == FALSE) { @@ -486,7 +486,7 @@ static void *decompress_thread(void *args) { struct tile *tile; while(1) { - decompress_msg *msg = decoder->decompress_queue->pop(); + decompress_msg *msg = decoder->decompress_queue.pop(); if(msg->poisoned) { delete msg; @@ -646,9 +646,6 @@ struct state_video_decoder *video_decoder_init(struct module *parent, s->buffer_swapped = true; s->last_buffer_number = -1; - s->decompress_queue = unique_ptr>(new message_queue(1)); - s->fec_queue = unique_ptr>(new message_queue(1)); - if (encryption) { if(openssl_decrypt_init(&s->decrypt, encryption, MODE_AES128_CTR) != 0) { @@ -801,7 +798,7 @@ void video_decoder_remove_display(struct state_video_decoder *decoder) if(decoder->display) { fec_msg *msg = new fec_msg(0, {}); msg->poisoned = true; - decoder->fec_queue->push(msg); + decoder->fec_queue.push(msg); pthread_join(decoder->fec_thread_id, NULL); pthread_join(decoder->decompress_thread_id, NULL); @@ -1523,7 +1520,7 @@ int decode_video_frame(struct coded_data *cdata, void *decoder_data) #endif } if (msg_reconf->last_frame) { - decoder->fec_queue->push(msg_reconf->last_frame); + decoder->fec_queue.push(msg_reconf->last_frame); } delete msg_reconf; } @@ -1780,10 +1777,10 @@ next_packet: memcpy(fec_msg->recv_buffers, recv_buffers, sizeof(recv_buffers)); fec_msg->pckt_list = std::move(pckt_list); - if(decoder->fec_queue->size() > 0) { + if(decoder->fec_queue.size() > 0) { decoder->slow_msg.print("Your computer may be too SLOW to play this !!!"); } - decoder->fec_queue->push(fec_msg); + decoder->fec_queue.push(fec_msg); cleanup: ; unsigned int frame_size = 0; diff --git a/src/utils/message_queue.cpp b/src/utils/synchronized_queue.cpp similarity index 91% rename from src/utils/message_queue.cpp rename to src/utils/synchronized_queue.cpp index fb4682f27..7999fea42 100644 --- a/src/utils/message_queue.cpp +++ b/src/utils/synchronized_queue.cpp @@ -1,5 +1,5 @@ /** - * @file utils/message_queue.cpp + * @file utils/synchronized_queue.cpp * @author Martin Pulec */ /* @@ -40,7 +40,8 @@ #include "config_win32.h" #define NO_EXTERN_MSGQ_MSG -#include "utils/message_queue.h" +#include "utils/synchronized_queue.h" -template class message_queue; +template class synchronized_queue; +template class synchronized_queue; diff --git a/src/utils/message_queue.h b/src/utils/synchronized_queue.h similarity index 80% rename from src/utils/message_queue.h rename to src/utils/synchronized_queue.h index cef89e393..e6ea24a39 100644 --- a/src/utils/message_queue.h +++ b/src/utils/synchronized_queue.h @@ -1,5 +1,5 @@ /** - * @file utils/message_queue.h + * @file utils/synchronized_queue.h * @author Martin Pulec */ /* @@ -35,8 +35,8 @@ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef MESSAGE_QUEUE_H_ -#define MESSAGE_QUEUE_H_ +#ifndef SYNCHRONIZED_QUEUE_H_ +#define SYNCHRONIZED_QUEUE_H_ #include #include @@ -48,19 +48,18 @@ struct msg { struct msg_quit : public msg {}; -template -class message_queue { +/** + * @brief simple blocking synchronized queue + * + * Queue blocks if it size is higher than max_len on push. It also blocks on pop call + * if there is no element in the queue. + * + * @tparam T type to be stored + * @tparam max_len maximal length of the queue until it bloks (-1 means unlimited) + */ +template +class synchronized_queue { public: - - message_queue(int max_len = -1) : - m_max_len(max_len) - { - } - - virtual ~message_queue() - { - } - int size() { std::unique_lock l(m_lock); @@ -70,8 +69,8 @@ public: void push(T const & message) { std::unique_lock l(m_lock); - if (m_max_len != -1) { - m_queue_decremented.wait(l, [this]{return m_queue.size() < (unsigned int) m_max_len;}); + if (max_len != -1) { + m_queue_decremented.wait(l, [this]{return m_queue.size() < (unsigned int) max_len;}); } m_queue.push(message); l.unlock(); @@ -96,7 +95,6 @@ public: private: - int m_max_len; std::queue m_queue; std::mutex m_lock; std::condition_variable m_queue_decremented; @@ -104,8 +102,9 @@ private: }; #ifndef NO_EXTERN_MSGQ_MSG -extern template class message_queue; +extern template class synchronized_queue; +extern template class synchronized_queue; #endif -#endif // MESSAGE_QUEUE_H_ +#endif // SYNCHRONIZED_QUEUE_H_ diff --git a/src/video_compress.cpp b/src/video_compress.cpp index 52313048a..9f09e87f9 100644 --- a/src/video_compress.cpp +++ b/src/video_compress.cpp @@ -53,7 +53,7 @@ #include "messaging.h" #include "module.h" -#include "utils/message_queue.h" +#include "utils/synchronized_queue.h" #include "utils/vf_split.h" #include "utils/worker.h" #include "video.h" @@ -111,7 +111,7 @@ struct compress_state_real { struct compress_state { struct module mod; ///< compress module data struct compress_state_real *ptr; ///< pointer to real compress state - unique_ptr>> queue; + synchronized_queue, 1> queue; }; typedef struct compress_state compress_state_proxy; ///< Used to emphasize that the state is actually a proxy. @@ -339,9 +339,6 @@ int compress_init(struct module *parent, const char *config_string, struct compr proxy->mod.priv_data = proxy; proxy->mod.deleter = compress_done; - proxy->queue = unique_ptr>>( - new message_queue>(1)); - int ret = compress_init_real(&proxy->mod, config_string, &s); if(ret == 0) { proxy->ptr = s; @@ -457,7 +454,7 @@ void compress_frame(compress_state_proxy *proxy, shared_ptr frame) abort(); if (!frame) { // pass poisoned pill - proxy->queue->push(shared_ptr()); + proxy->queue.push(shared_ptr()); return; } @@ -483,7 +480,7 @@ void compress_frame(compress_state_proxy *proxy, shared_ptr frame) return; } - proxy->queue->push(sync_api_frame); + proxy->queue.push(sync_api_frame); } /** @@ -619,6 +616,6 @@ shared_ptr compress_pop(compress_state_proxy *proxy) if(!proxy) return NULL; - return proxy->queue->pop(); + return proxy->queue.pop(); } diff --git a/src/video_decompress/jpeg_to_dxt.cpp b/src/video_decompress/jpeg_to_dxt.cpp index ce9711557..1b3582435 100644 --- a/src/video_decompress/jpeg_to_dxt.cpp +++ b/src/video_decompress/jpeg_to_dxt.cpp @@ -51,7 +51,7 @@ #include "debug.h" #include "host.h" -#include "utils/message_queue.h" +#include "utils/synchronized_queue.h" #include "video.h" #include "video_decompress.h" #include "video_decompress/jpeg.h" @@ -60,13 +60,12 @@ namespace { struct thread_data { thread_data() : - m_in(1), m_out(1), jpeg_decoder(0), desc(), out_codec(), ppb(), dxt_out_buff(0), cuda_dev_index(-1) {} - message_queue<> m_in; + synchronized_queue m_in; // currently only for output frames - message_queue<> m_out; + synchronized_queue m_out; struct gpujpeg_decoder *jpeg_decoder; struct video_desc desc;