mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-10-30 01:42:22 +00:00
nDPId-test: Reworked I/O handling to prevent some endless loop scenarios. Fixed a race condition in the memory wrapper as well.
* nDPId: Instead of sending too long JSON strings, log an error and some parts. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
281
nDPId-test.c
281
nDPId-test.c
@@ -7,7 +7,7 @@ static void nDPIsrvd_memprof_log(char const * const format, ...);
|
|||||||
static void nDPIsrvd_memprof_log_alloc(size_t alloc_size);
|
static void nDPIsrvd_memprof_log_alloc(size_t alloc_size);
|
||||||
static void nDPIsrvd_memprof_log_free(size_t free_size);
|
static void nDPIsrvd_memprof_log_free(size_t free_size);
|
||||||
|
|
||||||
//#define DO_MEMORY_LOGGING 1
|
//#define VERBOSE_MEMORY_PROFILING 1
|
||||||
#define NO_MAIN 1
|
#define NO_MAIN 1
|
||||||
#include "utils.c"
|
#include "utils.c"
|
||||||
#include "nDPIsrvd.c"
|
#include "nDPIsrvd.c"
|
||||||
@@ -99,6 +99,8 @@ struct distributor_global_user_data
|
|||||||
unsigned long long int flow_detection_update_count;
|
unsigned long long int flow_detection_update_count;
|
||||||
unsigned long long int flow_update_count;
|
unsigned long long int flow_update_count;
|
||||||
|
|
||||||
|
unsigned long long int shutdown_events;
|
||||||
|
|
||||||
unsigned long long int json_string_len_min;
|
unsigned long long int json_string_len_min;
|
||||||
unsigned long long int json_string_len_max;
|
unsigned long long int json_string_len_max;
|
||||||
double json_string_len_avg;
|
double json_string_len_avg;
|
||||||
@@ -132,17 +134,30 @@ struct distributor_return_value
|
|||||||
struct distributor_global_user_data stats;
|
struct distributor_global_user_data stats;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define TC_INIT(initial, wanted) \
|
||||||
|
{ \
|
||||||
|
.mutex = PTHREAD_MUTEX_INITIALIZER, .condition = PTHREAD_COND_INITIALIZER, .value = initial, \
|
||||||
|
.wanted_value = wanted \
|
||||||
|
}
|
||||||
|
struct thread_condition
|
||||||
|
{
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
pthread_cond_t condition;
|
||||||
|
int value;
|
||||||
|
int wanted_value;
|
||||||
|
};
|
||||||
|
|
||||||
static int mock_pipefds[PIPE_FDS] = {};
|
static int mock_pipefds[PIPE_FDS] = {};
|
||||||
static int mock_testfds[PIPE_FDS] = {};
|
static int mock_testfds[PIPE_FDS] = {};
|
||||||
static int mock_bufffds[PIPE_FDS] = {};
|
static int mock_bufffds[PIPE_FDS] = {};
|
||||||
static int mock_nullfds[PIPE_FDS] = {};
|
static int mock_nullfds[PIPE_FDS] = {};
|
||||||
static int mock_arpafds[PIPE_FDS] = {};
|
static int mock_arpafds[PIPE_FDS] = {};
|
||||||
static pthread_mutex_t nDPId_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static struct thread_condition start_condition = TC_INIT(3, 0);
|
||||||
static pthread_mutex_t nDPIsrvd_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
#ifdef VERBOSE_MEMORY_PROFILING
|
||||||
static pthread_mutex_t distributor_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
||||||
#ifdef DO_MEMORY_LOGGING
|
|
||||||
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
#endif
|
#endif
|
||||||
|
static pthread_mutex_t mem_mutex = PTHREAD_MUTEX_INITIALIZER; // required; memory wrappers are used from two threads
|
||||||
|
// (distributor and nDPIsrvd)
|
||||||
static unsigned long long int nDPIsrvd_alloc_count = 0;
|
static unsigned long long int nDPIsrvd_alloc_count = 0;
|
||||||
static unsigned long long int nDPIsrvd_alloc_bytes = 0;
|
static unsigned long long int nDPIsrvd_alloc_bytes = 0;
|
||||||
static unsigned long long int nDPIsrvd_free_count = 0;
|
static unsigned long long int nDPIsrvd_free_count = 0;
|
||||||
@@ -162,7 +177,7 @@ static unsigned long long int nDPIsrvd_free_bytes = 0;
|
|||||||
|
|
||||||
static void nDPIsrvd_memprof_log(char const * const format, ...)
|
static void nDPIsrvd_memprof_log(char const * const format, ...)
|
||||||
{
|
{
|
||||||
#ifdef DO_MEMORY_LOGGING
|
#ifdef VERBOSE_MEMORY_PROFILING
|
||||||
int logbuf_used, logbuf_used_tmp;
|
int logbuf_used, logbuf_used_tmp;
|
||||||
char logbuf[BUFSIZ];
|
char logbuf[BUFSIZ];
|
||||||
va_list ap;
|
va_list ap;
|
||||||
@@ -188,16 +203,60 @@ static void nDPIsrvd_memprof_log(char const * const format, ...)
|
|||||||
|
|
||||||
void nDPIsrvd_memprof_log_alloc(size_t alloc_size)
|
void nDPIsrvd_memprof_log_alloc(size_t alloc_size)
|
||||||
{
|
{
|
||||||
|
unsigned long alloc_count;
|
||||||
|
|
||||||
|
// nDPIsrvd.h is used by client applications and nDPIsrvd (two threads!)
|
||||||
|
pthread_mutex_lock(&mem_mutex);
|
||||||
nDPIsrvd_alloc_count++;
|
nDPIsrvd_alloc_count++;
|
||||||
nDPIsrvd_alloc_bytes += alloc_size;
|
nDPIsrvd_alloc_bytes += alloc_size;
|
||||||
// nDPIsrvd_memprof_log("nDPIsrvd.h: malloc #%llu, %llu bytes", nDPIsrvd_alloc_count, alloc_size);
|
alloc_count = nDPIsrvd_alloc_count;
|
||||||
|
pthread_mutex_unlock(&mem_mutex);
|
||||||
|
nDPIsrvd_memprof_log("nDPIsrvd.h: malloc #%llu, %llu bytes", alloc_count, alloc_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nDPIsrvd_memprof_log_free(size_t free_size)
|
void nDPIsrvd_memprof_log_free(size_t free_size)
|
||||||
{
|
{
|
||||||
|
unsigned long free_count;
|
||||||
|
|
||||||
|
// nDPIsrvd.h is used by client applications and nDPIsrvd (two threads!)
|
||||||
|
pthread_mutex_lock(&mem_mutex);
|
||||||
nDPIsrvd_free_count++;
|
nDPIsrvd_free_count++;
|
||||||
nDPIsrvd_free_bytes += free_size;
|
nDPIsrvd_free_bytes += free_size;
|
||||||
// nDPIsrvd_memprof_log("nDPIsrvd.h: free #%llu, %llu bytes", nDPIsrvd_free_count, free_size);
|
free_count = nDPIsrvd_free_count;
|
||||||
|
pthread_mutex_unlock(&mem_mutex);
|
||||||
|
nDPIsrvd_memprof_log("nDPIsrvd.h: free #%llu, %llu bytes", free_count, free_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int thread_wait(struct thread_condition * const tc)
|
||||||
|
{
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
ret |= (pthread_mutex_lock(&tc->mutex) << 16);
|
||||||
|
while (tc->value > tc->wanted_value)
|
||||||
|
{
|
||||||
|
ret |= (pthread_cond_wait(&tc->condition, &tc->mutex) << 8);
|
||||||
|
}
|
||||||
|
ret |= (pthread_mutex_unlock(&tc->mutex));
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int thread_signal(struct thread_condition * const tc)
|
||||||
|
{
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
ret |= (pthread_mutex_lock(&tc->mutex) << 16);
|
||||||
|
if (tc->value > tc->wanted_value)
|
||||||
|
{
|
||||||
|
tc->value--;
|
||||||
|
}
|
||||||
|
if (tc->value == tc->wanted_value)
|
||||||
|
{
|
||||||
|
ret |= (pthread_cond_broadcast(&tc->condition) << 8);
|
||||||
|
}
|
||||||
|
ret |= (pthread_mutex_unlock(&tc->mutex));
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int setup_pipe(int pipefd[PIPE_FDS])
|
static int setup_pipe(int pipefd[PIPE_FDS])
|
||||||
@@ -207,12 +266,23 @@ static int setup_pipe(int pipefd[PIPE_FDS])
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fcntl_add_flags(pipefd[0], O_NONBLOCK) != 0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fcntl_add_flags(pipefd[1], O_NONBLOCK) != 0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void * nDPIsrvd_mainloop_thread(void * const arg)
|
static void * nDPIsrvd_mainloop_thread(void * const arg)
|
||||||
{
|
{
|
||||||
int nDPIsrvd_shutdown = 0;
|
int nDPIsrvd_distributor_disconnects = 0;
|
||||||
|
int const nDPIsrvd_distributor_expected_disconnects = 5;
|
||||||
int epollfd;
|
int epollfd;
|
||||||
struct remote_desc * mock_json_desc = NULL;
|
struct remote_desc * mock_json_desc = NULL;
|
||||||
struct remote_desc * mock_test_desc = NULL;
|
struct remote_desc * mock_test_desc = NULL;
|
||||||
@@ -278,9 +348,10 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
THREAD_ERROR_GOTO(arg);
|
THREAD_ERROR_GOTO(arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&nDPIsrvd_start_mutex);
|
thread_signal(&start_condition);
|
||||||
|
thread_wait(&start_condition);
|
||||||
|
|
||||||
while (nDPIsrvd_shutdown == 0)
|
while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects)
|
||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
int nready = epoll_wait(epollfd, events, events_size, -1);
|
int nready = epoll_wait(epollfd, events, events_size, -1);
|
||||||
@@ -303,6 +374,16 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
if (remote == mock_json_desc)
|
if (remote == mock_json_desc)
|
||||||
{
|
{
|
||||||
remote_desc_name = "Mock JSON";
|
remote_desc_name = "Mock JSON";
|
||||||
|
do {
|
||||||
|
if (mock_test_desc->fd >= 0)
|
||||||
|
drain_write_buffers_blocking(mock_test_desc);
|
||||||
|
if (mock_buff_desc->fd >= 0)
|
||||||
|
drain_write_buffers_blocking(mock_buff_desc);
|
||||||
|
if (mock_null_desc->fd >= 0)
|
||||||
|
drain_write_buffers_blocking(mock_null_desc);
|
||||||
|
if (mock_arpa_desc->fd >= 0)
|
||||||
|
drain_write_buffers_blocking(mock_arpa_desc);
|
||||||
|
} while (handle_data_event(epollfd, &events[i]) == 0);
|
||||||
}
|
}
|
||||||
else if (remote == mock_test_desc)
|
else if (remote == mock_test_desc)
|
||||||
{
|
{
|
||||||
@@ -324,14 +405,21 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
{
|
{
|
||||||
remote_desc_name = "UNKNOWN";
|
remote_desc_name = "UNKNOWN";
|
||||||
}
|
}
|
||||||
logger(1, "nDPIsrvd distributor '%s' connection closed", remote_desc_name);
|
nDPIsrvd_distributor_disconnects++;
|
||||||
handle_data_event(epollfd, &events[i]);
|
logger(1,
|
||||||
nDPIsrvd_shutdown++;
|
"nDPIsrvd distributor '%s' connection closed (%d/%d)",
|
||||||
|
remote_desc_name,
|
||||||
|
nDPIsrvd_distributor_disconnects,
|
||||||
|
nDPIsrvd_distributor_expected_disconnects);
|
||||||
|
free_remote(epollfd, remote);
|
||||||
}
|
}
|
||||||
else if (handle_data_event(epollfd, &events[i]) != 0)
|
else
|
||||||
{
|
{
|
||||||
logger(1, "nDPIsrvd data event handler failed for distributor %d", events[i].data.fd);
|
if (handle_data_event(epollfd, &events[i]) != 0)
|
||||||
THREAD_ERROR_GOTO(arg);
|
{
|
||||||
|
logger(1, "%s", "nDPIsrvd data event handler failed");
|
||||||
|
THREAD_ERROR_GOTO(arg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -346,27 +434,10 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
error:
|
error:
|
||||||
if (mock_test_desc != NULL)
|
|
||||||
{
|
|
||||||
drain_write_buffers_blocking(mock_test_desc);
|
|
||||||
}
|
|
||||||
if (mock_buff_desc != NULL)
|
|
||||||
{
|
|
||||||
drain_write_buffers_blocking(mock_buff_desc);
|
|
||||||
}
|
|
||||||
if (mock_null_desc != NULL)
|
|
||||||
{
|
|
||||||
drain_write_buffers_blocking(mock_null_desc);
|
|
||||||
}
|
|
||||||
if (mock_arpa_desc != NULL)
|
|
||||||
{
|
|
||||||
drain_write_buffers_blocking(mock_arpa_desc);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_lock(&nDPIsrvd_start_mutex);
|
|
||||||
free_remotes(epollfd);
|
free_remotes(epollfd);
|
||||||
close(epollfd);
|
close(epollfd);
|
||||||
|
|
||||||
|
logger(0, "%s", "nDPIsrvd worker thread exits..");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -504,8 +575,8 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
global_stats->total_events_serialized = nmb;
|
global_stats->total_events_serialized = nmb;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&nDPIsrvd_start_mutex);
|
logger(0, "%s", "Distributor received shutdown event..");
|
||||||
pthread_mutex_unlock(&nDPId_start_mutex);
|
global_stats->shutdown_events++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -610,8 +681,7 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
|
|
||||||
return CALLBACK_OK;
|
return CALLBACK_OK;
|
||||||
callback_error:
|
callback_error:
|
||||||
pthread_mutex_unlock(&nDPIsrvd_start_mutex);
|
logger(1, "%s", "Distributor error..");
|
||||||
pthread_mutex_unlock(&nDPId_start_mutex);
|
|
||||||
return CALLBACK_ERROR;
|
return CALLBACK_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -719,11 +789,40 @@ static enum nDPIsrvd_callback_return distributor_json_mock_buff_callback(
|
|||||||
return distributor_json_callback(sock, instance, thread_data, flow);
|
return distributor_json_callback(sock, instance, thread_data, flow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_socket * const sock,
|
||||||
|
struct nDPIsrvd_instance * const instance,
|
||||||
|
struct nDPIsrvd_thread_data * const thread_data,
|
||||||
|
struct nDPIsrvd_flow * const flow)
|
||||||
|
{
|
||||||
|
(void)instance;
|
||||||
|
(void)thread_data;
|
||||||
|
(void)flow;
|
||||||
|
|
||||||
|
{
|
||||||
|
struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name");
|
||||||
|
|
||||||
|
if (daemon_event_name != NULL)
|
||||||
|
{
|
||||||
|
if (TOKEN_VALUE_EQUALS_SZ(sock, daemon_event_name, "shutdown") != 0)
|
||||||
|
{
|
||||||
|
logger(0, "%s", "Distributor received shutdown event..");
|
||||||
|
int * const mock_null_shutdown_events = (int *)sock->global_user_data;
|
||||||
|
(*mock_null_shutdown_events)++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("%0" NETWORK_BUFFER_LENGTH_DIGITS_STR "llu%.*s",
|
||||||
|
sock->buffer.json_string_length - NETWORK_BUFFER_LENGTH_DIGITS,
|
||||||
|
nDPIsrvd_json_buffer_length(sock),
|
||||||
|
nDPIsrvd_json_buffer_string(sock));
|
||||||
|
return CALLBACK_OK;
|
||||||
|
}
|
||||||
|
|
||||||
static void * distributor_client_mainloop_thread(void * const arg)
|
static void * distributor_client_mainloop_thread(void * const arg)
|
||||||
{
|
{
|
||||||
int dis_epollfd = create_evq();
|
int dis_epollfd = create_evq();
|
||||||
int signalfd = setup_signalfd(dis_epollfd);
|
int signalfd = setup_signalfd(dis_epollfd);
|
||||||
int pipe_read_finished = 0, buff_read_finished = 0, null_read_finished = 0, arpa_read_finished = 0;
|
|
||||||
struct epoll_event events[32];
|
struct epoll_event events[32];
|
||||||
size_t const events_size = sizeof(events) / sizeof(events[0]);
|
size_t const events_size = sizeof(events) / sizeof(events[0]);
|
||||||
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
|
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
|
||||||
@@ -742,17 +841,21 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
distributor_json_mock_buff_callback,
|
distributor_json_mock_buff_callback,
|
||||||
distributor_instance_cleanup_callback,
|
distributor_instance_cleanup_callback,
|
||||||
distributor_flow_cleanup_callback);
|
distributor_flow_cleanup_callback);
|
||||||
|
struct nDPIsrvd_socket * mock_null =
|
||||||
|
nDPIsrvd_socket_init(sizeof(int), 0, 0, 0, distributor_json_printer, NULL, NULL);
|
||||||
struct distributor_global_user_data * sock_stats;
|
struct distributor_global_user_data * sock_stats;
|
||||||
struct distributor_global_user_data * buff_stats;
|
struct distributor_global_user_data * buff_stats;
|
||||||
|
int * mock_null_shutdown_events;
|
||||||
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if (mock_sock == NULL || mock_buff == NULL)
|
if (mock_sock == NULL || mock_buff == NULL || mock_null == NULL)
|
||||||
{
|
{
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_sock->fd = mock_testfds[PIPE_TEST_READ];
|
mock_sock->fd = mock_testfds[PIPE_TEST_READ];
|
||||||
mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
|
mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
|
||||||
|
mock_null->fd = mock_nullfds[PIPE_NULL_READ];
|
||||||
|
|
||||||
if (dis_epollfd < 0 || signalfd < 0)
|
if (dis_epollfd < 0 || signalfd < 0)
|
||||||
{
|
{
|
||||||
@@ -789,10 +892,13 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
buff_stats = (struct distributor_global_user_data *)mock_buff->global_user_data;
|
buff_stats = (struct distributor_global_user_data *)mock_buff->global_user_data;
|
||||||
buff_stats->json_string_len_min = (unsigned long long int)-1;
|
buff_stats->json_string_len_min = (unsigned long long int)-1;
|
||||||
buff_stats->options.do_hash_checks = 0;
|
buff_stats->options.do_hash_checks = 0;
|
||||||
|
mock_null_shutdown_events = (int *)mock_null->global_user_data;
|
||||||
|
*mock_null_shutdown_events = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&distributor_start_mutex);
|
thread_signal(&start_condition);
|
||||||
|
thread_wait(&start_condition);
|
||||||
|
|
||||||
while (pipe_read_finished == 0 || buff_read_finished == 0 || null_read_finished == 0 || arpa_read_finished == 0)
|
while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0)
|
||||||
{
|
{
|
||||||
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
|
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
|
||||||
if (nready < 0 && errno != EINTR)
|
if (nready < 0 && errno != EINTR)
|
||||||
@@ -811,6 +917,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
|
if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
|
||||||
{
|
{
|
||||||
logger(1, "Distributor disconnected: %d", events[i].data.fd);
|
logger(1, "Distributor disconnected: %d", events[i].data.fd);
|
||||||
|
del_event(dis_epollfd, events[i].data.fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
|
if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
|
||||||
@@ -825,8 +932,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
case READ_PEER_DISCONNECT:
|
case READ_PEER_DISCONNECT:
|
||||||
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
|
||||||
pipe_read_finished = 1;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -862,8 +967,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
case READ_PEER_DISCONNECT:
|
case READ_PEER_DISCONNECT:
|
||||||
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
|
||||||
buff_read_finished = 1;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -889,21 +992,32 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
}
|
}
|
||||||
else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ])
|
else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ])
|
||||||
{
|
{
|
||||||
/* Read all data from the pipe, but do nothing else. */
|
switch (nDPIsrvd_read(mock_null))
|
||||||
char buf[NETWORK_BUFFER_MAX_SIZE];
|
|
||||||
ssize_t bytes_read = read(mock_nullfds[PIPE_NULL_READ], buf, sizeof(buf));
|
|
||||||
if (bytes_read < 0)
|
|
||||||
{
|
{
|
||||||
logger(1, "Read and print to stdout fd returned an error: %s", strerror(errno));
|
case READ_OK:
|
||||||
THREAD_ERROR_GOTO(trv);
|
break;
|
||||||
}
|
case READ_LAST_ENUM_VALUE:
|
||||||
if (bytes_read == 0)
|
case READ_ERROR:
|
||||||
{
|
case READ_TIMEOUT:
|
||||||
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
|
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
||||||
null_read_finished = 1;
|
THREAD_ERROR_GOTO(trv);
|
||||||
|
case READ_PEER_DISCONNECT:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("%.*s", (int)bytes_read, buf);
|
enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(mock_null);
|
||||||
|
if (parse_ret != PARSE_NEED_MORE_DATA)
|
||||||
|
{
|
||||||
|
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
||||||
|
logger(1,
|
||||||
|
"Problematic JSON string (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||||
|
mock_null->buffer.json_string_start,
|
||||||
|
mock_null->buffer.json_string_length,
|
||||||
|
mock_null->buffer.buf.used,
|
||||||
|
(int)mock_null->buffer.json_string_length,
|
||||||
|
mock_null->buffer.json_string);
|
||||||
|
THREAD_ERROR_GOTO(trv);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ])
|
else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ])
|
||||||
{
|
{
|
||||||
@@ -914,11 +1028,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
logger(1, "Read fd returned an error: %s", strerror(errno));
|
logger(1, "Read fd returned an error: %s", strerror(errno));
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
if (bytes_read == 0)
|
|
||||||
{
|
|
||||||
del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
|
|
||||||
arpa_read_finished = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Nothing to do .. ?
|
* Nothing to do .. ?
|
||||||
@@ -963,7 +1072,9 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
{
|
{
|
||||||
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
||||||
{
|
{
|
||||||
logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
|
logger(1,
|
||||||
|
"[Mock Sock] Active flow found during client distributor shutdown with id: %llu",
|
||||||
|
current_flow->id_as_ull);
|
||||||
errno = 0;
|
errno = 0;
|
||||||
THREAD_ERROR(trv);
|
THREAD_ERROR(trv);
|
||||||
}
|
}
|
||||||
@@ -974,7 +1085,9 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
{
|
{
|
||||||
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
||||||
{
|
{
|
||||||
logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
|
logger(1,
|
||||||
|
"[Mock Buff] Active flow found during client distributor shutdown with id: %llu",
|
||||||
|
current_flow->id_as_ull);
|
||||||
errno = 0;
|
errno = 0;
|
||||||
THREAD_ERROR(trv);
|
THREAD_ERROR(trv);
|
||||||
}
|
}
|
||||||
@@ -1002,17 +1115,36 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
}
|
}
|
||||||
drv->stats = *sock_stats;
|
drv->stats = *sock_stats;
|
||||||
|
|
||||||
|
if (sock_stats->shutdown_events != 1 || buff_stats->shutdown_events != 1 || *mock_null_shutdown_events != 1)
|
||||||
|
{
|
||||||
|
logger(1,
|
||||||
|
"Unexpected amount of shutdown events received, expected 1 per nDPIsrvd socket, got (Sock/Buff/NULL): "
|
||||||
|
"%llu/%llu/%d",
|
||||||
|
sock_stats->shutdown_events,
|
||||||
|
buff_stats->shutdown_events,
|
||||||
|
*mock_null_shutdown_events);
|
||||||
|
errno = 0;
|
||||||
|
THREAD_ERROR(trv);
|
||||||
|
}
|
||||||
|
|
||||||
error:
|
error:
|
||||||
del_event(dis_epollfd, signalfd);
|
del_event(dis_epollfd, signalfd);
|
||||||
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
||||||
del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]);
|
del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]);
|
||||||
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
|
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
|
||||||
del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
|
del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
|
||||||
|
close(mock_testfds[PIPE_TEST_READ]);
|
||||||
|
close(mock_bufffds[PIPE_BUFFER_READ]);
|
||||||
|
close(mock_nullfds[PIPE_NULL_READ]);
|
||||||
|
close(mock_arpafds[PIPE_ARPA_READ]);
|
||||||
close(dis_epollfd);
|
close(dis_epollfd);
|
||||||
close(signalfd);
|
close(signalfd);
|
||||||
|
|
||||||
nDPIsrvd_socket_free(&mock_sock);
|
nDPIsrvd_socket_free(&mock_sock);
|
||||||
nDPIsrvd_socket_free(&mock_buff);
|
nDPIsrvd_socket_free(&mock_buff);
|
||||||
|
nDPIsrvd_socket_free(&mock_null);
|
||||||
|
|
||||||
|
logger(0, "%s", "Distributor worker thread exits..");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1035,7 +1167,8 @@ static void * nDPId_mainloop_thread(void * const arg)
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&nDPId_start_mutex);
|
thread_signal(&start_condition);
|
||||||
|
thread_wait(&start_condition);
|
||||||
|
|
||||||
jsonize_daemon(&reader_threads[0], DAEMON_EVENT_INIT);
|
jsonize_daemon(&reader_threads[0], DAEMON_EVENT_INIT);
|
||||||
/* restore SIGPIPE to the default handler (Termination) */
|
/* restore SIGPIPE to the default handler (Termination) */
|
||||||
@@ -1073,10 +1206,10 @@ static void * nDPId_mainloop_thread(void * const arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
error:
|
error:
|
||||||
pthread_mutex_lock(&nDPId_start_mutex);
|
|
||||||
free_reader_threads();
|
free_reader_threads();
|
||||||
close(mock_pipefds[PIPE_nDPId]);
|
close(mock_pipefds[PIPE_nDPId]);
|
||||||
|
|
||||||
|
logger(0, "%s", "nDPId worker thread exits..");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1169,11 +1302,6 @@ int main(int argc, char ** argv)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Start processing after all threads started and initialized. */
|
|
||||||
pthread_mutex_lock(&nDPId_start_mutex);
|
|
||||||
pthread_mutex_lock(&nDPIsrvd_start_mutex);
|
|
||||||
pthread_mutex_lock(&distributor_start_mutex);
|
|
||||||
|
|
||||||
pthread_t nDPId_thread;
|
pthread_t nDPId_thread;
|
||||||
struct nDPId_return_value nDPId_return = {};
|
struct nDPId_return_value nDPId_return = {};
|
||||||
if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
|
if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
|
||||||
@@ -1195,10 +1323,6 @@ int main(int argc, char ** argv)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&nDPIsrvd_start_mutex);
|
|
||||||
pthread_mutex_unlock(&distributor_start_mutex);
|
|
||||||
pthread_mutex_unlock(&nDPId_start_mutex);
|
|
||||||
|
|
||||||
/* Try to gracefully shutdown all threads. */
|
/* Try to gracefully shutdown all threads. */
|
||||||
while (thread_wait_for_termination(distributor_thread, 1, &distributor_return.thread_return_value) == 0)
|
while (thread_wait_for_termination(distributor_thread, 1, &distributor_return.thread_return_value) == 0)
|
||||||
{
|
{
|
||||||
@@ -1224,6 +1348,8 @@ int main(int argc, char ** argv)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger(0, "%s", "All worker threads terminated..");
|
||||||
|
|
||||||
if (THREADS_RETURNED_ERROR() != 0)
|
if (THREADS_RETURNED_ERROR() != 0)
|
||||||
{
|
{
|
||||||
char const * which_thread = "Unknown";
|
char const * which_thread = "Unknown";
|
||||||
@@ -1356,6 +1482,7 @@ int main(int argc, char ** argv)
|
|||||||
logger(1, "%s: %s", argv[0], "nDPIsrvd.h memory leak detected.");
|
logger(1, "%s: %s", argv[0], "nDPIsrvd.h memory leak detected.");
|
||||||
logger(1, "%s: Allocated / Free'd bytes: %llu / %llu", argv[0], nDPIsrvd_alloc_bytes, nDPIsrvd_free_bytes);
|
logger(1, "%s: Allocated / Free'd bytes: %llu / %llu", argv[0], nDPIsrvd_alloc_bytes, nDPIsrvd_free_bytes);
|
||||||
logger(1, "%s: Allocated / Free'd count: %llu / %llu", argv[0], nDPIsrvd_alloc_count, nDPIsrvd_free_count);
|
logger(1, "%s: Allocated / Free'd count: %llu / %llu", argv[0], nDPIsrvd_alloc_count, nDPIsrvd_free_count);
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nDPId_return.cur_active_flows != 0 || nDPId_return.cur_idle_flows != 0)
|
if (nDPId_return.cur_active_flows != 0 || nDPId_return.cur_idle_flows != 0)
|
||||||
|
|||||||
11
nDPId.c
11
nDPId.c
@@ -2221,7 +2221,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
|||||||
(int)json_str_len,
|
(int)json_str_len,
|
||||||
json_str);
|
json_str);
|
||||||
|
|
||||||
if (s_ret < 0 || s_ret == (int)sizeof(newline_json_str))
|
if (s_ret < 0 || s_ret >= (int)sizeof(newline_json_str))
|
||||||
{
|
{
|
||||||
logger(1,
|
logger(1,
|
||||||
"[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
|
"[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
|
||||||
@@ -2229,6 +2229,15 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
|||||||
reader_thread->array_index,
|
reader_thread->array_index,
|
||||||
s_ret,
|
s_ret,
|
||||||
sizeof(newline_json_str));
|
sizeof(newline_json_str));
|
||||||
|
if (s_ret >= (int)sizeof(newline_json_str))
|
||||||
|
{
|
||||||
|
logger(1,
|
||||||
|
"[%8llu, %zu] JSON string: %.*s...",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index,
|
||||||
|
ndpi_min(512, NETWORK_BUFFER_MAX_SIZE),
|
||||||
|
newline_json_str);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1452,8 +1452,8 @@ static int mainloop(int epollfd)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(signalfd);
|
|
||||||
free_remotes(epollfd);
|
free_remotes(epollfd);
|
||||||
|
close(signalfd);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user