mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-11-01 18:57:48 +00:00
nDPid-test: add buffer test
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Submodule examples/js-rt-analyzer updated: e0402c64c8...124595114f
302
nDPId-test.c
302
nDPId-test.c
@@ -7,6 +7,7 @@ static void nDPIsrvd_memprof_log(char const * const format, ...);
|
|||||||
static void nDPIsrvd_memprof_log_alloc(size_t alloc_size);
|
static void nDPIsrvd_memprof_log_alloc(size_t alloc_size);
|
||||||
static void nDPIsrvd_memprof_log_free(size_t free_size);
|
static void nDPIsrvd_memprof_log_free(size_t free_size);
|
||||||
|
|
||||||
|
//#define DO_MEMORY_LOGGING 1
|
||||||
#define NO_MAIN 1
|
#define NO_MAIN 1
|
||||||
#include "utils.c"
|
#include "utils.c"
|
||||||
#include "nDPIsrvd.c"
|
#include "nDPIsrvd.c"
|
||||||
@@ -20,6 +21,9 @@ enum
|
|||||||
PIPE_TEST_WRITE = 1, /* Distributor (data from nDPIsrvd) write */
|
PIPE_TEST_WRITE = 1, /* Distributor (data from nDPIsrvd) write */
|
||||||
PIPE_TEST_READ = 0, /* Distributor (do some validation tests) read */
|
PIPE_TEST_READ = 0, /* Distributor (do some validation tests) read */
|
||||||
|
|
||||||
|
PIPE_BUFFER_WRITE = 1, /* Distributor (data from nDPIsrvd, buffered json lines) write */
|
||||||
|
PIPE_BUFFER_READ = 0, /* Distributor (do some validation tests, buffered json lines) read */
|
||||||
|
|
||||||
PIPE_NULL_WRITE = 1, /* Distributor (data from nDPIsrvd) write */
|
PIPE_NULL_WRITE = 1, /* Distributor (data from nDPIsrvd) write */
|
||||||
PIPE_NULL_READ = 0, /* Distributor (print to stdout) read */
|
PIPE_NULL_READ = 0, /* Distributor (print to stdout) read */
|
||||||
|
|
||||||
@@ -27,7 +31,7 @@ enum
|
|||||||
PIPE_ARPA_READ = 0, /* Distributor (IP mockup) read */
|
PIPE_ARPA_READ = 0, /* Distributor (IP mockup) read */
|
||||||
|
|
||||||
PIPE_FDS = 2,
|
PIPE_FDS = 2,
|
||||||
MAX_REMOTE_DESCRIPTORS = 4 /* mock pipefd's + 2 * distributor pipefd's */
|
MAX_REMOTE_DESCRIPTORS = 5 /* mock pipefd's + 2 * distributor pipefd's */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct thread_return_value
|
struct thread_return_value
|
||||||
@@ -106,6 +110,12 @@ struct distributor_global_user_data
|
|||||||
struct distributor_thread_user_data thread_user_data;
|
struct distributor_thread_user_data thread_user_data;
|
||||||
|
|
||||||
int flow_cleanup_error;
|
int flow_cleanup_error;
|
||||||
|
|
||||||
|
// please keep this struct at the end
|
||||||
|
struct
|
||||||
|
{
|
||||||
|
int do_hash_checks;
|
||||||
|
} options;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct distributor_flow_user_data
|
struct distributor_flow_user_data
|
||||||
@@ -124,12 +134,15 @@ struct distributor_return_value
|
|||||||
|
|
||||||
static int mock_pipefds[PIPE_FDS] = {};
|
static int mock_pipefds[PIPE_FDS] = {};
|
||||||
static int mock_testfds[PIPE_FDS] = {};
|
static int mock_testfds[PIPE_FDS] = {};
|
||||||
|
static int mock_bufffds[PIPE_FDS] = {};
|
||||||
static int mock_nullfds[PIPE_FDS] = {};
|
static int mock_nullfds[PIPE_FDS] = {};
|
||||||
static int mock_arpafds[PIPE_FDS] = {};
|
static int mock_arpafds[PIPE_FDS] = {};
|
||||||
static pthread_mutex_t nDPId_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t nDPId_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static pthread_mutex_t nDPIsrvd_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t nDPIsrvd_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static pthread_mutex_t distributor_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t distributor_start_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
#ifdef DO_MEMORY_LOGGING
|
||||||
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
#endif
|
||||||
static unsigned long long int nDPIsrvd_alloc_count = 0;
|
static unsigned long long int nDPIsrvd_alloc_count = 0;
|
||||||
static unsigned long long int nDPIsrvd_alloc_bytes = 0;
|
static unsigned long long int nDPIsrvd_alloc_bytes = 0;
|
||||||
static unsigned long long int nDPIsrvd_free_count = 0;
|
static unsigned long long int nDPIsrvd_free_count = 0;
|
||||||
@@ -138,7 +151,7 @@ static unsigned long long int nDPIsrvd_free_bytes = 0;
|
|||||||
#define THREAD_ERROR(thread_arg) \
|
#define THREAD_ERROR(thread_arg) \
|
||||||
do \
|
do \
|
||||||
{ \
|
{ \
|
||||||
((struct thread_return_value *)thread_arg)->val = 1; \
|
((struct thread_return_value *)thread_arg)->val = (errno != 0 ? errno : 1); \
|
||||||
} while (0);
|
} while (0);
|
||||||
#define THREAD_ERROR_GOTO(thread_arg) \
|
#define THREAD_ERROR_GOTO(thread_arg) \
|
||||||
do \
|
do \
|
||||||
@@ -149,15 +162,28 @@ static unsigned long long int nDPIsrvd_free_bytes = 0;
|
|||||||
|
|
||||||
static void nDPIsrvd_memprof_log(char const * const format, ...)
|
static void nDPIsrvd_memprof_log(char const * const format, ...)
|
||||||
{
|
{
|
||||||
|
#ifdef DO_MEMORY_LOGGING
|
||||||
|
int logbuf_used, logbuf_used_tmp;
|
||||||
|
char logbuf[BUFSIZ];
|
||||||
va_list ap;
|
va_list ap;
|
||||||
|
|
||||||
va_start(ap, format);
|
va_start(ap, format);
|
||||||
pthread_mutex_lock(&log_mutex);
|
pthread_mutex_lock(&log_mutex);
|
||||||
fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: ");
|
logbuf_used = snprintf(logbuf, sizeof(logbuf), "%s", "nDPIsrvd MemoryProfiler: ");
|
||||||
vfprintf(stderr, format, ap);
|
if (logbuf_used > 0)
|
||||||
fprintf(stderr, "%s\n", "");
|
{
|
||||||
|
logbuf_used_tmp = vsnprintf(logbuf + logbuf_used, sizeof(logbuf) - logbuf_used, format, ap);
|
||||||
|
if (logbuf_used_tmp > 0)
|
||||||
|
{
|
||||||
|
logbuf_used += logbuf_used_tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fprintf(stderr, "%s\n", logbuf);
|
||||||
pthread_mutex_unlock(&log_mutex);
|
pthread_mutex_unlock(&log_mutex);
|
||||||
va_end(ap);
|
va_end(ap);
|
||||||
|
#else
|
||||||
|
(void)format;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void nDPIsrvd_memprof_log_alloc(size_t alloc_size)
|
void nDPIsrvd_memprof_log_alloc(size_t alloc_size)
|
||||||
@@ -187,14 +213,17 @@ static int setup_pipe(int pipefd[PIPE_FDS])
|
|||||||
static void * nDPIsrvd_mainloop_thread(void * const arg)
|
static void * nDPIsrvd_mainloop_thread(void * const arg)
|
||||||
{
|
{
|
||||||
int nDPIsrvd_shutdown = 0;
|
int nDPIsrvd_shutdown = 0;
|
||||||
int epollfd = create_evq();
|
int epollfd;
|
||||||
struct remote_desc * mock_json_desc = NULL;
|
struct remote_desc * mock_json_desc = NULL;
|
||||||
struct remote_desc * mock_test_desc = NULL;
|
struct remote_desc * mock_test_desc = NULL;
|
||||||
|
struct remote_desc * mock_buff_desc = NULL;
|
||||||
struct remote_desc * mock_null_desc = NULL;
|
struct remote_desc * mock_null_desc = NULL;
|
||||||
struct remote_desc * mock_arpa_desc = NULL;
|
struct remote_desc * mock_arpa_desc = NULL;
|
||||||
struct epoll_event events[32];
|
struct epoll_event events[32];
|
||||||
size_t const events_size = sizeof(events) / sizeof(events[0]);
|
size_t const events_size = sizeof(events) / sizeof(events[0]);
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
|
epollfd = create_evq();
|
||||||
if (epollfd < 0)
|
if (epollfd < 0)
|
||||||
{
|
{
|
||||||
logger(1, "nDPIsrvd epollfd invalid: %d", epollfd);
|
logger(1, "nDPIsrvd epollfd invalid: %d", epollfd);
|
||||||
@@ -215,6 +244,13 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
THREAD_ERROR_GOTO(arg);
|
THREAD_ERROR_GOTO(arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mock_buff_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_bufffds[PIPE_BUFFER_WRITE], 8);
|
||||||
|
if (mock_buff_desc == NULL)
|
||||||
|
{
|
||||||
|
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (BUFFER Distributor)");
|
||||||
|
THREAD_ERROR_GOTO(arg);
|
||||||
|
}
|
||||||
|
|
||||||
mock_null_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE);
|
mock_null_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE);
|
||||||
if (mock_null_desc == NULL)
|
if (mock_null_desc == NULL)
|
||||||
{
|
{
|
||||||
@@ -233,8 +269,10 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
sizeof(mock_arpa_desc->event_distributor_in.peer_addr));
|
sizeof(mock_arpa_desc->event_distributor_in.peer_addr));
|
||||||
mock_arpa_desc->event_distributor_in.peer.sin_port = 0;
|
mock_arpa_desc->event_distributor_in.peer.sin_port = 0;
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 ||
|
if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 ||
|
||||||
add_in_event(epollfd, mock_null_desc) != 0 || add_in_event(epollfd, mock_arpa_desc) != 0)
|
add_in_event(epollfd, mock_buff_desc) != 0 || add_in_event(epollfd, mock_null_desc) != 0 ||
|
||||||
|
add_in_event(epollfd, mock_arpa_desc) != 0)
|
||||||
{
|
{
|
||||||
logger(1, "%s", "nDPIsrvd add input event failed");
|
logger(1, "%s", "nDPIsrvd add input event failed");
|
||||||
THREAD_ERROR_GOTO(arg);
|
THREAD_ERROR_GOTO(arg);
|
||||||
@@ -244,6 +282,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
|
|
||||||
while (nDPIsrvd_shutdown == 0)
|
while (nDPIsrvd_shutdown == 0)
|
||||||
{
|
{
|
||||||
|
errno = 0;
|
||||||
int nready = epoll_wait(epollfd, events, events_size, -1);
|
int nready = epoll_wait(epollfd, events, events_size, -1);
|
||||||
|
|
||||||
if (nready < 0)
|
if (nready < 0)
|
||||||
@@ -254,11 +293,38 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
|
|||||||
for (int i = 0; i < nready; i++)
|
for (int i = 0; i < nready; i++)
|
||||||
{
|
{
|
||||||
if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc ||
|
if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc ||
|
||||||
events[i].data.ptr == mock_null_desc || events[i].data.ptr == mock_arpa_desc)
|
events[i].data.ptr == mock_buff_desc || events[i].data.ptr == mock_null_desc ||
|
||||||
|
events[i].data.ptr == mock_arpa_desc)
|
||||||
{
|
{
|
||||||
if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0)
|
if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0)
|
||||||
{
|
{
|
||||||
logger(1, "nDPIsrvd distributor %d connection closed", events[i].data.fd);
|
char const * remote_desc_name;
|
||||||
|
struct remote_desc * remote = (struct remote_desc *)events[i].data.ptr;
|
||||||
|
if (remote == mock_json_desc)
|
||||||
|
{
|
||||||
|
remote_desc_name = "Mock JSON";
|
||||||
|
}
|
||||||
|
else if (remote == mock_test_desc)
|
||||||
|
{
|
||||||
|
remote_desc_name = "Mock Test";
|
||||||
|
}
|
||||||
|
else if (remote == mock_buff_desc)
|
||||||
|
{
|
||||||
|
remote_desc_name = "Mock Buffer";
|
||||||
|
}
|
||||||
|
else if (remote == mock_null_desc)
|
||||||
|
{
|
||||||
|
remote_desc_name = "Mock NULL";
|
||||||
|
}
|
||||||
|
else if (remote == mock_arpa_desc)
|
||||||
|
{
|
||||||
|
remote_desc_name = "Mock ARPA";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
remote_desc_name = "UNKNOWN";
|
||||||
|
}
|
||||||
|
logger(1, "nDPIsrvd distributor '%s' connection closed", remote_desc_name);
|
||||||
handle_data_event(epollfd, &events[i]);
|
handle_data_event(epollfd, &events[i]);
|
||||||
nDPIsrvd_shutdown++;
|
nDPIsrvd_shutdown++;
|
||||||
}
|
}
|
||||||
@@ -284,6 +350,10 @@ error:
|
|||||||
{
|
{
|
||||||
drain_write_buffers_blocking(mock_test_desc);
|
drain_write_buffers_blocking(mock_test_desc);
|
||||||
}
|
}
|
||||||
|
if (mock_buff_desc != NULL)
|
||||||
|
{
|
||||||
|
drain_write_buffers_blocking(mock_buff_desc);
|
||||||
|
}
|
||||||
if (mock_null_desc != NULL)
|
if (mock_null_desc != NULL)
|
||||||
{
|
{
|
||||||
drain_write_buffers_blocking(mock_null_desc);
|
drain_write_buffers_blocking(mock_null_desc);
|
||||||
@@ -306,7 +376,11 @@ static enum nDPIsrvd_callback_return update_flow_packets_processed(struct nDPIsr
|
|||||||
struct nDPIsrvd_json_token const * const flow_total_packets_processed[FD_COUNT] = {
|
struct nDPIsrvd_json_token const * const flow_total_packets_processed[FD_COUNT] = {
|
||||||
TOKEN_GET_SZ(sock, "flow_src_packets_processed"), TOKEN_GET_SZ(sock, "flow_dst_packets_processed")};
|
TOKEN_GET_SZ(sock, "flow_src_packets_processed"), TOKEN_GET_SZ(sock, "flow_dst_packets_processed")};
|
||||||
|
|
||||||
flow_stats->total_packets_processed = 0;
|
if (sock->flow_user_data_size > 0)
|
||||||
|
{
|
||||||
|
flow_stats->total_packets_processed = 0;
|
||||||
|
}
|
||||||
|
|
||||||
for (int dir = 0; dir < FD_COUNT; ++dir)
|
for (int dir = 0; dir < FD_COUNT; ++dir)
|
||||||
{
|
{
|
||||||
if (flow_total_packets_processed[dir] != NULL)
|
if (flow_total_packets_processed[dir] != NULL)
|
||||||
@@ -319,7 +393,10 @@ static enum nDPIsrvd_callback_return update_flow_packets_processed(struct nDPIsr
|
|||||||
|
|
||||||
if (flow_stats != NULL)
|
if (flow_stats != NULL)
|
||||||
{
|
{
|
||||||
flow_stats->total_packets_processed += nmb;
|
if (sock->flow_user_data_size > 0)
|
||||||
|
{
|
||||||
|
flow_stats->total_packets_processed += nmb;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -333,7 +410,11 @@ static enum nDPIsrvd_callback_return update_flow_l4_payload_len(struct nDPIsrvd_
|
|||||||
struct nDPIsrvd_json_token const * const flow_total_l4_payload_len[FD_COUNT] = {
|
struct nDPIsrvd_json_token const * const flow_total_l4_payload_len[FD_COUNT] = {
|
||||||
TOKEN_GET_SZ(sock, "flow_src_tot_l4_payload_len"), TOKEN_GET_SZ(sock, "flow_dst_tot_l4_payload_len")};
|
TOKEN_GET_SZ(sock, "flow_src_tot_l4_payload_len"), TOKEN_GET_SZ(sock, "flow_dst_tot_l4_payload_len")};
|
||||||
|
|
||||||
flow_stats->flow_total_l4_data_len = 0;
|
if (sock->flow_user_data_size > 0)
|
||||||
|
{
|
||||||
|
flow_stats->flow_total_l4_data_len = 0;
|
||||||
|
}
|
||||||
|
|
||||||
for (int dir = 0; dir < FD_COUNT; ++dir)
|
for (int dir = 0; dir < FD_COUNT; ++dir)
|
||||||
{
|
{
|
||||||
if (flow_total_l4_payload_len[dir] != NULL)
|
if (flow_total_l4_payload_len[dir] != NULL)
|
||||||
@@ -344,7 +425,7 @@ static enum nDPIsrvd_callback_return update_flow_l4_payload_len(struct nDPIsrvd_
|
|||||||
return CALLBACK_ERROR;
|
return CALLBACK_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (flow_stats != NULL)
|
if (sock->flow_user_data_size > 0 && flow_stats != NULL)
|
||||||
{
|
{
|
||||||
flow_stats->flow_total_l4_data_len += nmb;
|
flow_stats->flow_total_l4_data_len += nmb;
|
||||||
}
|
}
|
||||||
@@ -398,8 +479,14 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
|
|
||||||
if (daemon_event_name != NULL)
|
if (daemon_event_name != NULL)
|
||||||
{
|
{
|
||||||
instance_stats->daemon_event_count++;
|
if (sock->instance_user_data_size > 0)
|
||||||
thread_stats->daemon_event_count++;
|
{
|
||||||
|
instance_stats->daemon_event_count++;
|
||||||
|
}
|
||||||
|
if (sock->thread_user_data_size > 0)
|
||||||
|
{
|
||||||
|
thread_stats->daemon_event_count++;
|
||||||
|
}
|
||||||
|
|
||||||
if (TOKEN_VALUE_EQUALS_SZ(sock, daemon_event_name, "shutdown") != 0)
|
if (TOKEN_VALUE_EQUALS_SZ(sock, daemon_event_name, "shutdown") != 0)
|
||||||
{
|
{
|
||||||
@@ -411,7 +498,7 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
nDPIsrvd_ull nmb = 0;
|
nDPIsrvd_ull nmb = 0;
|
||||||
if (TOKEN_VALUE_TO_ULL(sock, total_events_serialized, &nmb) != CONVERSION_OK)
|
if (TOKEN_VALUE_TO_ULL(sock, total_events_serialized, &nmb) != CONVERSION_OK)
|
||||||
{
|
{
|
||||||
return CALLBACK_ERROR;
|
goto callback_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
global_stats->total_events_serialized = nmb;
|
global_stats->total_events_serialized = nmb;
|
||||||
@@ -432,16 +519,19 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
{
|
{
|
||||||
global_stats->cur_active_flows++;
|
global_stats->cur_active_flows++;
|
||||||
global_stats->flow_new_count++;
|
global_stats->flow_new_count++;
|
||||||
thread_stats->flow_new_count++;
|
if (sock->thread_user_data_size > 0)
|
||||||
|
{
|
||||||
|
thread_stats->flow_new_count++;
|
||||||
|
}
|
||||||
|
|
||||||
unsigned int hash_count = HASH_COUNT(instance->flow_table);
|
unsigned int hash_count = HASH_COUNT(instance->flow_table);
|
||||||
if (hash_count != global_stats->cur_active_flows)
|
if (global_stats->options.do_hash_checks != 0 && hash_count != global_stats->cur_active_flows)
|
||||||
{
|
{
|
||||||
logger(1,
|
logger(1,
|
||||||
"Amount of flows in the flow table not equal to current active flows counter: %u != %llu",
|
"Amount of flows in the flow table not equal to current active flows counter: %u != %llu",
|
||||||
hash_count,
|
hash_count,
|
||||||
global_stats->cur_active_flows);
|
global_stats->cur_active_flows);
|
||||||
return CALLBACK_ERROR;
|
goto callback_error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "end") != 0)
|
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "end") != 0)
|
||||||
@@ -449,12 +539,15 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
global_stats->cur_active_flows--;
|
global_stats->cur_active_flows--;
|
||||||
global_stats->cur_idle_flows++;
|
global_stats->cur_idle_flows++;
|
||||||
global_stats->flow_end_count++;
|
global_stats->flow_end_count++;
|
||||||
thread_stats->flow_end_count++;
|
if (sock->thread_user_data_size > 0)
|
||||||
|
{
|
||||||
|
thread_stats->flow_end_count++;
|
||||||
|
}
|
||||||
|
|
||||||
if (update_flow_packets_processed(sock, flow_stats) != CALLBACK_OK ||
|
if (update_flow_packets_processed(sock, flow_stats) != CALLBACK_OK ||
|
||||||
update_flow_l4_payload_len(sock, flow_stats) != CALLBACK_OK)
|
update_flow_l4_payload_len(sock, flow_stats) != CALLBACK_OK)
|
||||||
{
|
{
|
||||||
return CALLBACK_ERROR;
|
goto callback_error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0)
|
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0)
|
||||||
@@ -462,12 +555,15 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
global_stats->cur_active_flows--;
|
global_stats->cur_active_flows--;
|
||||||
global_stats->cur_idle_flows++;
|
global_stats->cur_idle_flows++;
|
||||||
global_stats->flow_idle_count++;
|
global_stats->flow_idle_count++;
|
||||||
thread_stats->flow_idle_count++;
|
if (sock->thread_user_data_size > 0)
|
||||||
|
{
|
||||||
|
thread_stats->flow_idle_count++;
|
||||||
|
}
|
||||||
|
|
||||||
if (update_flow_packets_processed(sock, flow_stats) != CALLBACK_OK ||
|
if (update_flow_packets_processed(sock, flow_stats) != CALLBACK_OK ||
|
||||||
update_flow_l4_payload_len(sock, flow_stats) != CALLBACK_OK)
|
update_flow_l4_payload_len(sock, flow_stats) != CALLBACK_OK)
|
||||||
{
|
{
|
||||||
return CALLBACK_ERROR;
|
goto callback_error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "detected") != 0)
|
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "detected") != 0)
|
||||||
@@ -498,7 +594,8 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
{
|
{
|
||||||
flow_count++;
|
flow_count++;
|
||||||
}
|
}
|
||||||
if (flow_count != global_stats->cur_active_flows + global_stats->cur_idle_flows)
|
if (global_stats->options.do_hash_checks != 0 &&
|
||||||
|
flow_count != global_stats->cur_active_flows + global_stats->cur_idle_flows)
|
||||||
{
|
{
|
||||||
logger(1,
|
logger(1,
|
||||||
"Amount of flows in flow table not equal current active flows plus current idle flows: %llu != "
|
"Amount of flows in flow table not equal current active flows plus current idle flows: %llu != "
|
||||||
@@ -506,12 +603,16 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
|||||||
(unsigned long long int)flow_count,
|
(unsigned long long int)flow_count,
|
||||||
global_stats->cur_active_flows,
|
global_stats->cur_active_flows,
|
||||||
global_stats->cur_idle_flows);
|
global_stats->cur_idle_flows);
|
||||||
return CALLBACK_ERROR;
|
goto callback_error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return CALLBACK_OK;
|
return CALLBACK_OK;
|
||||||
|
callback_error:
|
||||||
|
pthread_mutex_unlock(&nDPIsrvd_start_mutex);
|
||||||
|
pthread_mutex_unlock(&nDPId_start_mutex);
|
||||||
|
return CALLBACK_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void distributor_instance_cleanup_callback(struct nDPIsrvd_socket * const sock,
|
static void distributor_instance_cleanup_callback(struct nDPIsrvd_socket * const sock,
|
||||||
@@ -525,6 +626,11 @@ static void distributor_instance_cleanup_callback(struct nDPIsrvd_socket * const
|
|||||||
|
|
||||||
(void)reason;
|
(void)reason;
|
||||||
|
|
||||||
|
if (sock->global_user_data_size == 0 || sock->thread_user_data_size == 0 || sock->instance_user_data_size == 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp)
|
HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp)
|
||||||
{
|
{
|
||||||
struct distributor_thread_user_data * const tud =
|
struct distributor_thread_user_data * const tud =
|
||||||
@@ -549,7 +655,10 @@ static void distributor_flow_cleanup_callback(struct nDPIsrvd_socket * const soc
|
|||||||
|
|
||||||
(void)thread_data;
|
(void)thread_data;
|
||||||
|
|
||||||
((struct distributor_instance_user_data *)instance->instance_user_data)->flow_cleanup_count++;
|
if (sock->instance_user_data_size > 0)
|
||||||
|
{
|
||||||
|
((struct distributor_instance_user_data *)instance->instance_user_data)->flow_cleanup_count++;
|
||||||
|
}
|
||||||
|
|
||||||
switch (reason)
|
switch (reason)
|
||||||
{
|
{
|
||||||
@@ -601,11 +710,20 @@ static void distributor_flow_cleanup_callback(struct nDPIsrvd_socket * const soc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static enum nDPIsrvd_callback_return distributor_json_mock_buff_callback(
|
||||||
|
struct nDPIsrvd_socket * const sock,
|
||||||
|
struct nDPIsrvd_instance * const instance,
|
||||||
|
struct nDPIsrvd_thread_data * const thread_data,
|
||||||
|
struct nDPIsrvd_flow * const flow)
|
||||||
|
{
|
||||||
|
return distributor_json_callback(sock, instance, thread_data, flow);
|
||||||
|
}
|
||||||
|
|
||||||
static void * distributor_client_mainloop_thread(void * const arg)
|
static void * distributor_client_mainloop_thread(void * const arg)
|
||||||
{
|
{
|
||||||
int dis_epollfd = create_evq();
|
int dis_epollfd = create_evq();
|
||||||
int signalfd = setup_signalfd(dis_epollfd);
|
int signalfd = setup_signalfd(dis_epollfd);
|
||||||
int pipe_read_finished = 0, null_read_finished = 0, arpa_read_finished = 0;
|
int pipe_read_finished = 0, buff_read_finished = 0, null_read_finished = 0, arpa_read_finished = 0;
|
||||||
struct epoll_event events[32];
|
struct epoll_event events[32];
|
||||||
size_t const events_size = sizeof(events) / sizeof(events[0]);
|
size_t const events_size = sizeof(events) / sizeof(events[0]);
|
||||||
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
|
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
|
||||||
@@ -617,41 +735,64 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
distributor_json_callback,
|
distributor_json_callback,
|
||||||
distributor_instance_cleanup_callback,
|
distributor_instance_cleanup_callback,
|
||||||
distributor_flow_cleanup_callback);
|
distributor_flow_cleanup_callback);
|
||||||
struct distributor_global_user_data * stats;
|
struct nDPIsrvd_socket * mock_buff = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data),
|
||||||
|
sizeof(struct distributor_instance_user_data),
|
||||||
|
sizeof(struct distributor_thread_user_data),
|
||||||
|
sizeof(struct distributor_flow_user_data),
|
||||||
|
distributor_json_mock_buff_callback,
|
||||||
|
distributor_instance_cleanup_callback,
|
||||||
|
distributor_flow_cleanup_callback);
|
||||||
|
struct distributor_global_user_data * sock_stats;
|
||||||
|
struct distributor_global_user_data * buff_stats;
|
||||||
|
|
||||||
if (mock_sock == NULL)
|
errno = 0;
|
||||||
|
if (mock_sock == NULL || mock_buff == NULL)
|
||||||
{
|
{
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_sock->fd = mock_testfds[PIPE_TEST_READ];
|
mock_sock->fd = mock_testfds[PIPE_TEST_READ];
|
||||||
|
mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
|
||||||
|
|
||||||
if (dis_epollfd < 0 || signalfd < 0)
|
if (dis_epollfd < 0 || signalfd < 0)
|
||||||
{
|
{
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0)
|
if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0)
|
||||||
{
|
{
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
|
if (add_in_event_fd(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]) != 0)
|
||||||
|
{
|
||||||
|
THREAD_ERROR_GOTO(trv);
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0)
|
if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0)
|
||||||
{
|
{
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0)
|
if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0)
|
||||||
{
|
{
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
stats = (struct distributor_global_user_data *)mock_sock->global_user_data;
|
sock_stats = (struct distributor_global_user_data *)mock_sock->global_user_data;
|
||||||
stats->json_string_len_min = (unsigned long long int)-1;
|
sock_stats->json_string_len_min = (unsigned long long int)-1;
|
||||||
|
sock_stats->options.do_hash_checks = 1;
|
||||||
|
buff_stats = (struct distributor_global_user_data *)mock_buff->global_user_data;
|
||||||
|
buff_stats->json_string_len_min = (unsigned long long int)-1;
|
||||||
|
buff_stats->options.do_hash_checks = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&distributor_start_mutex);
|
pthread_mutex_lock(&distributor_start_mutex);
|
||||||
|
|
||||||
while (pipe_read_finished == 0 || null_read_finished == 0 || arpa_read_finished == 0)
|
while (pipe_read_finished == 0 || buff_read_finished == 0 || null_read_finished == 0 || arpa_read_finished == 0)
|
||||||
{
|
{
|
||||||
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
|
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
|
||||||
if (nready < 0 && errno != EINTR)
|
if (nready < 0 && errno != EINTR)
|
||||||
@@ -667,6 +808,10 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP));
|
logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP));
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
|
||||||
|
{
|
||||||
|
logger(1, "Distributor disconnected: %d", events[i].data.fd);
|
||||||
|
}
|
||||||
|
|
||||||
if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
|
if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
|
||||||
{
|
{
|
||||||
@@ -690,7 +835,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
{
|
{
|
||||||
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
||||||
logger(1,
|
logger(1,
|
||||||
"Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
"Problematic JSON string (mock sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||||
mock_sock->buffer.json_string_start,
|
mock_sock->buffer.json_string_start,
|
||||||
mock_sock->buffer.json_string_length,
|
mock_sock->buffer.json_string_length,
|
||||||
mock_sock->buffer.buf.used,
|
mock_sock->buffer.buf.used,
|
||||||
@@ -699,7 +844,44 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (stats->flow_cleanup_error != 0)
|
if (sock_stats->flow_cleanup_error != 0)
|
||||||
|
{
|
||||||
|
logger(1, "%s", "Flow cleanup callback error'd");
|
||||||
|
THREAD_ERROR_GOTO(trv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (events[i].data.fd == mock_bufffds[PIPE_BUFFER_READ])
|
||||||
|
{
|
||||||
|
switch (nDPIsrvd_read(mock_buff))
|
||||||
|
{
|
||||||
|
case READ_OK:
|
||||||
|
break;
|
||||||
|
case READ_LAST_ENUM_VALUE:
|
||||||
|
case READ_ERROR:
|
||||||
|
case READ_TIMEOUT:
|
||||||
|
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
||||||
|
THREAD_ERROR_GOTO(trv);
|
||||||
|
case READ_PEER_DISCONNECT:
|
||||||
|
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
||||||
|
buff_read_finished = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(mock_buff);
|
||||||
|
if (parse_ret != PARSE_NEED_MORE_DATA)
|
||||||
|
{
|
||||||
|
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
||||||
|
logger(1,
|
||||||
|
"Problematic JSON string (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||||
|
mock_buff->buffer.json_string_start,
|
||||||
|
mock_buff->buffer.json_string_length,
|
||||||
|
mock_buff->buffer.buf.used,
|
||||||
|
(int)mock_buff->buffer.json_string_length,
|
||||||
|
mock_buff->buffer.json_string);
|
||||||
|
THREAD_ERROR_GOTO(trv);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buff_stats->flow_cleanup_error != 0)
|
||||||
{
|
{
|
||||||
logger(1, "%s", "Flow cleanup callback error'd");
|
logger(1, "%s", "Flow cleanup callback error'd");
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
@@ -748,6 +930,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
struct signalfd_siginfo fdsi;
|
struct signalfd_siginfo fdsi;
|
||||||
ssize_t s;
|
ssize_t s;
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
|
s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
|
||||||
if (s != sizeof(struct signalfd_siginfo))
|
if (s != sizeof(struct signalfd_siginfo))
|
||||||
{
|
{
|
||||||
@@ -757,6 +940,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
|
if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
|
||||||
{
|
{
|
||||||
logger(1, "Got signal %d, abort.", fdsi.ssi_signo);
|
logger(1, "Got signal %d, abort.", fdsi.ssi_signo);
|
||||||
|
errno = 0;
|
||||||
THREAD_ERROR_GOTO(trv);
|
THREAD_ERROR_GOTO(trv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -780,22 +964,54 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
|||||||
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
||||||
{
|
{
|
||||||
logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
|
logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
|
||||||
|
errno = 0;
|
||||||
THREAD_ERROR(trv);
|
THREAD_ERROR(trv);
|
||||||
}
|
}
|
||||||
|
|
||||||
nDPIsrvd_cleanup_instance(mock_sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
|
nDPIsrvd_cleanup_instance(mock_sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
|
||||||
}
|
}
|
||||||
|
HASH_ITER(hh, mock_buff->instance_table, current_instance, itmp)
|
||||||
|
{
|
||||||
|
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
|
||||||
|
{
|
||||||
|
logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
|
||||||
|
errno = 0;
|
||||||
|
THREAD_ERROR(trv);
|
||||||
|
}
|
||||||
|
|
||||||
drv->stats = *stats;
|
nDPIsrvd_cleanup_instance(mock_buff, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (memcmp(sock_stats, buff_stats, sizeof(*sock_stats) - sizeof(sock_stats->options)) != 0)
|
||||||
|
{
|
||||||
|
logger(1,
|
||||||
|
"Global statistics differ across different sockets! Events: %llu/%llu != %llu/%llu, Total Flows: "
|
||||||
|
"%llu/%llu + %llu != %llu/%llu + %llu",
|
||||||
|
buff_stats->total_events_serialized,
|
||||||
|
buff_stats->total_events_deserialized,
|
||||||
|
sock_stats->total_events_serialized,
|
||||||
|
sock_stats->total_events_deserialized,
|
||||||
|
buff_stats->flow_new_count,
|
||||||
|
buff_stats->flow_end_count,
|
||||||
|
buff_stats->flow_idle_count,
|
||||||
|
sock_stats->flow_new_count,
|
||||||
|
sock_stats->flow_end_count,
|
||||||
|
sock_stats->flow_idle_count);
|
||||||
|
errno = 0;
|
||||||
|
THREAD_ERROR(trv);
|
||||||
|
}
|
||||||
|
drv->stats = *sock_stats;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
del_event(dis_epollfd, signalfd);
|
del_event(dis_epollfd, signalfd);
|
||||||
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
||||||
|
del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]);
|
||||||
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
|
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
|
||||||
del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
|
del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
|
||||||
close(dis_epollfd);
|
close(dis_epollfd);
|
||||||
close(signalfd);
|
close(signalfd);
|
||||||
nDPIsrvd_socket_free(&mock_sock);
|
nDPIsrvd_socket_free(&mock_sock);
|
||||||
|
nDPIsrvd_socket_free(&mock_buff);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -911,6 +1127,7 @@ int main(int argc, char ** argv)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nDPIsrvd_options.max_write_buffers = 32;
|
||||||
nDPId_options.enable_data_analysis = 1;
|
nDPId_options.enable_data_analysis = 1;
|
||||||
nDPId_options.max_packets_per_flow_to_send = 3;
|
nDPId_options.max_packets_per_flow_to_send = 3;
|
||||||
#ifdef ENABLE_ZLIB
|
#ifdef ENABLE_ZLIB
|
||||||
@@ -936,8 +1153,8 @@ int main(int argc, char ** argv)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (setup_pipe(mock_pipefds) != 0 || setup_pipe(mock_testfds) != 0 || setup_pipe(mock_nullfds) != 0 ||
|
if (setup_pipe(mock_pipefds) != 0 || setup_pipe(mock_testfds) != 0 || setup_pipe(mock_bufffds) != 0 ||
|
||||||
setup_pipe(mock_arpafds) != 0)
|
setup_pipe(mock_nullfds) != 0 || setup_pipe(mock_arpafds) != 0)
|
||||||
{
|
{
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@@ -1010,20 +1227,29 @@ int main(int argc, char ** argv)
|
|||||||
if (THREADS_RETURNED_ERROR() != 0)
|
if (THREADS_RETURNED_ERROR() != 0)
|
||||||
{
|
{
|
||||||
char const * which_thread = "Unknown";
|
char const * which_thread = "Unknown";
|
||||||
|
int thread_errno;
|
||||||
|
|
||||||
if (nDPId_return.thread_return_value.val != 0)
|
if (nDPId_return.thread_return_value.val != 0)
|
||||||
{
|
{
|
||||||
which_thread = "nDPId";
|
which_thread = "nDPId";
|
||||||
|
thread_errno = nDPId_return.thread_return_value.val;
|
||||||
}
|
}
|
||||||
else if (nDPIsrvd_return.val != 0)
|
else if (nDPIsrvd_return.val != 0)
|
||||||
{
|
{
|
||||||
which_thread = "nDPIsrvd";
|
which_thread = "nDPIsrvd";
|
||||||
|
thread_errno = nDPIsrvd_return.val;
|
||||||
}
|
}
|
||||||
else if (distributor_return.thread_return_value.val != 0)
|
else if (distributor_return.thread_return_value.val != 0)
|
||||||
{
|
{
|
||||||
which_thread = "Distributor";
|
which_thread = "Distributor";
|
||||||
|
thread_errno = distributor_return.thread_return_value.val;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger(1, "%s Thread returned a non zero value", which_thread);
|
logger(1,
|
||||||
|
"%s Thread returned a non zero value: %d (%s)",
|
||||||
|
which_thread,
|
||||||
|
thread_errno,
|
||||||
|
(thread_errno < 0 ? strerror(thread_errno) : "Application specific error"));
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1253,12 +1253,6 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
|
|||||||
{
|
{
|
||||||
if (utarray_len(additional_write_buffers) == 0)
|
if (utarray_len(additional_write_buffers) == 0)
|
||||||
{
|
{
|
||||||
#if 0
|
|
||||||
logger_nDPIsrvd(&remotes.desc[i],
|
|
||||||
"Distributor",
|
|
||||||
"buffer capacity threshold (%zu bytes) reached, caching JSON strings.",
|
|
||||||
remotes.desc[i].buf.used);
|
|
||||||
#endif
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if (add_out_event(epollfd, &remotes.desc[i]) != 0)
|
if (add_out_event(epollfd, &remotes.desc[i]) != 0)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user