Added pass-fail for rate limiting, ax capacity, single client wifi capacity

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2024-07-12 10:59:04 +05:30
parent bd9ac610f4
commit b76cf52e72

View File

@@ -1088,18 +1088,50 @@ class lf_tests(lf_libs):
create_stations=False,
sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None,
num_stations={}, add_stations=True, passkey=None, up_rate=None, down_rate=None):
self.wifi_capacity(mode=mode, vlan_id=vlan_id, batch_size=batch_size, instance_name=instance_name,
obj = self.wifi_capacity(mode=mode, vlan_id=vlan_id, batch_size=batch_size, instance_name=instance_name,
download_rate=download_rate,
influx_tags=influx_tags, upload_rate=upload_rate, protocol=protocol, duration=duration,
stations=stations, create_stations=create_stations, sort=sort, raw_lines=raw_lines,
move_to_influx=move_to_influx,
dut_data=dut_data, ssid_name=ssid_name, num_stations=num_stations, add_stations=add_stations)
report_name = obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
numeric_score = self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)
logging.info("Numeric-score: " + str(numeric_score))
download_rate = self.convert_to_gbps(download_rate)
logging.info("download_rate:- " + str(download_rate))
upload_rate = self.convert_to_gbps(upload_rate)
logging.info("upload_rate:- " + str(upload_rate))
if upload_rate > download_rate:
logging.info("rate limit ingress-rate:- " + str(up_rate))
actual_tht = int(numeric_score[1][0])
logging.info("Actual throughput:- " + str(actual_tht))
if actual_tht > up_rate:
pytest.fail(f"Expected Throughput should be less than {up_rate} Mbps")
elif upload_rate < download_rate:
logging.info("rate limit egress-rate:- " + str(down_rate))
actual_tht = int(numeric_score[0][0])
logging.info("Actual throughput:- " + str(actual_tht))
if actual_tht > down_rate:
pytest.fail(f"Expected Throughput should be less than {down_rate} Mbps")
elif upload_rate == download_rate:
# Pass fail logic for bidirectional
logging.info("rate limit ingress-rate:- " + str(up_rate))
logging.info("rate limit egress-rate:- " + str(down_rate))
actual_tht_dw = int(numeric_score[0][0])
actual_tht_up = int(numeric_score[1][0])
logging.info("Actual throughput download:- " + str(actual_tht_dw))
logging.info("Actual throughput upload:- " + str(actual_tht_up))
if actual_tht_dw > down_rate:
pytest.fail(f"Expected Throughput should be less than {down_rate} Mbps")
if actual_tht_up > up_rate:
pytest.fail(f"Expected Throughput should be less than {up_rate} Mbps")
def wifi_capacity(self, mode="BRIDGE", vlan_id=100, batch_size="1,5,10,20,40,64,128",
instance_name="wct_instance", download_rate="1Gbps", influx_tags="",
upload_rate="1Gbps", protocol="TCP-IPv4", duration="60000", stations="", create_stations=False,
sort="interleave", raw_lines=[], move_to_influx=False, dut_data={}, ssid_name=None,
num_stations={}, add_stations=True, create_vlan=True):
num_stations={}, add_stations=True, create_vlan=True, pass_fail_criteria=False):
wificapacity_obj_list = []
vlan_raw_lines = None
for dut in self.dut_data:
@@ -1168,6 +1200,32 @@ class lf_tests(lf_libs):
for i in vlan_raw_lines:
self.temp_raw_lines.append(i)
self.chamber_view(raw_lines="custom")
if pass_fail_criteria:
# Station data
self.band_sta = list(num_stations.keys())[0]
if num_stations[self.band_sta] == 1:
print(self.temp_raw_lines)
shelf_resource = self.temp_raw_lines[0][0].split(" ")[1].split(".")
shelf = int(shelf_resource[0])
resource = int(shelf_resource[0])
sta_name = f"{shelf}.{resource}.wlan0"
self.local_realm.admin_up(sta_name)
sta_ip = self.local_realm.wait_for_ip([sta_name], timeout_sec=120)
sta_rows = ["4way time (us)", "channel", "ssid", "key/phrase", "cx time (us)", "dhcp (ms)",
"ip", "signal",
"mac", "mode"]
self.get_station_data(sta_name=[sta_name], rows=sta_rows,
allure_attach=True)
if sta_ip:
logging.info("ip's acquired")
else:
logging.info(
"Stations Failed to get IP's")
pytest.fail("Stations Failed to get IP's")
self.sta_mode_ = self.json_get(f'/port/{shelf}/{resource}/wlan0?fields=mode')['interface'][
'mode']
logging.info("sta_mode:- " + str(self.sta_mode_))
wificapacity_obj = WiFiCapacityTest(lfclient_host=self.manager_ip,
lf_port=self.manager_http_port,
ssh_port=self.manager_ssh_port,
@@ -1225,6 +1283,22 @@ class lf_tests(lf_libs):
logging.info("Numeric-score: " + str(numeric_score))
max_num_stations = int(sum(num_stations.values()))
logging.info("Max num stations: " + str(max_num_stations))
try:
# Admin down
exist_sta = []
for u in self.json_get("/port/?fields=port+type,alias")['interfaces']:
if list(u.values())[0]['port type'] not in ['Ethernet', 'WIFI-Radio', 'NA']:
exist_sta.append(list(u.values())[0]['alias'])
if len(exist_sta) == 0:
logging.info("Existing stations are not available")
else:
for port_eid in exist_sta:
# admin down
self.local_realm.admin_down(port_eid)
time.sleep(0.3)
except Exception as e:
print(e)
pass
if len(numeric_score) < 5:
if int(numeric_score[0][0]) < max_num_stations and int(numeric_score[1][0]) < max_num_stations and int(
numeric_score[-1][0]) > 0 and int(numeric_score[-2][0]) > 0:
@@ -1233,20 +1307,84 @@ class lf_tests(lf_libs):
if int(numeric_score[0][0]) == 0 and int(numeric_score[1][0]) == 0 and int(
numeric_score[2][0]) == 0:
pytest.fail("Did not report traffic")
if pass_fail_criteria:
if add_stations:
if num_stations[self.band_sta] == 1:
current_directory = os.getcwd()
file_path = current_directory + "/e2e/basic/performance_tests/performance_pass_fail.json"
logging.info("performance_pass file config path:- " + str(file_path))
with open(file_path, 'r') as file:
json_string = file.read()
all_pass_fail_data = json.loads(json_string)
logging.info("All Testbed pass fail data:- " + str(all_pass_fail_data))
# validate config json data
try:
json_object = json.dumps(all_pass_fail_data)
except ValueError as e:
logging.info("Performance Pass/Fail data is invalid")
pytest.fail("Performance Pass/Fail data is invalid")
logging.info("DUT Data: " + str(self.dut_data))
model = self.dut_data[0]["model"]
if model in all_pass_fail_data["AP Models"]:
pass_fail_values = all_pass_fail_data["AP Models"][model]
else:
logging.error("AP model is not available in performance_pass_fail.json file")
logging.info(str(model) + " All Benchmark throughput:- " + str(pass_fail_values))
split_mode = self.sta_mode_.split(" ")
key = f"{self.band_sta} {split_mode[2]} {split_mode[1]}MHz"
logging.info("key:- " + str(key))
proto = None
if "TCP" in protocol:
proto = "TCP"
else:
proto = "UDP"
logging.info("Proto:- " + str(proto))
logging.info("Given LF download_rate:- " + str(download_rate))
logging.info("Given LF upload_rate:- " + str(upload_rate))
pass_fail_value = pass_fail_values[key][proto]
download_rate = self.convert_to_gbps(download_rate)
logging.info("download_rate:- " + str(download_rate))
upload_rate = self.convert_to_gbps(upload_rate)
logging.info("upload_rate:- " + str(upload_rate))
# Pass fail logic for Upload. validating download rate because providing some value during Upload
if upload_rate > download_rate:
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
allure.attach(name="Benchmark throughput: ",
body=str(pass_fail_value) + "+ Mbps")
actual_tht = int(numeric_score[1][0])
logging.info("Actual throughput:- " + str(actual_tht))
allure.attach(name="Actual throughput: ",
body=str(actual_tht) + " Mbps")
if actual_tht < pass_fail_value:
pytest.fail(
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
elif upload_rate < download_rate:
# Pass fail logic for Download. validating upload rate because providing some value during download
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
allure.attach(name="Benchmark throughput: ",
body=str(pass_fail_value) + "+ Mbps")
actual_tht = int(numeric_score[0][0])
logging.info("Actual throughput:- " + str(actual_tht))
allure.attach(name="Actual throughput: ",
body=str(actual_tht) + " Mbps")
if actual_tht < pass_fail_value:
pytest.fail(
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
elif upload_rate == download_rate:
# Pass fail logic for bidirectional
pass_fail_value = pass_fail_value * 2
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
allure.attach(name="Benchmark throughput: ",
body=str(pass_fail_value) + "+ Mbps")
actual_tht = int(numeric_score[2][0])
logging.info("Actual throughput:- " + str(actual_tht))
allure.attach(name="Actual throughput: ",
body=str(actual_tht) + " Mbps")
if actual_tht < pass_fail_value:
pytest.fail(
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
wificapacity_obj_list.append(wificapacity_obj)
# Admin down
exist_sta = []
for u in self.json_get("/port/?fields=port+type,alias")['interfaces']:
if list(u.values())[0]['port type'] not in ['Ethernet', 'WIFI-Radio', 'NA']:
exist_sta.append(list(u.values())[0]['alias'])
if len(exist_sta) == 0:
logging.info("Existing stations are not available")
else:
for port_eid in exist_sta:
# admin down
self.local_realm.admin_down(port_eid)
time.sleep(0.3)
return wificapacity_obj_list
@@ -2734,17 +2872,23 @@ class lf_tests(lf_libs):
band = list(num_stations.keys())[0]
try:
self.set_radio_channel(radio=selected_ax_radio, antenna="4")
self.set_radio_channel(radio=selected_ax_radio, antenna="AUTO")
values = selected_ax_radio.split(".")
shelf = int(values[0])
resource = int(values[1])
self.pre_cleanup()
sta_name = ["1.1.ax_station"]
sta_name = [f"{shelf}.{resource}.ax_station"]
logging.info("sta_name:- " + str(sta_name))
self.client_connect_using_radio(ssid=ssid_name, passkey=passkey, mode=mode, station_name=sta_name,
radio=selected_ax_radio, vlan_id=vlan_id, create_vlan=True)
time.sleep(0.5)
sta_rows = ["ip", "mode", "channel", "signal", "parent dev", "mac"]
self.get_station_data(sta_name=sta_name, rows=sta_rows, allure_attach=True, allure_name="Station Data")
station_data = self.get_station_data(sta_name=sta_name, rows=sta_rows, allure_attach=True,
allure_name="Station Data")
logging.info("station_data:- " + str(station_data))
sta_mode = station_data[sta_name[0]]["mode"]
logging.info("sta_mode:- " + str(sta_mode))
wifi_capacity_obj_list = self.wifi_capacity(instance_name=instance_name, mode=mode,
download_rate=download_rate, upload_rate=upload_rate,
protocol=protocol, duration="60000", ssid_name=ssid_name,
@@ -2754,25 +2898,81 @@ class lf_tests(lf_libs):
report = wifi_capacity_obj_list[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
numeric_score = self.read_kpi_file(column_name=["numeric-score"], dir_name=report)
throughput = {
"download": [numeric_score[0][0]],
"upload": [numeric_score[1][0]],
"total": [numeric_score[2][0]]
}
self.attach_table_allure(data=throughput, allure_name="Throughput Data")
current_directory = os.getcwd()
file_path = current_directory + "/e2e/basic/performance_tests/performance_pass_fail.json"
logging.info("performance_pass file config path:- " + str(file_path))
with open(file_path, 'r') as file:
json_string = file.read()
all_pass_fail_data = json.loads(json_string)
logging.info("All Testbed pass fail data:- " + str(all_pass_fail_data))
# validate config json data
try:
json_object = json.dumps(all_pass_fail_data)
except ValueError as e:
logging.info("Performance Pass/Fail data is invalid")
pytest.fail("Performance Pass/Fail data is invalid")
logging.info("DUT Data: " + str(self.dut_data))
model = self.dut_data[0]["model"]
if model in all_pass_fail_data["AP Models"]:
pass_fail_values = all_pass_fail_data["AP Models"][model]
else:
logging.error("AP model is not available in performance_pass_fail.json file")
logging.info(str(model) + " All Benchmark throughput:- " + str(pass_fail_values))
split_mode = sta_mode.split(" ")
key = f"{band} {split_mode[2]} {split_mode[1]}MHz"
logging.info("key:- " + str(key))
proto = None
if "TCP" in protocol:
proto = "TCP"
else:
proto = "UDP"
logging.info("Proto:- " + str(proto))
logging.info("Given LF download_rate:- " + str(download_rate))
logging.info("Given LF upload_rate:- " + str(upload_rate))
pass_fail_value = pass_fail_values[key][proto]
logging.info("pass_fail value:- " + str(pass_fail_value))
download_rate = self.convert_to_gbps(download_rate)
logging.info("download_rate:- " + str(download_rate))
upload_rate = self.convert_to_gbps(upload_rate)
logging.info("upload_rate:- " + str(upload_rate))
# Pass fail logic for Upload. validating download rate because providing some value during Upload
if upload_rate > download_rate:
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
allure.attach(name="Benchmark throughput: ",
body=str(pass_fail_value) + "+ Mbps")
actual_tht = int(numeric_score[1][0])
logging.info("Actual throughput:- " + str(actual_tht))
allure.attach(name="Actual throughput: ",
body=str(actual_tht) + " Mbps")
if actual_tht < pass_fail_value:
pytest.fail(
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
elif upload_rate < download_rate:
# Pass fail logic for Download. validating upload rate because providing some value during download
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
allure.attach(name="Benchmark throughput: ",
body=str(pass_fail_value) + "+ Mbps")
actual_tht = int(numeric_score[0][0])
logging.info("Actual throughput:- " + str(actual_tht))
allure.attach(name="Actual throughput: ",
body=str(actual_tht) + " Mbps")
if actual_tht < pass_fail_value:
pytest.fail(
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
elif upload_rate == download_rate:
# Pass fail logic for bidirectional
pass_fail_value = pass_fail_value * 2
logging.info("Benchmark throughput:- " + str(pass_fail_value) + "+")
allure.attach(name="Benchmark throughput: ",
body=str(pass_fail_value) + "+ Mbps")
actual_tht = int(numeric_score[2][0])
logging.info("Actual throughput:- " + str(actual_tht))
allure.attach(name="Actual throughput: ",
body=str(actual_tht) + " Mbps")
if actual_tht < pass_fail_value:
pytest.fail(
f"Benchmark throughput:- {pass_fail_value}+ Mbps, Actual throughput:- {actual_tht} Mbps")
expected_throughput = 720 if band == "5G" else 200
fail_message = None
if download_rate == "10Gbps" and upload_rate != "10Gbps":
if throughput["download"][0] < expected_throughput:
fail_message = f"Download rate was {throughput['download'][0]}Mbps"
elif download_rate != "10Gbps" and upload_rate == "10Gbps":
if throughput["upload"][0] < expected_throughput:
fail_message = f"Upload rate was {throughput['upload'][0]}Mbps"
if fail_message is not None:
fail_message += f", less than the expected throughput of {expected_throughput}Mbps."
pytest.fail(fail_message)
finally:
self.set_radio_channel(radio=selected_ax_radio, antenna="0")