J2K compress: use async API

This commit is contained in:
Martin Pulec
2018-06-12 15:48:24 +02:00
parent 36c97b3147
commit 5d45bada24

View File

@@ -66,77 +66,59 @@
using namespace std;
struct encoded_image {
char *data;
int len;
struct video_desc *desc;
};
struct state_video_compress_j2k {
struct module module_data;
struct cmpto_j2k_enc_ctx *context;
struct cmpto_j2k_enc_cfg *enc_settings;
pthread_cond_t frame_ready;
pthread_mutex_t lock;
queue<struct encoded_image *> *encoded_images;
pthread_t thread_id;
};
static void j2k_compressed_frame_dispose(struct video_frame *frame);
static void j2k_compress_done(struct module *mod);
static void *j2k_compress_worker(void *args);
static void *j2k_compress_worker(void *args)
/// @todo
/// "return {}" below means that UG quits (it is poison pill) - is it ok?
static std::shared_ptr<video_frame> j2k_compress_pop(struct module *state)
{
struct state_video_compress_j2k *s =
(struct state_video_compress_j2k *) args;
(struct state_video_compress_j2k *) state;
while (true) {
struct cmpto_j2k_enc_img *img;
int status;
CHECK_OK(cmpto_j2k_enc_ctx_get_encoded_img(
s->context,
1,
&img /* Set to NULL if encoder stopped */,
&status), "Encode image", continue);
if (status != CMPTO_J2K_ENC_IMG_OK) {
const char * encoding_error;
CHECK_OK(cmpto_j2k_enc_img_get_error(img, &encoding_error), "get error status", NOOP);
log_msg(LOG_LEVEL_ERROR, "Image encoding failed: %s\n", encoding_error);
// some better solution?
continue;
}
if (img == NULL) {
break;
}
struct video_desc *desc;
size_t len;
CHECK_OK(cmpto_j2k_enc_img_get_custom_data(img, (void **) &desc, &len),
"get custom data", continue);
size_t size;
void * ptr;
CHECK_OK(cmpto_j2k_enc_img_get_cstream(img, &ptr, &size),
"get cstream", continue);
struct encoded_image *encoded = (struct encoded_image *)
malloc(sizeof(struct encoded_image));
encoded->data = (char *) malloc(size);
memcpy(encoded->data, ptr, size);
encoded->len = size;
encoded->desc = (struct video_desc *) malloc(sizeof(struct video_frame));
memcpy(encoded->desc, desc, sizeof(struct video_frame));
encoded->desc->color_spec = codec_is_a_rgb(desc->color_spec) ? J2KR : J2K;
CHECK_OK(cmpto_j2k_enc_img_destroy(img), "Destroy image", NOOP);
pthread_mutex_lock(&s->lock);
s->encoded_images->push(encoded);
pthread_cond_signal(&s->frame_ready);
pthread_mutex_unlock(&s->lock);
struct cmpto_j2k_enc_img *img;
int status;
CHECK_OK(cmpto_j2k_enc_ctx_get_encoded_img(
s->context,
1,
&img /* Set to NULL if encoder stopped */,
&status), "Encode image", continue);
if (status != CMPTO_J2K_ENC_IMG_OK) {
const char * encoding_error;
CHECK_OK(cmpto_j2k_enc_img_get_error(img, &encoding_error), "get error status", NOOP);
log_msg(LOG_LEVEL_ERROR, "Image encoding failed: %s\n", encoding_error);
// some better solution?
return {};
}
return NULL;
if (!img) {
// pass poison pill
return {};
}
struct video_desc *desc;
size_t len;
CHECK_OK(cmpto_j2k_enc_img_get_custom_data(img, (void **) &desc, &len),
"get custom data", return {});
size_t size;
void * ptr;
CHECK_OK(cmpto_j2k_enc_img_get_cstream(img, &ptr, &size),
"get cstream", return {});
struct video_frame *out = vf_alloc_desc(*desc);
out->tiles[0].data_len = size;
out->tiles[0].data = (char *) malloc(size);
memcpy(out->tiles[0].data, ptr, size);
out->color_spec = codec_is_a_rgb(desc->color_spec) ? J2KR : J2K;
CHECK_OK(cmpto_j2k_enc_img_destroy(img), "Destroy image", NOOP);
out->dispose = j2k_compressed_frame_dispose;
return shared_ptr<video_frame>(out, out->dispose);
}
static struct module * j2k_compress_init(struct module *parent, const char *c_cfg)
@@ -198,8 +180,6 @@ static struct module * j2k_compress_init(struct module *parent, const char *c_cf
if (j2k_error != CMPTO_OK) {
goto error;
}
assert(pthread_cond_init(&s->frame_ready, NULL) == 0);
assert(pthread_mutex_init(&s->lock, NULL) == 0);
module_init_default(&s->module_data);
s->module_data.cls = MODULE_CLASS_DATA;
@@ -207,11 +187,6 @@ static struct module * j2k_compress_init(struct module *parent, const char *c_cf
s->module_data.deleter = j2k_compress_done;
module_register(&s->module_data, parent);
s->encoded_images = new queue<struct encoded_image *>();
assert(pthread_create(&s->thread_id, NULL, j2k_compress_worker,
(void *) s) == 0);
return &s->module_data;
error:
@@ -233,18 +208,20 @@ static void release_cstream(void * custom_data, size_t custom_data_size, const v
delete *(shared_ptr<video_frame> **) ((char *) custom_data + sizeof(struct video_desc));
}
static shared_ptr<video_frame> j2k_compress(struct module *mod, shared_ptr<video_frame> tx)
static void j2k_compress_push(struct module *state, std::shared_ptr<video_frame> tx)
{
struct state_video_compress_j2k *s =
(struct state_video_compress_j2k *) mod->priv_data;
(struct state_video_compress_j2k *) state;
struct cmpto_j2k_enc_img *img;
int j2k_error;
struct video_desc desc;
void *udata;
shared_ptr<video_frame> **ref;
if (tx == NULL)
goto get_frame_from_queue;
if (tx == NULL) {
CHECK_OK(cmpto_j2k_enc_ctx_stop(s->context), "stop", NOOP);
return;
}
assert(tx->tile_count == 1); // TODO
@@ -271,13 +248,13 @@ static shared_ptr<video_frame> j2k_compress(struct module *mod, shared_ptr<video
j2k_error = cmpto_j2k_enc_img_create(s->context, &img);
if (j2k_error != CMPTO_OK) {
return NULL;
return;
}
j2k_error = cmpto_j2k_enc_img_set_samples(img, tx->tiles[0].data, tx->tiles[0].data_len, release_cstream);
if (j2k_error != CMPTO_OK) {
return NULL;
return;
}
desc = video_desc_from_frame(tx.get());
@@ -287,7 +264,7 @@ static shared_ptr<video_frame> j2k_compress(struct module *mod, shared_ptr<video
sizeof(struct video_desc) + sizeof(shared_ptr<video_frame> *),
&udata);
if (j2k_error != CMPTO_OK) {
return NULL;
return;
}
memcpy(udata, &desc, sizeof(desc));
ref = (shared_ptr<video_frame> **)((char *) udata + sizeof(struct video_desc));
@@ -295,35 +272,10 @@ static shared_ptr<video_frame> j2k_compress(struct module *mod, shared_ptr<video
j2k_error = cmpto_j2k_enc_img_encode(img, s->enc_settings);
if (j2k_error != CMPTO_OK) {
return NULL;
}
get_frame_from_queue:
pthread_mutex_lock(&s->lock);
struct encoded_image *encoded_img = NULL;
if (s->encoded_images->size() > 0) {
encoded_img = s->encoded_images->front();
s->encoded_images->pop();
}
pthread_mutex_unlock(&s->lock);
if (encoded_img != NULL) {
struct video_frame *out = vf_alloc_desc(*(encoded_img->desc));
free(encoded_img->desc);
out->tiles[0].data = encoded_img->data;
out->tiles[0].data_len =
encoded_img->len;
out->dispose = j2k_compressed_frame_dispose;
free(encoded_img);
assert (out->tiles[0].data_len != 0);
return shared_ptr<video_frame>(out, out->dispose);
} else {
return {};
return;
}
}
static void j2k_compress_done(struct module *mod)
{
struct state_video_compress_j2k *s =
@@ -331,9 +283,6 @@ static void j2k_compress_done(struct module *mod)
cmpto_j2k_enc_cfg_destroy(s->enc_settings);
cmpto_j2k_enc_ctx_destroy(s->context);
pthread_cond_destroy(&s->frame_ready);
pthread_mutex_destroy(&s->lock);
delete s->encoded_images;
free(s);
}
@@ -341,10 +290,10 @@ static void j2k_compress_done(struct module *mod)
struct video_compress_info j2k_info = {
"j2k",
j2k_compress_init,
j2k_compress,
NULL,
NULL,
NULL,
j2k_compress_push,
j2k_compress_pop,
[] { return list<compress_preset>{}; }
};