JPEG: support for multiple GPUs

Requires new GPUJPEG.
This commit is contained in:
Martin Pulec
2016-01-13 11:22:18 +01:00
parent 18e6e42c65
commit 4a978beb2c
3 changed files with 361 additions and 201 deletions

Submodule gpujpeg updated: 7cd7cf9220...eb48870958

View File

@@ -198,6 +198,8 @@ struct video_frame {
struct fec_desc fec_params;
uint32_t ssrc;
uint32_t seq; ///< sequential number
};
/**

View File

@@ -3,7 +3,7 @@
* @author Martin Pulec <pulec@cesnet.cz>
*/
/*
* Copyright (c) 2011-2014 CESNET, z. s. p. o.
* Copyright (c) 2011-2016 CESNET, z. s. p. o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -47,195 +47,310 @@
#include "module.h"
#include "lib_common.h"
#include "libgpujpeg/gpujpeg_encoder.h"
#include "utils/synchronized_queue.h"
#include "utils/video_frame_pool.h"
#include "video.h"
#include <memory>
#include <pthread.h>
#include <stdlib.h>
#include <map>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
namespace {
struct state_video_compress_jpeg;
struct state_video_compress_jpeg {
struct module module_data;
/**
* @brief state for single instance of encoder running on one GPU
*/
struct encoder_state {
private:
void cleanup_state();
shared_ptr<video_frame> compress_step(shared_ptr<video_frame> frame);
bool configure_with(struct video_desc desc);
struct gpujpeg_encoder *encoder;
struct gpujpeg_parameters encoder_param;
struct state_video_compress_jpeg *m_parent_state;
int m_device_id;
struct gpujpeg_encoder *m_encoder;
struct video_desc m_saved_desc;
video_frame_pool<default_data_allocator> m_pool;
decoder_t m_decoder;
bool m_rgb;
int m_encoder_input_linesize;
unique_ptr<char []> m_decoded;
public:
encoder_state(struct state_video_compress_jpeg *s, int device_id) :
m_parent_state(s), m_device_id(device_id), m_encoder{}, m_saved_desc{},
m_decoder{}, m_rgb{}, m_encoder_input_linesize{},
m_occupied{}
{
}
~encoder_state() {
cleanup_state();
}
void worker();
void compress(shared_ptr<video_frame> frame);
decoder_t decoder;
unique_ptr<char []> decoded;
unsigned int rgb:1;
codec_t color_spec;
struct video_desc saved_desc;
int restart_interval;
int encoder_input_linesize;
video_frame_pool<default_data_allocator> pool;
synchronized_queue<shared_ptr<struct video_frame>, 1> m_in_queue; ///< queue for uncompressed frames
thread m_thread_id;
bool m_occupied; ///< protected by state_video_compress_jpeg::m_occupancy_lock
};
static bool configure_with(struct state_video_compress_jpeg *s, struct video_frame *frame);
static void cleanup_state(struct state_video_compress_jpeg *s);
static void jpeg_check_messages(struct state_video_compress_jpeg *s);
static bool parse_fmt(struct state_video_compress_jpeg *s, char *fmt);
static void jpeg_compress_done(struct module *mod);
struct state_video_compress_jpeg {
private:
state_video_compress_jpeg(struct module *parent, const char *opts);
static bool configure_with(struct state_video_compress_jpeg *s, struct video_frame *frame)
{
unsigned int x;
vector<struct encoder_state *> m_workers;
bool m_uses_worker_threads; ///< true if cuda_devices_count > 1
s->saved_desc.width = frame->tiles[0].width;
s->saved_desc.height = frame->tiles[0].height;
s->saved_desc.color_spec = frame->color_spec;
s->saved_desc.fps = frame->fps;
s->saved_desc.interlacing = frame->interlacing;
s->saved_desc.tile_count = frame->tile_count;
map<uint32_t, shared_ptr<struct video_frame>> m_out_frames; ///< frames decoded out of order
uint32_t m_in_seq; ///< seq of next frame to be encoded
uint32_t m_out_seq; ///< seq of next frame to be decoded
for (x = 0; x < frame->tile_count; ++x) {
if (vf_get_tile(frame, x)->width != vf_get_tile(frame, 0)->width ||
vf_get_tile(frame, x)->width != vf_get_tile(frame, 0)->width) {
fprintf(stderr,"[JPEG] Requested to compress tiles of different size!");
return false;
size_t m_ended_count; ///< number of workers ended
public:
~state_video_compress_jpeg() {
if (m_uses_worker_threads) {
for (auto worker : m_workers) {
worker->m_thread_id.join();
}
}
for (auto worker : m_workers) {
delete worker;
}
}
static state_video_compress_jpeg *create(struct module *parent, const char *opts);
bool parse_fmt(char *fmt);
void push(std::shared_ptr<video_frame> in_frame);
std::shared_ptr<video_frame> pop();
struct module m_module_data;
int m_restart_interval;
int m_quality;
synchronized_queue<shared_ptr<struct video_frame>, 1> m_out_queue; ///< queue for compressed frames
mutex m_occupancy_lock;
condition_variable m_worker_finished;
};
/**
* @brief Compresses single frame
*
* This function is called either from within jpeg_compress_push() if only one
* CUDA device is used to avoid context switches that introduce some overhead
* (measured ~4% performance drop).
*
* When there are multiple CUDA devices to be used, it is called from encoder_state::worker().
*/
void encoder_state::compress(shared_ptr<video_frame> frame)
{
if (frame) {
uint32_t seq = frame->seq;
auto out = compress_step(move(frame));
if (out) {
out->seq = seq;
} else {
log_msg(LOG_LEVEL_WARNING, "[JPEG] Failed to encode frame!\n");
out = shared_ptr<video_frame>(vf_alloc(1), vf_free);
out->seq = seq;
}
m_parent_state->m_out_queue.push(out);
} else { // pass poison pill
m_parent_state->m_out_queue.push({});
}
}
/**
* Worker thread that is used if multiple CUDA devices are used - every device
* has its own thread.
*/
void encoder_state::worker() {
while (true) {
auto frame = m_in_queue.pop();
if (!frame) { // poison pill - pass and exit
m_parent_state->m_out_queue.push(frame);
break;
}
compress(move(frame));
unique_lock<mutex> lk(m_parent_state->m_occupancy_lock);
m_occupied = false;
lk.unlock();
m_parent_state->m_worker_finished.notify_one();
}
}
/**
* Configures GPUJPEG encoder with provided parameters.
*/
bool encoder_state::configure_with(struct video_desc desc)
{
struct video_desc compressed_desc;
compressed_desc = video_desc_from_frame(frame);
compressed_desc = desc;
compressed_desc.color_spec = JPEG;
bool try_slow = false;
s->decoder = get_decoder_from_to(frame->color_spec, UYVY, try_slow);
if (s->decoder) {
s->rgb = FALSE;
m_decoder = get_decoder_from_to(desc.color_spec, UYVY, try_slow);
if (m_decoder) {
m_rgb = false;
} else {
s->decoder = get_decoder_from_to(frame->color_spec, RGB, try_slow);
if (s->decoder) {
s->rgb = TRUE;
m_decoder = get_decoder_from_to(desc.color_spec, RGB, try_slow);
if (m_decoder) {
m_rgb = true;
} else {
fprintf(stderr, "[JPEG] Unsupported codec: %s\n",
get_codec_name(frame->color_spec));
log_msg(LOG_LEVEL_ERROR, "[JPEG] Unsupported codec: %s\n",
get_codec_name(desc.color_spec));
if (!try_slow) {
fprintf(stderr, "[JPEG] Slow decoders not tried!\n");
log_msg(LOG_LEVEL_WARNING, "[JPEG] Slow decoders not tried!\n");
}
return false;
}
}
s->encoder_param.verbose = 0;
s->encoder_param.segment_info = 1;
if(s->rgb) {
s->encoder_param.interleaved = 0;
s->encoder_param.restart_interval = s->restart_interval == -1 ? 8
: s->restart_interval;
/* LUMA */
s->encoder_param.sampling_factor[0].horizontal = 1;
s->encoder_param.sampling_factor[0].vertical = 1;
/* Cb and Cr */
s->encoder_param.sampling_factor[1].horizontal = 1;
s->encoder_param.sampling_factor[1].vertical = 1;
s->encoder_param.sampling_factor[2].horizontal = 1;
s->encoder_param.sampling_factor[2].vertical = 1;
struct gpujpeg_parameters encoder_param;
gpujpeg_set_default_parameters(&encoder_param);
if (m_parent_state->m_quality != -1) {
encoder_param.quality = m_parent_state->m_quality;
} else {
s->encoder_param.interleaved = 1;
s->encoder_param.restart_interval = s->restart_interval == -1 ? 4
: s->restart_interval;
/* LUMA */
s->encoder_param.sampling_factor[0].horizontal = 2;
s->encoder_param.sampling_factor[0].vertical = 1;
/* Cb and Cr */
s->encoder_param.sampling_factor[1].horizontal = 1;
s->encoder_param.sampling_factor[1].vertical = 1;
s->encoder_param.sampling_factor[2].horizontal = 1;
s->encoder_param.sampling_factor[2].vertical = 1;
log_msg(LOG_LEVEL_INFO, "[JPEG] setting default encode parameters (quality: %d)\n",
encoder_param.quality);
}
if (m_parent_state->m_restart_interval != -1) {
encoder_param.restart_interval = m_parent_state->m_restart_interval;
} else {
encoder_param.restart_interval = m_rgb ? 8 : 4;
}
encoder_param.verbose = 0;
encoder_param.segment_info = 1;
/* LUMA */
encoder_param.sampling_factor[0].vertical = 1;
encoder_param.sampling_factor[0].horizontal = m_rgb ? 1 : 2;
/* Cb and Cr */
encoder_param.sampling_factor[1].horizontal = 1;
encoder_param.sampling_factor[1].vertical = 1;
encoder_param.sampling_factor[2].horizontal = 1;
encoder_param.sampling_factor[2].vertical = 1;
encoder_param.interleaved = m_rgb ? 0 : 1;
struct gpujpeg_image_parameters param_image;
gpujpeg_image_set_default_parameters(&param_image);
param_image.width = frame->tiles[0].width;
param_image.height = frame->tiles[0].height;
param_image.width = desc.width;
param_image.height = desc.height;
param_image.comp_count = 3;
if(s->rgb) {
param_image.color_space = GPUJPEG_RGB;
param_image.sampling_factor = GPUJPEG_4_4_4;
} else {
param_image.color_space = GPUJPEG_YCBCR_BT709;
param_image.sampling_factor = GPUJPEG_4_2_2;
}
param_image.color_space = m_rgb ? GPUJPEG_RGB : GPUJPEG_YCBCR_BT709;
param_image.sampling_factor = m_rgb ? GPUJPEG_4_4_4 : GPUJPEG_4_2_2;
s->encoder = gpujpeg_encoder_create(&s->encoder_param, &param_image);
m_encoder = gpujpeg_encoder_create(&encoder_param, &param_image);
int data_len = frame->tiles[0].width * frame->tiles[0].height * 3;
s->pool.reconfigure(compressed_desc, data_len);
int data_len = desc.width * desc.height * 3;
m_pool.reconfigure(compressed_desc, data_len);
s->encoder_input_linesize = frame->tiles[0].width *
m_encoder_input_linesize = desc.width *
(param_image.color_space == GPUJPEG_RGB ? 3 : 2);
if(!s->encoder) {
fprintf(stderr, "[JPEG] Failed to create encoder.\n");
if(!m_encoder) {
log_msg(LOG_LEVEL_ERROR, "[JPEG] Failed to create GPUJPEG encoder.\n");
exit_uv(128);
return false;
}
s->decoded = unique_ptr<char []>(new char[4 * frame->tiles[0].width * frame->tiles[0].height]);
m_decoded = unique_ptr<char []>(new char[4 * desc.width * desc.height]);
m_saved_desc = desc;
return true;
}
static void jpeg_check_messages(struct state_video_compress_jpeg *s)
{
struct message *msg;
while ((msg = check_message(&s->module_data))) {
struct msg_change_compress_data *data =
(struct msg_change_compress_data *) msg;
struct response *r;
if (parse_fmt(s, data->config_string) == 0) {
r = new_response(RESPONSE_OK, NULL);
printf("[Libavcodec] Compression successfully changed.\n");
} else {
r = new_response(RESPONSE_BAD_REQUEST, NULL);
fprintf(stderr, "[Libavcodec] Unable to change compression!\n");
}
memset(&s->saved_desc, 0, sizeof(s->saved_desc));
free_message(msg, r);
}
}
static bool parse_fmt(struct state_video_compress_jpeg *s, char *fmt)
bool state_video_compress_jpeg::parse_fmt(char *fmt)
{
if(fmt && fmt[0] != '\0') {
char *tok, *save_ptr = NULL;
gpujpeg_set_default_parameters(&s->encoder_param);
tok = strtok_r(fmt, ":", &save_ptr);
s->encoder_param.quality = atoi(tok);
if (s->encoder_param.quality <= 0 || s->encoder_param.quality > 100) {
fprintf(stderr, "[JPEG] Error: Quality should be in interval [1-100]!\n");
m_quality = atoi(tok);
if (m_quality <= 0 || m_quality > 100) {
log_msg(LOG_LEVEL_ERROR, "[JPEG] Error: Quality should be in interval [1-100]!\n");
return false;
}
tok = strtok_r(NULL, ":", &save_ptr);
if(tok) {
s->restart_interval = atoi(tok);
if (s->restart_interval < 0) {
fprintf(stderr, "[JPEG] Error: Restart interval should be non-negative!\n");
m_restart_interval = atoi(tok);
if (m_restart_interval < 0) {
log_msg(LOG_LEVEL_ERROR, "[JPEG] Error: Restart interval should be non-negative!\n");
return false;
}
}
tok = strtok_r(NULL, ":", &save_ptr);
if(tok) {
fprintf(stderr, "[JPEG] WARNING: Trailing configuration parameters.\n");
log_msg(LOG_LEVEL_WARNING, "[JPEG] WARNING: Trailing configuration parameters.\n");
}
}
return true;
}
state_video_compress_jpeg::state_video_compress_jpeg(struct module *parent, const char *opts) :
m_uses_worker_threads{}, m_in_seq{},
m_out_seq{}, m_ended_count{},
m_module_data{}, m_restart_interval(-1), m_quality(-1)
{
if(opts && opts[0] != '\0') {
char *fmt = strdup(opts);
if (!parse_fmt(fmt)) {
free(fmt);
throw;
}
free(fmt);
}
module_init_default(&m_module_data);
m_module_data.cls = MODULE_CLASS_DATA;
m_module_data.priv_data = this;
m_module_data.deleter = [](struct module *mod) {
struct state_video_compress_jpeg *s = (struct state_video_compress_jpeg *) mod->priv_data;
delete s;
};
module_register(&m_module_data, parent);
}
/**
* Creates JPEG encoding state and creates JPEG workers for every GPU that
* will be used for compression (if cuda_devices_count > 1).
*/
state_video_compress_jpeg *state_video_compress_jpeg::create(struct module *parent, const char *opts) {
assert(cuda_devices_count > 0);
auto ret = new state_video_compress_jpeg(parent, opts);
for (unsigned int i = 0; i < cuda_devices_count; ++i) {
ret->m_workers.push_back(new encoder_state(ret, cuda_devices[i]));
}
if (cuda_devices_count > 1) {
ret->m_uses_worker_threads = true;
}
if (ret->m_uses_worker_threads) {
for (auto worker : ret->m_workers) {
worker->m_thread_id = thread(&encoder_state::worker, worker);
}
}
return ret;
}
struct module * jpeg_compress_init(struct module *parent, const char *opts)
{
struct state_video_compress_jpeg *s;
@@ -250,114 +365,83 @@ struct module * jpeg_compress_init(struct module *parent, const char *opts)
return &compress_init_noerr;
}
s = new state_video_compress_jpeg();
s->restart_interval = -1;
gpujpeg_set_default_parameters(&s->encoder_param);
if(opts && opts[0] != '\0') {
char *fmt = strdup(opts);
if (!parse_fmt(s, fmt)) {
free(fmt);
delete s;
return NULL;
}
free(fmt);
} else {
printf("[JPEG] setting default encode parameters (quality: %d)\n",
s->encoder_param.quality
);
try {
s = state_video_compress_jpeg::create(parent, opts);
} catch (...) {
return NULL;
}
s->encoder = NULL; /* not yet configured */
module_init_default(&s->module_data);
s->module_data.cls = MODULE_CLASS_DATA;
s->module_data.priv_data = s;
s->module_data.deleter = jpeg_compress_done;
module_register(&s->module_data, parent);
return &s->module_data;
return &s->m_module_data;
}
shared_ptr<video_frame> jpeg_compress(struct module *mod, shared_ptr<video_frame> tx)
/**
* Performs actual compression with GPUJPEG. Reconfigures encoder if needed.
* @return compressed frame, {} if failed
*/
shared_ptr<video_frame> encoder_state::compress_step(shared_ptr<video_frame> tx)
{
struct state_video_compress_jpeg *s = (struct state_video_compress_jpeg *) mod->priv_data;
int i;
unsigned char *line1, *line2;
gpujpeg_set_device(m_device_id);
unsigned int x;
jpeg_check_messages(s);
gpujpeg_set_device(cuda_devices[0]);
if(!s->encoder) {
int ret;
printf("Initializing CUDA device %d...\n", cuda_devices[0]);
ret = gpujpeg_init_device(cuda_devices[0], TRUE);
// first run - initialize device
if (!m_encoder) {
log_msg(LOG_LEVEL_INFO, "Initializing CUDA device %d...\n", m_device_id);
int ret = gpujpeg_init_device(m_device_id, TRUE);
if(ret != 0) {
fprintf(stderr, "[JPEG] initializing CUDA device %d failed.\n", cuda_devices[0]);
exit_uv(127);
return {};
}
ret = configure_with(s, tx.get());
if (!ret) {
log_msg(LOG_LEVEL_ERROR, "[JPEG] initializing CUDA device %d failed.\n", m_device_id);
exit_uv(127);
return {};
}
}
struct video_desc desc;
desc = video_desc_from_frame(tx.get());
struct video_desc desc = video_desc_from_frame(tx.get());
// if format changed, reconfigure
if(!video_desc_eq_excl_param(s->saved_desc, desc, PARAM_INTERLACING)) {
cleanup_state(s);
int ret;
ret = configure_with(s, tx.get());
// if format has changed, reconfigure
if(!video_desc_eq_excl_param(m_saved_desc, desc, PARAM_INTERLACING)) {
cleanup_state();
int ret = configure_with(desc);
if(!ret) {
exit_uv(127);
return NULL;
}
}
shared_ptr<video_frame> out = s->pool.get_frame();
shared_ptr<video_frame> out = m_pool.get_frame();
for (x = 0; x < tx->tile_count; ++x) {
for (unsigned int x = 0; x < out->tile_count; ++x) {
struct tile *in_tile = vf_get_tile(tx.get(), x);
struct tile *out_tile = vf_get_tile(out.get(), x);
uint8_t *jpeg_enc_input_data;
if ((void *) s->decoder != (void *) memcpy) {
line1 = (unsigned char *) in_tile->data;
line2 = (unsigned char *) s->decoded.get();
if ((void *) m_decoder != (void *) memcpy) {
unsigned char *line1 = (unsigned char *) in_tile->data;
unsigned char *line2 = (unsigned char *) m_decoded.get();
for (i = 0; i < (int) in_tile->height; ++i) {
s->decoder(line2, line1, s->encoder_input_linesize,
for (int i = 0; i < (int) in_tile->height; ++i) {
m_decoder(line2, line1, m_encoder_input_linesize,
0, 8, 16);
line1 += vc_get_linesize(in_tile->width, tx->color_spec);
line2 += s->encoder_input_linesize;
line2 += m_encoder_input_linesize;
}
jpeg_enc_input_data = (uint8_t *) s->decoded.get();
jpeg_enc_input_data = (uint8_t *) m_decoded.get();
} else {
jpeg_enc_input_data = (uint8_t *) in_tile->data;
}
/*if(s->interlaced_input)
vc_deinterlace((unsigned char *) s->decoded, s->encoder_input_linesize,
s->out->tiles[0].height);*/
uint8_t *compressed;
int size;
int ret;
struct gpujpeg_encoder_input encoder_input;
gpujpeg_encoder_input_set_image(&encoder_input, jpeg_enc_input_data);
ret = gpujpeg_encoder_encode(s->encoder, &encoder_input, &compressed, &size);
ret = gpujpeg_encoder_input_copy_image(&encoder_input, m_encoder, jpeg_enc_input_data);
if (ret != 0) {
return {};
}
if (x == out->tile_count - 1) { // optimalization - dispose frame as soon as
// it is not needed
tx = {};
}
ret = gpujpeg_encoder_encode(m_encoder, &encoder_input, &compressed, &size);
if(ret != 0) {
return {};
@@ -370,29 +454,103 @@ shared_ptr<video_frame> jpeg_compress(struct module *mod, shared_ptr<video_frame
return out;
}
static void jpeg_compress_done(struct module *mod)
void encoder_state::cleanup_state()
{
struct state_video_compress_jpeg *s = (struct state_video_compress_jpeg *) mod->priv_data;
cleanup_state(s);
delete s;
if (m_encoder)
gpujpeg_encoder_destroy(m_encoder);
m_encoder = NULL;
}
static void cleanup_state(struct state_video_compress_jpeg *s)
void state_video_compress_jpeg::push(std::shared_ptr<video_frame> in_frame)
{
if (s->encoder)
gpujpeg_encoder_destroy(s->encoder);
s->encoder = NULL;
if (in_frame) {
in_frame->seq = m_in_seq++;
}
if (!m_uses_worker_threads) {
m_workers[0]->compress(in_frame);
} else {
if (!in_frame) {
for (auto worker : m_workers) { // pass poison pill to all workers
worker->m_in_queue.push({});
}
} else {
int index;
unique_lock<mutex> lk(m_occupancy_lock);
// wait for/select not occupied worker
m_worker_finished.wait(lk, [this, &index]{
index = 0;
for (auto worker : m_workers) {
if (!worker->m_occupied) return true;
index++;
}
return false;
});
m_workers[index]->m_occupied = true;
lk.unlock();
m_workers[index]->m_in_queue.push(in_frame);
}
}
}
/**
* @brief returns compressed frame
*
* This function takes frames from state_video_compress_jpeg::m_out_queue. It checks
* sequential number of frame from queue - if it is in the same order that
* was sent to encoder, it is returned (according to state_video_compress_jpeg::m_out_seq).
* If not, it is stored in state_video_compress_jpeg::m_out_frames and this function
* further waits for frame with appropriate seq. Frames that was not successfully encoded
* have data_len member set to 0 and are skipped here.
*/
std::shared_ptr<video_frame> state_video_compress_jpeg::pop()
{
start:
if (m_out_frames.find(m_out_seq) != m_out_frames.end()) {
auto frame = m_out_frames[m_out_seq];
m_out_frames.erase(m_out_seq);
m_out_seq += 1;
if (frame->tiles[0].data_len == 0) { // was error processing that frame, skip
goto start;
} else {
return frame;
}
} else {
while (true) {
auto frame = m_out_queue.pop();
if (!frame) {
if (++m_ended_count == m_workers.size()) {
return {};
} else {
continue;
}
}
if (frame->seq == m_out_seq) {
m_out_seq += 1;
if (frame->tiles[0].data_len == 0) { // error - skip this frame
goto start;
} else {
return frame;
}
} else {
m_out_frames[frame->seq] = frame;
}
}
}
}
const struct video_compress_info jpeg_info = {
"JPEG",
jpeg_compress_init,
jpeg_compress,
NULL,
NULL,
NULL,
[](struct module *mod, std::shared_ptr<video_frame> in_frame) {
static_cast<struct state_video_compress_jpeg *>(mod->priv_data)->push(in_frame);
},
[](struct module *mod) {
return static_cast<struct state_video_compress_jpeg *>(mod->priv_data)->pop();
},
[] {
return gpujpeg_init_device(cuda_devices[0], TRUE) == 0 ? list<compress_preset>{
{ "60", 60, [](const struct video_desc *d){return (long)(d->width * d->height * d->fps * 0.68);},