Added nDPIsrvd TCP/IP support for distributors.

* nDPIsrvd: Improved distributor client disconnect detection
 * nDPIsrvd: Fixed invalid usage of epoll_add instead of epoll_mod
 * nPDIsrvd: Improved logging for distributor clients

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2022-03-10 13:51:21 +01:00
parent 6f1f9e65ea
commit 41757ecf1c
2 changed files with 444 additions and 262 deletions

View File

@@ -153,7 +153,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(arg);
}
mock_json_desc = get_unused_remote_descriptor(JSON_SOCK, mock_pipefds[PIPE_nDPIsrvd], NETWORK_BUFFER_MAX_SIZE);
mock_json_desc = get_unused_remote_descriptor(COLLECTOR_UN, mock_pipefds[PIPE_nDPIsrvd], NETWORK_BUFFER_MAX_SIZE);
if (mock_json_desc == NULL)
{
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (Collector)");
@@ -161,28 +161,23 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
}
mock_test_desc =
get_unused_remote_descriptor(SERV_SOCK, mock_testfds[PIPE_TEST_WRITE], NETWORK_BUFFER_MAX_SIZE / 4);
get_unused_remote_descriptor(DISTRIBUTOR_UN, mock_testfds[PIPE_TEST_WRITE], NETWORK_BUFFER_MAX_SIZE / 4);
if (mock_test_desc == NULL)
{
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (TEST Distributor)");
THREAD_ERROR_GOTO(arg);
}
mock_null_desc = get_unused_remote_descriptor(SERV_SOCK, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE);
mock_null_desc =
get_unused_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE);
if (mock_null_desc == NULL)
{
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (NULL Distributor)");
THREAD_ERROR_GOTO(arg);
}
strncpy(mock_test_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_test_desc->event_serv.peer_addr));
mock_test_desc->event_serv.peer.sin_port = 0;
strncpy(mock_null_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_null_desc->event_serv.peer_addr));
mock_null_desc->event_serv.peer.sin_port = 0;
if (add_in_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0 ||
add_in_event(epollfd, mock_testfds[PIPE_TEST_WRITE], mock_test_desc) != 0 ||
add_in_event(epollfd, mock_nullfds[PIPE_NULL_WRITE], mock_null_desc) != 0)
if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 ||
add_in_event(epollfd, mock_null_desc) != 0)
{
logger(1, "%s", "nDPIsrvd add input event failed");
THREAD_ERROR_GOTO(arg);
@@ -201,7 +196,8 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
for (int i = 0; i < nready; i++)
{
if (events[i].data.ptr == mock_json_desc)
if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc ||
events[i].data.ptr == mock_null_desc)
{
if (handle_data_event(epollfd, &events[i]) != 0)
{
@@ -496,12 +492,12 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
if (add_in_event(dis_epollfd, mock_testfds[PIPE_TEST_READ], NULL) != 0)
if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
if (add_in_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ], NULL) != 0)
if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
@@ -548,7 +544,8 @@ static void * distributor_client_mainloop_thread(void * const arg)
if (parse_ret != PARSE_NEED_MORE_DATA)
{
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
logger(1, "Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s",
logger(1,
"Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s",
mock_sock->buffer.json_string_start,
mock_sock->buffer.json_string_length,
mock_sock->buffer.used,
@@ -767,8 +764,9 @@ int main(int argc, char ** argv)
}
/* We do not have any sockets, any socket operation must fail! */
json_sockfd = -1;
serv_sockfd = -1;
collector_un_sockfd = -1;
distributor_un_sockfd = -1;
distributor_in_sockfd = -1;
if (setup_remote_descriptors(MAX_REMOTE_DESCRIPTORS) != 0)
{
@@ -910,13 +908,22 @@ int main(int argc, char ** argv)
if (nDPId_return.cur_active_flows != 0 || nDPId_return.cur_idle_flows != 0)
{
logger(1, "%s: %s [%llu / %llu]", argv[0], "Active / Idle inconsistency detected.", nDPId_return.cur_active_flows, nDPId_return.cur_idle_flows);
logger(1,
"%s: %s [%llu / %llu]",
argv[0],
"Active / Idle inconsistency detected.",
nDPId_return.cur_active_flows,
nDPId_return.cur_idle_flows);
return 1;
}
if (nDPId_return.total_skipped_flows != 0)
{
logger(1, "%s: %s [%llu]", argv[0], "Skipped flow detected, that should not happen.", nDPId_return.total_skipped_flows);
logger(1,
"%s: %s [%llu]",
argv[0],
"Skipped flow detected, that should not happen.",
nDPId_return.total_skipped_flows);
return 1;
}

File diff suppressed because it is too large Load Diff