Collect and report more statistics

Statistics are collected and reported through control socket where it
can be subscribed and further processed (eg. by CoUniverse).
This commit is contained in:
Martin Pulec
2015-02-20 13:20:06 +01:00
parent ceaccf8b6e
commit eb064c7a81
16 changed files with 224 additions and 171 deletions

View File

@@ -111,7 +111,6 @@ OBJS = @OBJS@ \
src/ihdtv/ihdtv.o \
src/module.o \
src/rtsp/rtsp_utils.o \
src/stats.o \
src/utils/config_file.o \
src/utils/list.o \
src/lib_common.o \

View File

@@ -52,7 +52,9 @@
#include "control_socket.h"
#include <set>
#include <map>
#include <sstream>
#include <string>
#include "debug.h"
#include "messaging.h"
@@ -95,7 +97,8 @@ struct control_state {
int network_port;
struct module *root_module;
set<struct stats *> stats;
multimap<int32_t, struct stats_reportable *> stats; // first member is ID of stream if applicable
std::map<uint32_t, int> stats_id_port_mapping; // this maps ID from above to index 0..n
pthread_mutex_t stats_lock;
enum connection_type connection_type;
@@ -627,33 +630,39 @@ static void * control_thread(void *args)
struct timeval curr_time;
gettimeofday(&curr_time, NULL);
if(tv_diff(curr_time, last_report_sent) > report_interval_sec) {
char buffer[1025];
bool empty = true;
memset(buffer, '\0', sizeof(buffer));
strncpy(buffer + strlen(buffer), "stats", sizeof(buffer) -
strlen(buffer) - 1);
bool first = true;
int32_t last_id = -1;
ostringstream buffer;
buffer << "stats";
pthread_mutex_lock(&s->stats_lock);
for(set<struct stats *>::iterator it = s->stats.begin();
for(auto it = s->stats.begin();
it != s->stats.end(); ++it) {
empty = false;
strncpy(buffer + strlen(buffer), " ", sizeof(buffer) -
strlen(buffer) - 1);
stats_format(*it, buffer + strlen(buffer),
sizeof(buffer) - strlen(buffer));
int32_t id = it->first;
if ((first || last_id != id) && id != -1) {
buffer << " -";
if (s->stats_id_port_mapping.find(id) != s->stats_id_port_mapping.end()) {
buffer << s->stats_id_port_mapping.at(id);
} else {
buffer << "UNKNOWN";
}
}
last_id = id;
first = false;
buffer << " " + it->second->get_stat();
}
pthread_mutex_unlock(&s->stats_lock);
strncpy(buffer + strlen(buffer), "\r\n", sizeof(buffer) -
strlen(buffer) - 1);
buffer << "\r\n";
if(strlen(buffer) < 1024 && !empty) {
if (!first) { // are there any stats to report?
cur = clients;
string str = buffer.str();
while(cur) {
if(is_internal_port(cur->fd)) { // skip local FD
cur = cur->next;
continue;
}
write_all(cur->fd, buffer, strlen(buffer));
write_all(cur->fd, str.c_str(), str.length());
cur = cur->next;
}
}
@@ -702,17 +711,29 @@ void control_done(struct control_state *s)
delete s;
}
void control_add_stats(struct control_state *s, struct stats *stats)
void control_add_stats(struct control_state *s, struct stats_reportable *stats, int32_t port_id)
{
pthread_mutex_lock(&s->stats_lock);
s->stats.insert(stats);
s->stats.emplace(port_id, stats);
pthread_mutex_unlock(&s->stats_lock);
}
void control_remove_stats(struct control_state *s, struct stats *stats)
void control_remove_stats(struct control_state *s, struct stats_reportable *stats)
{
pthread_mutex_lock(&s->stats_lock);
s->stats.erase(stats);
for (auto it = s->stats.begin(); it != s->stats.end(); ++it) {
if (it->second == stats) {
s->stats.erase(it);
break;
}
}
pthread_mutex_unlock(&s->stats_lock);
}
void control_replace_port_mapping(struct control_state *s, std::map<uint32_t, int> &&m)
{
pthread_mutex_lock(&s->stats_lock);
s->stats_id_port_mapping = move(m);
pthread_mutex_unlock(&s->stats_lock);
}

View File

@@ -45,13 +45,14 @@
*
*/
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
#ifndef control_socket_h_
#define control_socket_h_
#include <map>
struct control_state;
struct module;
struct stats;
struct stats_reportable;
#define CONTROL_DEFAULT_PORT 5054
@@ -61,10 +62,9 @@ struct stats;
int control_init(int port, int connection_type, struct control_state **state, struct module *root_module);
void control_start(struct control_state *state);
void control_done(struct control_state *s);
void control_add_stats(struct control_state *state, struct stats *stats);
void control_remove_stats(struct control_state *state, struct stats *stats);
void control_add_stats(struct control_state *state, struct stats_reportable *stats, int32_t port_id = -1);
void control_remove_stats(struct control_state *state, struct stats_reportable *stats);
void control_replace_port_mapping(struct control_state *state, std::map<uint32_t, int> &&);
#ifdef __cplusplus
}
#endif // __cplusplus
#endif // control_socket_h_

View File

@@ -22,10 +22,10 @@
#include "hd-rum-translator/hd-rum-decompress.h"
#include "messaging.h"
#include "module.h"
#include "stats.h"
#include "utils/misc.h"
#include "tv.h"
#include <map>
#include <string>
#include <vector>
@@ -51,8 +51,8 @@ struct replica {
};
struct hd_rum_translator_state {
hd_rum_translator_state() : mod(), queue(nullptr), qhead(nullptr), qtail(nullptr), qempty(1),
qfull(0), decompress(nullptr) {
hd_rum_translator_state() : mod(), control_state(nullptr), queue(nullptr), qhead(nullptr),
qtail(nullptr), qempty(1), qfull(0), decompress(nullptr) {
module_init_default(&mod);
mod.cls = MODULE_CLASS_ROOT;
pthread_mutex_init(&qempty_mtx, NULL);
@@ -68,6 +68,7 @@ struct hd_rum_translator_state {
module_done(&mod);
}
struct module mod;
struct control_state *control_state;
struct item *queue;
struct item *qhead;
struct item *qtail;
@@ -125,6 +126,7 @@ static void replica_init(struct replica *s, const char *addr, int tx_port, int b
module_init_default(&s->mod);
s->mod.cls = MODULE_CLASS_PORT;
s->mod.priv_data = s;
s->mod.id = lrand48();
module_register(&s->mod, parent);
}
@@ -254,6 +256,17 @@ void change_replica_type(struct hd_rum_translator_state *s,
r->type == replica::type_t::RECOMPRESS);
}
static void update_mapping(struct hd_rum_translator_state *s)
{
map<uint32_t, int> mapping;
int i = 0;
for (auto && r : s->replicas) {
mapping[r->mod.id] = i++;
}
control_replace_port_mapping(s->control_state, move(mapping));
}
static void *writer(void *arg)
{
struct hd_rum_translator_state *s =
@@ -277,6 +290,7 @@ static void *writer(void *arg)
replica_done(s->replicas[index]);
delete s->replicas[index];
s->replicas.erase(s->replicas.begin() + index);
update_mapping(s);
} else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) {
s->replicas.push_back(new replica());
struct replica *rep = s->replicas[s->replicas.size() - 1];
@@ -286,6 +300,7 @@ static void *writer(void *arg)
int tx_port = atoi(strtok_r(NULL, " ", &save_ptr));
char *compress = strtok_r(NULL, " ", &save_ptr);
replica_init(rep, host, tx_port, 100*1000, &s->mod);
update_mapping(s);
if (compress) {
rep->type = replica::type_t::RECOMPRESS;
char *fec = NULL;
@@ -494,7 +509,6 @@ int main(int argc, char **argv)
int host_count;
int control_port = CONTROL_DEFAULT_PORT;
int control_connection_type = 0;
struct control_state *control_state = NULL;
#ifdef WIN32
WSADATA wsaData;
@@ -597,11 +611,11 @@ int main(int argc, char **argv)
rep = new replica();
}
if(control_init(control_port, control_connection_type, &control_state, &state.mod) != 0) {
if(control_init(control_port, control_connection_type, &state.control_state, &state.mod) != 0) {
fprintf(stderr, "Warning: Unable to create remote control.\n");
return EXIT_FAILURE;
}
control_start(control_state);
control_start(state.control_state);
// we need only one shared receiver decompressor for all recompressing streams
state.decompress = hd_rum_decompress_init(&state.mod);;
@@ -650,20 +664,16 @@ int main(int argc, char **argv)
hd_rum_decompress_set_active(state.decompress, state.replicas[i]->recompress, true);
}
}
update_mapping(&state);
if (pthread_create(&thread, NULL, writer, (void *) &state)) {
fprintf(stderr, "cannot create writer thread\n");
return 2;
}
struct stats *stat_received = stats_new_statistics(
control_state,
"received");
uint64_t received_data = 0;
struct timeval t0, t;
struct timeval t_report;
struct timeval t0;
gettimeofday(&t0, NULL);
gettimeofday(&t_report, NULL);
unsigned long long int last_data = 0ull;
@@ -681,17 +691,14 @@ int main(int argc, char **argv)
pthread_cond_signal(&state.qempty_cond);
pthread_mutex_unlock(&state.qempty_mtx);
struct timeval t;
gettimeofday(&t, NULL);
if(tv_diff(t, t0) > 1.0) {
stats_update_int(stat_received, received_data);
t0 = t;
}
double seconds = tv_diff(t, t_report);
double seconds = tv_diff(t, t0);
if (seconds > 5.0) {
unsigned long long int cur_data = (received_data - last_data);
unsigned long long int bps = cur_data / seconds;
fprintf(stderr, "Received %llu bytes in %g seconds = %llu B/s.\n", cur_data, seconds, bps);
t_report = t;
t0 = t;
last_data = received_data;
}
}
@@ -720,8 +727,6 @@ int main(int argc, char **argv)
pthread_cond_signal(&state.qempty_cond);
pthread_mutex_unlock(&state.qempty_mtx);
stats_destroy(stat_received);
pthread_join(thread, NULL);
if(state.decompress) {
@@ -733,7 +738,7 @@ int main(int argc, char **argv)
delete state.replicas[i];
}
control_done(control_state);
control_done(state.control_state);
#ifdef WIN32
WSACleanup();

View File

@@ -276,3 +276,16 @@ void dump_tree(struct module *node, int indent) {
}
}
bool get_port_id(struct module *node, uint32_t *id)
{
while(node->parent) {
if (node->cls == MODULE_CLASS_PORT) {
*id = node->id;
return true;
}
node = node->parent;
}
return false;
}

View File

@@ -100,6 +100,7 @@ struct module {
struct simple_linked_list *msg_queue_childs; ///< messages for childern that were not delivered
void *priv_data;
uint32_t id;
#ifdef __cplusplus
module() = default;
@@ -139,6 +140,7 @@ struct module *get_matching_child(struct module *node, const char *path);
struct module *get_root_module(struct module *node);
struct module *get_parent_module(struct module *node);
bool get_port_id(struct module *child_node, uint32_t *id);
void dump_tree(struct module *root, int indent);

View File

@@ -93,6 +93,7 @@ struct pbuf {
int last_rtp_seq;
int should_arrived;
int cumulative_count;
int last_received, last_expected;
uint32_t last_display_ts;
};
@@ -302,6 +303,8 @@ void pbuf_insert(struct pbuf *playout_buf, rtp_packet * pkt)
playout_buf->cumulative_count,
(double) playout_buf->cumulative_count /
playout_buf->should_arrived * 100.0);
playout_buf->last_received = playout_buf->cumulative_count;
playout_buf->last_expected = playout_buf->should_arrived;
playout_buf->should_arrived =
playout_buf->cumulative_count = 0;
playout_buf->last_display_ts = pkt->ts;
@@ -470,3 +473,10 @@ void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay)
playout_buf->playout_delay = playout_delay;
}
void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts)
{
*expected_pkts = playout_buf->last_expected;
*received_pkts = playout_buf->last_received;
}

View File

@@ -117,6 +117,7 @@ int pbuf_decode(struct pbuf *playout_buf, struct timeval curr_time,
//struct video_frame *framebuffer, int i, struct state_decoder *decoder);
void pbuf_remove(struct pbuf *playout_buf, struct timeval curr_time);
void pbuf_set_playout_delay(struct pbuf *playout_buf, double playout_delay);
void pbuf_get_packet_count(struct pbuf *playout_buf, int *expected_pkts, int *received_pkts);
#ifdef __cplusplus
}

View File

@@ -80,12 +80,14 @@
#include "host.h"
#include "lib_common.h"
#include "messaging.h"
#include "module.h"
#include "perf.h"
#include "rtp/fec.h"
#include "rtp/rtp.h"
#include "rtp/rtp_callback.h"
#include "rtp/pbuf.h"
#include "rtp/video_decoders.h"
#include "stats.h"
#include "utils/synchronized_queue.h"
#include "utils/timed_message.h"
#include "video.h"
@@ -262,6 +264,7 @@ struct state_video_decoder
// for statistics
/// @{
volatile unsigned long int displayed, dropped, corrupted, missing;
shared_ptr<struct stats<int_fast64_t>> s_total_frames, s_corrupt_frames, s_displayed_frames;
volatile unsigned long int fec_ok, fec_nok;
long int last_buffer_number;
/// @}
@@ -430,7 +433,7 @@ static void *fec_thread(void *args) {
} else if (!corrupted_frame_counted) {
corrupted_frame_counted = true;
// count it here because decoder accepts corrupted frames
decoder->corrupted++;
decoder->s_corrupt_frames->update(++decoder->corrupted);
}
verbose_msg("\n");
}
@@ -445,8 +448,9 @@ static void *fec_thread(void *args) {
cleanup:
if(ret == FALSE) {
decoder->corrupted++;
decoder->s_corrupt_frames->update(decoder->corrupted++);
decoder->dropped++;
decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped);
}
}
@@ -556,7 +560,8 @@ static void *decompress_thread(void *args) {
int ret = display_put_frame(decoder->display,
decoder->frame, putf_flags);
if (ret == 0) {
decoder->displayed++;
decoder->s_displayed_frames->update(++decoder->displayed);
decoder->s_total_frames->update(decoder->displayed + decoder->missing + decoder->dropped);
} else {
decoder->dropped++;
}
@@ -657,6 +662,10 @@ struct state_video_decoder *video_decoder_init(struct module *parent,
return NULL;
}
s->s_total_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "totalframes"));
s->s_corrupt_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "corruptframes"));
s->s_displayed_frames = shared_ptr<struct stats<int_fast64_t>>(new stats<int_fast64_t>(parent, "displayedframes"));
return s;
}

View File

@@ -1,68 +0,0 @@
#include "stats.h"
#include <map>
#include <string>
#include "control_socket.h"
#include "compat/platform_spin.h"
#include "debug.h"
#include "messaging.h"
#include "module.h"
#include "utils/resource_manager.h"
#define MAX_ITEMS 100
using namespace std;
struct stats {
public:
stats(string name, struct control_state *control)
: m_name(name), m_val(0), m_control(control) {
platform_spin_init(&m_spin);
control_add_stats(control, this);
}
~stats() {
control_remove_stats(m_control, this);
platform_spin_destroy(&m_spin);
}
void update_int(int64_t val) {
platform_spin_lock(&m_spin);
m_val = val;
platform_spin_unlock(&m_spin);
}
void get_stat(char *buffer, int buffer_len) {
platform_spin_lock(&m_spin);
snprintf(buffer, buffer_len, "%s %lld", m_name.c_str(), (long long) m_val);
platform_spin_unlock(&m_spin);
}
private:
string m_name;
int64_t m_val;
struct control_state *m_control;
platform_spin_t m_spin;
};
struct stats *stats_new_statistics(struct control_state *control, const char * name)
{
return new stats(string(name), control);
}
void stats_update_int(struct stats *s, int64_t val)
{
return s->update_int(val);
}
void stats_format(struct stats *s, char *buffer, int buffer_len)
{
s->get_stat(buffer, buffer_len);
}
void stats_destroy(struct stats *s)
{
delete s;
}

View File

@@ -1,27 +1,53 @@
#ifndef STATS_H_
#define STATS_H_
#ifndef stat_h_
#define stat_h_
#ifdef HAVE_CONFIG_H
#include "config.h"
#include "config_unix.h"
#include "config_win32.h"
#endif
#include <atomic>
#include <string>
#include <sstream>
#ifdef __cplusplus
extern "C" {
#endif
#include "control_socket.h"
#include "module.h"
struct control_state;
struct stats;
struct stats_reportable {
virtual std::string get_stat() = 0;
};
struct stats *stats_new_statistics(struct control_state *control, const char * name);
void stats_update_int(struct stats *, int64_t);
void stats_format(struct stats *s, char *buffer, int buffer_len);
void stats_destroy(struct stats *);
template <typename T>
struct stats : public stats_reportable {
public:
/**
* @param mod should be the module that collect the statistics or its closest
* ancestor because it should be used to identify branch for which statistics
* are reported.
*/
stats(struct module *mod, std::string name)
: m_name(name), m_val() {
m_control = (struct control_state *) get_module(get_root_module(mod), "control");
uint32_t id;
bool ret = get_port_id(mod, &id);
control_add_stats(m_control, this, ret ? id : -1);
}
#ifdef __cplusplus
}
#endif
~stats() {
control_remove_stats(m_control, this);
}
#endif// STATS_H_
void update(T val) {
m_val = val;
}
std::string get_stat() {
std::ostringstream oss;
oss << m_name << " " << m_val.load();
return oss.str();
}
private:
struct control_state *m_control;
std::string m_name;
std::atomic<T> m_val;
};
#endif // stat_h_

View File

@@ -45,8 +45,8 @@
#include "debug.h"
#include <sstream>
#include <string>
#include <stdexcept>
#include <string>
#include "host.h"
#include "lib_common.h"
@@ -114,7 +114,8 @@ void register_video_rxtx(enum rxtx_protocol proto, struct video_rxtx_info info)
video_rxtx::video_rxtx(map<string, param_u> const &params): m_paused(false),
m_rxtx_mode(params.at("rxtx_mode").i), m_compression(nullptr),
m_video_exporter(static_cast<struct video_export *>(params.at("exporter").ptr)) {
m_video_exporter(static_cast<struct video_export *>(params.at("exporter").ptr)),
m_frames(0), m_t0(std::chrono::steady_clock::now()) {
module_init_default(&m_sender_mod);
m_sender_mod.cls = MODULE_CLASS_SENDER;
@@ -196,9 +197,15 @@ void *video_rxtx::sender_loop() {
memset(&saved_vid_desc, 0, sizeof(saved_vid_desc));
struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control");
struct stats *stat_data_sent = stats_new_statistics((struct control_state *)
control_mod, "data");
stats<int_fast64_t> stat_sendbytes(&m_sender_mod, "sendbytes");
stats<double> stat_inexpected(&m_sender_mod, "inexpected");
/**
* @todo
* Theset 3 values should be 3 distinct values in future.
*/
stats<double> stat_inactual(&m_sender_mod, "inactual");
stats<double> stat_outexpected(&m_sender_mod, "outexpected");
stats<double> stat_outactual(&m_sender_mod, "outactual");
while(1) {
check_sender_messages();
@@ -212,12 +219,26 @@ void *video_rxtx::sender_loop() {
video_export(m_video_exporter, tx_frame.get());
if (!m_paused) {
stat_inexpected.update(tx_frame->fps);
send_frame(tx_frame);
m_frames += 1;
std::chrono::steady_clock::time_point curr_time =
std::chrono::steady_clock::now();
double seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(curr_time - m_t0).count();
if (seconds >= 5.0) {
double fps = m_frames / seconds;
stat_inactual.update(fps);
stat_outexpected.update(fps);
stat_outactual.update(fps);
m_t0 = curr_time;
m_frames = 0;
}
rtp_video_rxtx *rtp_rxtx = dynamic_cast<rtp_video_rxtx *>(this);
if (rtp_rxtx) {
stats_update_int(stat_data_sent,
rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0]));
stat_sendbytes.update(rtp_get_bytes_sent(rtp_rxtx->m_network_devices[0]));
}
}
}
@@ -226,8 +247,6 @@ exit:
module_done(CAST_MODULE(m_compression));
m_compression = nullptr;
stats_destroy(stat_data_sent);
return NULL;
}

View File

@@ -38,9 +38,10 @@
#ifndef VIDEO_RXTX_H_
#define VIDEO_RXTX_H_
#include <chrono>
#include <map>
#include <string>
#include <memory>
#include <string>
#include "module.h"
@@ -105,6 +106,8 @@ private:
struct compress_state *m_compression;
pthread_mutex_t m_lock;
struct video_export *m_video_exporter;
int m_frames;
std::chrono::steady_clock::time_point m_t0;
pthread_t m_thread_id;
};

View File

@@ -61,7 +61,6 @@
#include "rtp/pbuf.h"
#include "rtp/rtp_callback.h"
#include "tfrc.h"
#include "stats.h"
#include "transmit.h"
#include "tv.h"
#include "ug_runtime_error.h"

View File

@@ -72,12 +72,13 @@
#include "video_rxtx/ultragrid_rtp.h"
#include "utils/worker.h"
#include <chrono>
#include <utility>
using namespace std;
ultragrid_rtp_video_rxtx::ultragrid_rtp_video_rxtx(const map<string, param_u> &params) :
rtp_video_rxtx(params)
rtp_video_rxtx(params), m_stat_nanoperframeactual((struct module *) params.at("parent").ptr, "nanoperframeactual"), m_t0(std::chrono::steady_clock::now()), m_duration(std::chrono::nanoseconds::zero()), m_frames(0)
{
if ((params.at("postprocess").ptr != NULL &&
strstr((const char *) params.at("postprocess").ptr, "help") != NULL)) {
@@ -142,6 +143,7 @@ void *ultragrid_rtp_video_rxtx::send_frame_async_callback(void *arg) {
void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame)
{
std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
lock_guard<mutex> lock(m_network_devices_lock);
if (m_connections_count == 1) { /* normal case - only one connection */
@@ -163,8 +165,21 @@ void ultragrid_rtp_video_rxtx::send_frame_async(shared_ptr<video_frame> tx_frame
m_async_sending_lock.lock();
m_async_sending = false;
m_async_sending_cv.notify_all();
m_async_sending_lock.unlock();
m_async_sending_cv.notify_all();
std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
m_frames += 1;
m_duration += std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0);
auto seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(t1 - m_t0).count();
if (seconds >= 5.0) {
m_stat_nanoperframeactual.update(m_duration.count() / m_frames);
m_t0 = t1;
m_frames = 0;
m_duration = std::chrono::nanoseconds::zero();
}
}
void ultragrid_rtp_video_rxtx::receiver_process_messages()
@@ -279,14 +294,9 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
fr = 1;
struct module *control_mod = get_module(get_root_module(&m_sender_mod), "control");
struct stats *stat_loss = stats_new_statistics(
(struct control_state *) control_mod,
"loss");
struct stats *stat_received = stats_new_statistics(
(struct control_state *) control_mod,
"received");
uint64_t total_received = 0ull;
stats<int_fast64_t> stat_loss(&m_sender_mod, "loss");
stats<int_fast64_t> stat_expectedpacket(&m_sender_mod, "expectedpacket");
stats<int_fast64_t> stat_receivedpacket(&m_sender_mod, "receivedpacket");
while (!should_exit_receiver) {
struct timeval timeout;
@@ -315,8 +325,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
receiver_process_messages();
//printf("Failed to receive data\n");
}
total_received += ret;
stats_update_int(stat_received, total_received);
/* Decode and render for each participant in the conference... */
pdb_iter_t it;
@@ -380,9 +388,12 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
}
last_tile_received = curr_time;
uint32_t sender_ssrc = cp->ssrc;
stats_update_int(stat_loss,
rtp_compute_fract_lost(m_network_devices[0],
stat_loss.update(rtp_compute_fract_lost(m_network_devices[0],
sender_ssrc));
int expected_pkts, received_pkts;
pbuf_get_packet_count(cp->playout_buffer, &expected_pkts, &received_pkts);
stat_receivedpacket.update(received_pkts);
stat_expectedpacket.update(expected_pkts);
}
/* dual-link TIMEOUT - we won't wait for next tiles */
@@ -433,9 +444,6 @@ void *ultragrid_rtp_video_rxtx::receiver_loop()
// pass posioned pill to display
display_put_frame(m_display_device, NULL, PUTF_BLOCKING);
stats_destroy(stat_loss);
stats_destroy(stat_received);
return 0;
}

View File

@@ -38,6 +38,7 @@
#ifndef VIDEO_RXTX_ULTRAGRID_RTP_H_
#define VIDEO_RXTX_ULTRAGRID_RTP_H_
#include "stats.h"
#include "video_rxtx.h"
#include "video_rxtx/rtp.h"
@@ -82,6 +83,11 @@ private:
std::condition_variable m_async_sending_cv;
std::mutex m_async_sending_lock;
/// @}
stats<std::chrono::nanoseconds::rep> m_stat_nanoperframeactual;
std::chrono::steady_clock::time_point m_t0;
std::chrono::nanoseconds m_duration;
int m_frames;
};
video_rxtx *create_video_rxtx_ultragrid_rtp(std::map<std::string, param_u> const &params);