hd_rum_translator: Reuse frames for ports with same compression

This commit is contained in:
Martin Piatka
2022-01-18 12:41:24 +01:00
parent 263a3e945f
commit 7460563e98
5 changed files with 302 additions and 221 deletions

View File

@@ -1,10 +1,11 @@
/**
* @file hd-rum-translator/hd-rum-decompress.cpp
* @author Martin Pulec <pulec@cesnet.cz>
* @author Martin Piatka <piatka@cesnet.cz>
* @brief decompressing part of transcoding reflector
*/
/*
* Copyright (c) 2013-2019 CESNET, z. s. p. o.
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -72,32 +73,11 @@ using namespace std;
namespace hd_rum_decompress {
struct state_transcoder_decompress : public frame_recv_delegate {
struct output_port_info {
inline output_port_info(void *s, bool a) : state(s), active(a) {}
void *state;
bool active;
};
struct message {
inline message(shared_ptr<video_frame> && f) : type(FRAME), frame(std::move(f)) {}
inline message() : type(QUIT) {}
inline message(int ri) : type(REMOVE_INDEX), remove_index(ri) {}
inline message(void *ns) : type(NEW_RECOMPRESS), new_recompress_state(ns) {}
inline message(message && original);
inline ~message();
enum { FRAME, REMOVE_INDEX, NEW_RECOMPRESS, QUIT } type;
union {
shared_ptr<video_frame> frame;
int remove_index;
void *new_recompress_state;
};
};
vector<output_port_info> output_ports;
ultragrid_rtp_video_rxtx* video_rxtx;
queue<message> received_frame;
struct state_recompress *recompress;
std::queue<std::shared_ptr<video_frame>> received_frame;
mutex lock;
condition_variable have_frame_cv;
@@ -141,50 +121,14 @@ void state_transcoder_decompress::frame_arrived(struct video_frame *f, struct au
fprintf(stderr, "Hd-rum-decompress max queue size (%d) reached!\n", MAX_QUEUE_SIZE);
}
frame_consumed_cv.wait(l, [this]{ return received_frame.size() < MAX_QUEUE_SIZE; });
received_frame.push(shared_ptr<video_frame>(f, deleter));
received_frame.emplace(f, deleter);
l.unlock();
have_frame_cv.notify_one();
}
inline state_transcoder_decompress::message::message(message && original)
: type(original.type)
{
switch (original.type) {
case FRAME:
new (&frame) shared_ptr<video_frame>(std::move(original.frame));
break;
case REMOVE_INDEX:
remove_index = original.remove_index;
break;
case NEW_RECOMPRESS:
new_recompress_state = original.new_recompress_state;
break;
case QUIT:
break;
}
}
inline state_transcoder_decompress::message::~message() {
// shared_ptr has non-trivial destructor
if (type == FRAME) {
frame.~shared_ptr<video_frame>();
}
}
} // end of hd-rum-decompress namespace
using namespace hd_rum_decompress;
void hd_rum_decompress_set_active(void *state, void *recompress_port, bool active)
{
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
for (auto && port : s->output_ports) {
if (port.state == recompress_port) {
port.active = active;
}
}
}
ssize_t hd_rum_decompress_write(void *state, void *buf, size_t count)
{
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
@@ -200,26 +144,13 @@ void state_transcoder_decompress::worker()
unique_lock<mutex> l(lock);
have_frame_cv.wait(l, [this]{return !received_frame.empty();});
message msg(std::move(received_frame.front()));
auto frame = std::move(received_frame.front());
l.unlock();
switch (msg.type) {
case message::QUIT:
if(!frame){
should_exit = true;
break;
case message::REMOVE_INDEX:
recompress_done(output_ports[msg.remove_index].state);
output_ports.erase(output_ports.begin() + msg.remove_index);
break;
case message::NEW_RECOMPRESS:
output_ports.emplace_back(msg.new_recompress_state, true);
break;
case message::FRAME:
for (unsigned int i = 0; i < output_ports.size(); ++i) {
if (output_ports[i].active)
recompress_process_async(output_ports[i].state, msg.frame);
}
break;
} else {
recompress_process_async(recompress, frame);
}
// we are removing from queue now because special messages are "accepted" when queue is empty
@@ -230,7 +161,7 @@ void state_transcoder_decompress::worker()
}
}
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter)
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter, struct state_recompress *recompress)
{
struct state_transcoder_decompress *s;
int force_ip_version = 0;
@@ -238,6 +169,8 @@ void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf co
s = new state_transcoder_decompress();
chrono::steady_clock::time_point start_time(chrono::steady_clock::now());
s->recompress = recompress;
char cfg[128] = "";
int ret;
@@ -320,11 +253,6 @@ void hd_rum_decompress_done(void *state) {
s->worker_thread.join();
// cleanup
for (unsigned int i = 0; i < s->output_ports.size(); ++i) {
recompress_done(s->output_ports[i].state);
}
display_put_frame(s->display, NULL, 0);
s->display_thread.join();
s->video_rxtx->join();
@@ -336,38 +264,3 @@ void hd_rum_decompress_done(void *state) {
delete s;
}
void hd_rum_decompress_remove_port(void *state, int index) {
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
unique_lock<mutex> l(s->lock);
s->received_frame.push(index);
s->have_frame_cv.notify_one();
s->frame_consumed_cv.wait(l, [s]{ return s->received_frame.size() == 0; });
}
void hd_rum_decompress_append_port(void *state, void *recompress_state)
{
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
unique_lock<mutex> l(s->lock);
s->received_frame.push(recompress_state);
s->have_frame_cv.notify_one();
s->frame_consumed_cv.wait(l, [s]{ return s->received_frame.size() == 0; });
}
int hd_rum_decompress_get_num_active_ports(void *state)
{
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
int ret = 0;
for (auto && port : s->output_ports) {
if (port.active) {
ret += 1;
}
}
return ret;
}

View File

@@ -1,10 +1,10 @@
/**
* @file hd-rum-translator/hd-rum-decompress.h
* @author Martin Piatka <piatka@cesnet.cz>
* @author Martin Pulec <martin.pulec@cesnet.cz>
* @author Martin Piatka <piatka@cesnet.cz>
*/
/*
* Copyright (c) 2014-2016 CESNET, z. s. p. o.
* Copyright (c) 2014-2022 CESNET, z. s. p. o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@ extern "C" {
#endif
struct module;
struct state_recompress;
enum hd_rum_mode_t {NORMAL, BLEND, CONFERENCE};
struct hd_rum_output_conf{
@@ -49,12 +50,8 @@ struct hd_rum_output_conf{
};
ssize_t hd_rum_decompress_write(void *state, void *buf, size_t count);
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter);
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter, struct state_recompress *recompress);
void hd_rum_decompress_done(void *state);
void hd_rum_decompress_set_active(void *decompress_state, void *recompress_state, bool active);
void hd_rum_decompress_remove_port(void *decompress_state, int index);
void hd_rum_decompress_append_port(void *decompress_state, void *recompress_state);
int hd_rum_decompress_get_num_active_ports(void *decompress_state);
#ifdef __cplusplus
}

View File

@@ -1,13 +1,14 @@
/**
* @file hd-rum-translator/hd-rum-recompress.cpp
* @author Martin Pulec <pulec@cesnet.cz>
* @author Martin Piatka <piatka@cesnet.cz>
*
* Component of the transcoding reflector that takes an uncompressed frame,
* recompresses it to another compression and sends it to destination
* (therefore it wraps the whole sending part of UltraGrid).
*/
/*
* Copyright (c) 2013-2019 CESNET, z. s. p. o.
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -47,6 +48,8 @@
#include <cinttypes>
#include <chrono>
#include <memory>
#include <thread>
#include <string>
#include "hd-rum-translator/hd-rum-recompress.h"
@@ -55,43 +58,74 @@
#include "host.h"
#include "rtp/rtp.h"
#include "video_compress.h"
#include "video_rxtx/ultragrid_rtp.h"
namespace {
struct compress_state_deleter{
void operator()(struct compress_state *s){ module_done(CAST_MODULE(s)); }
};
}
using namespace std;
struct state_recompress {
state_recompress(unique_ptr<ultragrid_rtp_video_rxtx> && vr, string const & h, int tp)
: video_rxtx(std::move(vr)), host(h), t0(chrono::system_clock::now()),
frames(0), tx_port(tp) {
}
struct recompress_output_port {
recompress_output_port() = default;
recompress_output_port(struct module *parent,
std::string host, unsigned short rx_port,
unsigned short tx_port, int mtu, const char *fec, long long bitrate);
unique_ptr<ultragrid_rtp_video_rxtx> video_rxtx;
string host;
chrono::system_clock::time_point t0;
int frames;
std::unique_ptr<ultragrid_rtp_video_rxtx> video_rxtx;
std::string host;
int tx_port;
std::chrono::steady_clock::time_point t0;
int frames;
bool active;
};
void *recompress_init(struct module *parent,
const char *host, const char *compress, unsigned short rx_port,
unsigned short tx_port, int mtu, char *fec, long long bitrate)
struct recompress_worker_ctx {
std::string compress_cfg;
std::unique_ptr<compress_state, compress_state_deleter> compress;
std::mutex ports_mut;
std::vector<recompress_output_port> ports;
std::thread thread;
};
struct state_recompress {
struct module *parent;
std::mutex mut;
std::map<std::string, recompress_worker_ctx> workers;
std::vector<std::pair<std::string, int>> index_to_port;
};
recompress_output_port::recompress_output_port(struct module *parent,
std::string host, unsigned short rx_port,
unsigned short tx_port, int mtu, const char *fec, long long bitrate) :
host(std::move(host)),
tx_port(tx_port),
frames(0),
active(true)
{
int force_ip_version = 0;
chrono::steady_clock::time_point start_time(chrono::steady_clock::now());
auto start_time = std::chrono::steady_clock::now();
map<string, param_u> params;
std::map<std::string, param_u> params;
// common
params["parent"].ptr = parent;
params["exporter"].ptr = NULL;
params["compression"].str = compress;
params["compression"].str = "none";
params["rxtx_mode"].i = MODE_SENDER;
params["paused"].b = false;
//RTP
params["mtu"].i = mtu;
params["receiver"].str = host;
params["receiver"].str = this->host.c_str();
params["rx_port"].i = rx_port;
params["tx_port"].i = tx_port;
params["force_ip_version"].i = force_ip_version;
@@ -106,66 +140,188 @@ void *recompress_init(struct module *parent,
params["decoder_mode"].l = VIDEO_NORMAL;
params["display_device"].ptr = NULL;
try {
auto rxtx = video_rxtx::create("ultragrid_rtp", params);
if (strchr(host, ':') != NULL) {
rxtx->m_port_id = string("[") + host + "]:" + to_string(tx_port);
} else {
rxtx->m_port_id = string(host) + ":" + to_string(tx_port);
}
return new state_recompress(
decltype(state_recompress::video_rxtx)(dynamic_cast<ultragrid_rtp_video_rxtx *>(rxtx)),
host,
tx_port
);
} catch (...) {
return nullptr;
auto rxtx = video_rxtx::create("ultragrid_rtp", params);
if (host.find(':') != std::string::npos) {
rxtx->m_port_id = "[" + host + "]:" + to_string(tx_port);
} else {
rxtx->m_port_id = host + ":" + to_string(tx_port);
}
video_rxtx.reset(dynamic_cast<ultragrid_rtp_video_rxtx *>(rxtx));
}
void recompress_process_async(void *state, shared_ptr<video_frame> frame)
static void recompress_port_write(recompress_output_port& port, shared_ptr<video_frame> frame)
{
auto s = static_cast<state_recompress *>(state);
port.frames += 1;
s->frames += 1;
auto now = chrono::steady_clock::now();
chrono::system_clock::time_point now = chrono::system_clock::now();
double seconds = chrono::duration_cast<chrono::microseconds>(now - s->t0).count() / 1000000.0;
double seconds = chrono::duration_cast<chrono::seconds>(now - port.t0).count();
if(seconds > 5) {
double fps = s->frames / seconds;
double fps = port.frames / seconds;
log_msg(LOG_LEVEL_INFO, "[0x%08" PRIx32 "->%s:%d:0x%08" PRIx32 "] %d frames in %g seconds = %g FPS\n",
frame->ssrc,
s->host.c_str(), s->tx_port,
s->video_rxtx->get_ssrc(),
s->frames, seconds, fps);
s->t0 = now;
s->frames = 0;
port.host.c_str(), port.tx_port,
port.video_rxtx->get_ssrc(),
port.frames, seconds, fps);
port.t0 = now;
port.frames = 0;
}
s->video_rxtx->send(frame);
port.video_rxtx->send(frame);
}
void recompress_assign_ssrc(void *state, uint32_t ssrc)
{
// UNIMPLEMENTED NOW
UNUSED(state);
UNUSED(ssrc);
static void recompress_worker(struct recompress_worker_ctx *ctx){
assert(ctx->compress);
while(auto frame = compress_pop(ctx->compress.get())){
if(!frame){
//poisoned
break;
}
std::lock_guard<std::mutex> lock(ctx->ports_mut);
for(auto& port : ctx->ports){
if(port.active)
recompress_port_write(port, frame);
}
}
}
uint32_t recompress_get_ssrc(void *state)
static int move_port_to_worker(struct state_recompress *s, const std::string& compress,
recompress_output_port&& port)
{
auto s = static_cast<state_recompress *>(state);
auto& worker = s->workers[compress];
if(!worker.compress){
worker.compress_cfg = compress;
compress_state *cmp = nullptr;
int ret = compress_init(s->parent, compress, &cmp);
if(ret != 0)
return -1;
worker.compress.reset(cmp);
return s->video_rxtx->get_ssrc();
worker.thread = std::thread(recompress_worker, &worker);
}
std::lock_guard<std::mutex> lock(worker.ports_mut);
int index_in_worker = worker.ports.size();
worker.ports.push_back(std::move(port));
return index_in_worker;
}
void recompress_done(void *state)
int recompress_add_port(struct state_recompress *s,
const char *host, const char *compress, unsigned short rx_port,
unsigned short tx_port, int mtu, const char *fec, long long bitrate)
{
auto s = static_cast<state_recompress *>(state);
auto port = recompress_output_port(s->parent, host, rx_port, tx_port,
mtu, fec, bitrate);
s->video_rxtx->join();
std::lock_guard<std::mutex> lock(s->mut);
int index_in_worker = move_port_to_worker(s, compress, std::move(port));
int index_of_port = s->index_to_port.size();
s->index_to_port.emplace_back(compress, index_in_worker);
return index_of_port;
}
static void extract_port(struct state_recompress *s,
const std::string& compress_cfg, int i,
recompress_output_port *move_to = nullptr)
{
auto& worker = s->workers[compress_cfg];
{
std::unique_lock<std::mutex> lock(worker.ports_mut);
if(move_to)
*move_to = std::move(worker.ports[i]);
worker.ports.erase(worker.ports.begin() + i);
if(worker.ports.empty()){
//poison compress
compress_frame(worker.compress.get(), nullptr);
worker.thread.join();
s->workers.erase(compress_cfg);
}
}
for(auto& p : s->index_to_port){
if(p.first == compress_cfg && p.second > i)
p.second--;
}
}
void recompress_remove_port(struct state_recompress *s, int index){
std::lock_guard<std::mutex> lock(s->mut);
auto [compress_cfg, i] = s->index_to_port[index];
extract_port(s, compress_cfg, i);
s->index_to_port.erase(s->index_to_port.begin() + index);
}
uint32_t recompress_get_port_ssrc(struct state_recompress *s, int idx){
std::lock_guard<std::mutex> lock(s->mut);
auto [compress_cfg, i] = s->index_to_port[idx];
std::lock_guard<std::mutex> work_lock(s->workers[compress_cfg].ports_mut);
return s->workers[compress_cfg].ports[i].video_rxtx->get_ssrc();
}
void recompress_port_set_active(struct state_recompress *s,
int index, bool active)
{
std::lock_guard<std::mutex> lock(s->mut);
auto [compress_cfg, i] = s->index_to_port[index];
std::unique_lock<std::mutex> worker_lock(s->workers[compress_cfg].ports_mut);
s->workers[compress_cfg].ports[i].active = active;
}
static int worker_get_num_active_ports(const recompress_worker_ctx& worker){
int ret = 0;
for(const auto& port : worker.ports){
if(port.active)
ret++;
}
return ret;
}
int recompress_get_num_active_ports(struct state_recompress *s){
std::lock_guard<std::mutex> lock(s->mut);
int ret = 0;
for(const auto& worker : s->workers){
ret += worker_get_num_active_ports(worker.second);
}
return ret;
}
struct state_recompress *recompress_init(struct module *parent) {
auto state = new state_recompress();
if(!state)
return nullptr;
state->parent = parent;
return state;
}
void recompress_process_async(state_recompress *s, std::shared_ptr<video_frame> frame){
std::lock_guard<std::mutex> lock(s->mut);
for(const auto& worker : s->workers){
if(worker_get_num_active_ports(worker.second) > 0)
compress_frame(worker.second.compress.get(), frame);
}
}
void recompress_done(struct state_recompress *s) {
std::lock_guard<std::mutex> lock(s->mut);
for(auto& worker : s->workers){
//poison compress
compress_frame(worker.second.compress.get(), nullptr);
worker.second.thread.join();
}
delete s;
}

View File

@@ -1,9 +1,10 @@
/**
* @file hd-rum-translator/hd-rum/recompress.h
* @author Martin Pulec <martin.pulec@cesnet.cz>
* @author Martin Piatka <piatka@cesnet.cz>
*/
/*
* Copyright (c) 2013-2015 CESNET, z. s. p. o.
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -48,12 +49,24 @@ extern "C" {
struct module;
struct video_frame;
void *recompress_init(struct module *parent, const char *host, const char *compress,
unsigned short rx_port, unsigned short tx_port, int mtu, char *fec,
long long bitrate);
void recompress_assign_ssrc(void *state, uint32_t ssrc);
void recompress_done(void *state);
uint32_t recompress_get_ssrc(void *state);
struct state_recompress;
struct state_recompress *recompress_init(struct module *parent);
void recompress_done(struct state_recompress *state);
uint32_t recompress_get_port_ssrc(struct state_recompress *s, int idx);
int recompress_add_port(struct state_recompress *s,
const char *host, const char *compress, unsigned short rx_port,
unsigned short tx_port, int mtu, const char *fec, long long bitrate);
void recompress_remove_port(struct state_recompress *s, int index);
void recompress_port_set_active(struct state_recompress *s,
int index, bool active);
int recompress_get_num_active_ports(struct state_recompress *s);
#ifdef __cplusplus
}
@@ -62,5 +75,5 @@ uint32_t recompress_get_ssrc(void *state);
#ifdef __cplusplus
#include <memory>
#include <string>
void recompress_process_async(void *state, std::shared_ptr<video_frame> frame);
void recompress_process_async(state_recompress *state, std::shared_ptr<video_frame> frame);
#endif

View File

@@ -1,6 +1,7 @@
/**
* @file hd-rum-translator/hd-rum-translator.cpp
* @author Martin Pulec <pulec@cesnet.cz>
* @author Martin Piatka <piatka@cesnet.cz>
*
* Main part of transcoding reflector. This component provides a runtime
* for the reflector. Componets are following:
@@ -12,7 +13,7 @@
* compresses and sends frame to receiver
*/
/*
* Copyright (c) 2013-2021 CESNET, z. s. p. o.
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -104,7 +105,6 @@ struct replica {
mod.priv_data = this;
module_register(&mod, parent);
type = replica::type_t::NONE;
recompress = nullptr;
}
~replica() {
@@ -125,7 +125,6 @@ struct replica {
};
enum type_t type;
socket_udp *sock;
void *recompress;
};
struct hd_rum_translator_state {
@@ -159,6 +158,7 @@ struct hd_rum_translator_state {
vector<replica *> replicas;
void *decompress;
struct state_recompress *recompress;
};
/*
@@ -280,8 +280,8 @@ static struct response *change_replica_type(struct hd_rum_translator_state *s,
return new_response(RESPONSE_BAD_REQUEST, NULL);
}
hd_rum_decompress_set_active(s->decompress, r->recompress,
r->type == replica::type_t::RECOMPRESS);
recompress_port_set_active(s->recompress, index,
r->type == replica::type_t::RECOMPRESS);
return new_response(RESPONSE_OK, NULL);
}
@@ -340,7 +340,7 @@ static void *writer(void *arg)
}
}
if (index >= 0) {
hd_rum_decompress_remove_port(s->decompress, index);
recompress_remove_port(s->recompress, index);
delete s->replicas[index];
s->replicas.erase(s->replicas.begin() + index);
log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index);
@@ -391,29 +391,36 @@ static void *writer(void *arg)
if (compress) {
rep->type = replica::type_t::RECOMPRESS;
char *fec = NULL;
rep->recompress = recompress_init(&rep->mod,
int idx = recompress_add_port(s->recompress,
host, compress,
0, tx_port, 1500, fec, RATE_UNLIMITED);
if (!rep->recompress) {
if (idx < 0) {
delete s->replicas[s->replicas.size() - 1];
s->replicas.erase(s->replicas.end() - 1);
log_msg(LOG_LEVEL_ERROR, "Unable to create recompress!\n");
} else {
hd_rum_decompress_append_port(s->decompress, rep->recompress);
hd_rum_decompress_set_active(s->decompress, rep->recompress, true);
log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_ssrc(rep->recompress));
assert((unsigned) idx == s->replicas.size() - 1);
recompress_port_set_active(s->recompress, idx, true);
log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx));
}
} else {
rep->type = replica::type_t::USE_SOCK;
char compress[] = "none";
char *fec = NULL;
rep->recompress = recompress_init(&rep->mod,
int idx = recompress_add_port(s->recompress,
host, compress,
0, tx_port, 1500, fec, RATE_UNLIMITED);
hd_rum_decompress_append_port(s->decompress, rep->recompress);
hd_rum_decompress_set_active(s->decompress, rep->recompress, false);
log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port);
if (idx < 0) {
delete s->replicas[s->replicas.size() - 1];
s->replicas.erase(s->replicas.end() - 1);
log_msg(LOG_LEVEL_ERROR, "Unable to create recompress!\n");
} else {
assert((unsigned) idx == s->replicas.size() - 1);
recompress_port_set_active(s->recompress, idx, false);
log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port);
}
}
} else {
r = new_response(RESPONSE_BAD_REQUEST, NULL);
@@ -429,7 +436,7 @@ static void *writer(void *arg)
}
// pass it for transcoding if needed
if (hd_rum_decompress_get_num_active_ports(s->decompress) > 0) {
if (recompress_get_num_active_ports(s->recompress) > 0) {
ssize_t ret = hd_rum_decompress_write(s->decompress, s->qhead->buf, s->qhead->size);
if (ret < 0) {
perror("hd_rum_decompress_write");
@@ -689,6 +696,9 @@ static void hd_rum_translator_deinit(struct hd_rum_translator_state *s) {
hd_rum_decompress_done(s->decompress);
}
if(s->recompress)
recompress_done(s->recompress);
for (unsigned int i = 0; i < s->replicas.size(); i++) {
delete s->replicas[i];
}
@@ -799,8 +809,15 @@ int main(int argc, char **argv)
control_start(state.control_state);
}
//one shared recompressor, which manages compressions and sends to all hosts
state.recompress = recompress_init(&state.mod);
if(!state.recompress) {
EXIT(EXIT_FAIL_COMPRESS);
}
// we need only one shared receiver decompressor for all recompressing streams
state.decompress = hd_rum_decompress_init(&state.mod, params.out_conf, params.capture_filter);
state.decompress = hd_rum_decompress_init(&state.mod, params.out_conf,
params.capture_filter, state.recompress);
if(!state.decompress) {
EXIT(EXIT_FAIL_DECODER);
}
@@ -823,26 +840,31 @@ int main(int argc, char **argv)
state.replicas[i]->type = replica::type_t::USE_SOCK;
char compress[] = "none";
char *fec = NULL;
state.replicas[i]->recompress = recompress_init(&state.replicas[i]->mod,
int idx = recompress_add_port(state.recompress,
params.hosts[i].addr, compress,
0, tx_port, params.hosts[i].mtu, fec, params.hosts[i].bitrate);
hd_rum_decompress_append_port(state.decompress, state.replicas[i]->recompress);
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, false);
} else {
state.replicas[i]->type = replica::type_t::RECOMPRESS;
state.replicas[i]->recompress = recompress_init(&state.replicas[i]->mod,
params.hosts[i].addr, params.hosts[i].compression,
0, tx_port, params.hosts[i].mtu, params.hosts[i].fec, params.hosts[i].bitrate);
if(state.replicas[i]->recompress == 0) {
if(idx < 0) {
fprintf(stderr, "Initializing output port '%s' failed!\n",
params.hosts[i].addr);
EXIT(EXIT_FAILURE);
}
// we don't care about this clients, we only tell decompressor to
assert(idx == i);
recompress_port_set_active(state.recompress, i, false);
} else {
state.replicas[i]->type = replica::type_t::RECOMPRESS;
int idx = recompress_add_port(state.recompress,
params.hosts[i].addr, params.hosts[i].compression,
0, tx_port, params.hosts[i].mtu, params.hosts[i].fec, params.hosts[i].bitrate);
if(idx < 0) {
fprintf(stderr, "Initializing output port '%s' failed!\n",
params.hosts[i].addr);
EXIT(EXIT_FAILURE);
}
assert(idx == i);
// we don't care about this clients, we only tell recompressor to
// take care about them
hd_rum_decompress_append_port(state.decompress, state.replicas[i]->recompress);
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true);
recompress_port_set_active(state.recompress, i, true);
}
}