mirror of
https://github.com/outbackdingo/UltraGrid.git
synced 2026-03-28 07:01:46 +00:00
hd_rum_translator: Reuse frames for ports with same compression
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
/**
|
||||
* @file hd-rum-translator/hd-rum-decompress.cpp
|
||||
* @author Martin Pulec <pulec@cesnet.cz>
|
||||
* @author Martin Piatka <piatka@cesnet.cz>
|
||||
* @brief decompressing part of transcoding reflector
|
||||
*/
|
||||
/*
|
||||
* Copyright (c) 2013-2019 CESNET, z. s. p. o.
|
||||
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
@@ -72,32 +73,11 @@ using namespace std;
|
||||
|
||||
namespace hd_rum_decompress {
|
||||
struct state_transcoder_decompress : public frame_recv_delegate {
|
||||
struct output_port_info {
|
||||
inline output_port_info(void *s, bool a) : state(s), active(a) {}
|
||||
void *state;
|
||||
bool active;
|
||||
};
|
||||
|
||||
struct message {
|
||||
inline message(shared_ptr<video_frame> && f) : type(FRAME), frame(std::move(f)) {}
|
||||
inline message() : type(QUIT) {}
|
||||
inline message(int ri) : type(REMOVE_INDEX), remove_index(ri) {}
|
||||
inline message(void *ns) : type(NEW_RECOMPRESS), new_recompress_state(ns) {}
|
||||
inline message(message && original);
|
||||
inline ~message();
|
||||
enum { FRAME, REMOVE_INDEX, NEW_RECOMPRESS, QUIT } type;
|
||||
union {
|
||||
shared_ptr<video_frame> frame;
|
||||
int remove_index;
|
||||
void *new_recompress_state;
|
||||
};
|
||||
};
|
||||
|
||||
vector<output_port_info> output_ports;
|
||||
|
||||
ultragrid_rtp_video_rxtx* video_rxtx;
|
||||
|
||||
queue<message> received_frame;
|
||||
struct state_recompress *recompress;
|
||||
|
||||
std::queue<std::shared_ptr<video_frame>> received_frame;
|
||||
|
||||
mutex lock;
|
||||
condition_variable have_frame_cv;
|
||||
@@ -141,50 +121,14 @@ void state_transcoder_decompress::frame_arrived(struct video_frame *f, struct au
|
||||
fprintf(stderr, "Hd-rum-decompress max queue size (%d) reached!\n", MAX_QUEUE_SIZE);
|
||||
}
|
||||
frame_consumed_cv.wait(l, [this]{ return received_frame.size() < MAX_QUEUE_SIZE; });
|
||||
received_frame.push(shared_ptr<video_frame>(f, deleter));
|
||||
received_frame.emplace(f, deleter);
|
||||
l.unlock();
|
||||
have_frame_cv.notify_one();
|
||||
}
|
||||
|
||||
inline state_transcoder_decompress::message::message(message && original)
|
||||
: type(original.type)
|
||||
{
|
||||
switch (original.type) {
|
||||
case FRAME:
|
||||
new (&frame) shared_ptr<video_frame>(std::move(original.frame));
|
||||
break;
|
||||
case REMOVE_INDEX:
|
||||
remove_index = original.remove_index;
|
||||
break;
|
||||
case NEW_RECOMPRESS:
|
||||
new_recompress_state = original.new_recompress_state;
|
||||
break;
|
||||
case QUIT:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
inline state_transcoder_decompress::message::~message() {
|
||||
// shared_ptr has non-trivial destructor
|
||||
if (type == FRAME) {
|
||||
frame.~shared_ptr<video_frame>();
|
||||
}
|
||||
}
|
||||
} // end of hd-rum-decompress namespace
|
||||
|
||||
using namespace hd_rum_decompress;
|
||||
|
||||
void hd_rum_decompress_set_active(void *state, void *recompress_port, bool active)
|
||||
{
|
||||
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
|
||||
|
||||
for (auto && port : s->output_ports) {
|
||||
if (port.state == recompress_port) {
|
||||
port.active = active;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ssize_t hd_rum_decompress_write(void *state, void *buf, size_t count)
|
||||
{
|
||||
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
|
||||
@@ -200,26 +144,13 @@ void state_transcoder_decompress::worker()
|
||||
unique_lock<mutex> l(lock);
|
||||
have_frame_cv.wait(l, [this]{return !received_frame.empty();});
|
||||
|
||||
message msg(std::move(received_frame.front()));
|
||||
auto frame = std::move(received_frame.front());
|
||||
l.unlock();
|
||||
|
||||
switch (msg.type) {
|
||||
case message::QUIT:
|
||||
if(!frame){
|
||||
should_exit = true;
|
||||
break;
|
||||
case message::REMOVE_INDEX:
|
||||
recompress_done(output_ports[msg.remove_index].state);
|
||||
output_ports.erase(output_ports.begin() + msg.remove_index);
|
||||
break;
|
||||
case message::NEW_RECOMPRESS:
|
||||
output_ports.emplace_back(msg.new_recompress_state, true);
|
||||
break;
|
||||
case message::FRAME:
|
||||
for (unsigned int i = 0; i < output_ports.size(); ++i) {
|
||||
if (output_ports[i].active)
|
||||
recompress_process_async(output_ports[i].state, msg.frame);
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
recompress_process_async(recompress, frame);
|
||||
}
|
||||
|
||||
// we are removing from queue now because special messages are "accepted" when queue is empty
|
||||
@@ -230,7 +161,7 @@ void state_transcoder_decompress::worker()
|
||||
}
|
||||
}
|
||||
|
||||
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter)
|
||||
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter, struct state_recompress *recompress)
|
||||
{
|
||||
struct state_transcoder_decompress *s;
|
||||
int force_ip_version = 0;
|
||||
@@ -238,6 +169,8 @@ void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf co
|
||||
s = new state_transcoder_decompress();
|
||||
chrono::steady_clock::time_point start_time(chrono::steady_clock::now());
|
||||
|
||||
s->recompress = recompress;
|
||||
|
||||
char cfg[128] = "";
|
||||
int ret;
|
||||
|
||||
@@ -320,11 +253,6 @@ void hd_rum_decompress_done(void *state) {
|
||||
|
||||
s->worker_thread.join();
|
||||
|
||||
// cleanup
|
||||
for (unsigned int i = 0; i < s->output_ports.size(); ++i) {
|
||||
recompress_done(s->output_ports[i].state);
|
||||
}
|
||||
|
||||
display_put_frame(s->display, NULL, 0);
|
||||
s->display_thread.join();
|
||||
s->video_rxtx->join();
|
||||
@@ -336,38 +264,3 @@ void hd_rum_decompress_done(void *state) {
|
||||
|
||||
delete s;
|
||||
}
|
||||
|
||||
void hd_rum_decompress_remove_port(void *state, int index) {
|
||||
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
|
||||
|
||||
unique_lock<mutex> l(s->lock);
|
||||
s->received_frame.push(index);
|
||||
s->have_frame_cv.notify_one();
|
||||
s->frame_consumed_cv.wait(l, [s]{ return s->received_frame.size() == 0; });
|
||||
}
|
||||
|
||||
|
||||
void hd_rum_decompress_append_port(void *state, void *recompress_state)
|
||||
{
|
||||
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
|
||||
|
||||
unique_lock<mutex> l(s->lock);
|
||||
s->received_frame.push(recompress_state);
|
||||
s->have_frame_cv.notify_one();
|
||||
s->frame_consumed_cv.wait(l, [s]{ return s->received_frame.size() == 0; });
|
||||
}
|
||||
|
||||
int hd_rum_decompress_get_num_active_ports(void *state)
|
||||
{
|
||||
struct state_transcoder_decompress *s = (struct state_transcoder_decompress *) state;
|
||||
|
||||
int ret = 0;
|
||||
for (auto && port : s->output_ports) {
|
||||
if (port.active) {
|
||||
ret += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
/**
|
||||
* @file hd-rum-translator/hd-rum-decompress.h
|
||||
* @author Martin Piatka <piatka@cesnet.cz>
|
||||
* @author Martin Pulec <martin.pulec@cesnet.cz>
|
||||
* @author Martin Piatka <piatka@cesnet.cz>
|
||||
*/
|
||||
/*
|
||||
* Copyright (c) 2014-2016 CESNET, z. s. p. o.
|
||||
* Copyright (c) 2014-2022 CESNET, z. s. p. o.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
@@ -41,6 +41,7 @@ extern "C" {
|
||||
#endif
|
||||
|
||||
struct module;
|
||||
struct state_recompress;
|
||||
|
||||
enum hd_rum_mode_t {NORMAL, BLEND, CONFERENCE};
|
||||
struct hd_rum_output_conf{
|
||||
@@ -49,12 +50,8 @@ struct hd_rum_output_conf{
|
||||
};
|
||||
|
||||
ssize_t hd_rum_decompress_write(void *state, void *buf, size_t count);
|
||||
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter);
|
||||
void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf conf, const char *capture_filter, struct state_recompress *recompress);
|
||||
void hd_rum_decompress_done(void *state);
|
||||
void hd_rum_decompress_set_active(void *decompress_state, void *recompress_state, bool active);
|
||||
void hd_rum_decompress_remove_port(void *decompress_state, int index);
|
||||
void hd_rum_decompress_append_port(void *decompress_state, void *recompress_state);
|
||||
int hd_rum_decompress_get_num_active_ports(void *decompress_state);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
/**
|
||||
* @file hd-rum-translator/hd-rum-recompress.cpp
|
||||
* @author Martin Pulec <pulec@cesnet.cz>
|
||||
* @author Martin Piatka <piatka@cesnet.cz>
|
||||
*
|
||||
* Component of the transcoding reflector that takes an uncompressed frame,
|
||||
* recompresses it to another compression and sends it to destination
|
||||
* (therefore it wraps the whole sending part of UltraGrid).
|
||||
*/
|
||||
/*
|
||||
* Copyright (c) 2013-2019 CESNET, z. s. p. o.
|
||||
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
@@ -47,6 +48,8 @@
|
||||
#include <cinttypes>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <string>
|
||||
|
||||
|
||||
#include "hd-rum-translator/hd-rum-recompress.h"
|
||||
@@ -55,43 +58,74 @@
|
||||
#include "host.h"
|
||||
#include "rtp/rtp.h"
|
||||
|
||||
#include "video_compress.h"
|
||||
|
||||
#include "video_rxtx/ultragrid_rtp.h"
|
||||
|
||||
namespace {
|
||||
struct compress_state_deleter{
|
||||
void operator()(struct compress_state *s){ module_done(CAST_MODULE(s)); }
|
||||
};
|
||||
}
|
||||
|
||||
using namespace std;
|
||||
|
||||
struct state_recompress {
|
||||
state_recompress(unique_ptr<ultragrid_rtp_video_rxtx> && vr, string const & h, int tp)
|
||||
: video_rxtx(std::move(vr)), host(h), t0(chrono::system_clock::now()),
|
||||
frames(0), tx_port(tp) {
|
||||
}
|
||||
struct recompress_output_port {
|
||||
recompress_output_port() = default;
|
||||
recompress_output_port(struct module *parent,
|
||||
std::string host, unsigned short rx_port,
|
||||
unsigned short tx_port, int mtu, const char *fec, long long bitrate);
|
||||
|
||||
unique_ptr<ultragrid_rtp_video_rxtx> video_rxtx;
|
||||
string host;
|
||||
|
||||
chrono::system_clock::time_point t0;
|
||||
int frames;
|
||||
std::unique_ptr<ultragrid_rtp_video_rxtx> video_rxtx;
|
||||
std::string host;
|
||||
int tx_port;
|
||||
|
||||
std::chrono::steady_clock::time_point t0;
|
||||
int frames;
|
||||
|
||||
bool active;
|
||||
};
|
||||
|
||||
void *recompress_init(struct module *parent,
|
||||
const char *host, const char *compress, unsigned short rx_port,
|
||||
unsigned short tx_port, int mtu, char *fec, long long bitrate)
|
||||
struct recompress_worker_ctx {
|
||||
std::string compress_cfg;
|
||||
std::unique_ptr<compress_state, compress_state_deleter> compress;
|
||||
|
||||
std::mutex ports_mut;
|
||||
std::vector<recompress_output_port> ports;
|
||||
|
||||
std::thread thread;
|
||||
};
|
||||
|
||||
struct state_recompress {
|
||||
struct module *parent;
|
||||
std::mutex mut;
|
||||
std::map<std::string, recompress_worker_ctx> workers;
|
||||
std::vector<std::pair<std::string, int>> index_to_port;
|
||||
};
|
||||
|
||||
recompress_output_port::recompress_output_port(struct module *parent,
|
||||
std::string host, unsigned short rx_port,
|
||||
unsigned short tx_port, int mtu, const char *fec, long long bitrate) :
|
||||
host(std::move(host)),
|
||||
tx_port(tx_port),
|
||||
frames(0),
|
||||
active(true)
|
||||
{
|
||||
int force_ip_version = 0;
|
||||
chrono::steady_clock::time_point start_time(chrono::steady_clock::now());
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
|
||||
map<string, param_u> params;
|
||||
std::map<std::string, param_u> params;
|
||||
|
||||
// common
|
||||
params["parent"].ptr = parent;
|
||||
params["exporter"].ptr = NULL;
|
||||
params["compression"].str = compress;
|
||||
params["compression"].str = "none";
|
||||
params["rxtx_mode"].i = MODE_SENDER;
|
||||
params["paused"].b = false;
|
||||
|
||||
//RTP
|
||||
params["mtu"].i = mtu;
|
||||
params["receiver"].str = host;
|
||||
params["receiver"].str = this->host.c_str();
|
||||
params["rx_port"].i = rx_port;
|
||||
params["tx_port"].i = tx_port;
|
||||
params["force_ip_version"].i = force_ip_version;
|
||||
@@ -106,66 +140,188 @@ void *recompress_init(struct module *parent,
|
||||
params["decoder_mode"].l = VIDEO_NORMAL;
|
||||
params["display_device"].ptr = NULL;
|
||||
|
||||
try {
|
||||
auto rxtx = video_rxtx::create("ultragrid_rtp", params);
|
||||
if (strchr(host, ':') != NULL) {
|
||||
rxtx->m_port_id = string("[") + host + "]:" + to_string(tx_port);
|
||||
} else {
|
||||
rxtx->m_port_id = string(host) + ":" + to_string(tx_port);
|
||||
}
|
||||
|
||||
return new state_recompress(
|
||||
decltype(state_recompress::video_rxtx)(dynamic_cast<ultragrid_rtp_video_rxtx *>(rxtx)),
|
||||
host,
|
||||
tx_port
|
||||
);
|
||||
} catch (...) {
|
||||
return nullptr;
|
||||
auto rxtx = video_rxtx::create("ultragrid_rtp", params);
|
||||
if (host.find(':') != std::string::npos) {
|
||||
rxtx->m_port_id = "[" + host + "]:" + to_string(tx_port);
|
||||
} else {
|
||||
rxtx->m_port_id = host + ":" + to_string(tx_port);
|
||||
}
|
||||
|
||||
video_rxtx.reset(dynamic_cast<ultragrid_rtp_video_rxtx *>(rxtx));
|
||||
}
|
||||
|
||||
void recompress_process_async(void *state, shared_ptr<video_frame> frame)
|
||||
static void recompress_port_write(recompress_output_port& port, shared_ptr<video_frame> frame)
|
||||
{
|
||||
auto s = static_cast<state_recompress *>(state);
|
||||
port.frames += 1;
|
||||
|
||||
s->frames += 1;
|
||||
auto now = chrono::steady_clock::now();
|
||||
|
||||
chrono::system_clock::time_point now = chrono::system_clock::now();
|
||||
double seconds = chrono::duration_cast<chrono::microseconds>(now - s->t0).count() / 1000000.0;
|
||||
double seconds = chrono::duration_cast<chrono::seconds>(now - port.t0).count();
|
||||
if(seconds > 5) {
|
||||
double fps = s->frames / seconds;
|
||||
double fps = port.frames / seconds;
|
||||
log_msg(LOG_LEVEL_INFO, "[0x%08" PRIx32 "->%s:%d:0x%08" PRIx32 "] %d frames in %g seconds = %g FPS\n",
|
||||
frame->ssrc,
|
||||
s->host.c_str(), s->tx_port,
|
||||
s->video_rxtx->get_ssrc(),
|
||||
s->frames, seconds, fps);
|
||||
s->t0 = now;
|
||||
s->frames = 0;
|
||||
port.host.c_str(), port.tx_port,
|
||||
port.video_rxtx->get_ssrc(),
|
||||
port.frames, seconds, fps);
|
||||
port.t0 = now;
|
||||
port.frames = 0;
|
||||
}
|
||||
|
||||
s->video_rxtx->send(frame);
|
||||
port.video_rxtx->send(frame);
|
||||
}
|
||||
|
||||
void recompress_assign_ssrc(void *state, uint32_t ssrc)
|
||||
{
|
||||
// UNIMPLEMENTED NOW
|
||||
UNUSED(state);
|
||||
UNUSED(ssrc);
|
||||
static void recompress_worker(struct recompress_worker_ctx *ctx){
|
||||
assert(ctx->compress);
|
||||
|
||||
while(auto frame = compress_pop(ctx->compress.get())){
|
||||
if(!frame){
|
||||
//poisoned
|
||||
break;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(ctx->ports_mut);
|
||||
for(auto& port : ctx->ports){
|
||||
if(port.active)
|
||||
recompress_port_write(port, frame);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t recompress_get_ssrc(void *state)
|
||||
static int move_port_to_worker(struct state_recompress *s, const std::string& compress,
|
||||
recompress_output_port&& port)
|
||||
{
|
||||
auto s = static_cast<state_recompress *>(state);
|
||||
auto& worker = s->workers[compress];
|
||||
if(!worker.compress){
|
||||
worker.compress_cfg = compress;
|
||||
compress_state *cmp = nullptr;
|
||||
int ret = compress_init(s->parent, compress, &cmp);
|
||||
if(ret != 0)
|
||||
return -1;
|
||||
worker.compress.reset(cmp);
|
||||
|
||||
return s->video_rxtx->get_ssrc();
|
||||
worker.thread = std::thread(recompress_worker, &worker);
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(worker.ports_mut);
|
||||
int index_in_worker = worker.ports.size();
|
||||
worker.ports.push_back(std::move(port));
|
||||
|
||||
return index_in_worker;
|
||||
}
|
||||
|
||||
void recompress_done(void *state)
|
||||
int recompress_add_port(struct state_recompress *s,
|
||||
const char *host, const char *compress, unsigned short rx_port,
|
||||
unsigned short tx_port, int mtu, const char *fec, long long bitrate)
|
||||
{
|
||||
auto s = static_cast<state_recompress *>(state);
|
||||
auto port = recompress_output_port(s->parent, host, rx_port, tx_port,
|
||||
mtu, fec, bitrate);
|
||||
|
||||
s->video_rxtx->join();
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
int index_in_worker = move_port_to_worker(s, compress, std::move(port));
|
||||
|
||||
int index_of_port = s->index_to_port.size();
|
||||
s->index_to_port.emplace_back(compress, index_in_worker);
|
||||
|
||||
return index_of_port;
|
||||
}
|
||||
|
||||
static void extract_port(struct state_recompress *s,
|
||||
const std::string& compress_cfg, int i,
|
||||
recompress_output_port *move_to = nullptr)
|
||||
{
|
||||
auto& worker = s->workers[compress_cfg];
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(worker.ports_mut);
|
||||
if(move_to)
|
||||
*move_to = std::move(worker.ports[i]);
|
||||
worker.ports.erase(worker.ports.begin() + i);
|
||||
|
||||
if(worker.ports.empty()){
|
||||
//poison compress
|
||||
compress_frame(worker.compress.get(), nullptr);
|
||||
worker.thread.join();
|
||||
s->workers.erase(compress_cfg);
|
||||
}
|
||||
}
|
||||
|
||||
for(auto& p : s->index_to_port){
|
||||
if(p.first == compress_cfg && p.second > i)
|
||||
p.second--;
|
||||
}
|
||||
}
|
||||
|
||||
void recompress_remove_port(struct state_recompress *s, int index){
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
auto [compress_cfg, i] = s->index_to_port[index];
|
||||
|
||||
extract_port(s, compress_cfg, i);
|
||||
s->index_to_port.erase(s->index_to_port.begin() + index);
|
||||
}
|
||||
|
||||
uint32_t recompress_get_port_ssrc(struct state_recompress *s, int idx){
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
auto [compress_cfg, i] = s->index_to_port[idx];
|
||||
|
||||
std::lock_guard<std::mutex> work_lock(s->workers[compress_cfg].ports_mut);
|
||||
return s->workers[compress_cfg].ports[i].video_rxtx->get_ssrc();
|
||||
}
|
||||
|
||||
void recompress_port_set_active(struct state_recompress *s,
|
||||
int index, bool active)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
auto [compress_cfg, i] = s->index_to_port[index];
|
||||
|
||||
std::unique_lock<std::mutex> worker_lock(s->workers[compress_cfg].ports_mut);
|
||||
s->workers[compress_cfg].ports[i].active = active;
|
||||
}
|
||||
|
||||
static int worker_get_num_active_ports(const recompress_worker_ctx& worker){
|
||||
int ret = 0;
|
||||
for(const auto& port : worker.ports){
|
||||
if(port.active)
|
||||
ret++;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int recompress_get_num_active_ports(struct state_recompress *s){
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
int ret = 0;
|
||||
for(const auto& worker : s->workers){
|
||||
ret += worker_get_num_active_ports(worker.second);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
struct state_recompress *recompress_init(struct module *parent) {
|
||||
auto state = new state_recompress();
|
||||
if(!state)
|
||||
return nullptr;
|
||||
|
||||
state->parent = parent;
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
void recompress_process_async(state_recompress *s, std::shared_ptr<video_frame> frame){
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
for(const auto& worker : s->workers){
|
||||
if(worker_get_num_active_ports(worker.second) > 0)
|
||||
compress_frame(worker.second.compress.get(), frame);
|
||||
}
|
||||
}
|
||||
|
||||
void recompress_done(struct state_recompress *s) {
|
||||
std::lock_guard<std::mutex> lock(s->mut);
|
||||
for(auto& worker : s->workers){
|
||||
//poison compress
|
||||
compress_frame(worker.second.compress.get(), nullptr);
|
||||
|
||||
worker.second.thread.join();
|
||||
}
|
||||
delete s;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
/**
|
||||
* @file hd-rum-translator/hd-rum/recompress.h
|
||||
* @author Martin Pulec <martin.pulec@cesnet.cz>
|
||||
* @author Martin Piatka <piatka@cesnet.cz>
|
||||
*/
|
||||
/*
|
||||
* Copyright (c) 2013-2015 CESNET, z. s. p. o.
|
||||
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
@@ -48,12 +49,24 @@ extern "C" {
|
||||
struct module;
|
||||
struct video_frame;
|
||||
|
||||
void *recompress_init(struct module *parent, const char *host, const char *compress,
|
||||
unsigned short rx_port, unsigned short tx_port, int mtu, char *fec,
|
||||
long long bitrate);
|
||||
void recompress_assign_ssrc(void *state, uint32_t ssrc);
|
||||
void recompress_done(void *state);
|
||||
uint32_t recompress_get_ssrc(void *state);
|
||||
struct state_recompress;
|
||||
|
||||
struct state_recompress *recompress_init(struct module *parent);
|
||||
void recompress_done(struct state_recompress *state);
|
||||
|
||||
uint32_t recompress_get_port_ssrc(struct state_recompress *s, int idx);
|
||||
|
||||
|
||||
int recompress_add_port(struct state_recompress *s,
|
||||
const char *host, const char *compress, unsigned short rx_port,
|
||||
unsigned short tx_port, int mtu, const char *fec, long long bitrate);
|
||||
|
||||
void recompress_remove_port(struct state_recompress *s, int index);
|
||||
|
||||
void recompress_port_set_active(struct state_recompress *s,
|
||||
int index, bool active);
|
||||
|
||||
int recompress_get_num_active_ports(struct state_recompress *s);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
@@ -62,5 +75,5 @@ uint32_t recompress_get_ssrc(void *state);
|
||||
#ifdef __cplusplus
|
||||
#include <memory>
|
||||
#include <string>
|
||||
void recompress_process_async(void *state, std::shared_ptr<video_frame> frame);
|
||||
void recompress_process_async(state_recompress *state, std::shared_ptr<video_frame> frame);
|
||||
#endif
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
/**
|
||||
* @file hd-rum-translator/hd-rum-translator.cpp
|
||||
* @author Martin Pulec <pulec@cesnet.cz>
|
||||
* @author Martin Piatka <piatka@cesnet.cz>
|
||||
*
|
||||
* Main part of transcoding reflector. This component provides a runtime
|
||||
* for the reflector. Componets are following:
|
||||
@@ -12,7 +13,7 @@
|
||||
* compresses and sends frame to receiver
|
||||
*/
|
||||
/*
|
||||
* Copyright (c) 2013-2021 CESNET, z. s. p. o.
|
||||
* Copyright (c) 2013-2022 CESNET, z. s. p. o.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
@@ -104,7 +105,6 @@ struct replica {
|
||||
mod.priv_data = this;
|
||||
module_register(&mod, parent);
|
||||
type = replica::type_t::NONE;
|
||||
recompress = nullptr;
|
||||
}
|
||||
|
||||
~replica() {
|
||||
@@ -125,7 +125,6 @@ struct replica {
|
||||
};
|
||||
enum type_t type;
|
||||
socket_udp *sock;
|
||||
void *recompress;
|
||||
};
|
||||
|
||||
struct hd_rum_translator_state {
|
||||
@@ -159,6 +158,7 @@ struct hd_rum_translator_state {
|
||||
|
||||
vector<replica *> replicas;
|
||||
void *decompress;
|
||||
struct state_recompress *recompress;
|
||||
};
|
||||
|
||||
/*
|
||||
@@ -280,8 +280,8 @@ static struct response *change_replica_type(struct hd_rum_translator_state *s,
|
||||
return new_response(RESPONSE_BAD_REQUEST, NULL);
|
||||
}
|
||||
|
||||
hd_rum_decompress_set_active(s->decompress, r->recompress,
|
||||
r->type == replica::type_t::RECOMPRESS);
|
||||
recompress_port_set_active(s->recompress, index,
|
||||
r->type == replica::type_t::RECOMPRESS);
|
||||
|
||||
return new_response(RESPONSE_OK, NULL);
|
||||
}
|
||||
@@ -340,7 +340,7 @@ static void *writer(void *arg)
|
||||
}
|
||||
}
|
||||
if (index >= 0) {
|
||||
hd_rum_decompress_remove_port(s->decompress, index);
|
||||
recompress_remove_port(s->recompress, index);
|
||||
delete s->replicas[index];
|
||||
s->replicas.erase(s->replicas.begin() + index);
|
||||
log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index);
|
||||
@@ -391,29 +391,36 @@ static void *writer(void *arg)
|
||||
if (compress) {
|
||||
rep->type = replica::type_t::RECOMPRESS;
|
||||
char *fec = NULL;
|
||||
rep->recompress = recompress_init(&rep->mod,
|
||||
int idx = recompress_add_port(s->recompress,
|
||||
host, compress,
|
||||
0, tx_port, 1500, fec, RATE_UNLIMITED);
|
||||
if (!rep->recompress) {
|
||||
if (idx < 0) {
|
||||
delete s->replicas[s->replicas.size() - 1];
|
||||
s->replicas.erase(s->replicas.end() - 1);
|
||||
|
||||
log_msg(LOG_LEVEL_ERROR, "Unable to create recompress!\n");
|
||||
} else {
|
||||
hd_rum_decompress_append_port(s->decompress, rep->recompress);
|
||||
hd_rum_decompress_set_active(s->decompress, rep->recompress, true);
|
||||
log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_ssrc(rep->recompress));
|
||||
assert((unsigned) idx == s->replicas.size() - 1);
|
||||
recompress_port_set_active(s->recompress, idx, true);
|
||||
log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx));
|
||||
}
|
||||
} else {
|
||||
rep->type = replica::type_t::USE_SOCK;
|
||||
char compress[] = "none";
|
||||
char *fec = NULL;
|
||||
rep->recompress = recompress_init(&rep->mod,
|
||||
int idx = recompress_add_port(s->recompress,
|
||||
host, compress,
|
||||
0, tx_port, 1500, fec, RATE_UNLIMITED);
|
||||
hd_rum_decompress_append_port(s->decompress, rep->recompress);
|
||||
hd_rum_decompress_set_active(s->decompress, rep->recompress, false);
|
||||
log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port);
|
||||
if (idx < 0) {
|
||||
delete s->replicas[s->replicas.size() - 1];
|
||||
s->replicas.erase(s->replicas.end() - 1);
|
||||
|
||||
log_msg(LOG_LEVEL_ERROR, "Unable to create recompress!\n");
|
||||
} else {
|
||||
assert((unsigned) idx == s->replicas.size() - 1);
|
||||
recompress_port_set_active(s->recompress, idx, false);
|
||||
log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
r = new_response(RESPONSE_BAD_REQUEST, NULL);
|
||||
@@ -429,7 +436,7 @@ static void *writer(void *arg)
|
||||
}
|
||||
|
||||
// pass it for transcoding if needed
|
||||
if (hd_rum_decompress_get_num_active_ports(s->decompress) > 0) {
|
||||
if (recompress_get_num_active_ports(s->recompress) > 0) {
|
||||
ssize_t ret = hd_rum_decompress_write(s->decompress, s->qhead->buf, s->qhead->size);
|
||||
if (ret < 0) {
|
||||
perror("hd_rum_decompress_write");
|
||||
@@ -689,6 +696,9 @@ static void hd_rum_translator_deinit(struct hd_rum_translator_state *s) {
|
||||
hd_rum_decompress_done(s->decompress);
|
||||
}
|
||||
|
||||
if(s->recompress)
|
||||
recompress_done(s->recompress);
|
||||
|
||||
for (unsigned int i = 0; i < s->replicas.size(); i++) {
|
||||
delete s->replicas[i];
|
||||
}
|
||||
@@ -799,8 +809,15 @@ int main(int argc, char **argv)
|
||||
control_start(state.control_state);
|
||||
}
|
||||
|
||||
//one shared recompressor, which manages compressions and sends to all hosts
|
||||
state.recompress = recompress_init(&state.mod);
|
||||
if(!state.recompress) {
|
||||
EXIT(EXIT_FAIL_COMPRESS);
|
||||
}
|
||||
|
||||
// we need only one shared receiver decompressor for all recompressing streams
|
||||
state.decompress = hd_rum_decompress_init(&state.mod, params.out_conf, params.capture_filter);
|
||||
state.decompress = hd_rum_decompress_init(&state.mod, params.out_conf,
|
||||
params.capture_filter, state.recompress);
|
||||
if(!state.decompress) {
|
||||
EXIT(EXIT_FAIL_DECODER);
|
||||
}
|
||||
@@ -823,26 +840,31 @@ int main(int argc, char **argv)
|
||||
state.replicas[i]->type = replica::type_t::USE_SOCK;
|
||||
char compress[] = "none";
|
||||
char *fec = NULL;
|
||||
state.replicas[i]->recompress = recompress_init(&state.replicas[i]->mod,
|
||||
int idx = recompress_add_port(state.recompress,
|
||||
params.hosts[i].addr, compress,
|
||||
0, tx_port, params.hosts[i].mtu, fec, params.hosts[i].bitrate);
|
||||
hd_rum_decompress_append_port(state.decompress, state.replicas[i]->recompress);
|
||||
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, false);
|
||||
} else {
|
||||
state.replicas[i]->type = replica::type_t::RECOMPRESS;
|
||||
|
||||
state.replicas[i]->recompress = recompress_init(&state.replicas[i]->mod,
|
||||
params.hosts[i].addr, params.hosts[i].compression,
|
||||
0, tx_port, params.hosts[i].mtu, params.hosts[i].fec, params.hosts[i].bitrate);
|
||||
if(state.replicas[i]->recompress == 0) {
|
||||
if(idx < 0) {
|
||||
fprintf(stderr, "Initializing output port '%s' failed!\n",
|
||||
params.hosts[i].addr);
|
||||
EXIT(EXIT_FAILURE);
|
||||
}
|
||||
// we don't care about this clients, we only tell decompressor to
|
||||
assert(idx == i);
|
||||
recompress_port_set_active(state.recompress, i, false);
|
||||
} else {
|
||||
state.replicas[i]->type = replica::type_t::RECOMPRESS;
|
||||
|
||||
int idx = recompress_add_port(state.recompress,
|
||||
params.hosts[i].addr, params.hosts[i].compression,
|
||||
0, tx_port, params.hosts[i].mtu, params.hosts[i].fec, params.hosts[i].bitrate);
|
||||
if(idx < 0) {
|
||||
fprintf(stderr, "Initializing output port '%s' failed!\n",
|
||||
params.hosts[i].addr);
|
||||
EXIT(EXIT_FAILURE);
|
||||
}
|
||||
assert(idx == i);
|
||||
// we don't care about this clients, we only tell recompressor to
|
||||
// take care about them
|
||||
hd_rum_decompress_append_port(state.decompress, state.replicas[i]->recompress);
|
||||
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true);
|
||||
recompress_port_set_active(state.recompress, i, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user