mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-02 11:48:03 +00:00
Merge branch 'master' of https://github.com/Telecominfraproject/wlan-lanforge-scripts
This commit is contained in:
258
py-scripts/lf_pcap.py
Normal file
258
py-scripts/lf_pcap.py
Normal file
@@ -0,0 +1,258 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
NAME: lf_pcap.py
|
||||
|
||||
PURPOSE:
|
||||
Common Library for reading pcap files and check packet information for specific filters
|
||||
|
||||
SETUP: This script requires pyshark and tshark to be installed before
|
||||
|
||||
EXAMPLE:
|
||||
see: /py-scritps/lf_pcap_test.py for example
|
||||
|
||||
COPYWRITE
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
|
||||
INCLUDE_IN_README
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import pyshark as ps
|
||||
import importlib
|
||||
from datetime import datetime
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
||||
|
||||
wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile")
|
||||
WiFiMonitor = wifi_monitor.WifiMonitor
|
||||
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
|
||||
LFCliBase = lfcli_base.LFCliBase
|
||||
realm = importlib.import_module("py-json.realm")
|
||||
Realm = realm.Realm
|
||||
cv_test_reports = importlib.import_module("py-json.cv_test_reports")
|
||||
lf_report = cv_test_reports.lanforge_reports
|
||||
|
||||
|
||||
class LfPcap(Realm):
|
||||
def __init__(self,
|
||||
host="localhost", port=8080,
|
||||
_read_pcap_file=None,
|
||||
_apply_filter=None,
|
||||
_live_pcap_interface=None,
|
||||
_live_cap_timeout=None,
|
||||
_live_filter=None,
|
||||
_live_remote_cap_host=None,
|
||||
_live_remote_cap_interface=None,
|
||||
_debug_on=False
|
||||
):
|
||||
super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on)
|
||||
self.host = host,
|
||||
self.port = port
|
||||
self.debug = _debug_on
|
||||
self.pcap = None
|
||||
self.live_pcap = None
|
||||
self.remote_pcap = None
|
||||
self.pcap_file = _read_pcap_file
|
||||
self.apply_filter = _apply_filter
|
||||
self.live_filter = _live_filter
|
||||
self.live_pcap_interface = _live_pcap_interface
|
||||
self.live_cap_timeout = _live_cap_timeout
|
||||
self.remote_cap_host = _live_remote_cap_host
|
||||
self.remote_cap_interface = _live_remote_cap_interface
|
||||
self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug)
|
||||
|
||||
def read_pcap(self, pcap_file, apply_filter=None):
|
||||
self.pcap_file = pcap_file
|
||||
if apply_filter is not None:
|
||||
self.apply_filter = apply_filter
|
||||
try:
|
||||
self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter)
|
||||
except Exception as error:
|
||||
raise error
|
||||
return self.pcap
|
||||
|
||||
def capture_live_pcap(self):
|
||||
try:
|
||||
self.live_pcap = ps.LiveCapture(interface=self.live_pcap_interface, output_file='captured.pcap')
|
||||
self.live_pcap.sniff(timeout=300)
|
||||
except ValueError:
|
||||
raise "Capture Error"
|
||||
return self.live_pcap
|
||||
|
||||
def capture_remote_pcap(self):
|
||||
try:
|
||||
self.remote_pcap = ps.RemoteCapture(remote_host=self.remote_cap_host,
|
||||
remote_interface=self.remote_cap_interface)
|
||||
except ValueError:
|
||||
raise "Host error"
|
||||
return self.remote_pcap
|
||||
|
||||
def check_group_id_mgmt(self, pcap_file):
|
||||
print("pcap file path: %s" % pcap_file)
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
print("Checking for Group ID Management Actions Frame...")
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
if 'wlan.mgt' in pkt:
|
||||
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
|
||||
if value is not None:
|
||||
print(value)
|
||||
packet_count += 1
|
||||
print(packet_count)
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required"
|
||||
|
||||
def check_beamformer_association_request(self, pcap_file):
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
|
||||
'wlan.fc.type_subtype == 0')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
if 'wlan.mgt' in pkt:
|
||||
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
|
||||
if value is not None:
|
||||
print(value)
|
||||
packet_count += 1
|
||||
print(packet_count)
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required"
|
||||
|
||||
def check_beamformer_association_response(self, pcap_file):
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && '
|
||||
'wlan.fc.type_subtype == 1')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
if 'wlan.mgt' in pkt:
|
||||
value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management')
|
||||
if value is not None:
|
||||
print(value)
|
||||
packet_count += 1
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required"
|
||||
|
||||
def check_beamformer_report_poll(self, pcap_file):
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
packet_count += 1
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required."
|
||||
|
||||
def check_he_capability(self, pcap_file):
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
packet_count += 1
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required."
|
||||
|
||||
def check_probe_request(self, pcap_file):
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
packet_count += 1
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required."
|
||||
|
||||
def check_probe_response(self, pcap_file):
|
||||
try:
|
||||
if pcap_file is not None:
|
||||
cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5')
|
||||
packet_count = 0
|
||||
for pkt in cap:
|
||||
packet_count += 1
|
||||
if packet_count >= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
raise "pcap file is required."
|
||||
|
||||
def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180):
|
||||
pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
|
||||
self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0")
|
||||
self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration)
|
||||
self.wifi_monitor.cleanup()
|
||||
return pcap_name
|
||||
|
||||
def move_pcap(self, current_path, updated_path):
|
||||
lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge",
|
||||
report_location=current_path,
|
||||
report_dir=updated_path)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='lf_pcap.py',
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='Common Library for reading pcap files and check packet information for specific filters',
|
||||
description='''\
|
||||
"""
|
||||
-----------------------
|
||||
NAME: lf_pcap.py
|
||||
|
||||
PURPOSE:
|
||||
Common Library for reading pcap files and check packet information for specific filters
|
||||
|
||||
SETUP:
|
||||
This script requires pyshark to be installed before,you can install it by running "pip install pyshark"
|
||||
|
||||
EXAMPLE:
|
||||
see: /py-scritps/lf_pcap_test.py
|
||||
---------------------
|
||||
''')
|
||||
parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True)
|
||||
parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None)
|
||||
args = parser.parse_args()
|
||||
pcap_obj = LfPcap(
|
||||
_read_pcap_file=args.pcap_file,
|
||||
_apply_filter=args.apply_filter,
|
||||
_live_filter=None,
|
||||
_live_pcap_interface=None,
|
||||
_live_remote_cap_host=None,
|
||||
_live_cap_timeout=None,
|
||||
_live_remote_cap_interface=None
|
||||
)
|
||||
test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file)
|
||||
print(test)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -99,6 +99,7 @@ class IPVariableTime(Realm):
|
||||
self.traffic_type = traffic_type
|
||||
self.number_template = number_template
|
||||
self.debug = _debug_on
|
||||
self.timeout_sec = 60
|
||||
# self.json_post("/cli-json/set_resource", {
|
||||
# "shelf":1,
|
||||
# "resource":all,
|
||||
@@ -165,11 +166,11 @@ class IPVariableTime(Realm):
|
||||
self.station_profile.admin_up()
|
||||
temp_stas = self.station_profile.station_names.copy()
|
||||
print("temp_stas {temp_stas}".format(temp_stas=temp_stas))
|
||||
if self.wait_for_ip(temp_stas, ipv4=not self.ipv6, ipv6=self.ipv6):
|
||||
if self.wait_for_ip(temp_stas, ipv4=not self.ipv6, ipv6=self.ipv6, timeout_sec=self.timeout_sec):
|
||||
self._pass("All stations got IPs")
|
||||
else:
|
||||
self._fail("Stations failed to get IPs")
|
||||
self.exit_fail()
|
||||
#self.exit_fail()
|
||||
self.cx_profile.start_cx()
|
||||
|
||||
def stop(self):
|
||||
|
||||
@@ -10,7 +10,6 @@ if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
|
||||
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
||||
|
||||
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
||||
@@ -108,7 +107,7 @@ class TTLSTest(Realm):
|
||||
self.hs20_enable = hs20_enable
|
||||
self.enable_pkc = enable_pkc
|
||||
|
||||
self.timeout = 120
|
||||
self.timeout = 60
|
||||
self.number_template = number_template
|
||||
self.debug = _debug_on
|
||||
self.station_profile = self.new_station_profile()
|
||||
@@ -175,8 +174,9 @@ class TTLSTest(Realm):
|
||||
eap=self.eap,
|
||||
identity=self.identity,
|
||||
passwd=self.ttls_passwd,
|
||||
realm=self.ttls_realm,
|
||||
domain=self.domain,
|
||||
private_key=self.private_key,
|
||||
pk_password=self.pk_passwd,
|
||||
ca_cert=self.ca_cert,
|
||||
hessid=self.hessid)
|
||||
if self.ieee80211w:
|
||||
self.station_profile.set_command_param("add_sta", "ieee80211w", self.ieee80211w)
|
||||
@@ -335,7 +335,7 @@ test_ipv4_ttls.py:
|
||||
--------------------
|
||||
Generic command layout:
|
||||
python ./test_ipv4_ttls.py
|
||||
|
||||
|
||||
--upstream_port eth1
|
||||
--radio wiphy0
|
||||
--num_stations 3
|
||||
|
||||
Reference in New Issue
Block a user