Use pipe supplement over loopback

There isn't pipe() command on MSW, so use this version to supplement it.
This commit is contained in:
Martin Pulec
2015-07-13 09:58:52 +02:00
parent 477216b900
commit 3683438e6a
4 changed files with 207 additions and 60 deletions

View File

@@ -51,6 +51,7 @@
#endif // HAVE_CONFIG_H
#include "control_socket.h"
#include "compat/platform_pipe.h"
#include <condition_variable>
#include <map>
@@ -102,7 +103,6 @@ struct control_state {
thread control_thread_id;
/// @var internal_fd is used for internal communication
fd_t internal_fd[2];
int local_port;
int network_port;
struct module *root_module;
@@ -152,33 +152,6 @@ static ssize_t write_all(fd_t fd, const void *buf, size_t count)
return count;
}
/**
* Creates listening socket for internal communication.
* This can be closed after accepted.
*
* @param[out] port port to be connected to
* @returns listening socket descriptor
*/
static fd_t create_internal_port(int *port)
{
fd_t sock;
sock = socket(AF_INET, SOCK_STREAM, 0);
assert(sock != INVALID_SOCKET);
struct sockaddr_in s_in;
memset(&s_in, 0, sizeof(s_in));
s_in.sin_family = AF_INET;
s_in.sin_addr.s_addr = htonl(INADDR_ANY);
s_in.sin_port = htons(0);
assert(::bind(sock, (const struct sockaddr *) &s_in,
sizeof(s_in)) == 0);
assert(listen(sock, 10) == 0);
socklen_t len = sizeof(s_in);
assert(getsockname(sock, (struct sockaddr *) &s_in, &len) == 0);
*port = ntohs(s_in.sin_port);
return sock;
}
int control_init(int port, int connection_type, struct control_state **state, struct module *root_module)
{
control_state *s = new control_state();
@@ -281,20 +254,17 @@ void control_start(struct control_state *s)
return;
}
fd_t sock;
sock = create_internal_port(&s->local_port);
platform_pipe_init(s->internal_fd);
s->control_thread_id = thread(control_thread, s);
s->stat_thread_id = thread(stat_thread, s);
s->internal_fd[0] = accept(sock, NULL, NULL);
CLOSESOCKET(sock);
s->started = true;
}
#define prefix_matches(x,y) strncasecmp(x, y, strlen(y)) == 0
#define suffix(x,y) x + strlen(y)
#define is_internal_port(x) (x == s->internal_fd[1])
#define is_internal_port(x) (x == s->internal_fd[0])
/**
* @retval -1 exit thread
@@ -543,26 +513,6 @@ static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int
return ret;
}
/**
* connects to internal communication channel
*/
static fd_t connect_to_internal_channel(int local_port)
{
struct sockaddr_in s_in;
memset(&s_in, 0, sizeof(s_in));
s_in.sin_family = AF_INET;
s_in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
s_in.sin_port = htons(local_port);
fd_t fd = socket(AF_INET, SOCK_STREAM, 0);
assert(fd != INVALID_SOCKET);
int ret;
ret = connect(fd, (struct sockaddr *) &s_in,
sizeof(s_in));
assert(ret == 0);
return fd;
}
static struct client *add_client(struct client *clients, int fd) {
struct client *new_client = (struct client *)
malloc(sizeof(struct client));
@@ -582,8 +532,6 @@ static void * control_thread(void *args)
struct control_state *s = (struct control_state *) args;
struct client *clients = NULL;
s->internal_fd[1] = connect_to_internal_channel(s->local_port);
if(s->connection_type == CLIENT) {
clients = add_client(clients, s->socket_fd);
}
@@ -592,7 +540,7 @@ static void * control_thread(void *args)
errno = 0;
clients = add_client(clients, s->internal_fd[1]);
clients = add_client(clients, s->internal_fd[0]);
bool should_exit = false;
@@ -695,7 +643,7 @@ static void * control_thread(void *args)
free(tmp);
}
CLOSESOCKET(s->internal_fd[1]);
platform_pipe_close(s->internal_fd[0]);
return NULL;
}
@@ -713,7 +661,7 @@ static void *stat_thread(void *args)
break;
}
int ret = write_all(s->internal_fd[0], line.c_str(), line.length());
int ret = write_all(s->internal_fd[1], line.c_str(), line.length());
s->stat_queue.pop();
if (ret <= 0) {
fprintf(stderr, "Cannot write stat line!\n");
@@ -738,10 +686,10 @@ void control_done(struct control_state *s)
s->stat_cv.notify_one();
s->stat_thread_id.join();
int ret = write_all(s->internal_fd[0], "quit\r\n", 6);
int ret = write_all(s->internal_fd[1], "quit\r\n", 6);
if (ret > 0) {
s->control_thread_id.join();
CLOSESOCKET(s->internal_fd[0]);
platform_pipe_close(s->internal_fd[1]);
} else {
fprintf(stderr, "Cannot exit control thread!\n");
}