This commit is contained in:
jitendracandela
2022-04-16 17:21:55 +05:30

View File

@@ -19,6 +19,8 @@ INCLUDE_IN_README
import os import os
import sys import sys
import argparse import argparse
import time
import pyshark as ps import pyshark as ps
import importlib import importlib
from datetime import datetime from datetime import datetime
@@ -45,9 +47,11 @@ class LfPcap(Realm):
_live_filter=None, _live_filter=None,
_live_remote_cap_host=None, _live_remote_cap_host=None,
_live_remote_cap_interface=None, _live_remote_cap_interface=None,
_debug_on=False _debug_on=False,
_pcap_name=None
): ):
super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on)
self.pcap_name = _pcap_name
self.host = host, self.host = host,
self.port = port self.port = port
self.debug = _debug_on self.debug = _debug_on
@@ -68,7 +72,7 @@ class LfPcap(Realm):
if apply_filter is not None: if apply_filter is not None:
self.apply_filter = apply_filter self.apply_filter = apply_filter
try: try:
self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter, use_json=False)
except Exception as error: except Exception as error:
raise error raise error
return self.pcap return self.pcap
@@ -89,6 +93,51 @@ class LfPcap(Realm):
raise "Host error" raise "Host error"
return self.remote_pcap return self.remote_pcap
def get_packet_info(self, pcap_file):
"""get packet info from each packet from the pcap file"""
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55')
packet_count = 0
data = []
for pkt in cap:
data.append(pkt)
packet_count += 1
print("Total Packets: ", packet_count)
print(data)
if packet_count != 0:
return data
else:
return data
except ValueError:
raise "pcap file is required"
def get_wlan_mgt_status_code(self, pcap_file):
""" To get status code of each packet in WLAN MGT Layer """
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype==3 && wlan.tag.number==55')
packet_count = 0
value, data = '', []
for pkt in cap:
# print(pkt)
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_fixed_status_code')
if value == '0x0000':
data.append('Successful')
else:
data.append('failed')
packet_count += 1
print("Total Packets: ", packet_count)
if packet_count != 0:
return data
else:
return data
except ValueError:
raise "pcap file is required"
def check_group_id_mgmt(self, pcap_file): def check_group_id_mgmt(self, pcap_file):
print("pcap file path: %s" % pcap_file) print("pcap file path: %s" % pcap_file)
try: try:
@@ -96,37 +145,47 @@ class LfPcap(Realm):
print("Checking for Group ID Management Actions Frame...") print("Checking for Group ID Management Actions Frame...")
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
if value is not None: if value is not None:
print(value) print("Group ID Management: ", value)
packet_count += 1 packet_count += 1
print(packet_count) if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return f"Wireless Management - Group ID Management: {value}"
else: else:
return False return f"Wireless Management - Group ID Management: {value}"
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
def check_beamformer_association_request(self, pcap_file): def check_beamformee_association_request(self, pcap_file):
print("pcap file path: %s" % pcap_file)
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformee == 1 && '
'wlan.fc.type_subtype == 0') 'wlan.fc.type_subtype == 0')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee')
if value is not None: if value is not None:
print(value) print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
print(packet_count) print(packet_count)
if packet_count >= 1: if packet_count >= 1:
return True return f"Association Request - MU Beamformee Capable: {value}"
else: else:
return False return f"Association Request - MU Beamformee Capable: {value}"
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
@@ -136,86 +195,226 @@ class LfPcap(Realm):
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
'wlan.fc.type_subtype == 1') 'wlan.fc.type_subtype == 1')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt: if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None: if value is not None:
print(value) print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return f"Association Response - MU Beamformer Capable : {value}"
else: else:
return False return f"Association Response - MU Beamformer Capable: {value}"
except ValueError: except ValueError:
raise "pcap file is required" raise "pcap file is required"
def check_beamformer_beacon_frame(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 8')
packet_count = 0
value = "Frame Not Found"
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1:
return f"Beacon Frame - MU Beamformer Capable: {value}"
else:
return f"Beacon Frame - MU Beamformer Capable: {value}"
except ValueError:
raise "pcap file is required."
def check_beamformer_report_poll(self, pcap_file): def check_beamformer_report_poll(self, pcap_file):
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan' in pkt:
value = pkt['wlan'].get_field_value('fc_type_subtype')
if value is not None:
print(value)
packet_count += 1 packet_count += 1
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return f"Beamforming Report Poll : {value}"
else: else:
return False return f"Beamforming Report Poll : {value}"
except ValueError: except ValueError:
raise "pcap file is required." raise "pcap file is required."
def check_he_capability(self, pcap_file): def check_he_capability_beacon_frame(self, pcap_file):
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 8')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return f"Beacon Frame - HE Capable: {value}"
else: else:
return False return f"Beacon Frame - HE Capable: {value}"
except ValueError: except ValueError:
raise "pcap file is required." raise "pcap file is required."
def check_probe_request(self, pcap_file): def check_he_capability_probe_request(self, pcap_file):
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 4')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return f"Probe Request - HE Capability : {value}"
else: else:
return False return f"Probe Request - HE Capability : {value}"
except ValueError: except ValueError:
raise "pcap file is required." raise "pcap file is required."
def check_probe_response(self, pcap_file): def check_he_capability_probe_response(self, pcap_file):
try: try:
if pcap_file is not None: if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5') cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format && wlan.fc.type_subtype == 5')
packet_count = 0 packet_count = 0
value = "Frame Not Found"
for pkt in cap: for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1 packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1: if packet_count >= 1:
return True return f"Probe Response - HE Capability : {value}"
else: else:
return False return f"Probe Response - HE Capability : {value}"
except ValueError: except ValueError:
raise "pcap file is required." raise "pcap file is required."
def check_he_capability_association_request(self, pcap_file):
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0')
packet_count = 0
value = "Frame Not Found"
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformee')
if value is not None:
print(value)
packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
print(packet_count)
if packet_count >= 1:
return f"Association Request - HE Capability : {value}"
else:
return f"Association Request - HE Capability : {value}"
except ValueError:
raise "pcap file is required"
def check_he_capability_association_response(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 1')
packet_count = 0
value = "Frame Not Found"
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_capabilities_mubeamformer')
if value is not None:
print(value)
packet_count += 1
if value == 0:
value = "Not Supported"
if value == 1:
value = "Supported"
if packet_count == 1:
break
if packet_count >= 1:
return f"Association Response - HE Capability : {value}"
else:
return f"Association Response - HE Capability : {value}"
except ValueError:
raise "pcap file is required"
def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180): def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180):
pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" if test_name is not None:
self.pcap_name = test_name + ".pcap"
else:
self.pcap_name = "capture" + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
print('----------pcap name----------: \n', self.pcap_name)
self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0") self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0")
self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration) self.wifi_monitor.start_sniff(capname=self.pcap_name, duration_sec=sniff_duration)
for i in range(int(sniff_duration)):
time.sleep(1)
self.wifi_monitor.cleanup() self.wifi_monitor.cleanup()
return pcap_name return self.pcap_name
def move_pcap(self, current_path, updated_path): def move_pcap(self, current_path="/home/lanforge/", pcap_name=None):
lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", if current_path is None:
report_location=current_path, current_path = "/home/lanforge/"
report_dir=updated_path) if pcap_name is None:
pcap_name = self.pcap_name
print('...............Moving pcap to directory............\n', current_path + pcap_name)
print('++++++', os.getcwd())
if pcap_name is not None:
if os.path.exists(current_path+pcap_name):
lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", report_location=current_path + pcap_name, report_dir=".")
else:
raise FileNotFoundError
else:
raise ValueError("pcap_name is Required!")
def main(): def main():
@@ -235,13 +434,15 @@ SETUP:
This script requires pyshark to be installed before,you can install it by running "pip install pyshark" This script requires pyshark to be installed before,you can install it by running "pip install pyshark"
EXAMPLE: EXAMPLE:
see: /py-scritps/lf_pcap_test.py see: /py-scritps/lf_pcap.py
--------------------- ---------------------
''') ''')
parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True)
parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None)
args = parser.parse_args() args = parser.parse_args()
pcap_obj = LfPcap( pcap_obj = LfPcap(
host="192.168.200.229",
port=8080,
_read_pcap_file=args.pcap_file, _read_pcap_file=args.pcap_file,
_apply_filter=args.apply_filter, _apply_filter=args.apply_filter,
_live_filter=None, _live_filter=None,
@@ -250,8 +451,13 @@ see: /py-scritps/lf_pcap_test.py
_live_cap_timeout=None, _live_cap_timeout=None,
_live_remote_cap_interface=None _live_remote_cap_interface=None
) )
test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) # pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file)
print(test) # pcap_obj.check_beamformer_association_request(pcap_file=pcap_obj.pcap_file)
# pcap_obj.check_beamformer_association_response(pcap_file=pcap_obj.pcap_file)
# pcap_obj.check_beamformer_beacon_frame(pcap_file=pcap_obj.pcap_file)
# pcap_obj.get_wlan_mgt_status_code(pcap_file=pcap_obj.pcap_file)
# pcap_obj.get_packet_info(pcap_obj.pcap_file)
pcap_obj.check_he_capability_beacon_frame(pcap_file=pcap_obj.pcap_file)
if __name__ == "__main__": if __name__ == "__main__":