mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-10-30 17:57:48 +00:00
nDPId: Fixed fcntl() issue; invalid fcntl() set after a blocking-write.
* nDPId: imrpvoed collector socket error messages on connect/write/etc failures * reverted `netcat` parts of the README Signed-off-by: lns <matzeton@googlemail.com>
This commit is contained in:
12
README.md
12
README.md
@@ -1,5 +1,5 @@
|
|||||||
[](https://github.com/utoni/nDPId/actions/workflows/build.yml)
|
[](https://github.com/utoni/nDPId/actions/workflows/build.yml)
|
||||||
[](https://gitlab.com/utoni/nDPId/-/pipelines)
|
[](https://gitlab.com/utoni/nDPId/-/pipelines)
|
||||||
|
|
||||||
# Abstract
|
# Abstract
|
||||||
|
|
||||||
@@ -183,22 +183,26 @@ The CMake cache variable `-DBUILD_NDPI=ON` builds a version of `libnDPI` residin
|
|||||||
|
|
||||||
As mentioned above, in order to run `nDPId` a UNIX-socket need to be provided in order to stream our related JSON-data.
|
As mentioned above, in order to run `nDPId` a UNIX-socket need to be provided in order to stream our related JSON-data.
|
||||||
|
|
||||||
Such a UNIX-socket can be provided by both the included `nDPIsrvd` daemon, or, if you simply need a quick check, with the `netcat` utility (preferably `openbsd-netcat`), with a simple `netcat -U /tmp/listen.sock -l -k`.
|
Such a UNIX-socket can be provided by both the included `nDPIsrvd` daemon, or, if you simply need a quick check, with the [ncat](https://nmap.org/book/ncat-man.html) utility, with a simple `ncat -U /tmp/listen.sock -l -k`. Remember that OpenBSD `netcat` is not able to handle multiple connections reliably.
|
||||||
|
|
||||||
Once the socket is ready, you can run `nDPId` capturing and analyzing your own traffic, with something similar to:
|
Once the socket is ready, you can run `nDPId` capturing and analyzing your own traffic, with something similar to:
|
||||||
|
|
||||||
Of course, both `netcat` and `nDPId` need to point to the same UNIX-socket (`nDPId` provides the `-c` option, exactly for this. As a default, `nDPId` refer to `/tmp/ndpid-collector.sock`, and the same default-path is also used by `nDPIsrvd` as for the incoming socket)
|
Of course, both `ncat` and `nDPId` need to point to the same UNIX-socket (`nDPId` provides the `-c` option, exactly for this. As a default, `nDPId` refer to `/tmp/ndpid-collector.sock`, and the same default-path is also used by `nDPIsrvd` as for the incoming socket).
|
||||||
|
|
||||||
You also need to provide `nDPId` some real-traffic. You can capture your own traffic, with something similar to:
|
You also need to provide `nDPId` some real-traffic. You can capture your own traffic, with something similar to:
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
|
ncat -U /tmp/listen.sock -l -k
|
||||||
|
#socat UNIX-Listen:/tmp/listen.sock,fork - # does the same as `ncat`
|
||||||
|
sudo chown nobody:nobody /tmp/listen.sock # default `nDPId` user/group, see `-u` and `-g`
|
||||||
sudo ./nDPId -c /tmp/listen.sock -l
|
sudo ./nDPId -c /tmp/listen.sock -l
|
||||||
```
|
```
|
||||||
|
|
||||||
`nDPId` supports also UDP collector endpoints:
|
`nDPId` supports also UDP collector endpoints:
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
netcat -u 127.0.0.1 7000 -l -k
|
ncat -u 127.0.0.1 7000 -l -k
|
||||||
|
#socat UDP-Listen:7000,fork - # does the same as `ncat`
|
||||||
sudo ./nDPId -c 127.0.0.1:7000 -l
|
sudo ./nDPId -c 127.0.0.1:7000 -l
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -793,7 +793,7 @@ static void * nDPId_mainloop_thread(void * const arg)
|
|||||||
|
|
||||||
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
|
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
|
||||||
reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId];
|
reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId];
|
||||||
reader_threads[0].collector_sock_reconnect = 0;
|
reader_threads[0].collector_sock_last_errno = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&nDPId_start_mutex);
|
pthread_mutex_lock(&nDPId_start_mutex);
|
||||||
|
|
||||||
|
|||||||
155
nDPId.c
155
nDPId.c
@@ -247,7 +247,7 @@ struct nDPId_reader_thread
|
|||||||
struct nDPId_workflow * workflow;
|
struct nDPId_workflow * workflow;
|
||||||
pthread_t thread_id;
|
pthread_t thread_id;
|
||||||
int collector_sockfd;
|
int collector_sockfd;
|
||||||
int collector_sock_reconnect;
|
int collector_sock_last_errno;
|
||||||
size_t array_index;
|
size_t array_index;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -504,6 +504,44 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read
|
|||||||
struct nDPId_flow * const flow,
|
struct nDPId_flow * const flow,
|
||||||
enum flow_event event);
|
enum flow_event event);
|
||||||
|
|
||||||
|
static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread)
|
||||||
|
{
|
||||||
|
int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);
|
||||||
|
|
||||||
|
if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK) == -1)
|
||||||
|
{
|
||||||
|
reader_thread->collector_sock_last_errno = errno;
|
||||||
|
logger(1,
|
||||||
|
"[%8llu, %zu] Could not set collector fd %d to non-blocking mode: %s",
|
||||||
|
reader_thread->workflow->packets_processed,
|
||||||
|
reader_thread->thread_id,
|
||||||
|
reader_thread->collector_sockfd,
|
||||||
|
strerror(errno));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int set_collector_block(struct nDPId_reader_thread * const reader_thread)
|
||||||
|
{
|
||||||
|
int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);
|
||||||
|
|
||||||
|
if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1)
|
||||||
|
{
|
||||||
|
reader_thread->collector_sock_last_errno = errno;
|
||||||
|
logger(1,
|
||||||
|
"[%8llu, %zu] Could not set collector fd %d to blocking mode: %s",
|
||||||
|
reader_thread->workflow->packets_processed,
|
||||||
|
reader_thread->thread_id,
|
||||||
|
reader_thread->collector_sockfd,
|
||||||
|
strerror(errno));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef ENABLE_ZLIB
|
#ifdef ENABLE_ZLIB
|
||||||
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
|
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
|
||||||
{
|
{
|
||||||
@@ -1942,36 +1980,34 @@ static int connect_to_collector(struct nDPId_reader_thread * const reader_thread
|
|||||||
reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0);
|
reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0);
|
||||||
if (reader_thread->collector_sockfd < 0)
|
if (reader_thread->collector_sockfd < 0)
|
||||||
{
|
{
|
||||||
reader_thread->collector_sock_reconnect = 1;
|
reader_thread->collector_sock_last_errno = errno;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int opt = NETWORK_BUFFER_MAX_SIZE * 16;
|
int opt = NETWORK_BUFFER_MAX_SIZE;
|
||||||
if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0)
|
if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0)
|
||||||
{
|
{
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (set_collector_nonblock(reader_thread) != 0)
|
||||||
|
{
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0)
|
if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0)
|
||||||
{
|
{
|
||||||
reader_thread->collector_sock_reconnect = 1;
|
reader_thread->collector_sock_last_errno = errno;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0)
|
if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0)
|
||||||
{
|
{
|
||||||
|
reader_thread->collector_sock_last_errno = errno;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fcntl(reader_thread->collector_sockfd,
|
reader_thread->collector_sock_last_errno = 0;
|
||||||
F_SETFL,
|
|
||||||
fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
|
|
||||||
{
|
|
||||||
reader_thread->collector_sock_reconnect = 1;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
reader_thread->collector_sock_reconnect = 0;
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -2003,26 +2039,46 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reader_thread->collector_sock_reconnect != 0)
|
if (reader_thread->collector_sock_last_errno != 0)
|
||||||
{
|
{
|
||||||
|
saved_errno = reader_thread->collector_sock_last_errno;
|
||||||
|
|
||||||
if (connect_to_collector(reader_thread) == 0)
|
if (connect_to_collector(reader_thread) == 0)
|
||||||
{
|
{
|
||||||
logger(1,
|
if (collector_address.raw.sa_family == AF_UNIX)
|
||||||
"[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s",
|
{
|
||||||
workflow->packets_captured,
|
logger(1,
|
||||||
reader_thread->array_index,
|
"[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s",
|
||||||
nDPId_options.collector_address);
|
workflow->packets_captured,
|
||||||
jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
|
reader_thread->array_index,
|
||||||
|
nDPId_options.collector_address);
|
||||||
|
jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (saved_errno != reader_thread->collector_sock_last_errno)
|
||||||
|
{
|
||||||
|
logger(1,
|
||||||
|
"[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index,
|
||||||
|
nDPId_options.collector_address,
|
||||||
|
(reader_thread->collector_sock_last_errno != 0
|
||||||
|
? strerror(reader_thread->collector_sock_last_errno)
|
||||||
|
: "Internal Error."));
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
ssize_t written;
|
ssize_t written;
|
||||||
if (reader_thread->collector_sock_reconnect == 0 &&
|
if (reader_thread->collector_sock_last_errno == 0 &&
|
||||||
(written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret)
|
(written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret)
|
||||||
{
|
{
|
||||||
saved_errno = errno;
|
saved_errno = errno;
|
||||||
if (saved_errno == EPIPE)
|
if (saved_errno == EPIPE || written == 0)
|
||||||
{
|
{
|
||||||
logger(1,
|
logger(1,
|
||||||
"[%8llu, %zu] Lost connection to nDPIsrvd Collector",
|
"[%8llu, %zu] Lost connection to nDPIsrvd Collector",
|
||||||
@@ -2040,21 +2096,32 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
|||||||
(collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"),
|
(collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"),
|
||||||
nDPId_options.collector_address);
|
nDPId_options.collector_address);
|
||||||
}
|
}
|
||||||
if (collector_address.raw.sa_family == AF_UNIX)
|
reader_thread->collector_sock_last_errno = saved_errno;
|
||||||
{
|
|
||||||
reader_thread->collector_sock_reconnect = 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else if (collector_address.raw.sa_family == AF_UNIX)
|
||||||
{
|
{
|
||||||
fcntl(reader_thread->collector_sockfd,
|
|
||||||
F_SETFL,
|
|
||||||
fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
|
|
||||||
off_t pos = (written < 0 ? 0 : written);
|
off_t pos = (written < 0 ? 0 : written);
|
||||||
|
logger(0,
|
||||||
|
"[%8llu, %zu] Send less data then expected (%zd < %d bytes), falling back to blocking I/O",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index,
|
||||||
|
pos,
|
||||||
|
s_ret);
|
||||||
|
set_collector_block(reader_thread);
|
||||||
while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) !=
|
while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) !=
|
||||||
s_ret - pos)
|
s_ret - pos)
|
||||||
{
|
{
|
||||||
if (written < 0)
|
saved_errno = errno;
|
||||||
|
if (saved_errno == EPIPE || written == 0)
|
||||||
|
{
|
||||||
|
logger(1,
|
||||||
|
"[%8llu, %zu] Lost connection to nDPIsrvd Collector",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index);
|
||||||
|
reader_thread->collector_sock_last_errno = saved_errno;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (written < 0)
|
||||||
{
|
{
|
||||||
logger(1,
|
logger(1,
|
||||||
"[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s",
|
"[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s",
|
||||||
@@ -2062,10 +2129,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
|||||||
reader_thread->array_index,
|
reader_thread->array_index,
|
||||||
nDPId_options.collector_address,
|
nDPId_options.collector_address,
|
||||||
strerror(saved_errno));
|
strerror(saved_errno));
|
||||||
if (collector_address.raw.sa_family == AF_UNIX)
|
reader_thread->collector_sock_last_errno = saved_errno;
|
||||||
{
|
|
||||||
reader_thread->collector_sock_reconnect = 1;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -2073,9 +2137,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
|||||||
pos += written;
|
pos += written;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fcntl(reader_thread->collector_sockfd,
|
set_collector_nonblock(reader_thread);
|
||||||
F_SETFL,
|
|
||||||
fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -4031,16 +4093,15 @@ static void * processing_thread(void * const ndpi_thread_arg)
|
|||||||
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg;
|
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg;
|
||||||
|
|
||||||
reader_thread->collector_sockfd = -1;
|
reader_thread->collector_sockfd = -1;
|
||||||
reader_thread->collector_sock_reconnect = 1;
|
|
||||||
|
|
||||||
errno = 0;
|
|
||||||
if (connect_to_collector(reader_thread) != 0)
|
if (connect_to_collector(reader_thread) != 0)
|
||||||
{
|
{
|
||||||
logger(1,
|
logger(1,
|
||||||
"Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
|
"Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
|
||||||
reader_thread->array_index,
|
reader_thread->array_index,
|
||||||
nDPId_options.collector_address,
|
nDPId_options.collector_address,
|
||||||
(errno != 0 ? strerror(errno) : "Internal Error."));
|
(reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno)
|
||||||
|
: "Internal Error."));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -4048,7 +4109,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
run_pcap_loop(reader_thread);
|
run_pcap_loop(reader_thread);
|
||||||
fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
|
set_collector_block(reader_thread);
|
||||||
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
|
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -4171,15 +4232,7 @@ static void process_remaining_flows(void)
|
|||||||
{
|
{
|
||||||
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
|
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
|
||||||
{
|
{
|
||||||
if (fcntl(reader_threads[i].collector_sockfd,
|
set_collector_block(&reader_threads[i]);
|
||||||
F_SETFL,
|
|
||||||
fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
|
|
||||||
{
|
|
||||||
logger(1,
|
|
||||||
"Could not set JSON fd %d to blocking mode for shutdown: %s",
|
|
||||||
reader_threads[i].collector_sockfd,
|
|
||||||
strerror(errno));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows;
|
for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows;
|
||||||
++idle_scan_index)
|
++idle_scan_index)
|
||||||
|
|||||||
Reference in New Issue
Block a user