Send data in separate thread from compress/grab

Warning: currently works correctly only for JPEG.
For others there is need to add double-buffering (compression and
capture drivers)
This commit is contained in:
Martin Pulec
2012-05-11 01:24:43 +02:00
parent af9ae73595
commit e9670b5c69
11 changed files with 210 additions and 68 deletions

View File

@@ -54,6 +54,7 @@
#include <string.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include "config.h"
#include "config_unix.h"
#include "config_win32.h"
@@ -112,6 +113,15 @@ struct state_uv {
int use_ihdtv_protocol;
struct state_audio *audio;
volatile unsigned int has_item_to_send:1;
volatile unsigned int sender_waiting:1;
volatile unsigned int grabber_waiting:1;
pthread_mutex_t sender_lock;
pthread_cond_t grabber_cv;
pthread_cond_t sender_cv;
struct video_frame * volatile tx_frame;
};
long packet_rate = 13600;
@@ -136,6 +146,7 @@ struct display *initialize_video_display(const char *requested_display,
char *fmt, unsigned int flags);
struct vidcap *initialize_video_capture(const char *requested_capture,
char *fmt, unsigned int flags);
static void sender_finish(struct state_uv *uv);
#ifndef WIN32
static void signal_handler(int signal)
@@ -153,8 +164,9 @@ void _exit_uv(int status) {
wait_to_finish = TRUE;
should_exit = TRUE;
if(!threads_joined) {
if(uv_state->capture_device)
if(uv_state->capture_device) {
vidcap_finish(uv_state->capture_device);
}
if(uv_state->display_device)
display_finish(uv_state->display_device);
if(uv_state->audio)
@@ -520,21 +532,32 @@ static void *receiver_thread(void *arg)
return 0;
}
static void *sender_thread(void *arg)
{
struct state_uv *uv = (struct state_uv *)arg;
static void sender_finish(struct state_uv *uv) {
pthread_mutex_lock(&uv->sender_lock);
struct video_frame *tx_frame, *splitted_frames = NULL;
struct audio_frame *audio;
//struct video_frame *splitted_frames = NULL;
if(uv->sender_waiting) {
uv->has_item_to_send = TRUE;
pthread_cond_signal(&uv->sender_cv);
}
pthread_mutex_unlock(&uv->sender_lock);
while(uv->sender_waiting)
;
pthread_mutex_lock(&uv->sender_lock);
uv->has_item_to_send = FALSE;
if(uv->grabber_waiting) {
pthread_cond_signal(&uv->grabber_cv);
}
pthread_mutex_unlock(&uv->sender_lock);
}
static void *sender_thread(void *arg) {
struct state_uv *uv = (struct state_uv *)arg;
struct video_frame *splitted_frames = NULL;
int tile_y_count;
struct compress_state *compression;
compression = compress_init(uv->requested_compression);
if(compression == NULL) {
fprintf(stderr, "Error initializing compression.\n");
exit_uv(0);
}
tile_y_count = uv->connections_count;
@@ -544,6 +567,85 @@ static void *sender_thread(void *arg)
splitted_frames = vf_alloc(tile_y_count);
}
while(!should_exit) {
pthread_mutex_lock(&uv->sender_lock);
while(!uv->has_item_to_send) {
uv->sender_waiting = TRUE;
pthread_cond_wait(&uv->sender_cv, &uv->sender_lock);
uv->sender_waiting = FALSE;
}
struct video_frame *tx_frame = uv->tx_frame;
if(should_exit) {
uv->has_item_to_send = FALSE;
pthread_mutex_unlock(&uv->sender_lock);
goto exit;
}
pthread_mutex_unlock(&uv->sender_lock);
if(uv->connections_count == 1) { /* normal case - only one connection */
tx_send(uv->tx, tx_frame,
uv->network_devices[0]);
} else { /* split */
int i;
//assert(frame_count == 1);
vf_split_horizontal(splitted_frames, tx_frame,
tile_y_count);
for (i = 0; i < tile_y_count; ++i) {
tx_send_tile(uv->tx, splitted_frames, i,
uv->network_devices[i]);
}
}
pthread_mutex_lock(&uv->sender_lock);
uv->has_item_to_send = FALSE;
if(uv->grabber_waiting) {
pthread_cond_signal(&uv->grabber_cv);
}
pthread_mutex_unlock(&uv->sender_lock);
}
exit:
vf_free(splitted_frames);
return NULL;
}
static void *grabber_thread(void *arg)
{
struct state_uv *uv = (struct state_uv *)arg;
struct video_frame *tx_frame;
struct audio_frame *audio;
//struct video_frame *splitted_frames = NULL;
pthread_t sender_thread_id;
int i = 0;
struct compress_state *compression;
compression = compress_init(uv->requested_compression);
if(compression == NULL) {
fprintf(stderr, "Error initializing compression.\n");
exit_uv(0);
goto compress_done;
}
if (pthread_create
(&sender_thread_id, NULL, sender_thread,
(void *)uv) != 0) {
perror("Unable to create sender thread!\n");
exit_uv(EXIT_FAILURE);
goto join_thread;
}
while (!should_exit) {
/* Capture and transmit video... */
tx_frame = vidcap_grab(uv->capture_device, &audio);
@@ -552,27 +654,36 @@ static void *sender_thread(void *arg)
audio_sdi_send(uv->audio, audio);
}
//TODO: Unghetto this
tx_frame = compress_frame(compression, tx_frame);
tx_frame = compress_frame(compression, tx_frame, i);
if(!tx_frame)
continue;
if(uv->connections_count == 1) { /* normal case - only one connection */
tx_send(uv->tx, tx_frame,
uv->network_devices[0]);
} else { /* split */
int i;
//assert(frame_count == 1);
vf_split_horizontal(splitted_frames, tx_frame,
tile_y_count);
for (i = 0; i < tile_y_count; ++i) {
tx_send_tile(uv->tx, splitted_frames, i,
uv->network_devices[i]);
}
i = (i + 1) % 2;
pthread_mutex_lock(&uv->sender_lock);
while(uv->has_item_to_send) {
uv->grabber_waiting = TRUE;
pthread_cond_wait(&uv->grabber_cv, &uv->sender_lock);
uv->grabber_waiting = FALSE;
}
uv->tx_frame = tx_frame;
uv->has_item_to_send = TRUE;
if(uv->sender_waiting) {
pthread_cond_signal(&uv->sender_cv);
}
pthread_mutex_unlock(&uv->sender_lock);
}
}
vf_free(splitted_frames);
join_thread:
sender_finish(uv);
pthread_join(sender_thread_id, NULL);
compress_done:
compress_done(compression);
return NULL;
@@ -598,7 +709,7 @@ int main(int argc, char *argv[])
struct state_uv *uv;
int ch;
pthread_t receiver_thread_id, sender_thread_id;
pthread_t receiver_thread_id, grabber_thread_id;
unsigned vidcap_flags = 0,
display_flags = 0;
@@ -648,6 +759,13 @@ int main(int argc, char *argv[])
uv->network_devices = NULL;
uv->port_number = PORT_BASE;
uv->has_item_to_send = FALSE;
uv->sender_waiting = FALSE;
uv->grabber_waiting = FALSE;
pthread_mutex_init(&uv->sender_lock, NULL);
pthread_cond_init(&uv->grabber_cv, NULL);
pthread_cond_init(&uv->sender_cv, NULL);
perf_init();
perf_record(UVP_INIT, 0);
@@ -812,6 +930,7 @@ int main(int argc, char *argv[])
#endif /* USE_RT */
if (uv->use_ihdtv_protocol) {
#if 0
ihdtv_connection tx_connection, rx_connection;
printf("Initializing ihdtv protocol\n");
@@ -882,7 +1001,7 @@ int main(int argc, char *argv[])
while (!should_exit)
sleep(1);
#endif
} else {
if ((uv->network_devices =
initialize_network(network_device, uv->port_number, uv->participants)) == NULL) {
@@ -928,6 +1047,7 @@ int main(int argc, char *argv[])
exit_uv(EXIT_SUCCESS);
goto cleanup_wait_display;
}
if (strcmp("none", uv->requested_display) != 0) {
if (pthread_create
@@ -938,13 +1058,14 @@ int main(int argc, char *argv[])
goto cleanup_wait_display;
}
}
if (strcmp("none", uv->requested_capture) != 0) {
if (pthread_create
(&sender_thread_id, NULL, sender_thread,
(&grabber_thread_id, NULL, grabber_thread,
(void *)uv) != 0) {
perror("Unable to create capture thread!\n");
exit_uv(EXIT_FAILURE);
goto cleanup_wait_display;
goto cleanup_wait_capture;
}
}
}
@@ -965,7 +1086,7 @@ cleanup_wait_display:
cleanup_wait_capture:
if (strcmp("none", uv->requested_capture) != 0)
pthread_join(sender_thread_id, NULL);
pthread_join(grabber_thread_id, NULL);
cleanup_wait_audio:
/* also wait for audio threads */

View File

@@ -218,10 +218,10 @@ const char *get_compress_name(struct compress_state *s)
return NULL;
}
struct video_frame *compress_frame(struct compress_state *s, struct video_frame *frame)
struct video_frame *compress_frame(struct compress_state *s, struct video_frame *frame, int buffer_index)
{
if(s)
return s->handle->compress(s->state, frame);
return s->handle->compress(s->state, frame, buffer_index);
else
return NULL;
}

View File

@@ -65,7 +65,7 @@ typedef void *(*compress_init_t)(char *cfg);
* @param uncompressed frame
* @return compressed frame
*/
typedef struct video_frame * (*compress_compress_t)(void *state, struct video_frame *frame);
typedef struct video_frame * (*compress_compress_t)(void *state, struct video_frame *frame, int buffer_index);
/**
* Cleanup function
*/
@@ -74,7 +74,7 @@ typedef void (*compress_done_t)(void *);
void show_compress_help(void);
struct compress_state *compress_init(char *config_string);
const char *get_compress_name(struct compress_state *);
struct video_frame *compress_frame(struct compress_state *, struct video_frame*);
struct video_frame *compress_frame(struct compress_state *, struct video_frame*, int buffer_index);
void compress_done(struct compress_state *);
#endif /* __video_compress_h */

View File

@@ -261,7 +261,7 @@ void * dxt_glsl_compress_init(char * opts)
return s;
}
struct video_frame * dxt_glsl_compress(void *arg, struct video_frame * tx)
struct video_frame * dxt_glsl_compress(void *arg, struct video_frame * tx, int buffer_idx)
{
struct video_compress *s = (struct video_compress *) arg;
int i;

View File

@@ -48,5 +48,5 @@
#include "video_codec.h"
void * dxt_glsl_compress_init(char * opts);
struct video_frame * dxt_glsl_compress(void *args, struct video_frame * tx);
struct video_frame * dxt_glsl_compress(void *args, struct video_frame * tx, int buffer_index);
void dxt_glsl_compress_done(void *args);

View File

@@ -272,7 +272,7 @@ void *fastdxt_init(const char *num_threads_str)
return compress;
}
struct video_frame * fastdxt_compress(void *args, struct video_frame *tx)
struct video_frame * fastdxt_compress(void *args, struct video_frame *tx, int buffer_idx)
{
/* This thread will be called from main.c and handle the compress_threads */
struct video_compress *compress = (struct video_compress *)args;

View File

@@ -48,5 +48,5 @@
#include "video_codec.h"
void * fastdxt_init(const char *num_threads_str);
struct video_frame * fastdxt_compress(void *args, struct video_frame * tx);
struct video_frame * fastdxt_compress(void *args, struct video_frame * tx, int buffer_index);
void fastdxt_done(void *args);

View File

@@ -61,7 +61,7 @@ struct compress_jpeg_state {
struct gpujpeg_encoder *encoder;
struct gpujpeg_parameters encoder_param;
struct video_frame *out;
struct video_frame *out[2];
decoder_t decoder;
char *decoded;
@@ -74,8 +74,11 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra
static int configure_with(struct compress_jpeg_state *s, struct video_frame *frame)
{
unsigned int x;
int frame_idx;
s->out = vf_alloc(frame->tile_count);
for (frame_idx = 0; frame_idx < 2; frame_idx++) {
s->out[frame_idx] = vf_alloc(frame->tile_count);
}
for (x = 0; x < frame->tile_count; ++x) {
if (vf_get_tile(frame, x)->width != vf_get_tile(frame, 0)->width ||
@@ -84,14 +87,18 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra
exit_uv(129);
return FALSE;
}
vf_get_tile(s->out, x)->width = vf_get_tile(frame, 0)->width;
vf_get_tile(s->out, x)->height = vf_get_tile(frame, 0)->height;
}
s->out->interlacing = frame->interlacing;
s->out->fps = frame->fps;
s->out->color_spec = s->color_spec;
for (frame_idx = 0; frame_idx < 2; frame_idx++) {
for (x = 0; x < frame->tile_count; ++x) {
vf_get_tile(s->out[frame_idx], x)->width = vf_get_tile(frame, 0)->width;
vf_get_tile(s->out[frame_idx], x)->height = vf_get_tile(frame, 0)->height;
}
s->out[frame_idx]->interlacing = frame->interlacing;
s->out[frame_idx]->fps = frame->fps;
s->out[frame_idx]->color_spec = s->color_spec;
s->out[frame_idx]->color_spec = JPEG;
}
switch (frame->color_spec) {
case RGB:
@@ -158,12 +165,11 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra
}
s->out->color_spec = JPEG;
struct gpujpeg_image_parameters param_image;
gpujpeg_image_set_default_parameters(&param_image);
param_image.width = s->out->tiles[0].width;
param_image.height = s->out->tiles[0].height;
param_image.width = s->out[0]->tiles[0].width;
param_image.height = s->out[0]->tiles[0].height;
/*
* IMPORTANT:
@@ -186,10 +192,12 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra
s->encoder = gpujpeg_encoder_create(&s->encoder_param, &param_image);
for (x = 0; x < frame->tile_count; ++x) {
vf_get_tile(s->out, x)->data = (char *) malloc(s->out->tiles[0].width * s->out->tiles[0].height * 3);
vf_get_tile(s->out, x)->linesize = s->out->tiles[0].width * (param_image.color_space == GPUJPEG_RGB ? 3 : 2);
for (frame_idx = 0; frame_idx < 2; frame_idx++) {
for (x = 0; x < frame->tile_count; ++x) {
vf_get_tile(s->out[frame_idx], x)->data = (char *) malloc(s->out[frame_idx]->tiles[0].width * s->out[frame_idx]->tiles[0].height * 3);
vf_get_tile(s->out[frame_idx], x)->linesize = s->out[frame_idx]->tiles[0].width * (param_image.color_space == GPUJPEG_RGB ? 3 : 2);
}
}
if(!s->encoder) {
@@ -198,17 +206,20 @@ static int configure_with(struct compress_jpeg_state *s, struct video_frame *fra
return FALSE;
}
s->decoded = malloc(4 * s->out->tiles[0].width * s->out->tiles[0].height);
s->decoded = malloc(4 * s->out[0]->tiles[0].width * s->out[0]->tiles[0].height);
return TRUE;
}
void * jpeg_compress_init(char * opts)
{
struct compress_jpeg_state *s;
int frame_idx;
s = (struct compress_jpeg_state *) malloc(sizeof(struct compress_jpeg_state));
s->out = NULL;
for (frame_idx = 0; frame_idx < 2; frame_idx++) {
s->out[frame_idx] = NULL;
}
s->decoded = NULL;
if(opts && strcmp(opts, "help") == 0) {
@@ -258,11 +269,12 @@ void * jpeg_compress_init(char * opts)
return s;
}
struct video_frame * jpeg_compress(void *arg, struct video_frame * tx)
struct video_frame * jpeg_compress(void *arg, struct video_frame * tx, int buffer_idx)
{
struct compress_jpeg_state *s = (struct compress_jpeg_state *) arg;
int i;
unsigned char *line1, *line2;
struct video_frame *out;
unsigned int x;
@@ -273,9 +285,11 @@ struct video_frame * jpeg_compress(void *arg, struct video_frame * tx)
return NULL;
}
out = s->out[buffer_idx];
for (x = 0; x < tx->tile_count; ++x) {
struct tile *in_tile = vf_get_tile(tx, x);
struct tile *out_tile = vf_get_tile(s->out, x);
struct tile *out_tile = vf_get_tile(out, x);
line1 = (unsigned char *) in_tile->data;
line2 = (unsigned char *) s->decoded;
@@ -288,7 +302,7 @@ struct video_frame * jpeg_compress(void *arg, struct video_frame * tx)
}
line1 = (unsigned char *) out_tile->data + (in_tile->height - 1) * out_tile->linesize;
for( ; i < s->out->tiles[0].height; ++i) {
for( ; i < (int) out->tiles[0].height; ++i) {
memcpy(line2, line1, out_tile->linesize);
line2 += out_tile->linesize;
}
@@ -313,16 +327,23 @@ struct video_frame * jpeg_compress(void *arg, struct video_frame * tx)
memcpy(out_tile->data, compressed, size);
}
return s->out;
return out;
}
void jpeg_compress_done(void *arg)
{
struct compress_jpeg_state *s = (struct compress_jpeg_state *) arg;
int frame_idx;
if(s->out)
free(s->out->tiles[0].data);
vf_free(s->out);
for (frame_idx = 0; frame_idx < 2; frame_idx++) {
if(s->out[frame_idx]) {
int x;
for (x = 0; x < s->out[frame_idx]->tile_count; ++x) {
free(s->out[frame_idx]->tiles[x].data);
}
}
vf_free(s->out[frame_idx]);
}
if(s->encoder)
gpujpeg_encoder_destroy(s->encoder);

View File

@@ -48,5 +48,5 @@
#include "video.h"
void * jpeg_compress_init(char * opts);
struct video_frame * jpeg_compress(void *args, struct video_frame * tx);
struct video_frame * jpeg_compress(void *args, struct video_frame * tx, int buffer_index);
void jpeg_compress_done(void *args);

View File

@@ -72,7 +72,7 @@ void * none_compress_init(char * opts)
return s;
}
struct video_frame * none_compress(void *arg, struct video_frame * tx)
struct video_frame * none_compress(void *arg, struct video_frame * tx, int buffer_idx)
{
struct none_video_compress *s = (struct none_video_compress *) arg;

View File

@@ -50,5 +50,5 @@
struct gl_context;
void * none_compress_init(char * opts);
struct video_frame * none_compress(void *args, struct video_frame * tx);
struct video_frame * none_compress(void *args, struct video_frame * tx, int buffer_index);
void none_compress_done(void *args);