Collect and report more statistics

Statistics are collected and reported through control socket where it
can be subscribed and further processed (eg. by CoUniverse).
This commit is contained in:
Martin Pulec
2015-02-20 13:20:06 +01:00
parent ceaccf8b6e
commit eb064c7a81
16 changed files with 224 additions and 171 deletions

View File

@@ -52,7 +52,9 @@
#include "control_socket.h"
#include <set>
#include <map>
#include <sstream>
#include <string>
#include "debug.h"
#include "messaging.h"
@@ -95,7 +97,8 @@ struct control_state {
int network_port;
struct module *root_module;
set<struct stats *> stats;
multimap<int32_t, struct stats_reportable *> stats; // first member is ID of stream if applicable
std::map<uint32_t, int> stats_id_port_mapping; // this maps ID from above to index 0..n
pthread_mutex_t stats_lock;
enum connection_type connection_type;
@@ -627,33 +630,39 @@ static void * control_thread(void *args)
struct timeval curr_time;
gettimeofday(&curr_time, NULL);
if(tv_diff(curr_time, last_report_sent) > report_interval_sec) {
char buffer[1025];
bool empty = true;
memset(buffer, '\0', sizeof(buffer));
strncpy(buffer + strlen(buffer), "stats", sizeof(buffer) -
strlen(buffer) - 1);
bool first = true;
int32_t last_id = -1;
ostringstream buffer;
buffer << "stats";
pthread_mutex_lock(&s->stats_lock);
for(set<struct stats *>::iterator it = s->stats.begin();
for(auto it = s->stats.begin();
it != s->stats.end(); ++it) {
empty = false;
strncpy(buffer + strlen(buffer), " ", sizeof(buffer) -
strlen(buffer) - 1);
stats_format(*it, buffer + strlen(buffer),
sizeof(buffer) - strlen(buffer));
int32_t id = it->first;
if ((first || last_id != id) && id != -1) {
buffer << " -";
if (s->stats_id_port_mapping.find(id) != s->stats_id_port_mapping.end()) {
buffer << s->stats_id_port_mapping.at(id);
} else {
buffer << "UNKNOWN";
}
}
last_id = id;
first = false;
buffer << " " + it->second->get_stat();
}
pthread_mutex_unlock(&s->stats_lock);
strncpy(buffer + strlen(buffer), "\r\n", sizeof(buffer) -
strlen(buffer) - 1);
buffer << "\r\n";
if(strlen(buffer) < 1024 && !empty) {
if (!first) { // are there any stats to report?
cur = clients;
string str = buffer.str();
while(cur) {
if(is_internal_port(cur->fd)) { // skip local FD
cur = cur->next;
continue;
}
write_all(cur->fd, buffer, strlen(buffer));
write_all(cur->fd, str.c_str(), str.length());
cur = cur->next;
}
}
@@ -702,17 +711,29 @@ void control_done(struct control_state *s)
delete s;
}
void control_add_stats(struct control_state *s, struct stats *stats)
void control_add_stats(struct control_state *s, struct stats_reportable *stats, int32_t port_id)
{
pthread_mutex_lock(&s->stats_lock);
s->stats.insert(stats);
s->stats.emplace(port_id, stats);
pthread_mutex_unlock(&s->stats_lock);
}
void control_remove_stats(struct control_state *s, struct stats *stats)
void control_remove_stats(struct control_state *s, struct stats_reportable *stats)
{
pthread_mutex_lock(&s->stats_lock);
s->stats.erase(stats);
for (auto it = s->stats.begin(); it != s->stats.end(); ++it) {
if (it->second == stats) {
s->stats.erase(it);
break;
}
}
pthread_mutex_unlock(&s->stats_lock);
}
void control_replace_port_mapping(struct control_state *s, std::map<uint32_t, int> &&m)
{
pthread_mutex_lock(&s->stats_lock);
s->stats_id_port_mapping = move(m);
pthread_mutex_unlock(&s->stats_lock);
}