From 5d45bada2437b5abc7f4944e72c0dbb306400dfc Mon Sep 17 00:00:00 2001 From: Martin Pulec Date: Tue, 12 Jun 2018 15:48:24 +0200 Subject: [PATCH] J2K compress: use async API --- src/video_compress/j2k.cpp | 153 +++++++++++++------------------------ 1 file changed, 51 insertions(+), 102 deletions(-) diff --git a/src/video_compress/j2k.cpp b/src/video_compress/j2k.cpp index dc736f612..a7f4a9a42 100644 --- a/src/video_compress/j2k.cpp +++ b/src/video_compress/j2k.cpp @@ -66,77 +66,59 @@ using namespace std; -struct encoded_image { - char *data; - int len; - struct video_desc *desc; -}; - struct state_video_compress_j2k { struct module module_data; struct cmpto_j2k_enc_ctx *context; struct cmpto_j2k_enc_cfg *enc_settings; - - pthread_cond_t frame_ready; - pthread_mutex_t lock; - queue *encoded_images; - - pthread_t thread_id; }; +static void j2k_compressed_frame_dispose(struct video_frame *frame); static void j2k_compress_done(struct module *mod); -static void *j2k_compress_worker(void *args); -static void *j2k_compress_worker(void *args) +/// @todo +/// "return {}" below means that UG quits (it is poison pill) - is it ok? +static std::shared_ptr j2k_compress_pop(struct module *state) { struct state_video_compress_j2k *s = - (struct state_video_compress_j2k *) args; + (struct state_video_compress_j2k *) state; - while (true) { - struct cmpto_j2k_enc_img *img; - int status; - CHECK_OK(cmpto_j2k_enc_ctx_get_encoded_img( - s->context, - 1, - &img /* Set to NULL if encoder stopped */, - &status), "Encode image", continue); - if (status != CMPTO_J2K_ENC_IMG_OK) { - const char * encoding_error; - CHECK_OK(cmpto_j2k_enc_img_get_error(img, &encoding_error), "get error status", NOOP); - log_msg(LOG_LEVEL_ERROR, "Image encoding failed: %s\n", encoding_error); - // some better solution? - continue; - } - - if (img == NULL) { - break; - } - struct video_desc *desc; - size_t len; - CHECK_OK(cmpto_j2k_enc_img_get_custom_data(img, (void **) &desc, &len), - "get custom data", continue); - size_t size; - void * ptr; - CHECK_OK(cmpto_j2k_enc_img_get_cstream(img, &ptr, &size), - "get cstream", continue); - struct encoded_image *encoded = (struct encoded_image *) - malloc(sizeof(struct encoded_image)); - encoded->data = (char *) malloc(size); - memcpy(encoded->data, ptr, size); - encoded->len = size; - encoded->desc = (struct video_desc *) malloc(sizeof(struct video_frame)); - memcpy(encoded->desc, desc, sizeof(struct video_frame)); - encoded->desc->color_spec = codec_is_a_rgb(desc->color_spec) ? J2KR : J2K; - CHECK_OK(cmpto_j2k_enc_img_destroy(img), "Destroy image", NOOP); - - pthread_mutex_lock(&s->lock); - s->encoded_images->push(encoded); - pthread_cond_signal(&s->frame_ready); - pthread_mutex_unlock(&s->lock); + struct cmpto_j2k_enc_img *img; + int status; + CHECK_OK(cmpto_j2k_enc_ctx_get_encoded_img( + s->context, + 1, + &img /* Set to NULL if encoder stopped */, + &status), "Encode image", continue); + if (status != CMPTO_J2K_ENC_IMG_OK) { + const char * encoding_error; + CHECK_OK(cmpto_j2k_enc_img_get_error(img, &encoding_error), "get error status", NOOP); + log_msg(LOG_LEVEL_ERROR, "Image encoding failed: %s\n", encoding_error); + // some better solution? + return {}; } - return NULL; + if (!img) { + // pass poison pill + return {}; + } + struct video_desc *desc; + size_t len; + CHECK_OK(cmpto_j2k_enc_img_get_custom_data(img, (void **) &desc, &len), + "get custom data", return {}); + size_t size; + void * ptr; + CHECK_OK(cmpto_j2k_enc_img_get_cstream(img, &ptr, &size), + "get cstream", return {}); + + struct video_frame *out = vf_alloc_desc(*desc); + out->tiles[0].data_len = size; + out->tiles[0].data = (char *) malloc(size); + memcpy(out->tiles[0].data, ptr, size); + out->color_spec = codec_is_a_rgb(desc->color_spec) ? J2KR : J2K; + CHECK_OK(cmpto_j2k_enc_img_destroy(img), "Destroy image", NOOP); + out->dispose = j2k_compressed_frame_dispose; + return shared_ptr(out, out->dispose); } static struct module * j2k_compress_init(struct module *parent, const char *c_cfg) @@ -198,8 +180,6 @@ static struct module * j2k_compress_init(struct module *parent, const char *c_cf if (j2k_error != CMPTO_OK) { goto error; } - assert(pthread_cond_init(&s->frame_ready, NULL) == 0); - assert(pthread_mutex_init(&s->lock, NULL) == 0); module_init_default(&s->module_data); s->module_data.cls = MODULE_CLASS_DATA; @@ -207,11 +187,6 @@ static struct module * j2k_compress_init(struct module *parent, const char *c_cf s->module_data.deleter = j2k_compress_done; module_register(&s->module_data, parent); - s->encoded_images = new queue(); - - assert(pthread_create(&s->thread_id, NULL, j2k_compress_worker, - (void *) s) == 0); - return &s->module_data; error: @@ -233,18 +208,20 @@ static void release_cstream(void * custom_data, size_t custom_data_size, const v delete *(shared_ptr **) ((char *) custom_data + sizeof(struct video_desc)); } -static shared_ptr j2k_compress(struct module *mod, shared_ptr tx) +static void j2k_compress_push(struct module *state, std::shared_ptr tx) { struct state_video_compress_j2k *s = - (struct state_video_compress_j2k *) mod->priv_data; + (struct state_video_compress_j2k *) state; struct cmpto_j2k_enc_img *img; int j2k_error; struct video_desc desc; void *udata; shared_ptr **ref; - if (tx == NULL) - goto get_frame_from_queue; + if (tx == NULL) { + CHECK_OK(cmpto_j2k_enc_ctx_stop(s->context), "stop", NOOP); + return; + } assert(tx->tile_count == 1); // TODO @@ -271,13 +248,13 @@ static shared_ptr j2k_compress(struct module *mod, shared_ptr