mirror of
				https://github.com/optim-enterprises-bv/nDPId.git
				synced 2025-10-29 09:22:23 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			632 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			632 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| import argparse
 | |
| import array
 | |
| import json
 | |
| import re
 | |
| import os
 | |
| import stat
 | |
| import socket
 | |
| import sys
 | |
| 
 | |
| try:
 | |
|     from colorama import Back, Fore, Style
 | |
|     USE_COLORAMA=True
 | |
| except ImportError:
 | |
|     sys.stderr.write('Python module colorama not found, using fallback.\n')
 | |
|     USE_COLORAMA=False
 | |
| 
 | |
| DEFAULT_HOST = '127.0.0.1'
 | |
| DEFAULT_PORT = 7000
 | |
| DEFAULT_UNIX = '/tmp/ndpid-distributor.sock'
 | |
| 
 | |
| NETWORK_BUFFER_MIN_SIZE = 6 # NETWORK_BUFFER_LENGTH_DIGITS + 1
 | |
| NETWORK_BUFFER_MAX_SIZE = 33792 # Please keep this value in sync with the one in config.h
 | |
| nDPId_PACKETS_PLEN_MAX = 8192 # Please keep this value in sync with the one in config.h
 | |
| 
 | |
| PKT_TYPE_ETH_IP4 = 0x0800
 | |
| PKT_TYPE_ETH_IP6 = 0x86DD
 | |
| 
 | |
| 
 | |
| class TermColor:
 | |
|     HINT    = '\033[33m'
 | |
|     WARNING = '\033[93m'
 | |
|     FAIL    = '\033[91m'
 | |
|     BOLD    = '\033[1m'
 | |
|     END     = '\033[0m'
 | |
|     BLINK   = '\x1b[5m'
 | |
| 
 | |
|     if USE_COLORAMA is True:
 | |
|         COLOR_TUPLES = [ (Fore.BLUE, [Back.RED, Back.MAGENTA, Back.WHITE]),
 | |
|                          (Fore.CYAN, [Back.MAGENTA, Back.RED, Back.WHITE]),
 | |
|                          (Fore.GREEN, [Back.YELLOW, Back.RED, Back.MAGENTA, Back.WHITE]),
 | |
|                          (Fore.MAGENTA, [Back.CYAN, Back.BLUE, Back.WHITE]),
 | |
|                          (Fore.RED, [Back.GREEN, Back.BLUE, Back.WHITE]),
 | |
|                          (Fore.WHITE, [Back.BLACK, Back.MAGENTA, Back.RED, Back.BLUE]),
 | |
|                          (Fore.YELLOW, [Back.RED, Back.CYAN, Back.BLUE, Back.WHITE]),
 | |
|                          (Fore.LIGHTBLUE_EX, [Back.LIGHTRED_EX, Back.RED]),
 | |
|                          (Fore.LIGHTCYAN_EX, [Back.LIGHTMAGENTA_EX, Back.MAGENTA]),
 | |
|                          (Fore.LIGHTGREEN_EX, [Back.LIGHTYELLOW_EX, Back.YELLOW]),
 | |
|                          (Fore.LIGHTMAGENTA_EX, [Back.LIGHTCYAN_EX, Back.CYAN]),
 | |
|                          (Fore.LIGHTRED_EX, [Back.LIGHTGREEN_EX, Back.GREEN]),
 | |
|                          (Fore.LIGHTWHITE_EX, [Back.LIGHTBLACK_EX, Back.BLACK]),
 | |
|                          (Fore.LIGHTYELLOW_EX, [Back.LIGHTRED_EX, Back.RED]) ]
 | |
| 
 | |
|     @staticmethod
 | |
|     def disableColor():
 | |
|         TermColor.HINT    = ''
 | |
|         TermColor.WARNING = ''
 | |
|         TermColor.FAIL    = ''
 | |
|         TermColor.BOLD    = ''
 | |
|         TermColor.END     = ''
 | |
|         TermColor.BLINK   = ''
 | |
|         global USE_COLORAMA
 | |
|         USE_COLORAMA      = False
 | |
| 
 | |
|     @staticmethod
 | |
|     def calcColorHash(string):
 | |
|         h = 0
 | |
|         for char in string:
 | |
|             h += ord(char)
 | |
|         return h
 | |
| 
 | |
|     @staticmethod
 | |
|     def getColorsByHash(string):
 | |
|         h = TermColor.calcColorHash(string)
 | |
|         tuple_index = h % len(TermColor.COLOR_TUPLES)
 | |
|         bg_tuple_index = h % len(TermColor.COLOR_TUPLES[tuple_index][1])
 | |
|         return (TermColor.COLOR_TUPLES[tuple_index][0],
 | |
|                 TermColor.COLOR_TUPLES[tuple_index][1][bg_tuple_index])
 | |
| 
 | |
|     @staticmethod
 | |
|     def setColorByString(string):
 | |
|         global USE_COLORAMA
 | |
|         if USE_COLORAMA is True:
 | |
|             fg_color, bg_color = TermColor.getColorsByHash(string)
 | |
|             return '{}{}{}{}{}'.format(Style.BRIGHT, fg_color, bg_color, string, Style.RESET_ALL)
 | |
|         else:
 | |
|             return '{}{}{}'.format(TermColor.BOLD, string, TermColor.END)
 | |
| 
 | |
| class ThreadData:
 | |
|     pass
 | |
| 
 | |
| class Instance:
 | |
| 
 | |
|     def __init__(self, alias, source):
 | |
|         self.alias = str(alias)
 | |
|         self.source = str(source)
 | |
|         self.flows = dict()
 | |
|         self.thread_data = dict()
 | |
| 
 | |
|     def __str__(self):
 | |
|         return '<%s.%s object at %s with alias %s, source %s>' % (
 | |
|             self.__class__.__module__,
 | |
|             self.__class__.__name__,
 | |
|             hex(id(self)),
 | |
|             self.alias,
 | |
|             self.source
 | |
|         )
 | |
| 
 | |
|     def getThreadData(self, thread_id):
 | |
|         if thread_id not in self.thread_data:
 | |
|             return None
 | |
|         return self.thread_data[thread_id]
 | |
| 
 | |
|     def getThreadDataFromJSON(self, json_dict):
 | |
|         if 'thread_id' not in json_dict:
 | |
|             return None
 | |
|         return self.getThreadData(json_dict['thread_id'])
 | |
| 
 | |
|     def getMostRecentFlowTime(self, thread_id):
 | |
|         return self.thread_data[thread_id].most_recent_flow_time
 | |
| 
 | |
|     def setMostRecentFlowTime(self, thread_id, most_recent_flow_time):
 | |
|         if thread_id in self.thread_data:
 | |
|             return self.thread_data[thread_id]
 | |
| 
 | |
|         self.thread_data[thread_id] = ThreadData()
 | |
|         self.thread_data[thread_id].most_recent_flow_time = most_recent_flow_time
 | |
|         return self.thread_data[thread_id]
 | |
| 
 | |
|     def getMostRecentFlowTimeFromJSON(self, json_dict):
 | |
|         if 'thread_id' not in json_dict:
 | |
|             return 0
 | |
|         return self.getThreadData(json_dict['thread_id']).most_recent_flow_time
 | |
| 
 | |
|     def setMostRecentFlowTimeFromJSON(self, json_dict):
 | |
|         if 'thread_id' not in json_dict:
 | |
|             return
 | |
|         thread_id = json_dict['thread_id']
 | |
|         if 'thread_ts_usec' in json_dict:
 | |
|             mrtf = self.getMostRecentFlowTime(thread_id) if thread_id in self.thread_data else 0
 | |
|             self.setMostRecentFlowTime(thread_id, max(json_dict['thread_ts_usec'], mrtf))
 | |
| 
 | |
| class Flow:
 | |
| 
 | |
|     def __init__(self, flow_id, thread_id):
 | |
|         self.flow_id = flow_id
 | |
|         self.thread_id = thread_id
 | |
|         self.flow_last_seen = -1
 | |
|         self.flow_idle_time = -1
 | |
|         self.cleanup_reason = -1
 | |
| 
 | |
|     def __str__(self):
 | |
|         return '<%s.%s object at %s with flow id %d>' % (
 | |
|             self.__class__.__module__,
 | |
|             self.__class__.__name__,
 | |
|             hex(id(self)),
 | |
|             self.flow_id
 | |
|         )
 | |
| 
 | |
| class FlowManager:
 | |
|     CLEANUP_REASON_INVALID         = 0
 | |
|     CLEANUP_REASON_DAEMON_INIT     = 1 # can happen if kill -SIGKILL $(pidof nDPId) or restart after SIGSEGV
 | |
|     CLEANUP_REASON_DAEMON_SHUTDOWN = 2 # graceful shutdown e.g. kill -SIGTERM $(pidof nDPId)
 | |
|     CLEANUP_REASON_FLOW_END        = 3
 | |
|     CLEANUP_REASON_FLOW_IDLE       = 4
 | |
|     CLEANUP_REASON_FLOW_TIMEOUT    = 5 # nDPId died a long time ago w/o restart?
 | |
|     CLEANUP_REASON_APP_SHUTDOWN    = 6 # your python app called FlowManager.doShutdown()
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.instances = dict()
 | |
| 
 | |
|     def getInstance(self, json_dict):
 | |
|         if 'alias' not in json_dict or \
 | |
|            'source' not in json_dict:
 | |
|             return None
 | |
| 
 | |
|         alias  = json_dict['alias']
 | |
|         source = json_dict['source']
 | |
| 
 | |
|         if alias not in self.instances:
 | |
|             self.instances[alias] = dict()
 | |
|         if source not in self.instances[alias]:
 | |
|             self.instances[alias][source] = Instance(alias, source)
 | |
| 
 | |
|         self.instances[alias][source].setMostRecentFlowTimeFromJSON(json_dict)
 | |
| 
 | |
|         return self.instances[alias][source]
 | |
| 
 | |
|     @staticmethod
 | |
|     def getLastPacketTime(instance, flow_id, json_dict):
 | |
|         return max(int(json_dict['flow_src_last_pkt_time']), int(json_dict['flow_dst_last_pkt_time']), instance.flows[flow_id].flow_last_seen)
 | |
| 
 | |
|     def getFlow(self, instance, json_dict):
 | |
|         if 'flow_id' not in json_dict:
 | |
|             return None
 | |
| 
 | |
|         flow_id = int(json_dict['flow_id'])
 | |
| 
 | |
|         if flow_id in instance.flows:
 | |
|             instance.flows[flow_id].flow_last_seen = FlowManager.getLastPacketTime(instance, flow_id, json_dict)
 | |
|             instance.flows[flow_id].flow_idle_time = int(json_dict['flow_idle_time'])
 | |
|             return instance.flows[flow_id]
 | |
| 
 | |
|         thread_id = int(json_dict['thread_id'])
 | |
|         instance.flows[flow_id] = Flow(flow_id, thread_id)
 | |
|         instance.flows[flow_id].flow_last_seen = FlowManager.getLastPacketTime(instance, flow_id, json_dict)
 | |
|         instance.flows[flow_id].flow_idle_time = int(json_dict['flow_idle_time'])
 | |
|         instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_INVALID
 | |
| 
 | |
|         return instance.flows[flow_id]
 | |
| 
 | |
|     def getFlowsToCleanup(self, instance, json_dict):
 | |
|         flows = dict()
 | |
| 
 | |
|         if 'daemon_event_name' in json_dict:
 | |
|             if json_dict['daemon_event_name'].lower() == 'init' or \
 | |
|                json_dict['daemon_event_name'].lower() == 'shutdown':
 | |
|                 # invalidate all existing flows with that alias/source/thread_id
 | |
|                 for flow_id in instance.flows:
 | |
|                     flow = instance.flows[flow_id]
 | |
|                     if flow.thread_id != int(json_dict['thread_id']):
 | |
|                         continue
 | |
|                     if json_dict['daemon_event_name'].lower() == 'init':
 | |
|                         flow.cleanup_reason = FlowManager.CLEANUP_REASON_DAEMON_INIT
 | |
|                     else:
 | |
|                         flow.cleanup_reason = FlowManager.CLEANUP_REASON_DAEMON_SHUTDOWN
 | |
|                     flows[flow_id] = flow
 | |
|                 for flow_id in flows:
 | |
|                     del instance.flows[flow_id]
 | |
|                 if len(instance.flows) == 0:
 | |
|                     del self.instances[instance.alias][instance.source]
 | |
| 
 | |
|         elif 'flow_event_name' in json_dict and \
 | |
|            (json_dict['flow_event_name'].lower() == 'end' or \
 | |
|             json_dict['flow_event_name'].lower() == 'idle' or \
 | |
|             json_dict['flow_event_name'].lower() == 'guessed' or \
 | |
|             json_dict['flow_event_name'].lower() == 'not-detected' or \
 | |
|             json_dict['flow_event_name'].lower() == 'detected'):
 | |
|             flow_id = json_dict['flow_id']
 | |
|             if json_dict['flow_event_name'].lower() == 'end':
 | |
|                 instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_END
 | |
|             elif json_dict['flow_event_name'].lower() == 'idle':
 | |
|                 instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_IDLE
 | |
|             # TODO: Flow Guessing/Detection can happen right before an idle event.
 | |
|             #       We need to prevent that it results in a CLEANUP_REASON_FLOW_TIMEOUT.
 | |
|             #       This may cause inconsistency and needs to be handled in another way.
 | |
|             if json_dict['flow_event_name'].lower() != 'guessed' and \
 | |
|                json_dict['flow_event_name'].lower() != 'not-detected' and \
 | |
|                json_dict['flow_event_name'].lower() != 'detected':
 | |
|                 flows[flow_id] = instance.flows.pop(flow_id)
 | |
| 
 | |
|         elif 'flow_last_seen' in json_dict:
 | |
|             if int(json_dict['flow_last_seen']) + int(json_dict['flow_idle_time']) < \
 | |
|                instance.getMostRecentFlowTimeFromJSON(json_dict):
 | |
|                 flow_id = json_dict['flow_id']
 | |
|                 instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_TIMEOUT
 | |
|                 flows[flow_id] = instance.flows.pop(flow_id)
 | |
| 
 | |
|         return flows
 | |
| 
 | |
|     def doShutdown(self):
 | |
|         flows = dict()
 | |
| 
 | |
|         for alias in self.instances:
 | |
|             for source in self.instances[alias]:
 | |
|                 for flow_id in self.instances[alias][source].flows:
 | |
|                     flow = self.instances[alias][source].flows[flow_id]
 | |
|                     flow.cleanup_reason = FlowManager.CLEANUP_REASON_APP_SHUTDOWN
 | |
|                     flows[flow_id] = flow
 | |
| 
 | |
|         del self.instances
 | |
| 
 | |
|         return flows
 | |
| 
 | |
|     def verifyFlows(self):
 | |
|         invalid_flows = list()
 | |
| 
 | |
|         for alias in self.instances:
 | |
|             for source in self.instances[alias]:
 | |
|                 for flow_id in self.instances[alias][source].flows:
 | |
|                     thread_id = self.instances[alias][source].flows[flow_id].thread_id
 | |
|                     if self.instances[alias][source].flows[flow_id].flow_last_seen + \
 | |
|                        self.instances[alias][source].flows[flow_id].flow_idle_time < \
 | |
|                        self.instances[alias][source].getMostRecentFlowTime(thread_id):
 | |
|                         invalid_flows += [flow_id]
 | |
| 
 | |
|         return invalid_flows
 | |
| 
 | |
| class nDPIsrvdException(Exception):
 | |
|     UNSUPPORTED_ADDRESS_TYPE = 1
 | |
|     BUFFER_CAPACITY_REACHED  = 2
 | |
|     SOCKET_CONNECTION_BROKEN = 3
 | |
|     INVALID_LINE_RECEIVED    = 4
 | |
|     CALLBACK_RETURNED_FALSE  = 5
 | |
|     SOCKET_TIMEOUT           = 6
 | |
|     JSON_DECODE_ERROR        = 7
 | |
| 
 | |
|     def __init__(self, etype):
 | |
|         self.etype = etype
 | |
|     def __str__(self):
 | |
|         return 'nDPIsrvdException type {}'.format(self.etype)
 | |
| 
 | |
| class UnsupportedAddressType(nDPIsrvdException):
 | |
|     def __init__(self, addr):
 | |
|         super().__init__(nDPIsrvdException.UNSUPPORTED_ADDRESS_TYPE)
 | |
|         self.addr = addr
 | |
|     def __str__(self):
 | |
|         return '{}'.format(str(self.addr))
 | |
| 
 | |
| class BufferCapacityReached(nDPIsrvdException):
 | |
|     def __init__(self, current_length, max_length):
 | |
|         super().__init__(nDPIsrvdException.BUFFER_CAPACITY_REACHED)
 | |
|         self.current_length = current_length
 | |
|         self.max_length = max_length
 | |
|     def __str__(self):
 | |
|         return '{} of {} bytes'.format(self.current_length, self.max_length)
 | |
| 
 | |
| class SocketConnectionBroken(nDPIsrvdException):
 | |
|     def __init__(self):
 | |
|         super().__init__(nDPIsrvdException.SOCKET_CONNECTION_BROKEN)
 | |
|     def __str__(self):
 | |
|         return 'Disconnected.'
 | |
| 
 | |
| class InvalidLineReceived(nDPIsrvdException):
 | |
|     def __init__(self, packet_buffer):
 | |
|         super().__init__(nDPIsrvdException.INVALID_LINE_RECEIVED)
 | |
|         self.packet_buffer = packet_buffer
 | |
|     def __str__(self):
 | |
|         return 'Received JSON line is invalid.'
 | |
| 
 | |
| class CallbackReturnedFalse(nDPIsrvdException):
 | |
|     def __init__(self):
 | |
|         super().__init__(nDPIsrvdException.CALLBACK_RETURNED_FALSE)
 | |
|     def __str__(self):
 | |
|         return 'Callback returned False, abort.'
 | |
| 
 | |
| class SocketTimeout(nDPIsrvdException):
 | |
|     def __init__(self):
 | |
|         super().__init__(nDPIsrvdException.SOCKET_TIMEOUT)
 | |
|     def __str__(self):
 | |
|         return 'Socket timeout.'
 | |
| 
 | |
| class JsonDecodeError(nDPIsrvdException):
 | |
|     def __init__(self, json_exception, failed_line):
 | |
|         super().__init__(nDPIsrvdException.JSON_DECODE_ERROR)
 | |
|         self.json_exception = json_exception
 | |
|         self.failed_line = failed_line
 | |
|     def __str__(self):
 | |
|         return '{}: {}'.format(self.json_exception, self.failed_line)
 | |
| 
 | |
| class JsonFilter():
 | |
|     def __init__(self, filter_string):
 | |
|         self.filter_string = filter_string
 | |
|         self.filter = compile(filter_string, '<string>', 'eval')
 | |
|     def evaluate(self, json_dict):
 | |
|         if type(json_dict) is not dict:
 | |
|             raise nDPIsrvdException('Could not evaluate JSON Filter: expected dictionary, got {}'.format(type(json_dict)))
 | |
|         return eval(self.filter, {'json_dict': json_dict})
 | |
| 
 | |
| class nDPIsrvdSocket:
 | |
|     def __init__(self):
 | |
|         self.sock_family = None
 | |
|         self.flow_mgr = FlowManager()
 | |
|         self.received_bytes = 0
 | |
|         self.json_filter = list()
 | |
| 
 | |
|     def addFilter(self, filter_str):
 | |
|         self.json_filter.append(JsonFilter(filter_str))
 | |
| 
 | |
|     def evalFilters(self, json_dict):
 | |
|         for jf in self.json_filter:
 | |
|             try:
 | |
|                 json_filter_retval = jf.evaluate(json_dict)
 | |
|             except Exception as err:
 | |
|                 print()
 | |
|                 sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string))
 | |
|                 raise err
 | |
| 
 | |
|             if not isinstance(json_filter_retval, bool):
 | |
|                 print()
 | |
|                 sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string))
 | |
|                 raise nDPIsrvdException('JSON Filter returned an invalid type: expected bool, got {}'.format(type(json_filter_retval)))
 | |
| 
 | |
|             if json_filter_retval is False:
 | |
|                 return False
 | |
| 
 | |
|         return True
 | |
| 
 | |
|     def connect(self, addr):
 | |
|         if type(addr) is tuple:
 | |
|             self.sock_family = socket.AF_INET
 | |
|         elif type(addr) is str:
 | |
|             self.sock_family = socket.AF_UNIX
 | |
|         else:
 | |
|             raise UnsupportedAddressType(addr)
 | |
| 
 | |
|         self.sock = socket.socket(self.sock_family, socket.SOCK_STREAM)
 | |
|         self.sock.connect(addr)
 | |
|         self.buffer = bytes()
 | |
|         self.msglen = 0
 | |
|         self.digitlen = 0
 | |
|         self.lines = []
 | |
|         self.failed_lines = []
 | |
|         self.filtered_lines = 0
 | |
| 
 | |
|     def timeout(self, timeout):
 | |
|         self.sock.settimeout(timeout)
 | |
| 
 | |
|     def receive(self):
 | |
|         if len(self.buffer) == NETWORK_BUFFER_MAX_SIZE:
 | |
|             raise BufferCapacityReached(len(self.buffer), NETWORK_BUFFER_MAX_SIZE)
 | |
| 
 | |
|         connection_finished = False
 | |
|         try:
 | |
|             recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer))
 | |
|         except ConnectionResetError:
 | |
|             connection_finished = True
 | |
|             recvd = bytes()
 | |
|         except TimeoutError:
 | |
|             raise SocketTimeout()
 | |
|         except socket.timeout:
 | |
|             raise SocketTimeout()
 | |
| 
 | |
|         if len(recvd) == 0:
 | |
|             connection_finished = True
 | |
| 
 | |
|         self.buffer += recvd
 | |
| 
 | |
|         new_data_avail = False
 | |
|         while self.msglen + self.digitlen <= len(self.buffer):
 | |
| 
 | |
|             if self.msglen == 0:
 | |
|                 starts_with_digits = re.match(r'(^\d+){', self.buffer[:NETWORK_BUFFER_MIN_SIZE].decode(errors='strict'))
 | |
|                 if starts_with_digits is None:
 | |
|                     if len(self.buffer) < NETWORK_BUFFER_MIN_SIZE:
 | |
|                         break
 | |
|                     raise InvalidLineReceived(self.buffer)
 | |
|                 self.msglen = int(starts_with_digits.group(1))
 | |
|                 self.digitlen = len(starts_with_digits.group(1))
 | |
| 
 | |
|             if len(self.buffer) >= self.msglen + self.digitlen:
 | |
|                 recvd = self.buffer[self.digitlen:self.msglen + self.digitlen]
 | |
|                 self.buffer = self.buffer[self.msglen + self.digitlen:]
 | |
|                 self.lines += [(recvd,self.msglen,self.digitlen)]
 | |
|                 new_data_avail = True
 | |
| 
 | |
|                 self.received_bytes += self.msglen + self.digitlen
 | |
|                 self.msglen = 0
 | |
|                 self.digitlen = 0
 | |
| 
 | |
|         if connection_finished is True:
 | |
|             raise SocketConnectionBroken()
 | |
| 
 | |
|         return new_data_avail
 | |
| 
 | |
|     def parse(self, callback_json, callback_flow_cleanup, global_user_data):
 | |
|         retval = True
 | |
| 
 | |
|         for received_line in self.lines:
 | |
|             try:
 | |
|                 json_dict = json.loads(received_line[0].decode('ascii', errors='replace'), strict=True)
 | |
|             except json.decoder.JSONDecodeError as e:
 | |
|                 json_dict = dict()
 | |
|                 self.failed_lines += [received_line]
 | |
|                 self.lines = self.lines[1:]
 | |
|                 raise JsonDecodeError(e, received_line)
 | |
| 
 | |
|             instance = self.flow_mgr.getInstance(json_dict)
 | |
|             if instance is None:
 | |
|                 self.failed_lines += [received_line]
 | |
|                 retval = False
 | |
|                 continue
 | |
| 
 | |
|             current_flow = self.flow_mgr.getFlow(instance, json_dict)
 | |
|             filter_eval = self.evalFilters(json_dict)
 | |
|             if filter_eval is True:
 | |
|                 try:
 | |
|                     if callback_json(json_dict, instance, current_flow, global_user_data) is not True:
 | |
|                         self.failed_lines += [received_line]
 | |
|                         retval = False
 | |
|                 except Exception as e:
 | |
|                         self.failed_lines += [received_line]
 | |
|                         self.lines = self.lines[1:]
 | |
|                         raise(e)
 | |
|             else:
 | |
|                 self.filtered_lines += 1
 | |
| 
 | |
|             for _, flow in self.flow_mgr.getFlowsToCleanup(instance, json_dict).items():
 | |
|                 if callback_flow_cleanup is None:
 | |
|                     pass
 | |
|                 elif filter_eval is True and callback_flow_cleanup(instance, flow, global_user_data) is not True:
 | |
|                     self.failed_lines += [received_line]
 | |
|                     self.lines = self.lines[1:]
 | |
|                     retval = False
 | |
| 
 | |
|             self.lines = self.lines[1:]
 | |
| 
 | |
|         return retval
 | |
| 
 | |
|     def loop(self, callback_json, callback_flow_cleanup, global_user_data):
 | |
|         throw_ex = None
 | |
| 
 | |
|         while True:
 | |
|             bytes_recv = 0
 | |
|             try:
 | |
|                 bytes_recv = self.receive()
 | |
|             except Exception as err:
 | |
|                 throw_ex = err
 | |
| 
 | |
|             if self.parse(callback_json, callback_flow_cleanup, global_user_data) is False:
 | |
|                 raise CallbackReturnedFalse()
 | |
| 
 | |
|             if throw_ex is not None:
 | |
|                 raise throw_ex
 | |
| 
 | |
|     def shutdown(self):
 | |
|         return self.flow_mgr.doShutdown().items()
 | |
| 
 | |
|     def verify(self):
 | |
|         if len(self.failed_lines) > 0:
 | |
|             raise nDPIsrvdException('Failed lines > 0: {}'.format(len(self.failed_lines)))
 | |
|         return self.flow_mgr.verifyFlows()
 | |
| 
 | |
| def defaultArgumentParser(desc='nDPIsrvd Python Interface', enable_json_filter=False,
 | |
|                           help_formatter=argparse.ArgumentDefaultsHelpFormatter):
 | |
|     parser = argparse.ArgumentParser(description=desc, formatter_class=help_formatter)
 | |
|     parser.add_argument('--host', type=str, help='nDPIsrvd host IP')
 | |
|     parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='nDPIsrvd TCP port')
 | |
|     parser.add_argument('--unix', type=str, help='nDPIsrvd unix socket path')
 | |
|     if enable_json_filter is True:
 | |
|         parser.add_argument('--filter', type=str, action='append',
 | |
|                             help='Set a filter string which if evaluates to True will invoke the JSON callback.\n'
 | |
|                                  'Example: json_dict[\'flow_event_name\'] == \'detected\' will only process \'detected\' events.')
 | |
|     return parser
 | |
| 
 | |
| def toSeconds(usec):
 | |
|     return usec / (1000 * 1000)
 | |
| 
 | |
| def validateAddress(args):
 | |
|     tcp_addr_set = False
 | |
|     address = None
 | |
| 
 | |
|     if args.host is None:
 | |
|         address_tcpip = (DEFAULT_HOST, args.port)
 | |
|     else:
 | |
|         address_tcpip = (args.host, args.port)
 | |
|         tcp_addr_set = True
 | |
| 
 | |
|     if args.unix is None:
 | |
|         address_unix = DEFAULT_UNIX
 | |
|     else:
 | |
|         address_unix = args.unix
 | |
| 
 | |
|     possible_sock_mode = 0
 | |
|     try:
 | |
|         possible_sock_mode = os.stat(address_unix).st_mode
 | |
|     except:
 | |
|         pass
 | |
|     if tcp_addr_set == False and stat.S_ISSOCK(possible_sock_mode):
 | |
|         address = address_unix
 | |
|     else:
 | |
|         address = address_tcpip
 | |
| 
 | |
|     return address
 | |
| 
 | |
| def prepareJsonFilter(args, nsock):
 | |
|     # HowTo use JSON Filters:
 | |
|     # Add `--filter [FILTER_STRING]` to the Python scripts that support JSON filtering.
 | |
|     # Examples:
 | |
|     #   ./examples/py-json-stdout/json-stdout.py --filter '"ndpi" in json_dict and "proto" in json_dict["ndpi"]'
 | |
|     #   The command above will print only JSONs that have the subobjects json_dict["ndpi"] and json_dict["ndpi"]["proto"] available.
 | |
|     #   ./examples/py-flow-info/flow-info.py --filter 'json_dict["source"] == "eth0"' --filter '"flow_event_name" in json_dict and json_dict["flow_event_name"] == "analyse"'
 | |
|     #   Multiple JSON filter will be ANDed together.
 | |
|     # Note: You may *only* use the global "json_dict" in your expressions.
 | |
|     try:
 | |
|         json_filter = args.filter
 | |
|         if json_filter is not None:
 | |
|             for jf in json_filter:
 | |
|                 nsock.addFilter(jf)
 | |
|     except AttributeError:
 | |
|         pass
 | |
| 
 | |
| global schema
 | |
| schema = {'packet_event_schema' : None, 'error_event_schema' : None, 'daemon_event_schema' : None, 'flow_event_schema' : None}
 | |
| 
 | |
| def initSchemaValidator(schema_dirs=[]):
 | |
|     if len(schema_dirs) == 0:
 | |
|         schema_dirs += [os.path.dirname(sys.argv[0]) + '/../../schema']
 | |
|         schema_dirs += [os.path.dirname(sys.argv[0]) + '/../share/nDPId']
 | |
|         schema_dirs += [sys.base_prefix + '/share/nDPId']
 | |
| 
 | |
|     for key in schema:
 | |
|         for schema_dir in schema_dirs:
 | |
|             try:
 | |
|                 with open(schema_dir + '/' + str(key) + '.json', 'r') as schema_file:
 | |
|                     schema[key] = json.load(schema_file)
 | |
|             except FileNotFoundError:
 | |
|                 continue
 | |
|             else:
 | |
|                 break
 | |
| 
 | |
| def validateAgainstSchema(json_dict):
 | |
|     import jsonschema
 | |
| 
 | |
|     if 'packet_event_id' in json_dict:
 | |
|         try:
 | |
|             jsonschema.Draft7Validator(schema=schema['packet_event_schema']).validate(instance=json_dict)
 | |
|         except AttributeError:
 | |
|             jsonschema.validate(instance=json_dict, schema=schema['packet_event_schema'])
 | |
|         return True
 | |
|     if 'error_event_id' in json_dict:
 | |
|         try:
 | |
|             jsonschema.Draft7Validator(schema=schema['error_event_schema']).validate(instance=json_dict)
 | |
|         except AttributeError:
 | |
|             jsonschema.validate(instance=json_dict, schema=schema['error_event_schema'])
 | |
|         return True
 | |
|     if 'daemon_event_id' in json_dict:
 | |
|         try:
 | |
|             jsonschema.Draft7Validator(schema=schema['daemon_event_schema']).validate(instance=json_dict)
 | |
|         except AttributeError:
 | |
|             jsonschema.validate(instance=json_dict, schema=schema['daemon_event_schema'])
 | |
|         return True
 | |
|     if 'flow_event_id' in json_dict:
 | |
|         try:
 | |
|             jsonschema.Draft7Validator(schema=schema['flow_event_schema']).validate(instance=json_dict)
 | |
|         except AttributeError:
 | |
|             jsonschema.validate(instance=json_dict, schema=schema['flow_event_schema'])
 | |
|         return True
 | |
| 
 | |
|     return False
 | 
