diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py index 0e213cf7..f78237c1 100644 --- a/py-scripts/lf_pcap.py +++ b/py-scripts/lf_pcap.py @@ -19,6 +19,8 @@ INCLUDE_IN_README import os import sys import argparse +import time + import pyshark as ps import importlib from datetime import datetime @@ -45,9 +47,11 @@ class LfPcap(Realm): _live_filter=None, _live_remote_cap_host=None, _live_remote_cap_interface=None, - _debug_on=False + _debug_on=False, + _pcap_name=None ): super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) + self.pcap_name = _pcap_name self.host = host, self.port = port self.debug = _debug_on @@ -68,7 +72,7 @@ class LfPcap(Realm): if apply_filter is not None: self.apply_filter = apply_filter try: - self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) + self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter, use_json=False) except Exception as error: raise error return self.pcap @@ -89,6 +93,51 @@ class LfPcap(Realm): raise "Host error" return self.remote_pcap + def get_packet_info(self, pcap_file): + """get packet info from each packet from the pcap file""" + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55') + packet_count = 0 + data = [] + for pkt in cap: + data.append(pkt) + packet_count += 1 + print("Total Packets: ", packet_count) + print(data) + if packet_count != 0: + return data + else: + return data + except ValueError: + raise "pcap file is required" + + def get_wlan_mgt_status_code(self, pcap_file): + """ To get status code of each packet in WLAN MGT Layer """ + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55') + packet_count = 0 + value, data = '', [] + for pkt in cap: + # print(pkt) + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_fixed_status_code') + if value == '0x0000': + data.append('Successful') + else: + data.append('failed') + packet_count += 1 + print("Total Packets: ", packet_count) + if packet_count != 0: + return data + else: + return data + except ValueError: + raise "pcap file is required" + def check_group_id_mgmt(self, pcap_file): print("pcap file path: %s" % pcap_file) try: @@ -96,37 +145,47 @@ class LfPcap(Realm): print("Checking for Group ID Management Actions Frame...") cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') if value is not None: - print(value) + print("Group ID Management: ", value) packet_count += 1 - print(packet_count) + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Wireless Management - Group ID Management: {value}" else: - return False + return f"Wireless Management - Group ID Management: {value}" except ValueError: raise "pcap file is required" - def check_beamformer_association_request(self, pcap_file): + def check_beamformee_association_request(self, pcap_file): + print("pcap file path: %s" % pcap_file) try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformee == 1 && ' 'wlan.fc.type_subtype == 0') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: - value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee') if value is not None: print(value) packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break print(packet_count) if packet_count >= 1: - return True + return f"Association Request - MU Beamformee Capable: {value}" else: - return False + return f"Association Request - MU Beamformee Capable: {value}" except ValueError: raise "pcap file is required" @@ -136,86 +195,226 @@ class LfPcap(Realm): cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' 'wlan.fc.type_subtype == 1') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: - value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') if value is not None: print(value) packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Association Response - MU Beamformer Capable : {value}" else: - return False + return f"Association Response - MU Beamformer Capable: {value}" except ValueError: raise "pcap file is required" + def check_beamformer_beacon_frame(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 8') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + if packet_count >= 1: + return f"Beacon Frame - MU Beamformer Capable: {value}" + else: + return f"Beacon Frame - MU Beamformer Capable: {value}" + except ValueError: + raise "pcap file is required." + def check_beamformer_report_poll(self, pcap_file): try: if pcap_file is not None: cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan' in pkt: + value = pkt['wlan'].get_field_value('fc_type_subtype') + if value is not None: + print(value) + packet_count += 1 + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Beamforming Report Poll : {value}" else: - return False + return f"Beamforming Report Poll : {value}" except ValueError: raise "pcap file is required." - def check_he_capability(self, pcap_file): + def check_he_capability_beacon_frame(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 8') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Beacon Frame - HE Capable: {value}" else: - return False + return f"Beacon Frame - HE Capable: {value}" except ValueError: raise "pcap file is required." - def check_probe_request(self, pcap_file): + def check_he_capability_probe_request(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 4') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Probe Request - HE Capability : {value}" else: - return False + return f"Probe Request - HE Capability : {value}" except ValueError: raise "pcap file is required." - def check_probe_response(self, pcap_file): + def check_he_capability_probe_response(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 5') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Probe Response - HE Capability : {value}" else: - return False + return f"Probe Response - HE Capability : {value}" except ValueError: raise "pcap file is required." + def check_he_capability_association_request(self, pcap_file): + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + print(packet_count) + if packet_count >= 1: + return f"Association Request - HE Capability : {value}" + else: + return f"Association Request - HE Capability : {value}" + except ValueError: + raise "pcap file is required" + + def check_he_capability_association_response(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 1') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + if packet_count >= 1: + return f"Association Response - HE Capability : {value}" + else: + return f"Association Response - HE Capability : {value}" + except ValueError: + raise "pcap file is required" + def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180): - pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" + if test_name is not None: + self.pcap_name = test_name + ".pcap" + else: + self.pcap_name = "capture" + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" + print('----------pcap name----------: \n', self.pcap_name) self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0") - self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration) + self.wifi_monitor.start_sniff(capname=self.pcap_name, duration_sec=sniff_duration) + for i in range(int(sniff_duration)): + time.sleep(1) self.wifi_monitor.cleanup() - return pcap_name + return self.pcap_name - def move_pcap(self, current_path, updated_path): - lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", - report_location=current_path, - report_dir=updated_path) + def move_pcap(self, current_path="/home/lanforge/", pcap_name=None): + if current_path is None: + current_path = "/home/lanforge/" + if pcap_name is None: + pcap_name = self.pcap_name + print('...............Moving pcap to directory............\n', current_path + pcap_name) + print('++++++', os.getcwd()) + if pcap_name is not None: + if os.path.exists(current_path+pcap_name): + lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir=".") + else: + raise FileNotFoundError + else: + raise ValueError("pcap_name is Required!") def main(): @@ -235,13 +434,15 @@ SETUP: This script requires pyshark to be installed before,you can install it by running "pip install pyshark" EXAMPLE: -see: /py-scritps/lf_pcap_test.py +see: /py-scritps/lf_pcap.py --------------------- ''') parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) args = parser.parse_args() pcap_obj = LfPcap( + host="192.168.200.229", + port=8080, _read_pcap_file=args.pcap_file, _apply_filter=args.apply_filter, _live_filter=None, @@ -250,8 +451,13 @@ see: /py-scritps/lf_pcap_test.py _live_cap_timeout=None, _live_remote_cap_interface=None ) - test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) - print(test) + # pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_association_request(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_association_response(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file) + # pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file) + # pcap_obj.get_packet_info(pcap_obj.pcap_file) + pcap_obj.check_he_capability_beacon_frame(pcap_file=pcap_obj.pcap_file) if __name__ == "__main__":