mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-10-29 17:32:23 +00:00
Replaced ambiguous naming of "JSON string" to more accurate "JSON message".
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
@@ -17,7 +17,7 @@
|
||||
- Fixed a bug in base64 encoding which could lead to invalid base64 strings
|
||||
- Added some machine learning examples
|
||||
- Fixed various smaller bugs
|
||||
- Fixed nDPIsrvd bug which causes invalid JSON strings sent to Distributors
|
||||
- Fixed nDPIsrvd bug which causes invalid JSON messages sent to Distributors
|
||||
|
||||
#### nDPId 1.5 (Apr 2022)
|
||||
|
||||
|
||||
10
README.md
10
README.md
@@ -29,7 +29,7 @@ The daemon `nDPId` is capable of multithreading for packet processing, but w/o m
|
||||
Instead, synchronization is achieved by a packet distribution mechanism.
|
||||
To balance the workload to all threads (more or less) equally, a unique identifier represented as hash value is calculated using a 3-tuple consisting of: IPv4/IPv6 src/dst address; IP header value of the layer4 protocol; and (for TCP/UDP) src/dst port. Other protocols e.g. ICMP/ICMPv6 lack relevance for DPI, thus nDPId does not distinguish between different ICMP/ICMPv6 flows coming from the same host. This saves memory and performance, but might change in the future.
|
||||
|
||||
`nDPId` uses libnDPI's JSON serialization interface to generate a JSON strings for each event it receives from the library and which it then sends out to a UNIX-socket (default: `/tmp/ndpid-collector.sock` ). From such a socket, `nDPIsrvd` (or other custom applications) can retrieve incoming JSON-messages and further proceed working/distributing messages to higher-level applications.
|
||||
`nDPId` uses libnDPI's JSON serialization interface to generate a JSON messages for each event it receives from the library and which it then sends out to a UNIX-socket (default: `/tmp/ndpid-collector.sock` ). From such a socket, `nDPIsrvd` (or other custom applications) can retrieve incoming JSON-messages and further proceed working/distributing messages to higher-level applications.
|
||||
|
||||
Unfortunately, `nDPIsrvd` does not yet support any encryption/authentication for TCP connections (TODO!).
|
||||
|
||||
@@ -71,11 +71,11 @@ where:
|
||||
|
||||
JSON messages streamed by both `nDPId` and `nDPIsrvd` are presented with:
|
||||
|
||||
* a 5-digit-number describing (as decimal number) the **entire** JSON string including the newline `\n` at the end;
|
||||
* a 5-digit-number describing (as decimal number) the **entire** JSON message including the newline `\n` at the end;
|
||||
* the JSON messages
|
||||
|
||||
```text
|
||||
[5-digit-number][JSON string]
|
||||
[5-digit-number][JSON message]
|
||||
```
|
||||
|
||||
as with the following example:
|
||||
@@ -93,8 +93,8 @@ Technical details about the JSON-message format can be obtained from the related
|
||||
|
||||
# Events
|
||||
|
||||
`nDPId` generates JSON strings whereby each string is assigned to a certain event.
|
||||
Those events specify the contents (key-value-pairs) of the JSON string.
|
||||
`nDPId` generates JSON messages whereby each string is assigned to a certain event.
|
||||
Those events specify the contents (key-value-pairs) of the JSON message.
|
||||
They are divided into four categories, each with a number of subevents.
|
||||
|
||||
## Error Events
|
||||
|
||||
54
dependencies/nDPIsrvd.h
vendored
54
dependencies/nDPIsrvd.h
vendored
@@ -207,9 +207,9 @@ struct nDPIsrvd_buffer
|
||||
struct nDPIsrvd_json_buffer
|
||||
{
|
||||
struct nDPIsrvd_buffer buf;
|
||||
char * json_string;
|
||||
size_t json_string_start;
|
||||
nDPIsrvd_ull json_string_length;
|
||||
char * json_message;
|
||||
size_t json_message_start;
|
||||
nDPIsrvd_ull json_message_length;
|
||||
};
|
||||
|
||||
struct nDPIsrvd_jsmn
|
||||
@@ -402,9 +402,9 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
|
||||
|
||||
static inline void nDPIsrvd_json_buffer_reset(struct nDPIsrvd_json_buffer * const json_buffer)
|
||||
{
|
||||
json_buffer->json_string_start = 0ul;
|
||||
json_buffer->json_string_length = 0ull;
|
||||
json_buffer->json_string = NULL;
|
||||
json_buffer->json_message_start = 0ul;
|
||||
json_buffer->json_message_length = 0ull;
|
||||
json_buffer->json_message = NULL;
|
||||
}
|
||||
|
||||
static inline int nDPIsrvd_json_buffer_init(struct nDPIsrvd_json_buffer * const json_buffer, size_t json_buffer_size)
|
||||
@@ -814,11 +814,11 @@ static inline nDPIsrvd_hashkey nDPIsrvd_build_key(char const * str, int len)
|
||||
static inline void nDPIsrvd_drain_buffer(struct nDPIsrvd_json_buffer * const json_buffer)
|
||||
{
|
||||
memmove(json_buffer->buf.ptr.raw,
|
||||
json_buffer->buf.ptr.raw + json_buffer->json_string_length,
|
||||
json_buffer->buf.used - json_buffer->json_string_length);
|
||||
json_buffer->buf.used -= json_buffer->json_string_length;
|
||||
json_buffer->json_string_length = 0;
|
||||
json_buffer->json_string_start = 0;
|
||||
json_buffer->buf.ptr.raw + json_buffer->json_message_length,
|
||||
json_buffer->buf.used - json_buffer->json_message_length);
|
||||
json_buffer->buf.used -= json_buffer->json_message_length;
|
||||
json_buffer->json_message_length = 0;
|
||||
json_buffer->json_message_start = 0;
|
||||
}
|
||||
|
||||
static inline nDPIsrvd_hashkey nDPIsrvd_vbuild_jsmn_key(char const * const json_key, va_list ap)
|
||||
@@ -883,7 +883,7 @@ static inline char const * nDPIsrvd_get_jsmn_token_value(struct nDPIsrvd_socket
|
||||
*value_length = jt->end - jt->start;
|
||||
}
|
||||
|
||||
return sock->buffer.json_string + jt->start;
|
||||
return sock->buffer.json_message + jt->start;
|
||||
}
|
||||
|
||||
static inline char const * nDPIsrvd_jsmn_token_to_string(struct nDPIsrvd_socket const * const sock,
|
||||
@@ -905,7 +905,7 @@ static inline char const * nDPIsrvd_jsmn_token_to_string(struct nDPIsrvd_socket
|
||||
*string_length = jt->end - jt->start;
|
||||
}
|
||||
|
||||
return sock->buffer.json_string + jt->start;
|
||||
return sock->buffer.json_message + jt->start;
|
||||
}
|
||||
|
||||
static inline int nDPIsrvd_get_token_size(struct nDPIsrvd_socket const * const sock,
|
||||
@@ -931,7 +931,7 @@ static inline char const * nDPIsrvd_get_token_value(struct nDPIsrvd_socket const
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return sock->buffer.json_string + t->start;
|
||||
return sock->buffer.json_message + t->start;
|
||||
}
|
||||
|
||||
static inline struct nDPIsrvd_json_token const * nDPIsrvd_get_next_token(struct nDPIsrvd_socket const * const sock,
|
||||
@@ -1123,7 +1123,7 @@ static inline int nDPIsrvd_walk_tokens(
|
||||
int i, j;
|
||||
jsmntok_t const * key;
|
||||
jsmntok_t const * const t = &sock->jsmn.tokens[b];
|
||||
char const * const js = sock->buffer.json_string;
|
||||
char const * const js = sock->buffer.json_message;
|
||||
|
||||
if (depth >= 16)
|
||||
{
|
||||
@@ -1444,36 +1444,36 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_jso
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
json_buffer->json_string_length = strtoull((const char *)json_buffer->buf.ptr.text, &json_buffer->json_string, 10);
|
||||
json_buffer->json_string_length += json_buffer->json_string - json_buffer->buf.ptr.text;
|
||||
json_buffer->json_string_start = json_buffer->json_string - json_buffer->buf.ptr.text;
|
||||
json_buffer->json_message_length = strtoull((const char *)json_buffer->buf.ptr.text, &json_buffer->json_message, 10);
|
||||
json_buffer->json_message_length += json_buffer->json_message - json_buffer->buf.ptr.text;
|
||||
json_buffer->json_message_start = json_buffer->json_message - json_buffer->buf.ptr.text;
|
||||
|
||||
if (errno == ERANGE)
|
||||
{
|
||||
return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
|
||||
}
|
||||
if (json_buffer->json_string == json_buffer->buf.ptr.text)
|
||||
if (json_buffer->json_message == json_buffer->buf.ptr.text)
|
||||
{
|
||||
return PARSE_SIZE_MISSING;
|
||||
}
|
||||
if (json_buffer->json_string_length > json_buffer->buf.max)
|
||||
if (json_buffer->json_message_length > json_buffer->buf.max)
|
||||
{
|
||||
return PARSE_STRING_TOO_BIG;
|
||||
}
|
||||
if (json_buffer->json_string_length > json_buffer->buf.used)
|
||||
if (json_buffer->json_message_length > json_buffer->buf.used)
|
||||
{
|
||||
return PARSE_NEED_MORE_DATA;
|
||||
}
|
||||
if (json_buffer->buf.ptr.text[json_buffer->json_string_length - 2] != '}' ||
|
||||
json_buffer->buf.ptr.text[json_buffer->json_string_length - 1] != '\n')
|
||||
if (json_buffer->buf.ptr.text[json_buffer->json_message_length - 2] != '}' ||
|
||||
json_buffer->buf.ptr.text[json_buffer->json_message_length - 1] != '\n')
|
||||
{
|
||||
return PARSE_INVALID_CLOSING_CHAR;
|
||||
}
|
||||
|
||||
jsmn_init(&jsmn->parser);
|
||||
jsmn->tokens_found = jsmn_parse(&jsmn->parser,
|
||||
json_buffer->buf.ptr.text + json_buffer->json_string_start,
|
||||
json_buffer->json_string_length - json_buffer->json_string_start,
|
||||
json_buffer->buf.ptr.text + json_buffer->json_message_start,
|
||||
json_buffer->json_message_length - json_buffer->json_message_start,
|
||||
jsmn->tokens,
|
||||
nDPIsrvd_MAX_JSON_TOKENS);
|
||||
if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT)
|
||||
@@ -1685,7 +1685,7 @@ static inline int nDPIsrvd_json_buffer_length(struct nDPIsrvd_socket const * con
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (int)sock->buffer.json_string_length - NETWORK_BUFFER_LENGTH_DIGITS;
|
||||
return (int)sock->buffer.json_message_length - NETWORK_BUFFER_LENGTH_DIGITS;
|
||||
}
|
||||
|
||||
static inline char const * nDPIsrvd_json_buffer_string(struct nDPIsrvd_socket const * const sock)
|
||||
@@ -1695,7 +1695,7 @@ static inline char const * nDPIsrvd_json_buffer_string(struct nDPIsrvd_socket co
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return sock->buffer.json_string;
|
||||
return sock->buffer.json_message;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
3
dependencies/nDPIsrvd.py
vendored
3
dependencies/nDPIsrvd.py
vendored
@@ -531,7 +531,8 @@ def defaultArgumentParser(desc='nDPIsrvd Python Interface', enable_json_filter=F
|
||||
parser.add_argument('--unix', type=str, help='nDPIsrvd unix socket path')
|
||||
if enable_json_filter is True:
|
||||
parser.add_argument('--filter', type=str, action='append',
|
||||
help='Set a filter string which if evaluates to True will invoke the JSON callback.')
|
||||
help='Set a filter string which if evaluates to True will invoke the JSON callback.\n'
|
||||
'Example: json_dict[\'flow_event_name\'] == \'detected\' will only process \'detected\' events.')
|
||||
return parser
|
||||
|
||||
def toSeconds(usec):
|
||||
|
||||
@@ -61,10 +61,10 @@ Use sklearn together with CSVs created with **c-analysed** to train and predict
|
||||
Try it with: `./examples/py-machine-learning/sklearn_random_forest.py --csv ./ndpi-analysed.csv --proto-class tls.youtube --proto-class tls.github --proto-class tls.spotify --proto-class tls.facebook --proto-class tls.instagram --proto-class tls.doh_dot --proto-class quic --proto-class icmp`
|
||||
|
||||
This way you should get 9 different classification classes.
|
||||
You may notice that some classes e.g. TLS protocol classifications may have a higher false-negative rate.
|
||||
You may notice that some classes e.g. TLS protocol classifications have a higher false-negative/false-positive rate.
|
||||
Unfortunately, I can not provide any datasets due to some privacy concerns.
|
||||
|
||||
But you can use a [pre-trained model](https://drive.google.com/file/d/1KEwbP-Gx7KJr54wNoa63I56VI4USCAPL/view?usp=sharing) with `--load-model`.
|
||||
But you may use a [pre-trained model](https://drive.google.com/file/d/1KEwbP-Gx7KJr54wNoa63I56VI4USCAPL/view?usp=sharing) with `--load-model`.
|
||||
|
||||
## py-flow-dashboard
|
||||
|
||||
@@ -81,11 +81,11 @@ Dump received and parsed JSON objects.
|
||||
|
||||
## py-schema-validation
|
||||
|
||||
Validate nDPId JSON strings against pre-defined JSON schema's.
|
||||
Validate nDPId JSON messages against pre-defined JSON schema's.
|
||||
See `schema/`.
|
||||
Required by `tests/run_tests.sh`
|
||||
|
||||
## py-semantic-validation
|
||||
|
||||
Validate nDPId JSON strings against internal event semantics.
|
||||
Validate nDPId JSON messages against internal event semantics.
|
||||
Required by `tests/run_tests.sh`
|
||||
|
||||
@@ -625,7 +625,7 @@ static enum nDPIsrvd_callback_return collectd_json_callback(struct nDPIsrvd_sock
|
||||
struct flow_user_data * flow_user_data = NULL;
|
||||
|
||||
collectd_statistics.json_lines++;
|
||||
collectd_statistics.json_bytes += sock->buffer.json_string_length + NETWORK_BUFFER_LENGTH_DIGITS;
|
||||
collectd_statistics.json_bytes += sock->buffer.json_message_length + NETWORK_BUFFER_LENGTH_DIGITS;
|
||||
|
||||
struct nDPIsrvd_json_token const * const packet_event_name = TOKEN_GET_SZ(sock, "packet_event_name");
|
||||
if (packet_event_name != NULL)
|
||||
|
||||
@@ -65,24 +65,24 @@ int main(void)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
char * json_str_start = NULL;
|
||||
json_bytes = strtoull((char *)buf, &json_str_start, 10);
|
||||
json_bytes += (uint8_t *)json_str_start - buf;
|
||||
json_start = (uint8_t *)json_str_start - buf;
|
||||
char * json_msg_start = NULL;
|
||||
json_bytes = strtoull((char *)buf, &json_msg_start, 10);
|
||||
json_bytes += (uint8_t *)json_msg_start - buf;
|
||||
json_start = (uint8_t *)json_msg_start - buf;
|
||||
|
||||
if (errno == ERANGE)
|
||||
{
|
||||
fprintf(stderr, "BUG: Size of JSON exceeds limit\n");
|
||||
exit(1);
|
||||
}
|
||||
if ((uint8_t *)json_str_start == buf)
|
||||
if ((uint8_t *)json_msg_start == buf)
|
||||
{
|
||||
fprintf(stderr, "BUG: Missing size before JSON string: \"%.*s\"\n", NETWORK_BUFFER_LENGTH_DIGITS, buf);
|
||||
fprintf(stderr, "BUG: Missing size before JSON message: \"%.*s\"\n", NETWORK_BUFFER_LENGTH_DIGITS, buf);
|
||||
exit(1);
|
||||
}
|
||||
if (json_bytes > sizeof(buf))
|
||||
{
|
||||
fprintf(stderr, "BUG: JSON string too big: %llu > %zu\n", json_bytes, sizeof(buf));
|
||||
fprintf(stderr, "BUG: JSON message too big: %llu > %zu\n", json_bytes, sizeof(buf));
|
||||
exit(1);
|
||||
}
|
||||
if (json_bytes > buf_used)
|
||||
@@ -92,7 +92,7 @@ int main(void)
|
||||
|
||||
if (buf[json_bytes - 2] != '}' || buf[json_bytes - 1] != '\n')
|
||||
{
|
||||
fprintf(stderr, "BUG: Invalid JSON string: \"%.*s\"\n", (int)json_bytes, buf);
|
||||
fprintf(stderr, "BUG: Invalid JSON message: \"%.*s\"\n", (int)json_bytes, buf);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ int main(void)
|
||||
if (r < 1 || tokens[0].type != JSMN_OBJECT)
|
||||
{
|
||||
fprintf(stderr, "JSON parsing failed with return value %d at position %u\n", r, parser.pos);
|
||||
fprintf(stderr, "JSON string: '%.*s'\n", (int)(json_bytes - json_start), (char *)(buf + json_start));
|
||||
fprintf(stderr, "JSON message: '%.*s'\n", (int)(json_bytes - json_start), (char *)(buf + json_start));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
||||
@@ -193,7 +193,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
|
||||
if (flow_last_seen is not None and 'flow_idle_time' not in json_dict) or \
|
||||
(flow_last_seen is None and 'flow_idle_time' in json_dict):
|
||||
raise SemanticValidationException(current_flow,
|
||||
'Got a JSON string with only 2 of 3 keys, ' \
|
||||
'Got a JSON message with only 2 of 3 keys, ' \
|
||||
'required for timeout handling: flow_idle_time')
|
||||
|
||||
if 'thread_ts_usec' in json_dict:
|
||||
@@ -213,7 +213,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
|
||||
try:
|
||||
if current_flow.flow_ended == True:
|
||||
raise SemanticValidationException(current_flow,
|
||||
'Received JSON string for a flow that already ended/idled.')
|
||||
'Received JSON message for a flow that already ended/idled.')
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
62
nDPId-test.c
62
nDPId-test.c
@@ -102,9 +102,9 @@ struct distributor_global_user_data
|
||||
|
||||
unsigned long long int shutdown_events;
|
||||
|
||||
unsigned long long int json_string_len_min;
|
||||
unsigned long long int json_string_len_max;
|
||||
double json_string_len_avg;
|
||||
unsigned long long int json_message_len_min;
|
||||
unsigned long long int json_message_len_max;
|
||||
double json_message_len_avg;
|
||||
|
||||
unsigned long long int cur_active_flows;
|
||||
unsigned long long int cur_idle_flows;
|
||||
@@ -551,7 +551,7 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
||||
struct distributor_flow_user_data * flow_stats = NULL;
|
||||
|
||||
#if 0
|
||||
printf("Distributor: %.*s\n", (int)sock->buffer.json_string_length, sock->buffer.json_string);
|
||||
printf("Distributor: %.*s\n", (int)sock->buffer.json_message_length, sock->buffer.json_message);
|
||||
#endif
|
||||
|
||||
if (thread_data != NULL)
|
||||
@@ -563,16 +563,16 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
|
||||
flow_stats = (struct distributor_flow_user_data *)flow->flow_user_data;
|
||||
}
|
||||
|
||||
if (sock->buffer.json_string_length < global_stats->json_string_len_min)
|
||||
if (sock->buffer.json_message_length < global_stats->json_message_len_min)
|
||||
{
|
||||
global_stats->json_string_len_min = sock->buffer.json_string_length;
|
||||
global_stats->json_message_len_min = sock->buffer.json_message_length;
|
||||
}
|
||||
if (sock->buffer.json_string_length > global_stats->json_string_len_max)
|
||||
if (sock->buffer.json_message_length > global_stats->json_message_len_max)
|
||||
{
|
||||
global_stats->json_string_len_max = sock->buffer.json_string_length;
|
||||
global_stats->json_message_len_max = sock->buffer.json_message_length;
|
||||
}
|
||||
global_stats->json_string_len_avg = (global_stats->json_string_len_avg +
|
||||
(global_stats->json_string_len_max + global_stats->json_string_len_min) / 2) /
|
||||
global_stats->json_message_len_avg = (global_stats->json_message_len_avg +
|
||||
(global_stats->json_message_len_max + global_stats->json_message_len_min) / 2) /
|
||||
2;
|
||||
|
||||
global_stats->total_events_deserialized++;
|
||||
@@ -910,7 +910,7 @@ static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_so
|
||||
}
|
||||
|
||||
printf("%0" NETWORK_BUFFER_LENGTH_DIGITS_STR "llu%.*s",
|
||||
sock->buffer.json_string_length - NETWORK_BUFFER_LENGTH_DIGITS,
|
||||
sock->buffer.json_message_length - NETWORK_BUFFER_LENGTH_DIGITS,
|
||||
nDPIsrvd_json_buffer_length(sock),
|
||||
nDPIsrvd_json_buffer_string(sock));
|
||||
return CALLBACK_OK;
|
||||
@@ -1008,10 +1008,10 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
||||
}
|
||||
|
||||
sock_stats = (struct distributor_global_user_data *)mock_sock->global_user_data;
|
||||
sock_stats->json_string_len_min = (unsigned long long int)-1;
|
||||
sock_stats->json_message_len_min = (unsigned long long int)-1;
|
||||
sock_stats->options.do_hash_checks = 1;
|
||||
buff_stats = (struct distributor_global_user_data *)mock_buff->global_user_data;
|
||||
buff_stats->json_string_len_min = (unsigned long long int)-1;
|
||||
buff_stats->json_message_len_min = (unsigned long long int)-1;
|
||||
buff_stats->options.do_hash_checks = 0;
|
||||
mock_null_shutdown_events = (int *)mock_null->global_user_data;
|
||||
*mock_null_shutdown_events = 0;
|
||||
@@ -1065,12 +1065,12 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
||||
{
|
||||
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
||||
logger(1,
|
||||
"Problematic JSON string (mock sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||
mock_sock->buffer.json_string_start,
|
||||
mock_sock->buffer.json_string_length,
|
||||
"Problematic JSON message (mock sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||
mock_sock->buffer.json_message_start,
|
||||
mock_sock->buffer.json_message_length,
|
||||
mock_sock->buffer.buf.used,
|
||||
(int)mock_sock->buffer.json_string_length,
|
||||
mock_sock->buffer.json_string);
|
||||
(int)mock_sock->buffer.json_message_length,
|
||||
mock_sock->buffer.json_message);
|
||||
THREAD_ERROR_GOTO(trv);
|
||||
}
|
||||
|
||||
@@ -1100,12 +1100,12 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
||||
{
|
||||
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
||||
logger(1,
|
||||
"Problematic JSON string (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||
mock_buff->buffer.json_string_start,
|
||||
mock_buff->buffer.json_string_length,
|
||||
"Problematic JSON message (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||
mock_buff->buffer.json_message_start,
|
||||
mock_buff->buffer.json_message_length,
|
||||
mock_buff->buffer.buf.used,
|
||||
(int)mock_buff->buffer.json_string_length,
|
||||
mock_buff->buffer.json_string);
|
||||
(int)mock_buff->buffer.json_message_length,
|
||||
mock_buff->buffer.json_message);
|
||||
THREAD_ERROR_GOTO(trv);
|
||||
}
|
||||
|
||||
@@ -1135,12 +1135,12 @@ static void * distributor_client_mainloop_thread(void * const arg)
|
||||
{
|
||||
logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
|
||||
logger(1,
|
||||
"Problematic JSON string (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||
mock_null->buffer.json_string_start,
|
||||
mock_null->buffer.json_string_length,
|
||||
"Problematic JSON message (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
|
||||
mock_null->buffer.json_message_start,
|
||||
mock_null->buffer.json_message_length,
|
||||
mock_null->buffer.buf.used,
|
||||
(int)mock_null->buffer.json_string_length,
|
||||
mock_null->buffer.json_string);
|
||||
(int)mock_null->buffer.json_message_length,
|
||||
mock_null->buffer.json_message);
|
||||
THREAD_ERROR_GOTO(trv);
|
||||
}
|
||||
}
|
||||
@@ -1855,9 +1855,9 @@ int main(int argc, char ** argv)
|
||||
"~~ json string min len.......: %llu chars\n"
|
||||
"~~ json string max len.......: %llu chars\n"
|
||||
"~~ json string avg len.......: %llu chars\n",
|
||||
distributor_return.stats.json_string_len_min,
|
||||
distributor_return.stats.json_string_len_max,
|
||||
(unsigned long long int)distributor_return.stats.json_string_len_avg);
|
||||
distributor_return.stats.json_message_len_min,
|
||||
distributor_return.stats.json_message_len_max,
|
||||
(unsigned long long int)distributor_return.stats.json_message_len_avg);
|
||||
}
|
||||
|
||||
if (MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0) != MT_GET_AND_ADD(ndpi_memory_free_bytes, 0) ||
|
||||
|
||||
42
nDPId.c
42
nDPId.c
@@ -2281,37 +2281,37 @@ static int connect_to_collector(struct nDPId_reader_thread * const reader_thread
|
||||
}
|
||||
|
||||
static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
||||
char const * const json_str,
|
||||
size_t json_str_len)
|
||||
char const * const json_msg,
|
||||
size_t json_msg_len)
|
||||
{
|
||||
struct nDPId_workflow * const workflow = reader_thread->workflow;
|
||||
int saved_errno;
|
||||
int s_ret;
|
||||
char newline_json_str[NETWORK_BUFFER_MAX_SIZE];
|
||||
char newline_json_msg[NETWORK_BUFFER_MAX_SIZE];
|
||||
|
||||
s_ret = snprintf(newline_json_str,
|
||||
sizeof(newline_json_str),
|
||||
s_ret = snprintf(newline_json_msg,
|
||||
sizeof(newline_json_msg),
|
||||
"%0" NETWORK_BUFFER_LENGTH_DIGITS_STR "zu%.*s\n",
|
||||
json_str_len + 1,
|
||||
(int)json_str_len,
|
||||
json_str);
|
||||
json_msg_len + 1,
|
||||
(int)json_msg_len,
|
||||
json_msg);
|
||||
|
||||
if (s_ret < 0 || s_ret >= (int)sizeof(newline_json_str))
|
||||
if (s_ret < 0 || s_ret >= (int)sizeof(newline_json_msg))
|
||||
{
|
||||
logger(1,
|
||||
"[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
|
||||
workflow->packets_captured,
|
||||
reader_thread->array_index,
|
||||
s_ret,
|
||||
sizeof(newline_json_str));
|
||||
if (s_ret >= (int)sizeof(newline_json_str))
|
||||
sizeof(newline_json_msg));
|
||||
if (s_ret >= (int)sizeof(newline_json_msg))
|
||||
{
|
||||
logger(1,
|
||||
"[%8llu, %zu] JSON string: %.*s...",
|
||||
"[%8llu, %zu] JSON message: %.*s...",
|
||||
workflow->packets_captured,
|
||||
reader_thread->array_index,
|
||||
ndpi_min(512, NETWORK_BUFFER_MAX_SIZE),
|
||||
newline_json_str);
|
||||
newline_json_msg);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -2352,7 +2352,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
||||
errno = 0;
|
||||
ssize_t written;
|
||||
if (reader_thread->collector_sock_last_errno == 0 &&
|
||||
(written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret)
|
||||
(written = write(reader_thread->collector_sockfd, newline_json_msg, s_ret)) != s_ret)
|
||||
{
|
||||
saved_errno = errno;
|
||||
if (saved_errno == EPIPE || written == 0)
|
||||
@@ -2379,7 +2379,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
||||
{
|
||||
size_t pos = (written < 0 ? 0 : written);
|
||||
set_collector_block(reader_thread);
|
||||
while ((size_t)(written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) !=
|
||||
while ((size_t)(written = write(reader_thread->collector_sockfd, newline_json_msg + pos, s_ret - pos)) !=
|
||||
s_ret - pos)
|
||||
{
|
||||
saved_errno = errno;
|
||||
@@ -2415,22 +2415,22 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
|
||||
|
||||
static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
|
||||
{
|
||||
char * json_str;
|
||||
uint32_t json_str_len;
|
||||
char * json_msg;
|
||||
uint32_t json_msg_len;
|
||||
|
||||
json_str = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_str_len);
|
||||
if (json_str == NULL || json_str_len == 0)
|
||||
json_msg = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_msg_len);
|
||||
if (json_msg == NULL || json_msg_len == 0)
|
||||
{
|
||||
logger(1,
|
||||
"[%8llu, %zu] jsonize failed, buffer length: %u",
|
||||
reader_thread->workflow->packets_captured,
|
||||
reader_thread->array_index,
|
||||
json_str_len);
|
||||
json_msg_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
reader_thread->workflow->total_events_serialized++;
|
||||
send_to_collector(reader_thread, json_str, json_str_len);
|
||||
send_to_collector(reader_thread, json_msg, json_msg_len);
|
||||
}
|
||||
ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
|
||||
}
|
||||
|
||||
26
nDPIsrvd.c
26
nDPIsrvd.c
@@ -229,7 +229,7 @@ static UT_array * get_additional_write_buffers(struct remote_desc * const remote
|
||||
|
||||
static int add_to_additional_write_buffers(struct remote_desc * const remote,
|
||||
uint8_t * const buf,
|
||||
nDPIsrvd_ull json_string_length)
|
||||
nDPIsrvd_ull json_message_length)
|
||||
{
|
||||
struct nDPIsrvd_write_buffer buf_src = {};
|
||||
UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
|
||||
@@ -264,7 +264,7 @@ static int add_to_additional_write_buffers(struct remote_desc * const remote,
|
||||
}
|
||||
|
||||
buf_src.buf.ptr.raw = buf;
|
||||
buf_src.buf.used = buf_src.buf.max = json_string_length;
|
||||
buf_src.buf.used = buf_src.buf.max = json_message_length;
|
||||
utarray_push_back(additional_write_buffers, &buf_src);
|
||||
|
||||
return 0;
|
||||
@@ -1142,7 +1142,7 @@ static int new_connection(struct nio * const io, int eventfd)
|
||||
static int handle_collector_protocol(struct nio * const io, struct remote_desc * const current)
|
||||
{
|
||||
struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
|
||||
char * json_str_start = NULL;
|
||||
char * json_msg_start = NULL;
|
||||
|
||||
if (json_read_buffer == NULL)
|
||||
{
|
||||
@@ -1160,28 +1160,28 @@ static int handle_collector_protocol(struct nio * const io, struct remote_desc *
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_str_start, 10);
|
||||
current->event_collector_un.json_bytes += json_str_start - json_read_buffer->buf.ptr.text;
|
||||
current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_msg_start, 10);
|
||||
current->event_collector_un.json_bytes += json_msg_start - json_read_buffer->buf.ptr.text;
|
||||
|
||||
if (errno == ERANGE)
|
||||
{
|
||||
logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits");
|
||||
logger_nDPIsrvd(current, "BUG: Collector connection", "JSON message length exceeds numceric limits");
|
||||
disconnect_client(io, current);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (json_str_start == json_read_buffer->buf.ptr.text)
|
||||
if (json_msg_start == json_read_buffer->buf.ptr.text)
|
||||
{
|
||||
logger_nDPIsrvd(current,
|
||||
"BUG: Collector connection",
|
||||
"missing JSON string length in protocol preamble: \"%.*s\"",
|
||||
"missing JSON message length in protocol preamble: \"%.*s\"",
|
||||
NETWORK_BUFFER_LENGTH_DIGITS,
|
||||
json_read_buffer->buf.ptr.text);
|
||||
disconnect_client(io, current);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (json_str_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
|
||||
if (json_msg_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
|
||||
{
|
||||
logger_nDPIsrvd(current,
|
||||
"BUG: Collector connection",
|
||||
@@ -1189,14 +1189,14 @@ static int handle_collector_protocol(struct nio * const io, struct remote_desc *
|
||||
"%ld "
|
||||
"bytes",
|
||||
NETWORK_BUFFER_LENGTH_DIGITS,
|
||||
(long int)(json_str_start - json_read_buffer->buf.ptr.text));
|
||||
(long int)(json_msg_start - json_read_buffer->buf.ptr.text));
|
||||
}
|
||||
|
||||
if (current->event_collector_un.json_bytes > json_read_buffer->buf.max)
|
||||
{
|
||||
logger_nDPIsrvd(current,
|
||||
"BUG: Collector connection",
|
||||
"JSON string too big: %llu > %zu",
|
||||
"JSON message too big: %llu > %zu",
|
||||
current->event_collector_un.json_bytes,
|
||||
json_read_buffer->buf.max);
|
||||
disconnect_client(io, current);
|
||||
@@ -1213,7 +1213,7 @@ static int handle_collector_protocol(struct nio * const io, struct remote_desc *
|
||||
{
|
||||
logger_nDPIsrvd(current,
|
||||
"BUG: Collector connection",
|
||||
"invalid JSON string: %.*s...",
|
||||
"invalid JSON message: %.*s...",
|
||||
(int)current->event_collector_un.json_bytes > 512 ? 512
|
||||
: (int)current->event_collector_un.json_bytes,
|
||||
json_read_buffer->buf.ptr.text);
|
||||
@@ -1244,7 +1244,7 @@ static int handle_incoming_data(struct nio * const io, struct remote_desc * cons
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* read JSON strings (or parts) from the UNIX socket (collecting) */
|
||||
/* read JSON messages (or parts) from the UNIX socket (collecting) */
|
||||
if (json_read_buffer->buf.used == json_read_buffer->buf.max)
|
||||
{
|
||||
logger_nDPIsrvd(current,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# schema
|
||||
|
||||
All schema's placed in here are nDPId exclusive, meaning that they are not necessarily representing a "real-world" JSON string received by e.g. `./example/py-json-stdout`.
|
||||
All schema's placed in here are nDPId exclusive, meaning that they are not necessarily representing a "real-world" JSON message received by e.g. `./example/py-json-stdout`.
|
||||
This is due to the fact that libnDPI itself add's some JSON information to the serializer of which we have no control over.
|
||||
IMHO it makes no sense to include stuff here that is part of libnDPI.
|
||||
|
||||
Reference in New Issue
Block a user