Reflector: report received bytes

+ some small improvements
This commit is contained in:
Martin Pulec
2013-06-26 10:50:25 +02:00
parent 4a6dd92dc6
commit 29820960df
7 changed files with 65 additions and 4 deletions

View File

@@ -206,7 +206,7 @@ static int process_msg(struct control_state *s, int client_fd, char *message)
msg->media_type = TX_MEDIA_VIDEO;
strncpy(msg->fec, fec + 6, sizeof(msg->fec) - 1);
} else {
resp = new_response(RESPONSE_NOT_FOUND, NULL);
resp = new_response(RESPONSE_NOT_FOUND, strdup("unknown media type"));
}
if(!resp) {

View File

@@ -55,6 +55,7 @@ static void *worker(void *arg)
struct state_recompress *s = (struct state_recompress *) arg;
struct timeval t0, t;
int frames = 0;
struct module sender_mod;
int ret = compress_init(s->parent, s->required_compress, &s->compress);
if(ret != 0) {
@@ -64,6 +65,10 @@ static void *worker(void *arg)
return NULL;
}
module_init_default(&sender_mod);
sender_mod.cls = MODULE_CLASS_SENDER;
module_register(&sender_mod, s->parent);
pthread_mutex_unlock(&s->lock);
gettimeofday(&t0, NULL);
@@ -82,6 +87,13 @@ static void *worker(void *arg)
}
pthread_mutex_unlock(&s->lock);
struct message *msg;
while((msg = check_message(&sender_mod))) {
struct msg_change_receiver_address *data = (struct msg_change_receiver_address *) msg;
rtp_change_dest(s->network_device,
data->receiver);
free_message(msg);
}
struct video_frame *tx_frame =
compress_frame((struct compress_state *) s->compress,
@@ -117,6 +129,8 @@ static void *worker(void *arg)
s->compress = NULL;
}
module_done(&sender_mod);
return NULL;
}

View File

@@ -20,6 +20,8 @@
#include "hd-rum-translator/hd-rum-recompress.h"
#include "hd-rum-translator/hd-rum-decompress.h"
#include "module.h"
#include "stats.h"
#include "tv.h"
#define EXIT_FAIL_USAGE 1
#define EXIT_INIT_PORT 3
@@ -509,11 +511,19 @@ int main(int argc, char **argv)
return 2;
}
struct stats *stat_received = stats_new_statistics(
control_state,
"received");
uint64_t received_data = 0;
struct timeval t0, t;
gettimeofday(&t0, NULL);
/* main loop */
while (!should_exit) {
while (state.qtail->next != state.qhead
&& (state.qtail->size = read(sock_in, state.qtail->buf, SIZE)) > 0
&& !should_exit) {
received_data += state.qtail->size;
state.qtail = state.qtail->next;
@@ -521,6 +531,12 @@ int main(int argc, char **argv)
state.qempty = 0;
pthread_cond_signal(&state.qempty_cond);
pthread_mutex_unlock(&state.qempty_mtx);
gettimeofday(&t, NULL);
if(tv_diff(t, t0) > 1.0) {
stats_update_int(stat_received, received_data);
t0 = t;
}
}
if (state.qtail->size <= 0)
@@ -547,6 +563,7 @@ int main(int argc, char **argv)
pthread_cond_signal(&state.qempty_cond);
pthread_mutex_unlock(&state.qempty_mtx);
stats_destroy(stat_received);
control_done(control_state);
pthread_join(thread, NULL);

View File

@@ -20,6 +20,8 @@ struct response *send_message(struct module *root, const char *const_path, struc
*/
if(receiver == NULL) {
fprintf(stderr, "%s not found:\n", const_path);
dump_tree(root, 0);
snprintf(buf, sizeof(buf), "(path: %s)", const_path);
return new_response(RESPONSE_NOT_FOUND, strdup(buf));
}
@@ -42,11 +44,12 @@ struct response *send_message(struct module *root, const char *const_path, struc
struct response *send_message_to_receiver(struct module *receiver, struct message *msg)
{
lock_guard guard(receiver->lock);
if(receiver->msg_callback) {
lock_guard guard(receiver->lock);
return receiver->msg_callback(receiver, msg);
} else {
return new_response(RESPONSE_NOT_IMPL, NULL);
simple_linked_list_append(receiver->msg_queue, msg);
return new_response(RESPONSE_ACCEPTED, NULL);
}
}
@@ -114,3 +117,16 @@ const char *response_status_to_text(int status)
return NULL;
}
struct message *check_message(struct module *mod)
{
struct message *ret;
lock_guard guard(mod->lock);
if(simple_linked_list_size(mod->msg_queue) > 0) {
return (struct message *) simple_linked_list_pop(mod->msg_queue);
} else {
return NULL;
}
}

View File

@@ -77,6 +77,7 @@ struct message *new_message(size_t length);
void free_message(struct message *m);
const char *response_status_to_text(int status);
struct message *check_message(struct module *);
#ifdef __cplusplus
}

View File

@@ -120,7 +120,7 @@ const char *module_class_name_pairs[] = {
[MODULE_CLASS_ROOT] = "root",
[MODULE_CLASS_PORT] = "port",
[MODULE_CLASS_COMPRESS] = "compress",
[MODULE_CLASS_DATA] = "compress",
[MODULE_CLASS_DATA] = "data",
[MODULE_CLASS_SENDER] = "sender",
[MODULE_CLASS_TX] = "transmit",
[MODULE_CLASS_AUDIO] = "audio",
@@ -213,6 +213,17 @@ struct module *get_module(struct module *root, const char *const_path)
return receiver;
}
void dump_tree(struct module *node, int indent) {
for(int i = 0; i < indent; ++i) putchar(' ');
printf("%s\n", module_class_name(node->cls));
for(void *it = simple_linked_list_it_init(node->childs); it != NULL; ) {
struct module *child = simple_linked_list_it_next(&it);
dump_tree(child, indent + 2);
}
}
void unlock_module(struct module *module)
{
pthread_mutex_unlock(&module->lock);

View File

@@ -127,6 +127,8 @@ void unlock_module(struct module *module);
*/
struct module *get_root_module(struct module *node);
void dump_tree(struct module *root, int indent);
#define CAST_MODULE(x) ((struct module *) x)
#ifdef __cplusplus