From fecff36f43fcb002f640adbf735eb21b90e0235d Mon Sep 17 00:00:00 2001 From: Martin Pulec Date: Thu, 11 Jul 2013 12:47:55 +0200 Subject: [PATCH] Control: start after modules are able to respond --- src/control_socket.cpp | 29 ++++++++------ src/control_socket.h | 1 + src/main.c | 86 +++++++++++++++++++++++++----------------- 3 files changed, 70 insertions(+), 46 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 084f3966a..09b397a27 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -101,6 +101,8 @@ struct control_state { enum connection_type connection_type; fd_t socket_fd; + + bool started; }; #define CONTROL_EXIT -1 @@ -160,6 +162,7 @@ int control_init(int port, struct control_state **state, struct module *root_mod control_state *s = new control_state; s->root_module = root_module; + s->started = false; pthread_mutex_init(&s->stats_lock, NULL); @@ -225,22 +228,26 @@ int control_init(int port, struct control_state **state, struct module *root_mod } } + module_register(&s->mod, root_module); + + *state = s; + return 0; +} + +void control_start(struct control_state *s) +{ fd_t sock; sock = create_internal_port(&s->local_port); if(pthread_create(&s->thread_id, NULL, control_thread, s) != 0) { fprintf(stderr, "Unable to create thread.\n"); free(s); - return -1; + abort(); } s->internal_fd[0] = accept(sock, NULL, NULL); close(sock); - - module_register(&s->mod, root_module); - - *state = s; - return 0; + s->started = true; } #define prefix_matches(x,y) strncasecmp(x, y, strlen(y)) == 0 @@ -617,11 +624,11 @@ void control_done(struct control_state *s) module_done(&s->mod); - write_all(s->internal_fd[0], "quit\r\n", 6); - - pthread_join(s->thread_id, NULL); - - close(s->internal_fd[0]); + if(s->started) { + write_all(s->internal_fd[0], "quit\r\n", 6); + pthread_join(s->thread_id, NULL); + close(s->internal_fd[0]); + } if(s->connection_type == SERVER) { // for client, the socket has already been closed // by the time of control_thread exit diff --git a/src/control_socket.h b/src/control_socket.h index b48a55918..1284e326a 100644 --- a/src/control_socket.h +++ b/src/control_socket.h @@ -59,6 +59,7 @@ struct stats; * @retval 0 if success */ int control_init(int port, struct control_state **state, struct module *root_module); +void control_start(struct control_state *state); void control_done(struct control_state *s); void control_add_stats(struct control_state *state, struct stats *stats); void control_remove_stats(struct control_state *state, struct stats *stats); diff --git a/src/main.c b/src/main.c index 058beaee6..59c3f67cf 100644 --- a/src/main.c +++ b/src/main.c @@ -17,25 +17,25 @@ * Redistribution and use in source and binary forms, with or without * modification, is permitted provided that the following conditions * are met: - * + * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. - * + * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. - * + * * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: - * + * * This product includes software developed by the University of Southern * California Information Sciences Institute. This product also includes * software developed by CESNET z.s.p.o. - * + * * 4. Neither the name of the University nor of the Institute may be used * to endorse or promote products derived from this software without * specific prior written permission. - * + * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY @@ -144,15 +144,15 @@ struct state_uv { unsigned int connections_count; int mode; // MODE_SENDER, MODE_RECEIVER or both - + struct vidcap *capture_device; struct capture_filter *capture_filter; struct timeval start_time; struct pdb *participants; - + char *decoder_mode; char *postprocess; - + uint32_t ts; struct tx *tx; struct display *display_device; @@ -163,7 +163,7 @@ struct state_uv { bool ipv6; const char *requested_mcast_if; unsigned requested_mtu; - + enum tx_protocol tx_protocol; struct state_audio *audio; @@ -173,6 +173,10 @@ struct state_uv { struct module *root_module; const char *requested_encryption; + + struct module receiver_mod; + + pthread_mutex_t init_lock; }; static int exit_status = EXIT_SUCCESS; @@ -385,7 +389,7 @@ static struct rtp **initialize_network(const char *addrs, int recv_port_base, free(tmp); tmp = strdup(addrs); - devices = (struct rtp **) + devices = (struct rtp **) malloc((required_connections + 1) * sizeof(struct rtp *)); for(index = 0, addr = strtok_r(tmp, ",", &saveptr); @@ -403,7 +407,7 @@ static struct rtp **initialize_network(const char *addrs, int recv_port_base, rtp_recv_callback, (void *)participants, use_ipv6); if (devices[index] != NULL) { - rtp_set_option(devices[index], RTP_OPT_WEAK_VALIDATION, + rtp_set_option(devices[index], RTP_OPT_WEAK_VALIDATION, TRUE); rtp_set_sdes(devices[index], rtp_my_ssrc(devices[index]), RTCP_SDES_TOOL, @@ -430,7 +434,7 @@ static struct rtp **initialize_network(const char *addrs, int recv_port_base, } if(devices != NULL) devices[index] = NULL; free(tmp); - + return devices; } @@ -558,7 +562,6 @@ struct rtp **change_tx_port(struct state_uv *uv, int tx_port) static void *receiver_thread(void *arg) { struct state_uv *uv = (struct state_uv *)arg; - struct module mod; struct pdb_e *cp; struct timeval curr_time; @@ -578,10 +581,6 @@ static void *receiver_thread(void *arg) initialize_video_decompress(); - module_init_default(&mod); - mod.cls = MODULE_CLASS_RECEIVER; - module_register(&mod, uv->root_module); - fr = 1; struct module *control_mod = get_module(get_root_module(uv->root_module), "control"); @@ -606,7 +605,7 @@ static void *receiver_thread(void *arg) /* to match the video capture rate, so the transmitter works. */ if (fr) { gettimeofday(&curr_time, NULL); - receiver_process_messages(uv, &mod); + receiver_process_messages(uv, &uv->receiver_mod); fr = 0; } @@ -618,7 +617,7 @@ static void *receiver_thread(void *arg) // timeout if (ret == FALSE) { // processing is needed here in case we are not receiving any data - receiver_process_messages(uv, &mod); + receiver_process_messages(uv, &uv->receiver_mod); //printf("Failed to receive data\n"); } total_received += ret; @@ -654,7 +653,7 @@ static void *receiver_thread(void *arg) (cp->playout_buffer, curr_time, decode_frame, cp->video_decoder_state)) { tiles_post++; /* we have data from all connections we need */ - if(tiles_post == uv->connections_count) + if(tiles_post == uv->connections_count) { tiles_post = 0; gettimeofday(&curr_time, NULL); @@ -726,8 +725,8 @@ static void *receiver_thread(void *arg) pdb_iter_done(&it); } - module_done(&mod); - + module_done(&uv->receiver_mod); + #ifdef SHARED_DECODER destroy_decoder(shared_decoder); #else @@ -793,6 +792,8 @@ static void *capture_thread(void *arg) goto compress_done; } + pthread_mutex_unlock(&uv->init_lock); + while (!should_exit_sender) { /* Capture and transmit video... */ tx_frame = vidcap_grab(uv->capture_device, &audio); @@ -910,7 +911,7 @@ int main(int argc, char *argv[]) struct control_state *control = NULL; int bitrate = 0; - + const char *audio_host = NULL; int audio_rx_port = -1, audio_tx_port = -1; @@ -921,7 +922,7 @@ int main(int argc, char *argv[]) char *requested_capture_filter = NULL; audio_codec_t audio_codec = AC_PCM; - + pthread_t receiver_thread_id, tx_thread_id; bool receiver_thread_started = false, @@ -1003,6 +1004,7 @@ int main(int argc, char *argv[]) init_root_module(&root_mod, uv); uv->root_module = &root_mod; + pthread_mutex_init(&uv->init_lock, NULL); perf_init(); perf_record(UVP_INIT, 0); @@ -1074,7 +1076,7 @@ int main(int argc, char *argv[]) sage_opts = optarg; break; case 'r': - audio_recv = optarg; + audio_recv = optarg; break; case 's': audio_send = optarg; @@ -1154,7 +1156,7 @@ int main(int argc, char *argv[]) case OPT_CUDA_DEVICE: #ifdef HAVE_CUDA if(strcmp("help", optarg) == 0) { - struct compress_state *compression; + struct compress_state *compression; int ret = compress_init(&root_mod, "JPEG:list_devices", &compression); if(ret >= 0) { if(ret == 0) { @@ -1229,7 +1231,7 @@ int main(int argc, char *argv[]) return EXIT_FAIL_USAGE; } } - + argc -= optind; argv += optind; @@ -1328,7 +1330,7 @@ int main(int argc, char *argv[]) if(ret != 0) { goto cleanup; } - + if(!audio_host) { audio_host = uv->requested_receiver; } @@ -1394,7 +1396,7 @@ int main(int argc, char *argv[]) #else printf("WARNING: System does not support real-time scheduling\n"); #endif /* HAVE_SCHED_SETSCHEDULER */ -#endif /* USE_RT */ +#endif /* USE_RT */ if (uv->tx_protocol == IHDTV) { #ifdef HAVE_IHDTV @@ -1524,7 +1526,7 @@ int main(int argc, char *argv[]) if(uv->tx_protocol == ULTRAGRID_RTP || uv->tx_protocol == SAGE) { /* following block only shows help (otherwise initialized in receiver thread */ - if((uv->postprocess && strstr(uv->postprocess, "help") != NULL) || + if((uv->postprocess && strstr(uv->postprocess, "help") != NULL) || (uv->decoder_mode && strstr(uv->decoder_mode, "help") != NULL)) { struct state_decoder *dec = decoder_init(uv->decoder_mode, uv->postprocess, NULL, uv->requested_encryption); @@ -1555,6 +1557,11 @@ int main(int argc, char *argv[]) } if(uv->mode & MODE_RECEIVER) { + // init module here so as it is capable of receiving messages + module_init_default(&uv->receiver_mod); + uv->receiver_mod.cls = MODULE_CLASS_RECEIVER; + module_register(&uv->receiver_mod, uv->root_module); + if (pthread_create (&receiver_thread_id, NULL, receiver_thread, (void *)uv) != 0) { @@ -1567,24 +1574,32 @@ int main(int argc, char *argv[]) } if(uv->mode & MODE_SENDER) { + pthread_mutex_lock(&uv->init_lock); if (pthread_create (&tx_thread_id, NULL, capture_thread, (void *) &root_mod) != 0) { perror("Unable to create capture thread!\n"); + pthread_mutex_unlock(&uv->init_lock); exit_uv(EXIT_FAILURE); goto cleanup; } else { + // wait for sender module initialization + pthread_mutex_lock(&uv->init_lock); + pthread_mutex_unlock(&uv->init_lock); tx_thread_started = true; } } } - + if(audio_get_display_flags(uv->audio)) { audio_register_put_callback(uv->audio, (void (*)(void *, struct audio_frame *)) display_put_audio_frame, uv->display_device); - audio_register_reconfigure_callback(uv->audio, (int (*)(void *, int, int, + audio_register_reconfigure_callback(uv->audio, (int (*)(void *, int, int, int)) display_reconfigure_audio, uv->display_device); } + // should be started after requested modules are able to respond after start + control_start(control); + if (strcmp("none", uv->requested_display) != 0) display_run(uv->display_device); @@ -1628,12 +1643,13 @@ cleanup: video_export_destroy(uv->video_exporter); - free(uv); free(export_dir); - + lib_common_done(); module_done(&root_mod); + pthread_mutex_destroy(&uv->init_lock); + free(uv); #if defined DEBUG && defined HAVE_LINUX muntrace();