wlan mgt status code and get packet info methods were added

Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
anil-tegala
2022-03-24 19:53:12 +05:30
parent 575d9facef
commit 7cc2b43354

View File

@@ -47,7 +47,7 @@ class LfPcap(Realm):
_live_remote_cap_interface=None, _live_remote_cap_interface=None,
_debug_on=False _debug_on=False
): ):
super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) # super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on)
self.host = host, self.host = host,
self.port = port self.port = port
self.debug = _debug_on self.debug = _debug_on
@@ -61,14 +61,14 @@ class LfPcap(Realm):
self.live_cap_timeout = _live_cap_timeout self.live_cap_timeout = _live_cap_timeout
self.remote_cap_host = _live_remote_cap_host self.remote_cap_host = _live_remote_cap_host
self.remote_cap_interface = _live_remote_cap_interface self.remote_cap_interface = _live_remote_cap_interface
self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) # self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug)
def read_pcap(self, pcap_file, apply_filter=None): def read_pcap(self, pcap_file, apply_filter=None):
self.pcap_file = pcap_file self.pcap_file = pcap_file
if apply_filter is not None: if apply_filter is not None:
self.apply_filter = apply_filter self.apply_filter = apply_filter
try: try:
self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter, use_json=False)
except Exception as error: except Exception as error:
raise error raise error
return self.pcap return self.pcap
@@ -89,6 +89,51 @@ class LfPcap(Realm):
raise "Host error" raise "Host error"
return self.remote_pcap return self.remote_pcap
def get_packet_info(self, pcap_file):
"""get packet info from each packet from the pcap file"""
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55')
packet_count = 0
data = []
for pkt in cap:
data.append(pkt)
packet_count += 1
print("Total Packets: ", packet_count)
print(data)
if packet_count != 0:
return data
else:
return data
except ValueError:
raise "pcap file is required"
def get_wlan_mgt_status_code(self, pcap_file):
""" To get status code of each packet in WLAN MGT Layer """
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55')
packet_count = 0
value, data = '', []
for pkt in cap:
# print(pkt)
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_fixed_status_code')
if value == '0x0000':
data.append('Successful')
else:
data.append('failed')
packet_count += 1
print("Total Packets: ", packet_count)
if packet_count != 0:
return data
else:
return data
except ValueError:
raise "pcap file is required"
def check_group_id_mgmt(self, pcap_file): def check_group_id_mgmt(self, pcap_file):
print("pcap file path: %s" % pcap_file) print("pcap file path: %s" % pcap_file)
try: try:
@@ -96,37 +141,47 @@ class LfPcap(Realm):
print("Checking for Group ID Management Actions Frame...") print("Checking for Group ID Management Actions Frame...")
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
if value is not None: if value is not None:
print(value) print("Group ID Management: ", value)
packet_count += 1 packet_count += 1
print(packet_count) if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return {"Wireless Management - Group ID Management": str(value)}
else: else:
return False return {"Wireless Management - Group ID Management": str(value)}
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
def check_beamformer_association_request(self, pcap_file): def check_beamformee_association_request(self, pcap_file):
print("pcap file path: %s" % pcap_file)
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformee == 1 && '
'wlan.fc.type_subtype == 0') 'wlan.fc.type_subtype == 0')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee')
if value is not None: if value is not None:
print(value) print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
print(packet_count) print(packet_count)
if packet_count >= 1: if packet_count >= 1:
return True return {"Association Request - MU Beamformee Capable": value}
else: else:
return False return {"Association Request - MU Beamformee Capable": value}
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
@@ -136,30 +191,69 @@ class LfPcap(Realm):
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
'wlan.fc.type_subtype == 1') 'wlan.fc.type_subtype == 1')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None: if value is not None:
print(value) print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return {"Association Response -MU Beamformer Capable": value}
else: else:
return False return {"Association Response -MU Beamformer Capable": value}
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
def check_beamformer_beacon_frame(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 8')
packet_count = 0
value = "Frame Not Found"
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1:
return {"Beacon Frame - MU Beamformer Capable": value}
else:
return {"Beacon Frame - MU Beamformer Capable": value}
except ValueError:
raise "pcap file is required."
def check_beamformer_report_poll(self, pcap_file): def check_beamformer_report_poll(self, pcap_file):
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan' in pkt:
value = pkt['wlan'].get_field_value('fc_type_subtype')
if value is not None:
print(value)
packet_count += 1 packet_count += 1
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return {"Beamforming Report Poll ": value}
else: else:
return False return {"Beamforming Report Poll ": value}
except ValueError: except ValueError:
raise "pcap file is required." raise "pcap file is required."
@@ -235,13 +329,15 @@ SETUP:
This script requires pyshark to be installed before,you can install it by running "pip install pyshark" This script requires pyshark to be installed before,you can install it by running "pip install pyshark"
EXAMPLE: EXAMPLE:
see: /py-scritps/lf_pcap_test.py see: /py-scritps/lf_pcap.py
--------------------- ---------------------
''') ''')
parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True)
parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None)
args = parser.parse_args() args = parser.parse_args()
pcap_obj = LfPcap( pcap_obj = LfPcap(
host="192.168.200.229",
port=8080,
_read_pcap_file=args.pcap_file, _read_pcap_file=args.pcap_file,
_apply_filter=args.apply_filter, _apply_filter=args.apply_filter,
_live_filter=None, _live_filter=None,
@@ -250,8 +346,12 @@ see: /py-scritps/lf_pcap_test.py
_live_cap_timeout=None, _live_cap_timeout=None,
_live_remote_cap_interface=None _live_remote_cap_interface=None
) )
test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) # pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file)
print(test) # pcap_obj.check_beamformer_association_request(pcap_file=pcap_obj.pcap_file)
# pcap_obj.check_beamformer_association_response(pcap_file=pcap_obj.pcap_file)
# pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file)
# pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file)
# pcap_obj.get_packet_info(pcap_obj.pcap_file)
if __name__ == "__main__": if __name__ == "__main__":