#!/usr/bin/env python3 """ Note : lf_hard_roam_test works on forced roaming specific to 11r - This script will give results of 11r forced roam test pdf along with all the packet captures genersted after the roam - still under progress do not overwrite """ import sys import os import importlib import logging import time import datetime from datetime import datetime import pandas as pd import paramiko from itertools import chain logger = logging.getLogger(__name__) if sys.version_info[0] != 3: logger.critical("This script requires Python 3") exit(1) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") LFCliBase = lfcli_base.LFCliBase LFUtils = importlib.import_module("py-json.LANforge.LFUtils") realm = importlib.import_module("py-json.realm") Realm = realm.Realm lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") cv_test_reports = importlib.import_module("py-json.cv_test_reports") lf_report = cv_test_reports.lanforge_reports lf_report_pdf = importlib.import_module("py-scripts.lf_report") lf_csv = importlib.import_module("py-scripts.lf_csv") lf_pcap = importlib.import_module("py-scripts.lf_pcap") lf_graph = importlib.import_module("py-scripts.lf_graph") sniff_radio = importlib.import_module("py-scripts.lf_sniff_radio") sta_connect = importlib.import_module("py-scripts.sta_connect2") lf_clean = importlib.import_module("py-scripts.lf_cleanup") series = importlib.import_module("cc_module_9800_3504") attenuator = importlib.import_module("py-scripts.attenuator_serial") modify = importlib.import_module("py-scripts.lf_atten_mod_test") class HardRoam(Realm): def __init__(self, lanforge_ip=None, lanforge_port=None, lanforge_ssh_port=None, c1_bssid=None, c2_bssid=None, fiveg_radio=None, twog_radio=None, sixg_radio=None, band=None, sniff_radio=None, num_sta=None, security=None, security_key=None, ssid=None, upstream=None, duration=None, iteration=None, channel=None, option=None, duration_based=None, iteration_based=None, dut_name=[], traffic_type="lf_udp", roaming_delay=None, path="../", scheme="ssh", dest="localhost", user="admin", passwd="Cisco123", prompt="WLC2", series_cc="9800", ap="AP687D.B45C.1D1C", port="8888", band_cc="5g", timeout="10", identity=None, ttls_pass=None, debug=False, soft_roam=False ): super().__init__(lanforge_ip, lanforge_port) self.lanforge_ip = lanforge_ip self.lanforge_port = lanforge_port self.lanforge_ssh_port = lanforge_ssh_port self.c1_bssid = c1_bssid self.c2_bssid = c2_bssid self.fiveg_radios = fiveg_radio self.twog_radios = twog_radio self.sixg_radios = sixg_radio self.band = band self.sniff_radio = sniff_radio self.num_sta = num_sta self.ssid_name = ssid self.security = security self.security_key = security_key self.upstream = upstream self.duration = duration self.iteration = iteration self.channel = channel self.option = option self.iteration_based = iteration_based self.duration_based = duration_based self.local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) self.staConnect = sta_connect.StaConnect2(self.lanforge_ip, self.lanforge_port) self.final_bssid = [] self.pcap_obj_2 = None self.pcap_name = None self.test_duration = None self.client_list = [] self.dut_name = dut_name self.pcap_obj = lf_pcap.LfPcap() self.lf_csv_obj = lf_csv.lf_csv() self.traffic_type = traffic_type self.roam_delay = roaming_delay self.cx_profile = self.local_realm.new_l3_cx_profile() self.cc = None self.cc = series.create_controller_series_object( scheme=scheme, dest=dest, user=user, passwd=passwd, prompt=prompt, series=series_cc, ap=ap, port=port, band=band_cc, timeout=timeout) self.cc.pwd = path # self.cc.pwd = "../lanforge/lanforge-scripts" self.start_time = None self.end_time = None self.identity = identity self.ttls_pass = ttls_pass self.debug = debug self.mac_data = None self.soft_roam = soft_roam # start debugger of controller def start_debug_(self, mac_list): mac = mac_list for i in mac: y = self.cc.debug_wireless_mac_cc(mac=str(i)) print(y) # stop debugger of controller def stop_debug_(self, mac_list): mac = mac_list for i in mac: y = self.cc.no_debug_wireless_mac_cc(mac=str(i)) print(y) # get trace file names from controller def get_ra_trace_file(self): ra = self.cc.get_ra_trace_files__cc() print(ra) ele_list = [y for y in (x.strip() for x in ra.splitlines()) if y] print(ele_list) return ele_list # get trace file names from controller with respect to number of clients def get_file_name(self, client): file_name = [] if not self.debug: for i in range(client): file_name.append("debug disabled") else: file = self.get_ra_trace_file() indices = [i for i, s in enumerate(file) if 'dir bootflash: | i ra_trace' in s] # print(indices) y = indices[-1] if client == 1: z = file[y + 1] list_ = [] list_.append(z) m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) if m[-1].isnumeric(): print("log file not available") file_name.append("file not found") file_name.append(m[-1]) else: z = file[y + (int(0) + 1)] list_ = [] list_.append(z) m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) if m[-1].isnumeric(): print("log file not available") for i in range(client): file_name.append("file not found") else: for i in range(client): z = file[y + (int(i) + 1)] list_ = [] list_.append(z) m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) if m[-1].isnumeric(): print("log file not available") file_name.append("file not found") file_name.append(m[-1]) print("file_name", file_name) file_name.reverse() return file_name # delete trace file from controller def delete_trace_file(self, file): # file = self.get_file_name() self.cc.del_ra_trace_file_cc(file=file) # get station list from lf def get_station_list(self): sta = self.staConnect.station_list() if sta == "no response": return "no response" sta_list = [] for i in sta: for j in i: sta_list.append(j) return sta_list # create n number of clients of advanced configuration on lf def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None, dut_security=None, dut_passwd=None, radio=None, type=None): local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) station_profile = local_realm.new_station_profile() if self.band == "fiveg": radio = self.fiveg_radios if self.band == "twog": radio = self.twog_radios if self.band == "sixg": radio = self.sixg_radios # pre clean sta_list = self.get_station_list() print(sta_list) if not sta_list: print("no stations on lanforge") else: station_profile.cleanup(sta_list, delay=1) LFUtils.wait_until_ports_disappear(base_url=local_realm.lfclient_url, port_list=sta_list, debug=True) # time.sleep(2) print("pre cleanup done") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, end_id_=num_sta - 1, padding_number_=10000, radio=radio) if type == "11r-sae-802.1x": dut_passwd = "[BLANK]" station_profile.use_security(dut_security, dut_ssid, dut_passwd) station_profile.set_number_template("00") station_profile.set_command_flag("add_sta", "create_admin_down", 1) station_profile.set_command_param("set_port", "report_timer", 1500) # connect station to particular bssid # self.station_profile.set_command_param("add_sta", "ap", self.bssid[0]) station_profile.set_command_flag("set_port", "rpt_timer", 1) if type == "11r": station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) station_profile.set_command_flag("add_sta", "disable_roam", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-PSK ", pairwise="", group="", psk="", eap="", identity="", passwd="", pin="" ) if type == "11r-sae": station_profile.set_command_flag("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) station_profile.set_command_flag("add_sta", "disable_roam", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-SAE ", pairwise="", group="", psk="", eap="", identity="", passwd="", pin="" ) if type == "11r-sae-802.1x": station_profile.set_command_flag("set_port", "rpt_timer", 1) station_profile.set_command_flag("add_sta", "ieee80211w", 2) station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) station_profile.set_command_flag("add_sta", "disable_roam", 1) station_profile.set_command_flag("add_sta", "power_save_enable", 1) # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") station_profile.set_wifi_extra(key_mgmt="FT-EAP ", pairwise="[BLANK]", group="[BLANK]", psk="[BLANK]", eap="TTLS", identity=self.identity, passwd=self.ttls_pass, pin="" ) print("Creating stations.") station_profile.create(radio=radio, sta_names_=station_list) print("Waiting for ports to appear") local_realm.wait_until_ports_appear(sta_list=station_list) for sta_name in station_list: sta = sta_name.split(".")[2] # TODO: Use name_to_eid # wpa_cmd = "roam " + str(checker2) bgscan = { "shelf": 1, "resource": 1, # TODO: Do not hard-code resource, get it from radio eid I think. "port": str(sta), "type": 'NA', "text": 'bgscan="simple:30:-65:300"' } print(bgscan) self.local_realm.json_post("/cli-json/set_wifi_custom", bgscan) # time.sleep(2) station_profile.admin_up() print("Waiting for ports to admin up") if local_realm.wait_for_ip(station_list): print("All stations got IPs") return True else: print("Stations failed to get IPs") return False # create layer3 traffic on clients def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu, side_b_min_pdu, traffic_type, sta_list): # checked print(sta_list) print(type(sta_list)) print(self.upstream) # cx_profile = self.local_realm.new_l3_cx_profile() self.cx_profile.host = self.lanforge_ip self.cx_profile.port = self.lanforge_port # layer3_cols = ['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate'] self.cx_profile.side_a_min_bps = side_a_min_rate self.cx_profile.side_a_max_bps = side_a_max_rate self.cx_profile.side_b_min_bps = side_b_min_rate self.cx_profile.side_b_max_bps = side_b_max_rate self.cx_profile.side_a_min_pdu = side_a_min_pdu, self.cx_profile.side_b_min_pdu = side_b_min_pdu, # create print("Creating endpoints") self.cx_profile.create(endp_type=traffic_type, side_a=sta_list, side_b=self.upstream, sleep_time=0) self.cx_profile.start_cx() # get layer3 values def get_layer3_values(self, cx_name=None, query=None): url = f"/cx/{cx_name}" response = self.json_get(_req_url=url) result = response[str(cx_name)][str(query)] return result # get cross connect names def get_cx_list(self): layer3_result = self.local_realm.cx_list() layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item] print(layer3_names) return layer3_names # get endpoint values def get_endp_values(self, endp="A", cx_name="niki", query="tx bytes"): # self.get_cx_list() # self.json_get("http://192.168.100.131:8080/endp/Unsetwlan000-0-B?fields=rx%20rate") url = f"/endp/{cx_name}-{endp}?fields={query}" response = self.json_get(_req_url=url) print(response) if (response is None) or ("endpoint" not in response): print("incomplete response:") exit(1) final = response["endpoint"][query] print(final) return final # precleanup on lanforge def precleanup(self): obj = lf_clean.lf_clean(host=self.lanforge_ip, port=self.lanforge_port, clean_cxs=True, clean_endp=True) obj.resource = "all" obj.cxs_clean() obj.endp_clean() # get client data from lf def station_data_query(self, station_name="wlan0", query="channel"): url = f"/port/{1}/{1}/{station_name}?fields={query}" # print("url//////", url) response = self.local_realm.json_get(_req_url=url) print("response: ", response) if (response is None) or ("interface" not in response): print("station_list: incomplete response:") # pprint(response) exit(1) y = response["interface"][query] return y # start packet capture on lf # TODO: Check if other monitor ports exist on this radio already. If so, delete those # before adding new monitor port (or just use the existing monitor port without creating # a new one. --Ben def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): self.pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" self.pcap_obj_2 = sniff_radio.SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port, radio=radio, channel=radio_channel, monitor_name="monitor") self.pcap_obj_2.setup(0, 0, 0) time.sleep(5) self.pcap_obj_2.monitor.admin_up() time.sleep(5) self.pcap_obj_2.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) # stop packet capture and get file name def stop_sniffer(self): print("in stop_sniffer") directory = None directory_name = "pcap" if directory_name: directory = os.path.join("", str(directory_name)) try: if not os.path.exists(directory): os.mkdir(directory) except Exception as x: print(x) self.pcap_obj_2.monitor.admin_down() time.sleep(2) self.pcap_obj_2.cleanup() lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", password="lanforge", report_location="/home/lanforge/" + self.pcap_name, report_dir="pcap") time.sleep(10) return self.pcap_name # generate csv files at the begining def generate_csv(self): file_name = [] for i in range(self.num_sta): file = 'test_client_' + str(i) + '.csv' lf_csv_obj = lf_csv.lf_csv(_columns=['Iterations', 'bssid1', 'bssid2', "Roam Time(ms)", "PASS/FAIL", "Pcap file Name", "Log File", "Remark"], _rows=[], _filename=file) # "Packet loss", file_name.append(file) lf_csv_obj.generate_csv() return file_name # get journal ctl logs/ kernel logs def journal_ctl_logs(self, file): jor_lst = [] name = "kernel_log" + file + ".txt" jor_lst.append(name) try: ssh = paramiko.SSHClient() command = "journalctl --since '5 minutes ago' > kernel_log" + file + ".txt" ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", password="lanforge", banner_timeout=600) stdin, stdout, stderr = ssh.exec_command(str(command)) stdout.readlines() ssh.close() kernel_log = "/home/lanforge/kernel_log" + file + ".txt" lf_report.pull_reports(hostname=self.lanforge_ip, port=self.lanforge_ssh_port, username="lanforge", password="lanforge", report_location=kernel_log, report_dir=".") except Exception as e: print(e) return jor_lst # gives wlan management status of pcap file def get_wlan_mgt_status(self, file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55)"): query_reasso_response = self.pcap_obj.get_wlan_mgt_status_code(pcap_file=str(file_name), filter=filter) print("query", query_reasso_response) return query_reasso_response # get attenuator serial number def attenuator_serial(self): obj = attenuator.AttenuatorSerial( lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port ) val = obj.show() return val def attenuator_modify(self, serno, idx, val): atten_obj = modify.CreateAttenuator(self.lanforge_ip, self.lanforge_port, serno, idx, val) atten_obj.build() # main roam fun def run(self, file_n=None): kernel_log = [] # start timer test_time = datetime.now() test_time = test_time.strftime("%b %d %H:%M:%S") print("Test started at ", test_time) self.start_time = test_time # get two bssids for roam self.final_bssid.extend([self.c1_bssid, self.c2_bssid]) print("final bssid", self.final_bssid) print("Calling precleanup.") self.precleanup() message = None, None # if soft roam is selected set attenuator to zero initiallly if self.soft_roam: print("make both attenuators set to zero attenuation at the begining") ser_no = self.attenuator_serial() print(ser_no[0], ser_no[1]) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] # set attenuation to zero in first attenuator and high in second attenuator self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 0) # create clients with respect to bands self.start_sniffer(radio_channel=int(self.channel), radio=self.sniff_radio, test_name="roam_11r_" + str(self.option) + "start" + "_", duration=3600) if self.band == "twog": self.local_realm.reset_port(self.twog_radios) self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name, dut_security=self.security, dut_passwd=self.security_key, radio=self.twog_radios, type="11r") if self.band == "fiveg": self.local_realm.reset_port(self.fiveg_radios) self.create_n_clients(sta_prefix="wlan", num_sta=self.num_sta, dut_ssid=self.ssid_name, dut_security=self.security, dut_passwd=self.security_key, radio=self.fiveg_radios, type="11r") if self.band == "sixg": self.local_realm.reset_port(self.sixg_radios) self.create_n_clients(sta_prefix="wlan", num_sta=self.num_sta, dut_ssid=self.ssid_name, dut_security=self.security, radio=self.sixg_radios, type="11r-sae-802.1x") # check if all stations have ip sta_list = self.get_station_list() print("checking for IP, station list", sta_list) if sta_list == "no response": print("no response") else: val = self.wait_for_ip(sta_list) mac_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] # use name_to_eid # time.sleep(0.5) mac = self.station_data_query(station_name=str(sta), query="mac") mac_list.append(mac) print("mac address list of all stations", mac_list) self.mac_data = mac_list # if self.debug: # print("start debug") # self.start_debug_(mac_list=mac_list) # print("check for 30 min") # time.sleep(1800) # print("stop sniff") # file_name_ = self.stop_sniffer() # file_name = "./pcap/" + str(file_name_) # print("pcap file name", file_name) # if self.debug: # print("stop debugger") # self.stop_debug_(mac_list=mac_list) # # time.sleep(40) # exit() # if all stations got ip check mac address if val: print("all stations got ip") print("check if all stations are connected one ap") # get bssids of all stations print("get bssids of all stations") check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") print(bssid) check.append(bssid) print(check) # check if all element of bssid list has same bssid's print("check if all bssids are same or not") result = all(element == check[0] for element in check) # if yes if result: pass # self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, # side_b_max_rate=0, # sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, # side_b_min_pdu=1250) else: # if bssid's are not same move try to move all clients to one ap print("try to move all clients to one ap before roam ") count1 = check.count(self.c1_bssid.upper()) count2 = check.count(self.c2_bssid.upper()) checker, new_sta_list, checker2 = None, [], None if count1 > count2: print("station connected mostly to ap1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() else: checker = self.c1_bssid.upper() checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(check) if x == checker] print(index_count) for i in index_count: new_sta_list.append(sta_list[i]) print("new_sta_list", new_sta_list) for sta_name in new_sta_list: sta = sta_name.split(".")[2] # TODO: use name-to-eid print(sta) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, # TODO: do not hard-code "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: LANforge sta on same radio will share scan results, so you only need to scan on one STA per # radio, and then sleep should be 5 seconds, then roam every station that needs to roam. # You do not need this sleep and scan for each STA. time.sleep(2) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) # self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, # side_b_max_rate=0, # sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, # side_b_min_pdu=1250) timeout, variable, iterable_var = None, None, None if self.duration_based: timeout = time.time() + 60 * float(self.duration) # iteration_dur = 50000000 iterable_var = 50000000 variable = -1 if self.iteration_based: variable = self.iteration iterable_var = self.iteration post_bssid = None # roam iteration loop starts here while variable: print("variable", variable) iter, number, ser_1, ser_2 = None, None, None, None if variable != -1: iter = iterable_var - variable variable = variable - 1 if variable == -1: # need to write duration iteration logic # iter = iterable_var - iteration_dur if self.duration is not None: if time.time() > timeout: break time.sleep(1) if self.soft_roam: if iter % 2 == 0: print("even set c1 to lowest and c2 to highest attenuation ") number = "even" # get serial nummber of attenuators from lf ser_no = self.attenuator_serial() print(ser_no[0]) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] # set attenuation to zero in first attenuator and high in second attenuator self.attenuator_modify(ser_1, "all", 950) self.attenuator_modify(ser_2, "all", 100) else: # TODO: Are you sure it is already in expected state on every loop? Maybe # set it to be sure? print("odd, c1 is already at highest and c2 is at lowest") number = "odd" try: # define ro list per iteration row_list = [] sta_list = self.get_station_list() print(sta_list) if sta_list == "no response": print("no response") pass else: station = self.wait_for_ip(sta_list) time.sleep(20) if self.debug: print("start debug") self.start_debug_(mac_list=mac_list) if station: print("all stations got ip") # get bssid's of all stations connected bssid_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list.append(bssid) print(bssid_list) pass_fail_list = [] pcap_file_list = [] roam_time1 = [] # packet_loss_lst = [] remark = [] log_file = [] # check if all element of bssid list has same bssid's result = all(element == bssid_list[0] for element in bssid_list) if not result: # for a iteration if client is not connected to same ap try to connect them print("giving a try to connect") print("move all clients to one ap") count3 = bssid_list.count(self.c1_bssid.upper()) count4 = bssid_list.count(self.c2_bssid.upper()) print("count3", count3) print("count4", count4) checker, new_sta_list, checker2 = None, [], None if count3 > count4: print("station connected mostly to ap1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() else: checker = self.c1_bssid.upper() checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(bssid_list) if x == checker] print(index_count) for i in index_count: new_sta_list.append(sta_list[i]) print("new_sta_list", new_sta_list) # for i, x in zip(bssid_list, sta_list): # if i == checker: # index_count = bssid_list.index(checker) # new_sta_list.append(sta_list[index_count]) # print("new_sta_list", new_sta_list) for sta_name in new_sta_list: sta = sta_name.split(".")[2] print(sta) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) time.sleep(2) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) # check bssid before before_bssid = [] for sta_name in sta_list: sta = sta_name.split(".")[2] before_bss = self.station_data_query(station_name=str(sta), query="ap") print(before_bss) before_bssid.append(before_bss) print("bssid 1", before_bssid) if before_bssid[0] == str(self.c1_bssid.upper()): post_bssid = self.c2_bssid.upper() else: post_bssid = self.c1_bssid.upper() print("station are connected to post bssid", post_bssid) result1 = all(element == before_bssid[0] for element in before_bssid) if result1: print("All stations connected to one ap") for i in before_bssid: local_row_list = [str(iter)] local_row_list.append(i) print(local_row_list) row_list.append(local_row_list) print(row_list) # if all bssid are equal then do check to which ap it is connected formated_bssid = before_bssid[0].lower() station_before = "" if formated_bssid == self.c1_bssid: print("station connected to chamber1 ap") station_before = formated_bssid elif formated_bssid == self.c2_bssid: print("station connected to chamber 2 ap") station_before = formated_bssid print(station_before) # after checking all conditions start roam and start snifffer print("starting snifer") self.start_sniffer(radio_channel=int(self.channel), radio=self.sniff_radio, test_name="roam_11r_" + str(self.option) + "_iteration_" + str( iter) + "_", duration=3600) if self.soft_roam: ser_num = None ser_num2 = None if number == "even": ser_num = ser_1 ser_num2 = ser_2 elif number == "odd": ser_num = ser_2 ser_num2 = ser_1 # logic to decrease c2 attenuation till 10 db using 1dbm steps status = None for atten_val2 in range(900, 100, -10): self.attenuator_modify(int(ser_num), "all", atten_val2) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. print("wait for 30 secs") time.sleep(30) # query bssid's of all stations bssid_check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_check.append(bssid) print(bssid_check) # check if all are equal resulta = all(element == bssid_check[0] for element in bssid_check) if resulta: station_after = bssid_check[0].lower() if station_after == station_before: status = "station did not roamed" print("station did not roamed") continue elif station_after != station_before: print("client performed roam") break if status == "station did not roamed": # set c1 to high for atten_val1 in (range(150, 950, 10)): print(atten_val1) self.attenuator_modify(int(ser_num2), "all", atten_val1) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. # TODO: Add attenuation step to logs to make it more obvious what script is doing. print("wait for 30 secs") time.sleep(30) bssid_check2 = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_check2.append(bssid) print(bssid_check2) # check if all are equal result = all(element == bssid_check2[0] for element in bssid_check2) if result: station_after = bssid_check2[0].lower() if station_after == station_before: status = "station did not roamed" print("station did not roamed") continue else: print('station roamed') break else: if station_before == self.final_bssid[0]: print("connected stations bssid is same to bssid list first element") for sta_name in sta_list: sta = sta_name.split(".")[2] print(sta) wpa_cmd = "" if self.option == "ota": wpa_cmd = "roam " + str(self.final_bssid[1]) if self.option == "otds": wpa_cmd = "ft_ds " + str(self.final_bssid[1]) # wpa_cmd = "roam " + str(self.final_bssid[1]) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: See note in similar code above about only needing to scan once per radio time.sleep(2) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) time.sleep(2) else: print("connected stations bssid is same to bssid list second element") for sta_name in sta_list: sta = sta_name.split(".")[2] wifi_cmd = "" if self.option == "ota": wifi_cmd = "roam " + str(self.final_bssid[0]) if self.option == "otds": wifi_cmd = "ft_ds " + str(self.final_bssid[0]) print(sta) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": 'scan trigger freq 5180 5300' } wifi_cli_cmd_data = { "shelf": 1, "resource": 1, "port": str(sta), "wpa_cli_cmd": wifi_cmd } print(wifi_cli_cmd_data) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: See note in similar code above about only needing to scan once per radio time.sleep(2) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) time.sleep(2) # krnel logs kernel = self.journal_ctl_logs(file=str(iter)) for i in kernel: kernel_log.append(i) # stop sniff and attach data time.sleep(30) print("stop sniff") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) print("pcap file name", file_name) if self.debug: print("stop debugger") self.stop_debug_(mac_list=mac_list) time.sleep(40) else: print("debug is disabled") self.wait_for_ip(sta_list) bssid_list_1 = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list_1.append(bssid) print(bssid_list_1) for i, x in zip(row_list, bssid_list_1): i.append(x) print("row list", row_list) time.sleep(60) trace = self.get_file_name(client=self.num_sta) log_file.append(trace) # check if all are equal all(element == bssid_list_1[0] for element in bssid_list_1) res = "" station_before_ = before_bssid print("station before", station_before_) # for each mac address query data from pcap for i, x in zip(mac_list, range(len(station_before_))): print("mac ", i) print(x) print(bssid_list_1) station_after = bssid_list_1[x] print("station_after", station_after) if station_after == station_before_[x] or station_after == "na": print("station did not roamed") res = "FAIL" elif station_after != station_before_[x]: print("client performed roam") res = "PASS" if res == "FAIL": res = "FAIL" if res == "PASS": query_reasso_response = self.get_wlan_mgt_status(file_name=file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print(query_reasso_response) if len(query_reasso_response) != 0 and query_reasso_response != "empty": if query_reasso_response == "Successful": print("re-association status is successful") reasso_t = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("reassociation time is", reasso_t) print("check for auth frame") query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) if query_auth_response == "Successful": print("authentication request is present") auth_time = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) print("auth time is", auth_time) roam_time = reasso_t - auth_time print("roam time ms", roam_time) roam_time1.append(roam_time) if roam_time < 50: pass_fail_list.append("PASS") pcap_file_list.append(str(file_name)) remark.append("Passed all criteria") else: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("Roam time is greater then 50 ms") else: roam_time1.append('Auth Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append(" auth failure") else: roam_time1.append('Reassociation Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("Reassociation failure") print("pcap_file name for fail instance of iteration value ") else: # for a in range(len(row_list)): roam_time1.append("No Reassociation") # for a in range(len(row_list)): pass_fail_list.append("FAIL") # for a in range(len(row_list)): pcap_file_list.append(str(file_name)) # for a in range(len(row_list)): remark.append("No Reasso response") print("row list", row_list) else: query_reasso_response = self.get_wlan_mgt_status(file_name=file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print(query_reasso_response) if len(query_reasso_response) != 0 and query_reasso_response != 'empty': if query_reasso_response == "Successful": print("re-association status is successful") reasso_t = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("reassociation time is", reasso_t) print("check for auth frame") query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) if query_auth_response == "Successful": print("authentication request is present") auth_time = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( str(i))) print("auth time is", auth_time) roam_time = reasso_t - auth_time print("roam time ms", roam_time) roam_time1.append(roam_time) if roam_time < 50: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append( "(bssid mismatched)Client disconnected after roaming") else: pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append( "(bssid mis matched)Roam time is greater then 50 ms,") else: roam_time1.append('Auth Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid switched auth failure") else: roam_time1.append('Reassociation Fail') pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid mismatched Reassociation failure") else: # for a in range(len(row_list)): roam_time1.append("No Reassociation") # for a in range(len(row_list)): pass_fail_list.append("FAIL") # for a in range(len(row_list)): pcap_file_list.append(str(file_name)) # for a in range(len(row_list)): remark.append("bssid mismatched , No Reasso response") print("row list", row_list) for i, x in zip(row_list, roam_time1): i.append(x) print("row list", row_list) # for i, x in zip(row_list, packet_loss_lst): # i.append(x) for i, x in zip(row_list, pass_fail_list): i.append(x) print("row list", row_list) for i, x in zip(row_list, pcap_file_list): i.append(x) print("log file", log_file) my_unnested_list = list(chain(*log_file)) print(my_unnested_list) for i, x in zip(row_list, my_unnested_list): i.append(x) print("row list", row_list) for i, x in zip(row_list, remark): i.append(x) print("row list", row_list) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "all stations are not connected to same ap for iteration " + str(iter) print("all stations are not connected to same ap") print("starting snifer") self.start_sniffer(radio_channel=int(self.channel), radio=self.sniff_radio, test_name="roam_11r_" + str(self.option) + "_iteration_" + str( iter) + "_", duration=3600) print("stop sniff") self.stop_sniffer() kernel = self.journal_ctl_logs(file=str(iter)) for i in kernel: kernel_log.append(i) bssid_list2 = [] for sta_name in sta_list: # local_row_list = [0, "68"] local_row_list = [str(iter)] sta = sta_name.split(".")[2] before_bssid_ = self.station_data_query(station_name=str(sta), query="ap") print(before_bssid_) bssid_list2.append(before_bssid_) local_row_list.append(before_bssid_) print(local_row_list) row_list.append(local_row_list) print(row_list) for i, x in zip(row_list, bssid_list2): i.append(x) print("row list", row_list) for i in row_list: i.append("No Roam Time") print("row list", row_list) for a in row_list: a.append("FAIL") print("row list", row_list) # pcap for i in row_list: i.append("N/A") print("row list", row_list) if self.debug: print("stop debugger") self.stop_debug_(mac_list=mac_list) else: print("debug is disabled") time.sleep(60) trace = self.get_file_name(client=self.num_sta) log_file.append(trace) print("log file", log_file) my_unnested_list = list(chain(*log_file)) print(my_unnested_list) for i, x in zip(row_list, my_unnested_list): i.append(x) print("row list", row_list) for i in row_list: i.append("no roam performed all stations are not connected to same ap") print("row list", row_list) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "station's failed to get ip after the test start" print("station's failed to get ip after test starts") if self.duration_based: if time.time() > timeout: break except Exception as e: # print(e) pass else: message = "station's failed to get ip at the begining" print("station's failed to get associate at the begining") else: print("stations failed to get ip") test_end = datetime.now() test_end = test_end.strftime("%b %d %H:%M:%S") print("Test ended at ", test_end) self.end_time = test_end s1 = test_time s2 = test_end # for example FMT = '%b %d %H:%M:%S' self.test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) return kernel_log, message # graph function def generate_client_pass_fail_graph(self, csv_list=None): print("csv_list", csv_list) x_axis_category = [] for i in range(self.num_sta): x_axis_category.append(i + 1) print(x_axis_category) pass_list = [] fail_list = [] dataset = [] for i in csv_list: print("i", i) lf_csv_obj = lf_csv.lf_csv() h = lf_csv_obj.read_csv(file_name=i, column="PASS/FAIL") count = h.count("PASS") print(count) count_ = h.count("FAIL") print(count_) pass_list.append(count) fail_list.append(count_) dataset.append(pass_list) dataset.append(fail_list) print(dataset) # it will contain per station station pass and fail number eg [[9, 7], [3, 4]] here 9, 7 are pass number for clients 3 and 4 are fail number # dataset = [[9, 7 , 4], [3, 4,9]] graph = lf_graph.lf_bar_graph(_data_set=dataset, _xaxis_name="Stations = " + str(self.num_sta), _yaxis_name="Total iterations = " + str(self.iteration), _xaxis_categories=x_axis_category, _label=["Pass", "Fail"], _xticks_font=8, _graph_image_name="11r roam client per iteration graph", _color=['forestgreen', 'darkorange', 'blueviolet'], _color_edge='black', _figsize=(12, 4), _grp_title="client per iteration graph", _xaxis_step=1, _show_bar_value=True, _text_font=6, _text_rotation=60, _legend_loc="upper right", _legend_box=(1, 1.15), _enable_csv=True ) graph_png = graph.build_bar_graph() print("graph name {}".format(graph_png)) return graph_png # report generation function def generate_report(self, csv_list, kernel_lst, current_path=None): report = lf_report_pdf.lf_report(_path="", _results_dir_name="Hard Roam Test", _output_html="hard_roam.html", _output_pdf="Hard_roam_test.pdf") if current_path is not None: report.current_path = os.path.dirname(os.path.abspath(current_path)) report_path = report.get_report_path() report.build_x_directory(directory_name="csv_data") report.build_x_directory(directory_name="kernel_log") for i in kernel_lst: report.move_data(directory="kernel_log", _file_name=str(i)) date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0] test_setup_info = { "DUT Name": self.dut_name, "SSID": self.ssid_name, "Test Duration": self.test_duration, } report.set_title("HARD ROAM (11r) TEST") report.set_date(date) report.build_banner() report.set_table_title("Test Setup Information") report.build_table_title() report.test_setup_table(value="Device under test", test_setup_data=test_setup_info) report.set_obj_html("Objective", "The Hard Roam (11r) Test is designed to test the performance of the " "Access Point. The goal is to check whether the 11r configuration of AP for all the " + str(self.num_sta) + " clients are working as expected or not") report.build_objective() report.set_obj_html("Client per iteration Graph", "The below graph provides information about out of total iterations how many times each client got Pass or Fail") report.build_objective() graph = self.generate_client_pass_fail_graph(csv_list=csv_list) report.set_graph_image(graph) report.set_csv_filename(graph) report.move_csv_file() report.move_graph_image() report.build_graph() for i in csv_list: report.move_data(directory="csv_data", _file_name=str(i)) report.move_data(directory_name="pcap") for i, x in zip(range(self.num_sta), csv_list): # report.set_table_title("Client information " + str(i)) # report.build_table_title() report.set_obj_html("Client " + str(i + 1) + " Information", "This Table gives detailed information " "of client " + str( i + 1) + " like the bssid it was before roam," + " bssid it was after roam, " + "roam time, capture file name and ra_trace file name along with remarks ") report.build_objective() lf_csv_obj = lf_csv.lf_csv() y = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Iterations") z = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="bssid1") u = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="bssid2") t = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Roam Time(ms)") # l = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Packet loss") h = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="PASS/FAIL") p = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Pcap file Name") lf = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Log File") r = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Remark") table = { "iterations": y, "Bssid before": z, "Bssid After": u, "Roam Time(ms)": t, "PASS/FAIL": h, "pcap file name": p, "Log File": lf, "Remark": r } test_setup = pd.DataFrame(table) report.set_table_dataframe(test_setup) report.build_table() test_input_infor = { "LANforge ip": self.lanforge_ip, "LANforge port": self.lanforge_port, "test start time": self.start_time, "test end time": self.end_time, "Bands": self.band, "Upstream": self.upstream, "Stations": self.num_sta, "iterations": self.iteration, "SSID": self.ssid_name, "Security": self.security, "Client mac": self.mac_data, "Contact": "support@candelatech.com" } report.set_table_title("Test basic Information") report.build_table_title() report.test_setup_table(value="Information", test_setup_data=test_input_infor) report.build_footer() report.write_html() report.write_pdf_with_timestamp(_page_size='A4', _orientation='Landscape') return report_path def main(): obj = HardRoam(lanforge_ip="192.168.100.131", lanforge_port=8080, lanforge_ssh_port=22, c1_bssid="68:7d:b4:5f:5c:3a", c2_bssid="14:16:9d:53:58:ca", fiveg_radio="1.1.wiphy1", twog_radio=None, sixg_radio=None, band="fiveg", sniff_radio="wiphy2", num_sta=1, security="wpa2", security_key="something", ssid="ssid_5g", upstream="eth2", duration=None, iteration=1, channel=40, option="ota", duration_based=False, iteration_based=True, dut_name=["AP687D.B45C.1D1C", "AP2C57.4152.385C"], traffic_type="lf_udp", scheme="ssh", dest="localhost", user="admin", passwd="Cisco123", prompt="WLC2", series_cc="9800", ap="AP687D.B45C.1D1C", port="8888", band_cc="5g", timeout="10", identity="testuser", ttls_pass="testpasswd", soft_roam=True ) x = os.getcwd() print(x) file = obj.generate_csv() kernel, message = obj.run(file_n=file) report_dir_name = obj.generate_report(csv_list=file, kernel_lst=kernel, current_path=str(x) + "/tests") print(report_dir_name) # file = obj.generate_csv() # obj.run(file_n=file) # obj.generate_report(csv_list=file) # obj.generate_client_pass_fail_graph() # obj.controller_class() # obj.stop_debug_() # obj.get_file_name() # obj.delete_trace_file() # lst = obj.journal_ctl_logs(file="nik") # file = [] # obj.generate_report(csv_list=file, kernel_lst=lst) # obj.get_file_name(client=1) if __name__ == '__main__': main()