methods added to get status code and to get pcap info

This commit is contained in:
anil-tegala
2022-03-25 13:14:57 +05:30
committed by shivam
parent d5f1799cfd
commit 80e07bceb9

View File

@@ -16,20 +16,42 @@ COPYWRITE
INCLUDE_IN_README INCLUDE_IN_README
""" """
import os
import sys
import argparse import argparse
import pyshark as ps import pyshark as ps
import importlib
from datetime import datetime
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile")
WiFiMonitor = wifi_monitor.WifiMonitor
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
LFCliBase = lfcli_base.LFCliBase
realm = importlib.import_module("py-json.realm")
Realm = realm.Realm
cv_test_reports = importlib.import_module("py-json.cv_test_reports")
lf_report = cv_test_reports.lanforge_reports
class LfPcap: class LfPcap(Realm):
def __init__(self, def __init__(self,
host="localhost", port=8080,
_read_pcap_file=None, _read_pcap_file=None,
_apply_filter=None, _apply_filter=None,
_live_pcap_interface=None, _live_pcap_interface=None,
_live_cap_timeout=None, _live_cap_timeout=None,
_live_filter=None, _live_filter=None,
_live_remote_cap_host=None, _live_remote_cap_host=None,
_live_remote_cap_interface=None _live_remote_cap_interface=None,
_debug_on=False
): ):
# uncomment the follwoing line to use wifi monitor functions
# super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on)
self.host = host,
self.port = port
self.debug = _debug_on
self.pcap = None self.pcap = None
self.live_pcap = None self.live_pcap = None
self.remote_pcap = None self.remote_pcap = None
@@ -40,13 +62,15 @@ class LfPcap:
self.live_cap_timeout = _live_cap_timeout self.live_cap_timeout = _live_cap_timeout
self.remote_cap_host = _live_remote_cap_host self.remote_cap_host = _live_remote_cap_host
self.remote_cap_interface = _live_remote_cap_interface self.remote_cap_interface = _live_remote_cap_interface
# uncomment the follwoing line to use wifi monitor functions
# self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug)
def read_pcap(self, pcap_file, apply_filter=None): def read_pcap(self, pcap_file, apply_filter=None):
self.pcap_file = pcap_file self.pcap_file = pcap_file
if apply_filter is not None: if apply_filter is not None:
self.apply_filter = apply_filter self.apply_filter = apply_filter
try: try:
self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter, use_json=False)
except Exception as error: except Exception as error:
raise error raise error
return self.pcap return self.pcap
@@ -67,6 +91,51 @@ class LfPcap:
raise "Host error" raise "Host error"
return self.remote_pcap return self.remote_pcap
def get_packet_info(self, pcap_file):
"""get packet info from each packet from the pcap file"""
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55')
packet_count = 0
data = []
for pkt in cap:
data.append(pkt)
packet_count += 1
print("Total Packets: ", packet_count)
print(data)
if packet_count != 0:
return data
else:
return data
except ValueError:
raise "pcap file is required"
def get_wlan_mgt_status_code(self, pcap_file):
""" To get status code of each packet in WLAN MGT Layer """
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55')
packet_count = 0
value, data = '', []
for pkt in cap:
# print(pkt)
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_fixed_status_code')
if value == '0x0000':
data.append('Successful')
else:
data.append('failed')
packet_count += 1
print("Total Packets: ", packet_count)
if packet_count != 0:
return data
else:
return data
except ValueError:
raise "pcap file is required"
def check_group_id_mgmt(self, pcap_file): def check_group_id_mgmt(self, pcap_file):
print("pcap file path: %s" % pcap_file) print("pcap file path: %s" % pcap_file)
try: try:
@@ -74,37 +143,47 @@ class LfPcap:
print("Checking for Group ID Management Actions Frame...") print("Checking for Group ID Management Actions Frame...")
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
if value is not None: if value is not None:
print(value) print("Group ID Management: ", value)
packet_count += 1 packet_count += 1
print(packet_count) if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return {"Wireless Management - Group ID Management": str(value)}
else: else:
return False return {"Wireless Management - Group ID Management": str(value)}
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
def check_beamformer_association_request(self, pcap_file): def check_beamformee_association_request(self, pcap_file):
print("pcap file path: %s" % pcap_file)
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformee == 1 && '
'wlan.fc.type_subtype==0x000') 'wlan.fc.type_subtype == 0')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee')
if value is not None: if value is not None:
print(value) print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
print(packet_count) print(packet_count)
if packet_count >= 1: if packet_count >= 1:
return True return {"Association Request - MU Beamformee Capable": value}
else: else:
return False return {"Association Request - MU Beamformee Capable": value}
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
@@ -112,25 +191,78 @@ class LfPcap:
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
'wlan.fc.type_subtype==0x001') 'wlan.fc.type_subtype == 1')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None: if value is not None:
print(value) print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return {"Association Response -MU Beamformer Capable": value}
else: else:
return False return {"Association Response -MU Beamformer Capable": value}
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
def check_beamformer_beacon_frame(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 8')
packet_count = 0
value = "Frame Not Found"
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1:
return {"Beacon Frame - MU Beamformer Capable": value}
else:
return {"Beacon Frame - MU Beamformer Capable": value}
except ValueError:
raise "pcap file is required."
def check_beamformer_report_poll(self, pcap_file): def check_beamformer_report_poll(self, pcap_file):
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0x0014') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20')
packet_count = 0
value = "Frame Not Found"
for pkt in cap:
if 'wlan' in pkt:
value = pkt['wlan'].get_field_value('fc_type_subtype')
if value is not None:
print(value)
packet_count += 1
if packet_count == 1:
break
if packet_count >= 1:
return {"Beamforming Report Poll ": value}
else:
return {"Beamforming Report Poll ": value}
except ValueError:
raise "pcap file is required."
def check_he_capability(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format')
packet_count = 0 packet_count = 0
for pkt in cap: for pkt in cap:
packet_count += 1 packet_count += 1
@@ -141,6 +273,46 @@ class LfPcap:
except ValueError: except ValueError:
raise "pcap file is required." raise "pcap file is required."
def check_probe_request(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4')
packet_count = 0
for pkt in cap:
packet_count += 1
if packet_count >= 1:
return True
else:
return False
except ValueError:
raise "pcap file is required."
def check_probe_response(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5')
packet_count = 0
for pkt in cap:
packet_count += 1
if packet_count >= 1:
return True
else:
return False
except ValueError:
raise "pcap file is required."
def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180):
pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0")
self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration)
self.wifi_monitor.cleanup()
return pcap_name
def move_pcap(self, current_path, updated_path):
lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge",
report_location=current_path,
report_dir=updated_path)
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@@ -159,13 +331,15 @@ SETUP:
This script requires pyshark to be installed before,you can install it by running "pip install pyshark" This script requires pyshark to be installed before,you can install it by running "pip install pyshark"
EXAMPLE: EXAMPLE:
see: /py-scritps/lf_pcap_test.py see: /py-scritps/lf_pcap.py
--------------------- ---------------------
''') ''')
parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True)
parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None)
args = parser.parse_args() args = parser.parse_args()
pcap_obj = LfPcap( pcap_obj = LfPcap(
host="192.168.200.229",
port=8080,
_read_pcap_file=args.pcap_file, _read_pcap_file=args.pcap_file,
_apply_filter=args.apply_filter, _apply_filter=args.apply_filter,
_live_filter=None, _live_filter=None,
@@ -174,8 +348,12 @@ see: /py-scritps/lf_pcap_test.py
_live_cap_timeout=None, _live_cap_timeout=None,
_live_remote_cap_interface=None _live_remote_cap_interface=None
) )
test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) # pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file)
print(test) # pcap_obj.check_beamformer_association_request(pcap_file=pcap_obj.pcap_file)
# pcap_obj.check_beamformer_association_response(pcap_file=pcap_obj.pcap_file)
# pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file)
# pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file)
# pcap_obj.get_packet_info(pcap_obj.pcap_file)
if __name__ == "__main__": if __name__ == "__main__":