diff --git a/src/audio/audio.c b/src/audio/audio.c index eba91659c..33b2216fb 100644 --- a/src/audio/audio.c +++ b/src/audio/audio.c @@ -144,7 +144,7 @@ static void *audio_sender_thread(void *arg); static void *audio_receiver_thread(void *arg); static struct rtp *initialize_audio_network(char *addr, int recv_port, int send_port, struct pdb *participants, bool use_ipv6, - char *mcast_if); + const char *mcast_if); static void audio_channel_map_usage(void); static void audio_scale_usage(void); @@ -181,11 +181,11 @@ static void audio_scale_usage(void) /** * take care that addrs can also be comma-separated list of addresses ! */ -struct state_audio * audio_cfg_init(struct module *parent, char *addrs, int recv_port, int send_port, +struct state_audio * audio_cfg_init(struct module *parent, const char *addrs, int recv_port, int send_port, const char *send_cfg, const char *recv_cfg, char *jack_cfg, char *fec_cfg, const char *encryption, char *audio_channel_map, const char *audio_scale, - bool echo_cancellation, bool use_ipv6, char *mcast_if, audio_codec_t audio_codec, + bool echo_cancellation, bool use_ipv6, const char *mcast_if, audio_codec_t audio_codec, int resample_to) { struct state_audio *s = NULL; @@ -435,7 +435,7 @@ void audio_done(struct state_audio *s) static struct rtp *initialize_audio_network(char *addr, int recv_port, int send_port, struct pdb *participants, bool use_ipv6, - char *mcast_if) // GiX + const char *mcast_if) // GiX { struct rtp *r; double rtcp_bw = 1024 * 512; // FIXME: something about 5% for rtcp is said in rfc diff --git a/src/audio/audio.h b/src/audio/audio.h index 8f32b2dd5..a33e9e7fb 100644 --- a/src/audio/audio.h +++ b/src/audio/audio.h @@ -118,11 +118,11 @@ typedef struct struct module; -struct state_audio * audio_cfg_init(struct module *parent, char *addrs, int recv_port, int send_port, +struct state_audio * audio_cfg_init(struct module *parent, const char *addrs, int recv_port, int send_port, const char *send_cfg, const char *recv_cfg, char *jack_cfg, char *fec_cfg, const char *encryption, char *audio_channel_map, const char *audio_scale, - bool echo_cancellation, bool use_ipv6, char *mcast_iface, audio_codec_t audio_codec, + bool echo_cancellation, bool use_ipv6, const char *mcast_iface, audio_codec_t audio_codec, int resample_to); void audio_finish(struct state_audio *s); void audio_done(struct state_audio *s); diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 33e52df1e..05da823e5 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -288,6 +288,16 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message) append_message_path(path, sizeof(path), path_sender); resp = send_message(s->root_module, path, (struct message *) msg); + } else if (prefix_matches(message, "receiver-port ")) { + struct msg_receiver *msg = + (struct msg_receiver *) + new_message(sizeof(struct msg_receiver)); + msg->new_rx_port = atoi(suffix(message, "receiver-port ")); + + enum module_class path_receiver[] = { MODULE_CLASS_RECEIVER, MODULE_CLASS_NONE }; + append_message_path(path, sizeof(path), path_receiver); + resp = + send_message(s->root_module, path, (struct message *) msg); } else if(prefix_matches(message, "fec ")) { struct msg_change_fec_data *msg = (struct msg_change_fec_data *) new_message(sizeof(struct msg_change_fec_data)); @@ -424,6 +434,20 @@ static fd_t connect_to_internal_channel(int local_port) return fd; } +static struct client *add_client(struct client *clients, int fd) { + struct client *new_client = (struct client *) + malloc(sizeof(struct client)); + new_client->fd = fd; + new_client->prev = NULL; + new_client->next = clients; + if (new_client->next) { + new_client->next->prev = new_client; + } + new_client->buff_len = 0; + + return new_client; +} + static void * control_thread(void *args) { struct control_state *s = (struct control_state *) args; @@ -432,25 +456,14 @@ static void * control_thread(void *args) s->internal_fd[1] = connect_to_internal_channel(s->local_port); if(s->connection_type == CLIENT) { - struct client *new_client = (struct client *) - malloc(sizeof(struct client)); - new_client->fd = s->socket_fd; - new_client->prev = NULL; - new_client->next = clients; - new_client->buff_len = 0; - clients = new_client; + clients = add_client(clients, s->socket_fd); } struct sockaddr_storage client_addr; socklen_t len; errno = 0; - struct client *new_client = (struct client *) malloc(sizeof(struct client)); - new_client->fd = s->internal_fd[1]; - new_client->prev = NULL; - new_client->next = clients; - new_client->buff_len = 0; - clients = new_client; + clients = add_client(clients, s->internal_fd[1]); bool should_exit = false; @@ -489,13 +502,8 @@ static void * control_thread(void *args) if(select(max_fd, &fds, NULL, NULL, timeout_ptr) >= 1) { if(s->connection_type == SERVER && FD_ISSET(s->socket_fd, &fds)) { - struct client *new_client = (struct client *) - malloc(sizeof(struct client)); - new_client->fd = accept(s->socket_fd, (struct sockaddr *) &client_addr, &len); - new_client->prev = NULL; - new_client->next = clients; - new_client->buff_len = 0; - clients = new_client; + int fd = accept(s->socket_fd, (struct sockaddr *) &client_addr, &len); + clients = add_client(clients, fd); } struct client *cur = clients; diff --git a/src/host.c b/src/host.c index dcc276afc..c32d39cc0 100644 --- a/src/host.c +++ b/src/host.c @@ -32,7 +32,7 @@ int uv_argc; char **uv_argv; char *export_dir = NULL; -char *sage_network_device = NULL; +const char *sage_receiver = NULL; extern void (*vidcap_free_devices_extrn)(); extern display_type_t *(*display_get_device_details_extrn)(int i); diff --git a/src/host.h b/src/host.h index 8e3eacc8a..41d73b8d0 100644 --- a/src/host.h +++ b/src/host.h @@ -74,7 +74,7 @@ extern unsigned int audio_capture_channels; extern unsigned int cuda_devices[]; extern unsigned int cuda_devices_count; -extern char *sage_network_device; +extern const char *sage_receiver; // for aggregate.c struct vidcap; diff --git a/src/main.c b/src/main.c index 46d8f032a..ccdc2a6f4 100644 --- a/src/main.c +++ b/src/main.c @@ -131,6 +131,9 @@ #define INITIAL_VIDEO_RECV_BUFFER_SIZE ((4*1920*1080)*110/100) #endif +#define MODE_SENDER 1 +#define MODE_RECEIVER 2 + struct state_uv { int recv_port_number; int send_port_number; @@ -139,10 +142,12 @@ struct state_uv { struct display *sage_tx_device; // == SAGE }; unsigned int connections_count; + + int mode; // sender or receiver struct vidcap *capture_device; struct capture_filter *capture_filter; - struct timeval start_time, curr_time; + struct timeval start_time; struct pdb *participants; char *decoder_mode; @@ -154,6 +159,9 @@ struct state_uv { char *requested_compression; const char *requested_display; const char *requested_capture; + const char *requested_receiver; + bool ipv6; + const char *requested_mcast_if; unsigned requested_mtu; enum tx_protocol tx_protocol; @@ -185,10 +193,6 @@ static long frame_begin[2]; // // prototypes // -static struct rtp **initialize_network(char *addrs, int recv_port_base, - int send_port_base, struct pdb *participants, bool use_ipv6, - char *mcast_if); - static void list_video_display_devices(void); static void list_video_capture_devices(void); static void display_buf_increase_warning(int size); @@ -373,9 +377,9 @@ static void display_buf_increase_warning(int size) } -static struct rtp **initialize_network(char *addrs, int recv_port_base, +static struct rtp **initialize_network(const char *addrs, int recv_port_base, int send_port_base, struct pdb *participants, bool use_ipv6, - char *mcast_if) + const char *mcast_if) { struct rtp **devices = NULL; double rtcp_bw = 5 * 1024 * 1024; /* FIXME */ @@ -402,7 +406,7 @@ static struct rtp **initialize_network(char *addrs, int recv_port_base, devices = (struct rtp **) malloc((required_connections + 1) * sizeof(struct rtp *)); - for(index = 0, addr = strtok_r(addrs, ",", &saveptr); + for(index = 0, addr = strtok_r(tmp, ",", &saveptr); index < required_connections; ++index, addr = strtok_r(NULL, ",", &saveptr), recv_port += 2, send_port += 2) { @@ -535,12 +539,31 @@ static void remove_display_from_decoders(struct state_uv *uv) { } } +static void receiver_process_messages(struct state_uv *uv, struct module *receiver_mod) +{ + struct msg_receiver *msg; + while ((msg = (struct msg_receiver *) check_message(receiver_mod))) { + assert(uv->mode == MODE_RECEIVER); // receiver only + destroy_devices(uv->network_devices); + uv->recv_port_number = msg->new_rx_port; + uv->network_devices = initialize_network(uv->requested_receiver, uv->recv_port_number, + uv->send_port_number, uv->participants, uv->ipv6, + uv->requested_mcast_if); + if (!uv->network_devices) { + fprintf(stderr, "Changing RX port failed!\n"); + abort(); + } + free_message((struct message *) msg); + } +} + static void *receiver_thread(void *arg) { struct state_uv *uv = (struct state_uv *)arg; + struct module mod; struct pdb_e *cp; - struct timeval timeout; + struct timeval curr_time; int fr; int ret; unsigned int tiles_post = 0; @@ -557,6 +580,10 @@ static void *receiver_thread(void *arg) initialize_video_decompress(); + module_init_default(&mod); + mod.cls = MODULE_CLASS_RECEIVER; + module_register(&mod, uv->root_module); + pthread_mutex_unlock(&uv->master_lock); fr = 1; @@ -572,16 +599,18 @@ static void *receiver_thread(void *arg) uint64_t total_received = 0ull; while (!should_exit_receiver) { + struct timeval timeout; /* Housekeeping and RTCP... */ - gettimeofday(&uv->curr_time, NULL); - uv->ts = tv_diff(uv->curr_time, uv->start_time) * 90000; - rtp_update(uv->network_devices[0], uv->curr_time); - rtp_send_ctrl(uv->network_devices[0], uv->ts, 0, uv->curr_time); + gettimeofday(&curr_time, NULL); + uv->ts = tv_diff(curr_time, uv->start_time) * 90000; + rtp_update(uv->network_devices[0], curr_time); + rtp_send_ctrl(uv->network_devices[0], uv->ts, 0, curr_time); /* Receive packets from the network... The timeout is adjusted */ /* to match the video capture rate, so the transmitter works. */ if (fr) { - gettimeofday(&uv->curr_time, NULL); + gettimeofday(&curr_time, NULL); + receiver_process_messages(uv, &mod); fr = 0; } @@ -590,11 +619,12 @@ static void *receiver_thread(void *arg) timeout.tv_usec = 10000; ret = rtp_recv_poll_r(uv->network_devices, &timeout, uv->ts); - /* - if (ret == FALSE) { - printf("Failed to receive data\n"); - } - */ + // timeout + if (ret == FALSE) { + // processing is needed here in case we are not receiving any data + receiver_process_messages(uv, &mod); + //printf("Failed to receive data\n"); + } total_received += ret; stats_update_int(stat_received, total_received); @@ -602,10 +632,10 @@ static void *receiver_thread(void *arg) pdb_iter_t it; cp = pdb_iter_init(uv->participants, &it); while (cp != NULL) { - if (tfrc_feedback_is_due(cp->tfrc_state, uv->curr_time)) { + if (tfrc_feedback_is_due(cp->tfrc_state, curr_time)) { debug_msg("tfrc rate %f\n", tfrc_feedback_txrate(cp->tfrc_state, - uv->curr_time)); + curr_time)); } if(cp->video_decoder_state == NULL) { @@ -625,13 +655,13 @@ static void *receiver_thread(void *arg) /* Decode and render video... */ if (pbuf_decode - (cp->playout_buffer, uv->curr_time, decode_frame, cp->video_decoder_state)) { + (cp->playout_buffer, curr_time, decode_frame, cp->video_decoder_state)) { tiles_post++; /* we have data from all connections we need */ if(tiles_post == uv->connections_count) { tiles_post = 0; - gettimeofday(&uv->curr_time, NULL); + gettimeofday(&curr_time, NULL); fr = 1; #if 0 display_put_frame(uv->display_device, @@ -640,7 +670,7 @@ static void *receiver_thread(void *arg) display_get_frame(uv->display_device); #endif } - last_tile_received = uv->curr_time; + last_tile_received = curr_time; uint32_t sender_ssrc = cp->ssrc; stats_update_int(stat_loss, rtp_compute_fract_lost(uv->network_devices[0], @@ -648,10 +678,10 @@ static void *receiver_thread(void *arg) } /* dual-link TIMEOUT - we won't wait for next tiles */ - if(tiles_post > 1 && tv_diff(uv->curr_time, last_tile_received) > + if(tiles_post > 1 && tv_diff(curr_time, last_tile_received) > 999999 / 59.94 / uv->connections_count) { tiles_post = 0; - gettimeofday(&uv->curr_time, NULL); + gettimeofday(&curr_time, NULL); fr = 1; #if 0 display_put_frame(uv->display_device, @@ -659,7 +689,7 @@ static void *receiver_thread(void *arg) cp->video_decoder_state->frame_buffer = display_get_frame(uv->display_device); #endif - last_tile_received = uv->curr_time; + last_tile_received = curr_time; } if(cp->video_decoder_state->decoded % 100 == 99) { @@ -694,11 +724,13 @@ static void *receiver_thread(void *arg) } - pbuf_remove(cp->playout_buffer, uv->curr_time); + pbuf_remove(cp->playout_buffer, curr_time); cp = pdb_iter_next(&it); } pdb_iter_done(&it); } + + module_done(&mod); #ifdef SHARED_DECODER destroy_decoder(shared_decoder); @@ -708,6 +740,7 @@ static void *receiver_thread(void *arg) remove_display_from_decoders(uv); #endif // SHARED_DECODER + // pass posioned pill to display display_put_frame(uv->display_device, NULL, PUTF_BLOCKING); stats_destroy(stat_loss); @@ -857,7 +890,6 @@ int main(int argc, char *argv[]) #if defined HAVE_SCHED_SETSCHEDULER && defined USE_RT struct sched_param sp; #endif - char *network_device = NULL; char *capture_cfg = NULL; char *display_cfg = NULL; const char *audio_recv = "none"; @@ -869,8 +901,6 @@ int main(int argc, char *argv[]) char *audio_scale = "mixauto"; bool echo_cancellation = false; - bool use_ipv6 = false; - char *mcast_if = NULL; bool should_export = false; char *export_opts = NULL; @@ -881,7 +911,7 @@ int main(int argc, char *argv[]) int bitrate = 0; - char *audio_host = NULL; + const char *audio_host = NULL; int audio_rx_port = -1, audio_tx_port = -1; struct module root_mod; @@ -1109,7 +1139,7 @@ int main(int argc, char *argv[]) } break; case '6': - use_ipv6 = true; + uv->ipv6 = true; break; case OPT_AUDIO_CHANNEL_MAP: audio_channel_map = optarg; @@ -1156,7 +1186,7 @@ int main(int argc, char *argv[]) return EXIT_FAIL_USAGE; #endif // HAVE_CUDA case OPT_MCAST_IF: - mcast_if = optarg; + uv->requested_mcast_if = optarg; break; case 'A': audio_host = optarg; @@ -1265,12 +1295,12 @@ int main(int argc, char *argv[]) } } else { if (argc == 0) { - network_device = strdup("localhost"); + uv->requested_receiver = "localhost"; } else { - network_device = (char *) argv[0]; + uv->requested_receiver = argv[0]; } if(uv->tx_protocol == SAGE) { - sage_network_device = network_device; + sage_receiver = uv->requested_receiver; } } @@ -1302,13 +1332,13 @@ int main(int argc, char *argv[]) } if(!audio_host) { - audio_host = network_device; + audio_host = uv->requested_receiver; } uv->audio = audio_cfg_init (&root_mod, audio_host, audio_rx_port, audio_tx_port, audio_send, audio_recv, jack_cfg, requested_audio_fec, uv->requested_encryption, audio_channel_map, - audio_scale, echo_cancellation, use_ipv6, mcast_if, + audio_scale, echo_cancellation, uv->ipv6, uv->requested_mcast_if, audio_codec, compressed_audio_sample_rate); free(requested_audio_fec); if(!uv->audio) @@ -1447,8 +1477,9 @@ int main(int argc, char *argv[]) #endif // HAVE_IHDTV } else if(uv->tx_protocol == ULTRAGRID_RTP) { if ((uv->network_devices = - initialize_network(network_device, uv->recv_port_number, - uv->send_port_number, uv->participants, use_ipv6, mcast_if)) + initialize_network(uv->requested_receiver, uv->recv_port_number, + uv->send_port_number, uv->participants, uv->ipv6, + uv->requested_mcast_if)) == NULL) { printf("Unable to open network\n"); exit_uv(EXIT_FAIL_NETWORK); @@ -1519,6 +1550,13 @@ int main(int argc, char *argv[]) } if (strcmp("none", uv->requested_display) != 0) { + uv->mode |= MODE_RECEIVER; + } + if (strcmp("none", uv->requested_capture) != 0) { + uv->mode |= MODE_SENDER; + } + + if(uv->mode & MODE_RECEIVER) { pthread_mutex_lock(&uv->master_lock); if (pthread_create (&receiver_thread_id, NULL, receiver_thread, @@ -1531,7 +1569,7 @@ int main(int argc, char *argv[]) } } - if (strcmp("none", uv->requested_capture) != 0) { + if(uv->mode & MODE_SENDER) { pthread_mutex_lock(&uv->master_lock); if (pthread_create (&tx_thread_id, NULL, compress_thread, diff --git a/src/messaging.h b/src/messaging.h index c111e78f2..56dd71c34 100644 --- a/src/messaging.h +++ b/src/messaging.h @@ -51,6 +51,11 @@ struct msg_sender { char receiver[128]; }; +struct msg_receiver { + struct message m; + uint16_t new_rx_port; +}; + struct msg_change_fec_data { struct message m; enum tx_media_type media_type; diff --git a/src/module.c b/src/module.c index eec393edd..10648b94b 100644 --- a/src/module.c +++ b/src/module.c @@ -122,6 +122,7 @@ const char *module_class_name_pairs[] = { [MODULE_CLASS_COMPRESS] = "compress", [MODULE_CLASS_DATA] = "data", [MODULE_CLASS_SENDER] = "sender", + [MODULE_CLASS_RECEIVER] = "receiver", [MODULE_CLASS_TX] = "transmit", [MODULE_CLASS_AUDIO] = "audio", [MODULE_CLASS_CONTROL] = "control", diff --git a/src/module.h b/src/module.h index 4776cbfa0..468d4e696 100644 --- a/src/module.h +++ b/src/module.h @@ -68,6 +68,7 @@ enum module_class { MODULE_CLASS_COMPRESS, MODULE_CLASS_DATA, MODULE_CLASS_SENDER, + MODULE_CLASS_RECEIVER, MODULE_CLASS_TX, MODULE_CLASS_AUDIO, MODULE_CLASS_CONTROL, diff --git a/src/rtp/rtp.c b/src/rtp/rtp.c index 518480bbe..84c4dffb7 100644 --- a/src/rtp/rtp.c +++ b/src/rtp/rtp.c @@ -1047,7 +1047,7 @@ struct rtp *rtp_init(const char *addr, * Returns: An opaque session identifier to be used in future calls to * the RTP library functions, or NULL on failure. */ -struct rtp *rtp_init_if(const char *addr, char *iface, +struct rtp *rtp_init_if(const char *addr, const char *iface, uint16_t rx_port, uint16_t tx_port, int ttl, double rtcp_bw, int tfrc_on, rtp_callback callback, uint8_t * userdata, @@ -3814,6 +3814,14 @@ void rtp_flush_recv_buf(struct rtp *session) udp_flush_recv_buf(session->rtp_socket); } +/** + * rtp_change_dest: + * Changes RTP destination address. + * There must be only one sending thread. + * @session: The RTP Session. + * @addr: New Receiver Address. + * Returns TRUE if ok, FALSE if not + */ int rtp_change_dest(struct rtp *session, const char *addr) { return udp_change_dest(session->rtp_socket, addr); diff --git a/src/rtp/rtp.h b/src/rtp/rtp.h index 7b16f89f8..fd94f6499 100644 --- a/src/rtp/rtp.h +++ b/src/rtp/rtp.h @@ -219,7 +219,7 @@ rtp_t rtp_init(const char *addr, rtp_callback callback, uint8_t *userdata, bool use_ipv6); -rtp_t rtp_init_if(const char *addr, char *iface, +rtp_t rtp_init_if(const char *addr, const char *iface, uint16_t rx_port, uint16_t tx_port, int ttl, double rtcp_bw, int tfrc_on, @@ -283,10 +283,6 @@ int rtp_set_recv_buf(struct rtp *session, int bufsize); int rtp_set_send_buf(struct rtp *session, int bufsize); void rtp_flush_recv_buf(struct rtp *session); -/** - * @retval TRUE if changed successfully - * @retval FALSE if not - */ int rtp_change_dest(struct rtp *session, const char *addr); uint64_t rtp_get_bytes_sent(struct rtp *session); int rtp_compute_fract_lost(struct rtp *session, uint32_t ssrc); diff --git a/src/video_display/sage.cpp b/src/video_display/sage.cpp index c366af9c2..35d6f3841 100644 --- a/src/video_display/sage.cpp +++ b/src/video_display/sage.cpp @@ -169,7 +169,7 @@ void *display_sage_init(char *fmt, unsigned int flags) assert(s != NULL); s->confName = NULL; - s->fsIP = sage_network_device; // NULL unless in SAGE TX mode + s->fsIP = sage_receiver; // NULL unless in SAGE TX mode s->requestedDisplayCodec = (codec_t) -1; if(fmt) { @@ -224,7 +224,7 @@ void *display_sage_init(char *fmt, unsigned int flags) } } - if(sage_network_device == NULL) { + if(sage_receiver == NULL) { // read config file only if we are in dispaly mode (not sender mode) struct stat sb; if(s->confName) {