mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
Added be_capacity_test for Wi-Fi 7
Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
@@ -80,6 +80,7 @@ class lf_libs:
|
||||
max_5g_stations = None
|
||||
max_6g_stations = None
|
||||
max_ax_stations = None
|
||||
max_be_stations = None
|
||||
max_ac_stations = None
|
||||
twog_prefix = "ath10k_2g0"
|
||||
fiveg_prefix = "ath10k_5g0"
|
||||
@@ -258,6 +259,7 @@ class lf_libs:
|
||||
self.max_5g_stations = 0
|
||||
self.max_6g_stations = 0
|
||||
self.max_ax_stations = 0
|
||||
self.max_be_stations = 0
|
||||
self.max_ac_stations = 0
|
||||
phantom_radios = []
|
||||
for info in data:
|
||||
@@ -274,7 +276,7 @@ class lf_libs:
|
||||
self.max_2g_stations += 1 * int(str(data[info]["max_vifs"]))
|
||||
self.max_5g_stations += 1 * int(str(data[info]["max_vifs"]))
|
||||
self.max_6g_stations += 1 * int(str(data[info]["max_vifs"]))
|
||||
self.max_ax_stations += 1 * int(str(data[info]["max_vifs"]))
|
||||
self.max_be_stations += 1 * int(str(data[info]["max_vifs"]))
|
||||
self.be200_radios.append(info)
|
||||
if str(data[info]["driver"]).__contains__("AX210"):
|
||||
self.max_possible_stations += 1
|
||||
@@ -319,6 +321,7 @@ class lf_libs:
|
||||
logging.info("max_5g_stations: " + str(self.max_5g_stations))
|
||||
logging.info("max_6g_stations: " + str(self.max_6g_stations))
|
||||
logging.info("max_ax_stations: " + str(self.max_ax_stations))
|
||||
logging.info("max_be_stations: " + str(self.max_be_stations))
|
||||
logging.info("max_ac_stations: " + str(self.max_ac_stations))
|
||||
|
||||
def load_scenario(self):
|
||||
|
||||
@@ -1182,7 +1182,7 @@ class lf_tests(lf_libs):
|
||||
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
|
||||
self.update_duts(identifier=identifier, ssid_data=ssid_data)
|
||||
|
||||
def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
|
||||
def add_stations(self, is_wifi7=False, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
|
||||
dut_name = []
|
||||
# for index in range(0, len(self.dut_data)):
|
||||
# dut_name.append(self.dut_data[index]["identifier"])
|
||||
@@ -1212,23 +1212,27 @@ class lf_tests(lf_libs):
|
||||
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
|
||||
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
|
||||
self.update_duts(identifier=identifier, ssid_data=ssid_data)
|
||||
dict_all_radios_2g = {"be200_radios": self.be200_radios,
|
||||
|
||||
if is_wifi7:
|
||||
dict_all_radios_2g = {"be200_radios": self.be200_radios}
|
||||
dict_all_radios_5g = {"be200_radios": self.be200_radios}
|
||||
dict_all_radios_6g = {"be200_radios": self.be200_radios}
|
||||
else:
|
||||
dict_all_radios_2g = {"be200_radios": self.be200_radios,
|
||||
"ax210_radios": self.ax210_radios,
|
||||
"ax200_radios": self.ax200_radios,
|
||||
"mtk_radios": self.mtk_radios,
|
||||
"wave2_2g_radios": self.wave2_2g_radios,
|
||||
"wave1_radios": self.wave1_radios
|
||||
}
|
||||
|
||||
dict_all_radios_5g = {"be200_radios": self.be200_radios,
|
||||
dict_all_radios_5g = {"be200_radios": self.be200_radios,
|
||||
"ax210_radios": self.ax210_radios,
|
||||
"ax200_radios": self.ax200_radios,
|
||||
"mtk_radios": self.mtk_radios,
|
||||
"wave2_5g_radios": self.wave2_5g_radios,
|
||||
"wave1_radios": self.wave1_radios,
|
||||
}
|
||||
|
||||
dict_all_radios_6g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios}
|
||||
dict_all_radios_6g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios}
|
||||
|
||||
max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19,
|
||||
"ax200_radios": 1, "ax210_radios": 1, "be200_radios": 1}
|
||||
@@ -1386,7 +1390,7 @@ class lf_tests(lf_libs):
|
||||
instance_name="wct_instance", download_rate="1Gbps", influx_tags="",
|
||||
upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=False,
|
||||
sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None,
|
||||
num_stations={}, add_stations=True, create_vlan=True, pass_fail_criteria=False):
|
||||
num_stations={}, add_stations=True, create_vlan=True, pass_fail_criteria=False, is_wifi7=False):
|
||||
wificapacity_obj_list = []
|
||||
vlan_raw_lines = None
|
||||
for dut in self.dut_data:
|
||||
@@ -1450,7 +1454,7 @@ class lf_tests(lf_libs):
|
||||
ssid_name = dut_data[i]["ssid_data"][j]["ssid"]
|
||||
radio_data = self.add_stations(band=band_, num_stations=num_stations[band_], ssid_name=ssid_name,
|
||||
dut_data=dut_data,
|
||||
identifier=identifier)
|
||||
identifier=identifier, is_wifi7=is_wifi7)
|
||||
if vlan_raw_lines is not None:
|
||||
for i in vlan_raw_lines:
|
||||
self.temp_raw_lines.append(i)
|
||||
@@ -3311,6 +3315,139 @@ class lf_tests(lf_libs):
|
||||
finally:
|
||||
self.set_radio_channel(radio=selected_ax_radio, antenna="0")
|
||||
|
||||
def be_capacity_test(self, instance_name="", dut_data=None, mode="BRIDGE", download_rate="10Gbps",
|
||||
upload_rate="0Gbps", dut_mode="", protocol="UDP-IPv4", num_stations={}, vlan_id=None):
|
||||
if self.max_be_stations == 0:
|
||||
logging.info("This test needs BE radios, looks like no BE radios are available on the Lanforge system.")
|
||||
pytest.skip("BE radios are not available on the Lanforge, so skipping this test.")
|
||||
|
||||
if dut_mode.lower() == "wifi6":
|
||||
logging.info("AP does not support BE mode, so skipping this test.")
|
||||
pytest.skip("AP does not support BE mode, so skipping this test")
|
||||
|
||||
dict_all_radios_be = {"be200_radios": self.be200_radios}
|
||||
selected_be_radio = None
|
||||
for radio in dict_all_radios_be:
|
||||
if len(dict_all_radios_be[radio]) > 0:
|
||||
selected_be_radio = dict_all_radios_be[radio][0]
|
||||
break
|
||||
logging.info("Selected BE Radio: {}".format(selected_be_radio))
|
||||
|
||||
for data in self.dut_data:
|
||||
identifier = data["identifier"]
|
||||
ssid_name = dut_data[identifier]["ssid_data"][0]["ssid"]
|
||||
passkey = dut_data[identifier]["ssid_data"][0]["password"]
|
||||
band = list(num_stations.keys())[0]
|
||||
|
||||
try:
|
||||
self.set_radio_channel(radio=selected_be_radio, antenna="AUTO")
|
||||
values = selected_be_radio.split(".")
|
||||
shelf = int(values[0])
|
||||
resource = int(values[1])
|
||||
self.pre_cleanup()
|
||||
sta_name = [f"{shelf}.{resource}.be_station"]
|
||||
logging.info("sta_name:- " + str(sta_name))
|
||||
sta_ip = self.client_connect_using_radio(ssid=ssid_name, passkey=passkey, mode=mode, station_name=sta_name,
|
||||
radio=selected_be_radio, vlan_id=vlan_id, create_vlan=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
sta_rows = ["ip", "mode", "channel", "signal", "parent dev", "mac"]
|
||||
station_data = self.get_station_data(sta_name=sta_name, rows=sta_rows, allure_attach=True,
|
||||
allure_name="Station Data")
|
||||
logging.info("station_data:- " + str(station_data))
|
||||
if not sta_ip:
|
||||
logging.info("Test Failed, due to station has no ip")
|
||||
pytest.fail("Station did not get an ip")
|
||||
|
||||
sta_mode = station_data[sta_name[0]]["mode"]
|
||||
logging.info("sta_mode:- " + str(sta_mode))
|
||||
wifi_capacity_obj_list = self.wifi_capacity(instance_name=instance_name, mode=mode,
|
||||
download_rate=download_rate, upload_rate=upload_rate,
|
||||
protocol=protocol, duration="60000", ssid_name=ssid_name,
|
||||
batch_size="1", num_stations=num_stations, stations=sta_name[0],
|
||||
dut_data=dut_data, vlan_id=vlan_id, add_stations=False,
|
||||
create_vlan=False)
|
||||
|
||||
report = wifi_capacity_obj_list[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
||||
numeric_score = self.read_kpi_file(column_name=["numeric-score"], dir_name=report)
|
||||
current_directory = os.getcwd()
|
||||
file_path = current_directory + "/e2e/basic/performance_tests/performance_pass_fail.json"
|
||||
logging.info("performance_pass file config path:- " + str(file_path))
|
||||
with open(file_path, 'r') as file:
|
||||
json_string = file.read()
|
||||
all_pass_fail_data = json.loads(json_string)
|
||||
logging.info("All Testbed pass fail data:- " + str(all_pass_fail_data))
|
||||
# validate config json data
|
||||
try:
|
||||
json_object = json.dumps(all_pass_fail_data)
|
||||
except ValueError as e:
|
||||
logging.info("Performance Pass/Fail data is invalid")
|
||||
pytest.fail("Performance Pass/Fail data is invalid")
|
||||
logging.info("DUT Data: " + str(self.dut_data))
|
||||
model = self.dut_data[0]["model"]
|
||||
if model in all_pass_fail_data["AP Models"]:
|
||||
pass_fail_values = all_pass_fail_data["AP Models"][model]
|
||||
else:
|
||||
logging.error("AP model is not available in performance_pass_fail.json file")
|
||||
logging.info(str(model) + " All Benchmark throughput:- " + str(pass_fail_values))
|
||||
split_mode = sta_mode.split(" ")
|
||||
key = f"{band} {split_mode[2]} {split_mode[1]}MHz"
|
||||
logging.info("key:- " + str(key))
|
||||
proto = None
|
||||
if "TCP" in protocol:
|
||||
proto = "TCP"
|
||||
else:
|
||||
proto = "UDP"
|
||||
logging.info("Proto:- " + str(proto))
|
||||
logging.info("Given LF download_rate:- " + str(download_rate))
|
||||
logging.info("Given LF upload_rate:- " + str(upload_rate))
|
||||
pass_fail_value = pass_fail_values[key][proto]
|
||||
logging.info("pass_fail value:- " + str(pass_fail_value))
|
||||
download_rate = self.convert_to_gbps(download_rate)
|
||||
logging.info("download_rate:- " + str(download_rate))
|
||||
upload_rate = self.convert_to_gbps(upload_rate)
|
||||
logging.info("upload_rate:- " + str(upload_rate))
|
||||
# Pass fail logic for Upload. validating download rate because providing some value during Upload
|
||||
if upload_rate > download_rate:
|
||||
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
|
||||
allure.attach(name="Benchmark throughput: ",
|
||||
body=str(pass_fail_value) + "+ Mbps")
|
||||
actual_tht = int(numeric_score[1][0])
|
||||
logging.info("Actual throughput:- " + str(actual_tht))
|
||||
allure.attach(name="Actual throughput: ",
|
||||
body=str(actual_tht) + " Mbps")
|
||||
if actual_tht < pass_fail_value:
|
||||
pytest.fail(
|
||||
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
|
||||
elif upload_rate < download_rate:
|
||||
# Pass fail logic for Download. validating upload rate because providing some value during download
|
||||
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
|
||||
allure.attach(name="Benchmark throughput: ",
|
||||
body=str(pass_fail_value) + "+ Mbps")
|
||||
actual_tht = int(numeric_score[0][0])
|
||||
logging.info("Actual throughput:- " + str(actual_tht))
|
||||
allure.attach(name="Actual throughput: ",
|
||||
body=str(actual_tht) + " Mbps")
|
||||
if actual_tht < pass_fail_value:
|
||||
pytest.fail(
|
||||
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
|
||||
elif upload_rate == download_rate:
|
||||
# Pass fail logic for bidirectional
|
||||
pass_fail_value = pass_fail_value * 2
|
||||
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
|
||||
allure.attach(name="Benchmark throughput: ",
|
||||
body=str(pass_fail_value) + "+ Mbps")
|
||||
actual_tht = int(numeric_score[2][0])
|
||||
logging.info("Actual throughput:- " + str(actual_tht))
|
||||
allure.attach(name="Actual throughput: ",
|
||||
body=str(actual_tht) + " Mbps")
|
||||
if actual_tht < pass_fail_value:
|
||||
pytest.fail(
|
||||
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
|
||||
|
||||
finally:
|
||||
self.set_radio_channel(radio=selected_be_radio, antenna="0")
|
||||
|
||||
def multi_ssid_test(self, setup_params_general: dict, no_of_2g_and_5g_stations: int = 2, mode: str = "BRIDGE",
|
||||
security_key: str = "something", security: str = "wpa2") -> None:
|
||||
sta_names_2g, sta_names_5g = [], []
|
||||
|
||||
Reference in New Issue
Block a user