nDPId daemonize / pidfile support + improved syslog logging

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2020-08-05 12:02:28 +02:00
parent ea636f4ab6
commit 88aa768184
2 changed files with 108 additions and 40 deletions

View File

@@ -9,6 +9,7 @@
#define NETWORK_BUFFER_MAX_SIZE 8192
/* nDPId default config options */
#define nDPId_PIDFILE "/tmp/ndpid.pid"
#define nDPId_MAX_FLOW_ROOTS_PER_THREAD 2048
#define nDPId_MAX_IDLE_FLOWS_PER_THREAD 64
#define nDPId_TICK_RESOLUTION 1000

147
nDPId.c
View File

@@ -198,7 +198,9 @@ static int main_thread_shutdown = 0;
static uint32_t global_flow_id = 0;
static char * pcap_file_or_interface = NULL;
static int daemonize = 0;
static int log_to_stderr = 0;
static char pidfile[UNIX_PATH_MAX] = nDPId_PIDFILE;
static char json_sockpath[UNIX_PATH_MAX] = COLLECTOR_UNIX_SOCKET;
static void free_workflow(struct nDPId_workflow ** const workflow);
@@ -229,9 +231,7 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
if (workflow->pcap_handle == NULL)
{
syslog(LOG_DAEMON | LOG_ERR,
"pcap_open_live / pcap_open_offline_with_tstamp_precision: %s\n",
pcap_error_buffer);
syslog(LOG_DAEMON | LOG_ERR, "pcap_open_live / pcap_open_offline_with_tstamp_precision: %s", pcap_error_buffer);
free_workflow(&workflow);
return NULL;
}
@@ -267,7 +267,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos);
ndpi_finalize_initalization(workflow->ndpi_struct);
if (ndpi_init_serializer_ll(&workflow->ndpi_serializer, ndpi_serialization_format_json, NETWORK_BUFFER_MAX_SIZE) != 1)
if (ndpi_init_serializer_ll(&workflow->ndpi_serializer, ndpi_serialization_format_json, NETWORK_BUFFER_MAX_SIZE) !=
1)
{
return NULL;
}
@@ -330,7 +331,7 @@ static int setup_reader_threads(char const * const file_or_device)
file_or_default_device = pcap_lookupdev(pcap_error_buffer);
if (file_or_default_device == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "pcap_lookupdev: %s\n", pcap_error_buffer);
syslog(LOG_DAEMON | LOG_ERR, "pcap_lookupdev: %s", pcap_error_buffer);
return 1;
}
}
@@ -610,7 +611,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
if (jsonize_l3_l4_dpi(workflow, flow) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %4u] flow2json/dpi2json failed\n",
"[%8llu, %4u] flow2json/dpi2json failed",
workflow->packets_captured,
flow->flow_id);
}
@@ -673,13 +674,15 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
int s_ret;
char newline_json_str[NETWORK_BUFFER_MAX_SIZE];
s_ret =
snprintf(newline_json_str, sizeof(newline_json_str), "%zu%.*s", json_str_len, (int)json_str_len, json_str);
s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%zu%.*s", json_str_len, (int)json_str_len, json_str);
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
{
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
workflow->packets_captured, reader_thread->array_index, s_ret, sizeof(newline_json_str));
workflow->packets_captured,
reader_thread->array_index,
s_ret,
sizeof(newline_json_str));
return;
}
@@ -724,7 +727,7 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
{
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d] jsonize failed, buffer length: %u\n",
"[%8llu, %d] jsonize failed, buffer length: %u",
reader_thread->workflow->packets_captured,
reader_thread->array_index,
json_str_len);
@@ -1600,7 +1603,7 @@ static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread
{
syslog(LOG_DAEMON | LOG_ERR,
"Error while reading pcap file: '%s'\n",
"Error while reading pcap file: '%s'",
pcap_geterr(reader_thread->workflow->pcap_handle));
reader_thread->workflow->error_or_eof = 1;
}
@@ -1644,6 +1647,28 @@ static int processing_threads_error_or_eof(void)
return 1;
}
static int create_pidfile(char const * const pidfile)
{
int pfd;
pfd = open(pidfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (pfd < 0)
{
return 1;
}
if (dprintf(pfd, "%d", getpid()) <= 0)
{
close(pfd);
return 1;
}
close(pfd);
return 0;
}
static int start_reader_threads(void)
{
sigset_t thread_signal_set, old_signal_set;
@@ -1653,10 +1678,23 @@ static int start_reader_threads(void)
sigdelset(&thread_signal_set, SIGTERM);
if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "pthread_sigmask: %s\n", strerror(errno));
syslog(LOG_DAEMON | LOG_ERR, "pthread_sigmask: %s", strerror(errno));
return 1;
}
if (daemonize != 0)
{
if (daemon(0, 0) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "daemon: %s", strerror(errno));
return 1;
}
}
if (create_pidfile(pidfile) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "create pidfile: %s", strerror(errno));
return 1;
}
openlog("nDPId", LOG_CONS | (log_to_stderr != 0 ? LOG_PERROR : 0), LOG_DAEMON);
for (int i = 0; i < reader_thread_count; ++i)
@@ -1671,14 +1709,14 @@ static int start_reader_threads(void)
if (pthread_create(&reader_threads[i].thread_id, NULL, processing_thread, &reader_threads[i]) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "pthread_create: %s\n", strerror(errno));
syslog(LOG_DAEMON | LOG_ERR, "pthread_create: %s", strerror(errno));
return 1;
}
}
if (pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "pthread_sigmask: %s\n", strerror(errno));
syslog(LOG_DAEMON | LOG_ERR, "pthread_sigmask: %s", strerror(errno));
return 1;
}
@@ -1698,7 +1736,10 @@ static int stop_reader_threads(void)
break_pcap_loop(&reader_threads[i]);
}
printf("------------------------------------ Stopping reader threads\n");
if (daemonize == 0)
{
printf("------------------------------------ Stopping reader threads\n");
}
for (int i = 0; i < reader_thread_count; ++i)
{
if (reader_threads[i].workflow == NULL)
@@ -1708,11 +1749,14 @@ static int stop_reader_threads(void)
if (pthread_join(reader_threads[i].thread_id, NULL) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "pthread_join: %s\n", strerror(errno));
syslog(LOG_DAEMON | LOG_ERR, "pthread_join: %s", strerror(errno));
}
}
printf("------------------------------------ Results\n");
if (daemonize == 0)
{
printf("------------------------------------ Results\n");
}
for (int i = 0; i < reader_thread_count; ++i)
{
if (reader_threads[i].workflow == NULL)
@@ -1726,23 +1770,29 @@ static int stop_reader_threads(void)
total_flows_idle += reader_threads[i].workflow->total_idle_flows;
total_flows_detected += reader_threads[i].workflow->detected_flow_protocols;
printf(
"Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, "
"idle flows: %8llu, detected flows: %8llu\n",
reader_threads[i].array_index,
reader_threads[i].workflow->packets_processed,
reader_threads[i].workflow->total_l4_data_len,
reader_threads[i].workflow->total_active_flows,
reader_threads[i].workflow->total_idle_flows,
reader_threads[i].workflow->detected_flow_protocols);
if (daemonize == 0)
{
printf(
"Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, "
"idle flows: %8llu, detected flows: %8llu\n",
reader_threads[i].array_index,
reader_threads[i].workflow->packets_processed,
reader_threads[i].workflow->total_l4_data_len,
reader_threads[i].workflow->total_active_flows,
reader_threads[i].workflow->total_idle_flows,
reader_threads[i].workflow->detected_flow_protocols);
}
}
/* total packets captured: same value for all threads as packet2thread distribution happens later */
printf("Total packets captured.: %llu\n", reader_threads[0].workflow->packets_captured);
printf("Total packets processed: %llu\n", total_packets_processed);
printf("Total layer4 data size.: %llu\n", total_l4_data_len);
printf("Total flows captured...: %llu\n", total_flows_captured);
printf("Total flows timed out..: %llu\n", total_flows_idle);
printf("Total flows detected...: %llu\n", total_flows_detected);
if (daemonize == 0)
{
printf("Total packets captured.: %llu\n", reader_threads[0].workflow->packets_captured);
printf("Total packets processed: %llu\n", total_packets_processed);
printf("Total layer4 data size.: %llu\n", total_l4_data_len);
printf("Total flows captured...: %llu\n", total_flows_captured);
printf("Total flows timed out..: %llu\n", total_flows_idle);
printf("Total flows detected...: %llu\n", total_flows_detected);
}
return 0;
}
@@ -1762,20 +1812,21 @@ static void free_reader_threads(void)
static void sighandler(int signum)
{
syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d\n", signum);
syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d", signum);
if (main_thread_shutdown == 0)
{
syslog(LOG_DAEMON | LOG_NOTICE, "Stopping reader threads.");
main_thread_shutdown = 1;
if (stop_reader_threads() != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Failed to stop reader threads!\n");
syslog(LOG_DAEMON | LOG_ERR, "Failed to stop reader threads!");
exit(EXIT_FAILURE);
}
}
else
{
syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient.\n");
syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient.");
}
}
@@ -1783,7 +1834,7 @@ static int parse_options(int argc, char ** argv)
{
int opt;
while ((opt = getopt(argc, argv, "hi:lc:")) != -1)
while ((opt = getopt(argc, argv, "hi:lc:dp:")) != -1)
{
switch (opt)
{
@@ -1797,12 +1848,27 @@ static int parse_options(int argc, char ** argv)
strncpy(json_sockpath, optarg, sizeof(json_sockpath) - 1);
json_sockpath[sizeof(json_sockpath) - 1] = '\0';
break;
case 'd':
daemonize = 1;
break;
case 'p':
strncpy(pidfile, optarg, sizeof(pidfile) - 1);
pidfile[sizeof(pidfile) - 1] = '\0';
break;
default:
fprintf(stderr, "Usage: %s [-i pcap-file/interface ] [-l] [-c path-to-unix-sock]\n", argv[0]);
fprintf(stderr,
"Usage: %s [-i pcap-file/interface ] [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n",
argv[0]);
return 1;
}
}
if (log_to_stderr != 0 && daemonize != 0)
{
fprintf(stderr, "%s: Using -l and -d does not make sense.\n", argv[0]);
return 1;
}
return 0;
}
@@ -1832,13 +1898,13 @@ int main(int argc, char ** argv)
if (setup_reader_threads(pcap_file_or_interface) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "%s: setup_reader_threads failed\n", argv[0]);
syslog(LOG_DAEMON | LOG_ERR, "%s: setup_reader_threads failed", argv[0]);
return 1;
}
if (start_reader_threads() != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "%s: start_reader_threads\n", argv[0]);
syslog(LOG_DAEMON | LOG_ERR, "%s: start_reader_threads", argv[0]);
return 1;
}
@@ -1851,11 +1917,12 @@ int main(int argc, char ** argv)
if (main_thread_shutdown == 0 && stop_reader_threads() != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "%s: stop_reader_threads\n", argv[0]);
syslog(LOG_DAEMON | LOG_ERR, "%s: stop_reader_threads", argv[0]);
return 1;
}
free_reader_threads();
syslog(LOG_DAEMON | LOG_NOTICE, "Bye.");
closelog();
return 0;