diff --git a/ultragrid/src/main.c b/ultragrid/src/main.c index 830eed1a4..cd532a413 100644 --- a/ultragrid/src/main.c +++ b/ultragrid/src/main.c @@ -54,6 +54,7 @@ #include #include #include +#include #include "config.h" #include "config_unix.h" #include "config_win32.h" @@ -112,6 +113,15 @@ struct state_uv { int use_ihdtv_protocol; struct state_audio *audio; + + volatile unsigned int has_item_to_send:1; + volatile unsigned int sender_waiting:1; + volatile unsigned int grabber_waiting:1; + pthread_mutex_t sender_lock; + pthread_cond_t grabber_cv; + pthread_cond_t sender_cv; + + struct video_frame * volatile tx_frame; }; long packet_rate = 13600; @@ -136,6 +146,7 @@ struct display *initialize_video_display(const char *requested_display, char *fmt, unsigned int flags); struct vidcap *initialize_video_capture(const char *requested_capture, char *fmt, unsigned int flags); +static void sender_finish(struct state_uv *uv); #ifndef WIN32 static void signal_handler(int signal) @@ -153,8 +164,9 @@ void _exit_uv(int status) { wait_to_finish = TRUE; should_exit = TRUE; if(!threads_joined) { - if(uv_state->capture_device) + if(uv_state->capture_device) { vidcap_finish(uv_state->capture_device); + } if(uv_state->display_device) display_finish(uv_state->display_device); if(uv_state->audio) @@ -520,21 +532,32 @@ static void *receiver_thread(void *arg) return 0; } -static void *sender_thread(void *arg) -{ - struct state_uv *uv = (struct state_uv *)arg; +static void sender_finish(struct state_uv *uv) { + pthread_mutex_lock(&uv->sender_lock); - struct video_frame *tx_frame, *splitted_frames = NULL; - struct audio_frame *audio; - //struct video_frame *splitted_frames = NULL; + if(uv->sender_waiting) { + uv->has_item_to_send = TRUE; + pthread_cond_signal(&uv->sender_cv); + } + + pthread_mutex_unlock(&uv->sender_lock); + + while(uv->sender_waiting) + ; + + pthread_mutex_lock(&uv->sender_lock); + uv->has_item_to_send = FALSE; + if(uv->grabber_waiting) { + pthread_cond_signal(&uv->grabber_cv); + } + pthread_mutex_unlock(&uv->sender_lock); + +} + +static void *sender_thread(void *arg) { + struct state_uv *uv = (struct state_uv *)arg; + struct video_frame *splitted_frames = NULL; int tile_y_count; - - struct compress_state *compression; - compression = compress_init(uv->requested_compression); - if(compression == NULL) { - fprintf(stderr, "Error initializing compression.\n"); - exit_uv(0); - } tile_y_count = uv->connections_count; @@ -544,6 +567,85 @@ static void *sender_thread(void *arg) splitted_frames = vf_alloc(tile_y_count); } + while(!should_exit) { + pthread_mutex_lock(&uv->sender_lock); + while(!uv->has_item_to_send) { + uv->sender_waiting = TRUE; + pthread_cond_wait(&uv->sender_cv, &uv->sender_lock); + uv->sender_waiting = FALSE; + } + struct video_frame *tx_frame = uv->tx_frame; + + if(should_exit) { + uv->has_item_to_send = FALSE; + pthread_mutex_unlock(&uv->sender_lock); + goto exit; + } + + pthread_mutex_unlock(&uv->sender_lock); + + + if(uv->connections_count == 1) { /* normal case - only one connection */ + tx_send(uv->tx, tx_frame, + uv->network_devices[0]); + } else { /* split */ + int i; + + //assert(frame_count == 1); + vf_split_horizontal(splitted_frames, tx_frame, + tile_y_count); + for (i = 0; i < tile_y_count; ++i) { + tx_send_tile(uv->tx, splitted_frames, i, + uv->network_devices[i]); + } + } + + pthread_mutex_lock(&uv->sender_lock); + + uv->has_item_to_send = FALSE; + + if(uv->grabber_waiting) { + pthread_cond_signal(&uv->grabber_cv); + } + pthread_mutex_unlock(&uv->sender_lock); + } + +exit: + vf_free(splitted_frames); + + + + return NULL; +} + +static void *grabber_thread(void *arg) +{ + struct state_uv *uv = (struct state_uv *)arg; + + struct video_frame *tx_frame; + struct audio_frame *audio; + //struct video_frame *splitted_frames = NULL; + pthread_t sender_thread_id; + int i = 0; + + struct compress_state *compression; + compression = compress_init(uv->requested_compression); + if(compression == NULL) { + fprintf(stderr, "Error initializing compression.\n"); + exit_uv(0); + goto compress_done; + } + + if (pthread_create + (&sender_thread_id, NULL, sender_thread, + (void *)uv) != 0) { + perror("Unable to create sender thread!\n"); + exit_uv(EXIT_FAILURE); + goto join_thread; + } + + + while (!should_exit) { /* Capture and transmit video... */ tx_frame = vidcap_grab(uv->capture_device, &audio); @@ -552,27 +654,36 @@ static void *sender_thread(void *arg) audio_sdi_send(uv->audio, audio); } //TODO: Unghetto this - tx_frame = compress_frame(compression, tx_frame); + tx_frame = compress_frame(compression, tx_frame, i); if(!tx_frame) continue; - if(uv->connections_count == 1) { /* normal case - only one connection */ - tx_send(uv->tx, tx_frame, - uv->network_devices[0]); - } else { /* split */ - int i; - //assert(frame_count == 1); - vf_split_horizontal(splitted_frames, tx_frame, - tile_y_count); - for (i = 0; i < tile_y_count; ++i) { - tx_send_tile(uv->tx, splitted_frames, i, - uv->network_devices[i]); - } + i = (i + 1) % 2; + + pthread_mutex_lock(&uv->sender_lock); + while(uv->has_item_to_send) { + uv->grabber_waiting = TRUE; + pthread_cond_wait(&uv->grabber_cv, &uv->sender_lock); + uv->grabber_waiting = FALSE; } + + uv->tx_frame = tx_frame; + + uv->has_item_to_send = TRUE; + if(uv->sender_waiting) { + pthread_cond_signal(&uv->sender_cv); + } + pthread_mutex_unlock(&uv->sender_lock); } } - vf_free(splitted_frames); - + + + +join_thread: + sender_finish(uv); + pthread_join(sender_thread_id, NULL); + +compress_done: compress_done(compression); return NULL; @@ -598,7 +709,7 @@ int main(int argc, char *argv[]) struct state_uv *uv; int ch; - pthread_t receiver_thread_id, sender_thread_id; + pthread_t receiver_thread_id, grabber_thread_id; unsigned vidcap_flags = 0, display_flags = 0; @@ -648,6 +759,13 @@ int main(int argc, char *argv[]) uv->network_devices = NULL; uv->port_number = PORT_BASE; + uv->has_item_to_send = FALSE; + uv->sender_waiting = FALSE; + uv->grabber_waiting = FALSE; + pthread_mutex_init(&uv->sender_lock, NULL); + pthread_cond_init(&uv->grabber_cv, NULL); + pthread_cond_init(&uv->sender_cv, NULL); + perf_init(); perf_record(UVP_INIT, 0); @@ -812,6 +930,7 @@ int main(int argc, char *argv[]) #endif /* USE_RT */ if (uv->use_ihdtv_protocol) { +#if 0 ihdtv_connection tx_connection, rx_connection; printf("Initializing ihdtv protocol\n"); @@ -882,7 +1001,7 @@ int main(int argc, char *argv[]) while (!should_exit) sleep(1); - +#endif } else { if ((uv->network_devices = initialize_network(network_device, uv->port_number, uv->participants)) == NULL) { @@ -928,6 +1047,7 @@ int main(int argc, char *argv[]) exit_uv(EXIT_SUCCESS); goto cleanup_wait_display; } + if (strcmp("none", uv->requested_display) != 0) { if (pthread_create @@ -938,13 +1058,14 @@ int main(int argc, char *argv[]) goto cleanup_wait_display; } } + if (strcmp("none", uv->requested_capture) != 0) { if (pthread_create - (&sender_thread_id, NULL, sender_thread, + (&grabber_thread_id, NULL, grabber_thread, (void *)uv) != 0) { perror("Unable to create capture thread!\n"); exit_uv(EXIT_FAILURE); - goto cleanup_wait_display; + goto cleanup_wait_capture; } } } @@ -965,7 +1086,7 @@ cleanup_wait_display: cleanup_wait_capture: if (strcmp("none", uv->requested_capture) != 0) - pthread_join(sender_thread_id, NULL); + pthread_join(grabber_thread_id, NULL); cleanup_wait_audio: /* also wait for audio threads */ diff --git a/ultragrid/src/video_compress.c b/ultragrid/src/video_compress.c index 60f9e3a73..40fc252fa 100644 --- a/ultragrid/src/video_compress.c +++ b/ultragrid/src/video_compress.c @@ -218,10 +218,10 @@ const char *get_compress_name(struct compress_state *s) return NULL; } -struct video_frame *compress_frame(struct compress_state *s, struct video_frame *frame) +struct video_frame *compress_frame(struct compress_state *s, struct video_frame *frame, int buffer_index) { if(s) - return s->handle->compress(s->state, frame); + return s->handle->compress(s->state, frame, buffer_index); else return NULL; } diff --git a/ultragrid/src/video_compress.h b/ultragrid/src/video_compress.h index 8b8cf7549..77a40938b 100644 --- a/ultragrid/src/video_compress.h +++ b/ultragrid/src/video_compress.h @@ -65,7 +65,7 @@ typedef void *(*compress_init_t)(char *cfg); * @param uncompressed frame * @return compressed frame */ -typedef struct video_frame * (*compress_compress_t)(void *state, struct video_frame *frame); +typedef struct video_frame * (*compress_compress_t)(void *state, struct video_frame *frame, int buffer_index); /** * Cleanup function */ @@ -74,7 +74,7 @@ typedef void (*compress_done_t)(void *); void show_compress_help(void); struct compress_state *compress_init(char *config_string); const char *get_compress_name(struct compress_state *); -struct video_frame *compress_frame(struct compress_state *, struct video_frame*); +struct video_frame *compress_frame(struct compress_state *, struct video_frame*, int buffer_index); void compress_done(struct compress_state *); #endif /* __video_compress_h */ diff --git a/ultragrid/src/video_compress/dxt_glsl.c b/ultragrid/src/video_compress/dxt_glsl.c index 7f97cdc8a..aba52c0d2 100644 --- a/ultragrid/src/video_compress/dxt_glsl.c +++ b/ultragrid/src/video_compress/dxt_glsl.c @@ -261,7 +261,7 @@ void * dxt_glsl_compress_init(char * opts) return s; } -struct video_frame * dxt_glsl_compress(void *arg, struct video_frame * tx) +struct video_frame * dxt_glsl_compress(void *arg, struct video_frame * tx, int buffer_idx) { struct video_compress *s = (struct video_compress *) arg; int i; diff --git a/ultragrid/src/video_compress/dxt_glsl.h b/ultragrid/src/video_compress/dxt_glsl.h index 7ea8d561b..0b6cb34b7 100644 --- a/ultragrid/src/video_compress/dxt_glsl.h +++ b/ultragrid/src/video_compress/dxt_glsl.h @@ -48,5 +48,5 @@ #include "video_codec.h" void * dxt_glsl_compress_init(char * opts); -struct video_frame * dxt_glsl_compress(void *args, struct video_frame * tx); +struct video_frame * dxt_glsl_compress(void *args, struct video_frame * tx, int buffer_index); void dxt_glsl_compress_done(void *args); diff --git a/ultragrid/src/video_compress/fastdxt.c b/ultragrid/src/video_compress/fastdxt.c index 690e156b1..407c0ff22 100644 --- a/ultragrid/src/video_compress/fastdxt.c +++ b/ultragrid/src/video_compress/fastdxt.c @@ -272,7 +272,7 @@ void *fastdxt_init(const char *num_threads_str) return compress; } -struct video_frame * fastdxt_compress(void *args, struct video_frame *tx) +struct video_frame * fastdxt_compress(void *args, struct video_frame *tx, int buffer_idx) { /* This thread will be called from main.c and handle the compress_threads */ struct video_compress *compress = (struct video_compress *)args; diff --git a/ultragrid/src/video_compress/fastdxt.h b/ultragrid/src/video_compress/fastdxt.h index e7bb84ce1..124fd60e1 100644 --- a/ultragrid/src/video_compress/fastdxt.h +++ b/ultragrid/src/video_compress/fastdxt.h @@ -48,5 +48,5 @@ #include "video_codec.h" void * fastdxt_init(const char *num_threads_str); -struct video_frame * fastdxt_compress(void *args, struct video_frame * tx); +struct video_frame * fastdxt_compress(void *args, struct video_frame * tx, int buffer_index); void fastdxt_done(void *args); diff --git a/ultragrid/src/video_compress/jpeg.c b/ultragrid/src/video_compress/jpeg.c index c910dc745..69c1a5051 100644 --- a/ultragrid/src/video_compress/jpeg.c +++ b/ultragrid/src/video_compress/jpeg.c @@ -61,7 +61,7 @@ struct compress_jpeg_state { struct gpujpeg_encoder *encoder; struct gpujpeg_parameters encoder_param; - struct video_frame *out; + struct video_frame *out[2]; decoder_t decoder; char *decoded; @@ -74,8 +74,11 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra static int configure_with(struct compress_jpeg_state *s, struct video_frame *frame) { unsigned int x; + int frame_idx; - s->out = vf_alloc(frame->tile_count); + for (frame_idx = 0; frame_idx < 2; frame_idx++) { + s->out[frame_idx] = vf_alloc(frame->tile_count); + } for (x = 0; x < frame->tile_count; ++x) { if (vf_get_tile(frame, x)->width != vf_get_tile(frame, 0)->width || @@ -84,14 +87,18 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra exit_uv(129); return FALSE; } - - vf_get_tile(s->out, x)->width = vf_get_tile(frame, 0)->width; - vf_get_tile(s->out, x)->height = vf_get_tile(frame, 0)->height; } - s->out->interlacing = frame->interlacing; - s->out->fps = frame->fps; - s->out->color_spec = s->color_spec; + for (frame_idx = 0; frame_idx < 2; frame_idx++) { + for (x = 0; x < frame->tile_count; ++x) { + vf_get_tile(s->out[frame_idx], x)->width = vf_get_tile(frame, 0)->width; + vf_get_tile(s->out[frame_idx], x)->height = vf_get_tile(frame, 0)->height; + } + s->out[frame_idx]->interlacing = frame->interlacing; + s->out[frame_idx]->fps = frame->fps; + s->out[frame_idx]->color_spec = s->color_spec; + s->out[frame_idx]->color_spec = JPEG; + } switch (frame->color_spec) { case RGB: @@ -158,12 +165,11 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra } - s->out->color_spec = JPEG; struct gpujpeg_image_parameters param_image; gpujpeg_image_set_default_parameters(¶m_image); - param_image.width = s->out->tiles[0].width; - param_image.height = s->out->tiles[0].height; + param_image.width = s->out[0]->tiles[0].width; + param_image.height = s->out[0]->tiles[0].height; /* * IMPORTANT: @@ -186,10 +192,12 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra s->encoder = gpujpeg_encoder_create(&s->encoder_param, ¶m_image); - for (x = 0; x < frame->tile_count; ++x) { - vf_get_tile(s->out, x)->data = (char *) malloc(s->out->tiles[0].width * s->out->tiles[0].height * 3); - vf_get_tile(s->out, x)->linesize = s->out->tiles[0].width * (param_image.color_space == GPUJPEG_RGB ? 3 : 2); + for (frame_idx = 0; frame_idx < 2; frame_idx++) { + for (x = 0; x < frame->tile_count; ++x) { + vf_get_tile(s->out[frame_idx], x)->data = (char *) malloc(s->out[frame_idx]->tiles[0].width * s->out[frame_idx]->tiles[0].height * 3); + vf_get_tile(s->out[frame_idx], x)->linesize = s->out[frame_idx]->tiles[0].width * (param_image.color_space == GPUJPEG_RGB ? 3 : 2); + } } if(!s->encoder) { @@ -198,17 +206,20 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra return FALSE; } - s->decoded = malloc(4 * s->out->tiles[0].width * s->out->tiles[0].height); + s->decoded = malloc(4 * s->out[0]->tiles[0].width * s->out[0]->tiles[0].height); return TRUE; } void * jpeg_compress_init(char * opts) { struct compress_jpeg_state *s; + int frame_idx; s = (struct compress_jpeg_state *) malloc(sizeof(struct compress_jpeg_state)); - s->out = NULL; + for (frame_idx = 0; frame_idx < 2; frame_idx++) { + s->out[frame_idx] = NULL; + } s->decoded = NULL; if(opts && strcmp(opts, "help") == 0) { @@ -258,11 +269,12 @@ void * jpeg_compress_init(char * opts) return s; } -struct video_frame * jpeg_compress(void *arg, struct video_frame * tx) +struct video_frame * jpeg_compress(void *arg, struct video_frame * tx, int buffer_idx) { struct compress_jpeg_state *s = (struct compress_jpeg_state *) arg; int i; unsigned char *line1, *line2; + struct video_frame *out; unsigned int x; @@ -273,9 +285,11 @@ struct video_frame * jpeg_compress(void *arg, struct video_frame * tx) return NULL; } + out = s->out[buffer_idx]; + for (x = 0; x < tx->tile_count; ++x) { struct tile *in_tile = vf_get_tile(tx, x); - struct tile *out_tile = vf_get_tile(s->out, x); + struct tile *out_tile = vf_get_tile(out, x); line1 = (unsigned char *) in_tile->data; line2 = (unsigned char *) s->decoded; @@ -288,7 +302,7 @@ struct video_frame * jpeg_compress(void *arg, struct video_frame * tx) } line1 = (unsigned char *) out_tile->data + (in_tile->height - 1) * out_tile->linesize; - for( ; i < s->out->tiles[0].height; ++i) { + for( ; i < (int) out->tiles[0].height; ++i) { memcpy(line2, line1, out_tile->linesize); line2 += out_tile->linesize; } @@ -313,16 +327,23 @@ struct video_frame * jpeg_compress(void *arg, struct video_frame * tx) memcpy(out_tile->data, compressed, size); } - return s->out; + return out; } void jpeg_compress_done(void *arg) { struct compress_jpeg_state *s = (struct compress_jpeg_state *) arg; + int frame_idx; - if(s->out) - free(s->out->tiles[0].data); - vf_free(s->out); + for (frame_idx = 0; frame_idx < 2; frame_idx++) { + if(s->out[frame_idx]) { + int x; + for (x = 0; x < s->out[frame_idx]->tile_count; ++x) { + free(s->out[frame_idx]->tiles[x].data); + } + } + vf_free(s->out[frame_idx]); + } if(s->encoder) gpujpeg_encoder_destroy(s->encoder); diff --git a/ultragrid/src/video_compress/jpeg.h b/ultragrid/src/video_compress/jpeg.h index 4c68191e6..e160b3ccc 100644 --- a/ultragrid/src/video_compress/jpeg.h +++ b/ultragrid/src/video_compress/jpeg.h @@ -48,5 +48,5 @@ #include "video.h" void * jpeg_compress_init(char * opts); -struct video_frame * jpeg_compress(void *args, struct video_frame * tx); +struct video_frame * jpeg_compress(void *args, struct video_frame * tx, int buffer_index); void jpeg_compress_done(void *args); diff --git a/ultragrid/src/video_compress/none.c b/ultragrid/src/video_compress/none.c index e1e4b8001..eb802dab9 100644 --- a/ultragrid/src/video_compress/none.c +++ b/ultragrid/src/video_compress/none.c @@ -72,7 +72,7 @@ void * none_compress_init(char * opts) return s; } -struct video_frame * none_compress(void *arg, struct video_frame * tx) +struct video_frame * none_compress(void *arg, struct video_frame * tx, int buffer_idx) { struct none_video_compress *s = (struct none_video_compress *) arg; diff --git a/ultragrid/src/video_compress/none.h b/ultragrid/src/video_compress/none.h index 439327f58..09596d67b 100644 --- a/ultragrid/src/video_compress/none.h +++ b/ultragrid/src/video_compress/none.h @@ -50,5 +50,5 @@ struct gl_context; void * none_compress_init(char * opts); -struct video_frame * none_compress(void *args, struct video_frame * tx); +struct video_frame * none_compress(void *args, struct video_frame * tx, int buffer_index); void none_compress_done(void *args);