diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index 3ac8536d..2c8f285c 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -938,7 +938,7 @@ class lf_libs: except Exception as e: logging.error(e) - def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None): + def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None, antenna=None): # country_code = US(840) try: radio = radio.split(".") @@ -956,6 +956,8 @@ class lf_libs: try: if country: # update the dictionary data["country"] = country + if antenna: + data["antenna"] = antenna except Exception as e: logging.error(f"{e}\nunable to change lanforge radio country code") local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data) @@ -1281,6 +1283,46 @@ class lf_libs: name=i, attachment_type="image/png", extension=None) + def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False, + file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv", + batch_size="0"): + try: + df = pd.read_csv("../reports/" + str(dir_name) + file_name, + sep=r'\t', engine='python') + logging.info("csv file opened") + except FileNotFoundError: + logging.info("csv file does not exist") + return False + + if kpi_csv: + count = 0 + dict_data = {"Down": {}, "Up": {}, "Both": {}} + csv_short_dis = df.loc[:,"short-description"] + csv_num_score = df.loc[:,"numeric-score"] + for i in range(len(batch_size.split(","))): + dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0] + dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1] + dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2] + count += 3 + + if individual_station_throughput: + dict_data = {} + if option == "download": + csv_sta_names = df.iloc[[0]].values.tolist() + csv_throughput_values = df.iloc[[1]].values.tolist() + elif option == "upload": + csv_sta_names = df.iloc[[0]].values.tolist() + csv_throughput_values = df.iloc[[2]].values.tolist() + else: + print("Provide proper option: download or upload") + return + + sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",") + th_list = list(map(float, csv_throughput_values[0][0].split(","))) + for i in range(len(sta_list)): + dict_data[sta_list[i]] = th_list[i] + return dict_data + def attach_report_kpi(self, report_name=None, file_name="kpi_file"): if report_name[-1] == "/": path = "../reports/" + str(report_name) + "kpi.csv" diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 6b54ecfe..c1b077ae 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -959,6 +959,29 @@ class lf_tests(lf_libs): logging.error("5 Ghz channel didn't changed after radar detected") pytest.fail("5 Ghz channel didn't changed after radar detected") + def update_dut_ssid(self, dut_data={}): + r_val = dict() + for dut in self.dut_data: + r_val[dut["identifier"]] = None + # updating ssids on all APS + for dut in self.dut_data: + ssid_data = [] + identifier = dut["identifier"] + if r_val.keys().__contains__(identifier): + for idx_ in dut_data[identifier]["ssid_data"]: + if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() == "OPEN": + ssid_data.append( + ['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] + + + ' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()]) + else: + ssid_data.append( + ['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] + + ' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() + + ' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] + + ' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()]) + self.update_duts(identifier=identifier, ssid_data=ssid_data) + def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None): dut_name = [] # for index in range(0, len(self.dut_data)): @@ -967,10 +990,10 @@ class lf_tests(lf_libs): if num_stations == 0: logging.warning("0 Stations") return + idx = None r_val = dict() for dut in self.dut_data: r_val[dut["identifier"]] = None - idx = None # updating ssids on all APS for dut in self.dut_data: ssid_data = [] @@ -1333,52 +1356,97 @@ class lf_tests(lf_libs): return dataplane_obj_list def multi_asso_disasso(self, band="2G", num_stations=16, dut_data={}, idx=0, mode="BRIDGE", vlan=1, - instance_name="wct_instance"): - def thread_fun(station_list): - print(station_list) - time.sleep(60) - self.local_realm.admin_down(sta_list=station_list) - print("stations down") - time.sleep(10) - self.admin_up(sta_list=station_list) - print("stations up") - radio, traffic_rate = (self.wave2_5g_radios, '8Mbps') if band == "5G" else (self.wave2_2g_radios, '4Mbps') - per_radio_sta = int(num_stations / len(radio)) - rem = num_stations % len(radio) - logging.info("Total stations per radio: " + str(per_radio_sta)) - num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta - identifier = list(dut_data.keys())[0] - for i in radio: - station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] + - " STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]] - rem = 0 - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) + instance_name="wct_instance", traffic_direction="upload", traffic_rate="0Mbps"): + try: + def thread_fun(station_list): + time.sleep(60) + for i in station_list: + self.local_realm.admin_down(i) + logging.info("stations down") + time.sleep(10) + for i in station_list: + self.local_realm.admin_up(i) + logging.info("stations up") - self.chamber_view(raw_lines="custom") - sta_list = [] - for rad in radio: - self.set_radio_channel(radio=rad, antenna=4) - for u in self.json_get("/port/?fields=port+type,alias")['interfaces']: - if list(u.values())[0]['port type'] in ['WIFI-STA']: - sta_list.append(list(u.keys())[0]) + # clean l3 traffics which won't get cleaned by deleting old scenario in CV + self.client_disconnect(clean_l3_traffic=True) + radio = self.wave2_5g_radios if band == "5G" else self.wave2_2g_radios + upld_rate, downld_rate = "0Gbps", "0Gbps" + if traffic_direction == "upload": + upld_rate = traffic_rate + elif traffic_direction == "download": + downld_rate = traffic_rate + per_radio_sta = int(num_stations / len(radio)) + rem = num_stations % len(radio) + logging.info("Total stations per radio: " + str(per_radio_sta)) + num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta + identifier = list(dut_data.keys())[0] + allure.attach(name="Definition", + body="Multiple association/disassociation stability test intends to measure stability of Wi-Fi device " \ + "under a dynamic environment with frequent change of connection status.") + allure.attach(name="Procedure", + body=f"This test case definition states that we Create 16 stations on {band} radio and" \ + " Run Wifi-capacity test for first 8 stations. 8 stations are picked for sending/receiving packets " + "while the other 8 STAs are picked to do a dis-association/re-association process during the test" \ + f" Enable {traffic_direction} {traffic_rate} Mbps UDP flow from DUT to each of the 8 traffic stations" \ + "Disassociate the other 8 stations. Wait for 30 seconds, after that Re-associate the 8 stations.") - self.local_realm.admin_up(sta_list=sta_list) - sel_stations = ",".join(sta_list[0:8]) - val = [['ul_rate_sel: Per-Station Upload Rate:']] - thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],)) - thr1.start() - wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate="0Gbps", - stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=traffic_rate, - protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data, - sort = "interleave",) + for i in radio: + station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] + + " STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]] + rem = 0 + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) + # update the dut ssid in CV + self.update_dut_ssid(dut_data=dut_data) + self.chamber_view(raw_lines="custom") + sta_list = [] + for rad in radio: + self.set_radio_channel(radio=rad, antenna=4) + for u in self.json_get("/port/?fields=port+type,alias")['interfaces']: + if list(u.values())[0]['port type'] in ['WIFI-STA']: + sta_list.append(list(u.keys())[0]) - report_name = wct_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] - self.attach_report_graphs(report_name=report_name) - csv_val = get_test_library.read_csv_individual_station_throughput(dir_name=report_name, option="upload") - print(type(csv_val)) - print(csv_val) + for i in sta_list: + self.local_realm.admin_up(i) + sel_stations = ",".join(sta_list[0:8]) + val = [['ul_rate_sel: Per-Station Upload Rate:']] + thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],)) + thr1.start() + wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate=downld_rate, + stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate, + protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data, + sort = "interleave",) + + report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" + self.attach_report_graphs(report_name=report_name) + csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=traffic_direction) + logging.info(csv_val) + pass_value = int(traffic_rate[0]) * 0.99 + logging.info(csv_val) + allure.attach(name="Pass Fail Criteria", + body=f"UDP traffic rate is at least 99% of the configured rate for each station. Here configured " \ + f"traffic rate is {traffic_rate[0]} Mbps so traffic for each station should be {pass_value} Mbps ") + if not csv_val: + return False, "csv file does not exist" + else: + pass_fail = [1 if i >= pass_value else 0 for i in csv_val.values()] + allure.attach.file( + source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv", + name="Throughput CSV file", attachment_type=allure.attachment_type.CSV) + if pass_fail.count(0) == 0: + return True, "Test passed" + else: + return False, "Test failed due to lesser value" + except Exception as e: + logging.error(f"{e}") + return False, f"{e}" + finally: + try: + self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True) + except Exception as e: + logging.error(f"{e}") def country_code_channel_division(self, ssid="[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE", band='twog', num_sta=1, vlan_id=100, channel='1', channel_width=20,