diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 1d7d1c9fc..832a1cde8 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -97,7 +97,7 @@ enum connection_type { CLIENT }; -#define MAX_STAT_QUEUE 100 +#define MAX_STAT_EVENT_QUEUE 100 struct control_state { struct module mod; @@ -115,9 +115,9 @@ struct control_state { bool started; - thread stat_thread_id; - condition_variable stat_cv; - queue stat_queue; + thread stat_event_thread_id; + condition_variable stat_event_cv; + queue stat_event_queue; bool stats_on; }; @@ -129,7 +129,7 @@ static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int static int process_msg(struct control_state *s, fd_t client_fd, char *message, struct client *clients); static ssize_t write_all(fd_t fd, const void *buf, size_t count); static void * control_thread(void *args); -static void * stat_thread(void *args); +static void * stat_event_thread(void *args); static void send_response(fd_t fd, struct response *resp); #ifndef HAVE_LINUX @@ -258,7 +258,7 @@ void control_start(struct control_state *s) platform_pipe_init(s->internal_fd); s->control_thread_id = thread(control_thread, s); - s->stat_thread_id = thread(stat_thread, s); + s->stat_event_thread_id = thread(stat_event_thread, s); s->started = true; } @@ -294,7 +294,7 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s } else if (strcasecmp(message, "exit") == 0) { exit_uv(0); resp = new_response(RESPONSE_OK, NULL); - } else if (prefix_matches(message, "stats ")) { + } else if (prefix_matches(message, "stats ") || prefix_matches(message, "event ")) { if (is_internal_port(client_fd)) { struct client *cur = clients; char *new_msg = NULL; @@ -315,13 +315,13 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s int ret = write_all(cur->fd, new_msg, strlen(new_msg)); if (ret != (int) strlen(new_msg)) { - fprintf(stderr, "Cannot write stats!\n"); + log_msg(LOG_LEVEL_WARNING, "Cannot write stats/event!\n"); } cur = cur->next; } free(new_msg); return ret; - } else { + } else if (prefix_matches(message, "stats ")) { const char *toggle = suffix(message, "stats "); if (strcasecmp(toggle, "on") == 0) { s->stats_on = true; @@ -695,21 +695,21 @@ static void * control_thread(void *args) return NULL; } -static void *stat_thread(void *args) +static void *stat_event_thread(void *args) { struct control_state *s = (struct control_state *) args; while (1) { std::unique_lock lk(s->stats_lock); - s->stat_cv.wait(lk, [s] { return s->stat_queue.size() > 0; }); - string &line = s->stat_queue.front(); + s->stat_event_cv.wait(lk, [s] { return s->stat_event_queue.size() > 0; }); + string &line = s->stat_event_queue.front(); if (line.empty()) { break; } int ret = write_all(s->internal_fd[1], line.c_str(), line.length()); - s->stat_queue.pop(); + s->stat_event_queue.pop(); if (ret <= 0) { fprintf(stderr, "Cannot write stat line!\n"); } @@ -728,10 +728,10 @@ void control_done(struct control_state *s) if(s->started) { s->stats_lock.lock(); - s->stat_queue.push({}); + s->stat_event_queue.push({}); s->stats_lock.unlock(); - s->stat_cv.notify_one(); - s->stat_thread_id.join(); + s->stat_event_cv.notify_one(); + s->stat_event_thread_id.join(); int ret = write_all(s->internal_fd[1], "quit\r\n", 6); if (ret > 0) { @@ -750,20 +750,34 @@ void control_done(struct control_state *s) delete s; } +static void control_report_stats_event(struct control_state *s, const std::string &report_line) +{ + std::unique_lock lk(s->stats_lock); + + if (s->stat_event_queue.size() < MAX_STAT_EVENT_QUEUE) { + s->stat_event_queue.push(report_line); + } else { + log_msg(LOG_LEVEL_WARNING, "Cannot write stats/event - queue full!!!"); + } + lk.unlock(); + s->stat_event_cv.notify_one(); +} + void control_report_stats(struct control_state *s, const std::string &report_line) { if (!s || !s->stats_on) { return; } - std::unique_lock lk(s->stats_lock); - - if (s->stat_queue.size() < MAX_STAT_QUEUE) { - s->stat_queue.push("stats " + report_line + "\r\n"); - } else { - fprintf(stderr, "Cannot write stats!!!"); - } - lk.unlock(); - s->stat_cv.notify_one(); + control_report_stats_event(s, "stats " + report_line + "\r\n"); +} + +void control_report_event(struct control_state *s, const std::string &report_line) +{ + if (!s) { + return; + } + + control_report_stats_event(s, "event " + report_line + "\r\n"); } diff --git a/src/control_socket.h b/src/control_socket.h index 9e4077c9b..d4b26e5b8 100644 --- a/src/control_socket.h +++ b/src/control_socket.h @@ -63,6 +63,7 @@ int control_init(int port, int connection_type, struct control_state **state, st void control_start(struct control_state *state); void control_done(struct control_state *s); void control_report_stats(struct control_state *state, const std::string & stat_line); +void control_report_event(struct control_state *state, const std::string & event_line); #endif // control_socket_h_ diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index 9009633be..28438adf2 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -1324,6 +1324,9 @@ static int reconfigure_if_needed(struct state_video_decoder *decoder, { if (!video_desc_eq_excl_param(decoder->received_vid_desc, network_desc, PARAM_TILE_COUNT)) { LOG(LOG_LEVEL_NOTICE) << "[video dec.] New incoming video format detected: " << network_desc << endl; + control_report_event(decoder->control, string("received video changed - ") + + (string) network_desc); + decoder->received_vid_desc = network_desc; #ifdef RECONFIGURE_IN_FUTURE_THREAD diff --git a/src/types.h b/src/types.h index c50c0c654..c8cd2ddb7 100644 --- a/src/types.h +++ b/src/types.h @@ -46,6 +46,7 @@ #include #ifdef __cplusplus +#include extern "C" { #endif @@ -118,6 +119,7 @@ struct video_desc { bool operator==(video_desc const &) const; bool operator!=(video_desc const &) const; bool operator!() const; + operator std::string() const; #endif }; diff --git a/src/video.cpp b/src/video.cpp index b49ed7008..79258e062 100644 --- a/src/video.cpp +++ b/src/video.cpp @@ -222,3 +222,10 @@ bool video_desc::operator!() const return color_spec == VIDEO_CODEC_NONE; } +video_desc::operator string() const +{ + ostringstream oss; + oss << *this; + return oss.str(); +} + diff --git a/src/video_rxtx/ultragrid_rtp.cpp b/src/video_rxtx/ultragrid_rtp.cpp index 0a6c91095..c8dcf8729 100644 --- a/src/video_rxtx/ultragrid_rtp.cpp +++ b/src/video_rxtx/ultragrid_rtp.cpp @@ -125,7 +125,13 @@ void *(*ultragrid_rtp_video_rxtx::get_receiver_thread())(void *arg) { void ultragrid_rtp_video_rxtx::send_frame(shared_ptr tx_frame) { - m_video_desc = video_desc_from_frame(tx_frame.get()); + auto new_desc = video_desc_from_frame(tx_frame.get()); + if (new_desc != m_video_desc) { + control_report_event(m_control, string("captured video changed - ") + + (string) new_desc); + m_video_desc = new_desc; + + } if (m_fec_state) { tx_frame = m_fec_state->encode(tx_frame); }