diff --git a/Makefile.in b/Makefile.in index 0cd748b06..40f107970 100644 --- a/Makefile.in +++ b/Makefile.in @@ -48,7 +48,7 @@ VPATH = @srcdir@ # autogenerated headers GENERATED_HEADERS = @GENERATED_HEADERS@ -ALL_INCLUDES = $(wildcard $(srcdir)/src/*.h $(srcdir)/src/*/*.h $(srcdir)/src/*/*/*.h) +ALL_INCLUDES = $(wildcard $(srcdir)/src/*.h $(srcdir)/src/*/*.h $(srcdir)/src/*/*/*.h) $(wildcard $(srcdir)/unittest/*.h) OBJS = @OBJS@ \ src/blackmagic_common.o \ @@ -422,7 +422,8 @@ configure-messages: tests: test/run_tests @test/run_tests -UNITTEST_OBJS = unittest/run_tests.o +UNITTEST_OBJS = unittest/run_tests.o \ + unittest/video_desc_test.o unittest/run_tests: $(UNITTEST_OBJS) $(OBJS) $(LINKER) $(LDFLAGS) $(UNITTEST_OBJS) $(OBJS) $(LIBS) -lcppunit -o $@ diff --git a/src/audio/audio.cpp b/src/audio/audio.cpp index 0d06c2d09..4aceb9ba5 100644 --- a/src/audio/audio.cpp +++ b/src/audio/audio.cpp @@ -52,6 +52,8 @@ #include "config_win32.h" #endif +#include +#include #include #include #include @@ -81,10 +83,7 @@ #include "pdb.h" #include "utils/worker.h" -#include -#include -#include -#include +using namespace std; static volatile bool should_exit_audio = false; @@ -515,7 +514,7 @@ static struct rtp *initialize_audio_network(struct audio_network_parameters *par return r; } -static void audio_receiver_process_message(struct state_audio *s, struct msg_receiver *msg, struct state_audio_decoder *decoder) +static struct response * audio_receiver_process_message(struct state_audio *s, struct msg_receiver *msg, struct state_audio_decoder *decoder) { switch (msg->type) { case RECEIVER_MSG_CHANGE_RX_PORT: @@ -525,7 +524,8 @@ static void audio_receiver_process_message(struct state_audio *s, struct msg_rec s->audio_network_device = initialize_audio_network( &s->audio_network_parameters); if (!s->audio_network_device) { - fprintf(stderr, "Changing RX port failed!"); + log_msg(LOG_LEVEL_ERROR, "Changing RX port failed!"); + return new_response(RESPONSE_INT_SERV_ERR, "Changing RX port failed!"); } break; case RECEIVER_MSG_INCREASE_VOLUME: @@ -540,6 +540,8 @@ static void audio_receiver_process_message(struct state_audio *s, struct msg_rec default: abort(); } + + return new_response(RESPONSE_OK, NULL); } struct audio_decoder { @@ -566,8 +568,8 @@ static void *audio_receiver_thread(void *arg) while (!should_exit_audio) { struct message *msg; while((msg= check_message(&s->audio_receiver_module))) { - audio_receiver_process_message(s, (struct msg_receiver *) msg, pbuf_data.decoder); - free_message(msg); + struct response *r = audio_receiver_process_message(s, (struct msg_receiver *) msg, pbuf_data.decoder); + free_message(msg, r); } bool decoded = false; @@ -720,33 +722,43 @@ echo_play(s->echo_state, &pbuf_data.buffer); return NULL; } -static void audio_sender_process_message(struct state_audio *s, struct msg_sender *msg) +static struct response *audio_sender_process_message(struct state_audio *s, struct msg_sender *msg) { assert(s->audio_tx_mode == MODE_SENDER); int ret; switch (msg->type) { case SENDER_MSG_CHANGE_RECEIVER: - ret = rtp_change_dest(s->audio_network_device, - msg->receiver); - - if (ret == FALSE) { - fprintf(stderr, "Changing audio receiver to: %s failed!\n", + { + ret = rtp_change_dest(s->audio_network_device, msg->receiver); - } - if (rtcp_change_dest(s->audio_network_device, - msg->receiver) == FALSE){ - fprintf(stderr, "Changing rtcp audio receiver to: %s failed!\n", - msg->receiver); - } + ostringstream oss; + if (ret == FALSE) { + oss << "Changing audio receiver to: " << msg->receiver << " failed!\n"; + } - break; + if (rtcp_change_dest(s->audio_network_device, + msg->receiver) == FALSE){ + oss << "Changing rtcp audio receiver to: " << + msg->receiver << " failed!\n"; + } + + if (!oss.str().empty()) { + LOG(LOG_LEVEL_ERROR) << oss.str(); + return new_response(RESPONSE_INT_SERV_ERR, oss.str().c_str()); + } + + break; + } case SENDER_MSG_CHANGE_PORT: rtp_done(s->audio_network_device); s->audio_network_parameters.send_port = msg->port; s->audio_network_device = initialize_audio_network( &s->audio_network_parameters); + if (!s->audio_network_device) { + return new_response(RESPONSE_INT_SERV_ERR, NULL); + } break; case SENDER_MSG_PAUSE: s->paused = true; @@ -754,11 +766,13 @@ static void audio_sender_process_message(struct state_audio *s, struct msg_sende case SENDER_MSG_PLAY: s->paused = false; break; + case SENDER_MSG_QUERY_VIDEO_MODE: + return new_response(RESPONSE_BAD_REQUEST, NULL); case SENDER_MSG_CHANGE_FEC: - fprintf(stderr, "Not implemented!\n"); - abort(); - + LOG(LOG_LEVEL_ERROR) << "Not implemented!\n"; + return new_response(RESPONSE_NOT_IMPL, NULL); } + return new_response(RESPONSE_OK, NULL); } struct asend_stats_processing_data { @@ -842,8 +856,8 @@ static void *audio_sender_thread(void *arg) while (!should_exit_audio) { struct message *msg; while((msg= check_message(&s->audio_sender_module))) { - audio_sender_process_message(s, (struct msg_sender *) msg); - free_message(msg); + struct response *r = audio_sender_process_message(s, (struct msg_sender *) msg); + free_message(msg, r); } if ((s->audio_tx_mode & MODE_RECEIVER) == 0) { // otherwise receiver thread does the stuff... diff --git a/src/capture_filter.cpp b/src/capture_filter.cpp index 3672b9c6a..de28d698c 100644 --- a/src/capture_filter.cpp +++ b/src/capture_filter.cpp @@ -183,7 +183,7 @@ void capture_filter_destroy(struct capture_filter *state) free(state); } -static void process_message(struct capture_filter *s, struct msg_universal *msg) +static struct response *process_message(struct capture_filter *s, struct msg_universal *msg) { if (strncmp("delete ", msg->text, strlen("delete ")) == 0) { int index = atoi(msg->text + strlen("delete ")); @@ -192,6 +192,7 @@ static void process_message(struct capture_filter *s, struct msg_universal *msg) if (!inst) { fprintf(stderr, "Unable to remove capture filter index %d.\n", index); + return new_response(RESPONSE_INT_SERV_ERR, NULL); } else { printf("Capture filter #%d removed successfully.\n", index); inst->functions->done(inst->state); @@ -208,12 +209,15 @@ static void process_message(struct capture_filter *s, struct msg_universal *msg) if (create_filter(s, fmt) != 0) { fprintf(stderr, "Cannot create capture filter: %s.\n", msg->text); + return new_response(RESPONSE_INT_SERV_ERR, NULL); } else { printf("Capture filter \"%s\" created successfully.\n", msg->text); } free(fmt); } + + return new_response(RESPONSE_OK, NULL); } struct video_frame *capture_filter(struct capture_filter *state, struct video_frame *frame) { @@ -221,8 +225,8 @@ struct video_frame *capture_filter(struct capture_filter *state, struct video_fr struct message *msg; while ((msg = check_message(&s->mod))) { - process_message(s, (struct msg_universal *) msg); - free_message(msg); + struct response *r = process_message(s, (struct msg_universal *) msg); + free_message(msg, r); } for(void *it = simple_linked_list_it_init(s->filters); diff --git a/src/capture_filter/blank.cpp b/src/capture_filter/blank.cpp index 78fe73525..eda4c755b 100644 --- a/src/capture_filter/blank.cpp +++ b/src/capture_filter/blank.cpp @@ -181,9 +181,13 @@ static void done(void *state) delete s; } -static void process_message(struct state_blank *s, struct msg_universal *msg) +static struct response * process_message(struct state_blank *s, struct msg_universal *msg) { - parse(s, msg->text); + if (parse(s, msg->text)) { + return new_response(RESPONSE_OK, NULL); + } else { + return new_response(RESPONSE_BAD_REQUEST, NULL); + } } /** @@ -253,8 +257,8 @@ static struct video_frame *filter(void *state, struct video_frame *in) struct message *msg; while ((msg = check_message(&s->mod))) { - process_message(s, (struct msg_universal *) msg); - free_message(msg); + struct response *r = process_message(s, (struct msg_universal *) msg); + free_message(msg, r); } if (width <= 0 || height <= 0) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index e8a6d9d4d..1a2f1247d 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -361,7 +361,7 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s send_message(s->root_module, path, (struct message *) msg); struct response *resp_audio = send_message(s->root_module, path_audio, (struct message *) msg_audio); - resp_audio->deleter(resp_audio); + free_response(resp_audio); } else if (prefix_matches(message, "receiver-port ")) { struct msg_receiver *msg = (struct msg_receiver *) @@ -382,7 +382,7 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s send_message(s->root_module, path, (struct message *) msg); struct response *resp_audio = send_message(s->root_module, path_audio, (struct message *) msg_audio); - resp_audio->deleter(resp_audio); + free_response(resp_audio); } else if(prefix_matches(message, "fec ")) { struct msg_change_fec_data *msg = (struct msg_change_fec_data *) new_message(sizeof(struct msg_change_fec_data)); @@ -395,7 +395,7 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s msg->media_type = TX_MEDIA_VIDEO; strncpy(msg->fec, fec + 6, sizeof(msg->fec) - 1); } else { - resp = new_response(RESPONSE_NOT_FOUND, strdup("unknown media type")); + resp = new_response(RESPONSE_NOT_FOUND, "unknown media type"); free(msg); } @@ -453,7 +453,7 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s if(!resp) { fprintf(stderr, "No response received!\n"); snprintf(buf, sizeof(buf), "(unknown path: %s)", path); - resp = new_response(RESPONSE_INT_SERV_ERR, strdup(buf)); + resp = new_response(RESPONSE_INT_SERV_ERR, buf); } send_response(client_fd, resp); @@ -464,12 +464,12 @@ static void send_response(fd_t fd, struct response *resp) { char buffer[1024]; - snprintf(buffer, sizeof(buffer) - 2, "%d %s", resp->status, - response_status_to_text(resp->status)); + snprintf(buffer, sizeof(buffer) - 2, "%d %s", response_get_status(resp), + response_status_to_text(response_get_status(resp))); - if(resp->text) { + if (response_get_text(resp)) { strncat(buffer, " ", sizeof(buffer) - strlen(buffer) - 1 - 2); - strncat(buffer, resp->text, sizeof(buffer) - strlen(buffer) - 1 - 2); + strncat(buffer, response_get_text(resp), sizeof(buffer) - strlen(buffer) - 1 - 2); } strcat(buffer, "\r\n"); @@ -478,7 +478,7 @@ static void send_response(fd_t fd, struct response *resp) perror("Unable to write response"); } - resp->deleter(resp); + free_response(resp); } static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int *new_buffer_len) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index b899c76e9..4a55c84b9 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -158,7 +158,7 @@ static struct item *qinit(int qsize) return queue; } -void change_replica_type(struct hd_rum_translator_state *s, +struct response *change_replica_type(struct hd_rum_translator_state *s, struct module *mod, struct message *msg) { struct replica *r = (struct replica *) mod->priv_data; @@ -171,11 +171,13 @@ void change_replica_type(struct hd_rum_translator_state *s, r->type = replica::type_t::RECOMPRESS; } else { fprintf(stderr, "Unknown replica type \"%s\"\n", data->text); - return; + return new_response(RESPONSE_BAD_REQUEST, NULL); } hd_rum_decompress_set_active(s->decompress, r->recompress, r->type == replica::type_t::RECOMPRESS); + + return new_response(RESPONSE_OK, NULL); } static void *writer(void *arg) @@ -188,13 +190,14 @@ static void *writer(void *arg) for (unsigned int i = 0; i < s->replicas.size(); i++) { struct message *msg; while ((msg = check_message(&s->replicas[i]->mod))) { - change_replica_type(s, &s->replicas[i]->mod, msg); - free_message(msg); + struct response *r = change_replica_type(s, &s->replicas[i]->mod, msg); + free_message(msg, r); } } struct msg_universal *msg; while ((msg = (struct msg_universal *) check_message(&s->mod))) { + struct response *r = NULL; if (strncasecmp(msg->text, "delete-port ", strlen("delete-port ")) == 0) { int index = atoi(msg->text + strlen("delete-port ")); hd_rum_decompress_remove_port(s->decompress, index); @@ -228,9 +231,11 @@ static void *writer(void *arg) hd_rum_decompress_append_port(s->decompress, rep->recompress); hd_rum_decompress_set_active(s->decompress, rep->recompress, false); } + } else { + r = new_response(RESPONSE_BAD_REQUEST, NULL); } - free_message((struct message *) msg); + free_message((struct message *) msg, r ? r : new_response(RESPONSE_OK, NULL)); } // then process incoming packets @@ -311,7 +316,7 @@ static bool parse_fmt(int argc, char **argv, char **bufsize, unsigned short *por *control_connection_type = atoi(strchr(item, ':') + 1); } } else if(strcmp(argv[start_index], "--capabilities") == 0) { - print_capabilities(CAPABILITY_COMPRESS); + print_capabilities(CAPABILITY_COMPRESS, NULL, false); return false; } else if(strcmp(argv[start_index], "--help") == 0) { usage(argv[0]); diff --git a/src/host.cpp b/src/host.cpp index 77c3e24f8..7c60a24b4 100644 --- a/src/host.cpp +++ b/src/host.cpp @@ -9,9 +9,13 @@ #include "host.h" +#include "messaging.h" #include "video_capture.h" #include "video_compress.h" +#include "video.h" +#include #include +#include using namespace std; @@ -39,14 +43,35 @@ bool color_term = (getenv("TERM") && (strcmp(getenv("TERM"), "xterm") == 0 || st bool ldgm_device_gpu = false; const char *window_title = NULL; +#include -void print_capabilities(int mask) +void print_capabilities(int mask, struct module *root, bool use_vidcap) { if (mask & CAPABILITY_COMPRESS) { + /// try to figure out actual video format and consequently number of pixels per sec + int pixs_per_sec_uncompressed = 0; + if (use_vidcap && root) { + for (int attempt = 0; attempt < 2; ++attempt) { + struct msg_sender *m = (struct msg_sender *) new_message(sizeof(struct msg_sender)); + m->type = SENDER_MSG_QUERY_VIDEO_MODE; + struct response *r = send_message_sync(root, "sender", (struct message *) m, 1000); + if (response_get_status(r) == RESPONSE_OK) { + const char *text = response_get_text(r); + struct video_desc desc; + istringstream iss(text); + iss >> desc; + pixs_per_sec_uncompressed = desc.width * desc.height * desc.fps; + + break; + } + free_response(r); + sleep(1); + } + } cout << "Compressions:" << endl; auto const & compress_capabilities = get_compress_capabilities(); for (auto const & it : compress_capabilities) { - cout << "(" << it.name << ";" << it.quality << ";" << it.bitrate << ";" << + cout << "(" << it.name << ";" << it.quality << ";" << setiosflags(ios_base::fixed) << setprecision(2) << it.bpp * pixs_per_sec_uncompressed << ";" << it.enc_prop.latency << ";" << it.enc_prop.cpu_cores << ";" << it.enc_prop.gpu_gflops << ";" << it.dec_prop.latency << ";" << it.dec_prop.cpu_cores << ";" << it.dec_prop.gpu_gflops << ")\n"; diff --git a/src/host.h b/src/host.h index 6d24b8286..c398da82c 100644 --- a/src/host.h +++ b/src/host.h @@ -72,6 +72,7 @@ extern "C" { #endif +struct module; struct video_frame; struct vidcap_params; @@ -122,7 +123,7 @@ extern char *sage_network_device; #define CAPABILITY_COMPRESS (1<<0) #define CAPABILITY_CAPTURE (1<<1) -void print_capabilities(int mask); +void print_capabilities(int mask, struct module *root, bool use_vidcap); #ifdef __cplusplus } diff --git a/src/keyboard_control.cpp b/src/keyboard_control.cpp index 6174210c9..f6ab77796 100644 --- a/src/keyboard_control.cpp +++ b/src/keyboard_control.cpp @@ -135,7 +135,7 @@ void keyboard_control::run() } auto resp = send_message(m_root, path, (struct message *) m); - resp->deleter(resp); + free_response(resp); break; } diff --git a/src/main.cpp b/src/main.cpp index fb362c51a..3448bf909 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -484,7 +484,7 @@ int main(int argc, char *argv[]) long packet_rate; const char *requested_mcast_if = NULL; - unsigned requested_mtu = 0; + unsigned requested_mtu = 1500; const char *postprocess = NULL; const char *requested_display = "none"; const char *requested_receiver = "::1"; @@ -499,6 +499,8 @@ int main(int argc, char *argv[]) const chrono::steady_clock::time_point start_time(chrono::steady_clock::now()); keyboard_control kc{}; + bool print_capabilities_req = false; + #ifdef USE_MTRACE mtrace(); #endif @@ -843,8 +845,7 @@ int main(int argc, char *argv[]) window_title = optarg; break; case OPT_CAPABILITIES: - print_capabilities(CAPABILITY_CAPTURE | CAPABILITY_COMPRESS); - return EXIT_SUCCESS; + print_capabilities_req = true; break; case OPT_AUDIO_DELAY: audio_delay = atoi(optarg); @@ -863,11 +864,6 @@ int main(int argc, char *argv[]) argc -= optind; argv += optind; - if (requested_mtu == 0) // mtu wasn't specified on the command line - { - requested_mtu = 1500; // the default value for RTP - } - printf("%s", PACKAGE_STRING); #ifdef GIT_VERSION printf(" (rev %s)", GIT_VERSION); @@ -1185,6 +1181,11 @@ int main(int argc, char *argv[]) exit_uv(i); } + if (print_capabilities_req) { + print_capabilities(CAPABILITY_CAPTURE | CAPABILITY_COMPRESS, &root_mod, strcmp("none", vidcap_params_get_driver(vidcap_params_head)) != 0); + exit_uv(EXIT_SUCCESS); + } + cleanup: if (strcmp("none", requested_display) != 0 && receiver_thread_started) diff --git a/src/messaging.cpp b/src/messaging.cpp index b570ad7cf..6d416a470 100644 --- a/src/messaging.cpp +++ b/src/messaging.cpp @@ -1,9 +1,13 @@ #include "messaging.h" +#include #include #include +#include #include +#include #include +#include #include "debug.h" #include "module.h" @@ -13,7 +17,40 @@ #define MAX_MESSAGES 100 #define MAX_MESSAGES_FOR_NOT_EXISTING_RECV 10 -struct response *send_message(struct module *root, const char *const_path, struct message *msg) +using namespace std; +using namespace ultragrid; + +struct response { + int status; + char text[]; +}; + + +namespace { +struct responder { + responder() : received_response(nullptr) {} + static void receive_response(void *s, struct response *r) { + (*((shared_ptr *) s))->receive_response_real(r); + } + void receive_response_real(struct response *r) { + unique_lock lk(lock); + received_response = r; + lk.unlock(); + cv.notify_one(); + } + + struct response *received_response; + condition_variable cv; + mutex lock; +}; + +struct pair_msg_path { + struct message *msg; + char path[]; +}; +} + +static struct response *send_message_common(struct module *root, const char *const_path, struct message *msg, bool sync, int timeout_ms = 0) { /** * @invariant @@ -23,6 +60,13 @@ struct response *send_message(struct module *root, const char *const_path, struc char *item, *save_ptr; tmp = path = strdup(const_path); struct module *receiver = root; + shared_ptr responder; + + if (sync) { + msg->send_response = responder::receive_response; + responder = shared_ptr(new struct responder()); + msg->priv_data = new shared_ptr(responder); + } pthread_mutex_lock(&receiver->lock); @@ -41,21 +85,30 @@ struct response *send_message(struct module *root, const char *const_path, struc if (simple_linked_list_size(old_receiver->msg_queue_childs) > MAX_MESSAGES_FOR_NOT_EXISTING_RECV) { printf("Dropping some old messages for %s (queue full).\n", const_path); struct pair_msg_path *mp = (struct pair_msg_path *) simple_linked_list_pop(old_receiver->msg_queue_childs); - free_message(mp->msg); + free_message(mp->msg, new_response(RESPONSE_NOT_FOUND, "Receiver not found")); free(mp); } struct pair_msg_path *saved_message = (struct pair_msg_path *) - malloc(sizeof(struct pair_msg_path)); + malloc(sizeof(struct pair_msg_path) + strlen(const_path + (item - tmp)) + 1); saved_message->msg = msg; - memset(saved_message->path, 0, sizeof(saved_message->path)); - strncpy(saved_message->path, const_path + (item - tmp), sizeof(saved_message->path) - 1); + strcpy(saved_message->path, const_path + (item - tmp)); simple_linked_list_append(old_receiver->msg_queue_childs, saved_message); pthread_mutex_unlock(&old_receiver->lock); free(tmp); - return new_response(RESPONSE_ACCEPTED, strdup("(receiver not yet exists)")); + if (!sync) { + return new_response(RESPONSE_ACCEPTED, "(receiver not yet exists)"); + } else { + unique_lock lk(responder->lock); + responder->cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [responder]{return responder->received_response != NULL;}); + if (responder->received_response) { + return responder->received_response; + } else { + return new_response(RESPONSE_ACCEPTED, NULL); + } + } } pthread_mutex_lock(&receiver->lock); pthread_mutex_unlock(&old_receiver->lock); @@ -63,29 +116,52 @@ struct response *send_message(struct module *root, const char *const_path, struc free(tmp); - lock_guard guard(receiver->lock, lock_guard_retain_ownership_t()); + //pthread_mutex_guard guard(receiver->lock, lock_guard_retain_ownership_t()); if (simple_linked_list_size(receiver->msg_queue) >= MAX_MESSAGES) { struct message *m = (struct message *) simple_linked_list_pop(receiver->msg_queue); - free_message(m); + free_message(m, new_response(RESPONSE_INT_SERV_ERR, "Too much unprocessed messages")); printf("Dropping some messages for %s - queue full.\n", const_path); } simple_linked_list_append(receiver->msg_queue, msg); - return new_response(RESPONSE_ACCEPTED, NULL); + pthread_mutex_unlock(&receiver->lock); + + if (!sync) { + return new_response(RESPONSE_ACCEPTED, NULL); + } else { + unique_lock lk(responder->lock); + responder->cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [responder]{return responder->received_response != NULL;}); + if (responder->received_response) { + return responder->received_response; + } else { + return new_response(RESPONSE_ACCEPTED, NULL); + } + } +} + +struct response *send_message(struct module *root, const char *const_path, struct message *msg) +{ + return send_message_common(root, const_path, msg, false); + +} + +struct response *send_message_sync(struct module *root, const char *const_path, struct message *msg, int timeout_ms) +{ + return send_message_common(root, const_path, msg, true, timeout_ms); } void module_check_undelivered_messages(struct module *node) { - lock_guard guard(node->lock); + pthread_mutex_guard guard(node->lock); for(void *it = simple_linked_list_it_init(node->msg_queue_childs); it != NULL; ) { struct pair_msg_path *msg = (struct pair_msg_path *) simple_linked_list_it_next(&it); struct module *receiver = get_matching_child(node, msg->path); if (receiver) { struct response *resp = send_message_to_receiver(receiver, msg->msg); - resp->deleter(resp); + free_response(resp); simple_linked_list_remove(node->msg_queue_childs, msg); free(msg); // reinit iterator @@ -96,7 +172,7 @@ void module_check_undelivered_messages(struct module *node) struct response *send_message_to_receiver(struct module *receiver, struct message *msg) { - lock_guard guard(receiver->lock); + pthread_mutex_guard guard(receiver->lock); simple_linked_list_append(receiver->msg_queue, msg); return new_response(RESPONSE_ACCEPTED, NULL); } @@ -111,55 +187,82 @@ struct message *new_message(size_t len) return ret; } -void free_message(struct message *msg) +/** + * Frees message + * + * Additionally, it enforces user to pass response to be sent to sender. + */ +void free_message(struct message *msg, struct response *r) { - if(msg && msg->data_deleter) { + if (!msg) { + return; + } + + if (r) { + if (msg->send_response) { + msg->send_response(msg->priv_data, r); + } else { + free_response(r); + } + } + + if (msg->data_deleter) { msg->data_deleter(msg); } - if(msg) { - free(msg); - } -} -static void response_deleter(struct response *response) -{ - free(response->text); - free(response); + if (msg->priv_data) { + delete (shared_ptr *) msg->priv_data; + } + + free(msg); } /** * Creates new response * * @param status status - * @param text optional text contained in message, will be freeed after send (with free()) + * @param text optional text contained in message */ -struct response *new_response(int status, char *text) +struct response *new_response(int status, const char *text) { - struct response *resp = (struct response *) malloc(sizeof(struct response)); + struct response *resp = (struct response *) malloc(sizeof(struct response) + (text ? strlen(text) : 0) + 1); resp->status = status; - resp->text = text; - resp->deleter = response_deleter; + if (text) { + strcpy(resp->text, text); + } else { + resp->text[0] = '\0'; + } return resp; } +void free_response(struct response *r) { + free(r); +} + +int response_get_status(struct response *r) { + return r->status; +} + +const char *response_get_text(struct response *r) { + return r->text; +} + const char *response_status_to_text(int status) { - struct { - int status; - const char *text; - } mapping[] = { - { 200, "OK" }, - { 202, "Accepted" }, - { 400, "Bad Request" }, - { 404, "Not Found" }, - { 500, "Internal Server Error" }, - { 501, "Not Implemented" }, - { 0, NULL }, + const static unordered_map mapping = { + { RESPONSE_OK, "OK" }, + { RESPONSE_ACCEPTED, "Accepted" }, + { RESPONSE_NO_CONTENT, "No Content" }, + { RESPONSE_BAD_REQUEST, "Bad Request" }, + { RESPONSE_NOT_FOUND, "Not Found" }, + { RESPONSE_REQ_TIMEOUT, "Request Timeout" }, + { RESPONSE_INT_SERV_ERR, "Internal Server Error" }, + { RESPONSE_NOT_IMPL, "Not Implemented" }, }; - for(int i = 0; mapping[i].status != 0; ++i) { - if(status == mapping[i].status) - return mapping[i].text; + auto it = mapping.find(status); + if (it != mapping.end()) { + return it->second; } return NULL; @@ -167,7 +270,7 @@ const char *response_status_to_text(int status) struct message *check_message(struct module *mod) { - lock_guard guard(mod->lock); + pthread_mutex_guard guard(mod->lock); if(simple_linked_list_size(mod->msg_queue) > 0) { return (struct message *) simple_linked_list_pop(mod->msg_queue); diff --git a/src/messaging.h b/src/messaging.h index efe418b6d..a251fffee 100644 --- a/src/messaging.h +++ b/src/messaging.h @@ -15,17 +15,14 @@ extern "C" { struct messaging; struct module; - -struct response { - int status; - void (*deleter)(struct response*); - char *text; -}; +struct response; #define RESPONSE_OK 200 #define RESPONSE_ACCEPTED 202 +#define RESPONSE_NO_CONTENT 204 #define RESPONSE_BAD_REQUEST 400 #define RESPONSE_NOT_FOUND 404 +#define RESPONSE_REQ_TIMEOUT 408 #define RESPONSE_INT_SERV_ERR 500 #define RESPONSE_NOT_IMPL 501 @@ -37,6 +34,11 @@ struct message { * Please note that the deleter must not delete the struct itself. */ void (*data_deleter)(struct message *); + + // following members are used internally and should not be touched anyhow + // except from messaging.cpp + void (*send_response)(void *priv_data, struct response *); + void *priv_data; }; enum msg_sender_type { @@ -45,6 +47,7 @@ enum msg_sender_type { SENDER_MSG_PLAY, SENDER_MSG_PAUSE, SENDER_MSG_CHANGE_FEC, + SENDER_MSG_QUERY_VIDEO_MODE, }; struct msg_sender { @@ -101,20 +104,17 @@ struct msg_universal { char text[8192]; }; -struct pair_msg_path { - char path[8192]; - struct message *msg; -}; - -struct response *new_response(int status, char *optional_message); - -typedef struct response *(*msg_callback_t)(struct module *mod, struct message *msg); +struct response *new_response(int status, const char *optional_message); +void free_response(struct response *r); +int response_get_status(struct response *r); +const char *response_get_text(struct response *r); void module_check_undelivered_messages(struct module *); struct response *send_message(struct module *, const char *path, struct message *msg) __attribute__ ((warn_unused_result)); +struct response *send_message_sync(struct module *, const char *path, struct message *msg, int timeout_ms) __attribute__ ((warn_unused_result)); struct response *send_message_to_receiver(struct module *, struct message *msg) __attribute__ ((warn_unused_result)); struct message *new_message(size_t length) __attribute__ ((warn_unused_result)); -void free_message(struct message *m); +void free_message(struct message *m, struct response *r); const char *response_status_to_text(int status); struct message *check_message(struct module *); diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index 89ba14d60..d77f6292f 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -1357,7 +1357,7 @@ static bool reconfigure_decoder(struct state_video_decoder *decoder, msg->new_desc = decoder->received_vid_desc; struct response *resp = send_message_to_receiver(decoder->parent, (struct message *) msg); - resp->deleter(resp); + free_response(resp); return true; } diff --git a/src/rtsp/BasicRTSPOnlySubsession.cpp b/src/rtsp/BasicRTSPOnlySubsession.cpp index 92dda1661..3a0ea309e 100644 --- a/src/rtsp/BasicRTSPOnlySubsession.cpp +++ b/src/rtsp/BasicRTSPOnlySubsession.cpp @@ -305,7 +305,7 @@ void BasicRTSPOnlySubsession::deleteStream(unsigned clientSessionId, msgV1->type = SENDER_MSG_CHANGE_PORT; struct response *resp; resp = send_message(fmod, pathV, (struct message *) msgV1); - resp->deleter(resp); + free_response(resp); //CHANGE DST ADDRESS struct msg_sender *msgV2 = (struct msg_sender *) new_message( @@ -313,7 +313,7 @@ void BasicRTSPOnlySubsession::deleteStream(unsigned clientSessionId, strncpy(msgV2->receiver, "127.0.0.1", sizeof(msgV2->receiver) - 1); msgV2->type = SENDER_MSG_CHANGE_RECEIVER; resp = send_message(fmod, pathV, (struct message *) msgV2); - resp->deleter(resp); + free_response(resp); } } @@ -335,7 +335,7 @@ void BasicRTSPOnlySubsession::deleteStream(unsigned clientSessionId, msgA1->type = SENDER_MSG_CHANGE_PORT; struct response *resp; resp = send_message(fmod, pathA, (struct message *) msgA1); - resp->deleter(resp); + free_response(resp); //CHANGE DST ADDRESS struct msg_sender *msgA2 = (struct msg_sender *) new_message( @@ -343,7 +343,7 @@ void BasicRTSPOnlySubsession::deleteStream(unsigned clientSessionId, strncpy(msgA2->receiver, "127.0.0.1", sizeof(msgA2->receiver) - 1); msgA2->type = SENDER_MSG_CHANGE_RECEIVER; resp = send_message(fmod, pathA, (struct message *) msgA2); - resp->deleter(resp); + free_response(resp); } } } diff --git a/src/transmit.cpp b/src/transmit.cpp index f40bb4724..92295c1d6 100644 --- a/src/transmit.cpp +++ b/src/transmit.cpp @@ -183,7 +183,7 @@ static void tx_update(struct tx *tx, struct video_frame *frame, int substream) msg->type = SENDER_MSG_CHANGE_FEC; struct response *resp = send_message_to_receiver(get_parent_module(&tx->mod), (struct message *) msg); - resp->deleter(resp); + free_response(resp); tx->avg_len_last = tx->avg_len; } } @@ -287,7 +287,7 @@ static bool set_fec(struct tx *tx, const char *fec_const) msg->type = SENDER_MSG_CHANGE_FEC; struct response *resp = send_message_to_receiver(get_parent_module(&tx->mod), (struct message *) msg); - resp->deleter(resp); + free_response(resp); } else { // delay creation until we have avarage frame size tx->max_loss = atof(fec_cfg); } @@ -304,7 +304,7 @@ static bool set_fec(struct tx *tx, const char *fec_const) fec_cfg ? fec_cfg : ""); msg->type = SENDER_MSG_CHANGE_FEC; struct response *resp = send_message_to_receiver(get_parent_module(&tx->mod), (struct message *) msg); - resp->deleter(resp); + free_response(resp); tx->fec_scheme = FEC_RS; } } else { @@ -323,16 +323,19 @@ static void fec_check_messages(struct tx *tx) struct msg_change_fec_data *data = (struct msg_change_fec_data *) msg; if(tx->media_type != data->media_type) { fprintf(stderr, "[Transmit] FEC media type mismatch!\n"); - free_message(msg); + free_message(msg, new_response(RESPONSE_BAD_REQUEST, NULL)); continue; } + struct response *r; if (set_fec(tx, data->fec)) { + r = new_response(RESPONSE_OK, NULL); printf("[Transmit] FEC set to new setting.\n"); } else { + r = new_response(RESPONSE_INT_SERV_ERR, NULL); fprintf(stderr, "[Transmit] Unable to reconfigure FEC!\n"); } - free_message(msg); + free_message(msg, r); } } diff --git a/src/types.h b/src/types.h index 314e54c32..9a6b4809e 100644 --- a/src/types.h +++ b/src/types.h @@ -111,6 +111,10 @@ struct video_desc { double fps; enum interlacing_t interlacing; unsigned int tile_count; +#ifdef __cplusplus + bool operator==(video_desc const &) const; + bool operator!() const; +#endif }; typedef enum h264_frame_type { @@ -208,7 +212,6 @@ struct video_frame { h264_frame_type_t h264_frame_type; struct fec_desc fec_params; - uint32_t ssrc; }; diff --git a/src/utils/lock_guard.h b/src/utils/lock_guard.h index 1ff97dfa1..c1a53500e 100644 --- a/src/utils/lock_guard.h +++ b/src/utils/lock_guard.h @@ -14,6 +14,8 @@ #include "compat/platform_spin.h" #include +namespace ultragrid { + struct lock_guard_retain_ownership_t { }; @@ -40,13 +42,15 @@ class generic_lock_guard { }; typedef class generic_lock_guard - lock_guard; + pthread_mutex_guard; typedef class generic_lock_guard - rwlock_guard_write; + pthread_rwlock_guard_write; typedef class generic_lock_guard - rwlock_guard_read; + pthread_rwlock_guard_read; typedef class generic_lock_guard - spinlock_guard; + platform_spin_guard; + +} // end of namespace ultragrid #endif // LOCK_GUARD_H_ diff --git a/src/utils/resource_manager.cpp b/src/utils/resource_manager.cpp index 7dede2ce7..50ebe2bbb 100644 --- a/src/utils/resource_manager.cpp +++ b/src/utils/resource_manager.cpp @@ -69,6 +69,7 @@ typedef int type_t; using namespace std; +using namespace ultragrid; class options_t { public: @@ -190,7 +191,7 @@ class resource_manager_t { resource *acquire(string name, type_t type, options_t const & options) { resource *ret; - spinlock_guard lock(m_access_lock); + platform_spin_guard lock(m_access_lock); string item_name = name + "#" + resource::get_suffix(type); obj_map_t::iterator it = m_objs.find(item_name); @@ -207,7 +208,7 @@ class resource_manager_t { } void release(string name, type_t type) { - spinlock_guard lock(m_access_lock); + platform_spin_guard lock(m_access_lock); string item_name = name + "#" + resource::get_suffix(type); obj_map_t::iterator it = m_objs.find(item_name); diff --git a/src/video.cpp b/src/video.cpp index d1e736a0d..cfe4e1c7c 100644 --- a/src/video.cpp +++ b/src/video.cpp @@ -45,6 +45,7 @@ #include "config_win32.h" #endif // HAVE_CONFIG_H +#include "debug.h" #include "video.h" #include @@ -138,6 +139,9 @@ std::ostream& operator<<(std::ostream& os, const video_desc& desc) { std::streamsize p = os.precision(); ios_base::fmtflags f = os.flags(); + if (desc.tile_count > 1) { + os << desc.tile_count << "*"; + } os << desc.width << "x" << desc.height << " @" << setprecision(2) << setiosflags(ios_base::fixed) << desc.fps * (desc.interlacing == PROGRESSIVE || desc.interlacing == SEGMENTED_FRAME ? 1 : 2) << get_interlacing_suffix(desc.interlacing) << ", codec " @@ -147,3 +151,69 @@ std::ostream& operator<<(std::ostream& os, const video_desc& desc) return os; } +std::istream& operator>>(std::istream& is, video_desc& desc) +{ + video_desc out; + char buf[1024]; + char *line = buf; + + char interl_suffix[4] = ""; + char codec_name[11] = ""; + + out.tile_count = 1; + + unsigned int i = 0; + int num_spaces = 0; + while (is.good() && i < sizeof buf - 1) { + char c; + is.get(c); + if (!is.good()) { + break; + } + if (num_spaces == 3 && + !(isalnum(c) || c == '.')) // allowed symbols in codec name + { + is.unget(); + break; + } else { + buf[i++] = c; + } + if (c == ' ') { + num_spaces += 1; + } + } + buf[i] = '\0'; + + /// @todo regex matching would be better + if (strchr(line, '*')) { + out.tile_count = atoi(line); + line = strchr(line, '*') + 1; + } + int ret = sscanf(line, "%dx%d @%lf%3[a-z], codec %10s", &out.width, &out.height, + &out.fps, interl_suffix, codec_name); + + if (ret != 5) { + is.setstate(std::ios::failbit); + log_msg(LOG_LEVEL_ERROR, "Malformed video_desc representation!\n"); + return is; + } + + out.color_spec = get_codec_from_name(codec_name); + out.interlacing = get_interlacing_from_suffix(interl_suffix); + out.fps = out.fps / (out.interlacing == PROGRESSIVE || out.interlacing == SEGMENTED_FRAME ? 1 : 2); + desc = out; + return is; +} + +bool video_desc::operator==(video_desc const & other) const +{ + return video_desc_eq(*this, other); + +} + +bool video_desc::operator!() const +{ + return color_spec == VIDEO_CODEC_NONE; + +} + diff --git a/src/video.h b/src/video.h index 0e906f85c..13e11c5bd 100644 --- a/src/video.h +++ b/src/video.h @@ -86,7 +86,9 @@ enum video_mode guess_video_mode(int num_substreams); #endif // __cplusplus #ifdef __cplusplus +#include #include +std::istream& operator>>(std::istream& is, video_desc& desc); std::ostream& operator<<(std::ostream& os, const video_desc& desc); #endif diff --git a/src/video_capture/switcher.c b/src/video_capture/switcher.c index 29c736c2c..0f0544f02 100644 --- a/src/video_capture/switcher.c +++ b/src/video_capture/switcher.c @@ -215,6 +215,7 @@ vidcap_switcher_grab(void *state, struct audio_frame **audio) while ((msg = check_message(&s->mod))) { struct msg_universal *msg_univ = (struct msg_universal *) msg; int new_selected_device = atoi(msg_univ->text); + struct response *r; if (new_selected_device >= 0 && new_selected_device < s->devices_cnt){ if (s->excl_init) { @@ -226,8 +227,11 @@ vidcap_switcher_grab(void *state, struct audio_frame **audio) } s->selected_device = new_selected_device; + r = new_response(RESPONSE_OK, NULL); + } else { + r = new_response(RESPONSE_BAD_REQUEST, NULL); } - free_message(msg); + free_message(msg, r); } frame = vidcap_grab(s->devices[s->selected_device], &audio_frame); diff --git a/src/video_compress.cpp b/src/video_compress.cpp index 51882d0eb..eea66c668 100644 --- a/src/video_compress.cpp +++ b/src/video_compress.cpp @@ -287,6 +287,7 @@ list get_compress_capabilities() */ static void compress_process_message(compress_state_proxy *proxy, struct msg_change_compress_data *data) { + struct response *r; /* In this case we are only changing some parameter of compression. * This means that we pass the parameter to compress driver. */ if(data->what == CHANGE_PARAMS) { @@ -299,7 +300,7 @@ static void compress_process_message(compress_state_proxy *proxy, struct msg_cha sizeof(tmp_data->config_string) - 1); struct response *resp = send_message_to_receiver(proxy->ptr->state[i], (struct message *) tmp_data); - resp->deleter(resp); + r = resp; } } else { @@ -312,10 +313,13 @@ static void compress_process_message(compress_state_proxy *proxy, struct msg_cha struct compress_state_real *old = proxy->ptr; proxy->ptr = new_state; compress_done_real(old); + r = new_response(RESPONSE_OK, NULL); + } else { + r = new_response(RESPONSE_INT_SERV_ERR, NULL); } } - free_message((struct message *) data); + free_message((struct message *) data, r); } /** diff --git a/src/video_compress.h b/src/video_compress.h index 08e92241c..9686610c0 100644 --- a/src/video_compress.h +++ b/src/video_compress.h @@ -124,7 +124,7 @@ struct compress_preset { std::string name; int quality; - long bitrate; + double bpp; ///< bits per pixel compress_prop enc_prop; compress_prop dec_prop; }; diff --git a/src/video_compress/cuda_dxt.cpp b/src/video_compress/cuda_dxt.cpp index d7572be91..6c55199fd 100644 --- a/src/video_compress/cuda_dxt.cpp +++ b/src/video_compress/cuda_dxt.cpp @@ -294,8 +294,8 @@ struct compress_info_t cuda_dxt_info = { NULL, { #if 0 - { "DXT1", 35, 250*1000*1000, {7, 0.2, 10}, {} }, - { "DXT5", 50, 500*1000*1000, {10, 0.2, 20}, {} }, + { "DXT1", 35, 4.0, {7, 0.2, 10}, {} }, + { "DXT5", 50, 8.0, 0.2, 20}, {} }, #endif } }; diff --git a/src/video_compress/dxt_glsl.cpp b/src/video_compress/dxt_glsl.cpp index 923ecb9dc..1e7b6b6ff 100644 --- a/src/video_compress/dxt_glsl.cpp +++ b/src/video_compress/dxt_glsl.cpp @@ -333,8 +333,8 @@ struct compress_info_t rtdxt_info = { NULL, dxt_is_supported, { - { "DXT1", 35, 250*1000*1000, {75, 0.3, 25}, {15, 0.1, 10} }, - { "DXT5", 50, 500*1000*1000, {75, 0.3, 35}, {15, 0.1, 20} }, + { "DXT1", 35, 4.0, {75, 0.3, 25}, {15, 0.1, 10} }, + { "DXT5", 50, 8.0, {75, 0.3, 35}, {15, 0.1, 20} }, } }; diff --git a/src/video_compress/jpeg.cpp b/src/video_compress/jpeg.cpp index 88f5fe67c..6044c88c6 100644 --- a/src/video_compress/jpeg.cpp +++ b/src/video_compress/jpeg.cpp @@ -194,13 +194,16 @@ static void jpeg_check_messages(struct state_video_compress_jpeg *s) while ((msg = check_message(&s->module_data))) { struct msg_change_compress_data *data = (struct msg_change_compress_data *) msg; + struct response *r; if (parse_fmt(s, data->config_string) == 0) { + r = new_response(RESPONSE_OK, NULL); printf("[Libavcodec] Compression successfully changed.\n"); } else { + r = new_response(RESPONSE_BAD_REQUEST, NULL); fprintf(stderr, "[Libavcodec] Unable to change compression!\n"); } memset(&s->saved_desc, 0, sizeof(s->saved_desc)); - free_message(msg); + free_message(msg, r); } } @@ -397,9 +400,9 @@ struct compress_info_t jpeg_info = { NULL, jpeg_is_supported, { - { "60", 60, 30*1000*1000, {10, 0.6, 75}, {10, 0.6, 75} }, - { "80", 70, 36*1000*1000, {12, 0.6, 90}, {15, 0.6, 100} }, - { "90", 80, 44*1000*1000, {15, 0.6, 100}, {20, 0.6, 150} }, + { "60", 60, 0.68, {10, 0.6, 75}, {10, 0.6, 75} }, + { "80", 70, 0.87, {12, 0.6, 90}, {15, 0.6, 100} }, + { "90", 80, 1.54, {15, 0.6, 100}, {20, 0.6, 150} }, }, }; diff --git a/src/video_compress/libavcodec.cpp b/src/video_compress/libavcodec.cpp index b8e246ebc..6a3e2ce96 100644 --- a/src/video_compress/libavcodec.cpp +++ b/src/video_compress/libavcodec.cpp @@ -1139,13 +1139,16 @@ static void libavcodec_check_messages(struct state_video_compress_libav *s) while ((msg = check_message(&s->module_data))) { struct msg_change_compress_data *data = (struct msg_change_compress_data *) msg; + struct response *r; if (parse_fmt(s, data->config_string) == 0) { log_msg(LOG_LEVEL_NOTICE, "[Libavcodec] Compression successfully changed.\n"); + r = new_response(RESPONSE_OK, NULL); } else { log_msg(LOG_LEVEL_ERROR, "[Libavcodec] Unable to change compression!\n"); + r = new_response(RESPONSE_INT_SERV_ERR, NULL); } memset(&s->saved_desc, 0, sizeof(s->saved_desc)); - free_message(msg); + free_message(msg, r); } } @@ -1159,9 +1162,9 @@ struct compress_info_t libavcodec_info = { libavcodec_compress_tile, libavcodec_is_supported, { - { "codec=H.264:bpp=0.096", 20, 5*1000*1000, {25, 1.5, 0}, {15, 1, 0} }, - { "codec=H.264:bpp=0.193", 30, 10*1000*1000, {28, 1.5, 0}, {20, 1, 0} }, - { "codec=H.264:bitrate=0.289", 50, 15*1000*1000, {30, 1.5, 0}, {25, 1, 0} }, + { "codec=H.264:bpp=0.096", 20, 0.096, {25, 1.5, 0}, {15, 1, 0} }, + { "codec=H.264:bpp=0.193", 30, 0.193, {28, 1.5, 0}, {20, 1, 0} }, + { "codec=H.264:bitrate=0.289", 50, 0.289, {30, 1.5, 0}, {25, 1, 0} }, #if 0 { "codec=MJPEG", 35, 50*1000*1000, {20, 0.75, 0}, {10, 0.5, 0} }, #endif diff --git a/src/video_compress/none.cpp b/src/video_compress/none.cpp index 1534f8650..f05cf8817 100644 --- a/src/video_compress/none.cpp +++ b/src/video_compress/none.cpp @@ -116,7 +116,7 @@ struct compress_info_t none_info = { NULL, []{return true;}, // uncompressed video is always supported { - { "", 100, static_cast((1920*1080*25*2*8)*1.03), {0, 1, 0}, {0, 1, 0} }, + { "", 100, 16.0, {0, 1, 0}, {0, 1, 0} }, }, }; diff --git a/src/video_display/gl.cpp b/src/video_display/gl.cpp index f116b87fe..3006bec6d 100644 --- a/src/video_display/gl.cpp +++ b/src/video_display/gl.cpp @@ -660,12 +660,15 @@ static void glut_idle_callback(void) struct message *msg; while ((msg = check_message(&s->mod))) { auto msg_univ = reinterpret_cast(msg); + struct response *r; if (strncasecmp(msg_univ->text, "win-title ", strlen("win_title ")) == 0) { glutSetWindowTitle(msg_univ->text + strlen("win_title ")); + r = new_response(RESPONSE_OK, NULL); } else { fprintf(stderr, "[GL] Unknown command received: %s\n", msg_univ->text); + r = new_response(RESPONSE_BAD_REQUEST, NULL); } - free_message(msg); + free_message(msg, r); } unique_lock lk(s->lock); diff --git a/src/video_display/sdl.cpp b/src/video_display/sdl.cpp index 658047c19..0f09b6af3 100644 --- a/src/video_display/sdl.cpp +++ b/src/video_display/sdl.cpp @@ -265,13 +265,16 @@ static int display_sdl_handle_events(struct state_sdl *s) struct message *msg; while ((msg = check_message(&s->mod))) { auto msg_univ = reinterpret_cast(msg); + struct response *r; if (strncasecmp(msg_univ->text, "win-title ", strlen("win_title ")) == 0) { const char *title = msg_univ->text + strlen("win_title"); SDL_WM_SetCaption(title, title); + r = new_response(RESPONSE_OK, NULL); } else { fprintf(stderr, "[SDL] Unknown command received: %s\n", msg_univ->text); + r = new_response(RESPONSE_BAD_REQUEST, NULL); } - free_message(msg); + free_message(msg, r); } SDL_Event sdl_event; diff --git a/src/video_frame.c b/src/video_frame.c index 9b62200f9..cdb8f493a 100644 --- a/src/video_frame.c +++ b/src/video_frame.c @@ -193,24 +193,34 @@ const char *get_interlacing_description(enum interlacing_t interlacing) return NULL; } +static const char *interlacing_suffixes[] = { + [PROGRESSIVE] = "p", + [UPPER_FIELD_FIRST] = "tff", + [LOWER_FIELD_FIRST] = "bff", + [INTERLACED_MERGED] = "i", + [SEGMENTED_FRAME] = "psf", +}; + const char *get_interlacing_suffix(enum interlacing_t interlacing) { - switch (interlacing) { - case PROGRESSIVE: - return "p"; - case UPPER_FIELD_FIRST: - return "tff"; - case LOWER_FIELD_FIRST: - return "bff"; - case INTERLACED_MERGED: - return "i"; - case SEGMENTED_FRAME: - return "psf"; + if (interlacing < sizeof interlacing_suffixes / sizeof interlacing_suffixes[0]) + return interlacing_suffixes[interlacing]; + else + return NULL; +} + +enum interlacing_t get_interlacing_from_suffix(const char *suffix) +{ + for (size_t i = 0; i < sizeof interlacing_suffixes / sizeof interlacing_suffixes[0]; ++i) { + if (interlacing_suffixes[i] && strcmp(suffix, interlacing_suffixes[i]) == 0) { + return i; + } } - return NULL; + return PROGRESSIVE; } + /** * @todo * Needs to be more efficient diff --git a/src/video_frame.h b/src/video_frame.h index 975335c8f..72e041703 100644 --- a/src/video_frame.h +++ b/src/video_frame.h @@ -150,6 +150,7 @@ const char *get_interlacing_description(enum interlacing_t interlacing); * Eg. p, i or psf */ const char *get_interlacing_suffix(enum interlacing_t interlacing); +enum interlacing_t get_interlacing_from_suffix(const char *suffix); void il_lower_to_merged(char *dst, char *src, int linesize, int height, void **stored_state); /* these functions transcode one interlacing format to another */ diff --git a/src/video_rxtx.cpp b/src/video_rxtx.cpp index d9efe2fcb..6b832f7fa 100644 --- a/src/video_rxtx.cpp +++ b/src/video_rxtx.cpp @@ -195,8 +195,8 @@ void video_rxtx::check_sender_messages() { // process external messages struct message *msg_external; while((msg_external = check_message(&m_sender_mod))) { - process_message((struct msg_sender *) msg_external); - free_message(msg_external); + struct response *r = process_message((struct msg_sender *) msg_external); + free_message(msg_external, r); } } diff --git a/src/video_rxtx.h b/src/video_rxtx.h index 4bc62d3bf..5d03af4e8 100644 --- a/src/video_rxtx.h +++ b/src/video_rxtx.h @@ -102,7 +102,8 @@ private: virtual void *(*get_receiver_thread())(void *arg) = 0; static void *sender_thread(void *args); void *sender_loop(); - virtual void process_message(struct msg_sender *) { + virtual struct response *process_message(struct msg_sender *) { + return NULL; } struct compress_state *m_compression; diff --git a/src/video_rxtx/rtp.cpp b/src/video_rxtx/rtp.cpp index 733b50b04..94691cff1 100644 --- a/src/video_rxtx/rtp.cpp +++ b/src/video_rxtx/rtp.cpp @@ -74,27 +74,32 @@ using namespace std; -void rtp_video_rxtx::process_message(struct msg_sender *msg) +struct response *rtp_video_rxtx::process_message(struct msg_sender *msg) { int ret; switch(msg->type) { case SENDER_MSG_CHANGE_RECEIVER: - assert(m_rxtx_mode == MODE_SENDER); // sender only - assert(m_connections_count == 1); - ret = rtp_change_dest(m_network_devices[0], - msg->receiver); - - if(ret == FALSE) { - fprintf(stderr, "Changing receiver to: %s failed!\n", + { + ostringstream oss; + assert(m_rxtx_mode == MODE_SENDER); // sender only + assert(m_connections_count == 1); + ret = rtp_change_dest(m_network_devices[0], msg->receiver); - } - if (rtcp_change_dest(m_network_devices[0], - msg->receiver) == FALSE){ - fprintf(stderr, "Changing rtcp receiver to: %s failed!\n", - msg->receiver); + if(ret == FALSE) { + oss << "Changing receiver to: " << msg->receiver << " failed!\n"; + } + + if (rtcp_change_dest(m_network_devices[0], + msg->receiver) == FALSE){ + oss << "Changing rtcp receiver to: " << msg->receiver << " failed!\n"; + } + if (!oss.str().empty()) { + return new_response(RESPONSE_INT_SERV_ERR, NULL); + } else { + m_requested_receiver = msg->receiver; + } } - m_requested_receiver = msg->receiver; break; case SENDER_MSG_CHANGE_PORT: assert(m_rxtx_mode == MODE_SENDER); // sender only @@ -111,16 +116,27 @@ void rtp_video_rxtx::process_message(struct msg_sender *msg) delete m_fec_state; m_fec_state = fec::create_from_config(msg->fec_cfg); if (!m_fec_state) { - fprintf(stderr, "Unable to initalize LDGM!\n"); - exit_uv(1); + log_msg(LOG_LEVEL_ERROR, "Unable to initalize LDGM!\n"); + return new_response(RESPONSE_INT_SERV_ERR, NULL); } } break; + case SENDER_MSG_QUERY_VIDEO_MODE: + if (!m_video_desc) { + return new_response(RESPONSE_NO_CONTENT, NULL); + } else { + ostringstream oss; + oss << m_video_desc; + return new_response(RESPONSE_OK, oss.str().c_str()); + } + break; } + + return new_response(RESPONSE_OK, NULL); } rtp_video_rxtx::rtp_video_rxtx(map const ¶ms) : - video_rxtx(params), m_fec_state(NULL), m_start_time(*(const std::chrono::steady_clock::time_point *) params.at("start_time").ptr) + video_rxtx(params), m_fec_state(NULL), m_start_time(*(const std::chrono::steady_clock::time_point *) params.at("start_time").ptr), m_video_desc{} { m_participants = pdb_init(0); m_requested_receiver = (const char *) params.at("receiver").ptr; diff --git a/src/video_rxtx/rtp.h b/src/video_rxtx/rtp.h index 2dfd685a6..c35d5b3d9 100644 --- a/src/video_rxtx/rtp.h +++ b/src/video_rxtx/rtp.h @@ -80,8 +80,9 @@ protected: const char *m_requested_mcast_if; fec *m_fec_state; const std::chrono::steady_clock::time_point m_start_time; + video_desc m_video_desc; private: - void process_message(struct msg_sender *); + struct response *process_message(struct msg_sender *); void change_tx_port(int tx_port); }; diff --git a/src/video_rxtx/ultragrid_rtp.cpp b/src/video_rxtx/ultragrid_rtp.cpp index 6279b2fe6..4c64e4780 100644 --- a/src/video_rxtx/ultragrid_rtp.cpp +++ b/src/video_rxtx/ultragrid_rtp.cpp @@ -120,6 +120,7 @@ void *(*ultragrid_rtp_video_rxtx::get_receiver_thread())(void *arg) { void ultragrid_rtp_video_rxtx::send_frame(shared_ptr tx_frame) { + m_video_desc = video_desc_from_frame(tx_frame.get()); if (m_fec_state) { tx_frame = m_fec_state->encode(tx_frame); } @@ -211,20 +212,28 @@ void ultragrid_rtp_video_rxtx::receiver_process_messages() struct msg_receiver *msg; while ((msg = (struct msg_receiver *) check_message(&m_receiver_mod))) { lock_guard lock(m_network_devices_lock); + struct response *r = NULL; switch (msg->type) { case RECEIVER_MSG_CHANGE_RX_PORT: - assert(m_rxtx_mode == MODE_RECEIVER); // receiver only - destroy_rtp_devices(m_network_devices); - m_recv_port_number = msg->new_rx_port; - m_network_devices = initialize_network(m_requested_receiver.c_str(), - m_recv_port_number, - m_send_port_number, m_participants, m_ipv6, - m_requested_mcast_if); - if (!m_network_devices) { - throw runtime_error("Changing RX port failed!"); + { + assert(m_rxtx_mode == MODE_RECEIVER); // receiver only + auto old_devices = m_network_devices; + auto old_port = m_recv_port_number; + m_recv_port_number = msg->new_rx_port; + m_network_devices = initialize_network(m_requested_receiver.c_str(), + m_recv_port_number, + m_send_port_number, m_participants, m_ipv6, + m_requested_mcast_if); + if (!m_network_devices) { + r = new_response(RESPONSE_INT_SERV_ERR, "Changing RX port failed!"); + m_network_devices = old_devices; + m_recv_port_number = old_port; + } else { + destroy_rtp_devices(m_network_devices); + } + break; } - break; case RECEIVER_MSG_VIDEO_PROP_CHANGED: { pdb_iter_t it; @@ -244,7 +253,7 @@ void ultragrid_rtp_video_rxtx::receiver_process_messages() abort(); } - free_message((struct message *) msg); + free_message((struct message *) msg, r ? r : new_response(RESPONSE_OK, NULL)); } } diff --git a/unittest/video_desc_test.cpp b/unittest/video_desc_test.cpp new file mode 100644 index 000000000..5e5ceee6c --- /dev/null +++ b/unittest/video_desc_test.cpp @@ -0,0 +1,52 @@ +#include +#include "video_desc_test.h" + +#include +#include "video.h" + +using namespace std; + +// Registers the fixture into the 'registry' +CPPUNIT_TEST_SUITE_REGISTRATION( video_desc_test ); + +video_desc_test::video_desc_test() : m_test_desc{{1920, 1080, UYVY, 25, INTERLACED_MERGED, 1}, + {1920, 1080, DXT5, 60, PROGRESSIVE, 4}, + {640, 480, H264, 15, PROGRESSIVE, 1}} +{ +} + +video_desc_test::~video_desc_test() +{ +} + +void +video_desc_test::setUp() +{ +} + + +void +video_desc_test::tearDown() +{ +} + +void +video_desc_test::testIOOperatorSymetry() +{ + for (const auto & i : m_test_desc) { + video_desc tmp; + + ostringstream oss; + oss << i; + istringstream iss(oss.str()); + iss >> tmp; + + // Check + ostringstream oss2; + oss2 << tmp; + string err_elem = oss.str() + " vs " + oss2.str(); + CPPUNIT_ASSERT_MESSAGE(err_elem, video_desc_eq(tmp, i)); + CPPUNIT_ASSERT_EQUAL_MESSAGE(err_elem, tmp, i); + } +} + diff --git a/unittest/video_desc_test.h b/unittest/video_desc_test.h new file mode 100644 index 000000000..5a41df163 --- /dev/null +++ b/unittest/video_desc_test.h @@ -0,0 +1,26 @@ +#ifndef VIDEO_DESC_TEST_H +#define VIDEO_DESC_TEST_H + +#include +#include + +#include "types.h" + +class video_desc_test : public CPPUNIT_NS::TestFixture +{ + CPPUNIT_TEST_SUITE( video_desc_test ); + CPPUNIT_TEST( testIOOperatorSymetry ); + CPPUNIT_TEST_SUITE_END(); + +public: + video_desc_test(); + ~video_desc_test(); + void setUp(); + void tearDown(); + + void testIOOperatorSymetry(); +private: + const std::list m_test_desc; // tested desc +}; + +#endif // VIDEO_DESC_TEST_H