From 7460563e98ce2a2ea0923257382d60be3d535575 Mon Sep 17 00:00:00 2001 From: Martin Piatka Date: Tue, 18 Jan 2022 12:41:24 +0100 Subject: [PATCH] hd_rum_translator: Reuse frames for ports with same compression --- src/hd-rum-translator/hd-rum-decompress.cpp | 133 +--------- src/hd-rum-translator/hd-rum-decompress.h | 11 +- src/hd-rum-translator/hd-rum-recompress.cpp | 268 ++++++++++++++++---- src/hd-rum-translator/hd-rum-recompress.h | 29 ++- src/hd-rum-translator/hd-rum-translator.cpp | 82 +++--- 5 files changed, 302 insertions(+), 221 deletions(-) diff --git a/src/hd-rum-translator/hd-rum-decompress.cpp b/src/hd-rum-translator/hd-rum-decompress.cpp index 13c13d21b..51a316cc6 100644 --- a/src/hd-rum-translator/hd-rum-decompress.cpp +++ b/src/hd-rum-translator/hd-rum-decompress.cpp @@ -1,10 +1,11 @@ /** * @file hd-rum-translator/hd-rum-decompress.cpp * @author Martin Pulec + * @author Martin Piatka * @brief decompressing part of transcoding reflector */ /* - * Copyright (c) 2013-2019 CESNET, z. s. p. o. + * Copyright (c) 2013-2022 CESNET, z. s. p. o. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,32 +73,11 @@ using namespace std; namespace hd_rum_decompress { struct state_transcoder_decompress : public frame_recv_delegate { - struct output_port_info { - inline output_port_info(void *s, bool a) : state(s), active(a) {} - void *state; - bool active; - }; - - struct message { - inline message(shared_ptr && f) : type(FRAME), frame(std::move(f)) {} - inline message() : type(QUIT) {} - inline message(int ri) : type(REMOVE_INDEX), remove_index(ri) {} - inline message(void *ns) : type(NEW_RECOMPRESS), new_recompress_state(ns) {} - inline message(message && original); - inline ~message(); - enum { FRAME, REMOVE_INDEX, NEW_RECOMPRESS, QUIT } type; - union { - shared_ptr frame; - int remove_index; - void *new_recompress_state; - }; - }; - - vector output_ports; - ultragrid_rtp_video_rxtx* video_rxtx; - queue received_frame; + struct state_recompress *recompress; + + std::queue> received_frame; mutex lock; condition_variable have_frame_cv; @@ -141,50 +121,14 @@ void state_transcoder_decompress::frame_arrived(struct video_frame *f, struct au fprintf(stderr, "Hd-rum-decompress max queue size (%d) reached!\n", MAX_QUEUE_SIZE); } frame_consumed_cv.wait(l, [this]{ return received_frame.size() < MAX_QUEUE_SIZE; }); - received_frame.push(shared_ptr(f, deleter)); + received_frame.emplace(f, deleter); l.unlock(); have_frame_cv.notify_one(); } - -inline state_transcoder_decompress::message::message(message && original) - : type(original.type) -{ - switch (original.type) { - case FRAME: - new (&frame) shared_ptr(std::move(original.frame)); - break; - case REMOVE_INDEX: - remove_index = original.remove_index; - break; - case NEW_RECOMPRESS: - new_recompress_state = original.new_recompress_state; - break; - case QUIT: - break; - } -} - -inline state_transcoder_decompress::message::~message() { - // shared_ptr has non-trivial destructor - if (type == FRAME) { - frame.~shared_ptr(); - } -} } // end of hd-rum-decompress namespace using namespace hd_rum_decompress; -void hd_rum_decompress_set_active(void *state, void *recompress_port, bool active) -{ - struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state; - - for (auto && port : s->output_ports) { - if (port.state == recompress_port) { - port.active = active; - } - } -} - ssize_t hd_rum_decompress_write(void *state, void *buf, size_t count) { struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state; @@ -200,26 +144,13 @@ void state_transcoder_decompress::worker() unique_lock l(lock); have_frame_cv.wait(l, [this]{return !received_frame.empty();}); - message msg(std::move(received_frame.front())); + auto frame = std::move(received_frame.front()); l.unlock(); - switch (msg.type) { - case message::QUIT: + if(!frame){ should_exit = true; - break; - case message::REMOVE_INDEX: - recompress_done(output_ports[msg.remove_index].state); - output_ports.erase(output_ports.begin() + msg.remove_index); - break; - case message::NEW_RECOMPRESS: - output_ports.emplace_back(msg.new_recompress_state, true); - break; - case message::FRAME: - for (unsigned int i = 0; i < output_ports.size(); ++i) { - if (output_ports[i].active) - recompress_process_async(output_ports[i].state, msg.frame); - } - break; + } else { + recompress_process_async(recompress, frame); } // we are removing from queue now because special messages are "accepted" when queue is empty @@ -230,7 +161,7 @@ void state_transcoder_decompress::worker() } } -void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter) +void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter, struct state_recompress *recompress) { struct state_transcoder_decompress *s; int force_ip_version = 0; @@ -238,6 +169,8 @@ void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf co s = new state_transcoder_decompress(); chrono::steady_clock::time_point start_time(chrono::steady_clock::now()); + s->recompress = recompress; + char cfg[128] = ""; int ret; @@ -320,11 +253,6 @@ void hd_rum_decompress_done(void *state) { s->worker_thread.join(); - // cleanup - for (unsigned int i = 0; i < s->output_ports.size(); ++i) { - recompress_done(s->output_ports[i].state); - } - display_put_frame(s->display, NULL, 0); s->display_thread.join(); s->video_rxtx->join(); @@ -336,38 +264,3 @@ void hd_rum_decompress_done(void *state) { delete s; } - -void hd_rum_decompress_remove_port(void *state, int index) { - struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state; - - unique_lock l(s->lock); - s->received_frame.push(index); - s->have_frame_cv.notify_one(); - s->frame_consumed_cv.wait(l, [s]{ return s->received_frame.size() == 0; }); -} - - -void hd_rum_decompress_append_port(void *state, void *recompress_state) -{ - struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state; - - unique_lock l(s->lock); - s->received_frame.push(recompress_state); - s->have_frame_cv.notify_one(); - s->frame_consumed_cv.wait(l, [s]{ return s->received_frame.size() == 0; }); -} - -int hd_rum_decompress_get_num_active_ports(void *state) -{ - struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state; - - int ret = 0; - for (auto && port : s->output_ports) { - if (port.active) { - ret += 1; - } - } - - return ret; -} - diff --git a/src/hd-rum-translator/hd-rum-decompress.h b/src/hd-rum-translator/hd-rum-decompress.h index 9d3785b7a..fc24822a6 100644 --- a/src/hd-rum-translator/hd-rum-decompress.h +++ b/src/hd-rum-translator/hd-rum-decompress.h @@ -1,10 +1,10 @@ /** * @file hd-rum-translator/hd-rum-decompress.h - * @author Martin Piatka * @author Martin Pulec + * @author Martin Piatka */ /* - * Copyright (c) 2014-2016 CESNET, z. s. p. o. + * Copyright (c) 2014-2022 CESNET, z. s. p. o. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,6 +41,7 @@ extern "C" { #endif struct module; +struct state_recompress; enum hd_rum_mode_t {NORMAL, BLEND, CONFERENCE}; struct hd_rum_output_conf{ @@ -49,12 +50,8 @@ struct hd_rum_output_conf{ }; ssize_t hd_rum_decompress_write(void *state, void *buf, size_t count); -void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter); +void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter, struct state_recompress *recompress); void hd_rum_decompress_done(void *state); -void hd_rum_decompress_set_active(void *decompress_state, void *recompress_state, bool active); -void hd_rum_decompress_remove_port(void *decompress_state, int index); -void hd_rum_decompress_append_port(void *decompress_state, void *recompress_state); -int hd_rum_decompress_get_num_active_ports(void *decompress_state); #ifdef __cplusplus } diff --git a/src/hd-rum-translator/hd-rum-recompress.cpp b/src/hd-rum-translator/hd-rum-recompress.cpp index 20746f902..822a4b301 100644 --- a/src/hd-rum-translator/hd-rum-recompress.cpp +++ b/src/hd-rum-translator/hd-rum-recompress.cpp @@ -1,13 +1,14 @@ /** * @file hd-rum-translator/hd-rum-recompress.cpp * @author Martin Pulec + * @author Martin Piatka * * Component of the transcoding reflector that takes an uncompressed frame, * recompresses it to another compression and sends it to destination * (therefore it wraps the whole sending part of UltraGrid). */ /* - * Copyright (c) 2013-2019 CESNET, z. s. p. o. + * Copyright (c) 2013-2022 CESNET, z. s. p. o. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -47,6 +48,8 @@ #include #include #include +#include +#include #include "hd-rum-translator/hd-rum-recompress.h" @@ -55,43 +58,74 @@ #include "host.h" #include "rtp/rtp.h" +#include "video_compress.h" + #include "video_rxtx/ultragrid_rtp.h" +namespace { +struct compress_state_deleter{ + void operator()(struct compress_state *s){ module_done(CAST_MODULE(s)); } +}; +} + using namespace std; -struct state_recompress { - state_recompress(unique_ptr && vr, string const & h, int tp) - : video_rxtx(std::move(vr)), host(h), t0(chrono::system_clock::now()), - frames(0), tx_port(tp) { - } +struct recompress_output_port { + recompress_output_port() = default; + recompress_output_port(struct module *parent, + std::string host, unsigned short rx_port, + unsigned short tx_port, int mtu, const char *fec, long long bitrate); - unique_ptr video_rxtx; - string host; - - chrono::system_clock::time_point t0; - int frames; + std::unique_ptr video_rxtx; + std::string host; int tx_port; + + std::chrono::steady_clock::time_point t0; + int frames; + + bool active; }; -void *recompress_init(struct module *parent, - const char *host, const char *compress, unsigned short rx_port, - unsigned short tx_port, int mtu, char *fec, long long bitrate) +struct recompress_worker_ctx { + std::string compress_cfg; + std::unique_ptr compress; + + std::mutex ports_mut; + std::vector ports; + + std::thread thread; +}; + +struct state_recompress { + struct module *parent; + std::mutex mut; + std::map workers; + std::vector> index_to_port; +}; + +recompress_output_port::recompress_output_port(struct module *parent, + std::string host, unsigned short rx_port, + unsigned short tx_port, int mtu, const char *fec, long long bitrate) : + host(std::move(host)), + tx_port(tx_port), + frames(0), + active(true) { int force_ip_version = 0; - chrono::steady_clock::time_point start_time(chrono::steady_clock::now()); + auto start_time = std::chrono::steady_clock::now(); - map params; + std::map params; // common params["parent"].ptr = parent; params["exporter"].ptr = NULL; - params["compression"].str = compress; + params["compression"].str = "none"; params["rxtx_mode"].i = MODE_SENDER; params["paused"].b = false; //RTP params["mtu"].i = mtu; - params["receiver"].str = host; + params["receiver"].str = this->host.c_str(); params["rx_port"].i = rx_port; params["tx_port"].i = tx_port; params["force_ip_version"].i = force_ip_version; @@ -106,66 +140,188 @@ void *recompress_init(struct module *parent, params["decoder_mode"].l = VIDEO_NORMAL; params["display_device"].ptr = NULL; - try { - auto rxtx = video_rxtx::create("ultragrid_rtp", params); - if (strchr(host, ':') != NULL) { - rxtx->m_port_id = string("[") + host + "]:" + to_string(tx_port); - } else { - rxtx->m_port_id = string(host) + ":" + to_string(tx_port); - } - - return new state_recompress( - decltype(state_recompress::video_rxtx)(dynamic_cast(rxtx)), - host, - tx_port - ); - } catch (...) { - return nullptr; + auto rxtx = video_rxtx::create("ultragrid_rtp", params); + if (host.find(':') != std::string::npos) { + rxtx->m_port_id = "[" + host + "]:" + to_string(tx_port); + } else { + rxtx->m_port_id = host + ":" + to_string(tx_port); } + + video_rxtx.reset(dynamic_cast(rxtx)); } -void recompress_process_async(void *state, shared_ptr frame) +static void recompress_port_write(recompress_output_port& port, shared_ptr frame) { - auto s = static_cast(state); + port.frames += 1; - s->frames += 1; + auto now = chrono::steady_clock::now(); - chrono::system_clock::time_point now = chrono::system_clock::now(); - double seconds = chrono::duration_cast(now - s->t0).count() / 1000000.0; + double seconds = chrono::duration_cast(now - port.t0).count(); if(seconds > 5) { - double fps = s->frames / seconds; + double fps = port.frames / seconds; log_msg(LOG_LEVEL_INFO, "[0x%08" PRIx32 "->%s:%d:0x%08" PRIx32 "] %d frames in %g seconds = %g FPS\n", frame->ssrc, - s->host.c_str(), s->tx_port, - s->video_rxtx->get_ssrc(), - s->frames, seconds, fps); - s->t0 = now; - s->frames = 0; + port.host.c_str(), port.tx_port, + port.video_rxtx->get_ssrc(), + port.frames, seconds, fps); + port.t0 = now; + port.frames = 0; } - s->video_rxtx->send(frame); + port.video_rxtx->send(frame); } -void recompress_assign_ssrc(void *state, uint32_t ssrc) -{ - // UNIMPLEMENTED NOW - UNUSED(state); - UNUSED(ssrc); +static void recompress_worker(struct recompress_worker_ctx *ctx){ + assert(ctx->compress); + + while(auto frame = compress_pop(ctx->compress.get())){ + if(!frame){ + //poisoned + break; + } + + std::lock_guard lock(ctx->ports_mut); + for(auto& port : ctx->ports){ + if(port.active) + recompress_port_write(port, frame); + } + } } -uint32_t recompress_get_ssrc(void *state) +static int move_port_to_worker(struct state_recompress *s, const std::string& compress, + recompress_output_port&& port) { - auto s = static_cast(state); + auto& worker = s->workers[compress]; + if(!worker.compress){ + worker.compress_cfg = compress; + compress_state *cmp = nullptr; + int ret = compress_init(s->parent, compress, &cmp); + if(ret != 0) + return -1; + worker.compress.reset(cmp); - return s->video_rxtx->get_ssrc(); + worker.thread = std::thread(recompress_worker, &worker); + } + + std::lock_guard lock(worker.ports_mut); + int index_in_worker = worker.ports.size(); + worker.ports.push_back(std::move(port)); + + return index_in_worker; } -void recompress_done(void *state) +int recompress_add_port(struct state_recompress *s, + const char *host, const char *compress, unsigned short rx_port, + unsigned short tx_port, int mtu, const char *fec, long long bitrate) { - auto s = static_cast(state); + auto port = recompress_output_port(s->parent, host, rx_port, tx_port, + mtu, fec, bitrate); - s->video_rxtx->join(); + std::lock_guard lock(s->mut); + int index_in_worker = move_port_to_worker(s, compress, std::move(port)); + int index_of_port = s->index_to_port.size(); + s->index_to_port.emplace_back(compress, index_in_worker); + + return index_of_port; +} + +static void extract_port(struct state_recompress *s, + const std::string& compress_cfg, int i, + recompress_output_port *move_to = nullptr) +{ + auto& worker = s->workers[compress_cfg]; + { + std::unique_lock lock(worker.ports_mut); + if(move_to) + *move_to = std::move(worker.ports[i]); + worker.ports.erase(worker.ports.begin() + i); + + if(worker.ports.empty()){ + //poison compress + compress_frame(worker.compress.get(), nullptr); + worker.thread.join(); + s->workers.erase(compress_cfg); + } + } + + for(auto& p : s->index_to_port){ + if(p.first == compress_cfg && p.second > i) + p.second--; + } +} + +void recompress_remove_port(struct state_recompress *s, int index){ + std::lock_guard lock(s->mut); + auto [compress_cfg, i] = s->index_to_port[index]; + + extract_port(s, compress_cfg, i); + s->index_to_port.erase(s->index_to_port.begin() + index); +} + +uint32_t recompress_get_port_ssrc(struct state_recompress *s, int idx){ + std::lock_guard lock(s->mut); + auto [compress_cfg, i] = s->index_to_port[idx]; + + std::lock_guard work_lock(s->workers[compress_cfg].ports_mut); + return s->workers[compress_cfg].ports[i].video_rxtx->get_ssrc(); +} + +void recompress_port_set_active(struct state_recompress *s, + int index, bool active) +{ + std::lock_guard lock(s->mut); + auto [compress_cfg, i] = s->index_to_port[index]; + + std::unique_lock worker_lock(s->workers[compress_cfg].ports_mut); + s->workers[compress_cfg].ports[i].active = active; +} + +static int worker_get_num_active_ports(const recompress_worker_ctx& worker){ + int ret = 0; + for(const auto& port : worker.ports){ + if(port.active) + ret++; + } + return ret; +} + +int recompress_get_num_active_ports(struct state_recompress *s){ + std::lock_guard lock(s->mut); + int ret = 0; + for(const auto& worker : s->workers){ + ret += worker_get_num_active_ports(worker.second); + } + + return ret; +} + +struct state_recompress *recompress_init(struct module *parent) { + auto state = new state_recompress(); + if(!state) + return nullptr; + + state->parent = parent; + + return state; +} + +void recompress_process_async(state_recompress *s, std::shared_ptr frame){ + std::lock_guard lock(s->mut); + for(const auto& worker : s->workers){ + if(worker_get_num_active_ports(worker.second) > 0) + compress_frame(worker.second.compress.get(), frame); + } +} + +void recompress_done(struct state_recompress *s) { + std::lock_guard lock(s->mut); + for(auto& worker : s->workers){ + //poison compress + compress_frame(worker.second.compress.get(), nullptr); + + worker.second.thread.join(); + } delete s; } diff --git a/src/hd-rum-translator/hd-rum-recompress.h b/src/hd-rum-translator/hd-rum-recompress.h index 27c4c424f..13e047a83 100644 --- a/src/hd-rum-translator/hd-rum-recompress.h +++ b/src/hd-rum-translator/hd-rum-recompress.h @@ -1,9 +1,10 @@ /** * @file hd-rum-translator/hd-rum/recompress.h * @author Martin Pulec + * @author Martin Piatka */ /* - * Copyright (c) 2013-2015 CESNET, z. s. p. o. + * Copyright (c) 2013-2022 CESNET, z. s. p. o. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,12 +49,24 @@ extern "C" { struct module; struct video_frame; -void *recompress_init(struct module *parent, const char *host, const char *compress, - unsigned short rx_port, unsigned short tx_port, int mtu, char *fec, - long long bitrate); -void recompress_assign_ssrc(void *state, uint32_t ssrc); -void recompress_done(void *state); -uint32_t recompress_get_ssrc(void *state); +struct state_recompress; + +struct state_recompress *recompress_init(struct module *parent); +void recompress_done(struct state_recompress *state); + +uint32_t recompress_get_port_ssrc(struct state_recompress *s, int idx); + + +int recompress_add_port(struct state_recompress *s, + const char *host, const char *compress, unsigned short rx_port, + unsigned short tx_port, int mtu, const char *fec, long long bitrate); + +void recompress_remove_port(struct state_recompress *s, int index); + +void recompress_port_set_active(struct state_recompress *s, + int index, bool active); + +int recompress_get_num_active_ports(struct state_recompress *s); #ifdef __cplusplus } @@ -62,5 +75,5 @@ uint32_t recompress_get_ssrc(void *state); #ifdef __cplusplus #include #include -void recompress_process_async(void *state, std::shared_ptr frame); +void recompress_process_async(state_recompress *state, std::shared_ptr frame); #endif diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index f023f4149..33eaa91ed 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -1,6 +1,7 @@ /** * @file hd-rum-translator/hd-rum-translator.cpp * @author Martin Pulec + * @author Martin Piatka * * Main part of transcoding reflector. This component provides a runtime * for the reflector. Componets are following: @@ -12,7 +13,7 @@ * compresses and sends frame to receiver */ /* - * Copyright (c) 2013-2021 CESNET, z. s. p. o. + * Copyright (c) 2013-2022 CESNET, z. s. p. o. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -104,7 +105,6 @@ struct replica { mod.priv_data = this; module_register(&mod, parent); type = replica::type_t::NONE; - recompress = nullptr; } ~replica() { @@ -125,7 +125,6 @@ struct replica { }; enum type_t type; socket_udp *sock; - void *recompress; }; struct hd_rum_translator_state { @@ -159,6 +158,7 @@ struct hd_rum_translator_state { vector replicas; void *decompress; + struct state_recompress *recompress; }; /* @@ -280,8 +280,8 @@ static struct response *change_replica_type(struct hd_rum_translator_state *s, return new_response(RESPONSE_BAD_REQUEST, NULL); } - hd_rum_decompress_set_active(s->decompress, r->recompress, - r->type == replica::type_t::RECOMPRESS); + recompress_port_set_active(s->recompress, index, + r->type == replica::type_t::RECOMPRESS); return new_response(RESPONSE_OK, NULL); } @@ -340,7 +340,7 @@ static void *writer(void *arg) } } if (index >= 0) { - hd_rum_decompress_remove_port(s->decompress, index); + recompress_remove_port(s->recompress, index); delete s->replicas[index]; s->replicas.erase(s->replicas.begin() + index); log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); @@ -391,29 +391,36 @@ static void *writer(void *arg) if (compress) { rep->type = replica::type_t::RECOMPRESS; char *fec = NULL; - rep->recompress = recompress_init(&rep->mod, + int idx = recompress_add_port(s->recompress, host, compress, 0, tx_port, 1500, fec, RATE_UNLIMITED); - if (!rep->recompress) { + if (idx < 0) { delete s->replicas[s->replicas.size() - 1]; s->replicas.erase(s->replicas.end() - 1); log_msg(LOG_LEVEL_ERROR, "Unable to create recompress!\n"); } else { - hd_rum_decompress_append_port(s->decompress, rep->recompress); - hd_rum_decompress_set_active(s->decompress, rep->recompress, true); - log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_ssrc(rep->recompress)); + assert((unsigned) idx == s->replicas.size() - 1); + recompress_port_set_active(s->recompress, idx, true); + log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); } } else { rep->type = replica::type_t::USE_SOCK; char compress[] = "none"; char *fec = NULL; - rep->recompress = recompress_init(&rep->mod, + int idx = recompress_add_port(s->recompress, host, compress, 0, tx_port, 1500, fec, RATE_UNLIMITED); - hd_rum_decompress_append_port(s->decompress, rep->recompress); - hd_rum_decompress_set_active(s->decompress, rep->recompress, false); - log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); + if (idx < 0) { + delete s->replicas[s->replicas.size() - 1]; + s->replicas.erase(s->replicas.end() - 1); + + log_msg(LOG_LEVEL_ERROR, "Unable to create recompress!\n"); + } else { + assert((unsigned) idx == s->replicas.size() - 1); + recompress_port_set_active(s->recompress, idx, false); + log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); + } } } else { r = new_response(RESPONSE_BAD_REQUEST, NULL); @@ -429,7 +436,7 @@ static void *writer(void *arg) } // pass it for transcoding if needed - if (hd_rum_decompress_get_num_active_ports(s->decompress) > 0) { + if (recompress_get_num_active_ports(s->recompress) > 0) { ssize_t ret = hd_rum_decompress_write(s->decompress, s->qhead->buf, s->qhead->size); if (ret < 0) { perror("hd_rum_decompress_write"); @@ -689,6 +696,9 @@ static void hd_rum_translator_deinit(struct hd_rum_translator_state *s) { hd_rum_decompress_done(s->decompress); } + if(s->recompress) + recompress_done(s->recompress); + for (unsigned int i = 0; i < s->replicas.size(); i++) { delete s->replicas[i]; } @@ -799,8 +809,15 @@ int main(int argc, char **argv) control_start(state.control_state); } + //one shared recompressor, which manages compressions and sends to all hosts + state.recompress = recompress_init(&state.mod); + if(!state.recompress) { + EXIT(EXIT_FAIL_COMPRESS); + } + // we need only one shared receiver decompressor for all recompressing streams - state.decompress = hd_rum_decompress_init(&state.mod, params.out_conf, params.capture_filter); + state.decompress = hd_rum_decompress_init(&state.mod, params.out_conf, + params.capture_filter, state.recompress); if(!state.decompress) { EXIT(EXIT_FAIL_DECODER); } @@ -823,26 +840,31 @@ int main(int argc, char **argv) state.replicas[i]->type = replica::type_t::USE_SOCK; char compress[] = "none"; char *fec = NULL; - state.replicas[i]->recompress = recompress_init(&state.replicas[i]->mod, + int idx = recompress_add_port(state.recompress, params.hosts[i].addr, compress, 0, tx_port, params.hosts[i].mtu, fec, params.hosts[i].bitrate); - hd_rum_decompress_append_port(state.decompress, state.replicas[i]->recompress); - hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, false); - } else { - state.replicas[i]->type = replica::type_t::RECOMPRESS; - - state.replicas[i]->recompress = recompress_init(&state.replicas[i]->mod, - params.hosts[i].addr, params.hosts[i].compression, - 0, tx_port, params.hosts[i].mtu, params.hosts[i].fec, params.hosts[i].bitrate); - if(state.replicas[i]->recompress == 0) { + if(idx < 0) { fprintf(stderr, "Initializing output port '%s' failed!\n", params.hosts[i].addr); EXIT(EXIT_FAILURE); } - // we don't care about this clients, we only tell decompressor to + assert(idx == i); + recompress_port_set_active(state.recompress, i, false); + } else { + state.replicas[i]->type = replica::type_t::RECOMPRESS; + + int idx = recompress_add_port(state.recompress, + params.hosts[i].addr, params.hosts[i].compression, + 0, tx_port, params.hosts[i].mtu, params.hosts[i].fec, params.hosts[i].bitrate); + if(idx < 0) { + fprintf(stderr, "Initializing output port '%s' failed!\n", + params.hosts[i].addr); + EXIT(EXIT_FAILURE); + } + assert(idx == i); + // we don't care about this clients, we only tell recompressor to // take care about them - hd_rum_decompress_append_port(state.decompress, state.replicas[i]->recompress); - hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true); + recompress_port_set_active(state.recompress, i, true); } }