From 935a2325b03b19dac168299b3a61b360ae3dc91d Mon Sep 17 00:00:00 2001 From: jitendracandela Date: Wed, 5 Mar 2025 11:41:12 +0530 Subject: [PATCH] Added be_capacity_test for Wi-Fi 7 Signed-off-by: jitendracandela --- lf_libs/lf_libs.py | 5 +- lf_libs/lf_tests.py | 153 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 149 insertions(+), 9 deletions(-) diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index 55f5ac8b..9d38046e 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -80,6 +80,7 @@ class lf_libs: max_5g_stations = None max_6g_stations = None max_ax_stations = None + max_be_stations = None max_ac_stations = None twog_prefix = "ath10k_2g0" fiveg_prefix = "ath10k_5g0" @@ -258,6 +259,7 @@ class lf_libs: self.max_5g_stations = 0 self.max_6g_stations = 0 self.max_ax_stations = 0 + self.max_be_stations = 0 self.max_ac_stations = 0 phantom_radios = [] for info in data: @@ -274,7 +276,7 @@ class lf_libs: self.max_2g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_5g_stations += 1 * int(str(data[info]["max_vifs"])) self.max_6g_stations += 1 * int(str(data[info]["max_vifs"])) - self.max_ax_stations += 1 * int(str(data[info]["max_vifs"])) + self.max_be_stations += 1 * int(str(data[info]["max_vifs"])) self.be200_radios.append(info) if str(data[info]["driver"]).__contains__("AX210"): self.max_possible_stations += 1 @@ -319,6 +321,7 @@ class lf_libs: logging.info("max_5g_stations: " + str(self.max_5g_stations)) logging.info("max_6g_stations: " + str(self.max_6g_stations)) logging.info("max_ax_stations: " + str(self.max_ax_stations)) + logging.info("max_be_stations: " + str(self.max_be_stations)) logging.info("max_ac_stations: " + str(self.max_ac_stations)) def load_scenario(self): diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index fa0f3ae0..7e06ffd6 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -1182,7 +1182,7 @@ class lf_tests(lf_libs): ' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()]) self.update_duts(identifier=identifier, ssid_data=ssid_data) - def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None): + def add_stations(self, is_wifi7=False, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None): dut_name = [] # for index in range(0, len(self.dut_data)): # dut_name.append(self.dut_data[index]["identifier"]) @@ -1212,23 +1212,27 @@ class lf_tests(lf_libs): ' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] + ' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()]) self.update_duts(identifier=identifier, ssid_data=ssid_data) - dict_all_radios_2g = {"be200_radios": self.be200_radios, + + if is_wifi7: + dict_all_radios_2g = {"be200_radios": self.be200_radios} + dict_all_radios_5g = {"be200_radios": self.be200_radios} + dict_all_radios_6g = {"be200_radios": self.be200_radios} + else: + dict_all_radios_2g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios, "ax200_radios": self.ax200_radios, "mtk_radios": self.mtk_radios, "wave2_2g_radios": self.wave2_2g_radios, "wave1_radios": self.wave1_radios } - - dict_all_radios_5g = {"be200_radios": self.be200_radios, + dict_all_radios_5g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios, "ax200_radios": self.ax200_radios, "mtk_radios": self.mtk_radios, "wave2_5g_radios": self.wave2_5g_radios, "wave1_radios": self.wave1_radios, } - - dict_all_radios_6g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios} + dict_all_radios_6g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios} max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, "ax200_radios": 1, "ax210_radios": 1, "be200_radios": 1} @@ -1386,7 +1390,7 @@ class lf_tests(lf_libs): instance_name="wct_instance", download_rate="1Gbps", influx_tags="", upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=False, sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None, - num_stations={}, add_stations=True, create_vlan=True, pass_fail_criteria=False): + num_stations={}, add_stations=True, create_vlan=True, pass_fail_criteria=False, is_wifi7=False): wificapacity_obj_list = [] vlan_raw_lines = None for dut in self.dut_data: @@ -1450,7 +1454,7 @@ class lf_tests(lf_libs): ssid_name = dut_data[i]["ssid_data"][j]["ssid"] radio_data = self.add_stations(band=band_, num_stations=num_stations[band_], ssid_name=ssid_name, dut_data=dut_data, - identifier=identifier) + identifier=identifier, is_wifi7=is_wifi7) if vlan_raw_lines is not None: for i in vlan_raw_lines: self.temp_raw_lines.append(i) @@ -3311,6 +3315,139 @@ class lf_tests(lf_libs): finally: self.set_radio_channel(radio=selected_ax_radio, antenna="0") + def be_capacity_test(self, instance_name="", dut_data=None, mode="BRIDGE", download_rate="10Gbps", + upload_rate="0Gbps", dut_mode="", protocol="UDP-IPv4", num_stations={}, vlan_id=None): + if self.max_be_stations == 0: + logging.info("This test needs BE radios, looks like no BE radios are available on the Lanforge system.") + pytest.skip("BE radios are not available on the Lanforge, so skipping this test.") + + if dut_mode.lower() == "wifi6": + logging.info("AP does not support BE mode, so skipping this test.") + pytest.skip("AP does not support BE mode, so skipping this test") + + dict_all_radios_be = {"be200_radios": self.be200_radios} + selected_be_radio = None + for radio in dict_all_radios_be: + if len(dict_all_radios_be[radio]) > 0: + selected_be_radio = dict_all_radios_be[radio][0] + break + logging.info("Selected BE Radio: {}".format(selected_be_radio)) + + for data in self.dut_data: + identifier = data["identifier"] + ssid_name = dut_data[identifier]["ssid_data"][0]["ssid"] + passkey = dut_data[identifier]["ssid_data"][0]["password"] + band = list(num_stations.keys())[0] + + try: + self.set_radio_channel(radio=selected_be_radio, antenna="AUTO") + values = selected_be_radio.split(".") + shelf = int(values[0]) + resource = int(values[1]) + self.pre_cleanup() + sta_name = [f"{shelf}.{resource}.be_station"] + logging.info("sta_name:- " + str(sta_name)) + sta_ip = self.client_connect_using_radio(ssid=ssid_name, passkey=passkey, mode=mode, station_name=sta_name, + radio=selected_be_radio, vlan_id=vlan_id, create_vlan=True) + time.sleep(0.5) + + sta_rows = ["ip", "mode", "channel", "signal", "parent dev", "mac"] + station_data = self.get_station_data(sta_name=sta_name, rows=sta_rows, allure_attach=True, + allure_name="Station Data") + logging.info("station_data:- " + str(station_data)) + if not sta_ip: + logging.info("Test Failed, due to station has no ip") + pytest.fail("Station did not get an ip") + + sta_mode = station_data[sta_name[0]]["mode"] + logging.info("sta_mode:- " + str(sta_mode)) + wifi_capacity_obj_list = self.wifi_capacity(instance_name=instance_name, mode=mode, + download_rate=download_rate, upload_rate=upload_rate, + protocol=protocol, duration="60000", ssid_name=ssid_name, + batch_size="1", num_stations=num_stations, stations=sta_name[0], + dut_data=dut_data, vlan_id=vlan_id, add_stations=False, + create_vlan=False) + + report = wifi_capacity_obj_list[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" + numeric_score = self.read_kpi_file(column_name=["numeric-score"], dir_name=report) + current_directory = os.getcwd() + file_path = current_directory + "/e2e/basic/performance_tests/performance_pass_fail.json" + logging.info("performance_pass file config path:- " + str(file_path)) + with open(file_path, 'r') as file: + json_string = file.read() + all_pass_fail_data = json.loads(json_string) + logging.info("All Testbed pass fail data:- " + str(all_pass_fail_data)) + # validate config json data + try: + json_object = json.dumps(all_pass_fail_data) + except ValueError as e: + logging.info("Performance Pass/Fail data is invalid") + pytest.fail("Performance Pass/Fail data is invalid") + logging.info("DUT Data: " + str(self.dut_data)) + model = self.dut_data[0]["model"] + if model in all_pass_fail_data["AP Models"]: + pass_fail_values = all_pass_fail_data["AP Models"][model] + else: + logging.error("AP model is not available in performance_pass_fail.json file") + logging.info(str(model) + " All Benchmark throughput:- " + str(pass_fail_values)) + split_mode = sta_mode.split(" ") + key = f"{band} {split_mode[2]} {split_mode[1]}MHz" + logging.info("key:- " + str(key)) + proto = None + if "TCP" in protocol: + proto = "TCP" + else: + proto = "UDP" + logging.info("Proto:- " + str(proto)) + logging.info("Given LF download_rate:- " + str(download_rate)) + logging.info("Given LF upload_rate:- " + str(upload_rate)) + pass_fail_value = pass_fail_values[key][proto] + logging.info("pass_fail value:- " + str(pass_fail_value)) + download_rate = self.convert_to_gbps(download_rate) + logging.info("download_rate:- " + str(download_rate)) + upload_rate = self.convert_to_gbps(upload_rate) + logging.info("upload_rate:- " + str(upload_rate)) + # Pass fail logic for Upload. validating download rate because providing some value during Upload + if upload_rate > download_rate: + logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+") + allure.attach(name="Benchmark throughput: ", + body=str(pass_fail_value) + "+ Mbps") + actual_tht = int(numeric_score[1][0]) + logging.info("Actual throughput:- " + str(actual_tht)) + allure.attach(name="Actual throughput: ", + body=str(actual_tht) + " Mbps") + if actual_tht < pass_fail_value: + pytest.fail( + f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps") + elif upload_rate < download_rate: + # Pass fail logic for Download. validating upload rate because providing some value during download + logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+") + allure.attach(name="Benchmark throughput: ", + body=str(pass_fail_value) + "+ Mbps") + actual_tht = int(numeric_score[0][0]) + logging.info("Actual throughput:- " + str(actual_tht)) + allure.attach(name="Actual throughput: ", + body=str(actual_tht) + " Mbps") + if actual_tht < pass_fail_value: + pytest.fail( + f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps") + elif upload_rate == download_rate: + # Pass fail logic for bidirectional + pass_fail_value = pass_fail_value * 2 + logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+") + allure.attach(name="Benchmark throughput: ", + body=str(pass_fail_value) + "+ Mbps") + actual_tht = int(numeric_score[2][0]) + logging.info("Actual throughput:- " + str(actual_tht)) + allure.attach(name="Actual throughput: ", + body=str(actual_tht) + " Mbps") + if actual_tht < pass_fail_value: + pytest.fail( + f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps") + + finally: + self.set_radio_channel(radio=selected_be_radio, antenna="0") + def multi_ssid_test(self, setup_params_general: dict, no_of_2g_and_5g_stations: int = 2, mode: str = "BRIDGE", security_key: str = "something", security: str = "wpa2") -> None: sta_names_2g, sta_names_5g = [], []