Getting pass/fail from kpi.csv dual_band_performance_test

Signed-off-by: karthikaeyetea <karthika.subramani@candelatech.com>
This commit is contained in:
karthikaeyetea
2023-01-30 16:29:57 +05:30
parent 6f8df7be01
commit a43a7703b0

View File

@@ -1804,66 +1804,80 @@ class lf_tests(lf_libs):
def dualbandperformancetest(self, ssid_5G="[BLANK]", ssid_2G="[BLANK]", mode="BRIDGE", vlan_id=100, dut_name="TIP",
instance_name="test_demo", dut_5g="", dut_2g="", influx_tags="", move_to_influx=False,
create_vlan=True, dut_data={}):
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
try:
instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
if mode == "BRIDGE" or mode == "NAT-WAN":
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0]
if mode == "VLAN":
if vlan_id is None:
logging.error("VLAN ID is Unspecified in the VLAN Case")
pytest.skip("VLAN ID is Unspecified in the VLAN Case")
if mode == "BRIDGE" or mode == "NAT-WAN":
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0]
if mode == "VLAN":
if vlan_id is None:
logging.error("VLAN ID is Unspecified in the VLAN Case")
pytest.skip("VLAN ID is Unspecified in the VLAN Case")
else:
if create_vlan:
vlan_raw_lines = self.add_vlan(vlan_ids=vlan_id, build=True)
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id[0])
logging.info("Upstream data: " + str(upstream_port))
self.update_dut_ssid(dut_data=dut_data)
self.dualbandptest_obj = ApAutoTest(lf_host=self.manager_ip,
lf_port=self.manager_http_port,
lf_user="lanforge",
lf_password="lanforge",
ssh_port=self.manager_ssh_port,
instance_name=instance_name,
config_name="dbp_config",
upstream=upstream_port,
pull_report=True,
dut5_0=dut_5g,
dut2_0=dut_2g,
load_old_cfg=False,
local_lf_report_dir=self.local_report_path,
max_stations_2=64,
max_stations_5=64,
max_stations_dual=124,
radio2=[self.wave2_2g_radios],
radio5=[self.wave2_5g_radios],
raw_lines=[['modes', 'AUTO']],
# test_tag=influx_tags,
sets=[['Basic Client Connectivity', '0'], ['Multi Band Performance', '1'],
['Throughput vs Pkt Size', '0'], ['Capacity', '0'],
['Skip 2.4Ghz Tests', '1'],
['Skip 5Ghz Tests', '1'],
['Stability', '0'],
['Band-Steering', '0'], ['Multi-Station Throughput vs Pkt Size', '0'],
['Long-Term', '0']]
)
self.dualbandptest_obj.setup()
self.dualbandptest_obj.run()
if move_to_influx:
report_name = "../reports/" + self.dualbandptest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
try:
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
influx_port=self.influx_params["influx_port"],
influx_org=self.influx_params["influx_org"],
influx_token=self.influx_params["influx_token"],
influx_bucket=self.influx_params["influx_bucket"],
path=report_name)
influx.glob()
except Exception as e:
print(e)
pass
report_name = self.dualbandptest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
self.attach_report_graphs(report_name=report_name, pdf_name="Dual Band Performance Test")
result = self.read_kpi_file(column_name=["pass/fail"], dir_name=report_name)
allure.attach.file(source="../reports/" + report_name + "/kpi.csv",
name=f"dual_band_CSV", attachment_type=allure.attachment_type.CSV)
if result[0][0] == "PASS":
return True, "Test Passed"
else:
if create_vlan:
vlan_raw_lines = self.add_vlan(vlan_ids=vlan_id, build=True)
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id[0])
logging.info("Upstream data: " + str(upstream_port))
return False, "Test Failed"
self.update_dut_ssid(dut_data=dut_data)
self.dualbandptest_obj = ApAutoTest(lf_host=self.manager_ip,
lf_port=self.manager_http_port,
lf_user="lanforge",
lf_password="lanforge",
ssh_port=self.manager_ssh_port,
instance_name=instance_name,
config_name="dbp_config",
upstream=upstream_port,
pull_report=True,
dut5_0=dut_5g,
dut2_0=dut_2g,
load_old_cfg=False,
local_lf_report_dir=self.local_report_path,
max_stations_2=64,
max_stations_5=64,
max_stations_dual=124,
radio2=[self.wave2_2g_radios],
radio5=[self.wave2_5g_radios],
raw_lines=[['modes', 'AUTO']],
# test_tag=influx_tags,
sets=[['Basic Client Connectivity', '0'], ['Multi Band Performance', '1'],
['Throughput vs Pkt Size', '0'], ['Capacity', '0'],
['Skip 2.4Ghz Tests', '1'],
['Skip 5Ghz Tests', '1'],
['Stability', '0'],
['Band-Steering', '0'], ['Multi-Station Throughput vs Pkt Size', '0'],
['Long-Term', '0']]
)
self.dualbandptest_obj.setup()
self.dualbandptest_obj.run()
if move_to_influx:
report_name = "../reports/" + self.dualbandptest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
try:
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
influx_port=self.influx_params["influx_port"],
influx_org=self.influx_params["influx_org"],
influx_token=self.influx_params["influx_token"],
influx_bucket=self.influx_params["influx_bucket"],
path=report_name)
except Exception as e:
logging.error(f"{e}")
return False, f"{e}"
influx.glob()
except Exception as e:
print(e)
pass
return self.dualbandptest_obj
def multi_station_performance(self, ssid_name=None, security_key=None, mode="BRIDGE", vlan=1, band="twog", antenna=1,
instance_name="", set_att_db="10db",download_rate="0Gbps",upload_rate="1Gbps",