Files
nDPId/nDPIsrvd.c
Toni Uhlig a41ddafa88 Git tag/commit version printing for nDPId/nDPIsrvd. Reduces confusion.
* disabled subshell spawn for run_tests.sh, common pitfall while using counters

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
2021-06-08 15:23:33 +02:00

940 lines
27 KiB
C

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <syslog.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "config.h"
#include "nDPIsrvd.h"
#include "utils.h"
/*
 * Role of a socket managed by the daemon; stored per connection in
 * struct remote_desc and used to pick the matching listen socket.
 */
enum sock_type
{
/* Collector side: JSON strings are read from these connections. */
JSON_SOCK,
/* Distributor side: received JSON strings are written to these connections. */
SERV_SOCK
};
/*
 * State of one accepted connection (one slot of the `remotes` table).
 * A slot with fd == -1 is unused.
 */
struct remote_desc
{
/* Whether this slot serves a collector (JSON_SOCK) or a distributor (SERV_SOCK). */
enum sock_type sock_type;
/* Connected socket, or -1 while the slot is free. */
int fd;
/*
 * Collector: bytes received but not yet forwarded.
 * Distributor: bytes queued but not yet written to the peer.
 */
struct nDPIsrvd_buffer buf;
/* Per-role data; only the member matching `sock_type` is meaningful. */
union {
struct
{
/* Not referenced by the visible code. */
int json_sockfd;
/* Not referenced by the visible code. */
struct sockaddr_un peer;
/* Size of the message currently being parsed, including its length prefix. */
unsigned long long int json_bytes;
} event_json;
struct
{
/* Not referenced by the visible code. */
int serv_sockfd;
/* Peer address used for log messages. */
struct sockaddr_in peer;
/* Textual form of `peer`; an empty string if it could not be converted. */
char peer_addr[INET_ADDRSTRLEN];
} event_serv;
};
};
/* Table of connection slots shared by all collector and distributor peers. */
static struct
{
    struct remote_desc * desc; /* slot array; NULL until it has been allocated */
    size_t desc_size;          /* number of slots in `desc` */
    size_t desc_used;          /* number of slots currently bound to a connection */
} remotes = {.desc = NULL, .desc_size = 0, .desc_used = 0};
/* Becomes non-zero once the main loop has been asked to terminate. */
static int nDPIsrvd_main_thread_shutdown = 0;
/*
 * Listening sockets for collectors (JSON) and distributors (SERV).
 * They start out as -1 ("not open") instead of the implicit 0, so that
 * cleanup code running before the sockets were created can never close
 * file descriptor 0 by accident.
 */
static int json_sockfd = -1;
static int serv_sockfd = -1;
/*
 * Address the distributor listen socket is bound to; it is filled in by
 * nDPIsrvd_setup_address() while the command line is parsed.
 * NOTE(review): 0xFFFF presumably serves as a "not configured yet"
 * placeholder address family -- confirm.
 */
static struct nDPIsrvd_address serv_address = {
.raw.sa_family = 0xFFFF,
};
/* Command line settings; the string members are heap copies or NULL when unset. */
static struct
{
    int log_to_stderr;    /* -l: also write log messages to stderr */
    char * pidfile;       /* -p: path of the pidfile */
    char * json_sockpath; /* -c: path of the collector UNIX socket */
    char * serv_optarg;   /* -s: distributor UNIX socket path or host:port */
    char * user;          /* -u: user name handed to change_user_group() */
    char * group;         /* -g: group name handed to change_user_group() */
} nDPIsrvd_options = {
    .log_to_stderr = 0,
    .pidfile = NULL,
    .json_sockpath = NULL,
    .serv_optarg = NULL,
    .user = NULL,
    .group = NULL,
};
/*
 * Set additional file status flags (e.g. O_NONBLOCK) on `fd`.
 *
 * Returns 1 if the current flags could not be read, otherwise the result
 * of the F_SETFL call (0 on success, -1 on failure).
 */
static int fcntl_add_flags(int fd, int flags)
{
    int const status_flags = fcntl(fd, F_GETFL, 0);

    return (status_flags == -1) ? 1 : fcntl(fd, F_SETFL, status_flags | flags);
}
/*
 * Clear file status flags (e.g. O_NONBLOCK) on `fd`.
 *
 * Returns 1 if the current flags could not be read, otherwise the result
 * of the F_SETFL call (0 on success, -1 on failure).
 */
static int fcntl_del_flags(int fd, int flags)
{
    int const status_flags = fcntl(fd, F_GETFL, 0);

    return (status_flags == -1) ? 1 : fcntl(fd, F_SETFL, status_flags & ~flags);
}
/*
 * Create the collector (UNIX) and distributor listen sockets, bind them,
 * start listening and switch both to non-blocking mode.
 *
 * The resulting descriptors are stored in `json_sockfd` / `serv_sockfd`.
 * On failure the collector socket path is unlinked where the original
 * code did so; the descriptors themselves are left for the caller to close.
 *
 * Returns 0 on success, 1 on any error (which is logged).
 */
static int create_listen_sockets(void)
{
    struct sockaddr_un json_addr;
    int const opt = 1;
    int path_len;
    int saved_errno;

    json_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    serv_sockfd = socket(serv_address.raw.sa_family, SOCK_STREAM, 0);
    if (json_sockfd < 0 || serv_sockfd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error opening socket: %s", strerror(errno));
        return 1;
    }

    if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
        setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_REUSEADDR failed: %s", strerror(errno));
        return 1;
    }

    /* Do not hand uninitialised padding / path bytes to bind(). */
    memset(&json_addr, 0, sizeof(json_addr));
    json_addr.sun_family = AF_UNIX;
    path_len = snprintf(json_addr.sun_path, sizeof(json_addr.sun_path), "%s", nDPIsrvd_options.json_sockpath);
    if (path_len <= 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "snprintf failed: %s", strerror(errno));
        return 1;
    }
    if ((size_t)path_len >= sizeof(json_addr.sun_path))
    {
        /* A silently truncated path would bind the socket somewhere else. */
        syslog(LOG_DAEMON | LOG_ERR,
               "UNIX socket path (collector) too long: %s",
               nDPIsrvd_options.json_sockpath);
        return 1;
    }

    if (bind(json_sockfd, (struct sockaddr *)&json_addr, sizeof(json_addr)) < 0)
    {
        /* unlink() may overwrite errno, so remember the bind() error first. */
        saved_errno = errno;
        unlink(nDPIsrvd_options.json_sockpath);
        syslog(LOG_DAEMON | LOG_ERR,
               "Error on binding UNIX socket (collector) to %s: %s",
               nDPIsrvd_options.json_sockpath,
               strerror(saved_errno));
        return 1;
    }

    if (bind(serv_sockfd, &serv_address.raw, serv_address.size) < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "Error on binding socket (distributor) to %s: %s",
               nDPIsrvd_options.serv_optarg,
               strerror(errno));
        unlink(nDPIsrvd_options.json_sockpath);
        return 1;
    }

    if (listen(json_sockfd, 16) < 0 || listen(serv_sockfd, 16) < 0)
    {
        saved_errno = errno;
        unlink(nDPIsrvd_options.json_sockpath);
        syslog(LOG_DAEMON | LOG_ERR, "Error on listen: %s", strerror(saved_errno));
        return 1;
    }

    if (fcntl_add_flags(json_sockfd, O_NONBLOCK) != 0)
    {
        saved_errno = errno;
        unlink(nDPIsrvd_options.json_sockpath);
        syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags for the collector socket: %s", strerror(saved_errno));
        return 1;
    }

    if (fcntl_add_flags(serv_sockfd, O_NONBLOCK) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags for the distributor socket: %s", strerror(errno));
        return 1;
    }

    return 0;
}
static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, int remote_fd)
{
if (remotes.desc_used == remotes.desc_size)
{
return NULL;
}
for (size_t i = 0; i < remotes.desc_size; ++i)
{
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0)
{
return NULL;
}
remotes.desc[i].sock_type = type;
remotes.desc[i].fd = remote_fd;
return &remotes.desc[i];
}
}
return NULL;
}
/*
 * Register `fd` for read readiness (EPOLLIN) on the epoll instance `epollfd`.
 *
 * The user data attached to the event is `ptr` when it is non-NULL and
 * the descriptor number `fd` otherwise.
 *
 * Returns the result of epoll_ctl(): 0 on success, -1 on failure.
 */
static int add_event(int epollfd, int fd, void * ptr)
{
    struct epoll_event ev = {.events = EPOLLIN};

    if (ptr == NULL)
    {
        ev.data.fd = fd;
    }
    else
    {
        ev.data.ptr = ptr;
    }

    return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
/*
 * Remove `fd` from the epoll instance `epollfd`.
 *
 * Returns the result of epoll_ctl(): 0 on success, -1 on failure.
 */
static int del_event(int epollfd, int fd)
{
    int const rc = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);

    return rc;
}
/*
 * Tear down the connection held by `current` and return its slot to the pool.
 *
 * If the slot owns a descriptor it is removed from the epoll set, closed
 * (a close error is logged) and the slot is marked free. The slot's
 * buffer is released in every case, even if no descriptor was open.
 */
static void disconnect_client(int epollfd, struct remote_desc * const current)
{
    int const client_fd = current->fd;

    if (client_fd >= 0)
    {
        del_event(epollfd, client_fd);
        if (close(client_fd) != 0)
        {
            syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno));
        }
        current->fd = -1;
        remotes.desc_used--;
    }

    nDPIsrvd_buffer_free(&current->buf);
}
/*
 * Replace the option string `*dst` with a heap copy of `src`.
 * Returns 0 on success, 1 if the copy could not be allocated (in which
 * case `*dst` keeps its previous value).
 */
static int nDPIsrvd_set_option_string(char ** const dst, char const * const src)
{
    char * const copy = strdup(src);

    if (copy == NULL)
    {
        fprintf(stderr, "Could not allocate memory for option value `%s'\n", src);
        return 1;
    }

    free(*dst);
    *dst = copy;
    return 0;
}

/*
 * Parse the command line into `nDPIsrvd_options` and `serv_address`.
 *
 * Options that were not given fall back to the compiled-in defaults
 * (pidfile, collector socket, distributor socket). The pidfile and the
 * UNIX socket paths are validated with is_path_absolute().
 *
 * Returns 0 if the daemon should start, 1 if it should exit: invalid
 * arguments, out of memory, or -v / -h which only print information.
 */
static int nDPIsrvd_parse_options(int argc, char ** argv)
{
    int opt;

    while ((opt = getopt(argc, argv, "lc:dp:s:u:g:vh")) != -1)
    {
        switch (opt)
        {
            case 'l':
                nDPIsrvd_options.log_to_stderr = 1;
                break;
            case 'c':
                if (nDPIsrvd_set_option_string(&nDPIsrvd_options.json_sockpath, optarg) != 0)
                {
                    return 1;
                }
                break;
            case 'd':
                daemonize_enable();
                break;
            case 'p':
                if (nDPIsrvd_set_option_string(&nDPIsrvd_options.pidfile, optarg) != 0)
                {
                    return 1;
                }
                break;
            case 's':
                if (nDPIsrvd_set_option_string(&nDPIsrvd_options.serv_optarg, optarg) != 0)
                {
                    return 1;
                }
                break;
            case 'u':
                if (nDPIsrvd_set_option_string(&nDPIsrvd_options.user, optarg) != 0)
                {
                    return 1;
                }
                break;
            case 'g':
                if (nDPIsrvd_set_option_string(&nDPIsrvd_options.group, optarg) != 0)
                {
                    return 1;
                }
                break;
            case 'v':
                fprintf(stderr, "%s", get_nDPId_version());
                return 1;
            case 'h':
            default:
                fprintf(stderr, "%s\n", get_nDPId_version());
                fprintf(stderr,
                        "Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n"
                        "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n"
                        "\t[-v] [-h]\n",
                        argv[0]);
                return 1;
        }
    }

    if (nDPIsrvd_options.pidfile == NULL &&
        nDPIsrvd_set_option_string(&nDPIsrvd_options.pidfile, nDPIsrvd_PIDFILE) != 0)
    {
        return 1;
    }
    if (is_path_absolute("Pidfile", nDPIsrvd_options.pidfile) != 0)
    {
        return 1;
    }

    if (nDPIsrvd_options.json_sockpath == NULL &&
        nDPIsrvd_set_option_string(&nDPIsrvd_options.json_sockpath, COLLECTOR_UNIX_SOCKET) != 0)
    {
        return 1;
    }
    if (is_path_absolute("JSON socket", nDPIsrvd_options.json_sockpath) != 0)
    {
        return 1;
    }

    if (nDPIsrvd_options.serv_optarg == NULL &&
        nDPIsrvd_set_option_string(&nDPIsrvd_options.serv_optarg, DISTRIBUTOR_UNIX_SOCKET) != 0)
    {
        return 1;
    }
    if (nDPIsrvd_setup_address(&serv_address, nDPIsrvd_options.serv_optarg) != 0)
    {
        fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg);
        return 1;
    }
    /* Only a UNIX socket distributor address is a filesystem path. */
    if (serv_address.raw.sa_family == AF_UNIX && is_path_absolute("SERV socket", nDPIsrvd_options.serv_optarg) != 0)
    {
        return 1;
    }

    if (optind < argc)
    {
        fprintf(stderr, "%s: Unexpected argument after options\n", argv[0]);
        return 1;
    }

    return 0;
}
/*
 * Accept a pending connection on `server_fd` and bind it to a free slot.
 *
 * socktype: role assigned to the new connection
 * sockaddr: receives the peer address (as filled in by accept())
 * addrlen:  in: size of `sockaddr`; out: size of the stored address
 *
 * Returns the slot owning the new connection, or NULL if accept() failed
 * or no slot was available. In the latter case the accepted socket is
 * closed again so that it does not leak.
 */
static struct remote_desc * accept_remote(int server_fd,
                                          enum sock_type socktype,
                                          struct sockaddr * const sockaddr,
                                          socklen_t * const addrlen)
{
    int const client_fd = accept(server_fd, sockaddr, addrlen);

    if (client_fd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
        return NULL;
    }

    struct remote_desc * const current = get_unused_remote_descriptor(socktype, client_fd);
    if (current == NULL)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
        /* Nobody owns the descriptor, so drop the connection right away. */
        close(client_fd);
        return NULL;
    }

    return current;
}
/*
 * Handle a readiness event on one of the two listen sockets: accept the
 * connection, record / log the peer and prepare the socket for its role.
 *
 * Collector connections get their write side shut down and are added to
 * the epoll set; distributor connections only get their read side shut
 * down (they are written to, never read from).
 *
 * eventfd must be `json_sockfd` or `serv_sockfd`.
 *
 * Returns 0 on success, 1 if nothing was accepted or the setup failed
 * (the connection is closed again in that case).
 */
static int new_connection(int epollfd, int eventfd)
{
    /* Large enough for every address family a listen socket may use here. */
    union {
        struct sockaddr raw;
        struct sockaddr_un un;
        struct sockaddr_in in;
    } peer;
    socklen_t peer_addr_len = sizeof(peer);
    enum sock_type stype;
    int server_fd;

    if (eventfd == json_sockfd)
    {
        stype = JSON_SOCK;
        server_fd = json_sockfd;
    }
    else if (eventfd == serv_sockfd)
    {
        stype = SERV_SOCK;
        server_fd = serv_sockfd;
    }
    else
    {
        return 1;
    }

    /* accept() may store less than the full union (e.g. unnamed UNIX peers). */
    memset(&peer, 0, sizeof(peer));

    struct remote_desc * current = accept_remote(server_fd, stype, &peer.raw, &peer_addr_len);
    if (current == NULL)
    {
        return 1;
    }

    char const * sock_type = NULL;
    switch (current->sock_type)
    {
        case JSON_SOCK:
            sock_type = "collector";
            current->event_json.json_bytes = 0;
            syslog(LOG_DAEMON, "New collector connection");
            break;
        case SERV_SOCK:
            sock_type = "distributor";
            /* The slot may hold stale data from a previous connection. */
            memset(&current->event_serv.peer, 0, sizeof(current->event_serv.peer));
            current->event_serv.peer_addr[0] = '\0';

            if (peer.raw.sa_family != AF_INET)
            {
                /* Only IPv4 peers fit `event_serv.peer`; others stay anonymous. */
                syslog(LOG_DAEMON | LOG_ERR, "New distributor connection.");
                break;
            }

            /* Remember the peer so that later log messages can name it. */
            current->event_serv.peer = peer.in;
            if (inet_ntop(AF_INET,
                          &current->event_serv.peer.sin_addr,
                          &current->event_serv.peer_addr[0],
                          sizeof(current->event_serv.peer_addr)) == NULL)
            {
                syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
                current->event_serv.peer_addr[0] = '\0';
            }
            else
            {
                syslog(LOG_DAEMON,
                       "New distributor connection from %.*s:%u",
                       (int)sizeof(current->event_serv.peer_addr),
                       current->event_serv.peer_addr,
                       ntohs(current->event_serv.peer.sin_port));
            }
            break;
    }

    /* nonblocking fd is mandatory */
    if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error setting %s fd flags: %s", sock_type, strerror(errno));
        disconnect_client(epollfd, current);
        return 1;
    }

    if (current->sock_type == JSON_SOCK)
    {
        /* Collectors only send: shut down the writing end. */
        shutdown(current->fd, SHUT_WR);

        /* Only collector sockets are polled for incoming data. */
        if (add_event(epollfd, current->fd, current) != 0)
        {
            disconnect_client(epollfd, current);
            return 1;
        }
    }
    else
    {
        /* Distributors only receive: shut down the reading end. */
        shutdown(current->fd, SHUT_RD);
    }

    return 0;
}
/*
 * Validate the message at the start of a collector's receive buffer.
 *
 * A message is a decimal length prefix followed by a JSON string that
 * starts with '{' at offset NETWORK_BUFFER_LENGTH_DIGITS and ends with
 * "}\n". On success `current->event_json.json_bytes` holds the size of
 * the complete message including the prefix.
 *
 * The caller must ensure that the buffer holds at least
 * NETWORK_BUFFER_LENGTH_DIGITS + 1 bytes.
 *
 * Returns 0 if a complete, well-formed message is available. Returns 1
 * if more data is needed (connection stays open) or if the data is
 * malformed (connection is closed and its buffer released).
 */
static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
{
    char * json_str_start = NULL;
    unsigned long long int json_length;
    unsigned long long int prefix_length;

    if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: JSON invalid opening character: '%c'",
               current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
        disconnect_client(epollfd, current);
        return 1;
    }

    errno = 0;
    json_length = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
    if (errno == ERANGE)
    {
        syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit");
        disconnect_client(epollfd, current);
        return 1;
    }
    if (json_str_start == current->buf.ptr.text)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: Missing size before JSON string: \"%.*s\"",
               NETWORK_BUFFER_LENGTH_DIGITS,
               current->buf.ptr.text);
        disconnect_client(epollfd, current);
        return 1;
    }
    prefix_length = (unsigned long long int)(json_str_start - current->buf.ptr.text);

    /*
     * Check the bare length before adding the prefix: strtoull() accepts a
     * sign, so "-1" yields a huge value that would wrap around to a tiny
     * one once the prefix length is added.
     */
    if (json_length > current->buf.max || json_length + prefix_length > current->buf.max)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: JSON string too big: %llu > %zu",
               json_length,
               current->buf.max);
        disconnect_client(epollfd, current);
        return 1;
    }
    current->event_json.json_bytes = json_length + prefix_length;

    if (current->event_json.json_bytes > current->buf.used)
    {
        /* Message not completely received yet. */
        return 1;
    }

    /*
     * The payload must at least hold the closing "}\n"; checking that first
     * keeps the two index expressions below inside the payload.
     */
    if (json_length < 2 || current->buf.ptr.text[current->event_json.json_bytes - 2] != '}' ||
        current->buf.ptr.text[current->event_json.json_bytes - 1] != '\n')
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: Invalid JSON string: %.*s",
               (int)current->event_json.json_bytes,
               current->buf.ptr.text);
        disconnect_client(epollfd, current);
        return 1;
    }

    return 0;
}
/*
 * Read pending data from a collector connection and forward every complete
 * JSON message to all connected distributors.
 *
 * Descriptors that are not collectors are ignored (returns 0).
 *
 * Returns 1 if the collector was disconnected because of a read error or
 * end of stream, 0 otherwise. Note that 0 is also returned when
 * handle_collector_protocol() rejected the data and closed the connection.
 */
static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
if (current->sock_type != JSON_SOCK)
{
return 0;
}
/* read JSON strings (or parts) from the UNIX socket (collecting) */
if (current->buf.used == current->buf.max)
{
/* No room to read into; try to make room by processing buffered messages below. */
syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
}
else
{
errno = 0;
ssize_t bytes_read =
read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used);
if (bytes_read < 0 || errno != 0)
{
disconnect_client(epollfd, current);
return 1;
}
if (bytes_read == 0)
{
syslog(LOG_DAEMON, "Collector connection closed during read");
disconnect_client(epollfd, current);
return 1;
}
current->buf.used += bytes_read;
}
/* Process as many complete messages as the buffer currently holds. */
while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{
/*
 * Non-zero means "incomplete message" or "connection closed"; in both
 * cases the buffer must not be touched any further.
 */
if (handle_collector_protocol(epollfd, current) != 0)
{
break;
}
/* Queue the message for every connected distributor and try to send it. */
for (size_t i = 0; i < remotes.desc_size; ++i)
{
if (remotes.desc[i].fd < 0)
{
continue;
}
if (remotes.desc[i].sock_type != SERV_SOCK)
{
continue;
}
/* The message does not fit into the distributor's buffer: drain the buffer first. */
if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
{
syslog(LOG_DAEMON | LOG_ERR,
"Buffer capacity threshold (%zu of max %zu bytes) reached, "
"falling back to blocking mode.",
remotes.desc[i].buf.used,
remotes.desc[i].buf.max);
/*
* FIXME: Maybe switch to a Multithreading distributor data transmission,
* so that we do not have to switch back to blocking mode here!
* NOTE: If *one* distributor peer is too slow, all other distributors are
* affected by this. This causes starvation and leads to a possible data loss on
* the nDPId collector side.
*/
if (fcntl_del_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
/* Blocking write of everything that is queued; anything but a full write drops the peer. */
if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) !=
(ssize_t)remotes.desc[i].buf.used)
{
syslog(LOG_DAEMON | LOG_ERR,
"Could not drain buffer by %zu bytes. (forced)",
remotes.desc[i].buf.used);
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
remotes.desc[i].buf.used = 0;
if (fcntl_add_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
}
/* Append the message (length prefix included) to the distributor's buffer. */
memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
current->buf.ptr.raw,
current->event_json.json_bytes);
remotes.desc[i].buf.used += current->event_json.json_bytes;
/* Try to send everything that is queued for this distributor. */
errno = 0;
ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used);
if (errno == EAGAIN)
{
/* Peer cannot take data right now; keep it queued for a later attempt. */
continue;
}
if (bytes_written < 0 || errno != 0)
{
if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
{
syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
}
else
{
syslog(LOG_DAEMON | LOG_ERR,
"Distributor connection to %.*s:%u closed, send failed: %s",
(int)sizeof(remotes.desc[i].event_serv.peer_addr),
remotes.desc[i].event_serv.peer_addr,
ntohs(remotes.desc[i].event_serv.peer.sin_port),
strerror(errno));
}
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
if (bytes_written == 0)
{
syslog(LOG_DAEMON,
"Distributor connection to %.*s:%u closed during write",
(int)sizeof(remotes.desc[i].event_serv.peer_addr),
remotes.desc[i].event_serv.peer_addr,
ntohs(remotes.desc[i].event_serv.peer.sin_port));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
if ((size_t)bytes_written < remotes.desc[i].buf.used)
{
/* Partial write: move the unsent remainder to the front of the buffer. */
syslog(LOG_DAEMON,
"Distributor wrote less than expected to %.*s:%u: %zd < %zu",
(int)sizeof(remotes.desc[i].event_serv.peer_addr),
remotes.desc[i].event_serv.peer_addr,
ntohs(remotes.desc[i].event_serv.peer.sin_port),
bytes_written,
remotes.desc[i].buf.used);
memmove(remotes.desc[i].buf.ptr.raw,
remotes.desc[i].buf.ptr.raw + bytes_written,
remotes.desc[i].buf.used - bytes_written);
remotes.desc[i].buf.used -= bytes_written;
continue;
}
remotes.desc[i].buf.used = 0;
}
/* Remove the forwarded message from the collector's buffer. */
memmove(current->buf.ptr.raw,
current->buf.ptr.raw + current->event_json.json_bytes,
current->buf.used - current->event_json.json_bytes);
current->buf.used -= current->event_json.json_bytes;
current->event_json.json_bytes = 0;
}
return 0;
}
/*
 * Dispatch an epoll event that belongs to an accepted connection.
 *
 * The event's user data is expected to point to the connection's
 * remote descriptor.
 *
 * Returns 1 if the event is not a read event or carries no usable
 * descriptor, otherwise the result of handle_incoming_data().
 */
static int handle_incoming_data_event(int epollfd, struct epoll_event * const event)
{
    struct remote_desc * remote;

    if (!(event->events & EPOLLIN))
    {
        return 1;
    }

    remote = (struct remote_desc *)event->data.ptr;
    if (remote == NULL)
    {
        syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
        return 1;
    }

    if (remote->fd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", remote->fd);
        return 1;
    }

    return handle_incoming_data(epollfd, remote);
}
/*
 * Block SIGINT, SIGTERM and SIGQUIT and make them readable through a
 * non-blocking signalfd that is registered with `epollfd`.
 *
 * Returns the signal descriptor, or -1 on failure. The descriptor is
 * closed again if it could not be registered or configured.
 */
static int setup_signalfd(int epollfd)
{
    sigset_t mask;
    int sfd;

    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGTERM);
    sigaddset(&mask, SIGQUIT);

    if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1)
    {
        return -1;
    }

    sfd = signalfd(-1, &mask, 0);
    if (sfd == -1)
    {
        return -1;
    }

    if (add_event(epollfd, sfd, NULL) != 0 || fcntl_add_flags(sfd, O_NONBLOCK) != 0)
    {
        /* close() may overwrite errno; keep the original cause for the caller. */
        int const saved_errno = errno;

        /* Closing the descriptor also removes it from the epoll set. */
        close(sfd);
        errno = saved_errno;
        return -1;
    }

    return sfd;
}
/*
 * Event loop of the daemon: waits on `epollfd` and dispatches
 *  - new connections on the two listen sockets,
 *  - termination signals (SIGINT, SIGTERM, SIGQUIT) via a signalfd,
 *  - incoming data on accepted connections.
 *
 * Events of the listen sockets and the signalfd carry the descriptor
 * number as user data, events of accepted connections a pointer to their
 * remote descriptor (see add_event()).
 *
 * Returns 0 after a regular shutdown, 1 if signal handling could not be
 * set up or waiting for events failed.
 */
static int mainloop(int epollfd)
{
    struct epoll_event events[32];
    size_t const events_size = sizeof(events) / sizeof(events[0]);
    int retval = 0;
    int const sigfd = setup_signalfd(epollfd);

    if (sigfd < 0)
    {
        /* Without the signalfd the daemon could never be shut down cleanly. */
        syslog(LOG_DAEMON | LOG_ERR, "Error setting up signal fd: %s", strerror(errno));
        return 1;
    }

    while (nDPIsrvd_main_thread_shutdown == 0)
    {
        int nready = epoll_wait(epollfd, events, events_size, -1);

        if (nready < 0)
        {
            if (errno == EINTR)
            {
                continue;
            }
            /* A persistent error would otherwise turn this into a busy loop. */
            syslog(LOG_DAEMON | LOG_ERR, "Error waiting for epoll events: %s", strerror(errno));
            retval = 1;
            break;
        }

        for (int i = 0; i < nready; i++)
        {
            if (events[i].events & EPOLLERR)
            {
                /* NOTE(review): errno is not set by epoll for EPOLLERR, so it may be stale here. */
                syslog(LOG_DAEMON | LOG_ERR,
                       "Epoll event error: %s",
                       (errno != 0 ? strerror(errno) : "Client disconnected"));
                /* Only accepted connections carry a remote descriptor pointer. */
                if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd &&
                    events[i].data.fd != sigfd)
                {
                    struct remote_desc * current = (struct remote_desc *)events[i].data.ptr;
                    disconnect_client(epollfd, current);
                }
                continue;
            }

            if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd)
            {
                /* New connection to collector / distributor. */
                if (new_connection(epollfd, events[i].data.fd) != 0)
                {
                    continue;
                }
            }
            else if (events[i].data.fd == sigfd)
            {
                struct signalfd_siginfo fdsi;
                ssize_t s;

                s = read(sigfd, &fdsi, sizeof(struct signalfd_siginfo));
                if (s != sizeof(struct signalfd_siginfo))
                {
                    syslog(LOG_DAEMON | LOG_ERR,
                           "Invalid signal fd read size. Got %zd, wanted %zu bytes.",
                           s,
                           sizeof(struct signalfd_siginfo));
                    continue;
                }

                if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
                {
                    /* Remaining events of this batch are dropped on purpose. */
                    nDPIsrvd_main_thread_shutdown = 1;
                    break;
                }
            }
            else
            {
                /* Incoming data. */
                if (handle_incoming_data_event(epollfd, &events[i]) != 0)
                {
                    continue;
                }
            }
        }
    }

    close(sigfd);
    return retval;
}
/*
 * Create the epoll instance used as event queue (close-on-exec).
 * Returns the epoll descriptor, or -1 on failure.
 */
static int create_evq(void)
{
    int const queue_fd = epoll_create1(EPOLL_CLOEXEC);

    return queue_fd;
}
/*
 * Create the epoll instance and register both listen sockets with it.
 *
 * Returns the epoll descriptor, or -1 on failure (logged). The epoll
 * descriptor is closed again if a listen socket could not be registered.
 */
static int setup_event_queue(void)
{
    int const epollfd = create_evq();

    if (epollfd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error creating epoll: %s", strerror(errno));
        return -1;
    }

    if (add_event(epollfd, json_sockfd, NULL) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno));
        close(epollfd);
        return -1;
    }

    if (add_event(epollfd, serv_sockfd, NULL) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error adding SERV fd to epoll: %s", strerror(errno));
        close(epollfd);
        return -1;
    }

    return epollfd;
}
/*
 * Disconnect every slot of the `remotes` table (used or not) and close
 * the epoll instance afterwards.
 */
static void close_event_queue(int epollfd)
{
    size_t slot = 0;

    while (slot < remotes.desc_size)
    {
        disconnect_client(epollfd, remotes.desc + slot);
        slot++;
    }

    close(epollfd);
}
/*
 * Allocate the `remotes` table with room for `max_descriptors`
 * connections and mark every slot as unused (fd == -1).
 *
 * Returns 0 on success, -1 if the table could not be allocated.
 */
static int setup_remote_descriptors(size_t max_descriptors)
{
    remotes.desc_used = 0;
    remotes.desc_size = max_descriptors;
    remotes.desc = calloc(max_descriptors, sizeof(*remotes.desc));

    if (remotes.desc == NULL)
    {
        return -1;
    }

    size_t slot = 0;
    while (slot < remotes.desc_size)
    {
        remotes.desc[slot].fd = -1;
        slot++;
    }

    return 0;
}
#ifndef NO_MAIN
/*
 * Entry point of the daemon.
 *
 * Parses the command line, refuses to start if the collector socket path
 * already exists, daemonizes, creates the listen sockets, drops
 * privileges and runs the main loop until a termination signal arrives.
 *
 * Returns the result of mainloop(), or 1 if the startup failed.
 */
int main(int argc, char ** argv)
{
    int retval = 1;
    int epollfd;

    if (argc == 0)
    {
        return 1;
    }

    /*
     * Mark both listen sockets as "not open" so that the cleanup code below
     * never closes an unrelated descriptor (e.g. fd 0) if startup fails
     * before the sockets were created.
     */
    json_sockfd = -1;
    serv_sockfd = -1;

    if (nDPIsrvd_parse_options(argc, argv) != 0)
    {
        return 1;
    }

    /* Log to stderr as well until the final logging setup is known. */
    openlog("nDPIsrvd", LOG_CONS | LOG_PERROR, LOG_DAEMON);

    if (access(nDPIsrvd_options.json_sockpath, F_OK) == 0)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "UNIX socket %s exists; nDPIsrvd already running? "
               "Please remove the socket manually or change socket path.",
               nDPIsrvd_options.json_sockpath);
        return 1;
    }

    if (daemonize_with_pidfile(nDPIsrvd_options.pidfile) != 0)
    {
        goto error;
    }
    closelog();
    openlog("nDPIsrvd", LOG_CONS | (nDPIsrvd_options.log_to_stderr != 0 ? LOG_PERROR : 0), LOG_DAEMON);

    if (setup_remote_descriptors(32) != 0)
    {
        goto error;
    }

    if (create_listen_sockets() != 0)
    {
        goto error;
    }

    syslog(LOG_DAEMON, "collector listen on %s", nDPIsrvd_options.json_sockpath);
    syslog(LOG_DAEMON, "distributor listen on %s", nDPIsrvd_options.serv_optarg);
    switch (serv_address.raw.sa_family)
    {
        default:
            goto error;
        case AF_INET:
        case AF_INET6:
            syslog(LOG_DAEMON | LOG_ERR,
                   "Please keep in mind that using a TCP Socket may leak sensitive information to "
                   "everyone with access to the device/network. You've been warned!");
            break;
        case AF_UNIX:
            break;
    }

    /* change_user_group() may fail without setting errno; tell both cases apart. */
    errno = 0;
    if (change_user_group(nDPIsrvd_options.user,
                          nDPIsrvd_options.group,
                          nDPIsrvd_options.pidfile,
                          nDPIsrvd_options.json_sockpath,
                          (serv_address.raw.sa_family == AF_UNIX ? nDPIsrvd_options.serv_optarg : NULL)) != 0)
    {
        if (errno != 0)
        {
            syslog(LOG_DAEMON | LOG_ERR, "Change user/group failed: %s", strerror(errno));
        }
        else
        {
            syslog(LOG_DAEMON | LOG_ERR, "Change user/group failed.");
        }
        goto error;
    }

    /* Termination signals are consumed through a signalfd inside mainloop(). */
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, SIG_IGN);
    signal(SIGTERM, SIG_IGN);
    signal(SIGQUIT, SIG_IGN);

    epollfd = setup_event_queue();
    if (epollfd < 0)
    {
        goto error;
    }

    retval = mainloop(epollfd);
    close_event_queue(epollfd);

error:
    if (json_sockfd >= 0)
    {
        close(json_sockfd);
    }
    if (serv_sockfd >= 0)
    {
        close(serv_sockfd);
    }

    daemonize_shutdown(nDPIsrvd_options.pidfile);
    syslog(LOG_DAEMON | LOG_NOTICE, "Bye.");
    closelog();

    unlink(nDPIsrvd_options.json_sockpath);
    /* The distributor argument is a filesystem path only for UNIX sockets. */
    if (serv_address.raw.sa_family == AF_UNIX)
    {
        unlink(nDPIsrvd_options.serv_optarg);
    }

    return retval;
}
#endif