diff --git a/gpujpeg b/gpujpeg index 7cd7cf922..eb4887095 160000 --- a/gpujpeg +++ b/gpujpeg @@ -1 +1 @@ -Subproject commit 7cd7cf9220b184caca2a7613a53379a735aba1d8 +Subproject commit eb48870958de182bcf88cb695e90d5b1379a8fec diff --git a/src/types.h b/src/types.h index 878327c01..4bc78346e 100644 --- a/src/types.h +++ b/src/types.h @@ -198,6 +198,8 @@ struct video_frame { struct fec_desc fec_params; uint32_t ssrc; + + uint32_t seq; ///< sequential number }; /** diff --git a/src/video_compress/jpeg.cpp b/src/video_compress/jpeg.cpp index 37795f765..ca20db46d 100644 --- a/src/video_compress/jpeg.cpp +++ b/src/video_compress/jpeg.cpp @@ -3,7 +3,7 @@ * @author Martin Pulec */ /* - * Copyright (c) 2011-2014 CESNET, z. s. p. o. + * Copyright (c) 2011-2016 CESNET, z. s. p. o. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -47,195 +47,310 @@ #include "module.h" #include "lib_common.h" #include "libgpujpeg/gpujpeg_encoder.h" +#include "utils/synchronized_queue.h" #include "utils/video_frame_pool.h" #include "video.h" #include -#include -#include +#include +#include +#include +#include using namespace std; namespace { +struct state_video_compress_jpeg; -struct state_video_compress_jpeg { - struct module module_data; +/** + * @brief state for single instance of encoder running on one GPU + */ +struct encoder_state { +private: + void cleanup_state(); + shared_ptr compress_step(shared_ptr frame); + bool configure_with(struct video_desc desc); - struct gpujpeg_encoder *encoder; - struct gpujpeg_parameters encoder_param; + struct state_video_compress_jpeg *m_parent_state; + int m_device_id; + struct gpujpeg_encoder *m_encoder; + struct video_desc m_saved_desc; + video_frame_pool m_pool; + decoder_t m_decoder; + bool m_rgb; + int m_encoder_input_linesize; + unique_ptr m_decoded; +public: + encoder_state(struct state_video_compress_jpeg *s, int device_id) : + m_parent_state(s), m_device_id(device_id), m_encoder{}, m_saved_desc{}, + m_decoder{}, m_rgb{}, m_encoder_input_linesize{}, + m_occupied{} + { + } + ~encoder_state() { + cleanup_state(); + } + void worker(); + void compress(shared_ptr frame); - decoder_t decoder; - unique_ptr decoded; - unsigned int rgb:1; - codec_t color_spec; - - struct video_desc saved_desc; - - int restart_interval; - - int encoder_input_linesize; - - video_frame_pool pool; + synchronized_queue, 1> m_in_queue; ///< queue for uncompressed frames + thread m_thread_id; + bool m_occupied; ///< protected by state_video_compress_jpeg::m_occupancy_lock }; -static bool configure_with(struct state_video_compress_jpeg *s, struct video_frame *frame); -static void cleanup_state(struct state_video_compress_jpeg *s); -static void jpeg_check_messages(struct state_video_compress_jpeg *s); -static bool parse_fmt(struct state_video_compress_jpeg *s, char *fmt); -static void jpeg_compress_done(struct module *mod); +struct state_video_compress_jpeg { +private: + state_video_compress_jpeg(struct module *parent, const char *opts); -static bool configure_with(struct state_video_compress_jpeg *s, struct video_frame *frame) -{ - unsigned int x; + vector m_workers; + bool m_uses_worker_threads; ///< true if cuda_devices_count > 1 - s->saved_desc.width = frame->tiles[0].width; - s->saved_desc.height = frame->tiles[0].height; - s->saved_desc.color_spec = frame->color_spec; - s->saved_desc.fps = frame->fps; - s->saved_desc.interlacing = frame->interlacing; - s->saved_desc.tile_count = frame->tile_count; + map> m_out_frames; ///< frames decoded out of order + uint32_t m_in_seq; ///< seq of next frame to be encoded + uint32_t m_out_seq; ///< seq of next frame to be decoded - for (x = 0; x < frame->tile_count; ++x) { - if (vf_get_tile(frame, x)->width != vf_get_tile(frame, 0)->width || - vf_get_tile(frame, x)->width != vf_get_tile(frame, 0)->width) { - fprintf(stderr,"[JPEG] Requested to compress tiles of different size!"); - return false; + size_t m_ended_count; ///< number of workers ended +public: + ~state_video_compress_jpeg() { + if (m_uses_worker_threads) { + for (auto worker : m_workers) { + worker->m_thread_id.join(); + } + } + + for (auto worker : m_workers) { + delete worker; } } + static state_video_compress_jpeg *create(struct module *parent, const char *opts); + bool parse_fmt(char *fmt); + void push(std::shared_ptr in_frame); + std::shared_ptr pop(); + struct module m_module_data; + int m_restart_interval; + int m_quality; + + synchronized_queue, 1> m_out_queue; ///< queue for compressed frames + mutex m_occupancy_lock; + condition_variable m_worker_finished; +}; + +/** + * @brief Compresses single frame + * + * This function is called either from within jpeg_compress_push() if only one + * CUDA device is used to avoid context switches that introduce some overhead + * (measured ~4% performance drop). + * + * When there are multiple CUDA devices to be used, it is called from encoder_state::worker(). + */ +void encoder_state::compress(shared_ptr frame) +{ + if (frame) { + uint32_t seq = frame->seq; + auto out = compress_step(move(frame)); + if (out) { + out->seq = seq; + } else { + log_msg(LOG_LEVEL_WARNING, "[JPEG] Failed to encode frame!\n"); + out = shared_ptr(vf_alloc(1), vf_free); + out->seq = seq; + } + m_parent_state->m_out_queue.push(out); + } else { // pass poison pill + m_parent_state->m_out_queue.push({}); + } +} + +/** + * Worker thread that is used if multiple CUDA devices are used - every device + * has its own thread. + */ +void encoder_state::worker() { + while (true) { + auto frame = m_in_queue.pop(); + + if (!frame) { // poison pill - pass and exit + m_parent_state->m_out_queue.push(frame); + break; + } + + compress(move(frame)); + + unique_lock lk(m_parent_state->m_occupancy_lock); + m_occupied = false; + lk.unlock(); + m_parent_state->m_worker_finished.notify_one(); + } +} + +/** + * Configures GPUJPEG encoder with provided parameters. + */ +bool encoder_state::configure_with(struct video_desc desc) +{ struct video_desc compressed_desc; - compressed_desc = video_desc_from_frame(frame); + compressed_desc = desc; compressed_desc.color_spec = JPEG; bool try_slow = false; - s->decoder = get_decoder_from_to(frame->color_spec, UYVY, try_slow); - if (s->decoder) { - s->rgb = FALSE; + m_decoder = get_decoder_from_to(desc.color_spec, UYVY, try_slow); + if (m_decoder) { + m_rgb = false; } else { - s->decoder = get_decoder_from_to(frame->color_spec, RGB, try_slow); - if (s->decoder) { - s->rgb = TRUE; + m_decoder = get_decoder_from_to(desc.color_spec, RGB, try_slow); + if (m_decoder) { + m_rgb = true; } else { - fprintf(stderr, "[JPEG] Unsupported codec: %s\n", - get_codec_name(frame->color_spec)); + log_msg(LOG_LEVEL_ERROR, "[JPEG] Unsupported codec: %s\n", + get_codec_name(desc.color_spec)); if (!try_slow) { - fprintf(stderr, "[JPEG] Slow decoders not tried!\n"); + log_msg(LOG_LEVEL_WARNING, "[JPEG] Slow decoders not tried!\n"); } return false; } } - s->encoder_param.verbose = 0; - s->encoder_param.segment_info = 1; - - if(s->rgb) { - s->encoder_param.interleaved = 0; - s->encoder_param.restart_interval = s->restart_interval == -1 ? 8 - : s->restart_interval; - /* LUMA */ - s->encoder_param.sampling_factor[0].horizontal = 1; - s->encoder_param.sampling_factor[0].vertical = 1; - /* Cb and Cr */ - s->encoder_param.sampling_factor[1].horizontal = 1; - s->encoder_param.sampling_factor[1].vertical = 1; - s->encoder_param.sampling_factor[2].horizontal = 1; - s->encoder_param.sampling_factor[2].vertical = 1; + struct gpujpeg_parameters encoder_param; + gpujpeg_set_default_parameters(&encoder_param); + if (m_parent_state->m_quality != -1) { + encoder_param.quality = m_parent_state->m_quality; } else { - s->encoder_param.interleaved = 1; - s->encoder_param.restart_interval = s->restart_interval == -1 ? 4 - : s->restart_interval; - /* LUMA */ - s->encoder_param.sampling_factor[0].horizontal = 2; - s->encoder_param.sampling_factor[0].vertical = 1; - /* Cb and Cr */ - s->encoder_param.sampling_factor[1].horizontal = 1; - s->encoder_param.sampling_factor[1].vertical = 1; - s->encoder_param.sampling_factor[2].horizontal = 1; - s->encoder_param.sampling_factor[2].vertical = 1; + log_msg(LOG_LEVEL_INFO, "[JPEG] setting default encode parameters (quality: %d)\n", + encoder_param.quality); } + if (m_parent_state->m_restart_interval != -1) { + encoder_param.restart_interval = m_parent_state->m_restart_interval; + } else { + encoder_param.restart_interval = m_rgb ? 8 : 4; + } + + encoder_param.verbose = 0; + encoder_param.segment_info = 1; + + /* LUMA */ + encoder_param.sampling_factor[0].vertical = 1; + encoder_param.sampling_factor[0].horizontal = m_rgb ? 1 : 2; + /* Cb and Cr */ + encoder_param.sampling_factor[1].horizontal = 1; + encoder_param.sampling_factor[1].vertical = 1; + encoder_param.sampling_factor[2].horizontal = 1; + encoder_param.sampling_factor[2].vertical = 1; + + encoder_param.interleaved = m_rgb ? 0 : 1; struct gpujpeg_image_parameters param_image; gpujpeg_image_set_default_parameters(¶m_image); - param_image.width = frame->tiles[0].width; - param_image.height = frame->tiles[0].height; + param_image.width = desc.width; + param_image.height = desc.height; param_image.comp_count = 3; - if(s->rgb) { - param_image.color_space = GPUJPEG_RGB; - param_image.sampling_factor = GPUJPEG_4_4_4; - } else { - param_image.color_space = GPUJPEG_YCBCR_BT709; - param_image.sampling_factor = GPUJPEG_4_2_2; - } + param_image.color_space = m_rgb ? GPUJPEG_RGB : GPUJPEG_YCBCR_BT709; + param_image.sampling_factor = m_rgb ? GPUJPEG_4_4_4 : GPUJPEG_4_2_2; - s->encoder = gpujpeg_encoder_create(&s->encoder_param, ¶m_image); + m_encoder = gpujpeg_encoder_create(&encoder_param, ¶m_image); - int data_len = frame->tiles[0].width * frame->tiles[0].height * 3; - s->pool.reconfigure(compressed_desc, data_len); + int data_len = desc.width * desc.height * 3; + m_pool.reconfigure(compressed_desc, data_len); - s->encoder_input_linesize = frame->tiles[0].width * + m_encoder_input_linesize = desc.width * (param_image.color_space == GPUJPEG_RGB ? 3 : 2); - if(!s->encoder) { - fprintf(stderr, "[JPEG] Failed to create encoder.\n"); + if(!m_encoder) { + log_msg(LOG_LEVEL_ERROR, "[JPEG] Failed to create GPUJPEG encoder.\n"); exit_uv(128); return false; } - s->decoded = unique_ptr(new char[4 * frame->tiles[0].width * frame->tiles[0].height]); + m_decoded = unique_ptr(new char[4 * desc.width * desc.height]); + + m_saved_desc = desc; + return true; } -static void jpeg_check_messages(struct state_video_compress_jpeg *s) -{ - struct message *msg; - while ((msg = check_message(&s->module_data))) { - struct msg_change_compress_data *data = - (struct msg_change_compress_data *) msg; - struct response *r; - if (parse_fmt(s, data->config_string) == 0) { - r = new_response(RESPONSE_OK, NULL); - printf("[Libavcodec] Compression successfully changed.\n"); - } else { - r = new_response(RESPONSE_BAD_REQUEST, NULL); - fprintf(stderr, "[Libavcodec] Unable to change compression!\n"); - } - memset(&s->saved_desc, 0, sizeof(s->saved_desc)); - free_message(msg, r); - } -} - -static bool parse_fmt(struct state_video_compress_jpeg *s, char *fmt) +bool state_video_compress_jpeg::parse_fmt(char *fmt) { if(fmt && fmt[0] != '\0') { char *tok, *save_ptr = NULL; - gpujpeg_set_default_parameters(&s->encoder_param); tok = strtok_r(fmt, ":", &save_ptr); - s->encoder_param.quality = atoi(tok); - if (s->encoder_param.quality <= 0 || s->encoder_param.quality > 100) { - fprintf(stderr, "[JPEG] Error: Quality should be in interval [1-100]!\n"); + m_quality = atoi(tok); + if (m_quality <= 0 || m_quality > 100) { + log_msg(LOG_LEVEL_ERROR, "[JPEG] Error: Quality should be in interval [1-100]!\n"); return false; } tok = strtok_r(NULL, ":", &save_ptr); if(tok) { - s->restart_interval = atoi(tok); - if (s->restart_interval < 0) { - fprintf(stderr, "[JPEG] Error: Restart interval should be non-negative!\n"); + m_restart_interval = atoi(tok); + if (m_restart_interval < 0) { + log_msg(LOG_LEVEL_ERROR, "[JPEG] Error: Restart interval should be non-negative!\n"); return false; } } tok = strtok_r(NULL, ":", &save_ptr); if(tok) { - fprintf(stderr, "[JPEG] WARNING: Trailing configuration parameters.\n"); + log_msg(LOG_LEVEL_WARNING, "[JPEG] WARNING: Trailing configuration parameters.\n"); } } return true; } +state_video_compress_jpeg::state_video_compress_jpeg(struct module *parent, const char *opts) : + m_uses_worker_threads{}, m_in_seq{}, + m_out_seq{}, m_ended_count{}, + m_module_data{}, m_restart_interval(-1), m_quality(-1) +{ + if(opts && opts[0] != '\0') { + char *fmt = strdup(opts); + if (!parse_fmt(fmt)) { + free(fmt); + throw; + } + free(fmt); + } + + module_init_default(&m_module_data); + m_module_data.cls = MODULE_CLASS_DATA; + m_module_data.priv_data = this; + m_module_data.deleter = [](struct module *mod) { + struct state_video_compress_jpeg *s = (struct state_video_compress_jpeg *) mod->priv_data; + delete s; + }; + + module_register(&m_module_data, parent); +} + +/** + * Creates JPEG encoding state and creates JPEG workers for every GPU that + * will be used for compression (if cuda_devices_count > 1). + */ +state_video_compress_jpeg *state_video_compress_jpeg::create(struct module *parent, const char *opts) { + assert(cuda_devices_count > 0); + + auto ret = new state_video_compress_jpeg(parent, opts); + + for (unsigned int i = 0; i < cuda_devices_count; ++i) { + ret->m_workers.push_back(new encoder_state(ret, cuda_devices[i])); + } + + if (cuda_devices_count > 1) { + ret->m_uses_worker_threads = true; + } + + if (ret->m_uses_worker_threads) { + for (auto worker : ret->m_workers) { + worker->m_thread_id = thread(&encoder_state::worker, worker); + } + } + + return ret; +} + struct module * jpeg_compress_init(struct module *parent, const char *opts) { struct state_video_compress_jpeg *s; @@ -250,114 +365,83 @@ struct module * jpeg_compress_init(struct module *parent, const char *opts) return &compress_init_noerr; } - s = new state_video_compress_jpeg(); - - s->restart_interval = -1; - - gpujpeg_set_default_parameters(&s->encoder_param); - - if(opts && opts[0] != '\0') { - char *fmt = strdup(opts); - if (!parse_fmt(s, fmt)) { - free(fmt); - delete s; - return NULL; - } - free(fmt); - } else { - printf("[JPEG] setting default encode parameters (quality: %d)\n", - s->encoder_param.quality - ); + try { + s = state_video_compress_jpeg::create(parent, opts); + } catch (...) { + return NULL; } - s->encoder = NULL; /* not yet configured */ - - module_init_default(&s->module_data); - s->module_data.cls = MODULE_CLASS_DATA; - s->module_data.priv_data = s; - s->module_data.deleter = jpeg_compress_done; - module_register(&s->module_data, parent); - - return &s->module_data; + return &s->m_module_data; } -shared_ptr jpeg_compress(struct module *mod, shared_ptr tx) +/** + * Performs actual compression with GPUJPEG. Reconfigures encoder if needed. + * @return compressed frame, {} if failed + */ +shared_ptr encoder_state::compress_step(shared_ptr tx) { - struct state_video_compress_jpeg *s = (struct state_video_compress_jpeg *) mod->priv_data; - int i; - unsigned char *line1, *line2; + gpujpeg_set_device(m_device_id); - unsigned int x; - - jpeg_check_messages(s); - - gpujpeg_set_device(cuda_devices[0]); - - if(!s->encoder) { - int ret; - printf("Initializing CUDA device %d...\n", cuda_devices[0]); - ret = gpujpeg_init_device(cuda_devices[0], TRUE); + // first run - initialize device + if (!m_encoder) { + log_msg(LOG_LEVEL_INFO, "Initializing CUDA device %d...\n", m_device_id); + int ret = gpujpeg_init_device(m_device_id, TRUE); if(ret != 0) { - fprintf(stderr, "[JPEG] initializing CUDA device %d failed.\n", cuda_devices[0]); - exit_uv(127); - return {}; - } - ret = configure_with(s, tx.get()); - if (!ret) { + log_msg(LOG_LEVEL_ERROR, "[JPEG] initializing CUDA device %d failed.\n", m_device_id); exit_uv(127); return {}; } } - struct video_desc desc; - desc = video_desc_from_frame(tx.get()); + struct video_desc desc = video_desc_from_frame(tx.get()); - // if format changed, reconfigure - if(!video_desc_eq_excl_param(s->saved_desc, desc, PARAM_INTERLACING)) { - cleanup_state(s); - int ret; - ret = configure_with(s, tx.get()); + // if format has changed, reconfigure + if(!video_desc_eq_excl_param(m_saved_desc, desc, PARAM_INTERLACING)) { + cleanup_state(); + int ret = configure_with(desc); if(!ret) { exit_uv(127); return NULL; } } - shared_ptr out = s->pool.get_frame(); + shared_ptr out = m_pool.get_frame(); - for (x = 0; x < tx->tile_count; ++x) { + for (unsigned int x = 0; x < out->tile_count; ++x) { struct tile *in_tile = vf_get_tile(tx.get(), x); struct tile *out_tile = vf_get_tile(out.get(), x); uint8_t *jpeg_enc_input_data; - if ((void *) s->decoder != (void *) memcpy) { - line1 = (unsigned char *) in_tile->data; - line2 = (unsigned char *) s->decoded.get(); + if ((void *) m_decoder != (void *) memcpy) { + unsigned char *line1 = (unsigned char *) in_tile->data; + unsigned char *line2 = (unsigned char *) m_decoded.get(); - for (i = 0; i < (int) in_tile->height; ++i) { - s->decoder(line2, line1, s->encoder_input_linesize, + for (int i = 0; i < (int) in_tile->height; ++i) { + m_decoder(line2, line1, m_encoder_input_linesize, 0, 8, 16); line1 += vc_get_linesize(in_tile->width, tx->color_spec); - line2 += s->encoder_input_linesize; + line2 += m_encoder_input_linesize; } - jpeg_enc_input_data = (uint8_t *) s->decoded.get(); + jpeg_enc_input_data = (uint8_t *) m_decoded.get(); } else { jpeg_enc_input_data = (uint8_t *) in_tile->data; } - /*if(s->interlaced_input) - vc_deinterlace((unsigned char *) s->decoded, s->encoder_input_linesize, - s->out->tiles[0].height);*/ - uint8_t *compressed; int size; int ret; - struct gpujpeg_encoder_input encoder_input; - gpujpeg_encoder_input_set_image(&encoder_input, jpeg_enc_input_data); - ret = gpujpeg_encoder_encode(s->encoder, &encoder_input, &compressed, &size); + ret = gpujpeg_encoder_input_copy_image(&encoder_input, m_encoder, jpeg_enc_input_data); + if (ret != 0) { + return {}; + } + if (x == out->tile_count - 1) { // optimalization - dispose frame as soon as + // it is not needed + tx = {}; + } + ret = gpujpeg_encoder_encode(m_encoder, &encoder_input, &compressed, &size); if(ret != 0) { return {}; @@ -370,29 +454,103 @@ shared_ptr jpeg_compress(struct module *mod, shared_ptrpriv_data; - - cleanup_state(s); - - delete s; + if (m_encoder) + gpujpeg_encoder_destroy(m_encoder); + m_encoder = NULL; } -static void cleanup_state(struct state_video_compress_jpeg *s) +void state_video_compress_jpeg::push(std::shared_ptr in_frame) { - if (s->encoder) - gpujpeg_encoder_destroy(s->encoder); - s->encoder = NULL; + + if (in_frame) { + in_frame->seq = m_in_seq++; + } + + if (!m_uses_worker_threads) { + m_workers[0]->compress(in_frame); + } else { + if (!in_frame) { + for (auto worker : m_workers) { // pass poison pill to all workers + worker->m_in_queue.push({}); + } + } else { + int index; + unique_lock lk(m_occupancy_lock); + // wait for/select not occupied worker + m_worker_finished.wait(lk, [this, &index]{ + index = 0; + for (auto worker : m_workers) { + if (!worker->m_occupied) return true; + index++; + } + return false; + }); + m_workers[index]->m_occupied = true; + lk.unlock(); + m_workers[index]->m_in_queue.push(in_frame); + } + } +} + +/** + * @brief returns compressed frame + * + * This function takes frames from state_video_compress_jpeg::m_out_queue. It checks + * sequential number of frame from queue - if it is in the same order that + * was sent to encoder, it is returned (according to state_video_compress_jpeg::m_out_seq). + * If not, it is stored in state_video_compress_jpeg::m_out_frames and this function + * further waits for frame with appropriate seq. Frames that was not successfully encoded + * have data_len member set to 0 and are skipped here. + */ +std::shared_ptr state_video_compress_jpeg::pop() +{ +start: + if (m_out_frames.find(m_out_seq) != m_out_frames.end()) { + auto frame = m_out_frames[m_out_seq]; + m_out_frames.erase(m_out_seq); + m_out_seq += 1; + if (frame->tiles[0].data_len == 0) { // was error processing that frame, skip + goto start; + } else { + return frame; + } + } else { + while (true) { + auto frame = m_out_queue.pop(); + if (!frame) { + if (++m_ended_count == m_workers.size()) { + return {}; + } else { + continue; + } + } + if (frame->seq == m_out_seq) { + m_out_seq += 1; + if (frame->tiles[0].data_len == 0) { // error - skip this frame + goto start; + } else { + return frame; + } + } else { + m_out_frames[frame->seq] = frame; + } + } + } } const struct video_compress_info jpeg_info = { "JPEG", jpeg_compress_init, - jpeg_compress, - NULL, NULL, NULL, + [](struct module *mod, std::shared_ptr in_frame) { + static_cast(mod->priv_data)->push(in_frame); + }, + [](struct module *mod) { + return static_cast(mod->priv_data)->pop(); + }, [] { return gpujpeg_init_device(cuda_devices[0], TRUE) == 0 ? list{ { "60", 60, [](const struct video_desc *d){return (long)(d->width * d->height * d->fps * 0.68);},