diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py index 75759518..6cad13a8 100644 --- a/py-scripts/lf_pcap.py +++ b/py-scripts/lf_pcap.py @@ -16,20 +16,42 @@ COPYWRITE INCLUDE_IN_README """ +import os +import sys import argparse import pyshark as ps +import importlib +from datetime import datetime + +sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) + +wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile") +WiFiMonitor = wifi_monitor.WifiMonitor +lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") +LFCliBase = lfcli_base.LFCliBase +realm = importlib.import_module("py-json.realm") +Realm = realm.Realm +cv_test_reports = importlib.import_module("py-json.cv_test_reports") +lf_report = cv_test_reports.lanforge_reports -class LfPcap: +class LfPcap(Realm): def __init__(self, + host="localhost", port=8080, _read_pcap_file=None, _apply_filter=None, _live_pcap_interface=None, _live_cap_timeout=None, _live_filter=None, _live_remote_cap_host=None, - _live_remote_cap_interface=None + _live_remote_cap_interface=None, + _debug_on=False ): + # uncomment the follwoing line to use wifi monitor functions + # super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) + self.host = host, + self.port = port + self.debug = _debug_on self.pcap = None self.live_pcap = None self.remote_pcap = None @@ -40,13 +62,15 @@ class LfPcap: self.live_cap_timeout = _live_cap_timeout self.remote_cap_host = _live_remote_cap_host self.remote_cap_interface = _live_remote_cap_interface + # uncomment the follwoing line to use wifi monitor functions + # self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) def read_pcap(self, pcap_file, apply_filter=None): self.pcap_file = pcap_file if apply_filter is not None: self.apply_filter = apply_filter try: - self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) + self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter, use_json=False) except Exception as error: raise error return self.pcap @@ -67,6 +91,51 @@ class LfPcap: raise "Host error" return self.remote_pcap + def get_packet_info(self, pcap_file): + """get packet info from each packet from the pcap file""" + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55') + packet_count = 0 + data = [] + for pkt in cap: + data.append(pkt) + packet_count += 1 + print("Total Packets: ", packet_count) + print(data) + if packet_count != 0: + return data + else: + return data + except ValueError: + raise "pcap file is required" + + def get_wlan_mgt_status_code(self, pcap_file): + """ To get status code of each packet in WLAN MGT Layer """ + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55') + packet_count = 0 + value, data = '', [] + for pkt in cap: + # print(pkt) + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_fixed_status_code') + if value == '0x0000': + data.append('Successful') + else: + data.append('failed') + packet_count += 1 + print("Total Packets: ", packet_count) + if packet_count != 0: + return data + else: + return data + except ValueError: + raise "pcap file is required" + def check_group_id_mgmt(self, pcap_file): print("pcap file path: %s" % pcap_file) try: @@ -74,37 +143,47 @@ class LfPcap: print("Checking for Group ID Management Actions Frame...") cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') if value is not None: - print(value) + print("Group ID Management: ", value) packet_count += 1 - print(packet_count) + if packet_count == 1: + break if packet_count >= 1: - return True + return {"Wireless Management - Group ID Management": str(value)} else: - return False + return {"Wireless Management - Group ID Management": str(value)} except ValueError: raise "pcap file is required" - def check_beamformer_association_request(self, pcap_file): + def check_beamformee_association_request(self, pcap_file): + print("pcap file path: %s" % pcap_file) try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' - 'wlan.fc.type_subtype==0x000') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformee == 1 && ' + 'wlan.fc.type_subtype == 0') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: - value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee') if value is not None: print(value) packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break print(packet_count) if packet_count >= 1: - return True + return {"Association Request - MU Beamformee Capable": value} else: - return False + return {"Association Request - MU Beamformee Capable": value} except ValueError: raise "pcap file is required" @@ -112,25 +191,78 @@ class LfPcap: try: if pcap_file is not None: cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' - 'wlan.fc.type_subtype==0x001') + 'wlan.fc.type_subtype == 1') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: - value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') if value is not None: print(value) packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return {"Association Response -MU Beamformer Capable": value} else: - return False + return {"Association Response -MU Beamformer Capable": value} except ValueError: raise "pcap file is required" + def check_beamformer_beacon_frame(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 8') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + if packet_count >= 1: + return {"Beacon Frame - MU Beamformer Capable": value} + else: + return {"Beacon Frame - MU Beamformer Capable": value} + except ValueError: + raise "pcap file is required." + def check_beamformer_report_poll(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0x0014') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan' in pkt: + value = pkt['wlan'].get_field_value('fc_type_subtype') + if value is not None: + print(value) + packet_count += 1 + if packet_count == 1: + break + if packet_count >= 1: + return {"Beamforming Report Poll ": value} + else: + return {"Beamforming Report Poll ": value} + except ValueError: + raise "pcap file is required." + + def check_he_capability(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format') packet_count = 0 for pkt in cap: packet_count += 1 @@ -141,6 +273,46 @@ class LfPcap: except ValueError: raise "pcap file is required." + def check_probe_request(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4') + packet_count = 0 + for pkt in cap: + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required." + + def check_probe_response(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5') + packet_count = 0 + for pkt in cap: + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required." + + def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180): + pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" + self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0") + self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration) + self.wifi_monitor.cleanup() + return pcap_name + + def move_pcap(self, current_path, updated_path): + lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", + report_location=current_path, + report_dir=updated_path) + def main(): parser = argparse.ArgumentParser( @@ -159,13 +331,15 @@ SETUP: This script requires pyshark to be installed before,you can install it by running "pip install pyshark" EXAMPLE: -see: /py-scritps/lf_pcap_test.py +see: /py-scritps/lf_pcap.py --------------------- ''') parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) args = parser.parse_args() pcap_obj = LfPcap( + host="192.168.200.229", + port=8080, _read_pcap_file=args.pcap_file, _apply_filter=args.apply_filter, _live_filter=None, @@ -174,8 +348,12 @@ see: /py-scritps/lf_pcap_test.py _live_cap_timeout=None, _live_remote_cap_interface=None ) - test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) - print(test) + # pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_association_request(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_association_response(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file) + # pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file) + # pcap_obj.get_packet_info(pcap_obj.pcap_file) if __name__ == "__main__":