diff --git a/py-scripts/lf_hard_roam_test.py b/py-scripts/lf_hard_roam_test.py index 8642c02c..2b6ba262 100644 --- a/py-scripts/lf_hard_roam_test.py +++ b/py-scripts/lf_hard_roam_test.py @@ -1,5 +1,7 @@ """ -Note : please do not overwrite script under progress and is used for cisco +Note : lf_hard_roam_test works on forced roaming specific to 11r + - This script will give results of 11r forced roam test pdf along with all the packet captures genersted after the roam + - still under progress do not overwrite """ import sys @@ -76,7 +78,8 @@ class HardRoam(Realm): band_cc="5g", timeout="10", identity=None, - ttls_pass=None + ttls_pass=None, + debug=False ): super().__init__(lanforge_ip, lanforge_port) @@ -132,19 +135,24 @@ class HardRoam(Realm): self.end_time = None self.identity = identity self.ttls_pass = ttls_pass + self.debug = debug + self.mac_data = None + # start debugger of controller def start_debug_(self, mac_list): mac = mac_list for i in mac: y = self.cc.debug_wireless_mac_cc(mac=str(i)) print(y) + # stop debugger of controller def stop_debug_(self, mac_list): mac = mac_list for i in mac: y = self.cc.no_debug_wireless_mac_cc(mac=str(i)) print(y) + # get trace file names from controller def get_ra_trace_file(self): ra = self.cc.get_ra_trace_files__cc() print(ra) @@ -152,39 +160,66 @@ class HardRoam(Realm): print(ele_list) return ele_list + # get trace file names from controller with respect to number of clients def get_file_name(self, client): - file = self.get_ra_trace_file() - indices = [i for i, s in enumerate(file) if 'dir bootflash: | i ra_trace' in s] - # print(indices) - y = indices[-1] file_name = [] - if client == 1: - z = file[y + 1] - list_ = [] - list_.append(z) - m = list_[0].split(" ") - print(m) - print(len(m)) - print(m[-1]) - file_name.append(m[-1]) - else: + if not self.debug: for i in range(client): - z = file[y + (int(i)+1)] + file_name.append("debug disabled") + else: + file = self.get_ra_trace_file() + indices = [i for i, s in enumerate(file) if 'dir bootflash: | i ra_trace' in s] + # print(indices) + y = indices[-1] + + if client == 1: + z = file[y + 1] list_ = [] list_.append(z) m = list_[0].split(" ") print(m) print(len(m)) print(m[-1]) + if m[-1].isnumeric(): + print("log file not available") + file_name.append("file not found") file_name.append(m[-1]) - print("file_name", file_name) - file_name.reverse() + else: + z = file[y + (int(0) + 1)] + list_ = [] + list_.append(z) + m = list_[0].split(" ") + print(m) + print(len(m)) + print(m[-1]) + if m[-1].isnumeric(): + print("log file not available") + for i in range(client): + file_name.append("file not found") + else: + for i in range(client): + z = file[y + (int(i)+1)] + list_ = [] + list_.append(z) + m = list_[0].split(" ") + print(m) + print(len(m)) + print(m[-1]) + if m[-1].isnumeric(): + print("log file not available") + file_name.append("file not found") + file_name.append(m[-1]) + print("file_name", file_name) + file_name.reverse() + return file_name + # delete trace file from controller def delete_trace_file(self, file): # file = self.get_file_name() self.cc.del_ra_trace_file_cc(file=file) + # get station list from lf def get_station_list(self): sta = self.staConnect.station_list() if sta == "no response": @@ -195,6 +230,7 @@ class HardRoam(Realm): sta_list.append(j) return sta_list + # create n number of clients of advanced configuration on lf def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None, dut_security=None, dut_passwd=None, radio=None, type=None): @@ -294,6 +330,7 @@ class HardRoam(Realm): print("Stations failed to get IPs") return False + # create layer3 traffic on clients def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu, side_b_min_pdu, traffic_type, sta_list): @@ -317,18 +354,21 @@ class HardRoam(Realm): side_b=self.upstream, sleep_time=0) self.cx_profile.start_cx() + # get layer3 values def get_layer3_values(self, cx_name=None, query=None): url = f"/cx/{cx_name}" response = self.json_get(_req_url=url) result = response[str(cx_name)][str(query)] return result + # get cross connect names def get_cx_list(self): layer3_result = self.local_realm.cx_list() layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item] print(layer3_names) return layer3_names + # get endpoint values def get_endp_values(self, endp="A", cx_name="niki", query="tx bytes"): # self.get_cx_list() # self.json_get("http://192.168.100.131:8080/endp/Unsetwlan000-0-B?fields=rx%20rate") @@ -342,6 +382,7 @@ class HardRoam(Realm): print(final) return final + # precleanup on lanforge def precleanup(self): obj = lf_clean.lf_clean(host=self.lanforge_ip, port=self.lanforge_port, @@ -351,6 +392,7 @@ class HardRoam(Realm): obj.cxs_clean() obj.endp_clean() + # get client data from lf def station_data_query(self, station_name="wlan0", query="channel"): url = f"/port/{1}/{1}/{station_name}?fields={query}" # print("url//////", url) @@ -363,6 +405,7 @@ class HardRoam(Realm): y = response["interface"][query] return y + # start packet capture on lf def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60): self.pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap" self.pcap_obj_2 = sniff_radio.SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port, @@ -373,6 +416,7 @@ class HardRoam(Realm): time.sleep(5) self.pcap_obj_2.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) + # stop packet capture and get file name def stop_sniffer(self): directory = None directory_name = "pcap" @@ -396,6 +440,7 @@ class HardRoam(Realm): return self.pcap_name + # generate csv files at the begining def generate_csv(self): file_name = [] for i in range(self.num_sta): @@ -407,6 +452,7 @@ class HardRoam(Realm): lf_csv_obj.generate_csv() return file_name + # get journal ctl logs/ kernel logs def journal_ctl_logs(self, file): jor_lst = [] name = "kernel_log" + file + ".txt" @@ -426,23 +472,28 @@ class HardRoam(Realm): print(e) return jor_lst + # gives wlan management status of pcap file def get_wlan_mgt_status(self, file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55)"): query_reasso_response = self.pcap_obj.get_wlan_mgt_status_code(pcap_file=str(file_name), filter=filter) print("query", query_reasso_response) return query_reasso_response + # main roam fun def run(self, file_n=None): kernel_log = [] + # start timer test_time = datetime.now() test_time = test_time.strftime("%b %d %H:%M:%S") print("Test started at ", test_time) self.start_time = test_time + # get two bssids for roam self.final_bssid.extend([self.c1_bssid, self.c2_bssid]) print("final bssid", self.final_bssid) self.precleanup() - message = None, None + + # create clients with respect to bands if self.band == "twog": self.local_realm.reset_port(self.twog_radios) self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name, @@ -462,7 +513,7 @@ class HardRoam(Realm): # check if all stations have ip sta_list = self.get_station_list() - print(sta_list) + print("station list", sta_list) if sta_list == "no response": print("no response") else: @@ -470,14 +521,19 @@ class HardRoam(Realm): mac_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] - time.sleep(5) + time.sleep(0.5) mac = self.station_data_query(station_name=str(sta), query="mac") mac_list.append(mac) - print(mac_list) + print("mac address list of all stations", mac_list) + self.mac_data = mac_list + # if all stations got ip check mac address if val: print("all stations got ip") - print("check if all tations are conncted one ap") + print("check if all stations are connected one ap") + + # get bssids of all stations + print("get bssids of all stations") check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] @@ -488,14 +544,18 @@ class HardRoam(Realm): print(check) # check if all element of bssid list has same bssid's + print("check if all bssids are same or not") result = all(element == check[0] for element in check) + # if yes if result: self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, side_b_max_rate=0, sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, side_b_min_pdu=1250) + else: - print("move all clients to one ap") + # if bssid's are not same move try to move all clients to one ap + print("try to move all clients to one ap before roam ") for sta_name in sta_list: sta = sta_name.split(".")[2] print(sta) @@ -532,7 +592,8 @@ class HardRoam(Realm): if self.iteration_based: variable = self.iteration iterable_var = self.iteration - + post_bssid = None + # roam iteration loop starts here while variable: print("variable", variable) iter = None @@ -558,8 +619,9 @@ class HardRoam(Realm): else: station = self.wait_for_ip(sta_list) time.sleep(20) - print("start debug") - self.start_debug_(mac_list=mac_list) + if self.debug: + print("start debug") + self.start_debug_(mac_list=mac_list) if station: print("all stations got ip") # get bssid's of all stations connected @@ -570,18 +632,6 @@ class HardRoam(Realm): bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list.append(bssid) print(bssid_list) - - # for sta_name in sta_list: - # # local_row_list = [0, "68"] - # local_row_list = [str(iter)] - # sta = sta_name.split(".")[2] - # time.sleep(5) - # before_bssid = self.station_data_query(station_name=str(sta), query="ap") - # print(before_bssid) - # local_row_list.append(before_bssid) - # print(local_row_list) - # row_list.append(local_row_list) - # print(row_list) pass_fail_list = [] pcap_file_list = [] roam_time1 = [] @@ -593,12 +643,16 @@ class HardRoam(Realm): result = all(element == bssid_list[0] for element in bssid_list) if not result: + # for a iteration if client is not connected to same ap try to connect them print("giving a try to connect") print("move all clients to one ap") for sta_name in sta_list: sta = sta_name.split(".")[2] print(sta) - wpa_cmd = "roam " + str(bssid_list[0]) + if iter == 0: + wpa_cmd = "roam " + str(bssid_list[0]) + else: + wpa_cmd = "roam " + str(post_bssid) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, @@ -627,10 +681,12 @@ class HardRoam(Realm): print(before_bss) before_bssid.append(before_bss) print("bssid 1", before_bssid) - # local_row_list.append(before_bss) - # print(local_row_list) - # row_list.append(local_row_list) - # print(row_list) + + if before_bssid[0] == str(self.c1_bssid.upper()): + post_bssid = self.c2_bssid.upper() + else: + post_bssid = self.c1_bssid.upper() + print("station are connected to post bssid", post_bssid) result1 = all(element == before_bssid[0] for element in before_bssid) @@ -726,35 +782,13 @@ class HardRoam(Realm): file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) print("pcap file name", file_name) + if self.debug: + print("stop debugger") + self.stop_debug_(mac_list=mac_list) + time.sleep(40) + else: + print("debug is disabled") - self.stop_debug_(mac_list=mac_list) - - time.sleep(60) - trace = self.get_file_name(client=self.num_sta) - log_file.append(trace) - - # cx_list = self.get_cx_list() - # print("quiece layer3") - # self.local_realm.drain_stop_cx(cx_name=cx_list[0]) - # time.sleep(10) - # self.cx_profile.start_cx() - # time.sleep(10) - # - # print("quiece layer3") - # - # cx_list = self.get_cx_list() - # print(cx_list) - # self.local_realm.drain_stop_cx(cx_name=cx_list[0]) - # time.sleep(30) - # tx_b = self.get_endp_values(cx_name=cx_list[0], query="tx bytes", endp="B") - # rx_a = self.get_endp_values(cx_name=cx_list[0], query="rx bytes", endp="A") - # packet_loss = int(tx_b) - int(rx_a) - # print(packet_loss) - # packet_loss_lst.append(packet_loss) - # print("start cx again") - # self.cx_profile.start_cx() - - time.sleep(40) self.wait_for_ip(sta_list) bssid_list_1 = [] for sta_name in sta_list: @@ -766,13 +800,17 @@ class HardRoam(Realm): for i, x in zip(row_list, bssid_list_1): i.append(x) print("row list", row_list) + time.sleep(60) + trace = self.get_file_name(client=self.num_sta) + log_file.append(trace) # check if all are equal - result = all(element == bssid_list_1[0] for element in bssid_list_1) + all(element == bssid_list_1[0] for element in bssid_list_1) res = "" station_before_ = before_bssid print("station before", station_before_) - for i,x in zip( mac_list, range(len(station_before_))): + # for each mac address query data from pcap + for i, x in zip(mac_list, range(len(station_before_))): print("mac ", i) print(x) print(bssid_list_1) @@ -786,12 +824,7 @@ class HardRoam(Realm): res = "PASS" if res == "FAIL": res = "FAIL" - - - - if res == "PASS": - query_reasso_response = self.get_wlan_mgt_status(file_name=file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) @@ -939,7 +972,13 @@ class HardRoam(Realm): message = "all stations are not connected to same ap for iteration " + str(iter) print("all stations are not connected to same ap") - + print("starting snifer") + self.start_sniffer(radio_channel=int(self.channel), radio=self.sniff_radio, + test_name="roam_11r_" + str(self.option) + "_iteration_" + str( + iter) + "_", + duration=3600) + print("stop sniff") + self.stop_sniffer() bssid_list2 = [] for sta_name in sta_list: # local_row_list = [0, "68"] @@ -966,7 +1005,13 @@ class HardRoam(Realm): for i in row_list: i.append("N/A") print("row list", row_list) - self.stop_debug_(mac_list=mac_list) + if self.debug: + print("stop debugger") + self.stop_debug_(mac_list=mac_list) + + else: + print("debug is disabled") + time.sleep(60) trace = self.get_file_name(client=self.num_sta) log_file.append(trace) @@ -982,7 +1027,6 @@ class HardRoam(Realm): for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) - else: message = "station's failed to get ip after the test start" print("station's failed to get ip after test starts") @@ -990,11 +1034,14 @@ class HardRoam(Realm): if time.time() > timeout: break except Exception as e: - pass + print(e) + pass else: message = "station's failed to get ip at the begining" print("station's failed to get associate at the begining") + else: + print("stations failed to get ip") test_end = datetime.now() test_end = test_end.strftime("%b %d %H:%M:%S") @@ -1006,6 +1053,7 @@ class HardRoam(Realm): self.test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT) return kernel_log, message + # graph function def generate_client_pass_fail_graph(self, csv_list=None): print("csv_list", csv_list) x_axis_category = [] @@ -1049,6 +1097,7 @@ class HardRoam(Realm): print("graph name {}".format(graph_png)) return graph_png + # report generation function def generate_report(self, csv_list, kernel_lst, current_path=None): report = lf_report_pdf.lf_report(_path= "", _results_dir_name="Hard Roam Test", _output_html="hard_roam.html", _output_pdf="Hard_roam_test.pdf") @@ -1099,7 +1148,7 @@ class HardRoam(Realm): # report.set_table_title("Client information " + str(i)) # report.build_table_title() report.set_obj_html("Client " + str(i+1) + " Information", "This Table gives detailed information " - "of client " + str(i+1) + " like the bssid it was before roam," + + "of client " + str(i+1) + " like the bssid it was before roam," + " bssid it was after roam, " + "roam time, capture file name and ra_trace file name along with remarks ") @@ -1139,6 +1188,7 @@ class HardRoam(Realm): "iterations": self.iteration, "SSID": self.ssid_name, "Security": self.security, + "Client mac": self.mac_data, "Contact": "support@candelatech.com" } report.set_table_title("Test basic Information")