flow-info.py overhaul, terminal color support, ndpi protocol/category/flow-risk output

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2020-08-07 15:21:24 +02:00
parent 79e4fc9bfe
commit 9ae4aa7ce9
2 changed files with 56 additions and 16 deletions

View File

@@ -11,7 +11,7 @@ This value serves as unique identifier for the processing thread. Multithreaded
nDPId uses libnDPI's JSON serialization to produce meaningful JSON output which it then sends to the nDPIsrvd for distribution.
High level applications can connect to nDPIsrvd to get the latest flow/packet events from nDPId.
TODO: Provide some sort of authentication for connecting distributor clients (somehow very critical).
TODO: Provide some sort of AEAD for connecting distributor clients via TCP (somehow very critical).
# architecture
@@ -24,7 +24,7 @@ _______________________ ________________
| | | nDPIsrvd | | |
| nDPId --- Thread 1 >| ---> |> | <| <--- |< example/c-json-stdout |
| `- Thread 2 >| ---> |> collector | distributor <| <--- |< example/py-flow-info |
| `- Thread N >| ---> |> | <| <--- | |
| `- Thread N >| ---> |> | <| <--- | ... |
|_____________________| ^ |____________|______________| ^ |________________________|
| |
`- connect to UNIX socket `- connect to TCP socket
@@ -64,13 +64,13 @@ make all examples
# run
Daemon mode:
Daemons:
```shell
./nDPIsrvd -d
./nDPId -d
```
And why not a simple flow-info example:
And why not a flow-info example?
```shell
./examples/py-flow-info/flow-info.py
```

View File

@@ -20,7 +20,12 @@ class nDPIsrvdSocket:
self.digitlen = 0
def receive(self):
recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)).decode(errors='strict')
try:
recvd_buf = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer))
recvd = recvd_buf.decode(errors='strict')
except UnicodeDecodeError as exc:
raise RuntimeError('Unicode Exception: {}\n\nReceived String: {}'.format(str(exc), str(recvd_buf)))
if recvd == '':
raise RuntimeError('socket connection broken')
self.buffer += recvd
@@ -38,15 +43,26 @@ class nDPIsrvdSocket:
if len(self.buffer) >= self.msglen + self.digitlen:
recvd = self.buffer[self.digitlen:self.msglen + self.digitlen]
self.buffer = self.buffer[self.msglen + self.digitlen:]
retval += [(recvd,self.msglen,self.digitlen)]
self.msglen = 0
self.digitlen = 0
retval += [recvd]
return retval
class TermColor:
WARNING = '\033[93m'
FAIL = '\033[91m'
BOLD = '\033[1m'
END = '\033[0m'
BLINK = "\x1b[5m"
def parse_json_str(json_str):
j = json.loads(json_str)
try:
j = json.loads(json_str[0])
except json.decoder.JSONDecodeError as exc:
raise RuntimeError('JSON Exception: {}\n\nJSON String: {}\n'.format(str(exc), str(json_str)))
if 'flow_event_name' in j:
event = j['flow_event_name'].lower()
@@ -65,23 +81,48 @@ def parse_json_str(json_str):
else:
raise RuntimeError('unknown flow event name: {}'.format(event))
ndpi_proto_categ = ''
ndpi_frisk = ''
if 'ndpi' in j:
if 'proto' in j['ndpi']:
ndpi_proto_categ += '[' + str(j['ndpi']['proto']) + ']'
if 'category' in j['ndpi']:
ndpi_proto_categ += '[' + str(j['ndpi']['category']) + ']'
if 'flow_risk' in j['ndpi']:
cnt = 0
for key in j['ndpi']['flow_risk']:
ndpi_frisk += str(j['ndpi']['flow_risk'][key]) + ', '
cnt += 1
ndpi_frisk = '{}: {}'.format(
TermColor.WARNING + TermColor.BOLD + 'RISK' + TermColor.END if cnt < 2
else TermColor.FAIL + TermColor.BOLD + TermColor.BLINK + 'RISK' + TermColor.END,
ndpi_frisk[:-2])
if j['l3_proto'] == 'ip4':
print('{:>14}: [{:>8}] [{}][{:>5}] [{:>15}][{:>5}] -> [{:>15}][{:>5}]'.format(event_str,
print('{:>14}: [{:.>8}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}'.format(event_str,
j['flow_id'], j['l3_proto'], j['l4_proto'],
j['src_ip'].lower(),
j['src_port'] if 'src_port' in j else '',
'[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '',
j['dst_ip'].lower(),
j['dst_port'] if 'dst_port' in j else ''))
'[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '',
ndpi_proto_categ))
elif j['l3_proto'] == 'ip6':
print('{:>14}: [{:>8}] [{}][{:>5}] [{:>39}][{:>5}] -> [{:>39}][{:>5}]'.format(event_str,
print('{:>14}: [{:.>8}] [{}][{:.>5}] [{:.>39}]{} -> [{:.>39}]{} {}'.format(event_str,
j['flow_id'], j['l3_proto'], j['l4_proto'],
j['src_ip'].lower(),
j['src_port'] if 'src_port' in j else '',
'[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '',
j['dst_ip'].lower(),
j['dst_port'] if 'dst_port' in j else ''))
'[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '',
ndpi_proto_categ))
else:
raise RuntimeError('unsupported l3 protocol: {}'.format(j['l3_proto']))
if len(ndpi_frisk) > 0:
print('{:>16}{}'.format('', ndpi_frisk))
if __name__ == '__main__':
host = HOST
@@ -102,7 +143,6 @@ if __name__ == '__main__':
while True:
received = nsock.receive()
for line in received:
#print(line)
parse_json_str(line)
for received_json_pkt in received:
parse_json_str(received_json_pkt)