From eb064c7a81e348afb29cbccfe710cbf9d7b7291c Mon Sep 17 00:00:00 2001 From: Martin Pulec Date: Fri, 20 Feb 2015 13:20:06 +0100 Subject: [PATCH] Collect and report more statistics Statistics are collected and reported through control socket where it can be subscribed and further processed (eg. by CoUniverse). --- Makefile.in | 1 - src/control_socket.cpp | 63 ++++++++++++------- src/control_socket.h | 18 +++--- src/hd-rum-translator/hd-rum-translator.cpp | 47 +++++++------- src/module.c | 13 ++++ src/module.h | 2 + src/rtp/pbuf.c | 10 +++ src/rtp/pbuf.h | 1 + src/rtp/video_decoders.cpp | 15 ++++- src/stats.cpp | 68 --------------------- src/stats.h | 66 ++++++++++++++------ src/video_rxtx.cpp | 37 ++++++++--- src/video_rxtx.h | 5 +- src/video_rxtx/rtp.cpp | 1 - src/video_rxtx/ultragrid_rtp.cpp | 42 +++++++------ src/video_rxtx/ultragrid_rtp.h | 6 ++ 16 files changed, 224 insertions(+), 171 deletions(-) delete mode 100644 src/stats.cpp diff --git a/Makefile.in b/Makefile.in index ca749dada..a151fbcbc 100644 --- a/Makefile.in +++ b/Makefile.in @@ -111,7 +111,6 @@ OBJS = @OBJS@ \ src/ihdtv/ihdtv.o \ src/module.o \ src/rtsp/rtsp_utils.o \ - src/stats.o \ src/utils/config_file.o \ src/utils/list.o \ src/lib_common.o \ diff --git a/src/control_socket.cpp b/src/control_socket.cpp index c307ea230..d0ac4904d 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -52,7 +52,9 @@ #include "control_socket.h" -#include +#include +#include +#include #include "debug.h" #include "messaging.h" @@ -95,7 +97,8 @@ struct control_state { int network_port; struct module *root_module; - set stats; + multimap stats; // first member is ID of stream if applicable + std::map stats_id_port_mapping; // this maps ID from above to index 0..n pthread_mutex_t stats_lock; enum connection_type connection_type; @@ -627,33 +630,39 @@ static void * control_thread(void *args) struct timeval curr_time; gettimeofday(&curr_time, NULL); if(tv_diff(curr_time, last_report_sent) > report_interval_sec) { - char buffer[1025]; - bool empty = true; - memset(buffer, '\0', sizeof(buffer)); - strncpy(buffer + strlen(buffer), "stats", sizeof(buffer) - - strlen(buffer) - 1); + bool first = true; + int32_t last_id = -1; + ostringstream buffer; + buffer << "stats"; pthread_mutex_lock(&s->stats_lock); - for(set::iterator it = s->stats.begin(); + for(auto it = s->stats.begin(); it != s->stats.end(); ++it) { - empty = false; - strncpy(buffer + strlen(buffer), " ", sizeof(buffer) - - strlen(buffer) - 1); - stats_format(*it, buffer + strlen(buffer), - sizeof(buffer) - strlen(buffer)); + int32_t id = it->first; + if ((first || last_id != id) && id != -1) { + buffer << " -"; + if (s->stats_id_port_mapping.find(id) != s->stats_id_port_mapping.end()) { + buffer << s->stats_id_port_mapping.at(id); + } else { + buffer << "UNKNOWN"; + } + } + last_id = id; + first = false; + buffer << " " + it->second->get_stat(); } pthread_mutex_unlock(&s->stats_lock); - strncpy(buffer + strlen(buffer), "\r\n", sizeof(buffer) - - strlen(buffer) - 1); + buffer << "\r\n"; - if(strlen(buffer) < 1024 && !empty) { + if (!first) { // are there any stats to report? cur = clients; + string str = buffer.str(); while(cur) { if(is_internal_port(cur->fd)) { // skip local FD cur = cur->next; continue; } - write_all(cur->fd, buffer, strlen(buffer)); + write_all(cur->fd, str.c_str(), str.length()); cur = cur->next; } } @@ -702,17 +711,29 @@ void control_done(struct control_state *s) delete s; } -void control_add_stats(struct control_state *s, struct stats *stats) +void control_add_stats(struct control_state *s, struct stats_reportable *stats, int32_t port_id) { pthread_mutex_lock(&s->stats_lock); - s->stats.insert(stats); + s->stats.emplace(port_id, stats); pthread_mutex_unlock(&s->stats_lock); } -void control_remove_stats(struct control_state *s, struct stats *stats) +void control_remove_stats(struct control_state *s, struct stats_reportable *stats) { pthread_mutex_lock(&s->stats_lock); - s->stats.erase(stats); + for (auto it = s->stats.begin(); it != s->stats.end(); ++it) { + if (it->second == stats) { + s->stats.erase(it); + break; + } + } + pthread_mutex_unlock(&s->stats_lock); +} + +void control_replace_port_mapping(struct control_state *s, std::map &&m) +{ + pthread_mutex_lock(&s->stats_lock); + s->stats_id_port_mapping = move(m); pthread_mutex_unlock(&s->stats_lock); } diff --git a/src/control_socket.h b/src/control_socket.h index c22969b83..701943164 100644 --- a/src/control_socket.h +++ b/src/control_socket.h @@ -45,13 +45,14 @@ * */ -#ifdef __cplusplus -extern "C" { -#endif // __cplusplus +#ifndef control_socket_h_ +#define control_socket_h_ + +#include struct control_state; struct module; -struct stats; +struct stats_reportable; #define CONTROL_DEFAULT_PORT 5054 @@ -61,10 +62,9 @@ struct stats; int control_init(int port, int connection_type, struct control_state **state, struct module *root_module); void control_start(struct control_state *state); void control_done(struct control_state *s); -void control_add_stats(struct control_state *state, struct stats *stats); -void control_remove_stats(struct control_state *state, struct stats *stats); +void control_add_stats(struct control_state *state, struct stats_reportable *stats, int32_t port_id = -1); +void control_remove_stats(struct control_state *state, struct stats_reportable *stats); +void control_replace_port_mapping(struct control_state *state, std::map &&); -#ifdef __cplusplus -} -#endif // __cplusplus +#endif // control_socket_h_ diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index d4cdbc388..bd7d4b999 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -22,10 +22,10 @@ #include "hd-rum-translator/hd-rum-decompress.h" #include "messaging.h" #include "module.h" -#include "stats.h" #include "utils/misc.h" #include "tv.h" +#include #include #include @@ -51,8 +51,8 @@ struct replica { }; struct hd_rum_translator_state { - hd_rum_translator_state() : mod(), queue(nullptr), qhead(nullptr), qtail(nullptr), qempty(1), - qfull(0), decompress(nullptr) { + hd_rum_translator_state() : mod(), control_state(nullptr), queue(nullptr), qhead(nullptr), + qtail(nullptr), qempty(1), qfull(0), decompress(nullptr) { module_init_default(&mod); mod.cls = MODULE_CLASS_ROOT; pthread_mutex_init(&qempty_mtx, NULL); @@ -68,6 +68,7 @@ struct hd_rum_translator_state { module_done(&mod); } struct module mod; + struct control_state *control_state; struct item *queue; struct item *qhead; struct item *qtail; @@ -125,6 +126,7 @@ static void replica_init(struct replica *s, const char *addr, int tx_port, int b module_init_default(&s->mod); s->mod.cls = MODULE_CLASS_PORT; s->mod.priv_data = s; + s->mod.id = lrand48(); module_register(&s->mod, parent); } @@ -254,6 +256,17 @@ void change_replica_type(struct hd_rum_translator_state *s, r->type == replica::type_t::RECOMPRESS); } +static void update_mapping(struct hd_rum_translator_state *s) +{ + map mapping; + int i = 0; + for (auto && r : s->replicas) { + mapping[r->mod.id] = i++; + } + + control_replace_port_mapping(s->control_state, move(mapping)); +} + static void *writer(void *arg) { struct hd_rum_translator_state *s = @@ -277,6 +290,7 @@ static void *writer(void *arg) replica_done(s->replicas[index]); delete s->replicas[index]; s->replicas.erase(s->replicas.begin() + index); + update_mapping(s); } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { s->replicas.push_back(new replica()); struct replica *rep = s->replicas[s->replicas.size() - 1]; @@ -286,6 +300,7 @@ static void *writer(void *arg) int tx_port = atoi(strtok_r(NULL, " ", &save_ptr)); char *compress = strtok_r(NULL, " ", &save_ptr); replica_init(rep, host, tx_port, 100*1000, &s->mod); + update_mapping(s); if (compress) { rep->type = replica::type_t::RECOMPRESS; char *fec = NULL; @@ -494,7 +509,6 @@ int main(int argc, char **argv) int host_count; int control_port = CONTROL_DEFAULT_PORT; int control_connection_type = 0; - struct control_state *control_state = NULL; #ifdef WIN32 WSADATA wsaData; @@ -597,11 +611,11 @@ int main(int argc, char **argv) rep = new replica(); } - if(control_init(control_port, control_connection_type, &control_state, &state.mod) != 0) { + if(control_init(control_port, control_connection_type, &state.control_state, &state.mod) != 0) { fprintf(stderr, "Warning: Unable to create remote control.\n"); return EXIT_FAILURE; } - control_start(control_state); + control_start(state.control_state); // we need only one shared receiver decompressor for all recompressing streams state.decompress = hd_rum_decompress_init(&state.mod);; @@ -650,20 +664,16 @@ int main(int argc, char **argv) hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true); } } + update_mapping(&state); if (pthread_create(&thread, NULL, writer, (void *) &state)) { fprintf(stderr, "cannot create writer thread\n"); return 2; } - struct stats *stat_received = stats_new_statistics( - control_state, - "received"); uint64_t received_data = 0; - struct timeval t0, t; - struct timeval t_report; + struct timeval t0; gettimeofday(&t0, NULL); - gettimeofday(&t_report, NULL); unsigned long long int last_data = 0ull; @@ -681,17 +691,14 @@ int main(int argc, char **argv) pthread_cond_signal(&state.qempty_cond); pthread_mutex_unlock(&state.qempty_mtx); + struct timeval t; gettimeofday(&t, NULL); - if(tv_diff(t, t0) > 1.0) { - stats_update_int(stat_received, received_data); - t0 = t; - } - double seconds = tv_diff(t, t_report); + double seconds = tv_diff(t, t0); if (seconds > 5.0) { unsigned long long int cur_data = (received_data - last_data); unsigned long long int bps = cur_data / seconds; fprintf(stderr, "Received %llu bytes in %g seconds = %llu B/s.\n", cur_data, seconds, bps); - t_report = t; + t0 = t; last_data = received_data; } } @@ -720,8 +727,6 @@ int main(int argc, char **argv) pthread_cond_signal(&state.qempty_cond); pthread_mutex_unlock(&state.qempty_mtx); - stats_destroy(stat_received); - pthread_join(thread, NULL); if(state.decompress) { @@ -733,7 +738,7 @@ int main(int argc, char **argv) delete state.replicas[i]; } - control_done(control_state); + control_done(state.control_state); #ifdef WIN32 WSACleanup(); diff --git a/src/module.c b/src/module.c index 09f3e914f..2bc0d66d0 100644 --- a/src/module.c +++ b/src/module.c @@ -276,3 +276,16 @@ void dump_tree(struct module *node, int indent) { } } +bool get_port_id(struct module *node, uint32_t *id) +{ + while(node->parent) { + if (node->cls == MODULE_CLASS_PORT) { + *id = node->id; + return true; + } + node = node->parent; + } + + return false; +} + diff --git a/src/module.h b/src/module.h index 5ec933300..8232be1ea 100644 --- a/src/module.h +++ b/src/module.h @@ -100,6 +100,7 @@ struct module { struct simple_linked_list *msg_queue_childs; ///< messages for childern that were not delivered void *priv_data; + uint32_t id; #ifdef __cplusplus module() = default; @@ -139,6 +140,7 @@ struct module *get_matching_child(struct module *node, const char *path); struct module *get_root_module(struct module *node); struct module *get_parent_module(struct module *node); +bool get_port_id(struct module *child_node, uint32_t *id); void dump_tree(struct module *root, int indent); diff --git a/src/rtp/pbuf.c b/src/rtp/pbuf.c index b023c1640..004e4d868 100644 --- a/src/rtp/pbuf.c +++ b/src/rtp/pbuf.c @@ -93,6 +93,7 @@ struct pbuf { int last_rtp_seq; int should_arrived; int cumulative_count; + int last_received, last_expected; uint32_t last_display_ts; }; @@ -302,6 +303,8 @@ void pbuf_insert(struct pbuf *playout_buf, rtp_packet * pkt) playout_buf->cumulative_count, (double) playout_buf->cumulative_count / playout_buf->should_arrived * 100.0); + playout_buf->last_received = playout_buf->cumulative_count; + playout_buf->last_expected = playout_buf->should_arrived; playout_buf->should_arrived = playout_buf->cumulative_count = 0; playout_buf->last_display_ts = pkt->ts; @@ -470,3 +473,10 @@ void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay) playout_buf->playout_delay = playout_delay; } +void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts) +{ + *expected_pkts = playout_buf->last_expected; + *received_pkts = playout_buf->last_received; + +} + diff --git a/src/rtp/pbuf.h b/src/rtp/pbuf.h index 71a1d3dd4..ca101b258 100644 --- a/src/rtp/pbuf.h +++ b/src/rtp/pbuf.h @@ -117,6 +117,7 @@ int pbuf_decode(struct pbuf *playout_buf, struct timeval curr_time, //struct video_frame *framebuffer, int i, struct state_decoder *decoder); void pbuf_remove(struct pbuf *playout_buf, struct timeval curr_time); void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay); +void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts); #ifdef __cplusplus } diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index 878786830..f09aa8d18 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -80,12 +80,14 @@ #include "host.h" #include "lib_common.h" #include "messaging.h" +#include "module.h" #include "perf.h" #include "rtp/fec.h" #include "rtp/rtp.h" #include "rtp/rtp_callback.h" #include "rtp/pbuf.h" #include "rtp/video_decoders.h" +#include "stats.h" #include "utils/synchronized_queue.h" #include "utils/timed_message.h" #include "video.h" @@ -262,6 +264,7 @@ struct state_video_decoder // for statistics /// @{ volatile unsigned long int displayed, dropped, corrupted, missing; + shared_ptr> s_total_frames, s_corrupt_frames, s_displayed_frames; volatile unsigned long int fec_ok, fec_nok; long int last_buffer_number; /// @} @@ -430,7 +433,7 @@ static void *fec_thread(void *args) { } else if (!corrupted_frame_counted) { corrupted_frame_counted = true; // count it here because decoder accepts corrupted frames - decoder->corrupted++; + decoder->s_corrupt_frames->update(++decoder->corrupted); } verbose_msg("\n"); } @@ -445,8 +448,9 @@ static void *fec_thread(void *args) { cleanup: if(ret == FALSE) { - decoder->corrupted++; + decoder->s_corrupt_frames->update(decoder->corrupted++); decoder->dropped++; + decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped); } } @@ -556,7 +560,8 @@ static void *decompress_thread(void *args) { int ret = display_put_frame(decoder->display, decoder->frame, putf_flags); if (ret == 0) { - decoder->displayed++; + decoder->s_displayed_frames->update(++decoder->displayed); + decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped); } else { decoder->dropped++; } @@ -657,6 +662,10 @@ struct state_video_decoder *video_decoder_init(struct module *parent, return NULL; } + s->s_total_frames = shared_ptr>(new stats(parent, "totalframes")); + s->s_corrupt_frames = shared_ptr>(new stats(parent, "corruptframes")); + s->s_displayed_frames = shared_ptr>(new stats(parent, "displayedframes")); + return s; } diff --git a/src/stats.cpp b/src/stats.cpp deleted file mode 100644 index 962112917..000000000 --- a/src/stats.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include "stats.h" - -#include -#include - -#include "control_socket.h" -#include "compat/platform_spin.h" -#include "debug.h" -#include "messaging.h" -#include "module.h" -#include "utils/resource_manager.h" - -#define MAX_ITEMS 100 - -using namespace std; - -struct stats { - public: - stats(string name, struct control_state *control) - : m_name(name), m_val(0), m_control(control) { - platform_spin_init(&m_spin); - control_add_stats(control, this); - } - - ~stats() { - control_remove_stats(m_control, this); - platform_spin_destroy(&m_spin); - } - - void update_int(int64_t val) { - platform_spin_lock(&m_spin); - m_val = val; - platform_spin_unlock(&m_spin); - } - - void get_stat(char *buffer, int buffer_len) { - platform_spin_lock(&m_spin); - snprintf(buffer, buffer_len, "%s %lld", m_name.c_str(), (long long) m_val); - platform_spin_unlock(&m_spin); - } - - private: - string m_name; - int64_t m_val; - struct control_state *m_control; - platform_spin_t m_spin; -}; - -struct stats *stats_new_statistics(struct control_state *control, const char * name) -{ - return new stats(string(name), control); -} - -void stats_update_int(struct stats *s, int64_t val) -{ - return s->update_int(val); -} - -void stats_format(struct stats *s, char *buffer, int buffer_len) -{ - s->get_stat(buffer, buffer_len); -} - -void stats_destroy(struct stats *s) -{ - delete s; -} - diff --git a/src/stats.h b/src/stats.h index 6953cfa1b..6120e241a 100644 --- a/src/stats.h +++ b/src/stats.h @@ -1,27 +1,53 @@ -#ifndef STATS_H_ -#define STATS_H_ +#ifndef stat_h_ +#define stat_h_ -#ifdef HAVE_CONFIG_H -#include "config.h" -#include "config_unix.h" -#include "config_win32.h" -#endif +#include +#include +#include -#ifdef __cplusplus -extern "C" { -#endif +#include "control_socket.h" +#include "module.h" -struct control_state; -struct stats; +struct stats_reportable { + virtual std::string get_stat() = 0; +}; -struct stats *stats_new_statistics(struct control_state *control, const char * name); -void stats_update_int(struct stats *, int64_t); -void stats_format(struct stats *s, char *buffer, int buffer_len); -void stats_destroy(struct stats *); +template +struct stats : public stats_reportable { + public: + /** + * @param mod should be the module that collect the statistics or its closest + * ancestor because it should be used to identify branch for which statistics + * are reported. + */ + stats(struct module *mod, std::string name) + : m_name(name), m_val() { + m_control = (struct control_state *) get_module(get_root_module(mod), "control"); + uint32_t id; + bool ret = get_port_id(mod, &id); + control_add_stats(m_control, this, ret ? id : -1); + } -#ifdef __cplusplus -} -#endif + ~stats() { + control_remove_stats(m_control, this); + } -#endif// STATS_H_ + void update(T val) { + m_val = val; + } + + std::string get_stat() { + std::ostringstream oss; + oss << m_name << " " << m_val.load(); + return oss.str(); + } + + + private: + struct control_state *m_control; + std::string m_name; + std::atomic m_val; +}; + +#endif // stat_h_ diff --git a/src/video_rxtx.cpp b/src/video_rxtx.cpp index 302ee1299..51d5781f2 100644 --- a/src/video_rxtx.cpp +++ b/src/video_rxtx.cpp @@ -45,8 +45,8 @@ #include "debug.h" #include -#include #include +#include #include "host.h" #include "lib_common.h" @@ -114,7 +114,8 @@ void register_video_rxtx(enum rxtx_protocol proto, struct video_rxtx_info info) video_rxtx::video_rxtx(map const ¶ms): m_paused(false), m_rxtx_mode(params.at("rxtx_mode").i), m_compression(nullptr), - m_video_exporter(static_cast(params.at("exporter").ptr)) { + m_video_exporter(static_cast(params.at("exporter").ptr)), + m_frames(0), m_t0(std::chrono::steady_clock::now()) { module_init_default(&m_sender_mod); m_sender_mod.cls = MODULE_CLASS_SENDER; @@ -196,9 +197,15 @@ void *video_rxtx::sender_loop() { memset(&saved_vid_desc, 0, sizeof(saved_vid_desc)); - struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control"); - struct stats *stat_data_sent = stats_new_statistics((struct control_state *) - control_mod, "data"); + stats stat_sendbytes(&m_sender_mod, "sendbytes"); + stats stat_inexpected(&m_sender_mod, "inexpected"); + /** + * @todo + * Theset 3 values should be 3 distinct values in future. + */ + stats stat_inactual(&m_sender_mod, "inactual"); + stats stat_outexpected(&m_sender_mod, "outexpected"); + stats stat_outactual(&m_sender_mod, "outactual"); while(1) { check_sender_messages(); @@ -212,12 +219,26 @@ void *video_rxtx::sender_loop() { video_export(m_video_exporter, tx_frame.get()); if (!m_paused) { + stat_inexpected.update(tx_frame->fps); send_frame(tx_frame); + m_frames += 1; + std::chrono::steady_clock::time_point curr_time = + std::chrono::steady_clock::now(); + double seconds = + std::chrono::duration_cast>(curr_time - m_t0).count(); + if (seconds >= 5.0) { + double fps = m_frames / seconds; + stat_inactual.update(fps); + stat_outexpected.update(fps); + stat_outactual.update(fps); + m_t0 = curr_time; + m_frames = 0; + } + rtp_video_rxtx *rtp_rxtx = dynamic_cast(this); if (rtp_rxtx) { - stats_update_int(stat_data_sent, - rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0])); + stat_sendbytes.update(rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0])); } } } @@ -226,8 +247,6 @@ exit: module_done(CAST_MODULE(m_compression)); m_compression = nullptr; - stats_destroy(stat_data_sent); - return NULL; } diff --git a/src/video_rxtx.h b/src/video_rxtx.h index 38cbd3737..5dc4d1cf2 100644 --- a/src/video_rxtx.h +++ b/src/video_rxtx.h @@ -38,9 +38,10 @@ #ifndef VIDEO_RXTX_H_ #define VIDEO_RXTX_H_ +#include #include -#include #include +#include #include "module.h" @@ -105,6 +106,8 @@ private: struct compress_state *m_compression; pthread_mutex_t m_lock; struct video_export *m_video_exporter; + int m_frames; + std::chrono::steady_clock::time_point m_t0; pthread_t m_thread_id; }; diff --git a/src/video_rxtx/rtp.cpp b/src/video_rxtx/rtp.cpp index 7ea214fff..bd9b7ac0e 100644 --- a/src/video_rxtx/rtp.cpp +++ b/src/video_rxtx/rtp.cpp @@ -61,7 +61,6 @@ #include "rtp/pbuf.h" #include "rtp/rtp_callback.h" #include "tfrc.h" -#include "stats.h" #include "transmit.h" #include "tv.h" #include "ug_runtime_error.h" diff --git a/src/video_rxtx/ultragrid_rtp.cpp b/src/video_rxtx/ultragrid_rtp.cpp index 1cc3e4abf..db6ace3b9 100644 --- a/src/video_rxtx/ultragrid_rtp.cpp +++ b/src/video_rxtx/ultragrid_rtp.cpp @@ -72,12 +72,13 @@ #include "video_rxtx/ultragrid_rtp.h" #include "utils/worker.h" +#include #include using namespace std; ultragrid_rtp_video_rxtx::ultragrid_rtp_video_rxtx(const map ¶ms) : - rtp_video_rxtx(params) + rtp_video_rxtx(params), m_stat_nanoperframeactual((struct module *) params.at("parent").ptr, "nanoperframeactual"), m_t0(std::chrono::steady_clock::now()), m_duration(std::chrono::nanoseconds::zero()), m_frames(0) { if ((params.at("postprocess").ptr != NULL && strstr((const char *) params.at("postprocess").ptr, "help") != NULL)) { @@ -142,6 +143,7 @@ void *ultragrid_rtp_video_rxtx::send_frame_async_callback(void *arg) { void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr tx_frame) { + std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); lock_guard lock(m_network_devices_lock); if (m_connections_count == 1) { /* normal case - only one connection */ @@ -163,8 +165,21 @@ void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr tx_frame m_async_sending_lock.lock(); m_async_sending = false; - m_async_sending_cv.notify_all(); m_async_sending_lock.unlock(); + m_async_sending_cv.notify_all(); + + std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now(); + m_frames += 1; + m_duration += std::chrono::duration_cast(t1 - t0); + auto seconds = + std::chrono::duration_cast>(t1 - m_t0).count(); + if (seconds >= 5.0) { + m_stat_nanoperframeactual.update(m_duration.count() / m_frames); + m_t0 = t1; + m_frames = 0; + m_duration = std::chrono::nanoseconds::zero(); + } + } void ultragrid_rtp_video_rxtx::receiver_process_messages() @@ -279,14 +294,9 @@ void *ultragrid_rtp_video_rxtx::receiver_loop() fr = 1; - struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control"); - struct stats *stat_loss = stats_new_statistics( - (struct control_state *) control_mod, - "loss"); - struct stats *stat_received = stats_new_statistics( - (struct control_state *) control_mod, - "received"); - uint64_t total_received = 0ull; + stats stat_loss(&m_sender_mod, "loss"); + stats stat_expectedpacket(&m_sender_mod, "expectedpacket"); + stats stat_receivedpacket(&m_sender_mod, "receivedpacket"); while (!should_exit_receiver) { struct timeval timeout; @@ -315,8 +325,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop() receiver_process_messages(); //printf("Failed to receive data\n"); } - total_received += ret; - stats_update_int(stat_received, total_received); /* Decode and render for each participant in the conference... */ pdb_iter_t it; @@ -380,9 +388,12 @@ void *ultragrid_rtp_video_rxtx::receiver_loop() } last_tile_received = curr_time; uint32_t sender_ssrc = cp->ssrc; - stats_update_int(stat_loss, - rtp_compute_fract_lost(m_network_devices[0], + stat_loss.update(rtp_compute_fract_lost(m_network_devices[0], sender_ssrc)); + int expected_pkts, received_pkts; + pbuf_get_packet_count(cp->playout_buffer, &expected_pkts, &received_pkts); + stat_receivedpacket.update(received_pkts); + stat_expectedpacket.update(expected_pkts); } /* dual-link TIMEOUT - we won't wait for next tiles */ @@ -433,9 +444,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop() // pass posioned pill to display display_put_frame(m_display_device, NULL, PUTF_BLOCKING); - stats_destroy(stat_loss); - stats_destroy(stat_received); - return 0; } diff --git a/src/video_rxtx/ultragrid_rtp.h b/src/video_rxtx/ultragrid_rtp.h index c40cc1bad..2b7f9d673 100644 --- a/src/video_rxtx/ultragrid_rtp.h +++ b/src/video_rxtx/ultragrid_rtp.h @@ -38,6 +38,7 @@ #ifndef VIDEO_RXTX_ULTRAGRID_RTP_H_ #define VIDEO_RXTX_ULTRAGRID_RTP_H_ +#include "stats.h" #include "video_rxtx.h" #include "video_rxtx/rtp.h" @@ -82,6 +83,11 @@ private: std::condition_variable m_async_sending_cv; std::mutex m_async_sending_lock; /// @} + + stats m_stat_nanoperframeactual; + std::chrono::steady_clock::time_point m_t0; + std::chrono::nanoseconds m_duration; + int m_frames; }; video_rxtx *create_video_rxtx_ultragrid_rtp(std::map const ¶ms);