nDPId-test: added JSON distribution + JSON parsing (Multithreaded design re-using most of nDPId/nDPIsrvd core)

* improved Makefile.old install targets
 * splitted nDPIsrvd_parse into nDPIsrvd_parse_line and nDPIsrvd_parse_all for the sake of readability
 * minor Python script improvments (check for nDPIsrvd.py on multiple locations, may be superseeded by setuptools in the future)
 * some paths needs to be absolute (chdir() during daemonize) and therefor additional checks introduced
 * test run script checks and fails if certain files are are missing (PCAP file <=> result output file)
 * removed not very useful "internal format error" JSON serialization if a BUG for same exists
 * fixed invalid l4 type statistics counters for nDPIsrvd-collectd

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2021-04-08 20:33:25 +02:00
parent e576162a43
commit 0a7ad7a76a
15 changed files with 533 additions and 270 deletions

View File

@@ -65,7 +65,7 @@ RM = rm -f
INSTALL = install INSTALL = install
ifeq ($(ENABLE_DEBUG),yes) ifeq ($(ENABLE_DEBUG),yes)
INSTALL_ARGS = -s INSTALL_ARGS_STRIP = -s
endif endif
all: help nDPId nDPIsrvd nDPId-test examples all: help nDPId nDPIsrvd nDPId-test examples
@@ -99,16 +99,20 @@ else
endif endif
install: all install: all
$(INSTALL) -d '$(DESTDIR)$(PREFIX)/bin' '$(DESTDIR)$(PREFIX)/sbin' $(INSTALL) -d \
$(INSTALL) $(INSTALL_ARGS) ./nDPId-test '$(DESTDIR)$(PREFIX)/bin' '$(DESTDIR)$(PREFIX)/bin' \
$(INSTALL) $(INSTALL_ARGS) ./nDPIsrvd '$(DESTDIR)$(PREFIX)/bin' '$(DESTDIR)$(PREFIX)/sbin' \
$(INSTALL) $(INSTALL_ARGS) ./nDPId '$(DESTDIR)$(PREFIX)/sbin' '$(DESTDIR)$(PREFIX)/share/nDPId'
$(INSTALL) $(INSTALL_ARGS) ./examples/c-captured/c-captured '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-captured' $(INSTALL) $(INSTALL_ARGS_STRIP) ./nDPId-test '$(DESTDIR)$(PREFIX)/bin'
$(INSTALL) $(INSTALL_ARGS) ./examples/c-json-stdout/c-json-stdout '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-json-dump' $(INSTALL) $(INSTALL_ARGS_STRIP) ./nDPIsrvd '$(DESTDIR)$(PREFIX)/bin'
$(INSTALL) $(INSTALL_ARGS) ./examples/c-collectd/c-collectd '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-collectd' $(INSTALL) $(INSTALL_ARGS_STRIP) ./nDPId '$(DESTDIR)$(PREFIX)/sbin'
$(INSTALL) $(INSTALL_ARGS) ./examples/py-flow-info/flow-info.py '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-flow-info.py' $(INSTALL) $(INSTALL_ARGS_STRIP) ./examples/c-captured/c-captured '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-captured'
$(INSTALL) $(INSTALL_ARGS_STRIP) ./examples/c-json-stdout/c-json-stdout '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-json-dump'
$(INSTALL) $(INSTALL_ARGS_STRIP) ./examples/c-collectd/c-collectd '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-collectd'
$(INSTALL) ./dependencies/nDPIsrvd.py '$(DESTDIR)$(PREFIX)/share/nDPId/nDPIsrvd.py'
$(INSTALL) ./examples/py-flow-info/flow-info.py '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-flow-info.py'
ifneq ($(GOCC),) ifneq ($(GOCC),)
$(INSTALL) $(INSTALL_ARGS) -t '$(DESTDIR)$(PREFIX)/bin' examples/go-dashboard/go-dashboard $(INSTALL) $(INSTALL_ARGS_STRIP) -t '$(DESTDIR)$(PREFIX)/bin' examples/go-dashboard/go-dashboard
endif endif
clean: clean:

View File

@@ -24,8 +24,10 @@
#define nDPIsrvd_STRLEN_SZ(s) (sizeof(s) / sizeof(s[0]) - sizeof(s[0])) #define nDPIsrvd_STRLEN_SZ(s) (sizeof(s) / sizeof(s[0]) - sizeof(s[0]))
#define TOKEN_GET_SZ(sock, key) token_get(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key)) #define TOKEN_GET_SZ(sock, key) token_get(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key))
#define TOKEN_GET_VALUE_SZ(sock, key, value_length) token_get_value(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key), value_length) #define TOKEN_GET_VALUE_SZ(sock, key, value_length) \
#define TOKEN_VALUE_EQUALS_SZ(token, string_to_check) token_value_equals(token, string_to_check, nDPIsrvd_STRLEN_SZ(string_to_check)) token_get_value(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key), value_length)
#define TOKEN_VALUE_EQUALS_SZ(token, string_to_check) \
token_value_equals(token, string_to_check, nDPIsrvd_STRLEN_SZ(string_to_check))
#define TOKEN_VALUE_TO_ULL(token, value) token_value_to_ull(token, value) #define TOKEN_VALUE_TO_ULL(token, value) token_value_to_ull(token, value)
#define FIRST_ENUM_VALUE 1 #define FIRST_ENUM_VALUE 1
@@ -51,7 +53,8 @@ enum nDPIsrvd_read_return
enum nDPIsrvd_parse_return enum nDPIsrvd_parse_return
{ {
PARSE_OK = READ_LAST_ENUM_VALUE, PARSE_OK = READ_LAST_ENUM_VALUE, /* can only be returned by nDPIsrvd_parse_line, not nDPIsrvd_parse_all */
PARSE_NEED_MORE_DATA, /* returned by nDPIsrvd_parse_line and nDPIsrvd_parse_all */
PARSE_INVALID_OPENING_CHAR, PARSE_INVALID_OPENING_CHAR,
PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT, PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT,
PARSE_SIZE_MISSING, PARSE_SIZE_MISSING,
@@ -116,8 +119,7 @@ static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size
typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock, typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_flow * const flow); struct nDPIsrvd_flow * const flow);
typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock, typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow);
struct nDPIsrvd_flow * const flow);
struct nDPIsrvd_address struct nDPIsrvd_address
{ {
@@ -130,6 +132,22 @@ struct nDPIsrvd_address
}; };
}; };
struct nDPIsrvd_buffer
{
char raw[NETWORK_BUFFER_MAX_SIZE];
size_t used;
char * json_string;
size_t json_string_start;
nDPIsrvd_ull json_string_length;
};
struct nDPIsrvd_jsmn
{
jsmn_parser parser;
jsmntok_t tokens[nDPIsrvd_MAX_JSON_TOKENS];
int tokens_found;
};
struct nDPIsrvd_socket struct nDPIsrvd_socket
{ {
int fd; int fd;
@@ -140,22 +158,8 @@ struct nDPIsrvd_socket
json_callback json_callback; json_callback json_callback;
flow_end_callback flow_end_callback; flow_end_callback flow_end_callback;
struct struct nDPIsrvd_buffer buffer;
{ struct nDPIsrvd_jsmn jsmn;
char raw[NETWORK_BUFFER_MAX_SIZE];
size_t used;
char * json_string;
size_t json_string_start;
nDPIsrvd_ull json_string_length;
} buffer;
/* jsmn JSON parser */
struct
{
jsmn_parser parser;
jsmntok_t tokens[nDPIsrvd_MAX_JSON_TOKENS];
int tokens_found;
} jsmn;
/* easy and fast JSON key/value access via hash table and a static array */ /* easy and fast JSON key/value access via hash table and a static array */
struct struct
@@ -246,8 +250,7 @@ static inline int nDPIsrvd_base64decode(char * in, size_t inLen, unsigned char *
static inline char const * nDPIsrvd_enum_to_string(int enum_value) static inline char const * nDPIsrvd_enum_to_string(int enum_value)
{ {
static char const * const enum_str[LAST_ENUM_VALUE + 1] = { static char const * const enum_str[LAST_ENUM_VALUE + 1] = {"CONNECT_OK",
"CONNECT_OK",
"CONNECT_ERROR_SOCKET", "CONNECT_ERROR_SOCKET",
"CONNECT_ERROR", "CONNECT_ERROR",
@@ -256,6 +259,7 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value)
"READ_ERROR", "READ_ERROR",
"PARSE_OK", "PARSE_OK",
"PARSE_NEED_MORE_DATA",
"PARSE_INVALID_OPENING_CHAR", "PARSE_INVALID_OPENING_CHAR",
"PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT", "PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT",
"PARSE_SIZE_MISSING", "PARSE_SIZE_MISSING",
@@ -274,8 +278,7 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value)
"CONVERSION_NOT_A_NUMBER", "CONVERSION_NOT_A_NUMBER",
"CONVERSION_RANGE_EXCEEDED", "CONVERSION_RANGE_EXCEEDED",
[LAST_ENUM_VALUE] = "LAST_ENUM_VALUE" [LAST_ENUM_VALUE] = "LAST_ENUM_VALUE"};
};
if (enum_value < FIRST_ENUM_VALUE || enum_value >= LAST_ENUM_VALUE) if (enum_value < FIRST_ENUM_VALUE || enum_value >= LAST_ENUM_VALUE)
{ {
@@ -355,7 +358,8 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock)
{ {
HASH_ITER(hh, (*sock)->flow_table, current_flow, ftmp) HASH_ITER(hh, (*sock)->flow_table, current_flow, ftmp)
{ {
if ((*sock)->flow_end_callback != NULL) { if ((*sock)->flow_end_callback != NULL)
{
(*sock)->flow_end_callback(*sock, current_flow); (*sock)->flow_end_callback(*sock, current_flow);
} }
HASH_DEL((*sock)->flow_table, current_flow); HASH_DEL((*sock)->flow_table, current_flow);
@@ -377,14 +381,17 @@ static inline int nDPIsrvd_setup_address(struct nDPIsrvd_address * const address
memset(address, 0, sizeof(*address)); memset(address, 0, sizeof(*address));
if (last_colon == NULL) { if (last_colon == NULL)
{
address->raw.sa_family = AF_UNIX; address->raw.sa_family = AF_UNIX;
address->size = sizeof(address->un); address->size = sizeof(address->un);
if (snprintf(address->un.sun_path, sizeof(address->un.sun_path), "%s", destination) <= 0) if (snprintf(address->un.sun_path, sizeof(address->un.sun_path), "%s", destination) <= 0)
{ {
return 1; return 1;
} }
} else { }
else
{
char addr_buf[INET6_ADDRSTRLEN]; char addr_buf[INET6_ADDRSTRLEN];
char const * address_start = destination; char const * address_start = destination;
char const * address_end = last_colon; char const * address_end = last_colon;
@@ -401,7 +408,9 @@ static inline int nDPIsrvd_setup_address(struct nDPIsrvd_address * const address
{ {
return 1; return 1;
} }
} else { }
else
{
address->raw.sa_family = AF_INET6; address->raw.sa_family = AF_INET6;
address->size = sizeof(address->in6); address->size = sizeof(address->in6);
address->in6.sin6_port = htons(atoi(last_colon + 1)); address->in6.sin6_port = htons(atoi(last_colon + 1));
@@ -486,13 +495,16 @@ static inline int jsmn_token_size(struct nDPIsrvd_socket const * const sock, int
return sock->jsmn.tokens[current_token_index].end - sock->jsmn.tokens[current_token_index].start; return sock->jsmn.tokens[current_token_index].end - sock->jsmn.tokens[current_token_index].start;
} }
static inline int jsmn_token_is_jsmn_type(struct nDPIsrvd_socket const * const sock, int current_token_index, jsmntype_t type_to_check) static inline int jsmn_token_is_jsmn_type(struct nDPIsrvd_socket const * const sock,
int current_token_index,
jsmntype_t type_to_check)
{ {
return sock->jsmn.tokens[current_token_index].type == type_to_check; return sock->jsmn.tokens[current_token_index].type == type_to_check;
} }
static inline struct nDPIsrvd_json_token const * static inline struct nDPIsrvd_json_token const * token_get(struct nDPIsrvd_socket const * const sock,
token_get(struct nDPIsrvd_socket const * const sock, char const * const key, size_t key_length) char const * const key,
size_t key_length)
{ {
struct nDPIsrvd_json_token * token = NULL; struct nDPIsrvd_json_token * token = NULL;
@@ -505,8 +517,10 @@ token_get(struct nDPIsrvd_socket const * const sock, char const * const key, siz
return NULL; return NULL;
} }
static inline char const * static inline char const * token_get_value(struct nDPIsrvd_socket const * const sock,
token_get_value(struct nDPIsrvd_socket const * const sock, char const * const key, size_t key_length, size_t * value_length) char const * const key,
size_t key_length,
size_t * value_length)
{ {
struct nDPIsrvd_json_token const * const token = token_get(sock, key, key_length); struct nDPIsrvd_json_token const * const token = token_get(sock, key, key_length);
if (token != NULL) if (token != NULL)
@@ -521,19 +535,20 @@ token_get_value(struct nDPIsrvd_socket const * const sock, char const * const ke
return NULL; return NULL;
} }
static inline int token_value_equals(struct nDPIsrvd_json_token const * const token, char const * const value, size_t value_length) static inline int token_value_equals(struct nDPIsrvd_json_token const * const token,
char const * const value,
size_t value_length)
{ {
if (token == NULL) if (token == NULL)
{ {
return 0; return 0;
} }
return strncmp(token->value, value, token->value_length) == 0 && return strncmp(token->value, value, token->value_length) == 0 && token->value_length == (int)value_length;
token->value_length == (int)value_length;
} }
static inline enum nDPIsrvd_conversion_return static inline enum nDPIsrvd_conversion_return str_value_to_ull(char const * const value_as_string,
str_value_to_ull(char const * const value_as_string, nDPIsrvd_ull_ptr const value) nDPIsrvd_ull_ptr const value)
{ {
char * endptr = NULL; char * endptr = NULL;
*value = strtoull(value_as_string, &endptr, 10); *value = strtoull(value_as_string, &endptr, 10);
@@ -550,8 +565,8 @@ str_value_to_ull(char const * const value_as_string, nDPIsrvd_ull_ptr const valu
return CONVERSION_OK; return CONVERSION_OK;
} }
static inline enum nDPIsrvd_conversion_return static inline enum nDPIsrvd_conversion_return token_value_to_ull(struct nDPIsrvd_json_token const * const token,
token_value_to_ull(struct nDPIsrvd_json_token const * const token, nDPIsrvd_ull_ptr const value) nDPIsrvd_ull_ptr const value)
{ {
if (token == NULL) if (token == NULL)
{ {
@@ -569,10 +584,15 @@ static inline int nDPIsrvd_build_flow_key(struct nDPIsrvd_flow_key * const key,
return 1; return 1;
} }
if (snprintf(key->key, nDPIsrvd_FLOW_KEY_STRLEN, "%.*s-%.*s-%.*s", if (snprintf(key->key,
tokens[0]->value_length, tokens[0]->value, nDPIsrvd_FLOW_KEY_STRLEN,
tokens[1]->value_length, tokens[1]->value, "%.*s-%.*s-%.*s",
tokens[2]->value_length, tokens[2]->value) <= 0) tokens[0]->value_length,
tokens[0]->value,
tokens[1]->value_length,
tokens[1]->value,
tokens[2]->value_length,
tokens[2]->value) <= 0)
{ {
return 1; return 1;
} }
@@ -584,7 +604,9 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
struct nDPIsrvd_json_token const * const flow_id) struct nDPIsrvd_json_token const * const flow_id)
{ {
struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS] = { struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS] = {
flow_id, TOKEN_GET_SZ(sock, "alias"), TOKEN_GET_SZ(sock, "source"), flow_id,
TOKEN_GET_SZ(sock, "alias"),
TOKEN_GET_SZ(sock, "source"),
}; };
struct nDPIsrvd_flow_key key = {}; struct nDPIsrvd_flow_key key = {};
@@ -612,7 +634,8 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
return flow; return flow;
} }
static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const current_flow) static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_flow * const current_flow)
{ {
if (current_flow == NULL) if (current_flow == NULL)
{ {
@@ -621,10 +644,10 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, s
struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 && if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 && TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0) {
if (sock->flow_end_callback != NULL)
{ {
if (sock->flow_end_callback != NULL) {
sock->flow_end_callback(sock, current_flow); sock->flow_end_callback(sock, current_flow);
} }
HASH_DEL(sock->flow_table, current_flow); HASH_DEL(sock->flow_table, current_flow);
@@ -634,55 +657,72 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, s
return 0; return 0;
} }
static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket * const sock) static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buffer * const buffer,
struct nDPIsrvd_jsmn * const jsmn)
{ {
enum nDPIsrvd_parse_return ret = PARSE_OK; if (buffer->used < NETWORK_BUFFER_LENGTH_DIGITS + 1)
while (sock->buffer.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{ {
if (sock->buffer.raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{') return PARSE_NEED_MORE_DATA;
}
if (buffer->raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{ {
return PARSE_INVALID_OPENING_CHAR; return PARSE_INVALID_OPENING_CHAR;
} }
errno = 0; errno = 0;
sock->buffer.json_string_length = strtoull((const char *)sock->buffer.raw, &sock->buffer.json_string, 10); buffer->json_string_length = strtoull((const char *)buffer->raw, &buffer->json_string, 10);
sock->buffer.json_string_length += sock->buffer.json_string - sock->buffer.raw; buffer->json_string_length += buffer->json_string - buffer->raw;
sock->buffer.json_string_start = sock->buffer.json_string - sock->buffer.raw; buffer->json_string_start = buffer->json_string - buffer->raw;
if (errno == ERANGE) if (errno == ERANGE)
{ {
return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT; return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
} }
if (sock->buffer.json_string == sock->buffer.raw) if (buffer->json_string == buffer->raw)
{ {
return PARSE_SIZE_MISSING; return PARSE_SIZE_MISSING;
} }
if (sock->buffer.json_string_length > sizeof(sock->buffer.raw)) if (buffer->json_string_length > sizeof(buffer->raw))
{ {
return PARSE_STRING_TOO_BIG; return PARSE_STRING_TOO_BIG;
} }
if (sock->buffer.json_string_length > sock->buffer.used) if (buffer->json_string_length > buffer->used)
{ {
break; return PARSE_NEED_MORE_DATA;
} }
if (buffer->raw[buffer->json_string_length - 2] != '}' || buffer->raw[buffer->json_string_length - 1] != '\n')
if (sock->buffer.raw[sock->buffer.json_string_length - 2] != '}' ||
sock->buffer.raw[sock->buffer.json_string_length - 1] != '\n')
{ {
return PARSE_INVALID_CLOSING_CHAR; return PARSE_INVALID_CLOSING_CHAR;
} }
jsmn_init(&sock->jsmn.parser); jsmn_init(&jsmn->parser);
sock->jsmn.tokens_found = jsmn_parse(&sock->jsmn.parser, jsmn->tokens_found = jsmn_parse(&jsmn->parser,
(char *)(sock->buffer.raw + sock->buffer.json_string_start), (char *)(buffer->raw + buffer->json_string_start),
sock->buffer.json_string_length - sock->buffer.json_string_start, buffer->json_string_length - buffer->json_string_start,
sock->jsmn.tokens, nDPIsrvd_MAX_JSON_TOKENS); jsmn->tokens,
if (sock->jsmn.tokens_found < 0 || sock->jsmn.tokens[0].type != JSMN_OBJECT) nDPIsrvd_MAX_JSON_TOKENS);
if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT)
{ {
return PARSE_JSMN_ERROR; return PARSE_JSMN_ERROR;
} }
return PARSE_OK;
}
static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer)
{
memmove(buffer->raw, buffer->raw + buffer->json_string_length, buffer->used - buffer->json_string_length);
buffer->used -= buffer->json_string_length;
buffer->json_string_length = 0;
buffer->json_string_start = 0;
}
static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock)
{
enum nDPIsrvd_parse_return ret;
while ((ret = nDPIsrvd_parse_line(&sock->buffer, &sock->jsmn)) == PARSE_OK)
{
char const * key = NULL; char const * key = NULL;
int key_length = 0; int key_length = 0;
for (int current_token = 1; current_token < sock->jsmn.tokens_found; current_token++) for (int current_token = 1; current_token < sock->jsmn.tokens_found; current_token++)
@@ -713,12 +753,12 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
{ {
token->value = jsmn_token_get(sock, current_token); token->value = jsmn_token_get(sock, current_token);
token->value_length = jsmn_token_size(sock, current_token); token->value_length = jsmn_token_size(sock, current_token);
} else { }
struct nDPIsrvd_json_token jt = { else
.value = jsmn_token_get(sock, current_token), {
struct nDPIsrvd_json_token jt = {.value = jsmn_token_get(sock, current_token),
.value_length = jsmn_token_size(sock, current_token), .value_length = jsmn_token_size(sock, current_token),
.hh = {} .hh = {}};
};
if (key == NULL || key_length > nDPIsrvd_JSON_KEY_STRLEN || if (key == NULL || key_length > nDPIsrvd_JSON_KEY_STRLEN ||
utarray_len(sock->json.tokens) == nDPIsrvd_MAX_JSON_TOKENS) utarray_len(sock->json.tokens) == nDPIsrvd_MAX_JSON_TOKENS)
@@ -730,7 +770,8 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
jt.key_length = key_length; jt.key_length = key_length;
snprintf(jt.key, nDPIsrvd_JSON_KEY_STRLEN, "%.*s", key_length, key); snprintf(jt.key, nDPIsrvd_JSON_KEY_STRLEN, "%.*s", key_length, key);
utarray_push_back(sock->json.tokens, &jt); utarray_push_back(sock->json.tokens, &jt);
HASH_ADD_STR(sock->json.token_table, key, HASH_ADD_STR(sock->json.token_table,
key,
(struct nDPIsrvd_json_token *)utarray_back(sock->json.tokens)); (struct nDPIsrvd_json_token *)utarray_back(sock->json.tokens));
} }
@@ -749,8 +790,7 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
ret = PARSE_FLOW_MGMT_ERROR; ret = PARSE_FLOW_MGMT_ERROR;
} }
} }
if (ret == PARSE_OK && if (ret == PARSE_OK && sock->json_callback(sock, flow) != CALLBACK_OK)
sock->json_callback(sock, flow) != CALLBACK_OK)
{ {
ret = PARSE_JSON_CALLBACK_ERROR; ret = PARSE_JSON_CALLBACK_ERROR;
} }
@@ -771,12 +811,7 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
} }
} }
memmove(sock->buffer.raw, nDPIsrvd_drain_buffer(&sock->buffer);
sock->buffer.raw + sock->buffer.json_string_length,
sock->buffer.used - sock->buffer.json_string_length);
sock->buffer.used -= sock->buffer.json_string_length;
sock->buffer.json_string_length = 0;
sock->buffer.json_string_start = 0;
} }
return ret; return ret;

View File

@@ -560,6 +560,14 @@ static int parse_options(int argc, char ** argv)
} }
errno = 0; errno = 0;
if (datadir[0] != '/')
{
fprintf(stderr,
"%s: PCAP capture directory must be absolut i.e. starting with `/', path given: `%s'\n",
argv[0],
datadir);
return 1;
}
if (mkdir(datadir, S_IRWXU) != 0 && errno != EEXIST) if (mkdir(datadir, S_IRWXU) != 0 && errno != EEXIST)
{ {
fprintf(stderr, "%s: Could not create directory %s: %s\n", argv[0], datadir, strerror(errno)); fprintf(stderr, "%s: Could not create directory %s: %s\n", argv[0], datadir, strerror(errno));
@@ -581,8 +589,8 @@ static int mainloop(void)
return 1; return 1;
} }
enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse(sock); enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock);
if (parse_ret != PARSE_OK) if (parse_ret != PARSE_NEED_MORE_DATA)
{ {
syslog(LOG_DAEMON | LOG_ERR, "nDPIsrvd parse failed with: %s", nDPIsrvd_enum_to_string(parse_ret)); syslog(LOG_DAEMON | LOG_ERR, "nDPIsrvd parse failed with: %s", nDPIsrvd_enum_to_string(parse_ret));
return 1; return 1;
@@ -645,6 +653,7 @@ int main(int argc, char ** argv)
int retval = mainloop(); int retval = mainloop();
nDPIsrvd_free(&sock); nDPIsrvd_free(&sock);
daemonize_shutdown(pidfile);
closelog(); closelog();
return retval; return retval;

View File

@@ -89,6 +89,7 @@ static struct
uint64_t flow_l3_other_count; uint64_t flow_l3_other_count;
uint64_t flow_l4_tcp_count; uint64_t flow_l4_tcp_count;
uint64_t flow_l4_udp_count; uint64_t flow_l4_udp_count;
uint64_t flow_l4_icmp_count;
uint64_t flow_l4_other_count; uint64_t flow_l4_other_count;
} collectd_statistics = {}; } collectd_statistics = {};
@@ -313,13 +314,15 @@ static void print_collectd_exec_output(void)
printf(COLLECTD_PUTVAL_N_FORMAT(flow_l3_ip4_count) COLLECTD_PUTVAL_N_FORMAT(flow_l3_ip6_count) printf(COLLECTD_PUTVAL_N_FORMAT(flow_l3_ip4_count) COLLECTD_PUTVAL_N_FORMAT(flow_l3_ip6_count)
COLLECTD_PUTVAL_N_FORMAT(flow_l3_other_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_tcp_count) COLLECTD_PUTVAL_N_FORMAT(flow_l3_other_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_tcp_count)
COLLECTD_PUTVAL_N_FORMAT(flow_l4_udp_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_other_count), COLLECTD_PUTVAL_N_FORMAT(flow_l4_udp_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_icmp_count)
COLLECTD_PUTVAL_N_FORMAT(flow_l4_other_count),
COLLECTD_PUTVAL_N(flow_l3_ip4_count), COLLECTD_PUTVAL_N(flow_l3_ip4_count),
COLLECTD_PUTVAL_N(flow_l3_ip6_count), COLLECTD_PUTVAL_N(flow_l3_ip6_count),
COLLECTD_PUTVAL_N(flow_l3_other_count), COLLECTD_PUTVAL_N(flow_l3_other_count),
COLLECTD_PUTVAL_N(flow_l4_tcp_count), COLLECTD_PUTVAL_N(flow_l4_tcp_count),
COLLECTD_PUTVAL_N(flow_l4_udp_count), COLLECTD_PUTVAL_N(flow_l4_udp_count),
COLLECTD_PUTVAL_N(flow_l4_icmp_count),
COLLECTD_PUTVAL_N(flow_l4_other_count)); COLLECTD_PUTVAL_N(flow_l4_other_count));
memset(&collectd_statistics, 0, sizeof(collectd_statistics)); memset(&collectd_statistics, 0, sizeof(collectd_statistics));
@@ -370,8 +373,8 @@ static int mainloop(int epollfd)
return 1; return 1;
} }
enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse(sock); enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock);
if (parse_ret != PARSE_OK) if (parse_ret != PARSE_NEED_MORE_DATA)
{ {
LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd parse failed with: %s", nDPIsrvd_enum_to_string(parse_ret)); LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd parse failed with: %s", nDPIsrvd_enum_to_string(parse_ret));
return 1; return 1;
@@ -424,14 +427,18 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
} }
struct nDPIsrvd_json_token const * const l4_proto = TOKEN_GET_SZ(sock, "l4_proto"); struct nDPIsrvd_json_token const * const l4_proto = TOKEN_GET_SZ(sock, "l4_proto");
if (TOKEN_VALUE_EQUALS_SZ(l3_proto, "tcp") != 0) if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "tcp") != 0)
{ {
collectd_statistics.flow_l4_tcp_count++; collectd_statistics.flow_l4_tcp_count++;
} }
else if (TOKEN_VALUE_EQUALS_SZ(l3_proto, "tcp") != 0) else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "udp") != 0)
{ {
collectd_statistics.flow_l4_udp_count++; collectd_statistics.flow_l4_udp_count++;
} }
else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "icmp") != 0)
{
collectd_statistics.flow_l4_icmp_count++;
}
else if (l4_proto != NULL) else if (l4_proto != NULL)
{ {
collectd_statistics.flow_l4_other_count++; collectd_statistics.flow_l4_other_count++;

View File

@@ -3,6 +3,11 @@
import os import os
import sys import sys
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
try:
import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
except ModuleNotFoundError:
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
import nDPIsrvd import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor from nDPIsrvd import nDPIsrvdSocket, TermColor

View File

@@ -3,9 +3,14 @@
import os import os
import sys import sys
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
try:
import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
except ModuleNotFoundError:
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
import nDPIsrvd import nDPIsrvd
from nDPIsrvd import TermColor, nDPIsrvdSocket, PcapPacket from nDPIsrvd import nDPIsrvdSocket, TermColor
def onJsonLineRecvd(json_dict, current_flow, global_user_data): def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if current_flow is None: if current_flow is None:

View File

@@ -3,11 +3,15 @@
import os import os
import sys import sys
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
try:
import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
except ModuleNotFoundError:
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
import nDPIsrvd import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor from nDPIsrvd import nDPIsrvdSocket, TermColor
def onJsonLineRecvd(json_dict, current_flow, global_user_data): def onJsonLineRecvd(json_dict, current_flow, global_user_data):
print(json_dict) print(json_dict)
return True return True

View File

@@ -4,9 +4,14 @@ import base64
import os import os
import sys import sys
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
try:
import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
except ModuleNotFoundError:
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
import nDPIsrvd import nDPIsrvd
from nDPIsrvd import TermColor, nDPIsrvdSocket, PcapPacket from nDPIsrvd import nDPIsrvdSocket, TermColor
def onJsonLineRecvd(json_dict, current_flow, global_user_data): def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if current_flow is None: if current_flow is None:

View File

@@ -3,6 +3,11 @@
import os import os
import sys import sys
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
try:
import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
except ModuleNotFoundError:
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
import nDPIsrvd import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor from nDPIsrvd import nDPIsrvdSocket, TermColor

View File

@@ -16,11 +16,29 @@ enum
PIPE_COUNT = 2 PIPE_COUNT = 2
}; };
static int epollfd = -1; struct thread_return_value
{
int val;
};
static int mock_pipefds[PIPE_COUNT] = {}; static int mock_pipefds[PIPE_COUNT] = {};
static int mock_servfds[PIPE_COUNT] = {}; static int mock_servfds[PIPE_COUNT] = {};
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
#define MAX_REMOTE_DESCRIPTORS 2
#define THREAD_ERROR(thread_arg) \
do \
{ \
((struct thread_return_value *)thread_arg)->val = 1; \
} while (0);
#define THREAD_ERROR_GOTO(thread_arg) \
do \
{ \
THREAD_ERROR(thread_arg); \
goto error; \
} while (0);
void mock_syslog_stderr(int p, const char * format, ...) void mock_syslog_stderr(int p, const char * format, ...)
{ {
va_list ap; va_list ap;
@@ -47,98 +65,158 @@ static int setup_pipe(int pipefd[PIPE_COUNT])
static void * nDPIsrvd_mainloop_thread(void * const arg) static void * nDPIsrvd_mainloop_thread(void * const arg)
{ {
(void)arg; (void)arg;
int epollfd = create_evq();
struct remote_desc * mock_json_desc = NULL; struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_serv_desc = NULL; struct remote_desc * mock_serv_desc = NULL;
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
if (epollfd < 0)
{
THREAD_ERROR_GOTO(arg);
}
mock_json_desc = get_unused_remote_descriptor(JSON_SOCK, mock_pipefds[PIPE_nDPIsrvd]); mock_json_desc = get_unused_remote_descriptor(JSON_SOCK, mock_pipefds[PIPE_nDPIsrvd]);
if (mock_json_desc == NULL) if (mock_json_desc == NULL)
{ {
goto error; THREAD_ERROR_GOTO(arg);
} }
mock_serv_desc = get_unused_remote_descriptor(SERV_SOCK, mock_servfds[PIPE_WRITE]); mock_serv_desc = get_unused_remote_descriptor(SERV_SOCK, mock_servfds[PIPE_WRITE]);
if (mock_serv_desc == NULL) if (mock_serv_desc == NULL)
{ {
goto error; THREAD_ERROR_GOTO(arg);
} }
strncpy(mock_serv_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_serv_desc->event_serv.peer_addr)); strncpy(mock_serv_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_serv_desc->event_serv.peer_addr));
mock_serv_desc->event_serv.peer.sin_port = 0; mock_serv_desc->event_serv.peer.sin_port = 0;
if (add_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0) if (add_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0)
{ {
goto error; THREAD_ERROR_GOTO(arg);
} }
if (add_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0) if (add_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0)
{ {
goto error; THREAD_ERROR_GOTO(arg);
} }
if (mainloop(epollfd) != 0) while (1)
{
int nready = epoll_wait(epollfd, events, events_size, -1);
if (nready < 0)
{
THREAD_ERROR_GOTO(arg);
}
for (int i = 0; i < nready; i++)
{
if (events[i].data.ptr == mock_json_desc)
{
if (handle_incoming_data_event(epollfd, &events[i]) != 0)
{ {
goto error; goto error;
} }
}
while (handle_incoming_data(epollfd, mock_json_desc) == 0) {} else
{
THREAD_ERROR_GOTO(arg);
}
}
}
error: error:
del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]);
del_event(epollfd, mock_servfds[PIPE_WRITE]);
close(mock_pipefds[PIPE_nDPIsrvd]);
close(mock_servfds[PIPE_WRITE]); close(mock_servfds[PIPE_WRITE]);
close(epollfd);
return NULL; return NULL;
} }
static void * distributor_mainloop_thread(void * const arg) static enum nDPIsrvd_parse_return parse_json_lines(struct io_buffer * const buffer)
{ {
char buf[NETWORK_BUFFER_MAX_SIZE]; struct nDPIsrvd_buffer buf = {};
struct nDPIsrvd_jsmn jsmn = {};
size_t const n = (buffer->used > sizeof(buf.raw) ? sizeof(buf.raw) : buffer->used);
(void)arg; if (n > NETWORK_BUFFER_MAX_SIZE)
{
return PARSE_STRING_TOO_BIG;
}
int dis_thread_shutdown = 0; memcpy(buf.raw, buffer->ptr, n);
buf.used = buffer->used;
enum nDPIsrvd_parse_return ret;
while ((ret = nDPIsrvd_parse_line(&buf, &jsmn)) == PARSE_OK)
{
if (jsmn.tokens_found == 0)
{
return PARSE_JSMN_ERROR;
}
nDPIsrvd_drain_buffer(&buf);
}
memcpy(buffer->ptr, buf.raw, buf.used);
buffer->used = buf.used;
return ret;
}
static void * distributor_client_mainloop_thread(void * const arg)
{
struct io_buffer client_buffer = {.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE),
.max = NETWORK_BUFFER_MAX_SIZE,
.used = 0};
int dis_epollfd = create_evq(); int dis_epollfd = create_evq();
int signalfd = setup_signalfd(dis_epollfd); int signalfd = setup_signalfd(dis_epollfd);
struct epoll_event events[32]; struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]); size_t const events_size = sizeof(events) / sizeof(events[0]);
if (dis_epollfd < 0) if (client_buffer.ptr == NULL || dis_epollfd < 0 || signalfd < 0)
{ {
goto error; THREAD_ERROR_GOTO(arg);
} }
if (add_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0) if (add_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0)
{ {
goto error; THREAD_ERROR_GOTO(arg);
}
if (signalfd < 0)
{
goto error;
} }
while (dis_thread_shutdown == 0) while (1)
{ {
int nready = epoll_wait(dis_epollfd, events, events_size, -1); int nready = epoll_wait(dis_epollfd, events, events_size, -1);
for (int i = 0; i < nready; i++) for (int i = 0; i < nready; i++)
{ {
if ((events[i].events & EPOLLERR) != 0) if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0)
{ {
dis_thread_shutdown = 1; THREAD_ERROR_GOTO(arg);
break;
}
if ((events[i].events & EPOLLIN) == 0)
{
dis_thread_shutdown = 1;
break;
} }
if (events[i].data.fd == mock_servfds[PIPE_READ]) if (events[i].data.fd == mock_servfds[PIPE_READ])
{ {
ssize_t bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf)); ssize_t bytes_read = read(mock_servfds[PIPE_READ],
if (bytes_read <= 0) client_buffer.ptr + client_buffer.used,
client_buffer.max - client_buffer.used);
if (bytes_read == 0)
{ {
dis_thread_shutdown = 1; goto error;
break; }
else if (bytes_read < 0)
{
THREAD_ERROR_GOTO(arg);
}
printf("%.*s", (int)bytes_read, client_buffer.ptr + client_buffer.used);
client_buffer.used += bytes_read;
enum nDPIsrvd_parse_return parse_ret = parse_json_lines(&client_buffer);
if (parse_ret != PARSE_NEED_MORE_DATA)
{
fprintf(stderr, "JSON parsing failed: %s\n", nDPIsrvd_enum_to_string(parse_ret));
THREAD_ERROR(arg);
} }
printf("%.*s", (int)bytes_read, buf);
} }
else if (events[i].data.fd == signalfd) else if (events[i].data.fd == signalfd)
{ {
@@ -148,45 +226,38 @@ static void * distributor_mainloop_thread(void * const arg)
s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo)); s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
if (s != sizeof(struct signalfd_siginfo)) if (s != sizeof(struct signalfd_siginfo))
{ {
dis_thread_shutdown = 1; THREAD_ERROR(arg);
break;
} }
if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT) if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
{ {
dis_thread_shutdown = 1; fprintf(stderr, "Got signal %d, abort.\n", fdsi.ssi_signo);
break; THREAD_ERROR(arg);
} }
} }
else else
{ {
dis_thread_shutdown = 1; THREAD_ERROR(arg);
break;
} }
} }
} }
ssize_t bytes_read;
while ((bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf))) > 0)
{
printf("%.*s", (int)bytes_read, buf);
}
error: error:
del_event(dis_epollfd, signalfd); del_event(dis_epollfd, signalfd);
del_event(dis_epollfd, mock_servfds[PIPE_READ]); del_event(dis_epollfd, mock_servfds[PIPE_READ]);
close(dis_epollfd); close(dis_epollfd);
close(signalfd); close(signalfd);
free(client_buffer.ptr);
return NULL; return NULL;
} }
static void * nDPId_mainloop_thread(void * const arg) static void * nDPId_mainloop_thread(void * const arg)
{ {
(void)arg;
if (setup_reader_threads() != 0) if (setup_reader_threads() != 0)
{ {
exit(EXIT_FAILURE); THREAD_ERROR(arg);
return NULL;
} }
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */ /* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
@@ -203,9 +274,33 @@ static void * nDPId_mainloop_thread(void * const arg)
static void usage(char const * const arg0) static void usage(char const * const arg0)
{ {
printf("usage: %s [path-to-pcap-file]\n", arg0); fprintf(stderr, "usage: %s [path-to-pcap-file]\n", arg0);
} }
static int thread_wait_for_termination(pthread_t thread, time_t wait_time_secs, struct thread_return_value * const trv)
{
struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
{
return -1;
}
ts.tv_sec += wait_time_secs;
int err = pthread_timedjoin_np(thread, (void **)&trv, &ts);
switch (err)
{
case EBUSY:
return 0;
case ETIMEDOUT:
return 0;
}
return 1;
}
#define THREADS_RETURNED_ERROR() (nDPId_return.val != 0 || nDPIsrvd_return.val != 0 || distributor_return.val != 0)
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
if (argc != 2) if (argc != 2)
@@ -214,6 +309,11 @@ int main(int argc, char ** argv)
return -1; return -1;
} }
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
{
return -1;
}
nDPId_options.reader_thread_count = 1; /* Please do not change this! Generating meaningful pcap diff's relies on a nDPId_options.reader_thread_count = 1; /* Please do not change this! Generating meaningful pcap diff's relies on a
single reader thread! */ single reader thread! */
nDPId_options.instance_alias = strdup("nDPId-test"); nDPId_options.instance_alias = strdup("nDPId-test");
@@ -232,53 +332,57 @@ int main(int argc, char ** argv)
json_sockfd = -1; json_sockfd = -1;
serv_sockfd = -1; serv_sockfd = -1;
if (setup_remote_descriptors(2) != 0) if (setup_remote_descriptors(MAX_REMOTE_DESCRIPTORS) != 0)
{
return -1;
}
epollfd = create_evq();
if (epollfd < 0)
{ {
return -1; return -1;
} }
pthread_t nDPId_thread; pthread_t nDPId_thread;
if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, NULL) != 0) struct thread_return_value nDPId_return = {};
if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
{ {
return -1; return -1;
} }
pthread_t nDPIsrvd_thread; pthread_t nDPIsrvd_thread;
if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, NULL) != 0) struct thread_return_value nDPIsrvd_return = {};
if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, &nDPIsrvd_return) != 0)
{ {
return -1; return -1;
} }
pthread_t distributor_thread; pthread_t distributor_thread;
if (pthread_create(&distributor_thread, NULL, distributor_mainloop_thread, NULL) != 0) struct thread_return_value distributor_return = {};
if (pthread_create(&distributor_thread, NULL, distributor_client_mainloop_thread, &distributor_return) != 0)
{ {
return -1; return -1;
} }
if (pthread_join(nDPId_thread, NULL) != 0) /* Try to gracefully shutdown all threads. */
while (thread_wait_for_termination(distributor_thread, 1, &distributor_return) == 0)
{
if (THREADS_RETURNED_ERROR() != 0)
{ {
return -1; return -1;
} }
}
pthread_kill(nDPIsrvd_thread, SIGINT); while (thread_wait_for_termination(nDPId_thread, 1, &nDPId_return) == 0)
{
if (pthread_join(nDPIsrvd_thread, NULL) != 0) if (THREADS_RETURNED_ERROR() != 0)
{ {
return -1; return -1;
} }
}
pthread_kill(distributor_thread, SIGINT); while (thread_wait_for_termination(nDPIsrvd_thread, 1, &nDPIsrvd_return) == 0)
{
if (pthread_join(distributor_thread, NULL) != 0) if (THREADS_RETURNED_ERROR() != 0)
{ {
return -1; return -1;
} }
}
return 0;
return THREADS_RETURNED_ERROR();
} }

35
nDPId.c
View File

@@ -1532,11 +1532,13 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
serialize_and_send(reader_thread); serialize_and_send(reader_thread);
} }
static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index) static void internal_format_error(ndpi_serializer * const serializer, char const * const format, uint32_t format_index)
{ {
ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, "serializer-error", "format"); syslog(LOG_DAEMON | LOG_ERR,
ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "serializer-format-index", format_index); "BUG: Internal error detected for format string `%s' at format index %u",
serialize_and_send(reader_thread); format,
format_index);
ndpi_reset_serializer(serializer);
} }
static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread, char const * format, va_list ap) static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread, char const * format, va_list ap)
@@ -1582,7 +1584,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
} }
else else
{ {
jsonize_format_error(reader_thread, format_index); internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return; return;
} }
break; break;
@@ -1592,7 +1594,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
format_index++; format_index++;
if (got_jsonkey != 1) if (got_jsonkey != 1)
{ {
jsonize_format_error(reader_thread, format_index); internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return; return;
} }
if (*format == 'l') if (*format == 'l')
@@ -1634,7 +1636,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
} }
else else
{ {
jsonize_format_error(reader_thread, format_index); internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return; return;
} }
format++; format++;
@@ -1649,7 +1651,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
} }
else else
{ {
jsonize_format_error(reader_thread, format_index); internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return; return;
} }
break; break;
@@ -1663,16 +1665,17 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
} }
else else
{ {
jsonize_format_error(reader_thread, format_index); internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return; return;
} }
break; break;
/* format string separators */
case ' ': case ' ':
case ',': case ',':
case '%': case '%':
break; break;
default: default:
jsonize_format_error(reader_thread, format_index); internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return; return;
} }
} }
@@ -2202,9 +2205,9 @@ static void ndpi_process_packet(uint8_t * const args,
if (tree_result == NULL) if (tree_result == NULL)
{ {
/* flow still not found, must be new */ /* flow still not found, must be new or midstream */
if (nDPId_options.process_internal_initial_direction != 0) if (nDPId_options.process_internal_initial_direction != 0 && flow_basic.tcp_is_midstream_flow == 0)
{ {
if (is_ip_in_subnet(&flow_basic.src, if (is_ip_in_subnet(&flow_basic.src,
&nDPId_options.pcap_dev_netmask, &nDPId_options.pcap_dev_netmask,
@@ -2231,7 +2234,7 @@ static void ndpi_process_packet(uint8_t * const args,
return; return;
} }
} }
else if (nDPId_options.process_external_initial_direction != 0) else if (nDPId_options.process_external_initial_direction != 0 && flow_basic.tcp_is_midstream_flow == 0)
{ {
if (is_ip_in_subnet(&flow_basic.src, if (is_ip_in_subnet(&flow_basic.src,
&nDPId_options.pcap_dev_netmask, &nDPId_options.pcap_dev_netmask,
@@ -2761,7 +2764,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"[-d] [-p pidfile]\n" "[-d] [-p pidfile]\n"
"\t \t" "\t \t"
"[-u user] [-g group] " "[-u user] [-g group] "
"[-P path] [-C path] " "[-P path] [-C path] [-J path] "
"[-a instance-alias] " "[-a instance-alias] "
"[-o subopt=value]\n\n" "[-o subopt=value]\n\n"
"\t-i\tInterface or file from where to read packets from.\n" "\t-i\tInterface or file from where to read packets from.\n"
@@ -2932,6 +2935,10 @@ static int validate_options(char const * const arg0)
{ {
int retval = 0; int retval = 0;
if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0)
{
retval = 1;
}
if (nDPId_options.instance_alias == NULL) if (nDPId_options.instance_alias == NULL)
{ {
char hname[256]; char hname[256];

View File

@@ -277,11 +277,19 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
{ {
nDPIsrvd_options.pidfile = strdup(nDPIsrvd_PIDFILE); nDPIsrvd_options.pidfile = strdup(nDPIsrvd_PIDFILE);
} }
if (is_path_absolute("Pidfile", nDPIsrvd_options.pidfile) != 0)
{
return 1;
}
if (nDPIsrvd_options.json_sockpath == NULL) if (nDPIsrvd_options.json_sockpath == NULL)
{ {
nDPIsrvd_options.json_sockpath = strdup(COLLECTOR_UNIX_SOCKET); nDPIsrvd_options.json_sockpath = strdup(COLLECTOR_UNIX_SOCKET);
} }
if (is_path_absolute("JSON socket", nDPIsrvd_options.json_sockpath) != 0)
{
return 1;
}
if (nDPIsrvd_options.serv_optarg == NULL) if (nDPIsrvd_options.serv_optarg == NULL)
{ {
@@ -293,6 +301,10 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg); fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg);
return 1; return 1;
} }
if (serv_address.raw.sa_family == AF_UNIX && is_path_absolute("SERV socket", nDPIsrvd_options.serv_optarg) != 0)
{
return 1;
}
if (optind < argc) if (optind < argc)
{ {
@@ -303,6 +315,28 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
return 0; return 0;
} }
static struct remote_desc * accept_remote(int server_fd,
enum sock_type socktype,
struct sockaddr * const sockaddr,
socklen_t * const addrlen)
{
int client_fd = accept(server_fd, sockaddr, addrlen);
if (client_fd < 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
return NULL;
}
struct remote_desc * current = get_unused_remote_descriptor(socktype, client_fd);
if (current == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
return NULL;
}
return current;
}
static int new_connection(int epollfd, int eventfd) static int new_connection(int epollfd, int eventfd)
{ {
union { union {
@@ -330,17 +364,9 @@ static int new_connection(int epollfd, int eventfd)
return 1; return 1;
} }
int client_fd = accept(server_fd, (struct sockaddr *)&sockaddr, &peer_addr_len); struct remote_desc * current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len);
if (client_fd < 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
return 1;
}
struct remote_desc * current = get_unused_remote_descriptor(stype, client_fd);
if (current == NULL) if (current == NULL)
{ {
syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
return 1; return 1;
} }
@@ -628,6 +654,11 @@ static int handle_incoming_data_event(int epollfd, struct epoll_event * const ev
{ {
struct remote_desc * current = (struct remote_desc *)event->data.ptr; struct remote_desc * current = (struct remote_desc *)event->data.ptr;
if ((event->events & EPOLLIN) == 0)
{
return 1;
}
if (current == NULL) if (current == NULL)
{ {
syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid"); syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
@@ -640,11 +671,6 @@ static int handle_incoming_data_event(int epollfd, struct epoll_event * const ev
return 1; return 1;
} }
if ((event->events & EPOLLIN) == 0)
{
return 1;
}
return handle_incoming_data(epollfd, current); return handle_incoming_data(epollfd, current);
} }

View File

@@ -1,11 +1,19 @@
#!/usr/bin/env sh #!/usr/bin/env bash
set -e set -e
LINE_SPACES=${LINE_SPACES:-48} LINE_SPACES=${LINE_SPACES:-48}
MYDIR="$(realpath "$(dirname ${0})")" MYDIR="$(realpath "$(dirname ${0})")"
nDPId_test_EXEC="${2:-"$(realpath "${MYDIR}/../nDPId-test")"}" nDPId_test_EXEC="${2:-"$(realpath "${MYDIR}/../nDPId-test")"}"
nDPI_SOURCE_ROOT="${1}" nDPI_SOURCE_ROOT="$(realpath "${1}")"
LOCKFILE="$(realpath "${0}").lock"
touch "${LOCKFILE}"
exec 42< "${LOCKFILE}"
flock -x -n 42 || {
printf '%s\n' "Could not aquire file lock for ${0}. Already running instance?";
exit 1;
}
if [ $# -ne 1 -a $# -ne 2 ]; then if [ $# -ne 1 -a $# -ne 2 ]; then
cat <<EOF cat <<EOF
@@ -36,16 +44,24 @@ mkdir -p /tmp/nDPId-test-stderr
set +e set +e
RETVAL=0 RETVAL=0
for pcap_file in $(ls *.pcap*); do for pcap_file in $(ls *.pcap*); do
printf '%s\n' "${nDPId_test_EXEC} ${pcap_file}" \
>"/tmp/nDPId-test-stderr/${pcap_file}.out"
${nDPId_test_EXEC} "${pcap_file}" \ ${nDPId_test_EXEC} "${pcap_file}" \
>"${MYDIR}/results/${pcap_file}.out.new" \ >"${MYDIR}/results/${pcap_file}.out.new" \
2>"/tmp/nDPId-test-stderr/${pcap_file}.out" 2>>"/tmp/nDPId-test-stderr/${pcap_file}.out"
printf "%-${LINE_SPACES}s\t" "${pcap_file}"
if [ $? -eq 0 ]; then if [ $? -eq 0 ]; then
if diff -u0 "${MYDIR}/results/${pcap_file}.out" \ if [ ! -r "${MYDIR}/results/${pcap_file}.out" ]; then
printf '%s\n' '[NEW]'
RETVAL=1
elif diff -u0 "${MYDIR}/results/${pcap_file}.out" \
"${MYDIR}/results/${pcap_file}.out.new" >/dev/null; then "${MYDIR}/results/${pcap_file}.out.new" >/dev/null; then
printf "%-${LINE_SPACES}s\t%s\n" "${pcap_file}" '[OK]' printf '%s\n' '[OK]'
else else
printf "%-${LINE_SPACES}s\t%s\n" "${pcap_file}" '[DIFF]' printf '%s\n' '[DIFF]'
diff -u0 "${MYDIR}/results/${pcap_file}.out" \ diff -u0 "${MYDIR}/results/${pcap_file}.out" \
"${MYDIR}/results/${pcap_file}.out.new" "${MYDIR}/results/${pcap_file}.out.new"
mv -v "${MYDIR}/results/${pcap_file}.out.new" \ mv -v "${MYDIR}/results/${pcap_file}.out.new" \
@@ -53,7 +69,7 @@ for pcap_file in $(ls *.pcap*); do
RETVAL=1 RETVAL=1
fi fi
else else
printf "%-${LINE_SPACES}s\t%s\n" "${pcap_file}" '[FAIL]' printf '%s\n' '[FAIL]'
printf '%s\n' '----------------------------------------' printf '%s\n' '----------------------------------------'
printf '%s\n' "-- STDERR of ${pcap_file}" printf '%s\n' "-- STDERR of ${pcap_file}"
cat "/tmp/nDPId-test-stderr/${pcap_file}.out" cat "/tmp/nDPId-test-stderr/${pcap_file}.out"
@@ -63,4 +79,13 @@ for pcap_file in $(ls *.pcap*); do
rm -f "${MYDIR}/results/${pcap_file}.out.new" rm -f "${MYDIR}/results/${pcap_file}.out.new"
done done
cd "${MYDIR}"
for out_file in $(ls results/*.out); do
pcap_file="${nDPI_TEST_DIR}/$(basename ${out_file%.out})"
if [ ! -r "${pcap_file}" ]; then
printf "%-${LINE_SPACES}s\t%s\n" "$(basename ${pcap_file})" '[MISSING]'
RETVAL=1
fi
done
exit ${RETVAL} exit ${RETVAL}

19
utils.c
View File

@@ -55,6 +55,11 @@ static int create_pidfile(char const * const pidfile)
{ {
int pfd; int pfd;
if (is_path_absolute("Pidfile", pidfile) != 0)
{
return 1;
}
pfd = open(pidfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); pfd = open(pidfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (pfd < 0) if (pfd < 0)
@@ -74,6 +79,20 @@ static int create_pidfile(char const * const pidfile)
return 0; return 0;
} }
int is_path_absolute(char const * const prefix,
char const * const path)
{
if (path[0] != '/')
{
syslog(LOG_DAEMON | LOG_ERR,
"%s path must be absolut i.e. starting with a `/', path given: `%s'",
prefix, path);
return 1;
}
return 0;
}
int daemonize_with_pidfile(char const * const pidfile) int daemonize_with_pidfile(char const * const pidfile)
{ {
pid_str ps = {}; pid_str ps = {};

View File

@@ -1,6 +1,9 @@
#ifndef UTILS_H #ifndef UTILS_H
#define UTILS_H 1 #define UTILS_H 1
int is_path_absolute(char const * const prefix,
char const * const path);
void daemonize_enable(void); void daemonize_enable(void);
int daemonize_with_pidfile(char const * const pidfile); int daemonize_with_pidfile(char const * const pidfile);