From a8bc00a58340ca4b3175b7c184f9538117a5492d Mon Sep 17 00:00:00 2001 From: Jan Frejlach Date: Thu, 30 Oct 2025 14:06:21 +0100 Subject: [PATCH] jpegxs compress: async api --- src/video_compress/jpegxs.cpp | 140 +++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 21 deletions(-) diff --git a/src/video_compress/jpegxs.cpp b/src/video_compress/jpegxs.cpp index 2f9c4b635..1a0f6fcd8 100644 --- a/src/video_compress/jpegxs.cpp +++ b/src/video_compress/jpegxs.cpp @@ -1,6 +1,11 @@ #include #include #include +#include +#include +#include +#include +#include #include #include "debug.h" @@ -8,11 +13,19 @@ #include "video.h" #include "video_compress.h" #include "utils/video_frame_pool.h" +#include "utils/synchronized_queue.h" #include "jpegxs/jpegxs_conv.h" #define MOD_NAME "[JPEG XS enc.] " using std::shared_ptr; +using std::condition_variable; +using std::mutex; +using std::thread; +using std::unique_lock; +using std::unique_ptr; +using std::vector; +using std::queue; namespace { struct state_video_compress_jpegxs; @@ -22,6 +35,14 @@ private: state_video_compress_jpegxs(struct module *parent, const char *opts); public: ~state_video_compress_jpegxs() { + { + std::unique_lock lk(mtx); + } + + if (worker.joinable()) { + worker.join(); + } + svt_jpeg_xs_encoder_close(&encoder); free(out_buf.buffer); free(in_buf.data_yuv[0]); @@ -37,10 +58,18 @@ public: void (*convert_to_planar)(const uint8_t *src, int width, int height, svt_jpeg_xs_image_buffer *dst); bool parse_fmt(char *fmt); static state_video_compress_jpegxs *create(struct module *parent, const char *opts); - void push(std::shared_ptr in_frame); - std::shared_ptr pop(); + void push(shared_ptr in_frame); + shared_ptr pop(); + + synchronized_queue, -1> in_queue; + synchronized_queue, -1> out_queue; + mutex mtx; + thread worker; }; +shared_ptr jpegxs_compress(void *state, shared_ptr frame); +static bool configure_with(struct state_video_compress_jpegxs *s, struct video_desc desc); + state_video_compress_jpegxs::state_video_compress_jpegxs(struct module *parent, const char *opts) { (void) parent; @@ -63,6 +92,70 @@ state_video_compress_jpegxs::state_video_compress_jpegxs(struct module *parent, } } +static void jpegxs_worker(state_video_compress_jpegxs *s) { + while (true) { + + auto frame = s->in_queue.pop(); + + if (!frame) { + s->out_queue.push(frame); + break; + } + + if (!s->configured) { + struct video_desc desc = video_desc_from_frame(frame.get()); + if (!configure_with(s, desc)) { + break;; + } + } + + struct tile *in_tile = vf_get_tile(frame.get(), 0); + int width = in_tile->width; + int height = in_tile->height; + + s->convert_to_planar((const uint8_t *) in_tile->data, width, height, &s->in_buf); + + svt_jpeg_xs_frame_t enc_input; + enc_input.bitstream = s->out_buf; + enc_input.image = s->in_buf; + enc_input.user_prv_ctx_ptr = NULL; + + SvtJxsErrorType_t err = svt_jpeg_xs_encoder_send_picture(&s->encoder, &enc_input, 1); + if (err != SvtJxsErrorNone) { + log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to send frame to encoder, error code: %x\n", err); + continue; + } + + svt_jpeg_xs_frame_t enc_output; + err = svt_jpeg_xs_encoder_get_packet(&s->encoder, &enc_output, 1); + if (err != SvtJxsErrorNone) { + log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to get encoded packet, error code: %x\n", err); + continue; + } + + shared_ptr out_frame = s->pool.get_frame(); + out_frame->color_spec = JPEG_XS; + out_frame->fps = frame->fps; + out_frame->interlacing = frame->interlacing; + out_frame->frame_type = frame->frame_type; + out_frame->tile_count = 1; + + struct tile *out_tile = vf_get_tile(out_frame.get(), 0); + out_tile->width = frame->tiles[0].width; + out_tile->height = frame->tiles[0].height; + size_t enc_size = enc_output.bitstream.used_size; + if (enc_size > out_tile->data_len) { + log_msg(LOG_LEVEL_WARNING, MOD_NAME "Encoded frame too big (%zu > %u)\n", enc_size, out_tile->data_len); + continue; + } + + out_tile->data_len = enc_size; + memcpy(out_tile->data, enc_output.bitstream.buffer, enc_size); + + s->out_queue.push(out_frame); + } +} + state_video_compress_jpegxs *state_video_compress_jpegxs::create(struct module *parent, const char *opts) { auto ret = new state_video_compress_jpegxs(parent, opts); @@ -236,6 +329,8 @@ jpegxs_compress_init(struct module *parent, const char *opts) { s = state_video_compress_jpegxs::create(parent, opts); + s->worker = std::thread(jpegxs_worker, s); + return s; } @@ -267,14 +362,14 @@ shared_ptr jpegxs_compress(void *state, shared_ptr fra SvtJxsErrorType_t err; err = svt_jpeg_xs_encoder_send_picture(&s->encoder, &enc_input, 1 /*blocking*/); if (err != SvtJxsErrorNone) { - log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to send frame to encoder\n"); + log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to send frame to encoder, error code: %x\n", err); return NULL; } svt_jpeg_xs_frame_t enc_output; err = svt_jpeg_xs_encoder_get_packet(&s->encoder, &enc_output, 1 /*blocking*/); if (err != SvtJxsErrorNone) { - log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to get encoded packet\n"); + log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to get encoded packet, error code: %x\n", err); return NULL; } @@ -300,23 +395,26 @@ shared_ptr jpegxs_compress(void *state, shared_ptr fra return out_frame; } -// void state_video_compress_jpegxs::push(std::shared_ptr in_frame) -// { -// // TODO -// } -// std::shared_ptr state_video_compress_jpegxs::pop() -// { -// // TODO -// } +void state_video_compress_jpegxs::push(shared_ptr frame) +{ + in_queue.push(frame); +} -// static auto jpegxs_compress_push(void *state, std::shared_ptr in_frame) { -// static_cast(state)->push(std::move(in_frame)); -// } +shared_ptr state_video_compress_jpegxs::pop() +{ + auto frame = out_queue.pop(); -// static auto jpegxs_compress_pop(void *state) { -// return static_cast(state)->pop(); -// } + return frame; +} + +static void jpegxs_compress_push(void *state, shared_ptr frame) { + static_cast(state)->push(std::move(frame)); +} + +static shared_ptr jpegxs_compress_pop(void *state) { + return static_cast(state)->pop(); +} static void jpegxs_compress_done(void *state) { @@ -332,10 +430,10 @@ static compress_module_info get_jpegxs_module_info() { const struct video_compress_info jpegxs_info = { jpegxs_compress_init, // jpegxs_compress_init jpegxs_compress_done, // jpegxs_compress_done - jpegxs_compress, // jpegxs_compress (synchronous) + NULL, // jpegxs_compress (synchronous) NULL, - NULL, // jpegxs_compress_push (asynchronous) - NULL, // jpegxs_compress_pop + jpegxs_compress_push, // jpegxs_compress_push (asynchronous) + jpegxs_compress_pop, // jpegxs_compress_pop NULL, NULL, get_jpegxs_module_info // get_jpegxs_module_info