Major nDPId extension. Sorry for the huge commit.

 - nDPId: fixed an invalid IPv4/IPv6 tuple comparison
 - nDPIsrvd: fixed a caching issue (finally)
 - added a tiny C example (can be used to check flow manager sanity)
 - c-captured: use the flow_last_seen timestamp from `struct nDPIsrvd_flow`
 - README.md update: added an example JSON sequence
 - nDPId: added a new flow event `update`, necessary for correct
   timeout handling (and other future use cases)
 - nDPIsrvd.h and nDPIsrvd.py: switched to an instance-based flow manager,
   where an instance is identified by an alias/source tuple (see the
   consumer sketch below)
 - every flow-related event **must** now serialize `alias`, `source`,
   `flow_id`, `flow_last_seen` and `flow_idle_time` to make timeout
   handling and verification work correctly (see the event sketch below)
 - nDPIsrvd.h: added the ability to profile any dynamic memory (de-)allocation
 - nDPIsrvd.py: removed the unused PcapPacket class
 - py-flow-dashboard and py-flow-multiprocess: fixed a race condition
 - py-flow-info: print a status bar with (probably) useful information
 - nDPId/nDPIsrvd.h: switched from packet-flow-only timestamps (`pkt_*sec`)
   to a generic flow event timestamp `ts_msec`
 - nDPId-test: added additional checks
 - nDPId: increased the ICMP flow timeout
 - nDPId: use event-based I/O when capturing packets from a device
 - nDPIsrvd: fixed a memory leak on shutdown when remote descriptors
   were still connected
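
To illustrate the new serialization contract, here is a sketch of what a
consumer can now rely on. The keys and the timeout rule come from this
commit; all field values below are made up:

    # Hypothetical flow `update` event; every flow event now carries these keys.
    flow_update = {
        'flow_event_name': 'update',
        'alias': 'localhost',             # instance alias
        'source': 'eth0',                 # capture source
        'flow_id': 42,
        'flow_last_seen': 1639608000000,  # milliseconds, replaces pkt_*sec
        'flow_idle_time': 7000,           # ms until the flow counts as idle
        'ts_msec': 1639608000123,         # generic flow event timestamp
    }

    # A flow has timed out once it stayed idle longer than its idle time,
    # measured against the most recent event timestamp of its instance:
    def flow_timed_out(flow, most_recent_flow_time):
        return flow['flow_last_seen'] + flow['flow_idle_time'] < most_recent_flow_time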
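
And a minimal consumer sketch against the new two-callback API (a sketch
only: it assumes nDPIsrvd.py is on the import path and omits error handling;
the callback signatures match this commit's parse()/loop()):

    import nDPIsrvd

    def on_json_line(json_dict, instance, current_flow, global_user_data):
        if current_flow is not None:
            print('{}/{}: flow {} event {}'.format(
                instance.alias, instance.source,
                current_flow.flow_id, json_dict.get('flow_event_name')))
        return True  # anything else makes loop() raise CallbackReturnedFalse

    def on_flow_cleanup(instance, current_flow, global_user_data):
        print('flow {} cleaned up (reason {})'.format(
            current_flow.flow_id, current_flow.cleanup_reason))
        return True

    nsock = nDPIsrvd.nDPIsrvdSocket()
    nsock.connect((nDPIsrvd.DEFAULT_HOST, nDPIsrvd.DEFAULT_PORT))
    nsock.loop(on_json_line, on_flow_cleanup, None)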

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>

commit 9e07a57566 (parent a35fc1d5ea)
Author: Toni Uhlig
Date:   2021-12-15 23:25:32 +01:00

254 changed files with 53120 additions and 67935 deletions

@@ -2,7 +2,6 @@
import argparse
import array
import base64
import json
import re
import os
@@ -17,17 +16,12 @@ except ImportError:
     sys.stderr.write('Python module colorama not found, using fallback.\n')
     USE_COLORAMA=False

-try:
-    import scapy.all
-except ImportError:
-    sys.stderr.write('Python module scapy not found, PCAP generation will fail!\n')

 DEFAULT_HOST = '127.0.0.1'
 DEFAULT_PORT = 7000
 DEFAULT_UNIX = '/tmp/ndpid-distributor.sock'

 NETWORK_BUFFER_MIN_SIZE = 6 # NETWORK_BUFFER_LENGTH_DIGITS + 1
-NETWORK_BUFFER_MAX_SIZE = 12288 # Please keep this value in sync with the one in config.h
+NETWORK_BUFFER_MAX_SIZE = 13312 # Please keep this value in sync with the one in config.h

 PKT_TYPE_ETH_IP4 = 0x0800
 PKT_TYPE_ETH_IP6 = 0x86DD
@@ -81,37 +75,163 @@ class TermColor:
         else:
             return '{}{}{}'.format(TermColor.BOLD, string, TermColor.END)

+class Instance:
+    alias = ''
+    source = ''
+    most_recent_flow_time = 0
+
+    def __init__(self, alias, source):
+        self.alias = str(alias)
+        self.source = str(source)
+        # kept per instance: a mutable class attribute would be shared
+        # across all Instance objects
+        self.flows = dict()
+
+    def __str__(self):
+        return '<%s.%s object at %s with alias %s, source %s>' % (
+            self.__class__.__module__,
+            self.__class__.__name__,
+            hex(id(self)),
+            self.alias,
+            self.source
+        )
+
+class Flow:
+    flow_id = -1
+    flow_last_seen = -1
+    flow_idle_time = -1
+    cleanup_reason = -1
+
+    def __init__(self, flow_id):
+        self.flow_id = flow_id
+
+    def __str__(self):
+        return '<%s.%s object at %s with flow id %d>' % (
+            self.__class__.__module__,
+            self.__class__.__name__,
+            hex(id(self)),
+            self.flow_id
+        )
+
 class FlowManager:
+    CLEANUP_REASON_INVALID = 0
+    CLEANUP_REASON_DAEMON_INIT = 1     # can happen if kill -SIGKILL $(pidof nDPId) or restart after SIGSEGV
+    CLEANUP_REASON_DAEMON_SHUTDOWN = 2 # graceful shutdown e.g. kill -SIGTERM $(pidof nDPId)
+    CLEANUP_REASON_FLOW_END = 3
+    CLEANUP_REASON_FLOW_IDLE = 4
+    CLEANUP_REASON_FLOW_TIMEOUT = 5    # nDPId died a long time ago w/o restart?
+    CLEANUP_REASON_APP_SHUTDOWN = 6    # your Python app called FlowManager.doShutdown()
+
     def __init__(self):
-        self.__flows = dict()
-
-    def __buildFlowKey(self, json_dict):
-        if 'flow_id' not in json_dict or \
-           'alias' not in json_dict or \
-           'source' not in json_dict:
-            return None
-        return str(json_dict['alias']) + str(json_dict['source']) + str(json_dict['flow_id'])
-
-    def getFlow(self, json_dict):
-        event = json_dict['flow_event_name'].lower() if 'flow_event_name' in json_dict else ''
-        flow_key = self.__buildFlowKey(json_dict)
-        flow = None
-        if flow_key is None:
-            return None
-        if flow_key not in self.__flows:
-            self.__flows[flow_key] = Flow()
-            self.__flows[flow_key].flow_id = int(json_dict['flow_id'])
-        flow = self.__flows[flow_key]
-        if event == 'end' or event == 'idle':
-            flow = self.__flows[flow_key]
-            del self.__flows[flow_key]
-        return flow
+        self.instances = dict()
+
+    def getInstance(self, json_dict):
+        if 'alias' not in json_dict or \
+           'source' not in json_dict:
+            return None
+
+        alias = json_dict['alias']
+        source = json_dict['source']
+
+        if alias not in self.instances:
+            self.instances[alias] = dict()
+        if source not in self.instances[alias]:
+            self.instances[alias][source] = Instance(alias, source)
+
+        if 'ts_msec' in json_dict:
+            self.instances[alias][source].most_recent_flow_time = \
+                max(self.instances[alias][source].most_recent_flow_time,
+                    json_dict['ts_msec'])
+
+        return self.instances[alias][source]
+
+    def getFlow(self, instance, json_dict):
+        if 'flow_id' not in json_dict:
+            return None
+
+        flow_id = int(json_dict['flow_id'])
+
+        if flow_id in instance.flows:
+            instance.flows[flow_id].flow_last_seen = int(json_dict['flow_last_seen'])
+            instance.flows[flow_id].flow_idle_time = int(json_dict['flow_idle_time'])
+            return instance.flows[flow_id]
+
+        instance.flows[flow_id] = Flow(flow_id)
+        instance.flows[flow_id].flow_last_seen = int(json_dict['flow_last_seen'])
+        instance.flows[flow_id].flow_idle_time = int(json_dict['flow_idle_time'])
+        instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_INVALID
+
+        return instance.flows[flow_id]
+
+    def getFlowsToCleanup(self, instance, json_dict):
+        flows = dict()
+
+        if 'daemon_event_name' in json_dict:
+            if json_dict['daemon_event_name'].lower() == 'init' or \
+               json_dict['daemon_event_name'].lower() == 'shutdown':
+                # invalidate all existing flows with that alias/source;
+                # iterate over a copy of the keys, as popping while
+                # iterating the dict itself would raise a RuntimeError
+                for flow_id in list(instance.flows):
+                    flow = instance.flows.pop(flow_id)
+                    if json_dict['daemon_event_name'].lower() == 'init':
+                        flow.cleanup_reason = FlowManager.CLEANUP_REASON_DAEMON_INIT
+                    else:
+                        flow.cleanup_reason = FlowManager.CLEANUP_REASON_DAEMON_SHUTDOWN
+                    flows[flow_id] = flow
+                del self.instances[instance.alias][instance.source]
+        elif 'flow_event_name' in json_dict and \
+             (json_dict['flow_event_name'].lower() == 'end' or \
+              json_dict['flow_event_name'].lower() == 'idle' or \
+              json_dict['flow_event_name'].lower() == 'guessed' or \
+              json_dict['flow_event_name'].lower() == 'not-detected' or \
+              json_dict['flow_event_name'].lower() == 'detected'):
+            flow_id = json_dict['flow_id']
+            if json_dict['flow_event_name'].lower() == 'end':
+                instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_END
+            elif json_dict['flow_event_name'].lower() == 'idle':
+                instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_IDLE
+            # TODO: Flow guessing/detection can happen right before an idle event.
+            #       We need to prevent that it results in a CLEANUP_REASON_FLOW_TIMEOUT.
+            #       This may cause inconsistency and needs to be handled in another way.
+            if json_dict['flow_event_name'].lower() != 'guessed' and \
+               json_dict['flow_event_name'].lower() != 'not-detected' and \
+               json_dict['flow_event_name'].lower() != 'detected':
+                flows[flow_id] = instance.flows.pop(flow_id)
+        elif 'flow_last_seen' in json_dict:
+            if int(json_dict['flow_last_seen']) + int(json_dict['flow_idle_time']) < \
+               instance.most_recent_flow_time:
+                flow_id = json_dict['flow_id']
+                instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_TIMEOUT
+                flows[flow_id] = instance.flows.pop(flow_id)
+
+        return flows
+
+    def doShutdown(self):
+        flows = dict()
+        for alias in self.instances:
+            for source in self.instances[alias]:
+                for flow_id in self.instances[alias][source].flows:
+                    flow = self.instances[alias][source].flows[flow_id]
+                    flow.cleanup_reason = FlowManager.CLEANUP_REASON_APP_SHUTDOWN
+                    flows[flow_id] = flow
+        del self.instances
+        return flows
+
+    def verifyFlows(self):
+        invalid_flows = list()
+        for alias in self.instances:
+            for source in self.instances[alias]:
+                for flow_id in self.instances[alias][source].flows:
+                    if self.instances[alias][source].flows[flow_id].flow_last_seen + \
+                       self.instances[alias][source].flows[flow_id].flow_idle_time < \
+                       self.instances[alias][source].most_recent_flow_time:
+                        invalid_flows += [flow_id]
+        return invalid_flows

class nDPIsrvdException(Exception):
UNSUPPORTED_ADDRESS_TYPE = 1
@@ -163,6 +283,7 @@ class nDPIsrvdSocket:
     def __init__(self):
         self.sock_family = None
         self.flow_mgr = FlowManager()
+        self.received_bytes = 0

     def connect(self, addr):
         if type(addr) is tuple:
@@ -212,6 +333,7 @@ class nDPIsrvdSocket:
                 self.lines += [(recvd,self.msglen,self.digitlen)]
                 new_data_avail = True
+                self.received_bytes += self.msglen + self.digitlen

                 self.msglen = 0
                 self.digitlen = 0
@@ -220,21 +342,31 @@ class nDPIsrvdSocket:
         return new_data_avail

-    def parse(self, callback, global_user_data):
+    def parse(self, callback_json, callback_flow_cleanup, global_user_data):
         retval = True
         index = 0

-        for received_json_line in self.lines:
-            json_dict = json.loads(received_json_line[0].decode('ascii', errors='replace'), strict=True)
-            if callback(json_dict, self.flow_mgr.getFlow(json_dict), global_user_data) is not True:
-                retval = False
-                break
+        for received_line in self.lines:
+            json_dict = json.loads(received_line[0].decode('ascii', errors='replace'), strict=True)
+            instance = self.flow_mgr.getInstance(json_dict)
+            if instance is None:
+                retval = False
+                continue
+
+            if callback_json(json_dict, instance, self.flow_mgr.getFlow(instance, json_dict), global_user_data) is not True:
+                retval = False
+            for _, flow in self.flow_mgr.getFlowsToCleanup(instance, json_dict).items():
+                if callback_flow_cleanup is None:
+                    pass
+                elif callback_flow_cleanup(instance, flow, global_user_data) is not True:
+                    retval = False
             index += 1

         self.lines = self.lines[index:]
         return retval

-    def loop(self, callback, global_user_data):
+    def loop(self, callback_json, callback_flow_cleanup, global_user_data):
         throw_ex = None

         while True:
@@ -244,116 +376,17 @@ class nDPIsrvdSocket:
             except Exception as err:
                 throw_ex = err

-            if self.parse(callback, global_user_data) is False:
+            if self.parse(callback_json, callback_flow_cleanup, global_user_data) is False:
                 raise CallbackReturnedFalse()

         if throw_ex is not None:
             raise throw_ex
+    def shutdown(self):
+        return self.flow_mgr.doShutdown().items()
+
+    def verify(self):
+        return self.flow_mgr.verifyFlows()

-class PcapPacket:
-    def __init__(self):
-        self.pktdump = None
-        self.flow_id = 0
-        self.packets = []
-        self.__suffix = ''
-        self.__dump = False
-        self.__dumped = False
-
-    @staticmethod
-    def isInitialized(current_flow):
-        return current_flow is not None and hasattr(current_flow, 'pcap_packet')
-
-    @staticmethod
-    def handleJSON(json_dict, current_flow):
-        if 'flow_event_name' in json_dict:
-            if json_dict['flow_event_name'] == 'new':
-                current_flow.pcap_packet = PcapPacket()
-                current_flow.pcap_packet.current_packet = 0
-                current_flow.pcap_packet.max_packets = json_dict['flow_max_packets']
-                current_flow.pcap_packet.flow_id = json_dict['flow_id']
-            elif PcapPacket.isInitialized(current_flow) is not True:
-                pass
-            elif json_dict['flow_event_name'] == 'end' or json_dict['flow_event_name'] == 'idle':
-                try:
-                    current_flow.pcap_packet.fin()
-                except RuntimeError:
-                    pass
-        elif PcapPacket.isInitialized(current_flow) is True and \
-             ('packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet-flow' and current_flow.pcap_packet.flow_id > 0) or \
-             ('packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet' and 'pkt' in json_dict):
-            buffer_decoded = base64.b64decode(json_dict['pkt'], validate=True)
-            current_flow.pcap_packet.packets += [ ( buffer_decoded, json_dict['pkt_type'], json_dict['pkt_l3_offset'] ) ]
-            current_flow.pcap_packet.current_packet += 1
-            if current_flow.pcap_packet.current_packet != int(json_dict['flow_packet_id']):
-                raise RuntimeError('Packet IDs not in sync (local: {}, remote: {}).'.format(
-                    current_flow.pcap_packet.current_packet, int(json_dict['flow_packet_id'])))
-
-    @staticmethod
-    def getIp(packet):
-        if packet[1] == PKT_TYPE_ETH_IP4:
-            return scapy.all.IP(packet[0][packet[2]:])
-        elif packet[1] == PKT_TYPE_ETH_IP6:
-            return scapy.all.IPv6(packet[0][packet[2]:])
-        else:
-            raise RuntimeError('packet type unknown: {}'.format(packet[1]))
-
-    @staticmethod
-    def getTCPorUDP(packet):
-        p = PcapPacket.getIp(packet)
-        if p.haslayer(scapy.all.TCP):
-            return p.getlayer(scapy.all.TCP)
-        elif p.haslayer(scapy.all.UDP):
-            return p.getlayer(scapy.all.UDP)
-        else:
-            return None
-
-    def setSuffix(self, filename_suffix):
-        self.__suffix = filename_suffix
-
-    def doDump(self):
-        self.__dump = True
-
-    def fin(self):
-        if self.__dumped is True:
-            raise RuntimeError('Flow {} already dumped.'.format(self.flow_id))
-        if self.__dump is False:
-            raise RuntimeError('Flow {} should not be dumped.'.format(self.flow_id))
-
-        emptyTCPorUDPcount = 0
-        for packet in self.packets:
-            p = PcapPacket.getTCPorUDP(packet)
-            if p is not None:
-                if p.haslayer(scapy.all.Padding) and len(p.payload) - len(p[scapy.all.Padding]) == 0:
-                    emptyTCPorUDPcount += 1
-                elif len(p.payload) == 0:
-                    emptyTCPorUDPcount += 1
-        if emptyTCPorUDPcount == len(self.packets):
-            raise RuntimeError('Flow {} does not contain any packets({}) with non-empty layer4 payload.'.format(
-                self.flow_id, len(self.packets)))
-
-        if self.pktdump is None:
-            if self.flow_id == 0:
-                self.pktdump = scapy.all.PcapWriter('packet-{}.pcap'.format(self.__suffix),
-                                                    append=True, sync=True)
-            else:
-                self.pktdump = scapy.all.PcapWriter('flow-{}-{}.pcap'.format(self.__suffix, self.flow_id),
-                                                    append=False, sync=True)
-
-        for packet in self.packets:
-            self.pktdump.write(PcapPacket.getIp(packet))
-        self.pktdump.close()
-        self.__dumped = True
-
-        return True

 def defaultArgumentParser():
     parser = argparse.ArgumentParser(description='nDPIsrvd options', formatter_class=argparse.ArgumentDefaultsHelpFormatter)