Collect and report more statistics

Statistics are collected and reported through control socket where it
can be subscribed and further processed (eg. by CoUniverse).
This commit is contained in:
Martin Pulec
2015-02-20 13:20:06 +01:00
parent ceaccf8b6e
commit eb064c7a81
16 changed files with 224 additions and 171 deletions

View File

@@ -111,7 +111,6 @@ OBJS = @OBJS@ \
src/ihdtv/ihdtv.o \ src/ihdtv/ihdtv.o \
src/module.o \ src/module.o \
src/rtsp/rtsp_utils.o \ src/rtsp/rtsp_utils.o \
src/stats.o \
src/utils/config_file.o \ src/utils/config_file.o \
src/utils/list.o \ src/utils/list.o \
src/lib_common.o \ src/lib_common.o \

View File

@@ -52,7 +52,9 @@
#include "control_socket.h" #include "control_socket.h"
#include <set> #include <map>
#include <sstream>
#include <string>
#include "debug.h" #include "debug.h"
#include "messaging.h" #include "messaging.h"
@@ -95,7 +97,8 @@ struct control_state {
int network_port; int network_port;
struct module *root_module; struct module *root_module;
set<struct stats *> stats; multimap<int32_t, struct stats_reportable *> stats; // first member is ID of stream if applicable
std::map<uint32_t, int> stats_id_port_mapping; // this maps ID from above to index 0..n
pthread_mutex_t stats_lock; pthread_mutex_t stats_lock;
enum connection_type connection_type; enum connection_type connection_type;
@@ -627,33 +630,39 @@ static void * control_thread(void *args)
struct timeval curr_time; struct timeval curr_time;
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
if(tv_diff(curr_time, last_report_sent) > report_interval_sec) { if(tv_diff(curr_time, last_report_sent) > report_interval_sec) {
char buffer[1025]; bool first = true;
bool empty = true; int32_t last_id = -1;
memset(buffer, '\0', sizeof(buffer)); ostringstream buffer;
strncpy(buffer + strlen(buffer), "stats", sizeof(buffer) - buffer << "stats";
strlen(buffer) - 1);
pthread_mutex_lock(&s->stats_lock); pthread_mutex_lock(&s->stats_lock);
for(set<struct stats *>::iterator it = s->stats.begin(); for(auto it = s->stats.begin();
it != s->stats.end(); ++it) { it != s->stats.end(); ++it) {
empty = false; int32_t id = it->first;
strncpy(buffer + strlen(buffer), " ", sizeof(buffer) - if ((first || last_id != id) && id != -1) {
strlen(buffer) - 1); buffer << " -";
stats_format(*it, buffer + strlen(buffer), if (s->stats_id_port_mapping.find(id) != s->stats_id_port_mapping.end()) {
sizeof(buffer) - strlen(buffer)); buffer << s->stats_id_port_mapping.at(id);
} else {
buffer << "UNKNOWN";
}
}
last_id = id;
first = false;
buffer << " " + it->second->get_stat();
} }
pthread_mutex_unlock(&s->stats_lock); pthread_mutex_unlock(&s->stats_lock);
strncpy(buffer + strlen(buffer), "\r\n", sizeof(buffer) - buffer << "\r\n";
strlen(buffer) - 1);
if(strlen(buffer) < 1024 && !empty) { if (!first) { // are there any stats to report?
cur = clients; cur = clients;
string str = buffer.str();
while(cur) { while(cur) {
if(is_internal_port(cur->fd)) { // skip local FD if(is_internal_port(cur->fd)) { // skip local FD
cur = cur->next; cur = cur->next;
continue; continue;
} }
write_all(cur->fd, buffer, strlen(buffer)); write_all(cur->fd, str.c_str(), str.length());
cur = cur->next; cur = cur->next;
} }
} }
@@ -702,17 +711,29 @@ void control_done(struct control_state *s)
delete s; delete s;
} }
void control_add_stats(struct control_state *s, struct stats *stats) void control_add_stats(struct control_state *s, struct stats_reportable *stats, int32_t port_id)
{ {
pthread_mutex_lock(&s->stats_lock); pthread_mutex_lock(&s->stats_lock);
s->stats.insert(stats); s->stats.emplace(port_id, stats);
pthread_mutex_unlock(&s->stats_lock); pthread_mutex_unlock(&s->stats_lock);
} }
void control_remove_stats(struct control_state *s, struct stats *stats) void control_remove_stats(struct control_state *s, struct stats_reportable *stats)
{ {
pthread_mutex_lock(&s->stats_lock); pthread_mutex_lock(&s->stats_lock);
s->stats.erase(stats); for (auto it = s->stats.begin(); it != s->stats.end(); ++it) {
if (it->second == stats) {
s->stats.erase(it);
break;
}
}
pthread_mutex_unlock(&s->stats_lock);
}
void control_replace_port_mapping(struct control_state *s, std::map<uint32_t, int> &&m)
{
pthread_mutex_lock(&s->stats_lock);
s->stats_id_port_mapping = move(m);
pthread_mutex_unlock(&s->stats_lock); pthread_mutex_unlock(&s->stats_lock);
} }

View File

@@ -45,13 +45,14 @@
* *
*/ */
#ifdef __cplusplus #ifndef control_socket_h_
extern "C" { #define control_socket_h_
#endif // __cplusplus
#include <map>
struct control_state; struct control_state;
struct module; struct module;
struct stats; struct stats_reportable;
#define CONTROL_DEFAULT_PORT 5054 #define CONTROL_DEFAULT_PORT 5054
@@ -61,10 +62,9 @@ struct stats;
int control_init(int port, int connection_type, struct control_state **state, struct module *root_module); int control_init(int port, int connection_type, struct control_state **state, struct module *root_module);
void control_start(struct control_state *state); void control_start(struct control_state *state);
void control_done(struct control_state *s); void control_done(struct control_state *s);
void control_add_stats(struct control_state *state, struct stats *stats); void control_add_stats(struct control_state *state, struct stats_reportable *stats, int32_t port_id = -1);
void control_remove_stats(struct control_state *state, struct stats *stats); void control_remove_stats(struct control_state *state, struct stats_reportable *stats);
void control_replace_port_mapping(struct control_state *state, std::map<uint32_t, int> &&);
#ifdef __cplusplus #endif // control_socket_h_
}
#endif // __cplusplus

View File

@@ -22,10 +22,10 @@
#include "hd-rum-translator/hd-rum-decompress.h" #include "hd-rum-translator/hd-rum-decompress.h"
#include "messaging.h" #include "messaging.h"
#include "module.h" #include "module.h"
#include "stats.h"
#include "utils/misc.h" #include "utils/misc.h"
#include "tv.h" #include "tv.h"
#include <map>
#include <string> #include <string>
#include <vector> #include <vector>
@@ -51,8 +51,8 @@ struct replica {
}; };
struct hd_rum_translator_state { struct hd_rum_translator_state {
hd_rum_translator_state() : mod(), queue(nullptr), qhead(nullptr), qtail(nullptr), qempty(1), hd_rum_translator_state() : mod(), control_state(nullptr), queue(nullptr), qhead(nullptr),
qfull(0), decompress(nullptr) { qtail(nullptr), qempty(1), qfull(0), decompress(nullptr) {
module_init_default(&mod); module_init_default(&mod);
mod.cls = MODULE_CLASS_ROOT; mod.cls = MODULE_CLASS_ROOT;
pthread_mutex_init(&qempty_mtx, NULL); pthread_mutex_init(&qempty_mtx, NULL);
@@ -68,6 +68,7 @@ struct hd_rum_translator_state {
module_done(&mod); module_done(&mod);
} }
struct module mod; struct module mod;
struct control_state *control_state;
struct item *queue; struct item *queue;
struct item *qhead; struct item *qhead;
struct item *qtail; struct item *qtail;
@@ -125,6 +126,7 @@ static void replica_init(struct replica *s, const char *addr, int tx_port, int b
module_init_default(&s->mod); module_init_default(&s->mod);
s->mod.cls = MODULE_CLASS_PORT; s->mod.cls = MODULE_CLASS_PORT;
s->mod.priv_data = s; s->mod.priv_data = s;
s->mod.id = lrand48();
module_register(&s->mod, parent); module_register(&s->mod, parent);
} }
@@ -254,6 +256,17 @@ void change_replica_type(struct hd_rum_translator_state *s,
r->type == replica::type_t::RECOMPRESS); r->type == replica::type_t::RECOMPRESS);
} }
static void update_mapping(struct hd_rum_translator_state *s)
{
map<uint32_t, int> mapping;
int i = 0;
for (auto && r : s->replicas) {
mapping[r->mod.id] = i++;
}
control_replace_port_mapping(s->control_state, move(mapping));
}
static void *writer(void *arg) static void *writer(void *arg)
{ {
struct hd_rum_translator_state *s = struct hd_rum_translator_state *s =
@@ -277,6 +290,7 @@ static void *writer(void *arg)
replica_done(s->replicas[index]); replica_done(s->replicas[index]);
delete s->replicas[index]; delete s->replicas[index];
s->replicas.erase(s->replicas.begin() + index); s->replicas.erase(s->replicas.begin() + index);
update_mapping(s);
} else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) {
s->replicas.push_back(new replica()); s->replicas.push_back(new replica());
struct replica *rep = s->replicas[s->replicas.size() - 1]; struct replica *rep = s->replicas[s->replicas.size() - 1];
@@ -286,6 +300,7 @@ static void *writer(void *arg)
int tx_port = atoi(strtok_r(NULL, " ", &save_ptr)); int tx_port = atoi(strtok_r(NULL, " ", &save_ptr));
char *compress = strtok_r(NULL, " ", &save_ptr); char *compress = strtok_r(NULL, " ", &save_ptr);
replica_init(rep, host, tx_port, 100*1000, &s->mod); replica_init(rep, host, tx_port, 100*1000, &s->mod);
update_mapping(s);
if (compress) { if (compress) {
rep->type = replica::type_t::RECOMPRESS; rep->type = replica::type_t::RECOMPRESS;
char *fec = NULL; char *fec = NULL;
@@ -494,7 +509,6 @@ int main(int argc, char **argv)
int host_count; int host_count;
int control_port = CONTROL_DEFAULT_PORT; int control_port = CONTROL_DEFAULT_PORT;
int control_connection_type = 0; int control_connection_type = 0;
struct control_state *control_state = NULL;
#ifdef WIN32 #ifdef WIN32
WSADATA wsaData; WSADATA wsaData;
@@ -597,11 +611,11 @@ int main(int argc, char **argv)
rep = new replica(); rep = new replica();
} }
if(control_init(control_port, control_connection_type, &control_state, &state.mod) != 0) { if(control_init(control_port, control_connection_type, &state.control_state, &state.mod) != 0) {
fprintf(stderr, "Warning: Unable to create remote control.\n"); fprintf(stderr, "Warning: Unable to create remote control.\n");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
control_start(control_state); control_start(state.control_state);
// we need only one shared receiver decompressor for all recompressing streams // we need only one shared receiver decompressor for all recompressing streams
state.decompress = hd_rum_decompress_init(&state.mod);; state.decompress = hd_rum_decompress_init(&state.mod);;
@@ -650,20 +664,16 @@ int main(int argc, char **argv)
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true); hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true);
} }
} }
update_mapping(&state);
if (pthread_create(&thread, NULL, writer, (void *) &state)) { if (pthread_create(&thread, NULL, writer, (void *) &state)) {
fprintf(stderr, "cannot create writer thread\n"); fprintf(stderr, "cannot create writer thread\n");
return 2; return 2;
} }
struct stats *stat_received = stats_new_statistics(
control_state,
"received");
uint64_t received_data = 0; uint64_t received_data = 0;
struct timeval t0, t; struct timeval t0;
struct timeval t_report;
gettimeofday(&t0, NULL); gettimeofday(&t0, NULL);
gettimeofday(&t_report, NULL);
unsigned long long int last_data = 0ull; unsigned long long int last_data = 0ull;
@@ -681,17 +691,14 @@ int main(int argc, char **argv)
pthread_cond_signal(&state.qempty_cond); pthread_cond_signal(&state.qempty_cond);
pthread_mutex_unlock(&state.qempty_mtx); pthread_mutex_unlock(&state.qempty_mtx);
struct timeval t;
gettimeofday(&t, NULL); gettimeofday(&t, NULL);
if(tv_diff(t, t0) > 1.0) { double seconds = tv_diff(t, t0);
stats_update_int(stat_received, received_data);
t0 = t;
}
double seconds = tv_diff(t, t_report);
if (seconds > 5.0) { if (seconds > 5.0) {
unsigned long long int cur_data = (received_data - last_data); unsigned long long int cur_data = (received_data - last_data);
unsigned long long int bps = cur_data / seconds; unsigned long long int bps = cur_data / seconds;
fprintf(stderr, "Received %llu bytes in %g seconds = %llu B/s.\n", cur_data, seconds, bps); fprintf(stderr, "Received %llu bytes in %g seconds = %llu B/s.\n", cur_data, seconds, bps);
t_report = t; t0 = t;
last_data = received_data; last_data = received_data;
} }
} }
@@ -720,8 +727,6 @@ int main(int argc, char **argv)
pthread_cond_signal(&state.qempty_cond); pthread_cond_signal(&state.qempty_cond);
pthread_mutex_unlock(&state.qempty_mtx); pthread_mutex_unlock(&state.qempty_mtx);
stats_destroy(stat_received);
pthread_join(thread, NULL); pthread_join(thread, NULL);
if(state.decompress) { if(state.decompress) {
@@ -733,7 +738,7 @@ int main(int argc, char **argv)
delete state.replicas[i]; delete state.replicas[i];
} }
control_done(control_state); control_done(state.control_state);
#ifdef WIN32 #ifdef WIN32
WSACleanup(); WSACleanup();

View File

@@ -276,3 +276,16 @@ void dump_tree(struct module *node, int indent) {
} }
} }
bool get_port_id(struct module *node, uint32_t *id)
{
while(node->parent) {
if (node->cls == MODULE_CLASS_PORT) {
*id = node->id;
return true;
}
node = node->parent;
}
return false;
}

View File

@@ -100,6 +100,7 @@ struct module {
struct simple_linked_list *msg_queue_childs; ///< messages for childern that were not delivered struct simple_linked_list *msg_queue_childs; ///< messages for childern that were not delivered
void *priv_data; void *priv_data;
uint32_t id;
#ifdef __cplusplus #ifdef __cplusplus
module() = default; module() = default;
@@ -139,6 +140,7 @@ struct module *get_matching_child(struct module *node, const char *path);
struct module *get_root_module(struct module *node); struct module *get_root_module(struct module *node);
struct module *get_parent_module(struct module *node); struct module *get_parent_module(struct module *node);
bool get_port_id(struct module *child_node, uint32_t *id);
void dump_tree(struct module *root, int indent); void dump_tree(struct module *root, int indent);

View File

@@ -93,6 +93,7 @@ struct pbuf {
int last_rtp_seq; int last_rtp_seq;
int should_arrived; int should_arrived;
int cumulative_count; int cumulative_count;
int last_received, last_expected;
uint32_t last_display_ts; uint32_t last_display_ts;
}; };
@@ -302,6 +303,8 @@ void pbuf_insert(struct pbuf *playout_buf, rtp_packet * pkt)
playout_buf->cumulative_count, playout_buf->cumulative_count,
(double) playout_buf->cumulative_count / (double) playout_buf->cumulative_count /
playout_buf->should_arrived * 100.0); playout_buf->should_arrived * 100.0);
playout_buf->last_received = playout_buf->cumulative_count;
playout_buf->last_expected = playout_buf->should_arrived;
playout_buf->should_arrived = playout_buf->should_arrived =
playout_buf->cumulative_count = 0; playout_buf->cumulative_count = 0;
playout_buf->last_display_ts = pkt->ts; playout_buf->last_display_ts = pkt->ts;
@@ -470,3 +473,10 @@ void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay)
playout_buf->playout_delay = playout_delay; playout_buf->playout_delay = playout_delay;
} }
void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts)
{
*expected_pkts = playout_buf->last_expected;
*received_pkts = playout_buf->last_received;
}

View File

@@ -117,6 +117,7 @@ int pbuf_decode(struct pbuf *playout_buf, struct timeval curr_time,
//struct video_frame *framebuffer, int i, struct state_decoder *decoder); //struct video_frame *framebuffer, int i, struct state_decoder *decoder);
void pbuf_remove(struct pbuf *playout_buf, struct timeval curr_time); void pbuf_remove(struct pbuf *playout_buf, struct timeval curr_time);
void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay); void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay);
void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@@ -80,12 +80,14 @@
#include "host.h" #include "host.h"
#include "lib_common.h" #include "lib_common.h"
#include "messaging.h" #include "messaging.h"
#include "module.h"
#include "perf.h" #include "perf.h"
#include "rtp/fec.h" #include "rtp/fec.h"
#include "rtp/rtp.h" #include "rtp/rtp.h"
#include "rtp/rtp_callback.h" #include "rtp/rtp_callback.h"
#include "rtp/pbuf.h" #include "rtp/pbuf.h"
#include "rtp/video_decoders.h" #include "rtp/video_decoders.h"
#include "stats.h"
#include "utils/synchronized_queue.h" #include "utils/synchronized_queue.h"
#include "utils/timed_message.h" #include "utils/timed_message.h"
#include "video.h" #include "video.h"
@@ -262,6 +264,7 @@ struct state_video_decoder
// for statistics // for statistics
/// @{ /// @{
volatile unsigned long int displayed, dropped, corrupted, missing; volatile unsigned long int displayed, dropped, corrupted, missing;
shared_ptr<struct stats<int_fast64_t>> s_total_frames, s_corrupt_frames, s_displayed_frames;
volatile unsigned long int fec_ok, fec_nok; volatile unsigned long int fec_ok, fec_nok;
long int last_buffer_number; long int last_buffer_number;
/// @} /// @}
@@ -430,7 +433,7 @@ static void *fec_thread(void *args) {
} else if (!corrupted_frame_counted) { } else if (!corrupted_frame_counted) {
corrupted_frame_counted = true; corrupted_frame_counted = true;
// count it here because decoder accepts corrupted frames // count it here because decoder accepts corrupted frames
decoder->corrupted++; decoder->s_corrupt_frames->update(++decoder->corrupted);
} }
verbose_msg("\n"); verbose_msg("\n");
} }
@@ -445,8 +448,9 @@ static void *fec_thread(void *args) {
cleanup: cleanup:
if(ret == FALSE) { if(ret == FALSE) {
decoder->corrupted++; decoder->s_corrupt_frames->update(decoder->corrupted++);
decoder->dropped++; decoder->dropped++;
decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped);
} }
} }
@@ -556,7 +560,8 @@ static void *decompress_thread(void *args) {
int ret = display_put_frame(decoder->display, int ret = display_put_frame(decoder->display,
decoder->frame, putf_flags); decoder->frame, putf_flags);
if (ret == 0) { if (ret == 0) {
decoder->displayed++; decoder->s_displayed_frames->update(++decoder->displayed);
decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped);
} else { } else {
decoder->dropped++; decoder->dropped++;
} }
@@ -657,6 +662,10 @@ struct state_video_decoder *video_decoder_init(struct module *parent,
return NULL; return NULL;
} }
s->s_total_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "totalframes"));
s->s_corrupt_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "corruptframes"));
s->s_displayed_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "displayedframes"));
return s; return s;
} }

View File

@@ -1,68 +0,0 @@
#include "stats.h"
#include <map>
#include <string>
#include "control_socket.h"
#include "compat/platform_spin.h"
#include "debug.h"
#include "messaging.h"
#include "module.h"
#include "utils/resource_manager.h"
#define MAX_ITEMS 100
using namespace std;
struct stats {
public:
stats(string name, struct control_state *control)
: m_name(name), m_val(0), m_control(control) {
platform_spin_init(&m_spin);
control_add_stats(control, this);
}
~stats() {
control_remove_stats(m_control, this);
platform_spin_destroy(&m_spin);
}
void update_int(int64_t val) {
platform_spin_lock(&m_spin);
m_val = val;
platform_spin_unlock(&m_spin);
}
void get_stat(char *buffer, int buffer_len) {
platform_spin_lock(&m_spin);
snprintf(buffer, buffer_len, "%s %lld", m_name.c_str(), (long long) m_val);
platform_spin_unlock(&m_spin);
}
private:
string m_name;
int64_t m_val;
struct control_state *m_control;
platform_spin_t m_spin;
};
struct stats *stats_new_statistics(struct control_state *control, const char * name)
{
return new stats(string(name), control);
}
void stats_update_int(struct stats *s, int64_t val)
{
return s->update_int(val);
}
void stats_format(struct stats *s, char *buffer, int buffer_len)
{
s->get_stat(buffer, buffer_len);
}
void stats_destroy(struct stats *s)
{
delete s;
}

View File

@@ -1,27 +1,53 @@
#ifndef STATS_H_ #ifndef stat_h_
#define STATS_H_ #define stat_h_
#ifdef HAVE_CONFIG_H #include <atomic>
#include "config.h" #include <string>
#include "config_unix.h" #include <sstream>
#include "config_win32.h"
#endif
#ifdef __cplusplus #include "control_socket.h"
extern "C" { #include "module.h"
#endif
struct control_state; struct stats_reportable {
struct stats; virtual std::string get_stat() = 0;
};
struct stats *stats_new_statistics(struct control_state *control, const char * name); template <typename T>
void stats_update_int(struct stats *, int64_t); struct stats : public stats_reportable {
void stats_format(struct stats *s, char *buffer, int buffer_len); public:
void stats_destroy(struct stats *); /**
* @param mod should be the module that collect the statistics or its closest
* ancestor because it should be used to identify branch for which statistics
* are reported.
*/
stats(struct module *mod, std::string name)
: m_name(name), m_val() {
m_control = (struct control_state *) get_module(get_root_module(mod), "control");
uint32_t id;
bool ret = get_port_id(mod, &id);
control_add_stats(m_control, this, ret ? id : -1);
}
#ifdef __cplusplus ~stats() {
} control_remove_stats(m_control, this);
#endif }
#endif// STATS_H_ void update(T val) {
m_val = val;
}
std::string get_stat() {
std::ostringstream oss;
oss << m_name << " " << m_val.load();
return oss.str();
}
private:
struct control_state *m_control;
std::string m_name;
std::atomic<T> m_val;
};
#endif // stat_h_

View File

@@ -45,8 +45,8 @@
#include "debug.h" #include "debug.h"
#include <sstream> #include <sstream>
#include <string>
#include <stdexcept> #include <stdexcept>
#include <string>
#include "host.h" #include "host.h"
#include "lib_common.h" #include "lib_common.h"
@@ -114,7 +114,8 @@ void register_video_rxtx(enum rxtx_protocol proto, struct video_rxtx_info info)
video_rxtx::video_rxtx(map<string, param_u> const &params): m_paused(false), video_rxtx::video_rxtx(map<string, param_u> const &params): m_paused(false),
m_rxtx_mode(params.at("rxtx_mode").i), m_compression(nullptr), m_rxtx_mode(params.at("rxtx_mode").i), m_compression(nullptr),
m_video_exporter(static_cast<struct video_export *>(params.at("exporter").ptr)) { m_video_exporter(static_cast<struct video_export *>(params.at("exporter").ptr)),
m_frames(0), m_t0(std::chrono::steady_clock::now()) {
module_init_default(&m_sender_mod); module_init_default(&m_sender_mod);
m_sender_mod.cls = MODULE_CLASS_SENDER; m_sender_mod.cls = MODULE_CLASS_SENDER;
@@ -196,9 +197,15 @@ void *video_rxtx::sender_loop() {
memset(&saved_vid_desc, 0, sizeof(saved_vid_desc)); memset(&saved_vid_desc, 0, sizeof(saved_vid_desc));
struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control"); stats<int_fast64_t> stat_sendbytes(&m_sender_mod, "sendbytes");
struct stats *stat_data_sent = stats_new_statistics((struct control_state *) stats<double> stat_inexpected(&m_sender_mod, "inexpected");
control_mod, "data"); /**
* @todo
* Theset 3 values should be 3 distinct values in future.
*/
stats<double> stat_inactual(&m_sender_mod, "inactual");
stats<double> stat_outexpected(&m_sender_mod, "outexpected");
stats<double> stat_outactual(&m_sender_mod, "outactual");
while(1) { while(1) {
check_sender_messages(); check_sender_messages();
@@ -212,12 +219,26 @@ void *video_rxtx::sender_loop() {
video_export(m_video_exporter, tx_frame.get()); video_export(m_video_exporter, tx_frame.get());
if (!m_paused) { if (!m_paused) {
stat_inexpected.update(tx_frame->fps);
send_frame(tx_frame); send_frame(tx_frame);
m_frames += 1;
std::chrono::steady_clock::time_point curr_time =
std::chrono::steady_clock::now();
double seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(curr_time - m_t0).count();
if (seconds >= 5.0) {
double fps = m_frames / seconds;
stat_inactual.update(fps);
stat_outexpected.update(fps);
stat_outactual.update(fps);
m_t0 = curr_time;
m_frames = 0;
}
rtp_video_rxtx *rtp_rxtx = dynamic_cast<rtp_video_rxtx *>(this); rtp_video_rxtx *rtp_rxtx = dynamic_cast<rtp_video_rxtx *>(this);
if (rtp_rxtx) { if (rtp_rxtx) {
stats_update_int(stat_data_sent, stat_sendbytes.update(rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0]));
rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0]));
} }
} }
} }
@@ -226,8 +247,6 @@ exit:
module_done(CAST_MODULE(m_compression)); module_done(CAST_MODULE(m_compression));
m_compression = nullptr; m_compression = nullptr;
stats_destroy(stat_data_sent);
return NULL; return NULL;
} }

View File

@@ -38,9 +38,10 @@
#ifndef VIDEO_RXTX_H_ #ifndef VIDEO_RXTX_H_
#define VIDEO_RXTX_H_ #define VIDEO_RXTX_H_
#include <chrono>
#include <map> #include <map>
#include <string>
#include <memory> #include <memory>
#include <string>
#include "module.h" #include "module.h"
@@ -105,6 +106,8 @@ private:
struct compress_state *m_compression; struct compress_state *m_compression;
pthread_mutex_t m_lock; pthread_mutex_t m_lock;
struct video_export *m_video_exporter; struct video_export *m_video_exporter;
int m_frames;
std::chrono::steady_clock::time_point m_t0;
pthread_t m_thread_id; pthread_t m_thread_id;
}; };

View File

@@ -61,7 +61,6 @@
#include "rtp/pbuf.h" #include "rtp/pbuf.h"
#include "rtp/rtp_callback.h" #include "rtp/rtp_callback.h"
#include "tfrc.h" #include "tfrc.h"
#include "stats.h"
#include "transmit.h" #include "transmit.h"
#include "tv.h" #include "tv.h"
#include "ug_runtime_error.h" #include "ug_runtime_error.h"

View File

@@ -72,12 +72,13 @@
#include "video_rxtx/ultragrid_rtp.h" #include "video_rxtx/ultragrid_rtp.h"
#include "utils/worker.h" #include "utils/worker.h"
#include <chrono>
#include <utility> #include <utility>
using namespace std; using namespace std;
ultragrid_rtp_video_rxtx::ultragrid_rtp_video_rxtx(const map<string, param_u> &params) : ultragrid_rtp_video_rxtx::ultragrid_rtp_video_rxtx(const map<string, param_u> &params) :
rtp_video_rxtx(params) rtp_video_rxtx(params), m_stat_nanoperframeactual((struct module *) params.at("parent").ptr, "nanoperframeactual"), m_t0(std::chrono::steady_clock::now()), m_duration(std::chrono::nanoseconds::zero()), m_frames(0)
{ {
if ((params.at("postprocess").ptr != NULL && if ((params.at("postprocess").ptr != NULL &&
strstr((const char *) params.at("postprocess").ptr, "help") != NULL)) { strstr((const char *) params.at("postprocess").ptr, "help") != NULL)) {
@@ -142,6 +143,7 @@ void *ultragrid_rtp_video_rxtx::send_frame_async_callback(void *arg) {
void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame) void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame)
{ {
std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
lock_guard<mutex> lock(m_network_devices_lock); lock_guard<mutex> lock(m_network_devices_lock);
if (m_connections_count == 1) { /* normal case - only one connection */ if (m_connections_count == 1) { /* normal case - only one connection */
@@ -163,8 +165,21 @@ void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame
m_async_sending_lock.lock(); m_async_sending_lock.lock();
m_async_sending = false; m_async_sending = false;
m_async_sending_cv.notify_all();
m_async_sending_lock.unlock(); m_async_sending_lock.unlock();
m_async_sending_cv.notify_all();
std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
m_frames += 1;
m_duration += std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0);
auto seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(t1 - m_t0).count();
if (seconds >= 5.0) {
m_stat_nanoperframeactual.update(m_duration.count() / m_frames);
m_t0 = t1;
m_frames = 0;
m_duration = std::chrono::nanoseconds::zero();
}
} }
void ultragrid_rtp_video_rxtx::receiver_process_messages() void ultragrid_rtp_video_rxtx::receiver_process_messages()
@@ -279,14 +294,9 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
fr = 1; fr = 1;
struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control"); stats<int_fast64_t> stat_loss(&m_sender_mod, "loss");
struct stats *stat_loss = stats_new_statistics( stats<int_fast64_t> stat_expectedpacket(&m_sender_mod, "expectedpacket");
(struct control_state *) control_mod, stats<int_fast64_t> stat_receivedpacket(&m_sender_mod, "receivedpacket");
"loss");
struct stats *stat_received = stats_new_statistics(
(struct control_state *) control_mod,
"received");
uint64_t total_received = 0ull;
while (!should_exit_receiver) { while (!should_exit_receiver) {
struct timeval timeout; struct timeval timeout;
@@ -315,8 +325,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
receiver_process_messages(); receiver_process_messages();
//printf("Failed to receive data\n"); //printf("Failed to receive data\n");
} }
total_received += ret;
stats_update_int(stat_received, total_received);
/* Decode and render for each participant in the conference... */ /* Decode and render for each participant in the conference... */
pdb_iter_t it; pdb_iter_t it;
@@ -380,9 +388,12 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
} }
last_tile_received = curr_time; last_tile_received = curr_time;
uint32_t sender_ssrc = cp->ssrc; uint32_t sender_ssrc = cp->ssrc;
stats_update_int(stat_loss, stat_loss.update(rtp_compute_fract_lost(m_network_devices[0],
rtp_compute_fract_lost(m_network_devices[0],
sender_ssrc)); sender_ssrc));
int expected_pkts, received_pkts;
pbuf_get_packet_count(cp->playout_buffer, &expected_pkts, &received_pkts);
stat_receivedpacket.update(received_pkts);
stat_expectedpacket.update(expected_pkts);
} }
/* dual-link TIMEOUT - we won't wait for next tiles */ /* dual-link TIMEOUT - we won't wait for next tiles */
@@ -433,9 +444,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
// pass posioned pill to display // pass posioned pill to display
display_put_frame(m_display_device, NULL, PUTF_BLOCKING); display_put_frame(m_display_device, NULL, PUTF_BLOCKING);
stats_destroy(stat_loss);
stats_destroy(stat_received);
return 0; return 0;
} }

View File

@@ -38,6 +38,7 @@
#ifndef VIDEO_RXTX_ULTRAGRID_RTP_H_ #ifndef VIDEO_RXTX_ULTRAGRID_RTP_H_
#define VIDEO_RXTX_ULTRAGRID_RTP_H_ #define VIDEO_RXTX_ULTRAGRID_RTP_H_
#include "stats.h"
#include "video_rxtx.h" #include "video_rxtx.h"
#include "video_rxtx/rtp.h" #include "video_rxtx/rtp.h"
@@ -82,6 +83,11 @@ private:
std::condition_variable m_async_sending_cv; std::condition_variable m_async_sending_cv;
std::mutex m_async_sending_lock; std::mutex m_async_sending_lock;
/// @} /// @}
stats<std::chrono::nanoseconds::rep> m_stat_nanoperframeactual;
std::chrono::steady_clock::time_point m_t0;
std::chrono::nanoseconds m_duration;
int m_frames;
}; };
video_rxtx *create_video_rxtx_ultragrid_rtp(std::map<std::string, param_u> const &params); video_rxtx *create_video_rxtx_ultragrid_rtp(std::map<std::string, param_u> const &params);