mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
mu-mimo test settings issue fixed
Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
@@ -52,6 +52,8 @@ lf_dataplane_test = importlib.import_module("py-scripts.lf_dataplane_test")
|
||||
DataplaneTest = lf_dataplane_test.DataplaneTest
|
||||
ttlstest = importlib.import_module("py-scripts.test_ipv4_ttls")
|
||||
TTLSTest = ttlstest.TTLSTest
|
||||
tr398test = importlib.import_module("py-scripts.lf_tr398_test")
|
||||
TR398Test = tr398test.TR398Test
|
||||
tr398v2test = importlib.import_module("py-scripts.lf_tr398v2_test")
|
||||
TR398v2Test = tr398v2test.TR398v2Test
|
||||
rvr = importlib.import_module("py-scripts.lf_rvr_test")
|
||||
@@ -748,7 +750,8 @@ class lf_tests(lf_libs):
|
||||
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
|
||||
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
|
||||
if data[identifier]["sniff_radio_2g"] is not None and sniffer_channel is not None:
|
||||
self.start_sniffer(radio_channel=sniffer_channel, test_name=f'{data[identifier]["station_data"][radio][0]}',
|
||||
self.start_sniffer(radio_channel=sniffer_channel,
|
||||
test_name=f'{data[identifier]["station_data"][radio][0]}',
|
||||
radio=data[identifier]["sniff_radio_2g"],
|
||||
duration=120)
|
||||
logging.info("started-sniffer")
|
||||
@@ -764,7 +767,8 @@ class lf_tests(lf_libs):
|
||||
if band == "sixg":
|
||||
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
|
||||
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
|
||||
sniffer_channel = self.lf_sixg_lookup_validation(int(dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]))
|
||||
sniffer_channel = self.lf_sixg_lookup_validation(
|
||||
int(dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]))
|
||||
logging.info("LF sixg channel: " + str(sniffer_channel))
|
||||
if data[identifier]["sniff_radio_6g"] is not None and sniffer_channel is not None:
|
||||
self.start_sniffer(radio_channel=sniffer_channel,
|
||||
@@ -1212,7 +1216,8 @@ class lf_tests(lf_libs):
|
||||
max_num_stations = int(sum(num_stations.values()))
|
||||
logging.info("Max num stations: " + str(max_num_stations))
|
||||
if len(numeric_score) < 5:
|
||||
if int(numeric_score[0][0]) < max_num_stations and int(numeric_score[1][0]) < max_num_stations and int(numeric_score[-1][0]) > 0 and int(numeric_score[-2][0]) > 0:
|
||||
if int(numeric_score[0][0]) < max_num_stations and int(numeric_score[1][0]) < max_num_stations and int(
|
||||
numeric_score[-1][0]) > 0 and int(numeric_score[-2][0]) > 0:
|
||||
pytest.fail("Station did not get an ip")
|
||||
else:
|
||||
if int(numeric_score[0][0]) == 0 and int(numeric_score[1][0]) == 0 and int(
|
||||
@@ -1390,7 +1395,7 @@ class lf_tests(lf_libs):
|
||||
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
|
||||
thr1.start()
|
||||
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan,
|
||||
download_rate=downld_rate,add_stations=False,
|
||||
download_rate=downld_rate, add_stations=False,
|
||||
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate,
|
||||
protocol="UDP-IPv4", duration="120000", create_stations=False,
|
||||
dut_data=dut_data, create_vlan=False,
|
||||
@@ -1399,7 +1404,7 @@ class lf_tests(lf_libs):
|
||||
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
||||
file_name = "/csv-data/data-Combined_bps__60_second_running_average-1.csv"
|
||||
if not os.path.exists(f"../reports/{report_name}{file_name}"):
|
||||
file_name = file_name.replace('_bps__','_Mbps__')
|
||||
file_name = file_name.replace('_bps__', '_Mbps__')
|
||||
csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=traffic_direction,
|
||||
file_name=file_name)
|
||||
logging.info(csv_val)
|
||||
@@ -1471,11 +1476,11 @@ class lf_tests(lf_libs):
|
||||
logging.error(f"{e}")
|
||||
return False
|
||||
|
||||
def tr398(self,radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", mode="BRIDGE",
|
||||
vlan_id=1, skip_2g=True, skip_5g=False, instance_name="", test=None, move_to_influx=False,dut_data={},
|
||||
ssid_name='', security_key='[BLANK]', security="open", sniff_packets=False, create_vlan=True, tr398v2=False,
|
||||
tr398=False):
|
||||
#User can select one or more TR398 tests
|
||||
def tr398(self, radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", mode="BRIDGE",
|
||||
vlan_id=1, skip_2g=True, skip_5g=False, instance_name="", test=None, move_to_influx=False, dut_data={},
|
||||
ssid_name='', security_key='[BLANK]', security="open", sniff_packets=False, create_vlan=True,
|
||||
tr398v2=True, tr398=False):
|
||||
# User can select one or more TR398 tests
|
||||
try:
|
||||
if type(test) == str:
|
||||
test = test.split(",")
|
||||
@@ -1496,9 +1501,11 @@ class lf_tests(lf_libs):
|
||||
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id[0])
|
||||
logging.info("Upstream data: " + str(upstream_port))
|
||||
|
||||
atten_serial = self.attenuator_serial_radio(ssid=ssid_name, passkey=security_key, security=security, sta_mode=0,
|
||||
station_name=['sta0000'],
|
||||
radio=self.wave2_2g_radios[0] if skip_5g else self.wave2_5g_radios[0])
|
||||
# atten_serial = self.attenuator_serial_radio(ssid=ssid_name, passkey=security_key, security=security,
|
||||
# sta_mode=0, station_name=['sta0000'], radio=self.wave2_2g_radios[0] if skip_5g else
|
||||
# self.wave2_5g_radios[0])
|
||||
|
||||
atten_serial = self.attenuator_serial()
|
||||
|
||||
if tr398v2:
|
||||
enable_tests = [['rxsens: 0'], ['max_cx: 0'], ['max_tput: 0'], ['peak_perf: 0'], ['max_tput_bi: 0'],
|
||||
@@ -1515,12 +1522,17 @@ class lf_tests(lf_libs):
|
||||
logging.info(f"Unable to find the {t} test in selected run")
|
||||
raise ValueError(f"Unable to find the {t} test in selected run")
|
||||
|
||||
rad_atten = [[f'atten-0: {atten_serial[0]}.0'], [f'atten-1: {atten_serial[0]}.1'], [f'atten-2: {atten_serial[0]}.2'],
|
||||
[f'atten-3: {atten_serial[0]}.3'], [f'atten-4: {atten_serial[1]}.0'], [f'atten-5: {atten_serial[1]}.1'],
|
||||
rad_atten = [[f'atten-0: {atten_serial[0]}.0'], [f'atten-1: {atten_serial[0]}.1'],
|
||||
[f'atten-2: {atten_serial[0]}.2'],
|
||||
[f'atten-3: {atten_serial[0]}.3'], [f'atten-4: {atten_serial[1]}.0'],
|
||||
[f'atten-5: {atten_serial[1]}.1'],
|
||||
[f'atten-8: {atten_serial[1]}.2'], [f'atten-9: {atten_serial[1]}.3']]
|
||||
|
||||
skip_band = [['Skip 2.4Ghz Tests', f'{skip_twog}'], ['Skip 5Ghz Tests', f'{skip_fiveg}'],
|
||||
['2.4Ghz Channel', 'AUTO'], ['5Ghz Channel', 'AUTO'], ['Skip AX Tests', '1']]
|
||||
['2.4Ghz Channel', 'AUTO'], ['5Ghz Channel', 'AUTO']]
|
||||
|
||||
skip_bandv2 = [['Skip 2.4Ghz Tests', f'{skip_twog}'], ['Skip 5Ghz Tests', f'{skip_fiveg}'],
|
||||
['2.4Ghz Channel', 'AUTO'], ['5Ghz Channel', 'AUTO'], ['Skip AX Tests', '1']]
|
||||
|
||||
if len(radios_2g) >= 3 and len(radios_5g) >= 3:
|
||||
for i in range(6):
|
||||
@@ -1565,6 +1577,9 @@ class lf_tests(lf_libs):
|
||||
raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}'])
|
||||
if i == 4 or i == 5:
|
||||
raw_line.append([f'radio-{i}: {radios_ax[0] if i == 4 else radios_ax[1]}'])
|
||||
elif len(radios_2g) == 0 and len(radios_5g) == 0 and len(radios_ax) >= 3 and len(radios_ax) >= 6:
|
||||
for i in range(6):
|
||||
raw_line.append([f'radio-{i}: {radios_ax[i]}'])
|
||||
|
||||
if len(raw_line) != 6:
|
||||
raw_line = [['radio-0: 1.1.5 wiphy1'], ['radio-1: 1.1.4 wiphy0'], ['radio-2: 1.1.7 wiphy3'],
|
||||
@@ -1593,27 +1608,46 @@ class lf_tests(lf_libs):
|
||||
f.write(str(i[0]) + "\n")
|
||||
f.close()
|
||||
|
||||
self.cvtest_obj = TR398v2Test(lf_host=self.manager_ip,
|
||||
lf_port=self.manager_http_port,
|
||||
lf_user="lanforge",
|
||||
lf_password="lanforge",
|
||||
instance_name=instance_name,
|
||||
config_name="cv_dflt_cfg",
|
||||
upstream=upstream_port,
|
||||
pull_report=True,
|
||||
local_lf_report_dir=self.local_report_path,
|
||||
load_old_cfg=False,
|
||||
dut2=dut_2g,
|
||||
dut5=dut_5g,
|
||||
raw_lines_file="tr398-test-config.txt",
|
||||
enables=[],
|
||||
disables=[],
|
||||
raw_lines=[],
|
||||
sets=skip_band,
|
||||
test_rig=dut_name)
|
||||
if tr398v2:
|
||||
self.cvtest_obj = TR398v2Test(lf_host=self.manager_ip,
|
||||
lf_port=self.manager_http_port,
|
||||
lf_user="lanforge",
|
||||
lf_password="lanforge",
|
||||
instance_name=instance_name,
|
||||
config_name="cv_dflt_cfg",
|
||||
upstream=upstream_port,
|
||||
pull_report=True,
|
||||
local_lf_report_dir=self.local_report_path,
|
||||
load_old_cfg=False,
|
||||
dut2=dut_2g,
|
||||
dut5=dut_5g,
|
||||
raw_lines_file="tr398-test-config.txt",
|
||||
enables=[],
|
||||
disables=[],
|
||||
raw_lines=[],
|
||||
sets=skip_bandv2,
|
||||
test_rig=dut_name)
|
||||
self.cvtest_obj.test_name, self.cvtest_obj.blob_text = "TR-398 Issue 2", "TR-398v2-"
|
||||
elif tr398:
|
||||
self.cvtest_obj = TR398Test(lf_host=self.lanforge_ip,
|
||||
lf_port=self.lanforge_port,
|
||||
lf_user="lanforge",
|
||||
lf_password="lanforge",
|
||||
instance_name=instance_name,
|
||||
config_name="cv_dflt_cfg",
|
||||
upstream="1.1." + upstream_port,
|
||||
pull_report=True,
|
||||
local_lf_report_dir=self.local_report_path,
|
||||
load_old_cfg=False,
|
||||
dut2=dut_2g,
|
||||
dut5=dut_5g,
|
||||
raw_lines_file="mu-mimo-config.txt",
|
||||
enables=[],
|
||||
disables=[],
|
||||
raw_lines=[],
|
||||
sets=skip_band,
|
||||
test_rig=dut_name
|
||||
)
|
||||
self.cvtest_obj.test_name, self.cvtest_obj.blob_text = "TR-398", "TR-398-"
|
||||
self.cvtest_obj.result = True
|
||||
self.cvtest_obj.setup()
|
||||
@@ -1634,7 +1668,8 @@ class lf_tests(lf_libs):
|
||||
|
||||
if move_to_influx:
|
||||
try:
|
||||
report_name = "../reports/" + self.cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
report_name = "../reports/" + \
|
||||
self.cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
|
||||
influx_port=self.influx_params["influx_port"],
|
||||
influx_org=self.influx_params["influx_org"],
|
||||
@@ -1650,7 +1685,7 @@ class lf_tests(lf_libs):
|
||||
self.attach_report_graphs(report_name=report_name)
|
||||
result = self.read_kpi_file(column_name=["pass/fail"], dir_name=report_name)
|
||||
allure.attach.file(source="../reports/" + report_name + "/kpi.csv",
|
||||
name=f"{test}_CSV", attachment_type=allure.attachment_type.CSV)
|
||||
name=f"{test}_test_report.csv", attachment_type=allure.attachment_type.CSV)
|
||||
if result[0][0] == "PASS":
|
||||
return True, "Test Passed"
|
||||
else:
|
||||
@@ -1679,7 +1714,7 @@ class lf_tests(lf_libs):
|
||||
"of data_rates 40% of throughput_1 and 40% of throughput_3 as throughput_6, station_1 + station_3"
|
||||
"of data_rates 40% of throughput_1 and 40% of throughput_4 as throughput_7")
|
||||
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
|
||||
sta = list(map(lambda i: f"sta000{i}",range(3)))
|
||||
sta = list(map(lambda i: f"sta000{i}", range(3)))
|
||||
all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
|
||||
logging.info("All 5g radios" + str(all_radio_5g))
|
||||
all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
|
||||
@@ -1705,18 +1740,21 @@ class lf_tests(lf_libs):
|
||||
if i == 2:
|
||||
# mode = 2/1 will create legacy client
|
||||
create_sta = self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security,
|
||||
radio=radios[i], station_name=[sta[i]], sta_mode=sta_mode[0])
|
||||
radio=radios[i], station_name=[sta[i]],
|
||||
sta_mode=sta_mode[0])
|
||||
else:
|
||||
# mode = 11/9 will create bgn-AC/an-AC client
|
||||
create_sta = self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security,
|
||||
radio=radios[i], station_name=[sta[i]], sta_mode=sta_mode[1])
|
||||
radio=radios[i], station_name=[sta[i]],
|
||||
sta_mode=sta_mode[1])
|
||||
if create_sta == False:
|
||||
logging.info(f"Test failed due to no IP for {sta[i]}")
|
||||
assert False, f"Test failed due to no IP for {sta[i]}"
|
||||
else:
|
||||
lf_sta = list(create_sta.station_map().keys())
|
||||
|
||||
def wifi_cap(sta=None, down=None, up=0, proto=None, thrpt_key=None, wifi_cap=False, atn=None, l3_trf=False):
|
||||
def wifi_cap(sta=None, down=None, up=0, proto=None, thrpt_key=None, wifi_cap=False, atn=None,
|
||||
l3_trf=False):
|
||||
if atn:
|
||||
for i in range(2):
|
||||
self.attenuator_modify(int(atten_serial_split[2]), i, int(atn))
|
||||
@@ -1730,7 +1768,8 @@ class lf_tests(lf_libs):
|
||||
# self.attach_report_graphs(report_name=report_name)
|
||||
entries = os.listdir("../reports/" + report_name + '/')
|
||||
if "kpi.csv" in entries:
|
||||
thrpt[thrpt_key] = self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)[0][0]
|
||||
thrpt[thrpt_key] = \
|
||||
self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)[0][0]
|
||||
if l3_trf:
|
||||
self.client_disconnect(clean_l3_traffic=True)
|
||||
for i in sta[0:1]:
|
||||
@@ -1744,11 +1783,13 @@ class lf_tests(lf_libs):
|
||||
self.create_layer3(sta_list=sta[1:2], traffic_type=proto, side_a_min_rate=0,
|
||||
side_b_min_rate=int(down[1]), start_cx=False)
|
||||
created_cx = {}
|
||||
cx_list = [created_cx.setdefault(i, "Endpoints") for i in self.get_cx_list() if i not in created_cx]
|
||||
cx_list = [created_cx.setdefault(i, "Endpoints") for i in self.get_cx_list() if
|
||||
i not in created_cx]
|
||||
self.start_cx_list(created_cx=created_cx, check_run_status=True)
|
||||
thrpt[thrpt_key] = self.monitor(duration_sec=int(60) + 10, monitor_interval=1, created_cx=created_cx.keys(),
|
||||
col_names=['bps rx a', 'bps rx b'], iterations=0, side_a_min_rate=0,
|
||||
side_b_min_rate=down)[0]
|
||||
thrpt[thrpt_key] = \
|
||||
self.monitor(duration_sec=int(60) + 10, monitor_interval=1, created_cx=created_cx.keys(),
|
||||
col_names=['bps rx a', 'bps rx b'], iterations=0, side_a_min_rate=0,
|
||||
side_b_min_rate=down)[0]
|
||||
|
||||
# station_0 TCP down throughtput
|
||||
wifi_cap(down="1Gbps", sta=f"{lf_sta[0]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[0]}",
|
||||
@@ -1766,10 +1807,12 @@ class lf_tests(lf_libs):
|
||||
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl"] * 0.01) * 4E7], sta=sta[0:2],
|
||||
up="0Gbps", thrpt_key=f"{no_of_iter[4]}", l3_trf=True, proto="lf_udp")
|
||||
# UDP traffic for station_0 of data-rate 40% of sta0_data_rate and medium distance station_1 of data-rate 40% of sta1_data_rate
|
||||
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl_atn"] * 0.01) * 4E7], sta=sta[0:2],
|
||||
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl_atn"] * 0.01) * 4E7],
|
||||
sta=sta[0:2],
|
||||
up="0Gbps", thrpt_key=f"{no_of_iter[5]}", l3_trf=True, atn=atn, proto="lf_udp")
|
||||
# UDP traffic for station_0 of data-rate 40% of sta0_data_rate and station_2 of data-rate 40% of sta2_data_rate
|
||||
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta2_tcp_dl"] * 0.01) * 4E7], sta=sta[0:3:2],
|
||||
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta2_tcp_dl"] * 0.01) * 4E7],
|
||||
sta=sta[0:3:2],
|
||||
up="0Gbps", thrpt_key=f"{no_of_iter[6]}", l3_trf=True, atn=100, proto="lf_udp")
|
||||
logging.info("Throughput values: \n", thrpt)
|
||||
self.allure_report_table_format(dict_data=thrpt, key="Station combination", value="Throughput values",
|
||||
@@ -1813,22 +1856,22 @@ class lf_tests(lf_libs):
|
||||
logging.info("Upstream data: " + str(upstream_port))
|
||||
|
||||
rvr_obj = rvr_test(lf_host=self.manager_ip,
|
||||
lf_port=self.manager_http_port,
|
||||
ssh_port=self.manager_ssh_port,
|
||||
lf_user="lanforge",
|
||||
local_lf_report_dir="../reports/",
|
||||
lf_password="lanforge",
|
||||
instance_name=instance_name,
|
||||
config_name="rvr_config",
|
||||
upstream=upstream_port,
|
||||
pull_report=True,
|
||||
load_old_cfg=False,
|
||||
upload_speed=upload_rate,
|
||||
download_speed=download_rate,
|
||||
duration=duration,
|
||||
station=station_name,
|
||||
dut=dut_name,
|
||||
raw_lines=raw_lines)
|
||||
lf_port=self.manager_http_port,
|
||||
ssh_port=self.manager_ssh_port,
|
||||
lf_user="lanforge",
|
||||
local_lf_report_dir="../reports/",
|
||||
lf_password="lanforge",
|
||||
instance_name=instance_name,
|
||||
config_name="rvr_config",
|
||||
upstream=upstream_port,
|
||||
pull_report=True,
|
||||
load_old_cfg=False,
|
||||
upload_speed=upload_rate,
|
||||
download_speed=download_rate,
|
||||
duration=duration,
|
||||
station=station_name,
|
||||
dut=dut_name,
|
||||
raw_lines=raw_lines)
|
||||
rvr_obj.run()
|
||||
if move_to_influx:
|
||||
try:
|
||||
@@ -1844,18 +1887,20 @@ class lf_tests(lf_libs):
|
||||
except Exception as e:
|
||||
print(e)
|
||||
pass
|
||||
#fetch the report
|
||||
# fetch the report
|
||||
report_name = rvr_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
time.sleep(10)
|
||||
logging.info("report_name: " + str(report_name))
|
||||
self.attach_report_graphs(report_name=report_name,pdf_name= "Rate vs Range Test PDF Report")
|
||||
self.attach_report_graphs(report_name=report_name, pdf_name="Rate vs Range Test PDF Report")
|
||||
self.attach_report_kpi(report_name=report_name)
|
||||
|
||||
return rvr_obj, report_name
|
||||
|
||||
def dual_band_performance_test(self, ssid_5G="[BLANK]", ssid_2G="[BLANK]", mode="BRIDGE", vlan_id=100, dut_name="TIP",
|
||||
instance_name="test_demo", dut_5g="", dut_2g="", influx_tags="", move_to_influx=False,
|
||||
create_vlan=True, dut_data={}):
|
||||
def dual_band_performance_test(self, ssid_5G="[BLANK]", ssid_2G="[BLANK]", mode="BRIDGE", vlan_id=100,
|
||||
dut_name="TIP",
|
||||
instance_name="test_demo", dut_5g="", dut_2g="", influx_tags="",
|
||||
move_to_influx=False,
|
||||
create_vlan=True, dut_data={}):
|
||||
try:
|
||||
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
|
||||
|
||||
@@ -1892,18 +1937,21 @@ class lf_tests(lf_libs):
|
||||
radio5=[self.wave2_5g_radios],
|
||||
raw_lines=[['modes', 'AUTO']],
|
||||
# test_tag=influx_tags,
|
||||
sets=[['Basic Client Connectivity', '0'], ['Multi Band Performance', '1'],
|
||||
sets=[['Basic Client Connectivity', '0'],
|
||||
['Multi Band Performance', '1'],
|
||||
['Throughput vs Pkt Size', '0'], ['Capacity', '0'],
|
||||
['Skip 2.4Ghz Tests', '1'],
|
||||
['Skip 5Ghz Tests', '1'],
|
||||
['Stability', '0'],
|
||||
['Band-Steering', '0'], ['Multi-Station Throughput vs Pkt Size', '0'],
|
||||
['Band-Steering', '0'],
|
||||
['Multi-Station Throughput vs Pkt Size', '0'],
|
||||
['Long-Term', '0']]
|
||||
)
|
||||
self.dualbandptest_obj.setup()
|
||||
self.dualbandptest_obj.run()
|
||||
if move_to_influx:
|
||||
report_name = "../reports/" + self.dualbandptest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
report_name = "../reports/" + \
|
||||
self.dualbandptest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
try:
|
||||
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
|
||||
influx_port=self.influx_params["influx_port"],
|
||||
@@ -1931,12 +1979,12 @@ class lf_tests(lf_libs):
|
||||
return False, f"{e}"
|
||||
return self.dualbandptest_obj
|
||||
|
||||
|
||||
def multi_station_performance(self, ssid_name=None, security_key=None, mode="BRIDGE", vlan=1, band="twog", antenna=1,
|
||||
instance_name="", set_att_db="10db",download_rate="0Gbps",upload_rate="1Gbps",
|
||||
batch_size="",protocol="UDP-IPv4",duration="120000",expected_throughput=35,
|
||||
traffic_type="udp_upload",sniff_radio=False, create_vlan=True,dut_data=None):
|
||||
global station_name,radio_prefix, set_value, set_value1, type
|
||||
def multi_station_performance(self, ssid_name=None, security_key=None, mode="BRIDGE", vlan=1, band="twog",
|
||||
antenna=1,
|
||||
instance_name="", set_att_db="10db", download_rate="0Gbps", upload_rate="1Gbps",
|
||||
batch_size="", protocol="UDP-IPv4", duration="120000", expected_throughput=35,
|
||||
traffic_type="udp_upload", sniff_radio=False, create_vlan=True, dut_data=None):
|
||||
global station_name, radio_prefix, set_value, set_value1, type
|
||||
self.chamber_view()
|
||||
self.client_disconnect(clean_l3_traffic=True)
|
||||
batch_size = batch_size
|
||||
@@ -1971,7 +2019,7 @@ class lf_tests(lf_libs):
|
||||
data = {"shelf": shelf, "resource": resource, "radio": values[2], "antenna": antenna}
|
||||
self.json_post(_req_url="cli-json/set_wifi_radio", data=data)
|
||||
sta_ip = self.client_connect_using_radio(ssid=ssid_name, passkey=security_key, mode=mode, band=band,
|
||||
radio=radio_name,station_name=sta, vlan_id=[vlan],
|
||||
radio=radio_name, station_name=sta, vlan_id=[vlan],
|
||||
dut_data=dut_data, sniff_radio=sniff_radio)
|
||||
if not sta_ip:
|
||||
logging.info("Test Failed, due to station has no ip")
|
||||
@@ -2004,10 +2052,11 @@ class lf_tests(lf_libs):
|
||||
data = {"shelf": shelf, "resource": resource, "radio": values[2], "antenna": antenna}
|
||||
self.json_post(_req_url="cli-json/set_wifi_radio", data=data)
|
||||
time.sleep(0.5)
|
||||
sta_ip = self.client_connect_using_radio(ssid=ssid_name,passkey=security_key,mode=mode, band=band,
|
||||
sta_ip = self.client_connect_using_radio(ssid=ssid_name, passkey=security_key, mode=mode, band=band,
|
||||
radio=radio_name, station_name=sta[i], vlan_id=[vlan],
|
||||
dut_data=dut_data, sniff_radio=sniff_radio, create_vlan=create_vlan)
|
||||
create_vlan=False
|
||||
dut_data=dut_data, sniff_radio=sniff_radio,
|
||||
create_vlan=create_vlan)
|
||||
create_vlan = False
|
||||
if not sta_ip:
|
||||
logging.info("Test Failed, due to station has no ip")
|
||||
return False, "TEST FAILED, due to station has no ip"
|
||||
@@ -2024,7 +2073,7 @@ class lf_tests(lf_libs):
|
||||
if "25db" in set_att_db:
|
||||
set_value = 250
|
||||
elif "38db" in set_att_db:
|
||||
set_value =380
|
||||
set_value = 380
|
||||
print(set_value)
|
||||
for i in range(2):
|
||||
self.attenuator_modify(int(atten_sr2[2]), i, set_value)
|
||||
@@ -2048,14 +2097,14 @@ class lf_tests(lf_libs):
|
||||
time.sleep(0.5)
|
||||
# wifi_capacity test
|
||||
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=[vlan],
|
||||
download_rate=download_rate, batch_size=batch_size,
|
||||
upload_rate=upload_rate, protocol=protocol, duration=duration,
|
||||
sort="linear",create_vlan=False)
|
||||
download_rate=download_rate, batch_size=batch_size,
|
||||
upload_rate=upload_rate, protocol=protocol, duration=duration,
|
||||
sort="linear", create_vlan=False)
|
||||
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=None,
|
||||
individual_station_throughput=False,
|
||||
kpi_csv=True,
|
||||
file_name="/kpi.csv", batch_size=batch_size)
|
||||
individual_station_throughput=False,
|
||||
kpi_csv=True,
|
||||
file_name="/kpi.csv", batch_size=batch_size)
|
||||
print(csv_val)
|
||||
# considering the 70% from the expected throughput
|
||||
pass_value = (expected_throughput * 0.7)
|
||||
@@ -2067,9 +2116,9 @@ class lf_tests(lf_libs):
|
||||
return False, "CSV file does not exist, Test failed"
|
||||
else:
|
||||
if traffic_type == "udp_upload":
|
||||
type="Up"
|
||||
type = "Up"
|
||||
elif traffic_type == "udp_download":
|
||||
type="Down"
|
||||
type = "Down"
|
||||
print("Traffic type", type)
|
||||
if list(csv_val[type].values())[-1] >= pass_value:
|
||||
allure.attach(name="Csv Data", body="Throughput value : " + str(list(csv_val[type].values())[-1]))
|
||||
@@ -2078,7 +2127,8 @@ class lf_tests(lf_libs):
|
||||
else:
|
||||
allure.attach(name="Csv Data", body="Throughput value : " + str(list(csv_val[type].values())[-1]))
|
||||
logging.info("TEST FAILED, Actual throughput is lesser than Expected.")
|
||||
return False, "TEST FAILED, Actual throughput (%sMbps) is lesser than Expected (%sMbps)" % (str(list(csv_val[type].values())[-1]),str(pass_value))
|
||||
return False, "TEST FAILED, Actual throughput (%sMbps) is lesser than Expected (%sMbps)" % (
|
||||
str(list(csv_val[type].values())[-1]), str(pass_value))
|
||||
|
||||
def spatial_consistency(self, ssid_name=None, security_key=None, security="wpa2", mode="BRIDGE", band="twog",
|
||||
vlan=1, dut_data=None, num_sta=1, download_rate="100%", upload_rate="0", spatial_streams=1,
|
||||
@@ -2100,9 +2150,12 @@ class lf_tests(lf_libs):
|
||||
['tt_deg: 0..+60..300']]
|
||||
if station:
|
||||
# rvr test
|
||||
rvr_o, report_name = self.rate_vs_range_test(station_name=sta_name[0], mode=mode, download_rate=download_rate,
|
||||
upload_rate=upload_rate, instance_name=instance_name, duration="60000",
|
||||
vlan_id=[vlan], dut_name=dut_name, raw_lines=val,create_vlan=create_vlan)
|
||||
rvr_o, report_name = self.rate_vs_range_test(station_name=sta_name[0], mode=mode,
|
||||
download_rate=download_rate,
|
||||
upload_rate=upload_rate, instance_name=instance_name,
|
||||
duration="60000",
|
||||
vlan_id=[vlan], dut_name=dut_name, raw_lines=val,
|
||||
create_vlan=create_vlan)
|
||||
entries = os.listdir("../reports/" + report_name + '/')
|
||||
print("entries", entries)
|
||||
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
|
||||
@@ -2165,8 +2218,8 @@ class lf_tests(lf_libs):
|
||||
['chamber: 0'], ['tt_deg: 0']]
|
||||
if station:
|
||||
# rvr test
|
||||
rvr_o, report_name = self.rate_vs_range_test(station_name=sta_name[0], mode=mode,download_rate="100%",
|
||||
duration='30000',instance_name=instance_name, vlan_id=[vlan],
|
||||
rvr_o, report_name = self.rate_vs_range_test(station_name=sta_name[0], mode=mode, download_rate="100%",
|
||||
duration='30000', instance_name=instance_name, vlan_id=[vlan],
|
||||
dut_name=dut_name, raw_lines=val, create_vlan=create_vlan)
|
||||
entries = os.listdir("../reports/" + report_name + '/')
|
||||
print("entries", entries)
|
||||
@@ -2216,9 +2269,10 @@ class lf_tests(lf_libs):
|
||||
return False, "TEST FAILED, due to station has no ip"
|
||||
|
||||
def client_isolation(self, ssid1=None, ssid2=None, passkey=None, security=None, mode="BRIDGE", band_2g=None,
|
||||
band_5g=None, dut_data=None, num_sta=None, side_a_min_rate=None, side_a_max_rate=None, side_b_min_rate=None,
|
||||
band_5g=None, dut_data=None, num_sta=None, side_a_min_rate=None, side_a_max_rate=None,
|
||||
side_b_min_rate=None,
|
||||
side_b_max_rate=None, sniff_radio=True):
|
||||
band = radio_name = station_name = radio_names = station_name1= station_name2 = station_result = layer3_result = None
|
||||
band = radio_name = station_name = radio_names = station_name1 = station_name2 = station_result = layer3_result = None
|
||||
if band_5g is None and band_2g is not None:
|
||||
band = "twog"
|
||||
radio_name = self.wave2_2g_radios[0]
|
||||
@@ -2243,37 +2297,38 @@ class lf_tests(lf_libs):
|
||||
|
||||
sta = []
|
||||
ssids = [ssid1, ssid2]
|
||||
if num_sta>1:
|
||||
if num_sta > 1:
|
||||
if band_2g and band_5g is not None:
|
||||
for i in range(1):
|
||||
sta.append(station_name1 + str(i))
|
||||
sta.append(station_name2 + str(i))
|
||||
for i in range(num_sta):
|
||||
station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey,
|
||||
security=security, mode=mode,
|
||||
band=band[i], radio=radio_names[i],
|
||||
station_name=[sta[i]],
|
||||
dut_data=dut_data,
|
||||
sniff_radio=True)
|
||||
security=security, mode=mode,
|
||||
band=band[i], radio=radio_names[i],
|
||||
station_name=[sta[i]],
|
||||
dut_data=dut_data,
|
||||
sniff_radio=True)
|
||||
layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
|
||||
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
|
||||
traffic_type="lf_udp", sta_list=[sta[0]], side_b=sta[1])
|
||||
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
|
||||
traffic_type="lf_udp", sta_list=[sta[0]], side_b=sta[1])
|
||||
else:
|
||||
for i in range(2):
|
||||
sta.append(station_name + str(i))
|
||||
station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, band=band,security=security,
|
||||
mode=mode, radio=radio_name,station_name=[sta[i]],
|
||||
dut_data=dut_data, sniff_radio=sniff_radio)
|
||||
station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, band=band,
|
||||
security=security,
|
||||
mode=mode, radio=radio_name, station_name=[sta[i]],
|
||||
dut_data=dut_data, sniff_radio=sniff_radio)
|
||||
layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
|
||||
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
|
||||
traffic_type="lf_udp", sta_list=sta, side_b=sta[1])
|
||||
elif num_sta==1:
|
||||
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
|
||||
traffic_type="lf_udp", sta_list=sta, side_b=sta[1])
|
||||
elif num_sta == 1:
|
||||
station_result = self.client_connect_using_radio(ssid=ssid1, passkey=passkey, band=band, security=security,
|
||||
mode=mode, radio=radio_name, station_name=[station_name],
|
||||
dut_data=dut_data, sniff_radio=sniff_radio)
|
||||
layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate,
|
||||
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
|
||||
traffic_type="lf_udp", sta_list=[station_name], side_b="")
|
||||
side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate,
|
||||
traffic_type="lf_udp", sta_list=[station_name], side_b="")
|
||||
logging.info("waiting for 20 seconds")
|
||||
time.sleep(20)
|
||||
cx_list = self.get_cx_list()
|
||||
@@ -2305,17 +2360,19 @@ class lf_tests(lf_libs):
|
||||
if layer3_result is None:
|
||||
logging.info("Layer3 traffic ran.")
|
||||
if ssid1 == ssid2:
|
||||
if ((bps_rx_a == 0 and bps_rx_b == 0) and (rx_drop_a == 100 and rx_drop_b == 100)) or ((bps_rx_a != 0 and bps_rx_b != 0) and (rx_drop_a != 100 and rx_drop_b != 100)):
|
||||
if ((bps_rx_a == 0 and bps_rx_b == 0) and (rx_drop_a == 100 and rx_drop_b == 100)) or (
|
||||
(bps_rx_a != 0 and bps_rx_b != 0) and (rx_drop_a != 100 and rx_drop_b != 100)):
|
||||
allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table))
|
||||
return True, "TEST PASS"
|
||||
else:
|
||||
allure.attach(name="Test Result",
|
||||
body="TEST FAILED, Stations should not ping each other, when isolation enabled or"
|
||||
"rx-drop should not be 100% when isolation disabled in same ssid" + "\n\n" + str(table))
|
||||
"rx-drop should not be 100% when isolation disabled in same ssid" + "\n\n" + str(
|
||||
table))
|
||||
return False, "TEST FAILED, Stations should not ping each other, when isolation enabled or " \
|
||||
"rx-drop should not be 100% when isolation disabled in same ssid"
|
||||
elif band_2g and band_5g is not None:
|
||||
if (bps_rx_a == 0 and bps_rx_b != 0) or (bps_rx_a !=0 and bps_rx_b == 0):
|
||||
if (bps_rx_a == 0 and bps_rx_b != 0) or (bps_rx_a != 0 and bps_rx_b == 0):
|
||||
allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table))
|
||||
return True, "TEST PASS"
|
||||
else:
|
||||
@@ -2323,7 +2380,8 @@ class lf_tests(lf_libs):
|
||||
return False, "TEST FAILED, Traffic not ran properly"
|
||||
else:
|
||||
if rx_drop_a and rx_drop_b == 100 or 0:
|
||||
allure.attach(name="Test Result", body="TEST FAILED, Rx drop should not be 100% or 0%" + "\n\n" + str(table))
|
||||
allure.attach(name="Test Result",
|
||||
body="TEST FAILED, Rx drop should not be 100% or 0%" + "\n\n" + str(table))
|
||||
return False, "TEST FAILED, Rx drop should not be 100% or 0%"
|
||||
else:
|
||||
allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table))
|
||||
@@ -2331,6 +2389,7 @@ class lf_tests(lf_libs):
|
||||
else:
|
||||
logging.info("Layer3 not ran properly.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
basic = {
|
||||
"target": "tip_2x",
|
||||
|
||||
Reference in New Issue
Block a user