Files
wlan-lanforge-scripts/py-scripts/lf_pcap.py
2022-02-24 12:09:37 +05:30

197 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""
NAME: lf_pcap.py
PURPOSE:
Common Library for reading pcap files and check packet information for specific filters
SETUP: This script requires pyshark and tshark to be installed before
EXAMPLE:
see: /py-scritps/lf_pcap_test.py for example
COPYWRITE
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
INCLUDE_IN_README
"""
import argparse
import pyshark as ps
import importlib
from datetime import datetime
wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile")
WiFiMonitor = wifi_monitor.WifiMonitor
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
LFCliBase = lfcli_base.LFCliBase
class LfPcap(WiFiMonitor):
def __init__(self,
_read_pcap_file=None,
_apply_filter=None,
_live_pcap_interface=None,
_live_cap_timeout=None,
_live_filter=None,
_live_remote_cap_host=None,
_live_remote_cap_interface=None
):
super().__init__(self, lfclient_url=LFCliBase.lfclient_url, local_realm=self, up=True, debug_=False, resource_=1)
self.pcap = None
self.live_pcap = None
self.remote_pcap = None
self.pcap_file = _read_pcap_file
self.apply_filter = _apply_filter
self.live_filter = _live_filter
self.live_pcap_interface = _live_pcap_interface
self.live_cap_timeout = _live_cap_timeout
self.remote_cap_host = _live_remote_cap_host
self.remote_cap_interface = _live_remote_cap_interface
def read_pcap(self, pcap_file, apply_filter=None):
self.pcap_file = pcap_file
if apply_filter is not None:
self.apply_filter = apply_filter
try:
self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter)
except Exception as error:
raise error
return self.pcap
def capture_live_pcap(self):
try:
self.live_pcap = ps.LiveCapture(interface=self.live_pcap_interface, output_file='captured.pcap')
self.live_pcap.sniff(timeout=300)
except ValueError:
raise "Capture Error"
return self.live_pcap
def capture_remote_pcap(self):
try:
self.remote_pcap = ps.RemoteCapture(remote_host=self.remote_cap_host,
remote_interface=self.remote_cap_interface)
except ValueError:
raise "Host error"
return self.remote_pcap
def check_group_id_mgmt(self, pcap_file):
print("pcap file path: %s" % pcap_file)
try:
if pcap_file is not None:
print("Checking for Group ID Management Actions Frame...")
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management')
packet_count = 0
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
if value is not None:
print(value)
packet_count += 1
print(packet_count)
if packet_count >= 1:
return True
else:
return False
except ValueError:
raise "pcap file is required"
def check_beamformer_association_request(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
'wlan.fc.type_subtype==0x000')
packet_count = 0
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
if value is not None:
print(value)
packet_count += 1
print(packet_count)
if packet_count >= 1:
return True
else:
return False
except ValueError:
raise "pcap file is required"
def check_beamformer_association_response(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
'wlan.fc.type_subtype==0x001')
packet_count = 0
for pkt in cap:
if 'wlan.mgt' in pkt:
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
if value is not None:
print(value)
packet_count += 1
if packet_count >= 1:
return True
else:
return False
except ValueError:
raise "pcap file is required"
def check_beamformer_report_poll(self, pcap_file):
try:
if pcap_file is not None:
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 0x0014')
packet_count = 0
for pkt in cap:
packet_count += 1
if packet_count >= 1:
return True
else:
return False
except ValueError:
raise "pcap file is required."
def sniff_packets(self, interface_name="Wiphy0", test_name="mu-mimo", sniff_duration=180):
pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M-%S")).replace(':', '-') + ".pcap"
self.create(resource_=1, channel=None, mode="AUTO", radio_=interface_name, name_="moni0")
self.start_sniff(capname=pcap_name, duration_sec=sniff_duration)
self.cleanup()
def main():
parser = argparse.ArgumentParser(
prog='lf_pcap.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='Common Library for reading pcap files and check packet information for specific filters',
description='''\
"""
-----------------------
NAME: lf_pcap.py
PURPOSE:
Common Library for reading pcap files and check packet information for specific filters
SETUP:
This script requires pyshark to be installed before,you can install it by running "pip install pyshark"
EXAMPLE:
see: /py-scritps/lf_pcap_test.py
---------------------
''')
parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True)
parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None)
args = parser.parse_args()
pcap_obj = LfPcap(
_read_pcap_file=args.pcap_file,
_apply_filter=args.apply_filter,
_live_filter=None,
_live_pcap_interface=None,
_live_remote_cap_host=None,
_live_cap_timeout=None,
_live_remote_cap_interface=None
)
test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file)
print(test)
if __name__ == "__main__":
main()