nDPId: Fixed base64encode bug which led to invalid base64 strings.

* py-semantic-validation: Decode base64 raw packet data as well
* nDPIsrvd.py: Added nDPId_PACKETS_PLEN_MAX
* nDPIsrvd.py: Improved JSON parse error/exception handling

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Toni Uhlig
2023-01-09 01:30:40 +01:00
parent e9443d7618
commit 655393e953
7 changed files with 73 additions and 45 deletions
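
Note: the 'pkt' field of packet events carries the raw packet data base64-encoded; judging by the constant's name and the CI sync check added below, its size is bounded by nDPId_PACKETS_PLEN_MAX. A minimal consumer sketch (assuming dependencies/nDPIsrvd.py is importable as nDPIsrvd and using the callback signature visible in this diff) that decodes it the same way py-semantic-validation now does:

    import base64

    def on_json_line(json_dict, instance, current_flow, global_user_data):
        # Packet events ship the raw packet base64-encoded in 'pkt'.
        if 'packet_event_name' in json_dict:
            raw = base64.b64decode(json_dict['pkt'], validate=True)
            # len(raw) should stay within nDPId_PACKETS_PLEN_MAX (8192 bytes).
        return True  # parse() treats anything other than True as a failed line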

.gitlab-ci.yml

@@ -20,9 +20,14 @@ after_script:
 build_and_test_static_libndpi_tsan:
   script:
+    # test for NETWORK_BUFFER_MAX_SIZE C and Python value equality
     - C_VAL=$(cat config.h | sed -n 's/^#define\s\+NETWORK_BUFFER_MAX_SIZE\s\+\([0-9]\+\).*$/\1/gp')
     - PY_VAL=$(cat dependencies/nDPIsrvd.py | sed -n 's/^NETWORK_BUFFER_MAX_SIZE = \([0-9]\+\).*$/\1/gp')
     - test ${C_VAL} = ${PY_VAL}
+    # test for nDPId_PACKETS_PLEN_MAX C and Python value equality
+    - C_VAL=$(cat config.h | sed -n 's/^#define\s\+nDPId_PACKETS_PLEN_MAX\s\+\([0-9]\+\).*$/\1/gp')
+    - PY_VAL=$(cat dependencies/nDPIsrvd.py | sed -n 's/^nDPId_PACKETS_PLEN_MAX = \([0-9]\+\).*$/\1/gp')
+    - test ${C_VAL} = ${PY_VAL}
     # static linked build
     - mkdir build-clang-tsan
     - cd build-clang-tsan

config.h

@@ -36,7 +36,7 @@
 #define nDPId_UDP_IDLE_TIME TIME_S_TO_US(180u) /* 180 sec */
 #define nDPId_TCP_POST_END_FLOW_TIME TIME_S_TO_US(120u) /* 120 sec */
 #define nDPId_THREAD_DISTRIBUTION_SEED 0x03dd018b
-#define nDPId_PACKETS_PLEN_MAX (1024u * 8u) /* 8kB */
+#define nDPId_PACKETS_PLEN_MAX 8192u /* 8kB */
 #define nDPId_PACKETS_PER_FLOW_TO_SEND 15u
 #define nDPId_PACKETS_PER_FLOW_TO_PROCESS NDPI_DEFAULT_MAX_NUM_PKTS_PER_FLOW_TO_DISSECT
 #define nDPId_PACKETS_PER_FLOW_TO_ANALYZE 32u

dependencies/nDPIsrvd.py

@@ -22,6 +22,7 @@ DEFAULT_UNIX = '/tmp/ndpid-distributor.sock'
 NETWORK_BUFFER_MIN_SIZE = 6 # NETWORK_BUFFER_LENGTH_DIGITS + 1
 NETWORK_BUFFER_MAX_SIZE = 33792 # Please keep this value in sync with the one in config.h
+nDPId_PACKETS_PLEN_MAX = 8192 # Please keep this value in sync with the one in config.h

 PKT_TYPE_ETH_IP4 = 0x0800
 PKT_TYPE_ETH_IP6 = 0x86DD
@@ -361,6 +362,7 @@ class nDPIsrvdSocket:
         self.msglen = 0
         self.digitlen = 0
         self.lines = []
+        self.failed_lines = []

     def timeout(self, timeout):
         self.sock.settimeout(timeout)
@@ -414,31 +416,39 @@ class nDPIsrvdSocket:
     def parse(self, callback_json, callback_flow_cleanup, global_user_data):
         retval = True

-        index = 0
         for received_line in self.lines:
             try:
                 json_dict = json.loads(received_line[0].decode('ascii', errors='replace'), strict=True)
-            except json.decoder.JSONDecodeError as err:
-                sys.stderr.write('\nFATAL: JSON decode failed at line "{}"\n'.format(received_line[0].decode('ascii', errors='replace')))
-                sys.stderr.write('\n{}\n'.format(str(err)))
-                retval = False
+            except json.decoder.JSONDecodeError as e:
+                self.failed_lines += [received_line]
+                self.lines = self.lines[1:]
+                raise(e)

             instance = self.flow_mgr.getInstance(json_dict)
             if instance is None:
+                self.failed_lines += [received_line]
                 retval = False
                 continue

-            if callback_json(json_dict, instance, self.flow_mgr.getFlow(instance, json_dict), global_user_data) is not True:
-                retval = False
+            try:
+                if callback_json(json_dict, instance, self.flow_mgr.getFlow(instance, json_dict), global_user_data) is not True:
+                    self.failed_lines += [received_line]
+                    retval = False
+            except Exception as e:
+                self.failed_lines += [received_line]
+                self.lines = self.lines[1:]
+                raise(e)

             for _, flow in self.flow_mgr.getFlowsToCleanup(instance, json_dict).items():
                 if callback_flow_cleanup is None:
                     pass
                 elif callback_flow_cleanup(instance, flow, global_user_data) is not True:
+                    self.failed_lines += [received_line]
+                    self.lines = self.lines[1:]
                     retval = False

-            index += 1
-        self.lines = self.lines[index:]
+            self.lines = self.lines[1:]

         return retval
@@ -462,6 +472,8 @@ class nDPIsrvdSocket:
         return self.flow_mgr.doShutdown().items()

     def verify(self):
+        if len(self.failed_lines) > 0:
+            raise nDPIsrvdException('Failed lines > 0: {}'.format(len(self.failed_lines)))
         return self.flow_mgr.verifyFlows()

 def defaultArgumentParser(desc='nDPIsrvd Python Interface',
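
Taken together, parse() now records each offending raw line in self.failed_lines and re-raises JSON-decode and callback exceptions, while verify() raises an nDPIsrvdException as soon as any line has failed. A rough usage sketch follows; the connect() and loop() calls and their exact signatures are assumed here, not part of this diff, and mirror the error handling added to py-semantic-validation further below:

    import sys
    import nDPIsrvd

    def on_json_line(json_dict, instance, current_flow, global_user_data):
        return True  # any other return value marks the line as failed

    nsock = nDPIsrvd.nDPIsrvdSocket()
    nsock.connect('/tmp/ndpid-distributor.sock')  # DEFAULT_UNIX; exact connect API assumed
    try:
        nsock.loop(on_json_line, None, None)      # assumed receive/parse driver
    except Exception:
        # failed_lines keeps the raw line tuples for post-mortem output
        for failed_line in nsock.failed_lines:
            sys.stderr.write('Affected JSON line: {}\n'.format(failed_line[0]))
        raise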

examples/py-semantic-validation/py-semantic-validation.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3

+import base64
 import os
 import sys
@@ -73,6 +74,24 @@ class SemanticValidationException(Exception):
         else:
             return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text)

+def verifyFlows(nsock, instance):
+    invalid_flows = nsock.verify()
+    if len(invalid_flows) > 0:
+        invalid_flows_str = ''
+        for flow_id in invalid_flows:
+            flow = instance.flows[flow_id]
+            try:
+                l4_proto = flow.l4_proto
+            except AttributeError:
+                l4_proto = 'n/a'
+            invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
+                                                                                      flow.flow_last_seen, flow.flow_idle_time,
+                                                                                      instance.most_recent_flow_time,
+                                                                                      instance.most_recent_flow_time -
+                                                                                      (flow.flow_last_seen + flow.flow_idle_time))
+        raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+
 def onFlowCleanup(instance, current_flow, global_user_data):
     if type(instance) is not nDPIsrvd.Instance:
         raise SemanticValidationException(current_flow,
@@ -100,28 +119,14 @@ def onFlowCleanup(instance, current_flow, global_user_data):
     except AttributeError:
         l4_proto = 'n/a'

-    invalid_flows = stats.nsock.verify()
-    if len(invalid_flows) > 0:
-        invalid_flows_str = ''
-        for flow_id in invalid_flows:
-            flow = instance.flows[flow_id]
-            try:
-                l4_proto = flow.l4_proto
-            except AttributeError:
-                l4_proto = 'n/a'
-            invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
-                                                                                      flow.flow_last_seen, flow.flow_idle_time,
-                                                                                      instance.most_recent_flow_time,
-                                                                                      instance.most_recent_flow_time -
-                                                                                      (flow.flow_last_seen + flow.flow_idle_time))
-        raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+    verifyFlows(stats.nsock, instance)

     return True

 def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
     _, stats = global_user_data
     stats.incrementEventCounter(json_dict)
+    verifyFlows(stats.nsock, instance)

     if type(instance) is not nDPIsrvd.Instance:
         raise SemanticValidationException(current_flow,
@@ -213,6 +218,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
         pass

     if 'packet_event_name' in json_dict:
+        base64.b64decode(json_dict['pkt'], validate=True)
+
         if json_dict['packet_event_name'] == 'packet-flow':
             if lowest_possible_packet_id > json_dict['packet_id']:
                 raise SemanticValidationException(current_flow,
@@ -342,6 +349,10 @@ if __name__ == '__main__':
         sys.stderr.write('\n{}\n'.format(err))
     except KeyboardInterrupt:
         print()
+    except Exception as e:
+        for failed_line in nsock.failed_lines:
+            sys.stderr.write('Affected JSON line: {}\n'.format(failed_line[0]))
+        raise(e)

     sys.stderr.write('\nEvent counter:\n' + stats.getEventCounterStr() + '\n')
     if args.strict is True:

nDPId.c

@@ -2406,7 +2406,7 @@ static void base64encode(uint8_t const * const data_buf,
          * if we have one byte available, then its encoding is spread
          * out over two characters
          */
-        if (resultIndex + 2 >= *resultSize - padCount - 1)
+        if (resultIndex + 2 >= *resultSize - (3 - padCount))
         {
             break;
         }
@@ -2419,7 +2419,7 @@ static void base64encode(uint8_t const * const data_buf,
          */
         if ((x + 1) < dataLength)
         {
-            if (resultIndex + 1 >= *resultSize - padCount - 1)
+            if (resultIndex + 1 >= *resultSize - (3 - padCount))
             {
                 break;
             }
@@ -2432,7 +2432,7 @@ static void base64encode(uint8_t const * const data_buf,
          */
         if ((x + 2) < dataLength)
         {
-            if (resultIndex + 1 >= *resultSize - padCount - 1)
+            if (resultIndex + 1 >= *resultSize - (3 - padCount))
             {
                 break;
             }
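
For reference on the corrected bound: base64 produces 4 output characters for every started group of 3 input bytes, padding the last group with '=', so the output buffer must hold 4 * ceil(n / 3) characters plus the encoder's trailing NUL. A quick Python check of that arithmetic:

    import base64

    def b64_output_len(n):
        # 4 output characters per started 3-byte input group, '=' padding included
        return 4 * ((n + 2) // 3)

    for n in range(64):
        assert len(base64.b64encode(b'\x00' * n)) == b64_output_len(n)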

File diff suppressed because one or more lines are too long

View File

@@ -1,5 +1,5 @@
 PUTVAL "localhost/exec-nDPIsrvd/gauge-json_lines" interval=60 N:1299
-PUTVAL "localhost/exec-nDPIsrvd/gauge-json_bytes" interval=60 N:1466332
+PUTVAL "localhost/exec-nDPIsrvd/gauge-json_bytes" interval=60 N:1466306
 PUTVAL "localhost/exec-nDPIsrvd/gauge-flow_new_count" interval=60 N:197
 PUTVAL "localhost/exec-nDPIsrvd/gauge-flow_end_count" interval=60 N:9
 PUTVAL "localhost/exec-nDPIsrvd/gauge-flow_idle_count" interval=60 N:188