Aligned nDPIsrvd.py to nDPIsrvd.h for consistency.

* Simplified Python interface as well.
 * c-captured and flow-undetected-to-pcap.py produce similiar results
 * Removed overloaded nDPIsrvd.py event structures.
 * flow-info.py prints (with a color-hash) additional information e.g. alias/source and midstream

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2021-02-16 20:37:29 +01:00
parent 7218990e58
commit 893f437051
7 changed files with 296 additions and 299 deletions

View File

@@ -2,6 +2,7 @@
import argparse
import array
import base64
import json
import re
import os
@@ -25,19 +26,6 @@ NETWORK_BUFFER_MAX_SIZE = 12288 # Please keep this value in sync with the one in
PKT_TYPE_ETH_IP4 = 0x0800
PKT_TYPE_ETH_IP6 = 0x86DD
EVENT_UNKNOWN = 'Unknown'
# Event tuple: (pretty-name, real-name)
DAEMON_EVENTS = [ ('Invalid','invalid'), ('Init','init'), \
('Reconnect','reconnect'), ('Shutdown','shutdown') ]
BASIC_EVENTS = ['Invalid', 'Unknown-Datalink-Layer', 'Unknown-Layer3-Protocol', 'Non-IP-Packet',
'Ethernet-Packet-Too-Short', 'Ethernet-Packet-Unknown', 'IP4-Packet-Too-Short',
'IP4-Size-Smaller-Than-Header', 'IP4-Layer4-Payload-Detection-Failed', 'IP6-Packet-Too-Short',
'IP6-Size-Smaller-Than-Header', 'IP6-Layer4-Payload-Detection-Failed', 'TCP-Packet-Too-Short',
'UDP-Packet-Too-Short', 'Capture-Size-Smaller-Than-Packet-Size', 'Max-Flow-To-Track',
'Flow-Memory-Allocation-Failed']
PACKET_EVENTS = [ ('Invalid','invalid'), ('Packet','packet'), ('Packet-Flow','packet-flow') ]
FLOW_EVENTS = [ ('Invalid','invalid'), ('New','new'), ('End','end'), ('Idle','idle'), ('Guessed','guessed'), \
('Detected','detected'), ('Detection-Update','detection-update'), ('Not-Detected','not-detected') ]
class TermColor:
HINT = '\033[33m'
@@ -87,9 +75,42 @@ class TermColor:
else:
return '{}{}{}'.format(TermColor.BOLD, string, TermColor.END)
class Flow:
flow_id = -1
class FlowManager:
def __init__(self):
self.__flows = dict()
def __buildFlowKey(self, json_dict):
if 'flow_id' not in json_dict or \
'alias' not in json_dict or \
'source' not in json_dict:
return None
return str(json_dict['alias']) + str(json_dict['source']) + str(json_dict['flow_id'])
def getFlow(self, json_dict):
event = json_dict['flow_event_name'].lower() if 'flow_event_name' in json_dict else ''
flow_key = self.__buildFlowKey(json_dict)
flow = None
if flow_key is None:
return None
if flow_key not in self.__flows:
self.__flows[flow_key] = Flow()
self.__flows[flow_key].flow_id = int(json_dict['flow_id'])
flow = self.__flows[flow_key]
if event == 'end' or event == 'idle':
flow = self.__flows[flow_key]
del self.__flows[flow_key]
return flow
class nDPIsrvdSocket:
def __init__(self):
self.sock_family = None
self.flow_mgr = FlowManager()
def connect(self, addr):
if type(addr) is tuple:
@@ -104,18 +125,19 @@ class nDPIsrvdSocket:
self.buffer = bytes()
self.msglen = 0
self.digitlen = 0
self.lines = []
def receive(self):
if len(self.buffer) == NETWORK_BUFFER_MAX_SIZE:
raise RuntimeError('buffer capacity reached ({} bytes), check if it is in sync with nDPId\'s NETWORK_BUFFER_MAX_SIZE'.format(NETWORK_BUFFER_MAX_SIZE))
raise RuntimeError('Buffer capacity reached ({} bytes), check if it is in sync with nDPId\'s NETWORK_BUFFER_MAX_SIZE.'.format(NETWORK_BUFFER_MAX_SIZE))
recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer))
if len(recvd) == 0:
raise RuntimeError('socket connection broken')
raise RuntimeError('Socket connection broken.')
self.buffer += recvd
retval = []
new_data_avail = False
while self.msglen + self.digitlen < len(self.buffer):
if self.msglen == 0:
@@ -130,23 +152,80 @@ class nDPIsrvdSocket:
if len(self.buffer) >= self.msglen + self.digitlen:
recvd = self.buffer[self.digitlen:self.msglen + self.digitlen]
self.buffer = self.buffer[self.msglen + self.digitlen:]
retval += [(recvd,self.msglen,self.digitlen)]
self.lines += [(recvd,self.msglen,self.digitlen)]
new_data_avail = True
self.msglen = 0
self.digitlen = 0
return new_data_avail
def parse(self, callback, global_user_data):
retval = True
index = 0
for received_json_line in self.lines:
json_dict = json.loads(received_json_line[0].decode('ascii', errors='replace'), strict=True)
if callback(json_dict, self.flow_mgr.getFlow(json_dict), global_user_data) is not True:
retval = False
break
index += 1
self.lines = self.lines[index:]
return retval
class PcapPacket:
def __init__(self, flow_id=-1):
self.pktdump = None
self.was_dumped = False
self.was_detected = False
self.flow_id = flow_id
self.packets = []
def loop(self, callback, global_user_data):
while True:
if self.receive() > 0:
if self.parse(callback, global_user_data) is False:
sys.stderr.write('Callback returned False, abort.\n')
break;
def addPacket(self, pkt, pkt_type, pkt_ipoffset):
self.packets += [ ( pkt, pkt_type, pkt_ipoffset ) ]
class PcapPacket:
def __init__(self):
self.pktdump = None
self.flow_id = 0
self.packets = []
self.__suffix = ''
self.__dump = False
self.__dumped = False
@staticmethod
def isInitialized(current_flow):
return current_flow is not None and hasattr(current_flow, 'pcap_packet')
@staticmethod
def handleJSON(json_dict, current_flow):
if 'flow_event_name' in json_dict:
if json_dict['flow_event_name'] == 'new':
current_flow.pcap_packet = PcapPacket()
current_flow.pcap_packet.current_packet = 0
current_flow.pcap_packet.max_packets = json_dict['flow_max_packets']
current_flow.pcap_packet.flow_id = json_dict['flow_id']
elif PcapPacket.isInitialized(current_flow) is not True:
pass
elif json_dict['flow_event_name'] == 'end' or json_dict['flow_event_name'] == 'idle':
try:
current_flow.pcap_packet.fin()
except RuntimeError:
pass
elif PcapPacket.isInitialized(current_flow) is True and \
('packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet-flow' and current_flow.pcap_packet.flow_id > 0) or \
('packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet' and 'pkt' in json_dict):
buffer_decoded = base64.b64decode(json_dict['pkt'], validate=True)
current_flow.pcap_packet.packets += [ ( buffer_decoded, json_dict['pkt_type'], json_dict['pkt_l3_offset'] ) ]
current_flow.pcap_packet.current_packet += 1
if current_flow.pcap_packet.current_packet != int(json_dict['flow_packet_id']):
raise RuntimeError('Packet IDs not in sync (local: {}, remote: {}).'.format(current_flow.pcap_packet.current_packet, int(json_dict['flow_packet_id'])))
@staticmethod
def getIp(packet):
@@ -167,14 +246,17 @@ class PcapPacket:
else:
return None
def detected(self):
self.was_detected = True
def setSuffix(self, filename_suffix):
self.__suffix = filename_suffix
def fin(self, filename_suffix):
if self.was_dumped is True:
return 'Flow already dumped.'
if self.was_detected is True:
return 'Flow detected.'
def doDump(self):
self.__dump = True
def fin(self):
if self.__dumped is True:
raise RuntimeError('Flow {} already dumped.'.format(self.flow_id))
if self.__dump is False:
raise RuntimeError('Flow {} should not be dumped.'.format(self.flow_id))
emptyTCPorUDPcount = 0;
for packet in self.packets:
@@ -182,108 +264,27 @@ class PcapPacket:
if p is not None:
if p.haslayer(scapy.all.Padding) and len(p.payload) - len(p[scapy.all.Padding]) == 0:
emptyTCPorUDPcount += 1
if len(p.payload) == 0:
elif len(p.payload) == 0:
emptyTCPorUDPcount += 1
if emptyTCPorUDPcount == len(self.packets):
return 'Flow does not contain any packets with non-empty layer4 payload.'
raise RuntimeError('Flow {} does not contain any packets({}) with non-empty layer4 payload.'.format(self.flow_id, len(self.packets)))
if self.pktdump is None:
if self.flow_id == -1:
self.pktdump = scapy.all.PcapWriter('packet-{}.pcap'.format(filename_suffix),
if self.flow_id == 0:
self.pktdump = scapy.all.PcapWriter('packet-{}.pcap'.format(self.__suffix),
append=True, sync=True)
else:
self.pktdump = scapy.all.PcapWriter('flow-{}-{}.pcap'.format(filename_suffix, self.flow_id),
self.pktdump = scapy.all.PcapWriter('flow-{}-{}.pcap'.format(self.__suffix, self.flow_id),
append=False, sync=True)
for packet in self.packets:
self.pktdump.write(PcapPacket.getIp(packet))
self.pktdump.close()
self.was_dumped = True
self.__dumped = True
return 'Success.'
def JsonParseBytes(json_bytes):
return json.loads(json_bytes.decode('ascii', errors='replace'), strict=True)
class nDPIdEvent:
isValid = False
DaemonEventID = -1
DaemonEventName = None
DaemonEventPrettyName = EVENT_UNKNOWN
BasicEventID = -1
BasicEventName = None
BasicEventPrettyName = EVENT_UNKNOWN
PacketEventID = -1
PacketEventName = None
PacketEventPrettyName = EVENT_UNKNOWN
FlowEventID = -1
FlowEventName = None
FlowEventPrettyName = EVENT_UNKNOWN
def validateEvent(self, event_id, event_name, list_of_event_tuples):
if self.isValid is True:
raise RuntimeError('nDPId event already validated. Multiple Events in one JSON strings are not allowed.\n' \
'[EVENTS]\n'
'current: {}\n' \
'daemon.: {}\n' \
'basic..: {}\n' \
'packet.: {}\n' \
'flow...: {}\n'.format(event_name,
self.DaemonEventName, self.BasicEventName, \
self.PacketEventName, self.FlowEventName))
if type(event_id) is not int:
raise RuntimeError('Argument is not an Integer/EventID!')
if event_id < 0 or event_id >= len(list_of_event_tuples):
raise RuntimeError('Unknown event id: {} aka {}.'.format(event_id, event_name))
if type(list_of_event_tuples[0]) == tuple and list_of_event_tuples[event_id][1] != event_name:
raise RuntimeError('Unknown event name: {}.'.format(event_name))
self.isValid = True
return list_of_event_tuples[event_id][0] if type(list_of_event_tuples[0]) == tuple \
else list_of_event_tuples[event_id]
def validateFlowEvent(self):
return self.validateEvent(self.FlowEventID, self.FlowEventName, FLOW_EVENTS)
def validatePacketEvent(self):
return self.validateEvent(self.PacketEventID, self.PacketEventName, PACKET_EVENTS)
def validateBasicEvent(self):
return self.validateEvent(self.BasicEventID, self.BasicEventName, BASIC_EVENTS)
def validateDaemonEvent(self):
return self.validateEvent(self.DaemonEventID, self.DaemonEventName, DAEMON_EVENTS)
@staticmethod
def validateJsonEventTypes(json_dict):
if type(json_dict) is not dict:
raise RuntimeError('Argument is not a dictionary!')
nev = nDPIdEvent()
if 'daemon_event_id' in json_dict:
nev.DaemonEventID = json_dict['daemon_event_id']
nev.DaemonEventName = json_dict['daemon_event_name']
nev.DaemonEventPrettyName = nev.validateDaemonEvent()
if 'basic_event_id' in json_dict:
nev.BasicEventID = json_dict['basic_event_id']
nev.BasicEventName = json_dict['basic_event_name']
nev.BasicEventPrettyName = nev.validateBasicEvent()
if 'packet_event_id' in json_dict:
nev.PacketEventID = json_dict['packet_event_id']
nev.PacketEventName = json_dict['packet_event_name']
nev.PacketEventPrettyName = nev.validatePacketEvent()
if 'flow_event_id' in json_dict:
nev.FlowEventID = json_dict['flow_event_id']
nev.FlowEventName = json_dict['flow_event_name']
nev.FlowEventPrettyName = nev.validateFlowEvent()
return nev
return True
def defaultArgumentParser():
parser = argparse.ArgumentParser(description='nDPIsrvd options', formatter_class=argparse.ArgumentDefaultsHelpFormatter)