From d99bd825b23e4277d88a5149cfbbbc73f2ac206e Mon Sep 17 00:00:00 2001 From: Toni Uhlig Date: Thu, 9 Jul 2020 22:40:46 +0200 Subject: [PATCH] send json string to sink, added basic json event serialization fn call Signed-off-by: Toni Uhlig --- nDPId-collect.py | 13 +++++- nDPId.c | 110 +++++++++++++++++++++++++++++++---------------- 2 files changed, 84 insertions(+), 39 deletions(-) diff --git a/nDPId-collect.py b/nDPId-collect.py index 7f0438ba..5134a29a 100755 --- a/nDPId-collect.py +++ b/nDPId-collect.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import json import sys import asyncio @@ -12,7 +13,17 @@ class EchoServer(asyncio.Protocol): def data_received(self, data): message = data.decode() - print('{!r}'.format(message)) + out = str() + for line in message.split('\n'): + if len(line) == 0: + continue + try: + json_object = json.loads(line) + line = json.dumps(json_object, indent=2) + except json.decoder.JSONDecodeError as err: + sys.stderr.write('{}\n ERROR: {} -> {!r}\n{}\n'.format('-'*64, str(err), str(line), '-'*64)) + return + print('{}'.format(line)) loop = asyncio.get_event_loop() coro = loop.create_unix_server(EchoServer, JSON_SOCKPATH) diff --git a/nDPId.c b/nDPId.c index 92e8fa6c..b8b3d647 100644 --- a/nDPId.c +++ b/nDPId.c @@ -594,6 +594,51 @@ static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thre return 0; } +static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, + char const * const json_str, size_t json_str_len) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + int saved_errno; + int s_ret; + char newline_json_str[BUFSIZ]; + + s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%.*s\n", (int)json_str_len, json_str); + if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) { + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] JSON buffer prepare failed", + workflow->packets_captured, + reader_thread->array_index); + return; + } + + if (reader_thread->json_sock_reconnect != 0) { + if (connect_to_json_socket(reader_thread) == 0) { + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] Reconnected to JSON sink", + workflow->packets_captured, + reader_thread->array_index); + } + } + + if (reader_thread->json_sock_reconnect == 0 && + send(reader_thread->json_sockfd, newline_json_str, s_ret, MSG_NOSIGNAL) < 0) + { + saved_errno = errno; + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] send data to JSON sink failed: %s", + workflow->packets_captured, + reader_thread->array_index, + strerror(saved_errno)); + if (saved_errno == EPIPE) { + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] Lost connection to JSON sink", + workflow->packets_captured, + reader_thread->array_index); + } + reader_thread->json_sock_reconnect = 1; + } +} + static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, struct nDPId_flow_info const * const flow, enum flow_event event) @@ -601,7 +646,6 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, char * json_str; uint32_t json_str_len = 0; struct nDPId_workflow * const workflow = reader_thread->workflow; - int saved_errno; switch (event) { case FLOW_NEW: @@ -625,7 +669,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, } json_str = jsonize_flow(workflow, flow, &json_str_len); - if (json_str == NULL) { + if (json_str == NULL || json_str_len == 0) { syslog(LOG_DAEMON | LOG_ERR, "[%8llu, %d, %4u] jsonize failed, buffer length: %u\n", workflow->packets_captured, @@ -633,50 +677,31 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, flow->flow_id, json_str_len); } else { - if (reader_thread->json_sock_reconnect != 0) { - if (connect_to_json_socket(reader_thread) == 0) { - syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d, %4u] Reconnected to JSON sink", - workflow->packets_captured, - reader_thread->array_index, - flow->flow_id); - } - } - - if (reader_thread->json_sock_reconnect == 0 && - send(reader_thread->json_sockfd, json_str, json_str_len, MSG_NOSIGNAL) < 0) - { - saved_errno = errno; - syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d, %4u] send data to JSON sink failed: %s", - workflow->packets_captured, - reader_thread->array_index, - flow->flow_id, strerror(saved_errno)); - if (saved_errno == EPIPE) { - syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d, %4u] Lost connection to JSON sink", - workflow->packets_captured, - reader_thread->array_index, - flow->flow_id); - } - reader_thread->json_sock_reconnect = 1; - } + send_to_json_sink(reader_thread, json_str, json_str_len); } ndpi_reset_serializer(&workflow->ndpi_serializer); } static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index) { - char * out; - uint32_t out_size = 0; + char * json_str; + uint32_t json_str_len = 0; ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, "serializer-error", "format"); ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "serializer-format-index", format_index); - out = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &out_size); - if (out != NULL && out_size > 0) { - printf("ERR: %s\n", out); + json_str = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_str_len); + if (json_str != NULL && json_str_len == 0) { + + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] jsonize failed, buffer length: %u\n", + reader_thread->workflow->packets_captured, + reader_thread->array_index, + json_str_len); + } else { + + send_to_json_sink(reader_thread, json_str, json_str_len); } ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); } @@ -692,6 +717,10 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread (void)reader_thread; va_start(ap, format); while (*format) { + if (got_jsonkey == 0) { + json_key[0] = '\0'; + } + switch (*format++) { case 's': { format_index++; @@ -725,12 +754,12 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread jsonize_format_error(reader_thread, format_index); return; } - if (*(format++) == 'd') { + if (*format == 'd') { long long int value = va_arg(ap, long long int); ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer, json_key, value); got_jsonkey = 0; - } else if (*(format++) == 'u') { + } else if (*format == 'u') { unsigned long long int value = va_arg(ap, unsigned long long int); ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, json_key, value); @@ -739,6 +768,7 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread jsonize_format_error(reader_thread, format_index); return; } + format++; break; case 'u': format_index++; @@ -764,8 +794,9 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread return; } break; + case ' ': + case ',': case '%': - format_index++; break; default: jsonize_format_error(reader_thread, format_index); @@ -834,6 +865,9 @@ static void ndpi_process_packet(uint8_t * const args, break; case DLT_EN10MB: if (header->len < sizeof(struct ndpi_ethhdr)) { + jsonize_basic_event(reader_thread, "%s%lu %s%lu %s%d %s%s", "packet_id", workflow->packets_captured, + "thread_id", reader_thread->array_index, "msg_id", 0, + "msg", "Ethernet packet too short - skipping"); syslog(LOG_DAEMON | LOG_WARNING, "[%8llu, %d] Ethernet packet too short - skipping\n", workflow->packets_captured,