diff --git a/src/control.cpp b/src/control.cpp index e809e037a..2eb8cc147 100644 --- a/src/control.cpp +++ b/src/control.cpp @@ -206,7 +206,7 @@ static int process_msg(struct control_state *s, int client_fd, char *message) msg->media_type = TX_MEDIA_VIDEO; strncpy(msg->fec, fec + 6, sizeof(msg->fec) - 1); } else { - resp = new_response(RESPONSE_NOT_FOUND, NULL); + resp = new_response(RESPONSE_NOT_FOUND, strdup("unknown media type")); } if(!resp) { diff --git a/src/hd-rum-translator/hd-rum-recompress.cpp b/src/hd-rum-translator/hd-rum-recompress.cpp index aefeb54c9..1eeea20ca 100644 --- a/src/hd-rum-translator/hd-rum-recompress.cpp +++ b/src/hd-rum-translator/hd-rum-recompress.cpp @@ -55,6 +55,7 @@ static void *worker(void *arg) struct state_recompress *s = (struct state_recompress *) arg; struct timeval t0, t; int frames = 0; + struct module sender_mod; int ret = compress_init(s->parent, s->required_compress, &s->compress); if(ret != 0) { @@ -64,6 +65,10 @@ static void *worker(void *arg) return NULL; } + module_init_default(&sender_mod); + sender_mod.cls = MODULE_CLASS_SENDER; + module_register(&sender_mod, s->parent); + pthread_mutex_unlock(&s->lock); gettimeofday(&t0, NULL); @@ -82,6 +87,13 @@ static void *worker(void *arg) } pthread_mutex_unlock(&s->lock); + struct message *msg; + while((msg = check_message(&sender_mod))) { + struct msg_change_receiver_address *data = (struct msg_change_receiver_address *) msg; + rtp_change_dest(s->network_device, + data->receiver); + free_message(msg); + } struct video_frame *tx_frame = compress_frame((struct compress_state *) s->compress, @@ -117,6 +129,8 @@ static void *worker(void *arg) s->compress = NULL; } + module_done(&sender_mod); + return NULL; } diff --git a/src/hd-rum-translator/hd-rum-translator.c b/src/hd-rum-translator/hd-rum-translator.c index 2a7313862..a6515204f 100644 --- a/src/hd-rum-translator/hd-rum-translator.c +++ b/src/hd-rum-translator/hd-rum-translator.c @@ -20,6 +20,8 @@ #include "hd-rum-translator/hd-rum-recompress.h" #include "hd-rum-translator/hd-rum-decompress.h" #include "module.h" +#include "stats.h" +#include "tv.h" #define EXIT_FAIL_USAGE 1 #define EXIT_INIT_PORT 3 @@ -509,11 +511,19 @@ int main(int argc, char **argv) return 2; } + struct stats *stat_received = stats_new_statistics( + control_state, + "received"); + uint64_t received_data = 0; + struct timeval t0, t; + gettimeofday(&t0, NULL); + /* main loop */ while (!should_exit) { while (state.qtail->next != state.qhead && (state.qtail->size = read(sock_in, state.qtail->buf, SIZE)) > 0 && !should_exit) { + received_data += state.qtail->size; state.qtail = state.qtail->next; @@ -521,6 +531,12 @@ int main(int argc, char **argv) state.qempty = 0; pthread_cond_signal(&state.qempty_cond); pthread_mutex_unlock(&state.qempty_mtx); + + gettimeofday(&t, NULL); + if(tv_diff(t, t0) > 1.0) { + stats_update_int(stat_received, received_data); + t0 = t; + } } if (state.qtail->size <= 0) @@ -547,6 +563,7 @@ int main(int argc, char **argv) pthread_cond_signal(&state.qempty_cond); pthread_mutex_unlock(&state.qempty_mtx); + stats_destroy(stat_received); control_done(control_state); pthread_join(thread, NULL); diff --git a/src/messaging.cpp b/src/messaging.cpp index 1670c3227..0374d89a0 100644 --- a/src/messaging.cpp +++ b/src/messaging.cpp @@ -20,6 +20,8 @@ struct response *send_message(struct module *root, const char *const_path, struc */ if(receiver == NULL) { + fprintf(stderr, "%s not found:\n", const_path); + dump_tree(root, 0); snprintf(buf, sizeof(buf), "(path: %s)", const_path); return new_response(RESPONSE_NOT_FOUND, strdup(buf)); } @@ -42,11 +44,12 @@ struct response *send_message(struct module *root, const char *const_path, struc struct response *send_message_to_receiver(struct module *receiver, struct message *msg) { + lock_guard guard(receiver->lock); if(receiver->msg_callback) { - lock_guard guard(receiver->lock); return receiver->msg_callback(receiver, msg); } else { - return new_response(RESPONSE_NOT_IMPL, NULL); + simple_linked_list_append(receiver->msg_queue, msg); + return new_response(RESPONSE_ACCEPTED, NULL); } } @@ -114,3 +117,16 @@ const char *response_status_to_text(int status) return NULL; } +struct message *check_message(struct module *mod) +{ + struct message *ret; + + lock_guard guard(mod->lock); + + if(simple_linked_list_size(mod->msg_queue) > 0) { + return (struct message *) simple_linked_list_pop(mod->msg_queue); + } else { + return NULL; + } +} + diff --git a/src/messaging.h b/src/messaging.h index 076801139..d7733cecd 100644 --- a/src/messaging.h +++ b/src/messaging.h @@ -77,6 +77,7 @@ struct message *new_message(size_t length); void free_message(struct message *m); const char *response_status_to_text(int status); +struct message *check_message(struct module *); #ifdef __cplusplus } diff --git a/src/module.c b/src/module.c index 283cc18b3..eec393edd 100644 --- a/src/module.c +++ b/src/module.c @@ -120,7 +120,7 @@ const char *module_class_name_pairs[] = { [MODULE_CLASS_ROOT] = "root", [MODULE_CLASS_PORT] = "port", [MODULE_CLASS_COMPRESS] = "compress", - [MODULE_CLASS_DATA] = "compress", + [MODULE_CLASS_DATA] = "data", [MODULE_CLASS_SENDER] = "sender", [MODULE_CLASS_TX] = "transmit", [MODULE_CLASS_AUDIO] = "audio", @@ -213,6 +213,17 @@ struct module *get_module(struct module *root, const char *const_path) return receiver; } +void dump_tree(struct module *node, int indent) { + for(int i = 0; i < indent; ++i) putchar(' '); + + printf("%s\n", module_class_name(node->cls)); + + for(void *it = simple_linked_list_it_init(node->childs); it != NULL; ) { + struct module *child = simple_linked_list_it_next(&it); + dump_tree(child, indent + 2); + } +} + void unlock_module(struct module *module) { pthread_mutex_unlock(&module->lock); diff --git a/src/module.h b/src/module.h index 8ce742c9a..4776cbfa0 100644 --- a/src/module.h +++ b/src/module.h @@ -127,6 +127,8 @@ void unlock_module(struct module *module); */ struct module *get_root_module(struct module *node); +void dump_tree(struct module *root, int indent); + #define CAST_MODULE(x) ((struct module *) x) #ifdef __cplusplus