Control socket: report when video prop change

This commit is contained in:
Martin Pulec
2016-05-02 11:26:00 +02:00
parent 6ffaff45ff
commit 08be232a19
6 changed files with 59 additions and 26 deletions

View File

@@ -97,7 +97,7 @@ enum connection_type {
CLIENT
};
#define MAX_STAT_QUEUE 100
#define MAX_STAT_EVENT_QUEUE 100
struct control_state {
struct module mod;
@@ -115,9 +115,9 @@ struct control_state {
bool started;
thread stat_thread_id;
condition_variable stat_cv;
queue<string> stat_queue;
thread stat_event_thread_id;
condition_variable stat_event_cv;
queue<string> stat_event_queue;
bool stats_on;
};
@@ -129,7 +129,7 @@ static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int
static int process_msg(struct control_state *s, fd_t client_fd, char *message, struct client *clients);
static ssize_t write_all(fd_t fd, const void *buf, size_t count);
static void * control_thread(void *args);
static void * stat_thread(void *args);
static void * stat_event_thread(void *args);
static void send_response(fd_t fd, struct response *resp);
#ifndef HAVE_LINUX
@@ -258,7 +258,7 @@ void control_start(struct control_state *s)
platform_pipe_init(s->internal_fd);
s->control_thread_id = thread(control_thread, s);
s->stat_thread_id = thread(stat_thread, s);
s->stat_event_thread_id = thread(stat_event_thread, s);
s->started = true;
}
@@ -294,7 +294,7 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s
} else if (strcasecmp(message, "exit") == 0) {
exit_uv(0);
resp = new_response(RESPONSE_OK, NULL);
} else if (prefix_matches(message, "stats ")) {
} else if (prefix_matches(message, "stats ") || prefix_matches(message, "event ")) {
if (is_internal_port(client_fd)) {
struct client *cur = clients;
char *new_msg = NULL;
@@ -315,13 +315,13 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s
int ret = write_all(cur->fd, new_msg, strlen(new_msg));
if (ret != (int) strlen(new_msg)) {
fprintf(stderr, "Cannot write stats!\n");
log_msg(LOG_LEVEL_WARNING, "Cannot write stats/event!\n");
}
cur = cur->next;
}
free(new_msg);
return ret;
} else {
} else if (prefix_matches(message, "stats ")) {
const char *toggle = suffix(message, "stats ");
if (strcasecmp(toggle, "on") == 0) {
s->stats_on = true;
@@ -695,21 +695,21 @@ static void * control_thread(void *args)
return NULL;
}
static void *stat_thread(void *args)
static void *stat_event_thread(void *args)
{
struct control_state *s = (struct control_state *) args;
while (1) {
std::unique_lock<std::mutex> lk(s->stats_lock);
s->stat_cv.wait(lk, [s] { return s->stat_queue.size() > 0; });
string &line = s->stat_queue.front();
s->stat_event_cv.wait(lk, [s] { return s->stat_event_queue.size() > 0; });
string &line = s->stat_event_queue.front();
if (line.empty()) {
break;
}
int ret = write_all(s->internal_fd[1], line.c_str(), line.length());
s->stat_queue.pop();
s->stat_event_queue.pop();
if (ret <= 0) {
fprintf(stderr, "Cannot write stat line!\n");
}
@@ -728,10 +728,10 @@ void control_done(struct control_state *s)
if(s->started) {
s->stats_lock.lock();
s->stat_queue.push({});
s->stat_event_queue.push({});
s->stats_lock.unlock();
s->stat_cv.notify_one();
s->stat_thread_id.join();
s->stat_event_cv.notify_one();
s->stat_event_thread_id.join();
int ret = write_all(s->internal_fd[1], "quit\r\n", 6);
if (ret > 0) {
@@ -750,20 +750,34 @@ void control_done(struct control_state *s)
delete s;
}
static void control_report_stats_event(struct control_state *s, const std::string &report_line)
{
std::unique_lock<std::mutex> lk(s->stats_lock);
if (s->stat_event_queue.size() < MAX_STAT_EVENT_QUEUE) {
s->stat_event_queue.push(report_line);
} else {
log_msg(LOG_LEVEL_WARNING, "Cannot write stats/event - queue full!!!");
}
lk.unlock();
s->stat_event_cv.notify_one();
}
void control_report_stats(struct control_state *s, const std::string &report_line)
{
if (!s || !s->stats_on) {
return;
}
std::unique_lock<std::mutex> lk(s->stats_lock);
if (s->stat_queue.size() < MAX_STAT_QUEUE) {
s->stat_queue.push("stats " + report_line + "\r\n");
} else {
fprintf(stderr, "Cannot write stats!!!");
}
lk.unlock();
s->stat_cv.notify_one();
control_report_stats_event(s, "stats " + report_line + "\r\n");
}
void control_report_event(struct control_state *s, const std::string &report_line)
{
if (!s) {
return;
}
control_report_stats_event(s, "event " + report_line + "\r\n");
}

View File

@@ -63,6 +63,7 @@ int control_init(int port, int connection_type, struct control_state **state, st
void control_start(struct control_state *state);
void control_done(struct control_state *s);
void control_report_stats(struct control_state *state, const std::string & stat_line);
void control_report_event(struct control_state *state, const std::string & event_line);
#endif // control_socket_h_

View File

@@ -1324,6 +1324,9 @@ static int reconfigure_if_needed(struct state_video_decoder *decoder,
{
if (!video_desc_eq_excl_param(decoder->received_vid_desc, network_desc, PARAM_TILE_COUNT)) {
LOG(LOG_LEVEL_NOTICE) << "[video dec.] New incoming video format detected: " << network_desc << endl;
control_report_event(decoder->control, string("received video changed - ") +
(string) network_desc);
decoder->received_vid_desc = network_desc;
#ifdef RECONFIGURE_IN_FUTURE_THREAD

View File

@@ -46,6 +46,7 @@
#include <stddef.h>
#ifdef __cplusplus
#include <string>
extern "C" {
#endif
@@ -118,6 +119,7 @@ struct video_desc {
bool operator==(video_desc const &) const;
bool operator!=(video_desc const &) const;
bool operator!() const;
operator std::string() const;
#endif
};

View File

@@ -222,3 +222,10 @@ bool video_desc::operator!() const
return color_spec == VIDEO_CODEC_NONE;
}
video_desc::operator string() const
{
ostringstream oss;
oss << *this;
return oss.str();
}

View File

@@ -125,7 +125,13 @@ void *(*ultragrid_rtp_video_rxtx::get_receiver_thread())(void *arg) {
void ultragrid_rtp_video_rxtx::send_frame(shared_ptr<video_frame> tx_frame)
{
m_video_desc = video_desc_from_frame(tx_frame.get());
auto new_desc = video_desc_from_frame(tx_frame.get());
if (new_desc != m_video_desc) {
control_report_event(m_control, string("captured video changed - ") +
(string) new_desc);
m_video_desc = new_desc;
}
if (m_fec_state) {
tx_frame = m_fec_state->encode(tx_frame);
}