Added Multi SSID, Max SSID methods to lf_tests and get_radio_availabilities helper function to lf_libs. Added sta_list param to get_supplicant_logs for detailed display in report.

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2024-05-02 10:55:21 +05:30
parent dd34550248
commit 15f8e2a2a8
2 changed files with 497 additions and 6 deletions

View File

@@ -1355,7 +1355,7 @@ class lf_libs:
name=file_name, attachment_type="CSV")
return os.path.exists(path)
def get_supplicant_logs(self, radio="1.1.wiphy0", attach_allure=True):
def get_supplicant_logs(self, radio="1.1.wiphy0", sta_list=[], attach_allure=True):
try:
resource = radio.split(".")[1]
radio = radio.split(".")[2]
@@ -1368,7 +1368,7 @@ class lf_libs:
obj.pull_file()
if attach_allure:
allure.attach.file(source="wpa_supplicant_log_" + radio + ".txt",
name="wpa_supplicant_log - " + str(radio))
name=f"wpa_supplicant_log - {radio} - {', '.join(sta_list)}")
except Exception as e:
logging.error("get_supplicant_logs() - Error in getting supplicant Logs: " + str(e))
@@ -1693,6 +1693,101 @@ class lf_libs:
if name is not None:
allure.attach(name=name, body=str(data_table))
def get_radio_availabilities(self, num_stations_2g: int = 0, num_stations_5g: int = 0) -> tuple:
"""
Get the port name of radios and how many stations to be created on each radio for the given num of
2g stations and 5g stations. This method takes in account the fact that same radio can't be used to
create a station on multiple band at the same time even though it supports both bands.
- Returns tuple[dict[str, int], dict[str, int]] or skips the test if not enough radios are available
for the requested number of stations.
"""
message = None
requested_num_stations_2g = num_stations_2g
requested_num_stations_5g = num_stations_5g
radio_dict_2g = {}
radio_dict_5g = {}
dict_all_radios_2g = {
"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios
}
dict_all_radios_5g = {
"wave2_5g_radios": self.wave2_5g_radios,
"wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios
}
max_station_per_radio = {
"wave2_2g_radios": 64,
"wave2_5g_radios": 64,
"wave1_radios": 64,
"mtk_radios": 19,
"ax200_radios": 1,
"ax210_radios": 1
}
for i in range(2):
if num_stations_2g > num_stations_5g:
for keys in dict_all_radios_2g:
if num_stations_2g == 0:
break
max_station = max_station_per_radio[keys]
if len(dict_all_radios_2g[keys]) > 0:
diff = max_station - num_stations_2g
for port_name in dict_all_radios_2g[keys]:
if port_name in radio_dict_5g:
continue
if diff >= 0:
radio_dict_2g[port_name] = num_stations_2g
num_stations_2g = 0
break
else:
radio_dict_2g[port_name] = max_station
num_stations_2g -= max_station
diff = max_station - num_stations_2g
if num_stations_2g != 0:
if i == 0:
message = f"Not enough radios available for connecting {requested_num_stations_2g} 2g clients!"
break
else:
for keys in dict_all_radios_5g:
if num_stations_5g == 0:
break
max_station = max_station_per_radio[keys]
if len(dict_all_radios_5g[keys]) > 0:
diff = max_station - num_stations_5g
for port_name in dict_all_radios_5g[keys]:
if port_name in radio_dict_2g:
continue
if diff >= 0:
radio_dict_5g[port_name] = num_stations_5g
num_stations_5g = 0
break
else:
radio_dict_5g[port_name] = max_station
num_stations_5g -= max_station
diff = max_station - num_stations_5g
if num_stations_5g != 0:
if i == 0:
message = f"Not enough radios available for connecting {requested_num_stations_5g} 5g clients!"
break
if num_stations_2g != 0 or num_stations_5g != 0:
logging.info(f"Radio-2G-Stations dict : {num_stations_2g}")
logging.info(f"Radio-5G-Stations dict : {num_stations_5g}")
if message is None:
message = (f"Not enough radios available for connecting {requested_num_stations_2g} 2g clients and "
f"{requested_num_stations_5g} 5g clients simultaneously!")
logging.info(message)
pytest.skip(message)
return radio_dict_2g, radio_dict_5g
def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band=None,
vlan_id=[None], radio=None, client_type=0, station_name=[], dut_data=None,
sniff_radio=False, create_vlan=True, attach_port_info=True,
@@ -1710,11 +1805,11 @@ class lf_libs:
logging.error("VLAN ID is Unspecified in the VLAN Case")
pytest.skip("VLAN ID is Unspecified in the VLAN Case")
else:
up = self.get_wan_upstream_ports()
upstream = list(up.values())
upstream_port = upstream[0] + "." + str(vlan_id[0])
if create_vlan:
self.add_vlan(vlan_ids=vlan_id, build=True)
up = self.get_wan_upstream_ports()
upstream = list(up.values())
upstream_port = upstream[0] + "." + str(vlan_id[0])
else:
self.add_vlan(vlan_ids=vlan_id, build=False)
print("upstream_port1:", upstream_port)
@@ -1763,7 +1858,7 @@ class lf_libs:
client_connect.build()
# fetch supplicant logs from lanforge
try:
self.get_supplicant_logs(radio=str(radio))
self.get_supplicant_logs(radio=str(radio), sta_list=station_name)
except Exception as e:
print(e)

View File

@@ -2774,6 +2774,402 @@ class lf_tests(lf_libs):
finally:
self.set_radio_channel(radio=selected_ax_radio, antenna="0")
def multi_ssid_test(self, setup_params_general: dict, no_of_2g_and_5g_stations: int = 2, mode: str = "BRIDGE",
security_key: str = "something", security: str = "wpa2") -> None:
sta_names_2g, sta_names_5g = [], []
for i in range(no_of_2g_and_5g_stations):
sta_names_2g.append(f"sta_2g_{i + 1}")
sta_names_5g.append(f"sta_5g_{i + 1}")
cx_sta_list = [sta_names_2g[-2], sta_names_2g[-1], sta_names_5g[-2], sta_names_5g[-1]]
radio_dict_2g, radio_dict_5g = self.get_radio_availabilities(num_stations_2g=len(sta_names_2g),
num_stations_5g=len(sta_names_5g))
logging.info(f"Radio-2G-Stations dict : {radio_dict_2g}")
logging.info(f"Radio-5G-Stations dict : {radio_dict_5g}")
security_mode = 'wpa2_personal'
for security_mode_ in setup_params_general["ssid_modes"]:
security_mode = security_mode_
sta_got_ip = []
allure.attach(name="ssid info", body=str(setup_params_general["ssid_modes"][security_mode]))
self.pre_cleanup()
no_of_ssids = len(setup_params_general["ssid_modes"][security_mode])
logging.info(f"A total of {no_of_2g_and_5g_stations} 2G and {no_of_2g_and_5g_stations} 5G stations will be "
f"created for {no_of_ssids} SSIDs, i.e., a 2G and a 5G stations on each SSID.")
for i in range(no_of_2g_and_5g_stations):
ssid_name = setup_params_general["ssid_modes"][security_mode][i % no_of_ssids]["ssid_name"]
logging.info(f"Creating a 2G station on {ssid_name} ssid...")
radio = None
for _radio in radio_dict_2g:
radio = _radio
if radio_dict_2g[radio] == 1:
del radio_dict_2g[radio]
else:
radio_dict_2g[radio] -= 1
break
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid_name, security=security,
passkey=security_key, mode=mode,
radio=radio,
station_name=[sta_names_2g[i]],
attach_station_data=False,
attach_port_info=False))
logging.info(f"Creating a 5G station on {ssid_name} ssid...")
for _radio in radio_dict_5g:
radio = _radio
if radio_dict_5g[radio] == 1:
del radio_dict_5g[radio]
else:
radio_dict_5g[radio] -= 1
break
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid_name, security=security,
passkey=security_key, mode=mode,
radio=radio,
station_name=[sta_names_5g[i]],
attach_station_data=False,
attach_port_info=False))
port_data = self.json_get(_req_url="port?fields=ip")
port_info = {key: value for d in port_data["interfaces"] for key, value in d.items()}
self.allure_report_table_format(dict_data=port_info, key="Port Names", value="ip",
name="Port info after creating all stations")
dict_table_2g_1st = {}
dict_table_2g_2nd = {}
dict_table_5g_1st = {}
dict_table_5g_2nd = {}
for sta in sta_names_2g + sta_names_5g:
result = self.json_get(_req_url="port/1/1/%s" % sta)
if "Key" not in dict_table_2g_1st:
dict_table_2g_1st["Key"] = list(result["interface"].keys())
dict_table_2g_2nd["Key"] = list(result["interface"].keys())
dict_table_5g_1st["Key"] = list(result["interface"].keys())
dict_table_5g_2nd["Key"] = list(result["interface"].keys())
if '_2g_' in sta:
if len(dict_table_2g_1st) < 5:
dict_table_2g_1st[f"Value ({sta})"] = list(result["interface"].values())
else:
dict_table_2g_2nd[f"Value ({sta})"] = list(result["interface"].values())
else:
if len(dict_table_5g_1st) < 5:
dict_table_5g_1st[f"Value ({sta})"] = list(result["interface"].values())
else:
dict_table_5g_2nd[f"Value ({sta})"] = list(result["interface"].values())
data_table_2g_1st = tabulate(dict_table_2g_1st, headers='keys', tablefmt='fancy_grid')
data_table_2g_2nd = tabulate(dict_table_2g_2nd, headers='keys', tablefmt='fancy_grid')
data_table_5g_1st = tabulate(dict_table_5g_1st, headers='keys', tablefmt='fancy_grid')
data_table_5g_2nd = tabulate(dict_table_5g_2nd, headers='keys', tablefmt='fancy_grid')
logging.info(f"2G Stations Data (1-{min(4, no_of_2g_and_5g_stations)}): \n{data_table_2g_1st}\n")
allure.attach(name=f"2G Stations Data (1-{min(4, no_of_2g_and_5g_stations)})", body=str(data_table_2g_1st))
if no_of_2g_and_5g_stations > 4:
logging.info(f"2G Stations Data (5-{no_of_2g_and_5g_stations}): \n{data_table_2g_2nd}\n")
allure.attach(name=f"2G Stations Data (5-{no_of_2g_and_5g_stations})", body=str(data_table_2g_2nd))
logging.info(f"5G Stations Data (1-{min(4, no_of_2g_and_5g_stations)}): \n{data_table_5g_1st}\n")
allure.attach(name=f"5G Stations Data (1-{min(4, no_of_2g_and_5g_stations)})", body=str(data_table_5g_1st))
if no_of_2g_and_5g_stations > 4:
logging.info(f"5G Stations Data (5-{no_of_2g_and_5g_stations}): \n{data_table_5g_2nd}\n")
allure.attach(name=f"5G Stations Data (5-{no_of_2g_and_5g_stations})", body=str(data_table_5g_2nd))
if False in sta_got_ip:
logging.info("Some/All Stations didn't get IP address")
pytest.fail("Some/All Stations didn't get IP address")
logging.info("All 2G/5G Stations got IP address")
# create Layer 3 and check data path
for i in range(3):
self.create_layer3(side_a_min_rate=6291456, side_a_max_rate=0,
side_b_min_rate=6291456, side_b_max_rate=0,
traffic_type="lf_tcp", sta_list=[cx_sta_list[i]],
side_b=cx_sta_list[i + 1], start_cx=True,
prefix=f"{cx_sta_list[i][4:]}-{cx_sta_list[i + 1][4:]}:t")
logging.info(f"CX with TCP traffic created between "
f"endpoint-a = {cx_sta_list[i]} and endpoint-b = {cx_sta_list[i + 1]}.")
time.sleep(2)
self.create_layer3(side_a_min_rate=6291456, side_a_max_rate=0,
side_b_min_rate=6291456, side_b_max_rate=0,
traffic_type="lf_udp", sta_list=[cx_sta_list[i]],
side_b=cx_sta_list[i + 1], start_cx=True,
prefix=f"{cx_sta_list[i][4:]}-{cx_sta_list[i + 1][4:]}:u")
logging.info(f"CX with UDP traffic created between "
f"endpoint-a = {cx_sta_list[i]} and endpoint-b = {cx_sta_list[i + 1]}.")
time.sleep(2)
logging.info("Running Layer3 traffic for 40 sec ...")
time.sleep(40)
cx_list = self.get_cx_list()
dict_table_cx_tcp = {}
dict_table_cx_udp = {}
pass_fail_data = []
for i in range(len(cx_list)):
cx_data = self.json_get(_req_url=f"cx/{cx_list[i]}")
cx_name = f"sta_{cx_list[i].split(':')[0].split('-')[0]} <==> sta_{cx_list[i].split(':')[0].split('-')[1]}"
if "L3 CX Column" not in dict_table_cx_tcp:
dict_table_cx_tcp["L3 CX Column"] = list(cx_data[f"{cx_list[i]}"].keys())
dict_table_cx_udp["L3 CX Column"] = list(cx_data[f"{cx_list[i]}"].keys())
if "TCP" in cx_data[f"{cx_list[i]}"]['type']:
dict_table_cx_tcp[f"values ({cx_name})"] = list(cx_data[f"{cx_list[i]}"].values())
else:
dict_table_cx_udp[f"values ({cx_name})"] = list(cx_data[f"{cx_list[i]}"].values())
if cx_data[cx_list[i]]['bps rx a'] != 0 and cx_data[cx_list[i]]['bps rx a'] != 0:
res = True
else:
res = False
pass_fail_data.append(
[f"{cx_list[i]}", f"{cx_data[cx_list[i]]['bps rx a']}", f"{cx_data[cx_list[i]]['bps rx b']}", res])
# attach l3 cx data to allure
data_table_cx_tcp = tabulate(dict_table_cx_tcp, headers='keys', tablefmt='fancy_grid')
data_table_cx_udp = tabulate(dict_table_cx_udp, headers='keys', tablefmt='fancy_grid')
logging.info(f"L3 cross-connects Data (TCP): \n{data_table_cx_tcp}\n")
logging.info(f"L3 cross-connects Data (UDP): \n{data_table_cx_udp}\n")
allure.attach(name="L3 cross-connects Data (TCP)", body=str(data_table_cx_tcp))
allure.attach(name="L3 cross-connects Data (UDP)", body=str(data_table_cx_udp))
# attach pass fail data to allure
result_table = tabulate(pass_fail_data,
headers=["Data Path", "Tx Rate (bps)", "Rx Rate (bps)", "Pass/Fail"],
tablefmt='fancy_grid')
logging.info(f"Test Result Table: \n{result_table}\n")
allure.attach(name="Test Result Table", body=str(result_table))
# cleanup Layer3 data
self.client_disconnect(station_name=sta_names_2g + sta_names_5g, clean_l3_traffic=True, clear_all_sta=True)
test_result = True
for pf in pass_fail_data:
if pf[3] is False:
test_result = False
if not test_result:
pytest.fail("DataPath check failed, Traffic didn't reported on some endpoints")
def max_ssid(self, setup_params_general: dict, mode: str = 'BRIDGE', vlan_id: list = None) -> None:
self.pre_cleanup()
ssid_2g_list = []
ssid_5g_list = []
for security, ssids in setup_params_general["ssid_modes"].items():
for ssid in ssids:
ssid_dict = {
'ssid_name': ssid["ssid_name"],
'security': security.split("_")[0],
'password': ssid.get("security_key", "[BLANK]"),
}
if "2G" in ssid["appliedRadios"]:
ssid_2g_list.append(ssid_dict)
elif "5G" in ssid["appliedRadios"]:
ssid_5g_list.append(ssid_dict)
no_of_sta_2g = len(ssid_2g_list)
no_of_sta_5g = len(ssid_5g_list)
sta_names_2g = [f"sta_2g_{i + 1}" for i in range(no_of_sta_2g)]
sta_names_5g = [f"sta_5g_{i + 1}" for i in range(no_of_sta_5g)]
radio_dict_2g, radio_dict_5g = self.get_radio_availabilities(num_stations_2g=no_of_sta_2g,
num_stations_5g=no_of_sta_5g)
if len(radio_dict_2g) > 0:
logging.info(f"Radio-Stations dict : {radio_dict_2g}")
if len(radio_dict_5g) > 0:
logging.info(f"Radio-Stations dict : {radio_dict_5g}")
if no_of_sta_2g > 0:
logging.info(f"A total of {no_of_sta_2g} 2G stations will be created for {no_of_sta_2g} SSIDs, "
f"i.e., one 2G stations on each SSID.")
if no_of_sta_5g > 0:
logging.info(f"A total of {no_of_sta_5g} 5G stations will be created for {no_of_sta_5g} SSIDs, "
f"i.e., one 5G stations on each SSID.")
upstream_port = ""
if mode == 'VLAN':
self.add_vlan(vlan_ids=vlan_id, build=True)
up = self.get_wan_upstream_ports()
upstream = list(up.values())
upstream_port = upstream[0] + "." + str(vlan_id[0])
sta_got_ip = []
radio = None
timeout_sec = 100 if no_of_sta_2g <= 8 and no_of_sta_5g <= 8 else 10
for i in range(no_of_sta_2g):
logging.info(f"Creating a 2G station on {ssid_2g_list[i]['ssid_name']} ssid...")
for _radio in radio_dict_2g:
radio = _radio
if radio_dict_2g[radio] == 1:
del radio_dict_2g[radio]
else:
radio_dict_2g[radio] -= 1
break
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid_2g_list[i]['ssid_name'],
security=ssid_2g_list[i]['security'],
passkey=ssid_2g_list[i]['password'],
mode=mode,
radio=radio,
station_name=[sta_names_2g[i]],
attach_station_data=False,
attach_port_info=False,
timeout_sec=timeout_sec,
vlan_id=vlan_id,
create_vlan=False))
for i in range(no_of_sta_5g):
logging.info(f"Creating a 5G station on {ssid_5g_list[i]['ssid_name']} ssid...")
for _radio in radio_dict_5g:
radio = _radio
if radio_dict_5g[radio] == 1:
del radio_dict_5g[radio]
else:
radio_dict_5g[radio] -= 1
break
sta_got_ip.append(self.client_connect_using_radio(ssid=ssid_5g_list[i]['ssid_name'],
security=ssid_5g_list[i]['security'],
passkey=ssid_5g_list[i]['password'],
mode=mode,
radio=radio,
station_name=[sta_names_5g[i]],
attach_station_data=False,
attach_port_info=False,
timeout_sec=timeout_sec,
vlan_id=vlan_id,
create_vlan=False))
logging.info("Fetching port info after all stations created")
port_data = self.json_get(_req_url="port?fields=ip")
port_info = {key: value for d in port_data["interfaces"] for key, value in d.items()}
self.allure_report_table_format(dict_data=port_info, key="Port Names", value="ip",
name="Port info after creating all stations")
logging.info("Adding Station Data to the report")
dict_table_sta = {}
start_sta, end_sta = 1, 0
for index, sta in enumerate(sta_names_2g):
end_sta += 1
result = self.json_get(_req_url="port/1/1/%s" % sta)
if "Key" not in dict_table_sta:
dict_table_sta["Key"] = list(result["interface"].keys())
dict_table_sta[f"Value ({sta})"] = list(result["interface"].values())
if end_sta - start_sta == 3 or index == len(sta_names_2g) - 1:
data_table_sta = tabulate(dict_table_sta, headers='keys', tablefmt='fancy_grid')
logging.info(f"2G-Stations Data ({start_sta}-{end_sta}): \n{data_table_sta}\n")
allure.attach(name=f"2G-Stations Data ({start_sta}-{end_sta})", body=str(data_table_sta))
start_sta = end_sta + 1
dict_table_sta.clear()
start_sta, end_sta = 1, 0
for index, sta in enumerate(sta_names_5g):
end_sta += 1
result = self.json_get(_req_url="port/1/1/%s" % sta)
if "Key" not in dict_table_sta:
dict_table_sta["Key"] = list(result["interface"].keys())
dict_table_sta[f"Value ({sta})"] = list(result["interface"].values())
if end_sta - start_sta == 3 or index == len(sta_names_5g) - 1:
data_table_sta = tabulate(dict_table_sta, headers='keys', tablefmt='fancy_grid')
logging.info(f"5G-Stations Data ({start_sta}-{end_sta}): \n{data_table_sta}\n")
allure.attach(name=f"5G-Stations Data ({start_sta}-{end_sta})", body=str(data_table_sta))
start_sta = end_sta + 1
dict_table_sta.clear()
if no_of_sta_2g > 8 or no_of_sta_5g > 8:
if True in sta_got_ip:
logging.info("Some/All stations got the IP when more than 8 SSIDs were configured on a single band!")
pytest.fail("Some/All stations got the IP when more than 8 SSIDs were configured on a single band!")
else:
logging.info("As expected, None of the stations got the IP when more than 8 SSIDs were configured "
"on a single band!")
self.pre_cleanup()
return
if False in sta_got_ip:
logging.info("Some/All Stations didn't get IP address")
pytest.fail("Some/All Stations didn't get IP address")
logging.info("All Stations got IP address")
logging.info("Creating Layer3 traffic on stations...")
for sta in sta_names_2g + sta_names_5g:
self.create_layer3(side_a_min_rate=6291456, side_a_max_rate=0,
side_b_min_rate=6291456, side_b_max_rate=0,
traffic_type="lf_tcp", sta_list=[sta], side_b=upstream_port,
start_cx=True, prefix=f"t-")
logging.info(f"CX with TCP traffic created between endpoint-a = {sta} and endpoint-b = upstream port.")
time.sleep(2)
self.create_layer3(side_a_min_rate=6291456, side_a_max_rate=0,
side_b_min_rate=6291456, side_b_max_rate=0,
traffic_type="lf_udp", sta_list=[sta], side_b=upstream_port,
start_cx=True, prefix=f"u-")
logging.info(f"CX with UDP traffic created between endpoint-a = {sta} and endpoint-b = upstream port.")
time.sleep(2)
logging.info("Running Layer3 traffic for 40 sec ...")
time.sleep(40)
logging.info("Fetching CX data and adding it to the report...")
cx_list = self.get_cx_list()
dict_table_cx_tcp = {}
dict_table_cx_udp = {}
pass_fail_data = []
overall_test = True
start_tcp, start_udp = 1, 1
end_tcp, end_udp = 0, 0
for i in range(len(cx_list)):
cx_data = self.json_get(_req_url=f"cx/{cx_list[i]}")
cx_name = f"{cx_list[i].split('-')[1]}"
if "L3 CX Column" not in dict_table_cx_tcp:
dict_table_cx_tcp["L3 CX Column"] = list(cx_data[f"{cx_list[i]}"].keys())
if "L3 CX Column" not in dict_table_cx_udp:
dict_table_cx_udp["L3 CX Column"] = list(cx_data[f"{cx_list[i]}"].keys())
if "TCP" in cx_data[f"{cx_list[i]}"]['type']:
end_tcp += 1
dict_table_cx_tcp[f"values ({cx_name})"] = list(cx_data[f"{cx_list[i]}"].values())
else:
end_udp += 1
dict_table_cx_udp[f"values ({cx_name})"] = list(cx_data[f"{cx_list[i]}"].values())
if cx_data[cx_list[i]]['bps rx a'] != 0 and cx_data[cx_list[i]]['bps rx a'] != 0:
res = True
else:
overall_test = False
res = False
pass_fail_data.append(
[f"{cx_list[i][:-2]}", f"{cx_data[cx_list[i]]['bps rx a']}", f"{cx_data[cx_list[i]]['bps rx b']}", res])
# attach l3 cx data to allure
if end_tcp - start_tcp == 3 or (i == len(cx_list) - 1 and start_tcp <= end_tcp):
data_table_cx_tcp = tabulate(dict_table_cx_tcp, headers='keys', tablefmt='fancy_grid')
logging.info(f"L3 cross-connects Data (TCP) ({start_tcp} - {end_tcp}): \n{data_table_cx_tcp}\n")
allure.attach(name=f"L3 cross-connects Data (TCP) ({start_tcp} - {end_tcp})",
body=str(data_table_cx_tcp))
start_tcp = end_tcp + 1
dict_table_cx_tcp.clear()
if end_udp - start_udp == 3 or (i == len(cx_list) - 1 and start_udp <= end_udp):
data_table_cx_udp = tabulate(dict_table_cx_udp, headers='keys', tablefmt='fancy_grid')
logging.info(f"L3 cross-connects Data (UDP) ({start_udp} - {end_udp}): \n{data_table_cx_udp}\n")
allure.attach(name=f"L3 cross-connects Data (UDP) ({start_udp} - {end_udp})",
body=str(data_table_cx_udp))
start_udp = end_udp + 1
dict_table_cx_udp.clear()
logging.info("Attaching pass/fail data to the report...")
result_table = tabulate(pass_fail_data,
headers=["Data Path", "Tx Rate (bps)", "Rx Rate (bps)", "Pass/Fail"],
tablefmt='fancy_grid')
logging.info(f"Test Result Table: \n{result_table}\n")
allure.attach(name="Test Result Table", body=str(result_table))
self.pre_cleanup()
if overall_test is False:
pytest.fail("DataPath check failed, Traffic didn't reported on some endpoints")
logging.info("All Traffic reported on all endpoints, test successful!")
def strict_forwarding(self, ssids=[], num_stations_per_ssid=1, security="wpa2", dut_data={}, passkey="[BLANK]",
mode="BRIDGE", side_a_min_rate=6291456, side_a_max_rate=6291456, side_b_min_rate=0,
side_b_max_rate=0,