diff --git a/py-scripts/lf_pcap.py b/py-scripts/lf_pcap.py new file mode 100644 index 00000000..0e213cf7 --- /dev/null +++ b/py-scripts/lf_pcap.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +""" +NAME: lf_pcap.py + +PURPOSE: +Common Library for reading pcap files and check packet information for specific filters + +SETUP: This script requires pyshark and tshark to be installed before + +EXAMPLE: +see: /py-scritps/lf_pcap_test.py for example + +COPYWRITE + Copyright 2021 Candela Technologies Inc + License: Free to distribute and modify. LANforge systems must be licensed. + +INCLUDE_IN_README +""" +import os +import sys +import argparse +import pyshark as ps +import importlib +from datetime import datetime + +sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) + +wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile") +WiFiMonitor = wifi_monitor.WifiMonitor +lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") +LFCliBase = lfcli_base.LFCliBase +realm = importlib.import_module("py-json.realm") +Realm = realm.Realm +cv_test_reports = importlib.import_module("py-json.cv_test_reports") +lf_report = cv_test_reports.lanforge_reports + + +class LfPcap(Realm): + def __init__(self, + host="localhost", port=8080, + _read_pcap_file=None, + _apply_filter=None, + _live_pcap_interface=None, + _live_cap_timeout=None, + _live_filter=None, + _live_remote_cap_host=None, + _live_remote_cap_interface=None, + _debug_on=False + ): + super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) + self.host = host, + self.port = port + self.debug = _debug_on + self.pcap = None + self.live_pcap = None + self.remote_pcap = None + self.pcap_file = _read_pcap_file + self.apply_filter = _apply_filter + self.live_filter = _live_filter + self.live_pcap_interface = _live_pcap_interface + self.live_cap_timeout = _live_cap_timeout + self.remote_cap_host = _live_remote_cap_host + self.remote_cap_interface = _live_remote_cap_interface + self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) + + def read_pcap(self, pcap_file, apply_filter=None): + self.pcap_file = pcap_file + if apply_filter is not None: + self.apply_filter = apply_filter + try: + self.pcap = ps.FileCapture(input_file=self.pcap_file, display_filter=self.apply_filter) + except Exception as error: + raise error + return self.pcap + + def capture_live_pcap(self): + try: + self.live_pcap = ps.LiveCapture(interface=self.live_pcap_interface, output_file='captured.pcap') + self.live_pcap.sniff(timeout=300) + except ValueError: + raise "Capture Error" + return self.live_pcap + + def capture_remote_pcap(self): + try: + self.remote_pcap = ps.RemoteCapture(remote_host=self.remote_cap_host, + remote_interface=self.remote_cap_interface) + except ValueError: + raise "Host error" + return self.remote_pcap + + def check_group_id_mgmt(self, pcap_file): + print("pcap file path: %s" % pcap_file) + try: + if pcap_file is not None: + print("Checking for Group ID Management Actions Frame...") + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.mgt && wlan.vht.group_id_management') + packet_count = 0 + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + if value is not None: + print(value) + packet_count += 1 + print(packet_count) + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required" + + def check_beamformer_association_request(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' + 'wlan.fc.type_subtype == 0') + packet_count = 0 + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + if value is not None: + print(value) + packet_count += 1 + print(packet_count) + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required" + + def check_beamformer_association_response(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.vht.capabilities.mubeamformer == 1 && ' + 'wlan.fc.type_subtype == 1') + packet_count = 0 + for pkt in cap: + if 'wlan.mgt' in pkt: + value = pkt['wlan.mgt'].get_field_value('wlan_vht_group_id_management') + if value is not None: + print(value) + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required" + + def check_beamformer_report_poll(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 20') + packet_count = 0 + for pkt in cap: + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required." + + def check_he_capability(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='radiotap.he.data_1.ppdu_format') + packet_count = 0 + for pkt in cap: + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required." + + def check_probe_request(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 4') + packet_count = 0 + for pkt in cap: + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required." + + def check_probe_response(self, pcap_file): + try: + if pcap_file is not None: + cap = self.read_pcap(pcap_file=pcap_file, apply_filter='wlan.fc.type_subtype == 5') + packet_count = 0 + for pkt in cap: + packet_count += 1 + if packet_count >= 1: + return True + else: + return False + except ValueError: + raise "pcap file is required." + + def sniff_packets(self, interface_name="wiphy1", test_name="mu-mimo", channel=-1, sniff_duration=180): + pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" + self.wifi_monitor.create(resource_=1, channel=channel, mode="AUTO", radio_=interface_name, name_="moni0") + self.wifi_monitor.start_sniff(capname=pcap_name, duration_sec=sniff_duration) + self.wifi_monitor.cleanup() + return pcap_name + + def move_pcap(self, current_path, updated_path): + lf_report.pull_reports(hostname=self.host, port=22, username="lanforge", password="lanforge", + report_location=current_path, + report_dir=updated_path) + + +def main(): + parser = argparse.ArgumentParser( + prog='lf_pcap.py', + formatter_class=argparse.RawTextHelpFormatter, + epilog='Common Library for reading pcap files and check packet information for specific filters', + description='''\ + """ +----------------------- +NAME: lf_pcap.py + +PURPOSE: +Common Library for reading pcap files and check packet information for specific filters + +SETUP: +This script requires pyshark to be installed before,you can install it by running "pip install pyshark" + +EXAMPLE: +see: /py-scritps/lf_pcap_test.py +--------------------- +''') + parser.add_argument('--pcap_file', '-p', help='provide the pcap file path', dest="pcap_file", required=True) + parser.add_argument('--apply_filter', '-f', help='apply the filter you want to', dest='apply_filter', default=None) + args = parser.parse_args() + pcap_obj = LfPcap( + _read_pcap_file=args.pcap_file, + _apply_filter=args.apply_filter, + _live_filter=None, + _live_pcap_interface=None, + _live_remote_cap_host=None, + _live_cap_timeout=None, + _live_remote_cap_interface=None + ) + test = pcap_obj.check_group_id_mgmt(pcap_file=pcap_obj.pcap_file) + print(test) + + +if __name__ == "__main__": + main() diff --git a/py-scripts/test_ip_variable_time.py b/py-scripts/test_ip_variable_time.py index 963d372f..6cb6bbbf 100755 --- a/py-scripts/test_ip_variable_time.py +++ b/py-scripts/test_ip_variable_time.py @@ -99,6 +99,7 @@ class IPVariableTime(Realm): self.traffic_type = traffic_type self.number_template = number_template self.debug = _debug_on + self.timeout_sec = 60 # self.json_post("/cli-json/set_resource", { # "shelf":1, # "resource":all, @@ -165,11 +166,11 @@ class IPVariableTime(Realm): self.station_profile.admin_up() temp_stas = self.station_profile.station_names.copy() print("temp_stas {temp_stas}".format(temp_stas=temp_stas)) - if self.wait_for_ip(temp_stas, ipv4=not self.ipv6, ipv6=self.ipv6): + if self.wait_for_ip(temp_stas, ipv4=not self.ipv6, ipv6=self.ipv6, timeout_sec=self.timeout_sec): self._pass("All stations got IPs") else: self._fail("Stations failed to get IPs") - self.exit_fail() + #self.exit_fail() self.cx_profile.start_cx() def stop(self): diff --git a/py-scripts/test_ipv4_ttls.py b/py-scripts/test_ipv4_ttls.py index 8c4cf40d..39084807 100755 --- a/py-scripts/test_ipv4_ttls.py +++ b/py-scripts/test_ipv4_ttls.py @@ -10,7 +10,6 @@ if sys.version_info[0] != 3: print("This script requires Python 3") exit(1) - sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) LFUtils = importlib.import_module("py-json.LANforge.LFUtils") @@ -108,7 +107,7 @@ class TTLSTest(Realm): self.hs20_enable = hs20_enable self.enable_pkc = enable_pkc - self.timeout = 120 + self.timeout = 60 self.number_template = number_template self.debug = _debug_on self.station_profile = self.new_station_profile() @@ -175,8 +174,9 @@ class TTLSTest(Realm): eap=self.eap, identity=self.identity, passwd=self.ttls_passwd, - realm=self.ttls_realm, - domain=self.domain, + private_key=self.private_key, + pk_password=self.pk_passwd, + ca_cert=self.ca_cert, hessid=self.hessid) if self.ieee80211w: self.station_profile.set_command_param("add_sta", "ieee80211w", self.ieee80211w) @@ -335,7 +335,7 @@ test_ipv4_ttls.py: -------------------- Generic command layout: python ./test_ipv4_ttls.py - + --upstream_port eth1 --radio wiphy0 --num_stations 3