Added daemon event: DAEMON_EVENT_STATUS (periodically send's daemon statistics.)

* Improved distributor timeout handling (per-thread).
 * flow-info.py / flow-dash.py: Distinguish between flow risk severities.
 * nDPId: Skip tag switch datalink packet dissection / processing.
 * nDPId: Fixed incorrect value for current active flows.
 * Improved JSON schema's.

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2022-03-06 17:31:26 +01:00
parent 9db048c9d9
commit 46f68501d5
279 changed files with 62686 additions and 62855 deletions

View File

@@ -21,7 +21,7 @@ class Stats:
self.nsock = nDPIsrvd_sock
def resetEventCounter(self):
keys = ['init','reconnect','shutdown', \
keys = ['init','reconnect','shutdown','status', \
'new','end','idle','update',
'guessed','detected','detection-update','not-detected', \
'packet', 'packet-flow']
@@ -53,7 +53,7 @@ class Stats:
return True
def getEventCounterStr(self):
keys = [ [ 'init','reconnect','shutdown' ], \
keys = [ [ 'init','reconnect','shutdown','status' ], \
[ 'new','end','idle','update' ], \
[ 'guessed','detected','detection-update','not-detected' ], \
[ 'packet', 'packet-flow' ] ]
@@ -76,8 +76,6 @@ class SemanticValidationException(Exception):
return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text)
def onFlowCleanup(instance, current_flow, global_user_data):
_, enable_timeout_check, _ = global_user_data
if type(instance) is not nDPIsrvd.Instance:
raise SemanticValidationException(current_flow,
'instance is not of type nDPIsrvd.Instance: ' \
@@ -99,37 +97,32 @@ def onFlowCleanup(instance, current_flow, global_user_data):
raise SemanticValidationException(current_flow,
'Unexpected flow cleanup reason: CLEANUP_REASON_FLOW_TIMEOUT')
if enable_timeout_check is True:
try:
l4_proto = current_flow.l4_proto
except AttributeError:
l4_proto = 'n/a'
try:
l4_proto = current_flow.l4_proto
except AttributeError:
l4_proto = 'n/a'
invalid_flows = stats.nsock.verify()
if len(invalid_flows) > 0:
invalid_flows_str = ''
for flow_id in invalid_flows:
flow = instance.flows[flow_id]
try:
l4_proto = flow.l4_proto
except AttributeError:
l4_proto = 'n/a'
invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
flow.flow_last_seen, flow.flow_idle_time,
instance.most_recent_flow_time,
instance.most_recent_flow_time -
(flow.flow_last_seen + flow.flow_idle_time))
invalid_flows = stats.nsock.verify()
if len(invalid_flows) > 0:
invalid_flows_str = ''
for flow_id in invalid_flows:
flow = instance.flows[flow_id]
try:
l4_proto = flow.l4_proto
except AttributeError:
l4_proto = 'n/a'
invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
flow.flow_last_seen, flow.flow_idle_time,
instance.most_recent_flow_time,
instance.most_recent_flow_time -
(flow.flow_last_seen + flow.flow_idle_time))
raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
return True
class ThreadData(object):
lowest_possible_flow_id = 0
lowest_possible_packet_id = 0
def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
_, _, stats = global_user_data
_, stats = global_user_data
stats.incrementEventCounter(json_dict)
if type(instance) is not nDPIsrvd.Instance:
@@ -149,15 +142,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
'stats is not of type Stats: ' \
'{}'.format(type(stats)))
try:
thread_data_dict = instance.thread_data
except AttributeError:
thread_data_dict = instance.thread_data = dict()
if json_dict['thread_id'] in thread_data_dict:
td = thread_data_dict[json_dict['thread_id']]
else:
td = thread_data_dict[json_dict['thread_id']] = ThreadData()
td = instance.getThreadDataFromJSON(json_dict)
for event_name in ['basic_event_name', 'daemon_event_name',
'packet_event_name', 'flow_event_name']:
@@ -165,8 +150,12 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
raise SemanticValidationException(current_flow,
'Received an invalid event for {}'.format(event_name))
lowest_possible_flow_id = td.lowest_possible_flow_id
lowest_possible_packet_id = td.lowest_possible_packet_id
if td is not None:
lowest_possible_flow_id = getattr(td, 'lowest_possible_flow_id', 0)
lowest_possible_packet_id = getattr(td, 'lowest_possible_packet_id', 0)
else:
lowest_possible_flow_id = 0
lowest_possible_packet_id = 0
if current_flow is not None:
@@ -203,8 +192,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
'both required for timeout handling:' \
'flow_last_seen, flow_idle_time')
if 'ts_msec' in json_dict:
current_flow.ts_msec = int(json_dict['ts_msec'])
if 'thread_ts_msec' in json_dict:
current_flow.thread_ts_msec = int(json_dict['thread_ts_msec'])
if 'flow_packet_id' in json_dict:
try:
@@ -233,7 +222,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
'got {}'.format(json_dict['thread_id'],
lowest_possible_packet_id,
json_dict['packet_id']))
td.lowest_possible_packet_id = lowest_possible_packet_id
if td is not None:
td.lowest_possible_packet_id = lowest_possible_packet_id
if 'flow_id' in json_dict:
if current_flow.flow_id != json_dict['flow_id']:
@@ -275,13 +265,13 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
pass
try:
if json_dict['flow_first_seen'] > current_flow.ts_msec or \
json_dict['flow_last_seen'] > current_flow.ts_msec or \
if json_dict['flow_first_seen'] > current_flow.thread_ts_msec or \
json_dict['flow_last_seen'] > current_flow.thread_ts_msec or \
json_dict['flow_first_seen'] > json_dict['flow_last_seen']:
raise SemanticValidationException(current_flow,
'Last packet timestamp is invalid: ' \
'first_seen({}) <= {} >= last_seen({})'.format(json_dict['flow_first_seen'],
current_flow.ts_msec,
current_flow.thread_ts_msec,
json_dict['flow_last_seen']))
except AttributeError:
if json_dict['flow_event_name'] == 'new':
@@ -303,7 +293,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
pass
current_flow.flow_new_seen = True
current_flow.flow_packet_id = 0
if lowest_possible_flow_id == 0:
if lowest_possible_flow_id == 0 and td is not None:
td.lowest_possible_flow_id = current_flow.flow_id
elif json_dict['flow_event_name'] == 'detected' or \
json_dict['flow_event_name'] == 'not-detected':
@@ -337,8 +327,6 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
if __name__ == '__main__':
argparser = nDPIsrvd.defaultArgumentParser()
argparser.add_argument('--strict', action='store_true', default=False, help='Require and validate a full nDPId application lifecycle.')
argparser.add_argument('--enable-timeout-check', action='store_true', default=False,
help='Enable additional flow timeout validation. See README.md for more information')
args = argparser.parse_args()
address = nDPIsrvd.validateAddress(args)
@@ -349,7 +337,7 @@ if __name__ == '__main__':
nsock.connect(address)
stats = Stats(nsock)
try:
nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, args.enable_timeout_check, stats))
nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, stats))
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\n{}\n'.format(err))
except KeyboardInterrupt: