mirror of
				https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
				synced 2025-10-31 10:48:02 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			217 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| NAME: lf_pcap.py
 | |
| 
 | |
| PURPOSE:
 | |
| Common Library for reading pcap files and check packet information for specific filters
 | |
| 
 | |
| SETUP: This script requires pyshark and tshark to be installed before
 | |
| 
 | |
| EXAMPLE:
 | |
| see: /py-scritps/lf_pcap_test.py for example
 | |
| 
 | |
| COPYWRITE
 | |
|     Copyright 2021 Candela Technologies Inc
 | |
|     License: Free to distribute and modify. LANforge systems must be licensed.
 | |
| 
 | |
| INCLUDE_IN_README
 | |
| """
 | |
| import os
 | |
| import sys
 | |
| import argparse
 | |
| import pyshark as ps
 | |
| import importlib
 | |
| from datetime import datetime
 | |
| 
 | |
| sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
 | |
| 
 | |
| wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile")
 | |
| WiFiMonitor = wifi_monitor.WifiMonitor
 | |
| lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
 | |
| LFCliBase = lfcli_base.LFCliBase
 | |
| realm = importlib.import_module("py-json.realm")
 | |
| Realm = realm.Realm
 | |
| cv_test_reports = importlib.import_module("py-json.cv_test_reports")
 | |
| lf_report = cv_test_reports.lanforge_reports
 | |
| 
 | |
| 
 | |
| class LfPcap(Realm):
 | |
|     def __init__(self,
 | |
|                  host="localhost", port=8080,
 | |
|                  _read_pcap_file=None,
 | |
|                  _apply_filter=None,
 | |
|                  _live_pcap_interface=None,
 | |
|                  _live_cap_timeout=None,
 | |
|                  _live_filter=None,
 | |
|                  _live_remote_cap_host=None,
 | |
|                  _live_remote_cap_interface=None,
 | |
|                  _debug_on=False
 | |
|                  ):
 | |
|         super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on)
 | |
|         self.host = host,
 | |
|         self.port = port
 | |
|         self.debug = _debug_on
 | |
|         self.pcap = None
 | |
|         self.live_pcap = None
 | |
|         self.remote_pcap = None
 | |
|         self.pcap_file = _read_pcap_file
 | |
|         self.apply_filter = _apply_filter
 | |
|         self.live_filter = _live_filter
 | |
|         self.live_pcap_interface = _live_pcap_interface
 | |
|         self.live_cap_timeout = _live_cap_timeout
 | |
|         self.remote_cap_host = _live_remote_cap_host
 | |
|         self.remote_cap_interface = _live_remote_cap_interface
 | |
|         self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug)
 | |
| 
 | |
|     def read_pcap(self, pcap_file, apply_filter=None):
 | |
|         self.pcap_file = pcap_file
 | |
|         if apply_filter is not None:
 | |
|             self.apply_filter = apply_filter
 | |
|         try:
 | |
|             self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter)
 | |
|         except Exception as error:
 | |
|             raise error
 | |
|         return self.pcap
 | |
| 
 | |
|     def capture_live_pcap(self):
 | |
|         try:
 | |
|             self.live_pcap = ps.LiveCapture(interface=self.live_pcap_interface, output_file='captured.pcap')
 | |
|             self.live_pcap.sniff(timeout=300)
 | |
|         except ValueError:
 | |
|             raise "Capture Error"
 | |
|         return self.live_pcap
 | |
| 
 | |
|     def capture_remote_pcap(self):
 | |
|         try:
 | |
|             self.remote_pcap = ps.RemoteCapture(remote_host=self.remote_cap_host,
 | |
|                                                 remote_interface=self.remote_cap_interface)
 | |
|         except ValueError:
 | |
|             raise "Host error"
 | |
|         return self.remote_pcap
 | |
| 
 | |
|     def check_group_id_mgmt(self, pcap_file):
 | |
|         print("pcap file path:  %s" % pcap_file)
 | |
|         try:
 | |
|             if pcap_file is not None:
 | |
|                 print("Checking for Group ID Management Actions Frame...")
 | |
|                 cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management')
 | |
|                 packet_count = 0
 | |
|                 for pkt in cap:
 | |
|                     if 'wlan.mgt' in pkt:
 | |
|                         value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
 | |
|                         if value is not None:
 | |
|                             print(value)
 | |
|                             packet_count += 1
 | |
|                 print(packet_count)
 | |
|                 if packet_count >= 1:
 | |
|                     return True
 | |
|                 else:
 | |
|                     return False
 | |
|         except ValueError:
 | |
|             raise "pcap file is required"
 | |
| 
 | |
|     def check_beamformer_association_request(self, pcap_file):
 | |
|         try:
 | |
|             if pcap_file is not None:
 | |
|                 cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 &&  '
 | |
|                                                                        'wlan.fc.type_subtype==0x000')
 | |
|                 packet_count = 0
 | |
|                 for pkt in cap:
 | |
|                     if 'wlan.mgt' in pkt:
 | |
|                         value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
 | |
|                         if value is not None:
 | |
|                             print(value)
 | |
|                             packet_count += 1
 | |
|                 print(packet_count)
 | |
|                 if packet_count >= 1:
 | |
|                     return True
 | |
|                 else:
 | |
|                     return False
 | |
|         except ValueError:
 | |
|             raise "pcap file is required"
 | |
| 
 | |
|     def check_beamformer_association_response(self, pcap_file):
 | |
|         try:
 | |
|             if pcap_file is not None:
 | |
|                 cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 &&  '
 | |
|                                                                        'wlan.fc.type_subtype==0x001')
 | |
|                 packet_count = 0
 | |
|                 for pkt in cap:
 | |
|                     if 'wlan.mgt' in pkt:
 | |
|                         value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
 | |
|                         if value is not None:
 | |
|                             print(value)
 | |
|                             packet_count += 1
 | |
|                 if packet_count >= 1:
 | |
|                     return True
 | |
|                 else:
 | |
|                     return False
 | |
|         except ValueError:
 | |
|             raise "pcap file is required"
 | |
| 
 | |
|     def check_beamformer_report_poll(self, pcap_file):
 | |
|         try:
 | |
|             if pcap_file is not None:
 | |
|                 cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0x0014')
 | |
|                 packet_count = 0
 | |
|                 for pkt in cap:
 | |
|                     packet_count += 1
 | |
|                 if packet_count >= 1:
 | |
|                     return True
 | |
|                 else:
 | |
|                     return False
 | |
|         except ValueError:
 | |
|             raise "pcap file is required."
 | |
| 
 | |
|     def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180):
 | |
|         pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
 | |
|         self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0")
 | |
|         self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration)
 | |
|         self.wifi_monitor.cleanup()
 | |
|         return pcap_name
 | |
| 
 | |
|     def move_pcap(self, current_path, updated_path):
 | |
|         lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge",
 | |
|                      report_location=current_path,
 | |
|                      report_dir=updated_path)
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     parser = argparse.ArgumentParser(
 | |
|         prog='lf_pcap.py',
 | |
|         formatter_class=argparse.RawTextHelpFormatter,
 | |
|         epilog='Common Library for reading pcap files and check packet information for specific filters',
 | |
|         description='''\
 | |
|     """
 | |
| -----------------------
 | |
| NAME: lf_pcap.py
 | |
| 
 | |
| PURPOSE:
 | |
| Common Library for reading pcap files and check packet information for specific filters
 | |
| 
 | |
| SETUP:
 | |
| This script requires pyshark to be installed before,you can install it by running "pip install pyshark"
 | |
| 
 | |
| EXAMPLE:
 | |
| see: /py-scritps/lf_pcap_test.py 
 | |
| ---------------------
 | |
| ''')
 | |
|     parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True)
 | |
|     parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None)
 | |
|     args = parser.parse_args()
 | |
|     pcap_obj = LfPcap(
 | |
|         _read_pcap_file=args.pcap_file,
 | |
|         _apply_filter=args.apply_filter,
 | |
|         _live_filter=None,
 | |
|         _live_pcap_interface=None,
 | |
|         _live_remote_cap_host=None,
 | |
|         _live_cap_timeout=None,
 | |
|         _live_remote_cap_interface=None
 | |
|     )
 | |
|     test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file)
 | |
|     print(test)
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | 
