diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index b9922a31..d421f8e2 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -222,7 +222,8 @@ class lf_tests(lf_libs): time.sleep(runtime_secs) pass_fail_result = [] for obj in sta_connect_obj: - sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mac", "mode"] + sta_rows = ["4way time (us)", "channel", "ssid", "key/phrase", "cx time (us)", "dhcp (ms)", "ip", "signal", + "mac", "mode"] station_data = self.get_station_data(sta_name=obj.station_names, rows=sta_rows, allure_attach=False) sta_table_dict = {} @@ -478,7 +479,7 @@ class lf_tests(lf_libs): time.sleep(runtime_secs) pass_fail_result = [] for obj in eap_connect_objs: - sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mac", "mode"] + sta_rows = ["4way time (us)", "channel", "ssid", "cx time (us)", "dhcp (ms)", "ip", "signal", "mac", "mode"] self.station_data = self.get_station_data(sta_name=obj.sta_list, rows=sta_rows, allure_attach=False) sta_table_dict = {} @@ -745,12 +746,14 @@ class lf_tests(lf_libs): pytest.skip("Skipping This Test") client_connect_obj = [] station_data_all = {} + start_sniffer = False for radio in data[identifier]["station_data"]: if band == "twog": if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \ dict(dut_data.get(identifier)["radio_data"])["2G"] is not None: sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"] if data[identifier]["sniff_radio_2g"] is not None and sniffer_channel is not None: + start_sniffer = True self.start_sniffer(radio_channel=sniffer_channel, test_name=f'{data[identifier]["station_data"][radio][0]}', radio=data[identifier]["sniff_radio_2g"], @@ -761,6 +764,7 @@ class lf_tests(lf_libs): dict(dut_data.get(identifier)["radio_data"])["5G"] is not None: sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"] if data[identifier]["sniff_radio_5g"] is not None and sniffer_channel is not None: + start_sniffer = True self.start_sniffer(radio_channel=sniffer_channel, radio=data[identifier]["sniff_radio_5g"], duration=120) @@ -772,6 +776,7 @@ class lf_tests(lf_libs): int(dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"])) logging.info("LF sixg channel: " + str(sniffer_channel)) if data[identifier]["sniff_radio_6g"] is not None and sniffer_channel is not None: + start_sniffer = True self.start_sniffer(radio_channel=sniffer_channel, radio=data[identifier]["sniff_radio_6g"], duration=120) @@ -823,7 +828,7 @@ class lf_tests(lf_libs): # for item in i: # if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0': # logging.info('VLAN do not have IP') - if self.start_sniffer: + if start_sniffer: self.stop_sniffer() logging.info("pass_fail result: " + str(pass_fail)) @@ -1481,6 +1486,245 @@ class lf_tests(lf_libs): logging.error(f"{e}") return False + def tr398v2(self, mode="BRIDGE", + vlan_id=1, skip_2g=False, skip_5g=False, test=None, + move_to_influx=False, + dut_data={}, + create_vlan=True, testbed=None): + current_directory = os.getcwd() + file_path = current_directory + "/e2e/advanced/advanced-config.json" + with open(file_path, 'r') as file: + json_string = file.read() + all_config_data = json.loads(json_string) + logging.info("Advanced testbeds config data:- " + str(all_config_data)) + # validate config json data + try: + json_object = json.dumps(all_config_data) + except ValueError as e: + logging.info("Advanced config data is invalid") + pytest.fail("Advanced config data is invalid") + testbed_ = testbed[:-1] + testbed_config_data = all_config_data["TESTBEDS"][testbed_] + logging.info(str(testbed_) + " Testbed config data:- " + str(testbed_config_data)) + self.client_disconnect(clean_l3_traffic=True) + if type(test) == str: + test = test.split(",") + # DUT Name + dut_name = list(dut_data.keys())[0] + logging.info("DUT name:- " + str(dut_name)) + """ 2G and 5G channel """ + channel_2g = dut_data[dut_name]["radio_data"]["2G"]["channel"] + channel_5g = dut_data[dut_name]["radio_data"]["5G"]["channel"] + logging.info("2g_channel:- " + str(channel_2g)) + logging.info("5g_channel:- " + str(channel_5g)) + logging.info("DUT data:- " + str(dut_data)) + virtual_sta_radios = {} + virtual_sta_rssi_0_2 = {} + virtual_sta_rssi_0_5 = {} + virtual_sta_atten = {} + ax_radios = {} + ax_rssi_0_2 = {} + ax_rssi_0_5 = {} + ax_atten = {} + raw_line = [] + k = 0 + """ Logic for virtual sta radios """ + # find out virtual sta radios and make raw lines + config_data = testbed_config_data["Virtual Sta Radio Settings"] + for i in config_data: + for j in config_data[i]: + virtual_sta_radios["radio-" + str(k)] = config_data[i]["5Ghz"] + k = k + 1 + virtual_sta_radios["radio-" + str(k)] = config_data[i]["2.4Ghz"] + break + k = k + 1 + logging.info("virtual_sta_radios:- " + str(virtual_sta_radios)) + raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_radios.items()] + raw_line.extend(raw_line_list) + # find out virtual sta virtual_sta_rssi_0_2, virtual_sta_rssi_0_5, virtual_sta_atten and make raw lines + c1 = 0 + c2 = 0 + c3 = 0 + config_data = testbed_config_data["Virtual Sta Radio Settings"] + for i in config_data: + for j in config_data[i]: + if j == "2.4Ghz RSSI 0 Atten": + for k in config_data[i]["2.4Ghz RSSI 0 Atten"]: + virtual_sta_rssi_0_2["rssi_0_2-" + str(c1)] = k + c1 = c1 + 1 + if j == "5Gh RSSI 0 Atten": + for l in config_data[i]["5Gh RSSI 0 Atten"]: + virtual_sta_rssi_0_5["rssi_0_5-" + str(c2)] = l + c2 = c2 + 1 + if j == "Attenuator Modules": + for m in config_data[i]["Attenuator Modules"]: + virtual_sta_atten["atten-" + str(c3)] = m + c3 = c3 + 1 + + logging.info("virtual_sta_rssi_0_2:- " + str(virtual_sta_rssi_0_2)) + logging.info("virtual_sta_rssi_0_5:- " + str(virtual_sta_rssi_0_5)) + logging.info("virtual_sta_atten:- " + str(virtual_sta_atten)) + raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_rssi_0_2.items()] + raw_line.extend(raw_line_list) + raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_rssi_0_5.items()] + raw_line.extend(raw_line_list) + raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_atten.items()] + raw_line.extend(raw_line_list) + """ Logic for Ax radio setting """ + c1 = 0 + c2 = 0 + c3 = 0 + c4 = 0 + config_data = testbed_config_data["802.11AX Settings"] + for i in config_data: + for j in config_data[i]: + if j == "Radios": + for k in config_data[i]["Radios"]: + ax_radios["ax_radio-" + str(c1)] = k + c1 = c1 + 1 + if j == "2.4Ghz RSSI 0 Atten": + for l in config_data[i]["2.4Ghz RSSI 0 Atten"]: + ax_rssi_0_2["ax_rssi_0_2-" + str(c2)] = l + c2 = c2 + 1 + if j == "5Ghz RSSI 0 Atten": + for m in config_data[i]["5Ghz RSSI 0 Atten"]: + ax_rssi_0_5["ax_rssi_0_5-" + str(c3)] = m + c3 = c3 + 1 + if j == "Attenuator Modules": + for m in config_data[i]["Attenuator Modules"]: + if m != "": + ax_atten["ax_atten-" + str(c4)] = m + if c4 >= 12: + c4 = c4 + 2 + else: + c4 = c4 + 1 + + logging.info("ax_radios:- " + str(ax_radios)) + logging.info("ax_rssi_0_2:- " + str(ax_rssi_0_2)) + logging.info("ax_rssi_0_5:- " + str(ax_rssi_0_5)) + logging.info("ax_atten:- " + str(ax_atten)) + raw_line_list = [[f"{key}: {value}"] for key, value in ax_radios.items()] + raw_line.extend(raw_line_list) + raw_line_list = [[f"{key}: {value}"] for key, value in ax_rssi_0_2.items()] + raw_line.extend(raw_line_list) + raw_line_list = [[f"{key}: {value}"] for key, value in ax_rssi_0_5.items()] + raw_line.extend(raw_line_list) + raw_line_list = [[f"{key}: {value}"] for key, value in ax_atten.items()] + raw_line.extend(raw_line_list) + + # Fetch 2g_dut and 5g_dut + dut_2g = None + dut_5g = None + for i in dut_data[dut_name]['ssid_data']: + self.dut_idx_mapping[str(i)] = list(dut_data[dut_name]['ssid_data'][i].values()) + if self.dut_idx_mapping[str(i)][3] == "2G": + dut_2g = dut_name + ' ' + self.dut_idx_mapping[str(i)][0] + ' ' \ + '' + self.dut_idx_mapping[str(i)][ + 4].lower() + f' (1)' + if self.dut_idx_mapping[str(i)][3] == "5G": + dut_5g = dut_name + ' ' + self.dut_idx_mapping[str(i)][0] + ' ' \ + '' + \ + self.dut_idx_mapping[str(i)][4].lower() + f' (2)' + logging.info("dut_2g:- " + str(dut_2g)) + logging.info("dut_5g:- " + str(dut_5g)) + skip_twog, skip_fiveg = '1' if skip_2g else '0', '1' if skip_5g else '0' + if mode == "BRIDGE" or mode == "NAT-WAN": + upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + if mode == "VLAN": + if vlan_id is None: + logging.error("VLAN ID is Unspecified in the VLAN Case") + pytest.skip("VLAN ID is Unspecified in the VLAN Case") + else: + if create_vlan: + vlan_raw_lines = self.add_vlan(vlan_ids=vlan_id, build=True) + upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id[0]) + logging.info("Upstream data: " + str(upstream_port)) + skip_bandv2 = [['Skip 2.4Ghz Tests', f'{skip_twog}'], ['Skip 5Ghz Tests', f'{skip_fiveg}'], + ['2.4Ghz Channel', f'{channel_2g}'], ['5Ghz Channel', f'{channel_5g}'], ["use_virtual_ax_sta", "1"], + ["Use Issue-3 Behaviour", "0"], ["Skip 6Ghz Tests", "1"]] + enable_tests = [['rxsens: 0'], ['max_cx: 0'], ['max_tput: 0'], ['peak_perf: 0'], ['max_tput_bi: 0'], + ['dual_band_tput: 0'], ['multi_band_tput: 0'], ['atf: 0'], ['atf3: 0'], ['qos3: 0'], + ['lat3: 0'], ['mcast3: 0'], ['rvr: 0'], ['spatial: 0'], ['multi_sta: 0'], ['reset: 0'], + ['mu_mimo: 0'], ['stability: 0'], ['ap_coex: 0'], ['acs: 0']] + for t in test: + if [f"{t}: 0"] in enable_tests: + enable_tests[enable_tests.index([f"{t}: 0"])] = [f"{t}: 1"] + else: + logging.info(f"Unable to find the {t} test in selected run") + raise ValueError(f"Unable to find the {t} test in selected run") + raw_line.extend(enable_tests) + update_cv_dut = {} + try: + for i in dut_data: + update_cv_dut[i] = dict.fromkeys(dut_data[i], {}) + for j in dut_data[i]: + if j == 'ssid_data': + for k in dut_data[i][j]: + if (dut_data[i][j][k]['band'] == '5G' and dut_5g != "" + ) or (dut_data[i][j][k]['band'] == '2G' and dut_2g != ""): + update_cv_dut[i][j][k] = dut_data[i][j][k].copy() + else: + update_cv_dut[i][j] = dut_data[i][j].copy() + except Exception as e: + logging.error(f"{e}") + logging.info("update cv dut:- " + str(update_cv_dut)) + self.update_dut_ssid(dut_data=update_cv_dut) + instance_name = "tr398v2-instance-{}".format(str(random.randint(0, 100000))) + + # if not os.path.exists("tr398-test-config.txt"): + # with open("tr398v2-test-config.txt", "wt") as f: + # for i in raw_line: + # f.write(str(i[0]) + "\n") + # f.close() + logging.info("raw lines:- " + str(raw_line)) + cvtest_obj = TR398v2Test(lf_host=self.manager_ip, + lf_port=self.manager_http_port, + lf_user="lanforge", + lf_password="lanforge", + instance_name=instance_name, + upstream=upstream_port, + pull_report=True, + local_lf_report_dir=self.local_report_path, + load_old_cfg=False, + dut2=dut_2g, + dut5=dut_5g, + enables=[], + disables=[], + raw_lines=raw_line, + sets=skip_bandv2, + test_rig=dut_name) + # self.cvtest_obj.test_name, self.cvtest_obj.blob_text = "TR-398 Issue 2", "TR-398v2-" + # self.cvtest_obj.result = True + cvtest_obj.setup() + cvtest_obj.run() + # if os.path.exists("tr398v2-test-config.txt"): + # os.remove("tr398v2-test-config.txt") + if move_to_influx: + try: + report_name = "../reports/" + \ + cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + influx = CSVtoInflux(influx_host=self.influx_params["influx_host"], + influx_port=self.influx_params["influx_port"], + influx_org=self.influx_params["influx_org"], + influx_token=self.influx_params["influx_token"], + influx_bucket=self.influx_params["influx_bucket"], + path=report_name) + + influx.glob() + except Exception as e: + print(e) + pass + report_name = cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" + time.sleep(10) + self.attach_report_graphs(report_name=report_name, pdf_name=str(test[0]) + " Test PDF Report") + result = self.read_kpi_file(column_name=["pass/fail"], dir_name=report_name) + self.attach_report_kpi(report_name=report_name) + self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True) + if result[0][0] == "PASS": + return True, "Test Passed" + else: + return False, "Test Failed" + def tr398(self, radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", mode="BRIDGE", vlan_id=1, skip_2g=True, skip_5g=False, instance_name="", test=None, move_to_influx=False, dut_data={}, ssid_name='', security_key='[BLANK]', security="open", sniff_packets=False, create_vlan=True,