#!/usr/bin/env python3 """ NAME: lf_roam_test.py PURPOSE: lf_hard_rome_test.py works on both roaming methods i.e. hard/forced roaming and also attenuation based roaming (soft roam) specific or purely based to 11r. - By default, this script executes a hard roaming process and provides the results of the 11r roam test pdf, as well as all the packet captures generated after the roam test. However, to perform a soft roam, the soft_roam parameter must be set to true. Hard Roam EXAMPLE: For a single station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For a single station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For multiple station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For multiple station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For multiple station and multiple iteration with multicast traffic enable python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "10:f9:20:fd:f3:4b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 2 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "36" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --sta_type normal --multicast True Soft Roam EXAMPLE: For a single station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True EXAMPLE: For a single station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True EXAMPLE: For multiple station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True EXAMPLE: For multiple station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True SCRIPT_CLASSIFICATION: Test NOTES: The primary focus of this script is to enable seamless roaming of clients/stations between two access points (APs). The test can be conducted with a single or multiple stations, with single or multiple iterations. The script will create stations/clients with advanced/802.1x and 11r key management. By default, it will create a single station/client. Once the stations are created, the script will generate CX traffic between the upstream port and the stations and run the traffic before roam. Packet captures will be taken for each station/client in two scenarios: (i) While the station/client is connected to an AP (ii) While the station/client roams from one AP to another AP These packet captures will be used to analyze the performance and stability of the roaming process. Overall, this script is designed to provide a comprehensive test of the roaming functionality of the APs and the stability of the network when clients move between APs. The following are the criteria for PASS the test: 1. The BSSID of the station should change after roaming from one AP to another 2 The station should not experience any disconnections during/after the roaming process. 3. The duration of the roaming process should be less than 100 ms. The following are the criteria for FAIL the test: 1. The BSSID of the station remains unchanged after roaming from one AP to another. 2. No roaming occurs, as all stations are connected to the same AP. 3. The captured packet does not contain a Reassociation Response Frame. 4. The station experiences disconnection during/after the roaming process. 5. The duration of the roaming process exceeds 100 ms. STATUS: BETA RELEASE (MORE TESTING ONLY WITH MULTICAST) VERIFIED_ON: 15-MAY-2023, Underdevelopment LICENSE: Free to distribute and modify. LANforge systems must be licensed. Copyright 2022 Candela Technologies Inc INCLUDE_IN_README: False """ import sys import os import importlib import logging import time import datetime from datetime import datetime import pandas as pd import paramiko from itertools import chain import argparse logger = logging.getLogger(__name__) if sys.version_info[0] != 3: logger.critical("This script requires Python 3") exit(1) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") LFCliBase = lfcli_base.LFCliBase LFUtils = importlib.import_module("py-json.LANforge.LFUtils") realm = importlib.import_module("py-json.realm") Realm = realm.Realm lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") cv_test_reports = importlib.import_module("py-json.cv_test_reports") lf_report = cv_test_reports.lanforge_reports lf_report_pdf = importlib.import_module("py-scripts.lf_report") lf_csv = importlib.import_module("py-scripts.lf_csv") lf_pcap = importlib.import_module("py-scripts.lf_pcap") lf_graph = importlib.import_module("py-scripts.lf_graph") sniff_radio = importlib.import_module("py-scripts.lf_sniff_radio") sta_connect = importlib.import_module("py-scripts.sta_connect2") lf_clean = importlib.import_module("py-scripts.lf_cleanup") series = importlib.import_module("cc_module_9800_3504") attenuator = importlib.import_module("py-scripts.attenuator_serial") modify = importlib.import_module("py-scripts.lf_atten_mod_test") multicast_profile = importlib.import_module("py-json.multicast_profile") class HardRoam(Realm): def __init__(self, lanforge_ip=None, lanforge_port=None, lanforge_ssh_port=None, c1_bssid=None, c2_bssid=None, fiveg_radio=None, twog_radio=None, sixg_radio=None, band=None, sniff_radio_=None, num_sta=None, security=None, security_key=None, ssid=None, upstream=None, duration=None, iteration=None, channel=None, option=None, duration_based=None, iteration_based=None, dut_name=None, traffic_type="lf_udp", roaming_delay=None, path="../", scheme="ssh", dest="localhost", user="admin", passwd="Cisco123", prompt="WLC2", series_cc="9800", ap="AP687D.B45C.1D1C", port="8888", band_cc="5g", timeout="10", eap_method=None, eap_identity=None, eap_password=None, pairwise_cipher=None, groupwise_cipher=None, private_key=None, pk_passwd=None, ca_cert=None, eap_phase1=None, eap_phase2=None, log_file=False, debug=False, soft_roam=False, sta_type=None, ieee80211w=None, multicast=None ): super().__init__(lanforge_ip, lanforge_port) self.lanforge_ip = lanforge_ip self.lanforge_port = lanforge_port self.lanforge_ssh_port = lanforge_ssh_port self.c1_bssid = c1_bssid self.c2_bssid = c2_bssid self.fiveg_radios = fiveg_radio self.twog_radios = twog_radio self.sixg_radios = sixg_radio self.band = band self.sniff_radio = sniff_radio_ self.num_sta = num_sta self.ssid_name = ssid self.security = security self.security_key = security_key self.upstream = upstream self.duration = duration self.iteration = iteration self.channel = channel self.option = option self.iteration_based = iteration_based self.duration_based = duration_based self.local_realm = Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) self.staConnect = sta_connect.StaConnect2(host=self.lanforge_ip, port=self.lanforge_port, outfile="sta_connect2.csv") self.final_bssid = [] self.pcap_obj_2 = None self.pcap_name = None self.test_duration = None self.client_list = [] self.dut_name = dut_name self.pcap_obj = lf_pcap.LfPcap(host=self.lanforge_ip, port=self.lanforge_port) self.lf_csv_obj = lf_csv() self.traffic_type = traffic_type self.roam_delay = roaming_delay self.sta_type = sta_type self.cx_profile = self.local_realm.new_l3_cx_profile() self.cc = None self.cc = series.create_controller_series_object( scheme=scheme, dest=dest, user=user, passwd=passwd, prompt=prompt, series=series_cc, ap=ap, port=port, band=band_cc, timeout=timeout) self.cc.pwd = path self.start_time = None self.end_time = None self.eap_method = eap_method self.eap_identity = eap_identity self.eap_password = eap_password self.pairwise_cipher = str(pairwise_cipher) self.groupwise_cipher = groupwise_cipher self.private_key = private_key self.pk_passwd = pk_passwd self.ca_cert = ca_cert self.eap_phase1 = eap_phase1 self.eap_phase2 = eap_phase2 self.log_file = log_file self.debug = debug self.mac_data = None self.soft_roam = soft_roam self.ieee80211w = ieee80211w self.multicast = multicast print("Number of iteration : ", self.iteration) # logging.basicConfig(filename='roam.log', filemode='w', level=logging.INFO, force=True) self.multi_cast_profile = multicast_profile.MULTICASTProfile(self.lanforge_ip, self.lanforge_port, local_realm=self) # Start debugger of controller def start_debug_(self, mac_list): mac = mac_list for i in mac: y = self.cc.debug_wireless_mac_cc(mac=str(i)) print(y) # Stop debugger of controller def stop_debug_(self, mac_list): mac = mac_list for i in mac: y = self.cc.no_debug_wireless_mac_cc(mac=str(i)) print(y) # Get trace file names from controller def get_ra_trace_file(self): ra = self.cc.get_ra_trace_files__cc() print(ra) ele_list = [y for y in (x.strip() for x in ra.splitlines()) if y] print(ele_list) return ele_list # Get trace file names from controller with respect to number of clients def get_file_name(self, client): file_name = [] if not self.debug: for i in range(client): file_name.append("debug disabled") else: file = self.get_ra_trace_file() indices = [i for i, s in enumerate(file) if 'dir bootflash: | i ra_trace' in s] # print(indices) y = indices[-1] if client == 1: z = file[y + 1] list_ = [z] m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) if m[-1].isnumeric(): print("Log file not Available") file_name.append("file not found") file_name.append(m[-1]) else: z = file[y + (int(0) + 1)] list_ = [z] m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) if m[-1].isnumeric(): print("Log file not Available") for i in range(client): file_name.append("file not found") else: for i in range(client): z = file[y + (int(i) + 1)] list_ = [z] m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) if m[-1].isnumeric(): print("Log file not Available") file_name.append("file not found") file_name.append(m[-1]) print("File_name", file_name) file_name.reverse() return file_name # delete trace file from controller def delete_trace_file(self, file): # file = self.get_file_name() self.cc.del_ra_trace_file_cc(file=file) # get station list from lf def get_station_list(self): sta = self.staConnect.station_list() if sta == "no response": return "no response" sta_list = [] for i in sta: for j in i: sta_list.append(j) return sta_list # Create N - number of clients of advanced configuration on lf def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None, dut_security=None, dut_passwd=None, radio=None): local_realm = Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) station_profile = local_realm.new_station_profile() if self.band == "fiveg": radio = self.fiveg_radios if self.band == "twog": radio = self.twog_radios if self.band == "sixg": radio = self.sixg_radios sta_list = self.get_station_list() print("Available list of stations on lanforge-GUI :", sta_list) logging.info(str(sta_list)) if not sta_list: print("No stations are available on lanforge-GUI") logging.info("No stations are available on lanforge-GUI") else: station_profile.cleanup(sta_list, delay=1) LFUtils.wait_until_ports_disappear(base_url=local_realm.lfclient_url, port_list=sta_list, debug=True) print("Creating stations.") logging.info("Creating stations.") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, end_id_=num_sta - 1, padding_number_=10000, radio=radio) if self.sta_type == "normal": station_profile.set_command_flag("add_sta", "power_save_enable", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: print("Soft roam true") logging.info("Soft roam true") if self.option == "otds": print("OTDS present") station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) if self.sta_type == "11r-sae-802.1x": dut_passwd = "[BLANK]" station_profile.use_security(dut_security, dut_ssid, dut_passwd) station_profile.set_number_template("00") station_profile.set_command_flag("add_sta", "create_admin_down", 1) station_profile.set_command_param("set_port", "report_timer", 1500) # connect station to particular bssid # self.station_profile.set_command_param("add_sta", "ap", self.bssid[0]) station_profile.set_command_flag("set_port", "rpt_timer", 1) if self.sta_type == "11r": station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: # station_profile.ssid_pass = self.security_key station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: print("Soft roam true") logging.info("Soft roam true") if self.option == "otds": print("OTDS present") station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-PSK ", pairwise="", group="", psk="", eap="", identity="", passwd="", pin="", phase1="NA", phase2="NA", pac_file="NA", private_key="NA", pk_password="NA", hessid="00:00:00:00:00:01", realm="localhost.localdomain", client_cert="NA", imsi="NA", milenage="NA", domain="localhost.localdomain", roaming_consortium="NA", venue_group="NA", network_type="NA", ipaddr_type_avail="NA", network_auth_type="NA", anqp_3gpp_cell_net="NA") if self.sta_type == "11r-sae": station_profile.set_command_flag("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) # station_profile.set_command_flag("add_sta", "disable_roam", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-SAE ", pairwise="", group="", psk="", eap="", identity="", passwd="", pin="", phase1="NA", phase2="NA", pac_file="NA", private_key="NA", pk_password="NA", hessid="00:00:00:00:00:01", realm="localhost.localdomain", client_cert="NA", imsi="NA", milenage="NA", domain="localhost.localdomain", roaming_consortium="NA", venue_group="NA", network_type="NA", ipaddr_type_avail="NA", network_auth_type="NA", anqp_3gpp_cell_net="NA") if self.sta_type == "11r-eap": # wpa2 enterprise station_profile.set_command_flag("set_port", "rpt_timer", 1) # station_profile.set_command_param("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) # station_profile.set_command_flag("add_sta", "disable_roam", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") station_profile.set_wifi_extra(key_mgmt="FT-EAP ", pairwise=self.pairwise_cipher, group=self.groupwise_cipher, eap=self.eap_method, identity=self.eap_identity, passwd=self.eap_password, ca_cert=self.ca_cert, private_key=self.private_key, pk_password=self.pk_passwd) if self.sta_type == "11r-eap-sha384": # wpa3 enterprise station_profile.set_command_flag("set_port", "rpt_timer", 1) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-EAP-SHA384 ", pairwise=self.pairwise_cipher, group=self.groupwise_cipher, eap=self.eap_method, identity=self.eap_identity, passwd=self.eap_password, ca_cert=self.ca_cert, private_key=self.private_key, pk_password=self.pk_passwd) # enabling ieee80211w flag if self.ieee80211w: station_profile.set_command_param("add_sta", "ieee80211w", self.ieee80211w) station_profile.create(radio=radio, sta_names_=station_list) print("Waiting for ports to appear") logging.info("Waiting for ports to appear") local_realm.wait_until_ports_appear(sta_list=station_list) if self.soft_roam: for sta_name in station_list: sta = sta_name.split(".")[2] # TODO: Use name_to_eid # wpa_cmd = "roam " + str(checker2) bgscan = { "shelf": 1, "resource": 1, # TODO: Do not hard-code resource, get it from radio eid I think. "port": str(sta), "type": 'NA', "text": 'bgscan="simple:30:-65:300"' } print(bgscan) logging.info(str(bgscan)) self.local_realm.json_post("/cli-json/set_wifi_custom", bgscan) # time.sleep(2) station_profile.admin_up() print("Waiting for ports to admin up") logging.info("Waiting for ports to admin up") if local_realm.wait_for_ip(station_list): print("All stations got IPs") logging.info("All stations got IPs") # exit() return True else: print("Stations failed to get IPs") logging.info("Stations failed to get IPs") return False # create a multicast profile def mcast_tx(self): # set 1mbps tx rate self.multi_cast_profile.side_b_min_bps = 1000000 self.multi_cast_profile.create_mc_tx("mc_udp", self.upstream) def mcast_rx(self, sta_list): self.multi_cast_profile.side_a_min_bps = 0 print("Station List :", sta_list) self.multi_cast_profile.create_mc_rx("mc_udp", sta_list) def mcast_start(self): self.multi_cast_profile.start_mc() def mcast_stop(self): self.multi_cast_profile.stop_mc() # Create layer-3 traffic on clients def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu, side_b_min_pdu, traffic_type, sta_list): print("Station List :", sta_list) logging.info(f"Station List : {str(sta_list)}") print(type(sta_list)) print("Upstream port :", self.upstream) logging.info(str(self.upstream)) self.cx_profile.host = self.lanforge_ip self.cx_profile.port = self.lanforge_port self.cx_profile.side_a_min_bps = side_a_min_rate self.cx_profile.side_a_max_bps = side_a_max_rate self.cx_profile.side_b_min_bps = side_b_min_rate self.cx_profile.side_b_max_bps = side_b_max_rate self.cx_profile.side_a_min_pdu = side_a_min_pdu, self.cx_profile.side_b_min_pdu = side_b_min_pdu, # Create layer3 end points & run traffic print("Creating Endpoints") logging.info("Creating Endpoints") self.cx_profile.create(endp_type=traffic_type, side_a=sta_list, side_b=self.upstream, sleep_time=0) self.cx_profile.start_cx() # Get layer3 values def get_layer3_values(self, cx_name=None, query=None): url = f"/cx/{cx_name}" response = self.json_get(_req_url=url) result = response[str(cx_name)][str(query)] return result # Get cross-connect names def get_cx_list(self): layer3_result = self.local_realm.cx_list() layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item] print("Layer-3 Names :", layer3_names) return layer3_names # Get Endpoint values def get_endp_values(self, endp="A", cx_name="niki", query="tx bytes"): # self.get_cx_list() # self.json_get("http://192.168.100.131:8080/endp/Unsetwlan000-0-B?fields=rx%20rate") url = f"/endp/{cx_name}-{endp}?fields={query}" response = self.json_get(_req_url=url) print(response) if (response is None) or ("endpoint" not in response): print("Incomplete response:") exit(1) final = response["endpoint"][query] print(final) return final # Pre-Cleanup on lanforge def precleanup(self): obj = lf_clean.lf_clean(host=self.lanforge_ip, port=self.lanforge_port, clean_cxs=True, clean_endp=True) obj.resource = "all" obj.sta_clean() obj.cxs_clean() # obj.layer3_endp_clean() # Get client data from lf def station_data_query(self, station_name="wlan0", query="channel"): url = f"/port/{1}/{1}/{station_name}?fields={query}" # print("url//////", url) response = self.local_realm.json_get(_req_url=url) print("Response: ", response) if (response is None) or ("interface" not in response): print("Station_list: incomplete response:") # pprint(response) exit(1) y = response["interface"][query] return y # Start packet capture on lf # TODO: Check if other monitor ports exist on this radio already. If so, delete those # before adding new monitor port (or) just use the existing monitor port without creating # a new one. --Ben def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): self.pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" self.pcap_obj_2 = sniff_radio.SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port, radio=radio, channel=radio_channel, monitor_name="monitor") self.pcap_obj_2.setup(0, 0, 0) time.sleep(5) self.pcap_obj_2.monitor.admin_up() time.sleep(5) self.pcap_obj_2.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) # Stop packet capture and get file name def stop_sniffer(self): print("In Stop Sniffer") directory = None directory_name = "pcap" if directory_name: directory = os.path.join("", str(directory_name)) try: if not os.path.exists(directory): os.mkdir(directory) except Exception as x: print(x) self.pcap_obj_2.monitor.admin_down() self.pcap_obj_2.cleanup() lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", password="lanforge", report_location="/home/lanforge/" + self.pcap_name, report_dir="pcap") return self.pcap_name # Generate csv files at the beginning def generate_csv(self): file_name = [] for i in range(self.num_sta): file = 'test_client_' + str(i) + '.csv' if self.multicast == "True": lf_csv_obj = lf_csv(_columns=['Iterations', 'bssid1', 'bssid2', "PASS/FAIL", "Remark"], _rows=[], _filename=file) else: lf_csv_obj = lf_csv(_columns=['Iterations', 'bssid1', 'bssid2', "Roam Time(ms)", "PASS/FAIL", "Pcap file Name", "Log File", "Remark"], _rows=[], _filename=file) # "Packet loss", file_name.append(file) lf_csv_obj.generate_csv() return file_name # Get journal ctl logs/ kernel logs def journal_ctl_logs(self, file): jor_lst = [] name = "kernel_log" + file + ".txt" jor_lst.append(name) try: ssh = paramiko.SSHClient() command = "journalctl --since '5 minutes ago' > kernel_log" + file + ".txt" ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", password="lanforge", banner_timeout=600) stdin, stdout, stderr = ssh.exec_command(str(command)) stdout.readlines() ssh.close() kernel_log = "/home/lanforge/kernel_log" + file + ".txt" lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", password="lanforge", report_location=kernel_log, report_dir=".") except Exception as e: print(e) return jor_lst # Gives wlan management status of pcap file def get_wlan_mgt_status(self, file_name, pyshark_filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55)"): query_reasso_response = self.pcap_obj.get_wlan_mgt_status_code(pcap_file=str(file_name), filter=pyshark_filter) print("Query", query_reasso_response) return query_reasso_response # Get attenuator serial number def attenuator_serial(self): obj = attenuator.AttenuatorSerial(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) val = obj.show() return val # To modify the attenuators def attenuator_modify(self, serno, idx, val): atten_obj = modify.CreateAttenuator(self.lanforge_ip, self.lanforge_port, serno, idx, val) atten_obj.build() # This is where the main roaming functionality begins def run(self, file_n=None): try: print("Setting both attenuators to zero attenuation at the beginning.") logging.info("Setting both attenuators to zero attenuation at the beginning.") ser_no = self.attenuator_serial() print("Available attenuators :", ser_no[0], ser_no[1]) logging.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 0) except Exception as e: logging.warning(str(e)) finally: kernel_log = [] message = None, None # Start Timer test_time = datetime.now() test_time = test_time.strftime("%b %d %H:%M:%S") print("Test started at ", test_time) logging.info("Test started at " + str(test_time)) self.start_time = test_time # Getting two BSSID's for roam self.final_bssid.extend([self.c1_bssid, self.c2_bssid]) print("Final BSSID's are :", self.final_bssid) logging.info("Final BSSID's are :" + str(self.final_bssid)) # If 'Soft Roam' is selected, initially set the attenuator to zero. if self.soft_roam: print("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") logging.info("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") ser_no = self.attenuator_serial() print("Available attenuators :", ser_no[0], ser_no[1]) logging.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 0) # Start sniffer & Create clients with respect to bands print("Begin sniffing to establish the initial connection.") logging.info("Begin sniffing to establish the initial connection.") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str(self.option) + "start" + "_", duration=3600) if self.band == "twog": self.local_realm.reset_port(self.twog_radios) self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name, dut_security=self.security, dut_passwd=self.security_key, radio=self.twog_radios) if self.band == "fiveg": self.local_realm.reset_port(self.fiveg_radios) self.create_n_clients(sta_prefix="wlan", num_sta=self.num_sta, dut_ssid=self.ssid_name, dut_security=self.security, dut_passwd=self.security_key, radio=self.fiveg_radios) if self.band == "sixg": self.local_realm.reset_port(self.sixg_radios) self.create_n_clients(sta_prefix="wlan", num_sta=self.num_sta, dut_ssid=self.ssid_name, dut_security=self.security, dut_passwd=self.security_key, radio=self.sixg_radios) # Check if all stations have ip or not sta_list = self.get_station_list() print("Checking for IP and station list :", sta_list) logging.info("Checking for IP and station list :" + str(sta_list)) if sta_list == "no response": print("No response from station") logging.info("No response from station") else: # if all stations got ip check mac address for stations val = self.wait_for_ip(sta_list) mac_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] # use name_to_eid mac = self.station_data_query(station_name=str(sta), query="mac") mac_list.append(mac) print("List of MAC addresses for all stations :", mac_list) logging.info("List of MAC addresses for all stations :" + str(mac_list)) self.mac_data = mac_list # if self.debug: # print("start debug") # self.start_debug_(mac_list=mac_list) # print("check for 30 min") # time.sleep(1800) print("Stop Sniffer") logging.info("Stop Sniffer") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) print("pcap file name :", file_name) logging.info("pcap file name : " + str(file_name)) # if self.debug: # print("stop debugger") # self.stop_debug_(mac_list=mac_list) # # time.sleep(40) # exit() if val: # if all station got an ip, then check all station are connected to single AP print("All stations got ip") logging.info("All stations got ip") print("Check if all stations are connected single ap") logging.info("Check if all stations are connected single ap") # get BSSID'S of all stations print("Get BSSID's of all stations") logging.info("Get BSSID's of all stations") check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") logging.info(str(bssid)) check.append(bssid) print("BSSID of the current connected stations : ", check) logging.info(str(check)) # Check if all the stations in the BSSID list have the same BSSID. print("Verifying whether all BSSID's are identical or not.") logging.info("Verifying whether all BSSID's are identical or not.") result = all(element == check[0] for element in check) # if all BSSID's are identical / same, run layer3 traffic b/w station to upstream if result: if self.multicast == "True": print("multicast is true") self.mcast_tx() self.mcast_rx(sta_list=sta_list) self.mcast_start() else: self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, side_b_max_rate=0, sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, side_b_min_pdu=1250) else: # if BSSID's are not identical / same, try to move all clients to one ap print("Attempt to ensure that all clients are connected to the same AP before " "initiating a roaming process.") logging.info("Attempt to ensure that all clients are connected to the same AP before " "initiating a roaming process.") count1 = check.count(self.c1_bssid.upper()) count2 = check.count(self.c2_bssid.upper()) checker, new_sta_list, checker2 = None, [], None if count1 > count2: print("Station connected mostly to ap1") logging.info("Station connected mostly to ap1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() else: checker = self.c1_bssid.upper() checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(check) if x == checker] print(index_count) logging.info(str(index_count)) for i in index_count: new_sta_list.append(sta_list[i]) print("new_sta_list", new_sta_list) logging.info("new_sta_list " + str(new_sta_list)) for sta_name in new_sta_list: eid = self.name_to_eid(sta_name) print("eid", eid) # sta = sta_name.split(".")[2] # TODO: use name-to-eid sta = eid[2] print(sta) logging.info(sta) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { "shelf": eid[0], "resource": eid[1], # TODO: do not hard-code "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": eid[0], "resource": eid[1], "port": str(sta), "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) logging.info(str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: LANforge sta on same radio will share scan results, so you only need to scan on one STA per # radio, and then sleep should be 5 seconds, then roam every station that needs to roam. # You do not need this sleep and scan for each STA. self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) if self.multicast == "True": print("multicast is true") self.mcast_tx() self.mcast_rx(sta_list=sta_list) self.mcast_start() else: self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, side_b_max_rate=0, sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, side_b_min_pdu=1250) timeout, variable, iterable_var = None, None, None if self.duration_based is True: timeout = time.time() + 60 * float(self.duration) # iteration_dur = 50000000 iterable_var = 50000000 variable = -1 if self.iteration_based: variable = self.iteration iterable_var = self.iteration # post_bssid = None while variable: # The iteration loop for roaming begins at this point. if self.multicast == "True": if variable == 1: print("ignore") else: print("wait for 5 mins for next roam process") time.sleep(120) print("Value of the Variable : ", variable) logging.info("Value of the Variable :" + str(variable)) iterations, number, ser_1, ser_2 = self.iteration, None, None, None if variable != -1: iterations = iterable_var - variable variable = variable - 1 if variable == -1: # need to write duration iteration logic # iterations = iterable_var - iteration_dur if self.duration is not None: if time.time() > timeout: break if self.soft_roam: # Get the serial number of attenuators from lf ser_no = self.attenuator_serial() print(ser_no[0]) logging.info(str(ser_no[0])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] if iterations % 2 == 0: print("even set c1 to lowest and c2 to highest attenuation ") logging.info("even set c1 to lowest and c2 to highest attenuation ") number = "even" print("number", number) logging.info("number " + str(number)) # set attenuation to zero in first attenuator and high in second attenuator self.attenuator_modify(ser_1, "all", 700) self.attenuator_modify(ser_2, "all", 0) else: print("odd, c1 is already at highest and c2 is at lowest") logging.info("odd, c1 is already at highest and c2 is at lowest") self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 700) # 700 == 300/400 bgscan 15:-70:300 number = "odd" print("number", number) logging.info("number " + str(number)) try: # Define row list per iteration row_list = [] sta_list = self.get_station_list() print("Station list : ", sta_list) logging.info("Station list :" + str(sta_list)) if sta_list == "no response": print("No response") logging.info("No response") pass else: station = self.wait_for_ip(sta_list) if self.debug: print("Start debug") logging.info("Start debug") self.start_debug_(mac_list=mac_list) if station: print("All stations got ip") logging.info("All stations got ip") # Get bssid's of all stations currently connected bssid_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list.append(bssid) print("BSSID of the current connected stations : ", bssid_list) logging.info(str(bssid_list)) pass_fail_list = [] pcap_file_list = [] roam_time1 = [] # packet_loss_lst = [] remark = [] log_file = [] # Check if all element of bssid list has same bssid's result = all(element == bssid_list[0] for element in bssid_list) if not result: # Attempt to connect the client to the same AP for each iteration print("Giving a try to connect") logging.info("Giving a try to connect") print("Move all clients to one AP") logging.info("Move all clients to one AP") count3 = bssid_list.count(self.c1_bssid.upper()) count4 = bssid_list.count(self.c2_bssid.upper()) print("Count3", count3) logging.info("Count3 " + str(count3)) print("Count4", count4) logging.info("Count4 " + str(count4)) checker, new_sta_list, checker2 = None, [], None if count3 > count4: print("Station connected mostly to AP-1") logging.info("Station connected mostly to AP-1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() else: checker = self.c1_bssid.upper() checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(bssid_list) if x == checker] print(index_count) logging.info(str(index_count)) for i in index_count: new_sta_list.append(sta_list[i]) print("new_sta_list", new_sta_list) logging.info("new_sta_list " + str(new_sta_list)) # for i, x in zip(bssid_list, sta_list): # if i == checker: # index_count = bssid_list.index(checker) # new_sta_list.append(sta_list[index_count]) # print("new_sta_list", new_sta_list) for sta_name in new_sta_list: # sta = sta_name.split(".")[2] eid = self.name_to_eid(sta_name) sta = eid[2] print(sta) logging.info(str(sta)) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { "shelf": eid[0], "resource": eid[1], "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": eid[0], "resource": eid[1], "port": str(sta), "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) logging.info(str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) # check bssid before before_bssid = [] for sta_name in sta_list: sta = sta_name.split(".")[2] before_bss = self.station_data_query(station_name=str(sta), query="ap") logging.info(str(before_bss)) before_bssid.append(before_bss) print("BSSID of the current connected stations : ", before_bssid) logging.info("BSSID of the current connected stations : " + str(before_bssid)) if before_bssid[0] == str(self.c1_bssid.upper()): post_bssid = self.c2_bssid.upper() else: post_bssid = self.c1_bssid.upper() print("After roaming, the stations will connect to %s the BSSID" % post_bssid) logging.info( "After roaming, the stations will connect to " + str(post_bssid) + "the BSSID") result1 = all(element == before_bssid[0] for element in before_bssid) if result1: print("All stations connected to same AP") logging.info("All stations connected to same AP") for i in before_bssid: local_row_list = [str(iterations + 1), i] logging.info(str(local_row_list)) row_list.append(local_row_list) print("Row list :", row_list) logging.info(str(row_list)) # if all bssid are equal then do check to which ap it is connected formated_bssid = before_bssid[0].lower() station_before = "" if formated_bssid == self.c1_bssid: print("Station connected to chamber1 AP") logging.info("Station connected to chamber1 AP") station_before = formated_bssid elif formated_bssid == self.c2_bssid: print("Station connected to chamber 2 AP") logging.info("Station connected to chamber 2 AP") station_before = formated_bssid print("Current connected stations BSSID", station_before) logging.info(str(station_before)) # After checking all conditions start roam and start snifffer print("Starting sniffer") logging.info("Starting sniffer") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str( self.option) + "_iteration_" + str( iterations) + "_", duration=3600) if self.soft_roam: ser_num = None ser_num2 = None if number == "even": ser_num = ser_1 ser_num2 = ser_2 print("even", ser_num) logging.info("even " + str(ser_num)) elif number == "odd": ser_num = ser_2 ser_num2 = ser_1 print("odd", ser_num) logging.info("odd " + str(ser_num)) # logic to decrease c2 attenuation till 10 db using 1dbm steps status = None print("checking attenuation") logging.info("checking attenuation") print("ser num", ser_num) logging.info("ser num " + str(ser_num)) for atten_val2 in range(700, -10, -10): print(atten_val2) self.attenuator_modify(int(ser_num), "all", atten_val2) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. print("wait for 4 secs") logging.info("wait for 4 secs") # query bssid's of all stations bssid_check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") # if bssid == "NA": # time.sleep(10) bssid_check.append(bssid) print(bssid_check) logging.info(str(bssid_check)) # check if all are equal resulta = all(element == bssid_check[0] for element in bssid_check) if resulta: station_after = bssid_check[0].lower() if station_after == "N/A" or station_after == "na": status = "station did not roamed" print("station did not roamed") logging.info("station did not roamed") continue if station_after == station_before: status = "station did not roamed" print("station did not roamed") logging.info("station did not roamed") continue elif station_after != station_before: print("client performed roam") logging.info("client performed roam") break if status == "station did not roamed": # set c1 to high for atten_val1 in (range(0, 700, 10)): print(atten_val1) logging.info(str(atten_val1)) self.attenuator_modify(int(ser_num2), "all", atten_val1) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. # TODO: Add attenuation step to logs to make it more obvious what script is doing. print("wait for 4 secs") logging.info("wait for 4 secs") bssid_check2 = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") # if bssid == "NA": # time.sleep(10) bssid_check2.append(bssid) print(bssid_check2) logging.info(str(bssid_check2)) # check if all are equal result = all(element == bssid_check2[0] for element in bssid_check2) if result: station_after = bssid_check2[0].lower() if station_after == "N/A" or station_after == "na": # status = "station did not roamed" print("station did not roamed") logging.info("station did not roamed") continue if station_after == station_before: # status = "station did not roamed" print("station did not roamed") logging.info("station did not roamed") continue else: print('station roamed') logging.info('station roamed') break else: if station_before == self.final_bssid[0]: print("Connected stations bssid is same to bssid list first element") logging.info( "Connected stations bssid is same to bssid list first element") for sta_name in sta_list: sta = sta_name.split(".")[2] logging.info(str(sta)) wpa_cmd = "" if self.option == "ota": wpa_cmd = "roam " + str(self.final_bssid[1]) if self.option == "otds": wpa_cmd = "ft_ds " + str(self.final_bssid[1]) # wpa_cmd = "roam " + str(self.final_bssid[1]) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": wpa_cmd } print("Roam Command : ", wifi_cli_cmd_data) logging.info("Roam Command : " + str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: See note in similar code above about only needing to scan once per radio self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) else: print("Connected stations bssid is same to bssid list second element") logging.info( "Connected stations bssid is same to bssid list second element") for sta_name in sta_list: sta = sta_name.split(".")[2] wifi_cmd = "" if self.option == "ota": wifi_cmd = "roam " + str(self.final_bssid[0]) if self.option == "otds": wifi_cmd = "ft_ds " + str(self.final_bssid[0]) logging.info(str(sta)) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": wifi_cmd } print("Roam Command : ", wifi_cli_cmd_data) logging.info("Roam Command : " + str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: See note in similar code above about only needing to scan once per radio self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) # Kernel logs kernel = self.journal_ctl_logs(file=str(iterations)) print("Name of the Kernel logs file :", kernel) for i in kernel: kernel_log.append(i) # Stop sniff & Attach data print("Stop sniffer") logging.info("Stop sniffer") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) print("pcap file name", file_name) logging.info("pcap file name " + str(file_name)) if self.debug: print("Stop debugger") logging.info("Stop debugger") self.stop_debug_(mac_list=mac_list) else: print("Debug is disabled") logging.info("Debug is disabled") self.wait_for_ip(sta_list) bssid_list_1 = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list_1.append(bssid) print("The stations are romed to another AP (%s)" % bssid_list_1) logging.info("The stations are romed to another AP " + str(bssid_list_1)) for i, x in zip(row_list, bssid_list_1): i.append(x) print("Row list, after roam :", row_list) logging.info("Row list, after roam :" + str(row_list)) trace = self.get_file_name(client=self.num_sta) print("Trace file :", trace) log_file.append(trace) print("Log file :", log_file) # Check if all are equal all(element == bssid_list_1[0] for element in bssid_list_1) res = "" station_before_ = before_bssid print("The BSSID of the station before roamed :", station_before_) logging.info("The BSSID of the station before roamed : " + str(station_before_)) # For each mac address query data from pcap for i, x in zip(mac_list, range(len(station_before_))): print("MAC address :", i) logging.info("MAC address :" + str(i)) print("BSSID :", bssid_list_1) logging.info(str(bssid_list_1)) query_action_frame_time, auth_time = None, None station_after = bssid_list_1[x] print("The connected BSSID for stations, after rome :", station_after) logging.info( "The connected BSSID for stations, after rome : " + str(station_after)) if station_after == station_before_[x] or station_after == "na": print("Station did not roamed") logging.info("Station did not roamed") res = "FAIL" elif station_after != station_before_[x]: print("Client has performed a roaming operation.") logging.info("Client has performed a roaming operation.") res = "PASS" if res == "FAIL": res = "FAIL" if self.multicast == "True": print("multicast function") if res == "PASS": print("roam success") print("check for multicast traffic resumed or not") endp_list = self.json_get( "endp?fields=name,eid,rx rate (last)", debug_=False) print("endpoint", endp_list) local_list, local_list1, final_list = [], [], [] if "endpoint" in endp_list: print(endp_list["endpoint"]) for i in range(1, len(endp_list["endpoint"])): local_list.append(endp_list['endpoint'][i]) print(local_list) new_lst = [] for i in range(len(local_list)): local_list1 = list(local_list[i].keys()) new_lst.append(local_list1[0]) print(local_list1) print(new_lst) for i in range(len(new_lst)): final_list.append( endp_list['endpoint'][i + 1][new_lst[i]][ 'rx rate (last)']) print(final_list) if 0 in final_list: print("try to start multicast few times") print("start multicast once again") self.mcast_start() time.sleep(60) self.mcast_start() print("check for multicast resumed or not ") endp_list = self.json_get( "endp?fields=name,eid,rx rate (last)", debug_=False) print("endpoint", endp_list) local_list, local_list1, final_list = [], [], [] if "endpoint" in endp_list: print(endp_list["endpoint"]) for i in range(1, len(endp_list["endpoint"])): local_list.append(endp_list['endpoint'][i]) print(local_list) new_lst = [] for i in range(len(local_list)): local_list1 = list(local_list[i].keys()) new_lst.append(local_list1[0]) print(local_list1) print(new_lst) for i in range(len(new_lst)): final_list.append( endp_list['endpoint'][i + 1][new_lst[i]][ 'rx rate (last)']) print(final_list) if 0 in final_list: print("multicast did not resumed after few trials") pass_fail_list.append("FAIL") remark.append( "bssid switched but multicast did not resumed after few trials") else: pass_fail_list.append("PASS") remark.append( "bssid switched and multicast resumed after few trials ") else: pass_fail_list.append("PASS") remark.append("multicast resumed after roam") else: print("roaming failed") pass_fail_list.append("FAIL") remark.append("bssid does not switched") else: if res == "PASS": if self.sta_type == "normal": query_reasso_response = self.get_wlan_mgt_status( file_name=file_name, pyshark_filter="wlan.da eq %s and wlan.fc.type_subtype eq 3" % ( str(i))) else: query_reasso_response = self.get_wlan_mgt_status( file_name=file_name, pyshark_filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print(query_reasso_response) logging.info(str(query_reasso_response)) if len(query_reasso_response) != 0 and query_reasso_response != "empty": if query_reasso_response == "Successful": print("Re-association status is successful") logging.info("Re-association status is successful") if self.sta_type == "normal": reasso_t = self.pcap_obj.read_time( pcap_file=str(file_name), filter="wlan.da eq %s and wlan.fc.type_subtype eq 3" % ( str(i))) else: reasso_t = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("Re-association time is", reasso_t) logging.info("Re-association time is " + str(reasso_t)) if self.option == "otds": print("Checking for Action Frame") logging.info("Checking for Action Frame") # Action frame check query_action_frame = self.pcap_obj.check_frame_present( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) print("Action Frame", query_action_frame) if len(query_action_frame) != 0 and query_action_frame != "empty": print("Action frame is present") logging.info("Action frame is present") query_action_frame_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) print("Action frame time is", query_action_frame_time) logging.info( "Action frame time is " + str(reasso_t)) else: roam_time1.append("No Action frame") pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("No Action Frame") print("Row list :", row_list) logging.info("Row list " + str(row_list)) else: print("Checking for Authentication Frame") logging.info("Checking for Authentication Frame") if self.sta_type == "normal": query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 0 && wlan.sa == %s)" % ( str(i))) else: query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) print("Authentication Frames response is", query_auth_response) if len(query_auth_response) != 0 and query_auth_response != "empty": if query_auth_response == "Successful": print("Authentication Request Frame is present") logging.info( "Authentication Request Frame is present") if self.sta_type == "normal": auth_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 0 && wlan.sa == %s)" % ( str(i))) else: auth_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) print("Authentication Request Frame time is", auth_time) logging.info( "Authentication Request Frame time is" + str( auth_time)) else: roam_time1.append('Auth Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append(" auth failure") else: roam_time1.append("No Auth frame") pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("No Auth frame") print("Row list :", row_list) logging.info("row list " + str(row_list)) # roam_time = None if self.option == "otds": roam_time = reasso_t - query_action_frame_time else: roam_time = reasso_t - auth_time print("Roam Time (ms)", roam_time) logging.info("Roam Time (ms)" + str(roam_time)) roam_time1.append(roam_time) if self.option == "ota": if roam_time < 100: pass_fail_list.append("PASS") pcap_file_list.append(str(file_name)) remark.append("Passed all criteria") else: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("Roam time is greater then 100 ms") else: pass_fail_list.append("PASS") pcap_file_list.append(str(file_name)) remark.append("Passed all criteria") else: roam_time1.append('Reassociation Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("Reassociation failure") print( "pcap_file name for fail instance of iteration value ") logging.info( "pcap_file name for fail instance of iteration value ") else: roam_time1.append("No Reassociation") pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("No Reasso response") print("Row list : ", row_list) logging.info("row list " + str(row_list)) else: query_reasso_response = self.get_wlan_mgt_status( file_name=file_name, pyshark_filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("Query_reasso_response:", query_reasso_response) logging.info(str(query_reasso_response)) if len(query_reasso_response) != 0 and query_reasso_response != 'empty': if query_reasso_response == "Successful": print("Re-Association status is successful") logging.info("Re-Association status is successful") reasso_t = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("Re-Association time is", reasso_t) logging.info("Re-Association time is " + str(reasso_t)) if self.option == "otds": print("Check for Action frame") logging.info("Check for Action Frame") # action frame check query_action_frame = self.pcap_obj.check_frame_present( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) if len(query_action_frame) != 0 and query_action_frame != "empty": print("Action Frame is present") logging.info("Action Frame is present") query_action_frame_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) print("Action Frame time is", query_action_frame_time) logging.info( "Action Frame) time is " + str(reasso_t)) else: roam_time1.append("No Action frame") pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid miNo Action Frame") print("Row list :", row_list) logging.info("Row list :" + str(row_list)) else: print("Check for Authentication Frame") logging.info("Check for Authentication Frame") query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) if len(query_auth_response) != 0 and query_auth_response != "empty": if query_auth_response == "Successful": print("Authentication Request is present") logging.info( "Authentication Request is present") auth_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) print("Authentication time is", auth_time) logging.info( "Authentication time is " + str(auth_time)) else: roam_time1.append('Auth Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid mismatch auth failure") else: roam_time1.append("No Auth frame") pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid mismatched No Auth frame") print("Row list :", row_list) logging.info("Row list :" + str(row_list)) # roam_time = None if self.option == "otds": roam_time = reasso_t - query_action_frame_time else: roam_time = reasso_t - auth_time print("Roam time (ms)", roam_time) logging.info("Roam time (ms) " + str(roam_time)) roam_time1.append(roam_time) if self.option == "ota": if roam_time < 50: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append( "(BSSID mismatched)Client disconnected after roaming") else: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append( "(BSSID mismatched)Roam time is greater then 100 ms,") else: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("BSSID mismatched") else: roam_time1.append('Reassociation Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("BSSID mismatched Reassociation failure") else: roam_time1.append("No Reassociation") pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("BSSID mismatched , No Reasso response") print("Row list :", row_list) logging.info("row list " + str(row_list)) if self.multicast == "True": print(row_list) print(pass_fail_list) print(remark) for i, x in zip(row_list, pass_fail_list): i.append(x) for i, x in zip(row_list, remark): i.append(x) print("Row list :", row_list) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: for i, x in zip(row_list, roam_time1): i.append(x) print("Row list :", row_list) logging.info("Row list : " + str(row_list)) # for i, x in zip(row_list, packet_loss_lst): # i.append(x) for i, x in zip(row_list, pass_fail_list): i.append(x) print("Row list :", row_list) logging.info("Row list : " + str(row_list)) for i, x in zip(row_list, pcap_file_list): i.append(x) print("Log file :", log_file) logging.info("Log file : " + str(log_file)) my_unnested_list = list(chain(*log_file)) print(my_unnested_list) logging.info(str(my_unnested_list)) for i, x in zip(row_list, my_unnested_list): i.append(x) print("Row list :", row_list) for i, x in zip(row_list, remark): i.append(x) print("Row list :", row_list) logging.info("row list " + str(row_list)) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "all stations are not connected to same ap for iteration " + str( iterations) print("All stations are not connected to same ap") logging.info("All stations are not connected to same ap") print("Starting Sniffer") logging.info("Starting Sniffer") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str( self.option) + "_iteration_" + str( iterations) + "_", duration=3600) print("Stop Sniffer") logging.info("Stop Sniffer") self.stop_sniffer() kernel = self.journal_ctl_logs(file=str(iterations)) for i in kernel: kernel_log.append(i) bssid_list2 = [] for sta_name in sta_list: # local_row_list = [0, "68"] local_row_list = [str(iterations)] sta = sta_name.split(".")[2] before_bssid_ = self.station_data_query(station_name=str(sta), query="ap") print(before_bssid_) logging.info(str(before_bssid_)) bssid_list2.append(before_bssid_) local_row_list.append(before_bssid_) print(local_row_list) logging.info(str(local_row_list)) row_list.append(local_row_list) print(row_list) logging.info(str(row_list)) for i, x in zip(row_list, bssid_list2): i.append(x) print("Row list :", row_list) logging.info("Row list : " + str(row_list)) if self.multicast == "True": for a in row_list: a.append("FAIL") print("Row list :", row_list) else: for i in row_list: i.append("No Roam Time") print("Row list :", row_list) logging.info("Row list : " + str(row_list)) for a in row_list: a.append("FAIL") print("Row list :", row_list) logging.info("Row list : " + str(row_list)) # pcap for i in row_list: i.append("N/A") print("Row list:", row_list) logging.info("Row list : " + str(row_list)) if self.debug: print("Stop Debugger") logging.info("Stop Debugger") self.stop_debug_(mac_list=mac_list) else: print("Debug is disabled") logging.info("Debug is disabled") trace = self.get_file_name(client=self.num_sta) log_file.append(trace) print("Log file :", log_file) logging.info("Log file : " + str(log_file)) my_unnested_list = list(chain(*log_file)) print(my_unnested_list) logging.info(str(my_unnested_list)) for i, x in zip(row_list, my_unnested_list): i.append(x) print("Row list:", row_list) logging.info("Row list : " + str(row_list)) for i in row_list: i.append("No roam performed all stations are not connected to same ap") print("Row list:", row_list) logging.info("Row list : " + str(row_list)) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "station's failed to get ip after the test start" print("Station's failed to get ip after test starts") logging.info("Station's failed to get ip after test starts") if self.duration_based is True: if time.time() > timeout: break except Exception as e: # print(e) logging.warning(str(e)) pass else: message = "station's failed to get ip at the beginning" print("##### Station's failed to get associate at the beginning") logging.info("Station's failed to get associate at the beginning") else: print("Stations failed to get ip") logging.info("Stations failed to get ip") test_end = datetime.now() test_end = test_end.strftime("%b %d %H:%M:%S") print("Test Ended At ", test_end) logging.info("Test Ended At " + str(test_end)) self.end_time = test_end s1 = test_time s2 = test_end # for example fmt = '%b %d %H:%M:%S' self.test_duration = datetime.strptime(s2, fmt) - datetime.strptime(s1, fmt) return kernel_log, message # except Exception as e: # logging.warning(str(e)) # Graph generation function def generate_client_pass_fail_graph(self, csv_list=None): try: print("CSV list", csv_list) logging.info("CSV list " + str(csv_list)) x_axis_category = [] for i in range(self.num_sta): x_axis_category.append(i + 1) print(x_axis_category) logging.info(str(x_axis_category)) pass_list = [] fail_list = [] dataset = [] for i in csv_list: print("i", i) logging.info("i, " + i) lf_csv_obj = lf_csv() h = lf_csv_obj.read_csv(file_name=i, column="PASS/FAIL") count = h.count("PASS") print(count) logging.info(str(count)) count_ = h.count("FAIL") print(count_) logging.info(str(count_)) pass_list.append(count) fail_list.append(count_) dataset.append(pass_list) dataset.append(fail_list) print(dataset) logging.info(str(dataset)) # It will contain per station pass and fail number eg [[9, 7], [3, 4]] here 9, 7 are pass number for clients 3 and 4 are fail number # dataset = [[9, 7 , 4], [1, 3, 4]] graph = lf_graph.lf_bar_graph(_data_set=dataset, _xaxis_name="Total Number Of Stations = " + str(self.num_sta), _yaxis_name="Total Number of iterations = " + str(self.iteration), _xaxis_categories=x_axis_category, _label=["PASS", "FAIL"], _xticks_font=8, _graph_image_name="11r roam client per iteration graph", _color=['forestgreen', 'red', 'blueviolet'], _color_edge='black', _figsize=(13, 5), _xaxis_step=1, _graph_title="Client Performance Over %s Iterations" % (str(self.iteration)), _show_bar_value=True, _text_font=12, _text_rotation=45, _enable_csv=True, _legend_loc="upper right", _legend_fontsize=12, _legend_box=(1.12, 1.01), _remove_border=['top', 'right', 'left'], _alignment={"left": 0.011}, ) graph_png = graph.build_bar_graph() print("graph name {}".format(graph_png)) logging.info(str("graph name {}".format(graph_png))) return graph_png except Exception as e: logging.info(str(e)) print(str(e)) # Report generation function def generate_report(self, csv_list, kernel_lst, current_path=None): try: option, band_, station_, iteration__ = None, None, None, None if self.option == 'ota': option = "OTA" else: option = "OTD" if self.band == "fiveg": band_ = "5G" elif self.band == "twog": band_ = "2G" elif self.band == "sixg": band_ = "6G" if int(self.num_sta) > 1: station_ = "Multi" else: station_ = "Single" if int(self.iteration) > 1: iteration__ = "Multi" else: iteration__ = "Single" if self.soft_roam: dir_name = "Soft_Roam_Test_" + str(band_) + "_" + str(option) + "_" + str(station_) + "Client_" + str( iteration__) + "_Iteration" out_html = "soft_roam.html" pdf_name = "soft_roam_test.pdf" else: dir_name = "Hard_Roam_Test_" + str(band_) + "_" + str(option) + "_" + str(station_) + "Client_" + str( iteration__) + "_Iteration" out_html = "hard_roam.html" pdf_name = "Hard_roam_test.pdf" report = lf_report_pdf.lf_report(_path="", _results_dir_name=dir_name, _output_html=out_html, _output_pdf=pdf_name) if current_path is not None: report.current_path = os.path.dirname(os.path.abspath(current_path)) report_path = report.get_report_path() report.build_x_directory(directory_name="csv_data") report.build_x_directory(directory_name="kernel_log") for i in kernel_lst: report.move_data(directory="kernel_log", _file_name=str(i)) date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0] test_setup_info = { "DUT Name": self.dut_name, "SSID": self.ssid_name, "Test Duration": self.test_duration, } if self.soft_roam: report.set_title("SOFT ROAM (11r) TEST") else: if self.sta_type == "normal": report.set_title("HARD ROAM TEST") else: report.set_title("HARD ROAM (11r) TEST") report.set_date(date) report.build_banner_cover() report.set_table_title("Test Setup Information") report.build_table_title() report.test_setup_table(value="Device under test", test_setup_data=test_setup_info) report.set_obj_html("Objective", "The Roaming test is a type of performance test that is performed on wireless Access Points (APs)" " to evaluate their ability to support 802.11r (Fast BSS Transition) standard for fast and seamless" " roaming of wireless clients between APs within the same network. This standard helps minimize the" " handoff time when a client moves from one AP to another, resulting in a more stable and consistent wireless experience.
" "
" "Hard Roaming:
" "This happens when a wireless device completely disconnects from the current Access Point before " "connecting to a new one. However, with the 802.11r standard, the authentication and key negotiation" " process can be expedited, reducing the time it takes to connect to the new Access Point. This results" " in a faster and more seamless handoff between Access Points.
" "
" "Soft Roaming:
" "This happens when a wireless device maintains a connection with both the current and new Access Points" " during the transition. With 802.11r, the device can maintain its security context during the handoff," " allowing for a faster and more secure transition. Soft roaming with 11r is designed to be seamless," " allowing the device to move from one Access Point to another without any interruption in connectivity.") report.build_objective() report.set_obj_html("Client per iteration Graph", "The below graph provides information about out of total iterations how many times each client got Pass or Fail") report.build_objective() graph = self.generate_client_pass_fail_graph(csv_list=csv_list) report.set_graph_image(graph) report.set_csv_filename(graph) report.move_csv_file() report.move_graph_image() report.build_graph_without_border() if self.multicast == "True": report.set_obj_html("Pass/Fail Criteria:", "The following are the criteria for PASS the test:

" "1. The BSSID of the station should change after roaming from one AP to another.
" "2. multicast traffic should resume after the client roams.
" "
" "The following are the criteria for FAIL the test:

" "1. The BSSID of the station remains unchanged after roaming from one AP to another.
" "2. No roaming occurs, as all stations are connected to the same AP.
") else: report.set_obj_html("Pass/Fail Criteria:", "The following are the criteria for PASS the test:

" "1. The BSSID of the station should change after roaming from one AP to another.
" "2. The station should not experience any disconnections during/after the roaming process.
" "3. The duration of the roaming process should be less than 100 ms.
" "
" "The following are the criteria for FAIL the test:

" "1. The BSSID of the station remains unchanged after roaming from one AP to another.
" "2. No roaming occurs, as all stations are connected to the same AP.
" "3. The captured packet does not contain a Reassociation Response Frame.
" "4. The station experiences disconnection during/after the roaming process.
" "5. The duration of the roaming process exceeds 100 ms.
") report.build_objective() for i in csv_list: report.move_data(directory="csv_data", _file_name=str(i)) report.move_data(directory_name="pcap") for i, x in zip(range(self.num_sta), csv_list): # report.set_table_title("Client information " + str(i)) # report.build_table_title() if self.multicast == "True": report.set_obj_html("Client " + str(i + 1) + " Information", "The table below presents comprehensive information regarding Client " + str( i + 1) + ", including its BSSID before and after roaming, PASS/FAIL criteria and Remark") else: report.set_obj_html("Client " + str(i + 1) + " Information", "The table below presents comprehensive information regarding Client " + str( i + 1) + ", including its BSSID before and after roaming, the time of roaming, the name of " "the capture file, and any relevant remarks.") report.build_objective() lf_csv_obj = lf_csv() if self.multicast == "True": y = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Iterations") z = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="bssid1") u = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="bssid2") h = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="PASS/FAIL") r = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Remark") else: y = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Iterations") z = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="bssid1") u = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="bssid2") t = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Roam Time(ms)") # l = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Packet loss") h = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="PASS/FAIL") p = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Pcap file Name") lf = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Log File") r = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Remark") if self.multicast == "True": table = { "iterations": y, "Bssid before": z, "Bssid After": u, "PASS/FAIL": h, "Remark": r } else: table = { "iterations": y, "Bssid before": z, "Bssid After": u, "Roam Time(ms)": t, "PASS/FAIL": h, "pcap file name": p, "Log File": lf, "Remark": r } if self.multicast != "True": if not self.log_file: del table["Log File"] print("Tabel Data :", table) test_setup = pd.DataFrame(table) report.set_table_dataframe(test_setup) report.build_table() if self.option == 'ota': testname = 'over the air' else: testname = 'over the ds' test_input_infor = { "LANforge ip": self.lanforge_ip, "LANforge port": self.lanforge_port, "test start time": self.start_time, "test end time": self.end_time, "Bands": self.band, "Upstream": self.upstream, "Stations": self.num_sta, "iterations": self.iteration, "SSID": self.ssid_name, "Security": self.security, "Client mac": self.mac_data, 'Test': testname, "Contact": "support@candelatech.com" } report.set_table_title("Test basic Information") report.build_table_title() report.test_setup_table(value="Information", test_setup_data=test_input_infor) report.build_footer() report.write_html() report.write_pdf_with_timestamp(_page_size='A4', _orientation='Portrait') return report_path except Exception as e: print(str(e)) logging.info(str(e)) def main(): help_summary = '''\ The script is designed to support both hard and soft roaming, ensuring a smooth transition for devices between access points (APs). Additionally, the script captures packets in two scenarios: when a device is connected to an AP and when it roams from one AP to another. These captured packets help analyze the performance and stability of the roaming process. In essence, the script serves as a thorough test for assessing how well APs handle roaming and the overall network stability when clients move between different access points. The roaming test will create stations with advanced/802.1x and 11r key management, create CX traffic between upstream port and stations, run traffic and generate a report. ''' parser = argparse.ArgumentParser( prog='lf_roam_test.py', # formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawTextHelpFormatter, epilog='''\ lf_roam_test.py ''', description='''\ lf_roam_test.py : -------------------- Summary : ---------- The primary focus of this script is to enable seamless roaming of clients/stations between two access points (APs). The test can be conducted with a single or multiple stations, with single or multiple iterations. The script will create stations/clients with advanced/802.1x and 11r key management. By default, it will create a single station/client. Once the stations are created, the script will generate CX traffic between the upstream port and the stations and run the traffic before roam. Packet captures will be taken for each station/client in two scenarios: (i) While the station/client is connected to an AP (ii) While the station/client roams from one AP to another AP These packet captures will be used to analyze the performance and stability of the roaming process. Overall, this script is designed to provide a comprehensive test of the roaming functionality of the APs and the stability of the network when clients move between APs. The following are the criteria for PASS the test: 1. The BSSID of the station should change after roaming from one AP to another 2 The station should not experience any disconnections during/after the roaming process. 3. The duration of the roaming process should be less than 100 ms. The following are the criteria for FAIL the test: 1. The BSSID of the station remains unchanged after roaming from one AP to another. 2. No roaming occurs, as all stations are connected to the same AP. 3. The captured packet does not contain a Reassociation Response Frame. 4. The station experiences disconnection during/after the roaming process. 5. The duration of the roaming process exceeds 100 ms. ############################################ # Examples Commands for different scenarios ############################################ Hard Roam EXAMPLE: For a single station and a single iteration python3 lf__roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For a single station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For multiple station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For multiple station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based EXAMPLE: For multiple station and multiple iteration with multicast traffic enable python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "10:f9:20:fd:f3:4b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 2 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "36" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --sta_type normal --multicast True Soft Roam EXAMPLE: For a single station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True EXAMPLE: For a single station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 1 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True EXAMPLE: For multiple station and a single iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 1 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True EXAMPLE: For multiple station and multiple iteration python3 lf_roam_test.py --mgr 192.168.100.221 --ap1_bssid "68:7d:b4:5f:5c:3b" --ap2_bssid "14:16:9d:53:58:cb" --fiveg_radios "1.1.wiphy1" --band "fiveg" --sniff_radio "wiphy2" --num_sta 10 --ssid_name "RoamAP5g" --security "wpa2" --security_key "something" --duration None --upstream "eth2" --iteration 10 --channel "40" --option "ota" --dut_name ["AP1","AP2"] --traffic_type "lf_udp" --log_file False --debug False --iteration_based --soft_roam True NOTES: * For enterprise authentication --eap_method Add this argument to specify the EAP method example: TLS, TTLS, PEAP --pairwise_cipher [BLANK] Add this argument to specify the type of pairwise cipher DEFAULT CCMP TKIP NONE CCMP-TKIP CCMP-256 GCMP GCMP-256 CCMP/GCMP-256 --groupwise_cipher [BLANK] Add this argument to specify the type of groupwise cipher DEFAULT CCMP TKIP WEP104 WEP40 GTK_NOT_USED GCMP-256 CCMP-256 GCMP/CCMP-256 ALL --eap_identity Add this argument to specify the username of radius server --eap_password Add this argument to specify the password of radius server --pk_passwd Add this argument to specify the private key password Required only for TLS --ca_cert Add this argument to specify the certificate path Required only for TLS example: /home/lanforge/ca.pem --private_key Add this argument to specify the private key path Required only for TLS example: /home/lanforge/client.p12 =============================================================================== ''') required = parser.add_argument_group('Required arguments') required.add_argument('--mgr', help='lanforge ip', default="192.168.100.221") required.add_argument('--lanforge_port', help='lanforge port', type=int, default=8080) required.add_argument('--lanforge_ssh_port', help='lanforge ssh port', type=int, default=22) required.add_argument('--ap1_bssid', type=str, help='AP1 bssid', default="68:7d:b4:5f:5c:3b") required.add_argument('--ap2_bssid', type=str, help='AP2 bssid', default="14:16:9d:53:58:cb") required.add_argument('--twog_radios', help='Twog radio', default=None) required.add_argument('--fiveg_radios', help='Fiveg radio', default="1.1.wiphy1") required.add_argument('--sixg_radios', help='Sixg radio', default=None) required.add_argument('--band', help='eg. --band "twog" or sixg', default="fiveg") required.add_argument('--sniff_radio', help='eg. --sniff_radio "wiphy2', default="wiphy2") required.add_argument('--num_sta', help='eg. --num_sta 1', type=int, default=1) required.add_argument('--ssid_name', help='eg. --ssid_name "ssid_5g"', default="RoamAP5g") required.add_argument('--security', help='eg. --security "wpa2"', default="wpa2") required.add_argument('--security_key', help='eg. --security_key "something"', default="something") required.add_argument('--upstream', help='eg. --upstream "eth2"', default="eth2") required.add_argument('--duration', help='duration', default=None) required.add_argument('--iteration', help='Number of iterations', type=int, default=1) required.add_argument('--channel', help='Channel', type=str, default="40") required.add_argument('--option', help='eg. --option "ota', default="ota") required.add_argument('--iteration_based', help='Iteration based', default=False, action='store_true') required.add_argument('--duration_based', help='Duration based', default=False, action='store_true') required.add_argument('--dut_name', help='', default=["AP1", "AP2"]) # ["AP687D.B45C.1D1C", "AP2C57.4152.385C"] required.add_argument('--traffic_type', help='To chose the traffic type', default="lf_udp") # eap authentication required.add_argument('--eap_method', help='Enter EAP method e.g: TLS', default=None) required.add_argument('--eap_identity', help='Radius server identity', default='[BLANK]') required.add_argument('--eap_password', help='Radius Server password', default='[BLANK]') required.add_argument('--pairwise_cipher', help='Pairwise Ciphers\n' 'DEFAULT\n' 'CCMP\n' 'TKIP\n' 'NONE\n' 'CCMP-TKIP\n' 'CCMP-256\n' 'GCMP\n' 'GCMP-256\n' 'CCMP/GCMP-256', default='[BLANK]') required.add_argument('--groupwise_cipher', type=str, help='Groupwise Ciphers\n' 'DEFAULT\n' 'CCMP\n' 'TKIP\n' 'WEP104\n' 'WEP40\n' 'GTK_NOT_USED\n' 'GCMP-256\n' 'CCMP-256\n' 'GCMP/CCMP-256\n' 'ALL', default='[BLANK]') required.add_argument('--private_key', help='Enter private key path e.g: /home/lanforge/client.p12', default='[BLANK]') required.add_argument('--pk_passwd', help='Enter the private key password', default='[BLANK]') required.add_argument('--ca_cert', help='Enter path for certificate e.g: /home/lanforge/ca.pem', default='[BLANK]') required.add_argument("--eap_phase1", help="EAP Phase 1 (outer authentication, i.e. TLS tunnel) parameters.\n" "For example, \"peapver=0\" or \"peapver=1 peaplabel=1\".\n" "Some WPA Enterprise setups may require \"auth=MSCHAPV2\"", default="[BLANK]") required.add_argument("--eap_phase2", help="EAP Phase 2 (inner authentication) parameters.\n" "For example, \"autheap=MSCHAPV2 autheap=MD5\" for EAP-TTLS.", default="[BLANK]") required.add_argument('--log_file', help='To get the log file, need to pass the True', default=False) required.add_argument('--debug', help='To enable/disable debugger, need to pass the True/False', default=False) required.add_argument('--soft_roam', help='To enable soft rome eg. --soft_rome True', default=False) required.add_argument('--sta_type', type=str, help="provide the type of client you want to create " "i.e 11r, 11r-sae, 11r-eap, 11r-eap-sha384, normal", default="11r") required.add_argument('--ieee80211w', help='--ieee80211w