mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-10-30 17:57:48 +00:00
nDPIsrvd.(h|py): Added socket read/recv timeout.
* nDPIsrvd.h: support for O_NONBLOCK nDPIsrvd_socket Signed-off-by: lns <matzeton@googlemail.com>
This commit is contained in:
50
dependencies/nDPIsrvd.h
vendored
50
dependencies/nDPIsrvd.h
vendored
@@ -4,6 +4,7 @@
|
||||
#include <arpa/inet.h>
|
||||
#include <ctype.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
@@ -53,6 +54,7 @@ enum nDPIsrvd_read_return
|
||||
{
|
||||
READ_OK = CONNECT_LAST_ENUM_VALUE,
|
||||
READ_PEER_DISCONNECT,
|
||||
READ_TIMEOUT,
|
||||
READ_ERROR, /* check for errno */
|
||||
|
||||
READ_LAST_ENUM_VALUE
|
||||
@@ -216,6 +218,7 @@ struct nDPIsrvd_jsmn
|
||||
struct nDPIsrvd_socket
|
||||
{
|
||||
int fd;
|
||||
struct timeval read_timeout;
|
||||
struct nDPIsrvd_address address;
|
||||
|
||||
size_t instance_user_data_size;
|
||||
@@ -324,6 +327,7 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value)
|
||||
|
||||
"READ_OK",
|
||||
"READ_PEER_DISCONNECT",
|
||||
"READ_TIMEOUT",
|
||||
"READ_ERROR",
|
||||
|
||||
"PARSE_OK",
|
||||
@@ -429,6 +433,9 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
|
||||
if (sock != NULL)
|
||||
{
|
||||
sock->fd = -1;
|
||||
sock->read_timeout.tv_sec = 0;
|
||||
sock->read_timeout.tv_usec = 0;
|
||||
|
||||
if (nDPIsrvd_json_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0)
|
||||
{
|
||||
goto error;
|
||||
@@ -460,6 +467,45 @@ error:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static inline int nDPIsrvd_set_read_timeout(struct nDPIsrvd_socket * const sock,
|
||||
time_t seconds,
|
||||
suseconds_t micro_seconds)
|
||||
{
|
||||
struct timeval tv = {.tv_sec = seconds, .tv_usec = micro_seconds};
|
||||
|
||||
if (sock->fd < 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (setsockopt(sock->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
sock->read_timeout = tv;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int nDPIsrvd_set_nonblock(struct nDPIsrvd_socket * const sock)
|
||||
{
|
||||
int flags;
|
||||
|
||||
if (sock->fd < 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
flags = fcntl(sock->fd, F_GETFL, 0);
|
||||
if (flags == -1)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
return (fcntl(sock->fd, F_SETFL, flags | O_NONBLOCK) != 0);
|
||||
}
|
||||
|
||||
static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock,
|
||||
struct nDPIsrvd_instance * const instance,
|
||||
struct nDPIsrvd_thread_data * const thread_data,
|
||||
@@ -675,6 +721,10 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c
|
||||
}
|
||||
if (bytes_read < 0)
|
||||
{
|
||||
if (errno == EAGAIN)
|
||||
{
|
||||
return READ_TIMEOUT;
|
||||
}
|
||||
return READ_ERROR;
|
||||
}
|
||||
|
||||
|
||||
4
dependencies/nDPIsrvd.py
vendored
4
dependencies/nDPIsrvd.py
vendored
@@ -339,6 +339,9 @@ class nDPIsrvdSocket:
|
||||
self.digitlen = 0
|
||||
self.lines = []
|
||||
|
||||
def timeout(self, timeout):
|
||||
self.sock.settimeout(timeout)
|
||||
|
||||
def receive(self):
|
||||
if len(self.buffer) == NETWORK_BUFFER_MAX_SIZE:
|
||||
raise BufferCapacityReached(len(self.buffer), NETWORK_BUFFER_MAX_SIZE)
|
||||
@@ -349,6 +352,7 @@ class nDPIsrvdSocket:
|
||||
except ConnectionResetError:
|
||||
connection_finished = True
|
||||
recvd = bytes()
|
||||
|
||||
if len(recvd) == 0:
|
||||
connection_finished = True
|
||||
|
||||
|
||||
@@ -44,7 +44,6 @@ static struct
|
||||
uint64_t flow_detection_update_count;
|
||||
uint64_t flow_not_detected_count;
|
||||
|
||||
uint64_t flow_packet_count;
|
||||
uint64_t flow_total_bytes;
|
||||
uint64_t flow_risky_count;
|
||||
|
||||
@@ -256,7 +255,7 @@ static void print_collectd_exec_output(void)
|
||||
printf(COLLECTD_PUTVAL_N_FORMAT(flow_new_count) COLLECTD_PUTVAL_N_FORMAT(flow_end_count)
|
||||
COLLECTD_PUTVAL_N_FORMAT(flow_idle_count) COLLECTD_PUTVAL_N_FORMAT(flow_guessed_count)
|
||||
COLLECTD_PUTVAL_N_FORMAT(flow_detected_count) COLLECTD_PUTVAL_N_FORMAT(flow_detection_update_count)
|
||||
COLLECTD_PUTVAL_N_FORMAT(flow_not_detected_count) COLLECTD_PUTVAL_N_FORMAT(flow_packet_count)
|
||||
COLLECTD_PUTVAL_N_FORMAT(flow_not_detected_count)
|
||||
COLLECTD_PUTVAL_N_FORMAT(flow_total_bytes) COLLECTD_PUTVAL_N_FORMAT(flow_risky_count),
|
||||
|
||||
COLLECTD_PUTVAL_N(flow_new_count),
|
||||
@@ -266,7 +265,6 @@ static void print_collectd_exec_output(void)
|
||||
COLLECTD_PUTVAL_N(flow_detected_count),
|
||||
COLLECTD_PUTVAL_N(flow_detection_update_count),
|
||||
COLLECTD_PUTVAL_N(flow_not_detected_count),
|
||||
COLLECTD_PUTVAL_N(flow_packet_count),
|
||||
COLLECTD_PUTVAL_N(flow_total_bytes),
|
||||
COLLECTD_PUTVAL_N(flow_risky_count));
|
||||
|
||||
@@ -428,7 +426,7 @@ static uint64_t get_total_flow_bytes(struct nDPIsrvd_socket * const sock)
|
||||
{
|
||||
nDPIsrvd_ull total_bytes_ull = 0;
|
||||
|
||||
if (TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_tot_l4_data_len"), &total_bytes_ull) == CONVERSION_OK)
|
||||
if (TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_tot_l4_payload_len"), &total_bytes_ull) == CONVERSION_OK)
|
||||
{
|
||||
return total_bytes_ull;
|
||||
}
|
||||
@@ -670,11 +668,6 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
|
||||
collectd_statistics.flow_not_detected_count++;
|
||||
}
|
||||
|
||||
if (TOKEN_GET_SZ(sock, "packet_event_name") != NULL)
|
||||
{
|
||||
collectd_statistics.flow_packet_count++;
|
||||
}
|
||||
|
||||
return CALLBACK_OK;
|
||||
}
|
||||
|
||||
@@ -716,6 +709,13 @@ int main(int argc, char ** argv)
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (nDPIsrvd_set_nonblock(sock) != 0)
|
||||
{
|
||||
LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd set nonblock failed: %s", strerror(errno));
|
||||
nDPIsrvd_socket_free(&sock);
|
||||
return 1;
|
||||
}
|
||||
|
||||
signal(SIGINT, sighandler);
|
||||
signal(SIGTERM, sighandler);
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
@@ -11,7 +11,6 @@ flow_detection_update_count value:GAUGE:0:U
|
||||
flow_not_detected_count value:GAUGE:0:U
|
||||
|
||||
# flow additional counters
|
||||
flow_packet_count value:GAUGE:0:U
|
||||
flow_total_bytes value:GAUGE:0:U
|
||||
flow_risky_count value:GAUGE:0:U
|
||||
|
||||
|
||||
@@ -159,8 +159,7 @@ static void simple_flow_cleanup_callback(struct nDPIsrvd_socket * const sock,
|
||||
|
||||
if (reason == CLEANUP_REASON_FLOW_TIMEOUT)
|
||||
{
|
||||
fprintf(stderr, "Flow timeout occurred, something really bad happened.\n");
|
||||
exit(1);
|
||||
fprintf(stderr, "Flow %llu timeouted.\n", flow->id_as_ull);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,9 +187,27 @@ int main(int argc, char ** argv)
|
||||
return 1;
|
||||
}
|
||||
|
||||
enum nDPIsrvd_read_return read_ret;
|
||||
while (main_thread_shutdown == 0 && (read_ret = nDPIsrvd_read(sock)) == READ_OK)
|
||||
if (nDPIsrvd_set_read_timeout(sock, 3, 0) != 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
enum nDPIsrvd_read_return read_ret;
|
||||
while (main_thread_shutdown == 0)
|
||||
{
|
||||
read_ret = nDPIsrvd_read(sock);
|
||||
if (read_ret == READ_TIMEOUT)
|
||||
{
|
||||
printf("No data received during the last %llu second(s).\n",
|
||||
(long long unsigned int)sock->read_timeout.tv_sec);
|
||||
continue;
|
||||
}
|
||||
if (read_ret != READ_OK)
|
||||
{
|
||||
main_thread_shutdown = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock);
|
||||
if (parse_ret != PARSE_NEED_MORE_DATA)
|
||||
{
|
||||
|
||||
@@ -426,8 +426,16 @@ if __name__ == '__main__':
|
||||
|
||||
nsock = nDPIsrvdSocket()
|
||||
nsock.connect(address)
|
||||
nsock.timeout(1.0)
|
||||
stats = Stats(nsock)
|
||||
try:
|
||||
nsock.loop(onJsonLineRecvd, onFlowCleanup, stats)
|
||||
except KeyboardInterrupt:
|
||||
print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown())))
|
||||
|
||||
while True:
|
||||
try:
|
||||
nsock.loop(onJsonLineRecvd, onFlowCleanup, stats)
|
||||
except KeyboardInterrupt:
|
||||
print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown())))
|
||||
break
|
||||
except TimeoutError:
|
||||
stats.updateSpinner()
|
||||
stats.resetStatus()
|
||||
stats.printStatus()
|
||||
|
||||
@@ -626,6 +626,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
||||
case READ_ERROR:
|
||||
logger(1, "Read and verify fd returned an error: %s", strerror(errno));
|
||||
THREAD_ERROR_GOTO(trv);
|
||||
case READ_TIMEOUT:
|
||||
case READ_PEER_DISCONNECT:
|
||||
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
|
||||
pipe_read_finished = 1;
|
||||
|
||||
Reference in New Issue
Block a user