From 7cc2b43354b2fda0715748b23f5b06a2e059262d Mon Sep 17 00:00:00 2001 From: anil-tegala Date: Thu, 24 Mar 2022 19:53:12 +0530 Subject: [PATCH 1/4] wlan mgt status code and get packet info methods were added Signed-off-by: anil-tegala --- py-scripts/lf_pcap.py | 142 +++++++++++++++++++++++++++++++++++------- 1 file changed, 121 insertions(+), 21 deletions(-) diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py index 0e213cf7..9365d03d 100644 --- a/py-scripts/lf_pcap.py +++ b/py-scripts/lf_pcap.py @@ -47,7 +47,7 @@ class LfPcap(Realm): _live_remote_cap_interface=None, _debug_on=False ): - super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) + # super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) self.host = host, self.port = port self.debug = _debug_on @@ -61,14 +61,14 @@ class LfPcap(Realm): self.live_cap_timeout = _live_cap_timeout self.remote_cap_host = _live_remote_cap_host self.remote_cap_interface = _live_remote_cap_interface - self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) + # self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) def read_pcap(self, pcap_file, apply_filter=None): self.pcap_file = pcap_file if apply_filter is not None: self.apply_filter = apply_filter try: - self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) + self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter, use_json=False) except Exception as error: raise error return self.pcap @@ -89,6 +89,51 @@ class LfPcap(Realm): raise "Host error" return self.remote_pcap + def get_packet_info(self, pcap_file): + """get packet info from each packet from the pcap file""" + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55') + packet_count = 0 + data = [] + for pkt in cap: + data.append(pkt) + packet_count += 1 + print("Total Packets: ", packet_count) + print(data) + if packet_count != 0: + return data + else: + return data + except ValueError: + raise "pcap file is required" + + def get_wlan_mgt_status_code(self, pcap_file): + """ To get status code of each packet in WLAN MGT Layer """ + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55') + packet_count = 0 + value, data = '', [] + for pkt in cap: + # print(pkt) + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_fixed_status_code') + if value == '0x0000': + data.append('Successful') + else: + data.append('failed') + packet_count += 1 + print("Total Packets: ", packet_count) + if packet_count != 0: + return data + else: + return data + except ValueError: + raise "pcap file is required" + def check_group_id_mgmt(self, pcap_file): print("pcap file path: %s" % pcap_file) try: @@ -96,37 +141,47 @@ class LfPcap(Realm): print("Checking for Group ID Management Actions Frame...") cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') if value is not None: - print(value) + print("Group ID Management: ", value) packet_count += 1 - print(packet_count) + if packet_count == 1: + break if packet_count >= 1: - return True + return {"Wireless Management - Group ID Management": str(value)} else: - return False + return {"Wireless Management - Group ID Management": str(value)} except ValueError: raise "pcap file is required" - def check_beamformer_association_request(self, pcap_file): + def check_beamformee_association_request(self, pcap_file): + print("pcap file path: %s" % pcap_file) try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformee == 1 && ' 'wlan.fc.type_subtype == 0') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: - value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee') if value is not None: print(value) packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break print(packet_count) if packet_count >= 1: - return True + return {"Association Request - MU Beamformee Capable": value} else: - return False + return {"Association Request - MU Beamformee Capable": value} except ValueError: raise "pcap file is required" @@ -136,30 +191,69 @@ class LfPcap(Realm): cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' 'wlan.fc.type_subtype == 1') packet_count = 0 + value = "Frame Not Found" for pkt in cap: if 'wlan.mgt' in pkt: - value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') if value is not None: print(value) packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return {"Association Response -MU Beamformer Capable": value} else: - return False + return {"Association Response -MU Beamformer Capable": value} except ValueError: raise "pcap file is required" + def check_beamformer_beacon_frame(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 8') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + if packet_count >= 1: + return {"Beacon Frame - MU Beamformer Capable": value} + else: + return {"Beacon Frame - MU Beamformer Capable": value} + except ValueError: + raise "pcap file is required." + def check_beamformer_report_poll(self, pcap_file): try: if pcap_file is not None: cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan' in pkt: + value = pkt['wlan'].get_field_value('fc_type_subtype') + if value is not None: + print(value) + packet_count += 1 + if packet_count == 1: + break if packet_count >= 1: - return True + return {"Beamforming Report Poll ": value} else: - return False + return {"Beamforming Report Poll ": value} except ValueError: raise "pcap file is required." @@ -235,13 +329,15 @@ SETUP: This script requires pyshark to be installed before,you can install it by running "pip install pyshark" EXAMPLE: -see: /py-scritps/lf_pcap_test.py +see: /py-scritps/lf_pcap.py --------------------- ''') parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) args = parser.parse_args() pcap_obj = LfPcap( + host="192.168.200.229", + port=8080, _read_pcap_file=args.pcap_file, _apply_filter=args.apply_filter, _live_filter=None, @@ -250,8 +346,12 @@ see: /py-scritps/lf_pcap_test.py _live_cap_timeout=None, _live_remote_cap_interface=None ) - test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) - print(test) + # pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_association_request(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_association_response(pcap_file=pcap_obj.pcap_file) + # pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file) + # pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file) + # pcap_obj.get_packet_info(pcap_obj.pcap_file) if __name__ == "__main__": From 242787c16347c45a36821261951df2a10f0015f8 Mon Sep 17 00:00:00 2001 From: anil-tegala Date: Tue, 29 Mar 2022 21:13:13 +0530 Subject: [PATCH 2/4] packet dissection method return values modified Signed-off-by: anil-tegala --- py-scripts/lf_pcap.py | 60 +++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py index 9365d03d..17e0af2f 100644 --- a/py-scripts/lf_pcap.py +++ b/py-scripts/lf_pcap.py @@ -19,6 +19,8 @@ INCLUDE_IN_README import os import sys import argparse +import time + import pyshark as ps import importlib from datetime import datetime @@ -45,9 +47,11 @@ class LfPcap(Realm): _live_filter=None, _live_remote_cap_host=None, _live_remote_cap_interface=None, - _debug_on=False + _debug_on=False, + _pcap_name=None ): # super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) + self.pcap_name = _pcap_name self.host = host, self.port = port self.debug = _debug_on @@ -179,9 +183,9 @@ class LfPcap(Realm): break print(packet_count) if packet_count >= 1: - return {"Association Request - MU Beamformee Capable": value} + return f"Association Request - MU Beamformee Capable: {value}" else: - return {"Association Request - MU Beamformee Capable": value} + return f"Association Request - MU Beamformee Capable: {value}" except ValueError: raise "pcap file is required" @@ -205,9 +209,9 @@ class LfPcap(Realm): if packet_count == 1: break if packet_count >= 1: - return {"Association Response -MU Beamformer Capable": value} + return f"Association Response - MU Beamformer Capable : {value}" else: - return {"Association Response -MU Beamformer Capable": value} + return f"Association Response - MU Beamformer Capable: {value}" except ValueError: raise "pcap file is required" @@ -230,9 +234,9 @@ class LfPcap(Realm): if packet_count == 1: break if packet_count >= 1: - return {"Beacon Frame - MU Beamformer Capable": value} + return f"Beacon Frame - MU Beamformer Capable: {value}" else: - return {"Beacon Frame - MU Beamformer Capable": value} + return f"Beacon Frame - MU Beamformer Capable: {value}" except ValueError: raise "pcap file is required." @@ -251,16 +255,16 @@ class LfPcap(Realm): if packet_count == 1: break if packet_count >= 1: - return {"Beamforming Report Poll ": value} + return f"Beamforming Report Poll : {value}" else: - return {"Beamforming Report Poll ": value} + return f"Beamforming Report Poll : {value}" except ValueError: raise "pcap file is required." - def check_he_capability(self, pcap_file): + def check_he_capability_beacon_frame(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 8') packet_count = 0 for pkt in cap: packet_count += 1 @@ -271,10 +275,10 @@ class LfPcap(Realm): except ValueError: raise "pcap file is required." - def check_probe_request(self, pcap_file): + def check_he_capability_probe_request(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 4') packet_count = 0 for pkt in cap: packet_count += 1 @@ -285,10 +289,10 @@ class LfPcap(Realm): except ValueError: raise "pcap file is required." - def check_probe_response(self, pcap_file): + def check_he_capability_probe_response(self, pcap_file): try: if pcap_file is not None: - cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5') + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 5') packet_count = 0 for pkt in cap: packet_count += 1 @@ -300,16 +304,28 @@ class LfPcap(Realm): raise "pcap file is required." def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180): - pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" + if test_name is not None: + self.pcap_name = test_name + ".pcap" + else: + self.pcap_name = "capture" + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" + print('----------pcap name----------: ', self.pcap_name) self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0") - self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration) + self.wifi_monitor.start_sniff(capname=self.pcap_name, duration_sec=sniff_duration) + for i in range(int(sniff_duration)): + time.sleep(1) self.wifi_monitor.cleanup() - return pcap_name + return self.pcap_name - def move_pcap(self, current_path, updated_path): - lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", - report_location=current_path, - report_dir=updated_path) + def move_pcap(self, current_path="/home/lanforge/", pcap_name=None): + if current_path is None: + current_path = "/home/lanforge/" + if pcap_name is None: + pcap_name = self.pcap_name + if pcap_name is not None: + print('...............Moving pcap to directory............', current_path + pcap_name) + lf_report.pull_reports(hostname=self.host, port=self.port, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir=".") + else: + raise ValueError("pcap_name is Required!") def main(): From 794faa060daf29fe32d47d6ffc4b4a92fefa33cb Mon Sep 17 00:00:00 2001 From: anil-tegala Date: Thu, 31 Mar 2022 00:09:52 +0530 Subject: [PATCH 3/4] uncommented wifi monitor functionality in lf_pcap Signed-off-by: anil-tegala --- py-scripts/lf_pcap.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py index 17e0af2f..e9b50673 100644 --- a/py-scripts/lf_pcap.py +++ b/py-scripts/lf_pcap.py @@ -50,7 +50,7 @@ class LfPcap(Realm): _debug_on=False, _pcap_name=None ): - # super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) + super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) self.pcap_name = _pcap_name self.host = host, self.port = port @@ -65,7 +65,7 @@ class LfPcap(Realm): self.live_cap_timeout = _live_cap_timeout self.remote_cap_host = _live_remote_cap_host self.remote_cap_interface = _live_remote_cap_interface - # self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) + self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) def read_pcap(self, pcap_file, apply_filter=None): self.pcap_file = pcap_file @@ -308,7 +308,7 @@ class LfPcap(Realm): self.pcap_name = test_name + ".pcap" else: self.pcap_name = "capture" + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" - print('----------pcap name----------: ', self.pcap_name) + print('----------pcap name----------: \n', self.pcap_name) self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0") self.wifi_monitor.start_sniff(capname=self.pcap_name, duration_sec=sniff_duration) for i in range(int(sniff_duration)): @@ -322,8 +322,11 @@ class LfPcap(Realm): if pcap_name is None: pcap_name = self.pcap_name if pcap_name is not None: - print('...............Moving pcap to directory............', current_path + pcap_name) - lf_report.pull_reports(hostname=self.host, port=self.port, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir=".") + if os.path.exists(current_path+self.pcap_name): + print('...............Moving pcap to directory............\n', current_path + pcap_name) + lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir="./") + else: + raise FileNotFoundError else: raise ValueError("pcap_name is Required!") From a87a3ad2747f00cdf529f74eba5bf089217efa7a Mon Sep 17 00:00:00 2001 From: anil-tegala Date: Thu, 14 Apr 2022 17:25:15 +0530 Subject: [PATCH 4/4] methods to check HE capabilities are modified Signed-off-by: anil-tegala --- py-scripts/lf_pcap.py | 115 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 14 deletions(-) diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py index e9b50673..f78237c1 100644 --- a/py-scripts/lf_pcap.py +++ b/py-scripts/lf_pcap.py @@ -155,9 +155,9 @@ class LfPcap(Realm): if packet_count == 1: break if packet_count >= 1: - return {"Wireless Management - Group ID Management": str(value)} + return f"Wireless Management - Group ID Management: {value}" else: - return {"Wireless Management - Group ID Management": str(value)} + return f"Wireless Management - Group ID Management: {value}" except ValueError: raise "pcap file is required" @@ -266,12 +266,23 @@ class LfPcap(Realm): if pcap_file is not None: cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 8') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Beacon Frame - HE Capable: {value}" else: - return False + return f"Beacon Frame - HE Capable: {value}" except ValueError: raise "pcap file is required." @@ -280,12 +291,23 @@ class LfPcap(Realm): if pcap_file is not None: cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 4') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Probe Request - HE Capability : {value}" else: - return False + return f"Probe Request - HE Capability : {value}" except ValueError: raise "pcap file is required." @@ -294,15 +316,78 @@ class LfPcap(Realm): if pcap_file is not None: cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 5') packet_count = 0 + value = "Frame Not Found" for pkt in cap: - packet_count += 1 + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break if packet_count >= 1: - return True + return f"Probe Response - HE Capability : {value}" else: - return False + return f"Probe Response - HE Capability : {value}" except ValueError: raise "pcap file is required." + def check_he_capability_association_request(self, pcap_file): + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + print(packet_count) + if packet_count >= 1: + return f"Association Request - HE Capability : {value}" + else: + return f"Association Request - HE Capability : {value}" + except ValueError: + raise "pcap file is required" + + def check_he_capability_association_response(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 1') + packet_count = 0 + value = "Frame Not Found" + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer') + if value is not None: + print(value) + packet_count += 1 + if value == 0: + value = "Not Supported" + if value == 1: + value = "Supported" + if packet_count == 1: + break + if packet_count >= 1: + return f"Association Response - HE Capability : {value}" + else: + return f"Association Response - HE Capability : {value}" + except ValueError: + raise "pcap file is required" + def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180): if test_name is not None: self.pcap_name = test_name + ".pcap" @@ -321,10 +406,11 @@ class LfPcap(Realm): current_path = "/home/lanforge/" if pcap_name is None: pcap_name = self.pcap_name + print('...............Moving pcap to directory............\n', current_path + pcap_name) + print('++++++', os.getcwd()) if pcap_name is not None: - if os.path.exists(current_path+self.pcap_name): - print('...............Moving pcap to directory............\n', current_path + pcap_name) - lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir="./") + if os.path.exists(current_path+pcap_name): + lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir=".") else: raise FileNotFoundError else: @@ -371,6 +457,7 @@ see: /py-scritps/lf_pcap.py # pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file) # pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file) # pcap_obj.get_packet_info(pcap_obj.pcap_file) + pcap_obj.check_he_capability_beacon_frame(pcap_file=pcap_obj.pcap_file) if __name__ == "__main__":