mirror of
https://github.com/outbackdingo/UltraGrid.git
synced 2026-03-21 19:40:24 +00:00
Collect and report more statistics
Statistics are collected and reported through control socket where it can be subscribed and further processed (eg. by CoUniverse).
This commit is contained in:
@@ -111,7 +111,6 @@ OBJS = @OBJS@ \
|
||||
src/ihdtv/ihdtv.o \
|
||||
src/module.o \
|
||||
src/rtsp/rtsp_utils.o \
|
||||
src/stats.o \
|
||||
src/utils/config_file.o \
|
||||
src/utils/list.o \
|
||||
src/lib_common.o \
|
||||
|
||||
@@ -52,7 +52,9 @@
|
||||
|
||||
#include "control_socket.h"
|
||||
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
#include "debug.h"
|
||||
#include "messaging.h"
|
||||
@@ -95,7 +97,8 @@ struct control_state {
|
||||
int network_port;
|
||||
struct module *root_module;
|
||||
|
||||
set<struct stats *> stats;
|
||||
multimap<int32_t, struct stats_reportable *> stats; // first member is ID of stream if applicable
|
||||
std::map<uint32_t, int> stats_id_port_mapping; // this maps ID from above to index 0..n
|
||||
pthread_mutex_t stats_lock;
|
||||
|
||||
enum connection_type connection_type;
|
||||
@@ -627,33 +630,39 @@ static void * control_thread(void *args)
|
||||
struct timeval curr_time;
|
||||
gettimeofday(&curr_time, NULL);
|
||||
if(tv_diff(curr_time, last_report_sent) > report_interval_sec) {
|
||||
char buffer[1025];
|
||||
bool empty = true;
|
||||
memset(buffer, '\0', sizeof(buffer));
|
||||
strncpy(buffer + strlen(buffer), "stats", sizeof(buffer) -
|
||||
strlen(buffer) - 1);
|
||||
bool first = true;
|
||||
int32_t last_id = -1;
|
||||
ostringstream buffer;
|
||||
buffer << "stats";
|
||||
pthread_mutex_lock(&s->stats_lock);
|
||||
for(set<struct stats *>::iterator it = s->stats.begin();
|
||||
for(auto it = s->stats.begin();
|
||||
it != s->stats.end(); ++it) {
|
||||
empty = false;
|
||||
strncpy(buffer + strlen(buffer), " ", sizeof(buffer) -
|
||||
strlen(buffer) - 1);
|
||||
stats_format(*it, buffer + strlen(buffer),
|
||||
sizeof(buffer) - strlen(buffer));
|
||||
int32_t id = it->first;
|
||||
if ((first || last_id != id) && id != -1) {
|
||||
buffer << " -";
|
||||
if (s->stats_id_port_mapping.find(id) != s->stats_id_port_mapping.end()) {
|
||||
buffer << s->stats_id_port_mapping.at(id);
|
||||
} else {
|
||||
buffer << "UNKNOWN";
|
||||
}
|
||||
}
|
||||
last_id = id;
|
||||
first = false;
|
||||
buffer << " " + it->second->get_stat();
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&s->stats_lock);
|
||||
strncpy(buffer + strlen(buffer), "\r\n", sizeof(buffer) -
|
||||
strlen(buffer) - 1);
|
||||
buffer << "\r\n";
|
||||
|
||||
if(strlen(buffer) < 1024 && !empty) {
|
||||
if (!first) { // are there any stats to report?
|
||||
cur = clients;
|
||||
string str = buffer.str();
|
||||
while(cur) {
|
||||
if(is_internal_port(cur->fd)) { // skip local FD
|
||||
cur = cur->next;
|
||||
continue;
|
||||
}
|
||||
write_all(cur->fd, buffer, strlen(buffer));
|
||||
write_all(cur->fd, str.c_str(), str.length());
|
||||
cur = cur->next;
|
||||
}
|
||||
}
|
||||
@@ -702,17 +711,29 @@ void control_done(struct control_state *s)
|
||||
delete s;
|
||||
}
|
||||
|
||||
void control_add_stats(struct control_state *s, struct stats *stats)
|
||||
void control_add_stats(struct control_state *s, struct stats_reportable *stats, int32_t port_id)
|
||||
{
|
||||
pthread_mutex_lock(&s->stats_lock);
|
||||
s->stats.insert(stats);
|
||||
s->stats.emplace(port_id, stats);
|
||||
pthread_mutex_unlock(&s->stats_lock);
|
||||
}
|
||||
|
||||
void control_remove_stats(struct control_state *s, struct stats *stats)
|
||||
void control_remove_stats(struct control_state *s, struct stats_reportable *stats)
|
||||
{
|
||||
pthread_mutex_lock(&s->stats_lock);
|
||||
s->stats.erase(stats);
|
||||
for (auto it = s->stats.begin(); it != s->stats.end(); ++it) {
|
||||
if (it->second == stats) {
|
||||
s->stats.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&s->stats_lock);
|
||||
}
|
||||
|
||||
void control_replace_port_mapping(struct control_state *s, std::map<uint32_t, int> &&m)
|
||||
{
|
||||
pthread_mutex_lock(&s->stats_lock);
|
||||
s->stats_id_port_mapping = move(m);
|
||||
pthread_mutex_unlock(&s->stats_lock);
|
||||
}
|
||||
|
||||
|
||||
@@ -45,13 +45,14 @@
|
||||
*
|
||||
*/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif // __cplusplus
|
||||
#ifndef control_socket_h_
|
||||
#define control_socket_h_
|
||||
|
||||
#include <map>
|
||||
|
||||
struct control_state;
|
||||
struct module;
|
||||
struct stats;
|
||||
struct stats_reportable;
|
||||
|
||||
#define CONTROL_DEFAULT_PORT 5054
|
||||
|
||||
@@ -61,10 +62,9 @@ struct stats;
|
||||
int control_init(int port, int connection_type, struct control_state **state, struct module *root_module);
|
||||
void control_start(struct control_state *state);
|
||||
void control_done(struct control_state *s);
|
||||
void control_add_stats(struct control_state *state, struct stats *stats);
|
||||
void control_remove_stats(struct control_state *state, struct stats *stats);
|
||||
void control_add_stats(struct control_state *state, struct stats_reportable *stats, int32_t port_id = -1);
|
||||
void control_remove_stats(struct control_state *state, struct stats_reportable *stats);
|
||||
void control_replace_port_mapping(struct control_state *state, std::map<uint32_t, int> &&);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif // __cplusplus
|
||||
#endif // control_socket_h_
|
||||
|
||||
|
||||
@@ -22,10 +22,10 @@
|
||||
#include "hd-rum-translator/hd-rum-decompress.h"
|
||||
#include "messaging.h"
|
||||
#include "module.h"
|
||||
#include "stats.h"
|
||||
#include "utils/misc.h"
|
||||
#include "tv.h"
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
@@ -51,8 +51,8 @@ struct replica {
|
||||
};
|
||||
|
||||
struct hd_rum_translator_state {
|
||||
hd_rum_translator_state() : mod(), queue(nullptr), qhead(nullptr), qtail(nullptr), qempty(1),
|
||||
qfull(0), decompress(nullptr) {
|
||||
hd_rum_translator_state() : mod(), control_state(nullptr), queue(nullptr), qhead(nullptr),
|
||||
qtail(nullptr), qempty(1), qfull(0), decompress(nullptr) {
|
||||
module_init_default(&mod);
|
||||
mod.cls = MODULE_CLASS_ROOT;
|
||||
pthread_mutex_init(&qempty_mtx, NULL);
|
||||
@@ -68,6 +68,7 @@ struct hd_rum_translator_state {
|
||||
module_done(&mod);
|
||||
}
|
||||
struct module mod;
|
||||
struct control_state *control_state;
|
||||
struct item *queue;
|
||||
struct item *qhead;
|
||||
struct item *qtail;
|
||||
@@ -125,6 +126,7 @@ static void replica_init(struct replica *s, const char *addr, int tx_port, int b
|
||||
module_init_default(&s->mod);
|
||||
s->mod.cls = MODULE_CLASS_PORT;
|
||||
s->mod.priv_data = s;
|
||||
s->mod.id = lrand48();
|
||||
module_register(&s->mod, parent);
|
||||
}
|
||||
|
||||
@@ -254,6 +256,17 @@ void change_replica_type(struct hd_rum_translator_state *s,
|
||||
r->type == replica::type_t::RECOMPRESS);
|
||||
}
|
||||
|
||||
static void update_mapping(struct hd_rum_translator_state *s)
|
||||
{
|
||||
map<uint32_t, int> mapping;
|
||||
int i = 0;
|
||||
for (auto && r : s->replicas) {
|
||||
mapping[r->mod.id] = i++;
|
||||
}
|
||||
|
||||
control_replace_port_mapping(s->control_state, move(mapping));
|
||||
}
|
||||
|
||||
static void *writer(void *arg)
|
||||
{
|
||||
struct hd_rum_translator_state *s =
|
||||
@@ -277,6 +290,7 @@ static void *writer(void *arg)
|
||||
replica_done(s->replicas[index]);
|
||||
delete s->replicas[index];
|
||||
s->replicas.erase(s->replicas.begin() + index);
|
||||
update_mapping(s);
|
||||
} else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) {
|
||||
s->replicas.push_back(new replica());
|
||||
struct replica *rep = s->replicas[s->replicas.size() - 1];
|
||||
@@ -286,6 +300,7 @@ static void *writer(void *arg)
|
||||
int tx_port = atoi(strtok_r(NULL, " ", &save_ptr));
|
||||
char *compress = strtok_r(NULL, " ", &save_ptr);
|
||||
replica_init(rep, host, tx_port, 100*1000, &s->mod);
|
||||
update_mapping(s);
|
||||
if (compress) {
|
||||
rep->type = replica::type_t::RECOMPRESS;
|
||||
char *fec = NULL;
|
||||
@@ -494,7 +509,6 @@ int main(int argc, char **argv)
|
||||
int host_count;
|
||||
int control_port = CONTROL_DEFAULT_PORT;
|
||||
int control_connection_type = 0;
|
||||
struct control_state *control_state = NULL;
|
||||
#ifdef WIN32
|
||||
WSADATA wsaData;
|
||||
|
||||
@@ -597,11 +611,11 @@ int main(int argc, char **argv)
|
||||
rep = new replica();
|
||||
}
|
||||
|
||||
if(control_init(control_port, control_connection_type, &control_state, &state.mod) != 0) {
|
||||
if(control_init(control_port, control_connection_type, &state.control_state, &state.mod) != 0) {
|
||||
fprintf(stderr, "Warning: Unable to create remote control.\n");
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
control_start(control_state);
|
||||
control_start(state.control_state);
|
||||
|
||||
// we need only one shared receiver decompressor for all recompressing streams
|
||||
state.decompress = hd_rum_decompress_init(&state.mod);;
|
||||
@@ -650,20 +664,16 @@ int main(int argc, char **argv)
|
||||
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true);
|
||||
}
|
||||
}
|
||||
update_mapping(&state);
|
||||
|
||||
if (pthread_create(&thread, NULL, writer, (void *) &state)) {
|
||||
fprintf(stderr, "cannot create writer thread\n");
|
||||
return 2;
|
||||
}
|
||||
|
||||
struct stats *stat_received = stats_new_statistics(
|
||||
control_state,
|
||||
"received");
|
||||
uint64_t received_data = 0;
|
||||
struct timeval t0, t;
|
||||
struct timeval t_report;
|
||||
struct timeval t0;
|
||||
gettimeofday(&t0, NULL);
|
||||
gettimeofday(&t_report, NULL);
|
||||
|
||||
unsigned long long int last_data = 0ull;
|
||||
|
||||
@@ -681,17 +691,14 @@ int main(int argc, char **argv)
|
||||
pthread_cond_signal(&state.qempty_cond);
|
||||
pthread_mutex_unlock(&state.qempty_mtx);
|
||||
|
||||
struct timeval t;
|
||||
gettimeofday(&t, NULL);
|
||||
if(tv_diff(t, t0) > 1.0) {
|
||||
stats_update_int(stat_received, received_data);
|
||||
t0 = t;
|
||||
}
|
||||
double seconds = tv_diff(t, t_report);
|
||||
double seconds = tv_diff(t, t0);
|
||||
if (seconds > 5.0) {
|
||||
unsigned long long int cur_data = (received_data - last_data);
|
||||
unsigned long long int bps = cur_data / seconds;
|
||||
fprintf(stderr, "Received %llu bytes in %g seconds = %llu B/s.\n", cur_data, seconds, bps);
|
||||
t_report = t;
|
||||
t0 = t;
|
||||
last_data = received_data;
|
||||
}
|
||||
}
|
||||
@@ -720,8 +727,6 @@ int main(int argc, char **argv)
|
||||
pthread_cond_signal(&state.qempty_cond);
|
||||
pthread_mutex_unlock(&state.qempty_mtx);
|
||||
|
||||
stats_destroy(stat_received);
|
||||
|
||||
pthread_join(thread, NULL);
|
||||
|
||||
if(state.decompress) {
|
||||
@@ -733,7 +738,7 @@ int main(int argc, char **argv)
|
||||
delete state.replicas[i];
|
||||
}
|
||||
|
||||
control_done(control_state);
|
||||
control_done(state.control_state);
|
||||
|
||||
#ifdef WIN32
|
||||
WSACleanup();
|
||||
|
||||
13
src/module.c
13
src/module.c
@@ -276,3 +276,16 @@ void dump_tree(struct module *node, int indent) {
|
||||
}
|
||||
}
|
||||
|
||||
bool get_port_id(struct module *node, uint32_t *id)
|
||||
{
|
||||
while(node->parent) {
|
||||
if (node->cls == MODULE_CLASS_PORT) {
|
||||
*id = node->id;
|
||||
return true;
|
||||
}
|
||||
node = node->parent;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -100,6 +100,7 @@ struct module {
|
||||
struct simple_linked_list *msg_queue_childs; ///< messages for childern that were not delivered
|
||||
|
||||
void *priv_data;
|
||||
uint32_t id;
|
||||
|
||||
#ifdef __cplusplus
|
||||
module() = default;
|
||||
@@ -139,6 +140,7 @@ struct module *get_matching_child(struct module *node, const char *path);
|
||||
struct module *get_root_module(struct module *node);
|
||||
|
||||
struct module *get_parent_module(struct module *node);
|
||||
bool get_port_id(struct module *child_node, uint32_t *id);
|
||||
|
||||
void dump_tree(struct module *root, int indent);
|
||||
|
||||
|
||||
@@ -93,6 +93,7 @@ struct pbuf {
|
||||
int last_rtp_seq;
|
||||
int should_arrived;
|
||||
int cumulative_count;
|
||||
int last_received, last_expected;
|
||||
uint32_t last_display_ts;
|
||||
};
|
||||
|
||||
@@ -302,6 +303,8 @@ void pbuf_insert(struct pbuf *playout_buf, rtp_packet * pkt)
|
||||
playout_buf->cumulative_count,
|
||||
(double) playout_buf->cumulative_count /
|
||||
playout_buf->should_arrived * 100.0);
|
||||
playout_buf->last_received = playout_buf->cumulative_count;
|
||||
playout_buf->last_expected = playout_buf->should_arrived;
|
||||
playout_buf->should_arrived =
|
||||
playout_buf->cumulative_count = 0;
|
||||
playout_buf->last_display_ts = pkt->ts;
|
||||
@@ -470,3 +473,10 @@ void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay)
|
||||
playout_buf->playout_delay = playout_delay;
|
||||
}
|
||||
|
||||
void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts)
|
||||
{
|
||||
*expected_pkts = playout_buf->last_expected;
|
||||
*received_pkts = playout_buf->last_received;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -117,6 +117,7 @@ int pbuf_decode(struct pbuf *playout_buf, struct timeval curr_time,
|
||||
//struct video_frame *framebuffer, int i, struct state_decoder *decoder);
|
||||
void pbuf_remove(struct pbuf *playout_buf, struct timeval curr_time);
|
||||
void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay);
|
||||
void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@@ -80,12 +80,14 @@
|
||||
#include "host.h"
|
||||
#include "lib_common.h"
|
||||
#include "messaging.h"
|
||||
#include "module.h"
|
||||
#include "perf.h"
|
||||
#include "rtp/fec.h"
|
||||
#include "rtp/rtp.h"
|
||||
#include "rtp/rtp_callback.h"
|
||||
#include "rtp/pbuf.h"
|
||||
#include "rtp/video_decoders.h"
|
||||
#include "stats.h"
|
||||
#include "utils/synchronized_queue.h"
|
||||
#include "utils/timed_message.h"
|
||||
#include "video.h"
|
||||
@@ -262,6 +264,7 @@ struct state_video_decoder
|
||||
// for statistics
|
||||
/// @{
|
||||
volatile unsigned long int displayed, dropped, corrupted, missing;
|
||||
shared_ptr<struct stats<int_fast64_t>> s_total_frames, s_corrupt_frames, s_displayed_frames;
|
||||
volatile unsigned long int fec_ok, fec_nok;
|
||||
long int last_buffer_number;
|
||||
/// @}
|
||||
@@ -430,7 +433,7 @@ static void *fec_thread(void *args) {
|
||||
} else if (!corrupted_frame_counted) {
|
||||
corrupted_frame_counted = true;
|
||||
// count it here because decoder accepts corrupted frames
|
||||
decoder->corrupted++;
|
||||
decoder->s_corrupt_frames->update(++decoder->corrupted);
|
||||
}
|
||||
verbose_msg("\n");
|
||||
}
|
||||
@@ -445,8 +448,9 @@ static void *fec_thread(void *args) {
|
||||
|
||||
cleanup:
|
||||
if(ret == FALSE) {
|
||||
decoder->corrupted++;
|
||||
decoder->s_corrupt_frames->update(decoder->corrupted++);
|
||||
decoder->dropped++;
|
||||
decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -556,7 +560,8 @@ static void *decompress_thread(void *args) {
|
||||
int ret = display_put_frame(decoder->display,
|
||||
decoder->frame, putf_flags);
|
||||
if (ret == 0) {
|
||||
decoder->displayed++;
|
||||
decoder->s_displayed_frames->update(++decoder->displayed);
|
||||
decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped);
|
||||
} else {
|
||||
decoder->dropped++;
|
||||
}
|
||||
@@ -657,6 +662,10 @@ struct state_video_decoder *video_decoder_init(struct module *parent,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
s->s_total_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "totalframes"));
|
||||
s->s_corrupt_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "corruptframes"));
|
||||
s->s_displayed_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "displayedframes"));
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
#include "stats.h"
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
|
||||
#include "control_socket.h"
|
||||
#include "compat/platform_spin.h"
|
||||
#include "debug.h"
|
||||
#include "messaging.h"
|
||||
#include "module.h"
|
||||
#include "utils/resource_manager.h"
|
||||
|
||||
#define MAX_ITEMS 100
|
||||
|
||||
using namespace std;
|
||||
|
||||
struct stats {
|
||||
public:
|
||||
stats(string name, struct control_state *control)
|
||||
: m_name(name), m_val(0), m_control(control) {
|
||||
platform_spin_init(&m_spin);
|
||||
control_add_stats(control, this);
|
||||
}
|
||||
|
||||
~stats() {
|
||||
control_remove_stats(m_control, this);
|
||||
platform_spin_destroy(&m_spin);
|
||||
}
|
||||
|
||||
void update_int(int64_t val) {
|
||||
platform_spin_lock(&m_spin);
|
||||
m_val = val;
|
||||
platform_spin_unlock(&m_spin);
|
||||
}
|
||||
|
||||
void get_stat(char *buffer, int buffer_len) {
|
||||
platform_spin_lock(&m_spin);
|
||||
snprintf(buffer, buffer_len, "%s %lld", m_name.c_str(), (long long) m_val);
|
||||
platform_spin_unlock(&m_spin);
|
||||
}
|
||||
|
||||
private:
|
||||
string m_name;
|
||||
int64_t m_val;
|
||||
struct control_state *m_control;
|
||||
platform_spin_t m_spin;
|
||||
};
|
||||
|
||||
struct stats *stats_new_statistics(struct control_state *control, const char * name)
|
||||
{
|
||||
return new stats(string(name), control);
|
||||
}
|
||||
|
||||
void stats_update_int(struct stats *s, int64_t val)
|
||||
{
|
||||
return s->update_int(val);
|
||||
}
|
||||
|
||||
void stats_format(struct stats *s, char *buffer, int buffer_len)
|
||||
{
|
||||
s->get_stat(buffer, buffer_len);
|
||||
}
|
||||
|
||||
void stats_destroy(struct stats *s)
|
||||
{
|
||||
delete s;
|
||||
}
|
||||
|
||||
66
src/stats.h
66
src/stats.h
@@ -1,27 +1,53 @@
|
||||
#ifndef STATS_H_
|
||||
#define STATS_H_
|
||||
#ifndef stat_h_
|
||||
#define stat_h_
|
||||
|
||||
#ifdef HAVE_CONFIG_H
|
||||
#include "config.h"
|
||||
#include "config_unix.h"
|
||||
#include "config_win32.h"
|
||||
#endif
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "control_socket.h"
|
||||
#include "module.h"
|
||||
|
||||
struct control_state;
|
||||
struct stats;
|
||||
struct stats_reportable {
|
||||
virtual std::string get_stat() = 0;
|
||||
};
|
||||
|
||||
struct stats *stats_new_statistics(struct control_state *control, const char * name);
|
||||
void stats_update_int(struct stats *, int64_t);
|
||||
void stats_format(struct stats *s, char *buffer, int buffer_len);
|
||||
void stats_destroy(struct stats *);
|
||||
template <typename T>
|
||||
struct stats : public stats_reportable {
|
||||
public:
|
||||
/**
|
||||
* @param mod should be the module that collect the statistics or its closest
|
||||
* ancestor because it should be used to identify branch for which statistics
|
||||
* are reported.
|
||||
*/
|
||||
stats(struct module *mod, std::string name)
|
||||
: m_name(name), m_val() {
|
||||
m_control = (struct control_state *) get_module(get_root_module(mod), "control");
|
||||
uint32_t id;
|
||||
bool ret = get_port_id(mod, &id);
|
||||
control_add_stats(m_control, this, ret ? id : -1);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
~stats() {
|
||||
control_remove_stats(m_control, this);
|
||||
}
|
||||
|
||||
#endif// STATS_H_
|
||||
void update(T val) {
|
||||
m_val = val;
|
||||
}
|
||||
|
||||
std::string get_stat() {
|
||||
std::ostringstream oss;
|
||||
oss << m_name << " " << m_val.load();
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
struct control_state *m_control;
|
||||
std::string m_name;
|
||||
std::atomic<T> m_val;
|
||||
};
|
||||
|
||||
#endif // stat_h_
|
||||
|
||||
|
||||
@@ -45,8 +45,8 @@
|
||||
#include "debug.h"
|
||||
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
|
||||
#include "host.h"
|
||||
#include "lib_common.h"
|
||||
@@ -114,7 +114,8 @@ void register_video_rxtx(enum rxtx_protocol proto, struct video_rxtx_info info)
|
||||
|
||||
video_rxtx::video_rxtx(map<string, param_u> const ¶ms): m_paused(false),
|
||||
m_rxtx_mode(params.at("rxtx_mode").i), m_compression(nullptr),
|
||||
m_video_exporter(static_cast<struct video_export *>(params.at("exporter").ptr)) {
|
||||
m_video_exporter(static_cast<struct video_export *>(params.at("exporter").ptr)),
|
||||
m_frames(0), m_t0(std::chrono::steady_clock::now()) {
|
||||
|
||||
module_init_default(&m_sender_mod);
|
||||
m_sender_mod.cls = MODULE_CLASS_SENDER;
|
||||
@@ -196,9 +197,15 @@ void *video_rxtx::sender_loop() {
|
||||
|
||||
memset(&saved_vid_desc, 0, sizeof(saved_vid_desc));
|
||||
|
||||
struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control");
|
||||
struct stats *stat_data_sent = stats_new_statistics((struct control_state *)
|
||||
control_mod, "data");
|
||||
stats<int_fast64_t> stat_sendbytes(&m_sender_mod, "sendbytes");
|
||||
stats<double> stat_inexpected(&m_sender_mod, "inexpected");
|
||||
/**
|
||||
* @todo
|
||||
* Theset 3 values should be 3 distinct values in future.
|
||||
*/
|
||||
stats<double> stat_inactual(&m_sender_mod, "inactual");
|
||||
stats<double> stat_outexpected(&m_sender_mod, "outexpected");
|
||||
stats<double> stat_outactual(&m_sender_mod, "outactual");
|
||||
|
||||
while(1) {
|
||||
check_sender_messages();
|
||||
@@ -212,12 +219,26 @@ void *video_rxtx::sender_loop() {
|
||||
video_export(m_video_exporter, tx_frame.get());
|
||||
|
||||
if (!m_paused) {
|
||||
stat_inexpected.update(tx_frame->fps);
|
||||
send_frame(tx_frame);
|
||||
|
||||
m_frames += 1;
|
||||
std::chrono::steady_clock::time_point curr_time =
|
||||
std::chrono::steady_clock::now();
|
||||
double seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(curr_time - m_t0).count();
|
||||
if (seconds >= 5.0) {
|
||||
double fps = m_frames / seconds;
|
||||
stat_inactual.update(fps);
|
||||
stat_outexpected.update(fps);
|
||||
stat_outactual.update(fps);
|
||||
m_t0 = curr_time;
|
||||
m_frames = 0;
|
||||
}
|
||||
|
||||
rtp_video_rxtx *rtp_rxtx = dynamic_cast<rtp_video_rxtx *>(this);
|
||||
if (rtp_rxtx) {
|
||||
stats_update_int(stat_data_sent,
|
||||
rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0]));
|
||||
stat_sendbytes.update(rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0]));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -226,8 +247,6 @@ exit:
|
||||
module_done(CAST_MODULE(m_compression));
|
||||
m_compression = nullptr;
|
||||
|
||||
stats_destroy(stat_data_sent);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
@@ -38,9 +38,10 @@
|
||||
#ifndef VIDEO_RXTX_H_
|
||||
#define VIDEO_RXTX_H_
|
||||
|
||||
#include <chrono>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "module.h"
|
||||
|
||||
@@ -105,6 +106,8 @@ private:
|
||||
struct compress_state *m_compression;
|
||||
pthread_mutex_t m_lock;
|
||||
struct video_export *m_video_exporter;
|
||||
int m_frames;
|
||||
std::chrono::steady_clock::time_point m_t0;
|
||||
|
||||
pthread_t m_thread_id;
|
||||
};
|
||||
|
||||
@@ -61,7 +61,6 @@
|
||||
#include "rtp/pbuf.h"
|
||||
#include "rtp/rtp_callback.h"
|
||||
#include "tfrc.h"
|
||||
#include "stats.h"
|
||||
#include "transmit.h"
|
||||
#include "tv.h"
|
||||
#include "ug_runtime_error.h"
|
||||
|
||||
@@ -72,12 +72,13 @@
|
||||
#include "video_rxtx/ultragrid_rtp.h"
|
||||
#include "utils/worker.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <utility>
|
||||
|
||||
using namespace std;
|
||||
|
||||
ultragrid_rtp_video_rxtx::ultragrid_rtp_video_rxtx(const map<string, param_u> ¶ms) :
|
||||
rtp_video_rxtx(params)
|
||||
rtp_video_rxtx(params), m_stat_nanoperframeactual((struct module *) params.at("parent").ptr, "nanoperframeactual"), m_t0(std::chrono::steady_clock::now()), m_duration(std::chrono::nanoseconds::zero()), m_frames(0)
|
||||
{
|
||||
if ((params.at("postprocess").ptr != NULL &&
|
||||
strstr((const char *) params.at("postprocess").ptr, "help") != NULL)) {
|
||||
@@ -142,6 +143,7 @@ void *ultragrid_rtp_video_rxtx::send_frame_async_callback(void *arg) {
|
||||
|
||||
void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame)
|
||||
{
|
||||
std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
|
||||
lock_guard<mutex> lock(m_network_devices_lock);
|
||||
|
||||
if (m_connections_count == 1) { /* normal case - only one connection */
|
||||
@@ -163,8 +165,21 @@ void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame
|
||||
|
||||
m_async_sending_lock.lock();
|
||||
m_async_sending = false;
|
||||
m_async_sending_cv.notify_all();
|
||||
m_async_sending_lock.unlock();
|
||||
m_async_sending_cv.notify_all();
|
||||
|
||||
std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
|
||||
m_frames += 1;
|
||||
m_duration += std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0);
|
||||
auto seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(t1 - m_t0).count();
|
||||
if (seconds >= 5.0) {
|
||||
m_stat_nanoperframeactual.update(m_duration.count() / m_frames);
|
||||
m_t0 = t1;
|
||||
m_frames = 0;
|
||||
m_duration = std::chrono::nanoseconds::zero();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void ultragrid_rtp_video_rxtx::receiver_process_messages()
|
||||
@@ -279,14 +294,9 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
|
||||
|
||||
fr = 1;
|
||||
|
||||
struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control");
|
||||
struct stats *stat_loss = stats_new_statistics(
|
||||
(struct control_state *) control_mod,
|
||||
"loss");
|
||||
struct stats *stat_received = stats_new_statistics(
|
||||
(struct control_state *) control_mod,
|
||||
"received");
|
||||
uint64_t total_received = 0ull;
|
||||
stats<int_fast64_t> stat_loss(&m_sender_mod, "loss");
|
||||
stats<int_fast64_t> stat_expectedpacket(&m_sender_mod, "expectedpacket");
|
||||
stats<int_fast64_t> stat_receivedpacket(&m_sender_mod, "receivedpacket");
|
||||
|
||||
while (!should_exit_receiver) {
|
||||
struct timeval timeout;
|
||||
@@ -315,8 +325,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
|
||||
receiver_process_messages();
|
||||
//printf("Failed to receive data\n");
|
||||
}
|
||||
total_received += ret;
|
||||
stats_update_int(stat_received, total_received);
|
||||
|
||||
/* Decode and render for each participant in the conference... */
|
||||
pdb_iter_t it;
|
||||
@@ -380,9 +388,12 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
|
||||
}
|
||||
last_tile_received = curr_time;
|
||||
uint32_t sender_ssrc = cp->ssrc;
|
||||
stats_update_int(stat_loss,
|
||||
rtp_compute_fract_lost(m_network_devices[0],
|
||||
stat_loss.update(rtp_compute_fract_lost(m_network_devices[0],
|
||||
sender_ssrc));
|
||||
int expected_pkts, received_pkts;
|
||||
pbuf_get_packet_count(cp->playout_buffer, &expected_pkts, &received_pkts);
|
||||
stat_receivedpacket.update(received_pkts);
|
||||
stat_expectedpacket.update(expected_pkts);
|
||||
}
|
||||
|
||||
/* dual-link TIMEOUT - we won't wait for next tiles */
|
||||
@@ -433,9 +444,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
|
||||
// pass posioned pill to display
|
||||
display_put_frame(m_display_device, NULL, PUTF_BLOCKING);
|
||||
|
||||
stats_destroy(stat_loss);
|
||||
stats_destroy(stat_received);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#ifndef VIDEO_RXTX_ULTRAGRID_RTP_H_
|
||||
#define VIDEO_RXTX_ULTRAGRID_RTP_H_
|
||||
|
||||
#include "stats.h"
|
||||
#include "video_rxtx.h"
|
||||
#include "video_rxtx/rtp.h"
|
||||
|
||||
@@ -82,6 +83,11 @@ private:
|
||||
std::condition_variable m_async_sending_cv;
|
||||
std::mutex m_async_sending_lock;
|
||||
/// @}
|
||||
|
||||
stats<std::chrono::nanoseconds::rep> m_stat_nanoperframeactual;
|
||||
std::chrono::steady_clock::time_point m_t0;
|
||||
std::chrono::nanoseconds m_duration;
|
||||
int m_frames;
|
||||
};
|
||||
|
||||
video_rxtx *create_video_rxtx_ultragrid_rtp(std::map<std::string, param_u> const ¶ms);
|
||||
|
||||
Reference in New Issue
Block a user