From 9ae4aa7ce95ef88eb0933a96608fe0c7726c0797 Mon Sep 17 00:00:00 2001 From: Toni Uhlig Date: Fri, 7 Aug 2020 15:21:24 +0200 Subject: [PATCH] flow-info.py overhaul, terminal color support, ndpi protocol/category/flow-risk output Signed-off-by: Toni Uhlig --- README.md | 8 ++-- examples/py-flow-info/flow-info.py | 64 ++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 5fb7f414..5d5e1b04 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This value serves as unique identifier for the processing thread. Multithreaded nDPId uses libnDPI's JSON serialization to produce meaningful JSON output which it then sends to the nDPIsrvd for distribution. High level applications can connect to nDPIsrvd to get the latest flow/packet events from nDPId. -TODO: Provide some sort of authentication for connecting distributor clients (somehow very critical). +TODO: Provide some sort of AEAD for connecting distributor clients via TCP (somehow very critical). # architecture @@ -24,7 +24,7 @@ _______________________ ________________ | | | nDPIsrvd | | | | nDPId --- Thread 1 >| ---> |> | <| <--- |< example/c-json-stdout | | `- Thread 2 >| ---> |> collector | distributor <| <--- |< example/py-flow-info | -| `- Thread N >| ---> |> | <| <--- | | +| `- Thread N >| ---> |> | <| <--- | ... | |_____________________| ^ |____________|______________| ^ |________________________| | | `- connect to UNIX socket `- connect to TCP socket @@ -64,13 +64,13 @@ make all examples # run -Daemon mode: +Daemons: ```shell ./nDPIsrvd -d ./nDPId -d ``` -And why not a simple flow-info example: +And why not a flow-info example? ```shell ./examples/py-flow-info/flow-info.py ``` diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 41f695e0..3dd41c87 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -20,7 +20,12 @@ class nDPIsrvdSocket: self.digitlen = 0 def receive(self): - recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)).decode(errors='strict') + try: + recvd_buf = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)) + recvd = recvd_buf.decode(errors='strict') + except UnicodeDecodeError as exc: + raise RuntimeError('Unicode Exception: {}\n\nReceived String: {}'.format(str(exc), str(recvd_buf))) + if recvd == '': raise RuntimeError('socket connection broken') self.buffer += recvd @@ -38,15 +43,26 @@ class nDPIsrvdSocket: if len(self.buffer) >= self.msglen + self.digitlen: recvd = self.buffer[self.digitlen:self.msglen + self.digitlen] self.buffer = self.buffer[self.msglen + self.digitlen:] + retval += [(recvd,self.msglen,self.digitlen)] + self.msglen = 0 self.digitlen = 0 - retval += [recvd] return retval +class TermColor: + WARNING = '\033[93m' + FAIL = '\033[91m' + BOLD = '\033[1m' + END = '\033[0m' + BLINK = "\x1b[5m" + def parse_json_str(json_str): - j = json.loads(json_str) + try: + j = json.loads(json_str[0]) + except json.decoder.JSONDecodeError as exc: + raise RuntimeError('JSON Exception: {}\n\nJSON String: {}\n'.format(str(exc), str(json_str))) if 'flow_event_name' in j: event = j['flow_event_name'].lower() @@ -65,23 +81,48 @@ def parse_json_str(json_str): else: raise RuntimeError('unknown flow event name: {}'.format(event)) + ndpi_proto_categ = '' + ndpi_frisk = '' + + if 'ndpi' in j: + if 'proto' in j['ndpi']: + ndpi_proto_categ += '[' + str(j['ndpi']['proto']) + ']' + + if 'category' in j['ndpi']: + ndpi_proto_categ += '[' + str(j['ndpi']['category']) + ']' + + if 'flow_risk' in j['ndpi']: + cnt = 0 + for key in j['ndpi']['flow_risk']: + ndpi_frisk += str(j['ndpi']['flow_risk'][key]) + ', ' + cnt += 1 + ndpi_frisk = '{}: {}'.format( + TermColor.WARNING + TermColor.BOLD + 'RISK' + TermColor.END if cnt < 2 + else TermColor.FAIL + TermColor.BOLD + TermColor.BLINK + 'RISK' + TermColor.END, + ndpi_frisk[:-2]) + if j['l3_proto'] == 'ip4': - print('{:>14}: [{:>8}] [{}][{:>5}] [{:>15}][{:>5}] -> [{:>15}][{:>5}]'.format(event_str, + print('{:>14}: [{:.>8}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}'.format(event_str, j['flow_id'], j['l3_proto'], j['l4_proto'], j['src_ip'].lower(), - j['src_port'] if 'src_port' in j else '', + '[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '', j['dst_ip'].lower(), - j['dst_port'] if 'dst_port' in j else '')) + '[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '', + ndpi_proto_categ)) elif j['l3_proto'] == 'ip6': - print('{:>14}: [{:>8}] [{}][{:>5}] [{:>39}][{:>5}] -> [{:>39}][{:>5}]'.format(event_str, + print('{:>14}: [{:.>8}] [{}][{:.>5}] [{:.>39}]{} -> [{:.>39}]{} {}'.format(event_str, j['flow_id'], j['l3_proto'], j['l4_proto'], j['src_ip'].lower(), - j['src_port'] if 'src_port' in j else '', + '[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '', j['dst_ip'].lower(), - j['dst_port'] if 'dst_port' in j else '')) + '[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '', + ndpi_proto_categ)) else: raise RuntimeError('unsupported l3 protocol: {}'.format(j['l3_proto'])) + if len(ndpi_frisk) > 0: + print('{:>16}{}'.format('', ndpi_frisk)) + if __name__ == '__main__': host = HOST @@ -102,7 +143,6 @@ if __name__ == '__main__': while True: received = nsock.receive() - for line in received: - #print(line) - parse_json_str(line) + for received_json_pkt in received: + parse_json_str(received_json_pkt)