mirror of
https://github.com/optim-enterprises-bv/nDPId.git
synced 2025-11-02 19:27:48 +00:00
send json string to sink, added basic json event serialization fn call
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import json
|
||||||
import sys
|
import sys
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
@@ -12,7 +13,17 @@ class EchoServer(asyncio.Protocol):
|
|||||||
|
|
||||||
def data_received(self, data):
|
def data_received(self, data):
|
||||||
message = data.decode()
|
message = data.decode()
|
||||||
print('{!r}'.format(message))
|
out = str()
|
||||||
|
for line in message.split('\n'):
|
||||||
|
if len(line) == 0:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
json_object = json.loads(line)
|
||||||
|
line = json.dumps(json_object, indent=2)
|
||||||
|
except json.decoder.JSONDecodeError as err:
|
||||||
|
sys.stderr.write('{}\n ERROR: {} -> {!r}\n{}\n'.format('-'*64, str(err), str(line), '-'*64))
|
||||||
|
return
|
||||||
|
print('{}'.format(line))
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
coro = loop.create_unix_server(EchoServer, JSON_SOCKPATH)
|
coro = loop.create_unix_server(EchoServer, JSON_SOCKPATH)
|
||||||
|
|||||||
110
nDPId.c
110
nDPId.c
@@ -594,6 +594,51 @@ static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thre
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
|
||||||
|
char const * const json_str, size_t json_str_len)
|
||||||
|
{
|
||||||
|
struct nDPId_workflow * const workflow = reader_thread->workflow;
|
||||||
|
int saved_errno;
|
||||||
|
int s_ret;
|
||||||
|
char newline_json_str[BUFSIZ];
|
||||||
|
|
||||||
|
s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%.*s\n", (int)json_str_len, json_str);
|
||||||
|
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) {
|
||||||
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"[%8llu, %d] JSON buffer prepare failed",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reader_thread->json_sock_reconnect != 0) {
|
||||||
|
if (connect_to_json_socket(reader_thread) == 0) {
|
||||||
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"[%8llu, %d] Reconnected to JSON sink",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reader_thread->json_sock_reconnect == 0 &&
|
||||||
|
send(reader_thread->json_sockfd, newline_json_str, s_ret, MSG_NOSIGNAL) < 0)
|
||||||
|
{
|
||||||
|
saved_errno = errno;
|
||||||
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"[%8llu, %d] send data to JSON sink failed: %s",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index,
|
||||||
|
strerror(saved_errno));
|
||||||
|
if (saved_errno == EPIPE) {
|
||||||
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"[%8llu, %d] Lost connection to JSON sink",
|
||||||
|
workflow->packets_captured,
|
||||||
|
reader_thread->array_index);
|
||||||
|
}
|
||||||
|
reader_thread->json_sock_reconnect = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
|
static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
|
||||||
struct nDPId_flow_info const * const flow,
|
struct nDPId_flow_info const * const flow,
|
||||||
enum flow_event event)
|
enum flow_event event)
|
||||||
@@ -601,7 +646,6 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
|
|||||||
char * json_str;
|
char * json_str;
|
||||||
uint32_t json_str_len = 0;
|
uint32_t json_str_len = 0;
|
||||||
struct nDPId_workflow * const workflow = reader_thread->workflow;
|
struct nDPId_workflow * const workflow = reader_thread->workflow;
|
||||||
int saved_errno;
|
|
||||||
|
|
||||||
switch (event) {
|
switch (event) {
|
||||||
case FLOW_NEW:
|
case FLOW_NEW:
|
||||||
@@ -625,7 +669,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
|
|||||||
}
|
}
|
||||||
json_str = jsonize_flow(workflow, flow, &json_str_len);
|
json_str = jsonize_flow(workflow, flow, &json_str_len);
|
||||||
|
|
||||||
if (json_str == NULL) {
|
if (json_str == NULL || json_str_len == 0) {
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
"[%8llu, %d, %4u] jsonize failed, buffer length: %u\n",
|
"[%8llu, %d, %4u] jsonize failed, buffer length: %u\n",
|
||||||
workflow->packets_captured,
|
workflow->packets_captured,
|
||||||
@@ -633,50 +677,31 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
|
|||||||
flow->flow_id,
|
flow->flow_id,
|
||||||
json_str_len);
|
json_str_len);
|
||||||
} else {
|
} else {
|
||||||
if (reader_thread->json_sock_reconnect != 0) {
|
send_to_json_sink(reader_thread, json_str, json_str_len);
|
||||||
if (connect_to_json_socket(reader_thread) == 0) {
|
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
|
||||||
"[%8llu, %d, %4u] Reconnected to JSON sink",
|
|
||||||
workflow->packets_captured,
|
|
||||||
reader_thread->array_index,
|
|
||||||
flow->flow_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reader_thread->json_sock_reconnect == 0 &&
|
|
||||||
send(reader_thread->json_sockfd, json_str, json_str_len, MSG_NOSIGNAL) < 0)
|
|
||||||
{
|
|
||||||
saved_errno = errno;
|
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
|
||||||
"[%8llu, %d, %4u] send data to JSON sink failed: %s",
|
|
||||||
workflow->packets_captured,
|
|
||||||
reader_thread->array_index,
|
|
||||||
flow->flow_id, strerror(saved_errno));
|
|
||||||
if (saved_errno == EPIPE) {
|
|
||||||
syslog(LOG_DAEMON | LOG_ERR,
|
|
||||||
"[%8llu, %d, %4u] Lost connection to JSON sink",
|
|
||||||
workflow->packets_captured,
|
|
||||||
reader_thread->array_index,
|
|
||||||
flow->flow_id);
|
|
||||||
}
|
|
||||||
reader_thread->json_sock_reconnect = 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ndpi_reset_serializer(&workflow->ndpi_serializer);
|
ndpi_reset_serializer(&workflow->ndpi_serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index)
|
static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index)
|
||||||
{
|
{
|
||||||
char * out;
|
char * json_str;
|
||||||
uint32_t out_size = 0;
|
uint32_t json_str_len = 0;
|
||||||
|
|
||||||
ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer,
|
ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer,
|
||||||
"serializer-error", "format");
|
"serializer-error", "format");
|
||||||
ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer,
|
ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer,
|
||||||
"serializer-format-index", format_index);
|
"serializer-format-index", format_index);
|
||||||
out = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &out_size);
|
json_str = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_str_len);
|
||||||
if (out != NULL && out_size > 0) {
|
if (json_str != NULL && json_str_len == 0) {
|
||||||
printf("ERR: %s\n", out);
|
|
||||||
|
syslog(LOG_DAEMON | LOG_ERR,
|
||||||
|
"[%8llu, %d] jsonize failed, buffer length: %u\n",
|
||||||
|
reader_thread->workflow->packets_captured,
|
||||||
|
reader_thread->array_index,
|
||||||
|
json_str_len);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
send_to_json_sink(reader_thread, json_str, json_str_len);
|
||||||
}
|
}
|
||||||
ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
|
ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
|
||||||
}
|
}
|
||||||
@@ -692,6 +717,10 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
|
|||||||
(void)reader_thread;
|
(void)reader_thread;
|
||||||
va_start(ap, format);
|
va_start(ap, format);
|
||||||
while (*format) {
|
while (*format) {
|
||||||
|
if (got_jsonkey == 0) {
|
||||||
|
json_key[0] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
switch (*format++) {
|
switch (*format++) {
|
||||||
case 's': {
|
case 's': {
|
||||||
format_index++;
|
format_index++;
|
||||||
@@ -725,12 +754,12 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
|
|||||||
jsonize_format_error(reader_thread, format_index);
|
jsonize_format_error(reader_thread, format_index);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (*(format++) == 'd') {
|
if (*format == 'd') {
|
||||||
long long int value = va_arg(ap, long long int);
|
long long int value = va_arg(ap, long long int);
|
||||||
ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer,
|
ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer,
|
||||||
json_key, value);
|
json_key, value);
|
||||||
got_jsonkey = 0;
|
got_jsonkey = 0;
|
||||||
} else if (*(format++) == 'u') {
|
} else if (*format == 'u') {
|
||||||
unsigned long long int value = va_arg(ap, unsigned long long int);
|
unsigned long long int value = va_arg(ap, unsigned long long int);
|
||||||
ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer,
|
ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer,
|
||||||
json_key, value);
|
json_key, value);
|
||||||
@@ -739,6 +768,7 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
|
|||||||
jsonize_format_error(reader_thread, format_index);
|
jsonize_format_error(reader_thread, format_index);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
format++;
|
||||||
break;
|
break;
|
||||||
case 'u':
|
case 'u':
|
||||||
format_index++;
|
format_index++;
|
||||||
@@ -764,8 +794,9 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case ' ':
|
||||||
|
case ',':
|
||||||
case '%':
|
case '%':
|
||||||
format_index++;
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
jsonize_format_error(reader_thread, format_index);
|
jsonize_format_error(reader_thread, format_index);
|
||||||
@@ -834,6 +865,9 @@ static void ndpi_process_packet(uint8_t * const args,
|
|||||||
break;
|
break;
|
||||||
case DLT_EN10MB:
|
case DLT_EN10MB:
|
||||||
if (header->len < sizeof(struct ndpi_ethhdr)) {
|
if (header->len < sizeof(struct ndpi_ethhdr)) {
|
||||||
|
jsonize_basic_event(reader_thread, "%s%lu %s%lu %s%d %s%s", "packet_id", workflow->packets_captured,
|
||||||
|
"thread_id", reader_thread->array_index, "msg_id", 0,
|
||||||
|
"msg", "Ethernet packet too short - skipping");
|
||||||
syslog(LOG_DAEMON | LOG_WARNING,
|
syslog(LOG_DAEMON | LOG_WARNING,
|
||||||
"[%8llu, %d] Ethernet packet too short - skipping\n",
|
"[%8llu, %d] Ethernet packet too short - skipping\n",
|
||||||
workflow->packets_captured,
|
workflow->packets_captured,
|
||||||
|
|||||||
Reference in New Issue
Block a user