mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-11-03 11:47:49 +00:00
several fixes and improvments
- set errno to 0 if it is checked right after a libc call - ignore SIGPIPE as we want to avoid signal handling where possible - fixed another issue in nDPIsrvd/c-json-stdout which caused buffering errors Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
@@ -61,7 +61,7 @@ int main(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
buf_used += bytes_read;
|
buf_used += bytes_read;
|
||||||
while (json_bytes == 0 && buf_used >= nDPIsrvd_JSON_BYTES + 1)
|
while (buf_used >= nDPIsrvd_JSON_BYTES + 1)
|
||||||
{
|
{
|
||||||
if (buf[nDPIsrvd_JSON_BYTES] != '{')
|
if (buf[nDPIsrvd_JSON_BYTES] != '{')
|
||||||
{
|
{
|
||||||
|
|||||||
24
nDPId.c
24
nDPId.c
@@ -219,6 +219,7 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
if (access(file_or_device, R_OK) != 0 && errno == ENOENT)
|
if (access(file_or_device, R_OK) != 0 && errno == ENOENT)
|
||||||
{
|
{
|
||||||
workflow->pcap_handle = pcap_open_live(file_or_device, 65535, 1, 250, pcap_error_buffer);
|
workflow->pcap_handle = pcap_open_live(file_or_device, 65535, 1, 250, pcap_error_buffer);
|
||||||
@@ -703,8 +704,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
|
|||||||
#if nDPIsrvd_JSON_BYTES != 4
|
#if nDPIsrvd_JSON_BYTES != 4
|
||||||
#error "Please do not forget to change the format string if you've changed the value of nDPIsrvd_JSON_BYTES."
|
#error "Please do not forget to change the format string if you've changed the value of nDPIsrvd_JSON_BYTES."
|
||||||
#endif
|
#endif
|
||||||
s_ret = snprintf(newline_json_str, sizeof(newline_json_str),
|
s_ret =
|
||||||
"%04zu%.*s", json_str_len, (int)json_str_len, json_str);
|
snprintf(newline_json_str, sizeof(newline_json_str), "%04zu%.*s", json_str_len, (int)json_str_len, json_str);
|
||||||
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
|
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
|
||||||
{
|
{
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
@@ -727,8 +728,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reader_thread->json_sock_reconnect == 0 &&
|
errno = 0;
|
||||||
write(reader_thread->json_sockfd, newline_json_str, s_ret) <= 0)
|
if (reader_thread->json_sock_reconnect == 0 && write(reader_thread->json_sockfd, newline_json_str, s_ret) <= 0)
|
||||||
{
|
{
|
||||||
saved_errno = errno;
|
saved_errno = errno;
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
@@ -743,8 +744,18 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
|
|||||||
workflow->packets_captured,
|
workflow->packets_captured,
|
||||||
reader_thread->array_index);
|
reader_thread->array_index);
|
||||||
}
|
}
|
||||||
|
if (saved_errno != EAGAIN)
|
||||||
|
{
|
||||||
reader_thread->json_sock_reconnect = 1;
|
reader_thread->json_sock_reconnect = 1;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"[%8llu, %d] Possible data loss detected",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
|
static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
|
||||||
@@ -1679,7 +1690,8 @@ static void * processing_thread(void * const ndpi_thread_arg)
|
|||||||
{
|
{
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
"Thread %u: Could not connect to JSON sink %s, will try again later",
|
"Thread %u: Could not connect to JSON sink %s, will try again later",
|
||||||
reader_thread->array_index, json_sockpath);
|
reader_thread->array_index,
|
||||||
|
json_sockpath);
|
||||||
}
|
}
|
||||||
run_pcap_loop(reader_thread);
|
run_pcap_loop(reader_thread);
|
||||||
reader_thread->workflow->error_or_eof = 1;
|
reader_thread->workflow->error_or_eof = 1;
|
||||||
@@ -1953,6 +1965,8 @@ int main(int argc, char ** argv)
|
|||||||
|
|
||||||
signal(SIGINT, sighandler);
|
signal(SIGINT, sighandler);
|
||||||
signal(SIGTERM, sighandler);
|
signal(SIGTERM, sighandler);
|
||||||
|
signal(SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
while (main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0)
|
while (main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0)
|
||||||
{
|
{
|
||||||
sleep(1);
|
sleep(1);
|
||||||
|
|||||||
61
nDPIsrvd.c
61
nDPIsrvd.c
@@ -23,7 +23,8 @@ enum ev_type
|
|||||||
SERV_SOCK
|
SERV_SOCK
|
||||||
};
|
};
|
||||||
|
|
||||||
struct io_buffer {
|
struct io_buffer
|
||||||
|
{
|
||||||
uint8_t * ptr;
|
uint8_t * ptr;
|
||||||
size_t used;
|
size_t used;
|
||||||
size_t max;
|
size_t max;
|
||||||
@@ -149,7 +150,7 @@ static struct remote_desc * get_unused_remote_descriptor(void)
|
|||||||
if (remotes.desc[i].fd == -1)
|
if (remotes.desc[i].fd == -1)
|
||||||
{
|
{
|
||||||
remotes.desc_used++;
|
remotes.desc_used++;
|
||||||
remotes.desc[i].buf.ptr = (uint8_t *) malloc(NETWORK_BUFFER_MAX_SIZE);
|
remotes.desc[i].buf.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE);
|
||||||
remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE;
|
remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE;
|
||||||
remotes.desc[i].buf.used = 0;
|
remotes.desc[i].buf.used = 0;
|
||||||
return &remotes.desc[i];
|
return &remotes.desc[i];
|
||||||
@@ -238,9 +239,12 @@ int main(int argc, char ** argv)
|
|||||||
|
|
||||||
openlog("nDPIsrvd", LOG_CONS | LOG_PERROR, LOG_DAEMON);
|
openlog("nDPIsrvd", LOG_CONS | LOG_PERROR, LOG_DAEMON);
|
||||||
|
|
||||||
if (access(json_sockpath, F_OK) == 0) {
|
if (access(json_sockpath, F_OK) == 0)
|
||||||
syslog(LOG_DAEMON | LOG_ERR, "UNIX socket %s exists; nDPIsrvd already running? "
|
{
|
||||||
"Please remove the socket manually or change socket path.", json_sockpath);
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"UNIX socket %s exists; nDPIsrvd already running? "
|
||||||
|
"Please remove the socket manually or change socket path.",
|
||||||
|
json_sockpath);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,6 +279,7 @@ int main(int argc, char ** argv)
|
|||||||
|
|
||||||
signal(SIGINT, sighandler);
|
signal(SIGINT, sighandler);
|
||||||
signal(SIGTERM, sighandler);
|
signal(SIGTERM, sighandler);
|
||||||
|
signal(SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
int epollfd = epoll_create1(0);
|
int epollfd = epoll_create1(0);
|
||||||
if (epollfd < 0)
|
if (epollfd < 0)
|
||||||
@@ -333,8 +338,8 @@ int main(int argc, char ** argv)
|
|||||||
current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
|
current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
|
||||||
|
|
||||||
int sockfd = (current->type == JSON_SOCK ? json_sockfd : serv_sockfd);
|
int sockfd = (current->type == JSON_SOCK ? json_sockfd : serv_sockfd);
|
||||||
socklen_t peer_addr_len = (current->type == JSON_SOCK ? sizeof(current->event_json.peer)
|
socklen_t peer_addr_len =
|
||||||
: sizeof(current->event_serv.peer));
|
(current->type == JSON_SOCK ? sizeof(current->event_json.peer) : sizeof(current->event_serv.peer));
|
||||||
|
|
||||||
current->fd = accept(sockfd,
|
current->fd = accept(sockfd,
|
||||||
(current->type == JSON_SOCK ? (struct sockaddr *)¤t->event_json.peer
|
(current->type == JSON_SOCK ? (struct sockaddr *)¤t->event_json.peer
|
||||||
@@ -383,7 +388,9 @@ int main(int argc, char ** argv)
|
|||||||
if (current->type == JSON_SOCK)
|
if (current->type == JSON_SOCK)
|
||||||
{
|
{
|
||||||
shutdown(current->fd, SHUT_WR); // collector
|
shutdown(current->fd, SHUT_WR); // collector
|
||||||
} else {
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
shutdown(current->fd, SHUT_RD); // distributor
|
shutdown(current->fd, SHUT_RD); // distributor
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -419,10 +426,18 @@ int main(int argc, char ** argv)
|
|||||||
if (events[i].events & EPOLLIN && current->type == JSON_SOCK)
|
if (events[i].events & EPOLLIN && current->type == JSON_SOCK)
|
||||||
{
|
{
|
||||||
/* read JSON strings (or parts) from the UNIX socket (collecting) */
|
/* read JSON strings (or parts) from the UNIX socket (collecting) */
|
||||||
|
if (current->buf.used == current->buf.max)
|
||||||
|
{
|
||||||
|
syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
|
||||||
|
disconnect_client(epollfd, current);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
ssize_t bytes_read =
|
ssize_t bytes_read =
|
||||||
read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
|
read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
|
||||||
if (errno == EAGAIN) {
|
if (errno == EAGAIN)
|
||||||
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (bytes_read < 0 || errno != 0)
|
if (bytes_read < 0 || errno != 0)
|
||||||
@@ -439,17 +454,19 @@ int main(int argc, char ** argv)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
current->buf.used += bytes_read;
|
current->buf.used += bytes_read;
|
||||||
while (current->event_json.json_bytes == 0 &&
|
|
||||||
current->buf.used >= nDPIsrvd_JSON_BYTES + 1)
|
while (current->buf.used >= nDPIsrvd_JSON_BYTES + 1)
|
||||||
{
|
{
|
||||||
if (current->buf.ptr[nDPIsrvd_JSON_BYTES] != '{')
|
if (current->buf.ptr[nDPIsrvd_JSON_BYTES] != '{')
|
||||||
{
|
{
|
||||||
syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON invalid opening character: '%c'",
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"BUG: JSON invalid opening character: '%c'",
|
||||||
current->buf.ptr[nDPIsrvd_JSON_BYTES]);
|
current->buf.ptr[nDPIsrvd_JSON_BYTES]);
|
||||||
disconnect_client(epollfd, current);
|
disconnect_client(epollfd, current);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errno = 0;
|
||||||
char * json_str_start = NULL;
|
char * json_str_start = NULL;
|
||||||
current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
|
current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
|
||||||
current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
|
current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
|
||||||
@@ -464,7 +481,8 @@ int main(int argc, char ** argv)
|
|||||||
{
|
{
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
"BUG: Missing size before JSON string: \"%.*s\"",
|
"BUG: Missing size before JSON string: \"%.*s\"",
|
||||||
nDPIsrvd_JSON_BYTES, current->buf.ptr);
|
nDPIsrvd_JSON_BYTES,
|
||||||
|
current->buf.ptr);
|
||||||
disconnect_client(epollfd, current);
|
disconnect_client(epollfd, current);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -508,12 +526,15 @@ int main(int argc, char ** argv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
|
memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
|
||||||
current->buf.ptr, current->event_json.json_bytes);
|
current->buf.ptr,
|
||||||
|
current->event_json.json_bytes);
|
||||||
remotes.desc[i].buf.used += current->event_json.json_bytes;
|
remotes.desc[i].buf.used += current->event_json.json_bytes;
|
||||||
|
|
||||||
ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr,
|
errno = 0;
|
||||||
remotes.desc[i].buf.used);
|
ssize_t bytes_written =
|
||||||
if (errno == EAGAIN) {
|
write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used);
|
||||||
|
if (errno == EAGAIN)
|
||||||
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (bytes_written < 0 || errno != 0)
|
if (bytes_written < 0 || errno != 0)
|
||||||
@@ -526,8 +547,7 @@ int main(int argc, char ** argv)
|
|||||||
}
|
}
|
||||||
if (bytes_written == 0)
|
if (bytes_written == 0)
|
||||||
{
|
{
|
||||||
syslog(LOG_DAEMON,
|
syslog(LOG_DAEMON, "Distributor connection closed during write");
|
||||||
"Distributor connection closed during write");
|
|
||||||
disconnect_client(epollfd, &remotes.desc[i]);
|
disconnect_client(epollfd, &remotes.desc[i]);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -535,7 +555,8 @@ int main(int argc, char ** argv)
|
|||||||
{
|
{
|
||||||
syslog(LOG_DAEMON,
|
syslog(LOG_DAEMON,
|
||||||
"Distributor connection wrote less bytes than expected: %zd < %zu",
|
"Distributor connection wrote less bytes than expected: %zd < %zu",
|
||||||
bytes_written, remotes.desc[i].buf.used);
|
bytes_written,
|
||||||
|
remotes.desc[i].buf.used);
|
||||||
disconnect_client(epollfd, &remotes.desc[i]);
|
disconnect_client(epollfd, &remotes.desc[i]);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user