diff --git a/Makefile.in b/Makefile.in index 08dc378dc..bfd2c19c6 100644 --- a/Makefile.in +++ b/Makefile.in @@ -100,6 +100,7 @@ OBJS = @OBJS@ \ src/capture_filter/scale.o \ src/compat/drand48.o \ src/compat/gettimeofday.o \ + src/compat/platform_pipe.o \ src/compat/platform_semaphore.o \ src/compat/platform_spin.o \ src/compat/platform_time.o \ diff --git a/src/compat/platform_pipe.cpp b/src/compat/platform_pipe.cpp new file mode 100644 index 000000000..47c1d5c05 --- /dev/null +++ b/src/compat/platform_pipe.cpp @@ -0,0 +1,146 @@ +/** + * @file src/compat/platform_pipe.cpp + * @author Martin Pulec + */ +/* + * Copyright (c) 2015 CESNET, z. s. p. o. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, is permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of CESNET nor the names of its contributors may be + * used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, + * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#include "config_unix.h" +#include "config_win32.h" +#endif // HAVE_CONFIG_H + +#include "debug.h" + +#include "compat/platform_pipe.h" + +#include + +#ifdef WIN32 +#define CLOSESOCKET closesocket +#else +#define CLOSESOCKET close +#endif + +using std::thread; + +static fd_t open_socket(int *port) +{ + fd_t sock; + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock == INVALID_SOCKET) { + return INVALID_SOCKET; + } + + struct sockaddr_in s_in; + memset(&s_in, 0, sizeof(s_in)); + s_in.sin_family = AF_INET; + s_in.sin_addr.s_addr = htonl(INADDR_ANY); + s_in.sin_port = htons(0); + if (::bind(sock, (const struct sockaddr *) &s_in, + sizeof(s_in)) != 0) { + return INVALID_SOCKET; + } + if (listen(sock, 10) != 0) { + return INVALID_SOCKET; + } + socklen_t len = sizeof(s_in); + if (getsockname(sock, (struct sockaddr *) &s_in, &len) != 0) { + return INVALID_SOCKET; + } + *port = ntohs(s_in.sin_port); + return sock; +} + +static fd_t connect_to_socket(int local_port) +{ + struct sockaddr_in s_in; + memset(&s_in, 0, sizeof(s_in)); + s_in.sin_family = AF_INET; + s_in.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + s_in.sin_port = htons(local_port); + fd_t fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd == INVALID_SOCKET) { + return INVALID_SOCKET; + } + int ret; + ret = connect(fd, (struct sockaddr *) &s_in, + sizeof(s_in)); + if (ret != 0) { + return INVALID_SOCKET; + } + + return fd; +} + +struct params { + int port; + fd_t sock; +}; + +static void * worker(void *args) +{ + struct params *p = (struct params *) args; + p->sock = connect_to_socket(p->port); + + return NULL; +} + + +int platform_pipe_init(fd_t p[2]) +{ + struct params par; + fd_t sock = open_socket(&par.port); + + thread thr(worker, &par); + + p[0] = accept(sock, NULL, NULL); + if (p[0] == INVALID_SOCKET) { + return -1; + } + thr.join(); + p[1] = par.sock; + if (p[1] == INVALID_SOCKET) { + return -1; + } + CLOSESOCKET(sock); + + return 0; +} + +void platform_pipe_close(fd_t pipe) +{ + CLOSESOCKET(pipe); +} + diff --git a/src/compat/platform_pipe.h b/src/compat/platform_pipe.h new file mode 100644 index 000000000..da3daed7a --- /dev/null +++ b/src/compat/platform_pipe.h @@ -0,0 +1,52 @@ +/** + * @file compat/platform_pipe.h + * @author Martin Pulec + */ +/* + * Copyright (c) 2015 CESNET, z. s. p. o. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, is permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of CESNET nor the names of its contributors may be + * used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, + * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef platform_pipe_h +#define platform_pipe_h + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + +int platform_pipe_init(fd_t pipe[2]); +void platform_pipe_close(fd_t pipe); + +#ifdef __cplusplus +} +#endif // __cplusplus + +#endif /* _PLATFORM_SEMAPHORE_H */ diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 05f603863..856ac9d43 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -51,6 +51,7 @@ #endif // HAVE_CONFIG_H #include "control_socket.h" +#include "compat/platform_pipe.h" #include #include @@ -102,7 +103,6 @@ struct control_state { thread control_thread_id; /// @var internal_fd is used for internal communication fd_t internal_fd[2]; - int local_port; int network_port; struct module *root_module; @@ -152,33 +152,6 @@ static ssize_t write_all(fd_t fd, const void *buf, size_t count) return count; } -/** - * Creates listening socket for internal communication. - * This can be closed after accepted. - * - * @param[out] port port to be connected to - * @returns listening socket descriptor - */ -static fd_t create_internal_port(int *port) -{ - fd_t sock; - sock = socket(AF_INET, SOCK_STREAM, 0); - assert(sock != INVALID_SOCKET); - struct sockaddr_in s_in; - memset(&s_in, 0, sizeof(s_in)); - s_in.sin_family = AF_INET; - s_in.sin_addr.s_addr = htonl(INADDR_ANY); - s_in.sin_port = htons(0); - assert(::bind(sock, (const struct sockaddr *) &s_in, - sizeof(s_in)) == 0); - assert(listen(sock, 10) == 0); - socklen_t len = sizeof(s_in); - assert(getsockname(sock, (struct sockaddr *) &s_in, &len) == 0); - *port = ntohs(s_in.sin_port); - return sock; -} - - int control_init(int port, int connection_type, struct control_state **state, struct module *root_module) { control_state *s = new control_state(); @@ -281,20 +254,17 @@ void control_start(struct control_state *s) return; } - fd_t sock; - sock = create_internal_port(&s->local_port); + platform_pipe_init(s->internal_fd); s->control_thread_id = thread(control_thread, s); s->stat_thread_id = thread(stat_thread, s); - s->internal_fd[0] = accept(sock, NULL, NULL); - CLOSESOCKET(sock); s->started = true; } #define prefix_matches(x,y) strncasecmp(x, y, strlen(y)) == 0 #define suffix(x,y) x + strlen(y) -#define is_internal_port(x) (x == s->internal_fd[1]) +#define is_internal_port(x) (x == s->internal_fd[0]) /** * @retval -1 exit thread @@ -543,26 +513,6 @@ static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int return ret; } -/** - * connects to internal communication channel - */ -static fd_t connect_to_internal_channel(int local_port) -{ - struct sockaddr_in s_in; - memset(&s_in, 0, sizeof(s_in)); - s_in.sin_family = AF_INET; - s_in.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - s_in.sin_port = htons(local_port); - fd_t fd = socket(AF_INET, SOCK_STREAM, 0); - assert(fd != INVALID_SOCKET); - int ret; - ret = connect(fd, (struct sockaddr *) &s_in, - sizeof(s_in)); - assert(ret == 0); - - return fd; -} - static struct client *add_client(struct client *clients, int fd) { struct client *new_client = (struct client *) malloc(sizeof(struct client)); @@ -582,8 +532,6 @@ static void * control_thread(void *args) struct control_state *s = (struct control_state *) args; struct client *clients = NULL; - s->internal_fd[1] = connect_to_internal_channel(s->local_port); - if(s->connection_type == CLIENT) { clients = add_client(clients, s->socket_fd); } @@ -592,7 +540,7 @@ static void * control_thread(void *args) errno = 0; - clients = add_client(clients, s->internal_fd[1]); + clients = add_client(clients, s->internal_fd[0]); bool should_exit = false; @@ -695,7 +643,7 @@ static void * control_thread(void *args) free(tmp); } - CLOSESOCKET(s->internal_fd[1]); + platform_pipe_close(s->internal_fd[0]); return NULL; } @@ -713,7 +661,7 @@ static void *stat_thread(void *args) break; } - int ret = write_all(s->internal_fd[0], line.c_str(), line.length()); + int ret = write_all(s->internal_fd[1], line.c_str(), line.length()); s->stat_queue.pop(); if (ret <= 0) { fprintf(stderr, "Cannot write stat line!\n"); @@ -738,10 +686,10 @@ void control_done(struct control_state *s) s->stat_cv.notify_one(); s->stat_thread_id.join(); - int ret = write_all(s->internal_fd[0], "quit\r\n", 6); + int ret = write_all(s->internal_fd[1], "quit\r\n", 6); if (ret > 0) { s->control_thread_id.join(); - CLOSESOCKET(s->internal_fd[0]); + platform_pipe_close(s->internal_fd[1]); } else { fprintf(stderr, "Cannot exit control thread!\n"); }