Added tr398v2 method and added ssid data in station table and fixed sniffer issue in client_connect method

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2024-02-29 15:46:36 +05:30
parent 95aa07337c
commit aa6f2c4216

View File

@@ -222,7 +222,8 @@ class lf_tests(lf_libs):
time.sleep(runtime_secs)
pass_fail_result = []
for obj in sta_connect_obj:
sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mac", "mode"]
sta_rows = ["4way time (us)", "channel", "ssid", "key/phrase", "cx time (us)", "dhcp (ms)", "ip", "signal",
"mac", "mode"]
station_data = self.get_station_data(sta_name=obj.station_names, rows=sta_rows,
allure_attach=False)
sta_table_dict = {}
@@ -478,7 +479,7 @@ class lf_tests(lf_libs):
time.sleep(runtime_secs)
pass_fail_result = []
for obj in eap_connect_objs:
sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal", "mac", "mode"]
sta_rows = ["4way time (us)", "channel", "ssid", "cx time (us)", "dhcp (ms)", "ip", "signal", "mac", "mode"]
self.station_data = self.get_station_data(sta_name=obj.sta_list, rows=sta_rows,
allure_attach=False)
sta_table_dict = {}
@@ -745,12 +746,14 @@ class lf_tests(lf_libs):
pytest.skip("Skipping This Test")
client_connect_obj = []
station_data_all = {}
start_sniffer = False
for radio in data[identifier]["station_data"]:
if band == "twog":
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
if data[identifier]["sniff_radio_2g"] is not None and sniffer_channel is not None:
start_sniffer = True
self.start_sniffer(radio_channel=sniffer_channel,
test_name=f'{data[identifier]["station_data"][radio][0]}',
radio=data[identifier]["sniff_radio_2g"],
@@ -761,6 +764,7 @@ class lf_tests(lf_libs):
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
if data[identifier]["sniff_radio_5g"] is not None and sniffer_channel is not None:
start_sniffer = True
self.start_sniffer(radio_channel=sniffer_channel,
radio=data[identifier]["sniff_radio_5g"],
duration=120)
@@ -772,6 +776,7 @@ class lf_tests(lf_libs):
int(dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]))
logging.info("LF sixg channel: " + str(sniffer_channel))
if data[identifier]["sniff_radio_6g"] is not None and sniffer_channel is not None:
start_sniffer = True
self.start_sniffer(radio_channel=sniffer_channel,
radio=data[identifier]["sniff_radio_6g"],
duration=120)
@@ -823,7 +828,7 @@ class lf_tests(lf_libs):
# for item in i:
# if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0':
# logging.info('VLAN do not have IP')
if self.start_sniffer:
if start_sniffer:
self.stop_sniffer()
logging.info("pass_fail result: " + str(pass_fail))
@@ -1481,6 +1486,245 @@ class lf_tests(lf_libs):
logging.error(f"{e}")
return False
def tr398v2(self, mode="BRIDGE",
vlan_id=1, skip_2g=False, skip_5g=False, test=None,
move_to_influx=False,
dut_data={},
create_vlan=True, testbed=None):
current_directory = os.getcwd()
file_path = current_directory + "/e2e/advanced/advanced-config.json"
with open(file_path, 'r') as file:
json_string = file.read()
all_config_data = json.loads(json_string)
logging.info("Advanced testbeds config data:- " + str(all_config_data))
# validate config json data
try:
json_object = json.dumps(all_config_data)
except ValueError as e:
logging.info("Advanced config data is invalid")
pytest.fail("Advanced config data is invalid")
testbed_ = testbed[:-1]
testbed_config_data = all_config_data["TESTBEDS"][testbed_]
logging.info(str(testbed_) + " Testbed config data:- " + str(testbed_config_data))
self.client_disconnect(clean_l3_traffic=True)
if type(test) == str:
test = test.split(",")
# DUT Name
dut_name = list(dut_data.keys())[0]
logging.info("DUT name:- " + str(dut_name))
""" 2G and 5G channel """
channel_2g = dut_data[dut_name]["radio_data"]["2G"]["channel"]
channel_5g = dut_data[dut_name]["radio_data"]["5G"]["channel"]
logging.info("2g_channel:- " + str(channel_2g))
logging.info("5g_channel:- " + str(channel_5g))
logging.info("DUT data:- " + str(dut_data))
virtual_sta_radios = {}
virtual_sta_rssi_0_2 = {}
virtual_sta_rssi_0_5 = {}
virtual_sta_atten = {}
ax_radios = {}
ax_rssi_0_2 = {}
ax_rssi_0_5 = {}
ax_atten = {}
raw_line = []
k = 0
""" Logic for virtual sta radios """
# find out virtual sta radios and make raw lines
config_data = testbed_config_data["Virtual Sta Radio Settings"]
for i in config_data:
for j in config_data[i]:
virtual_sta_radios["radio-" + str(k)] = config_data[i]["5Ghz"]
k = k + 1
virtual_sta_radios["radio-" + str(k)] = config_data[i]["2.4Ghz"]
break
k = k + 1
logging.info("virtual_sta_radios:- " + str(virtual_sta_radios))
raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_radios.items()]
raw_line.extend(raw_line_list)
# find out virtual sta virtual_sta_rssi_0_2, virtual_sta_rssi_0_5, virtual_sta_atten and make raw lines
c1 = 0
c2 = 0
c3 = 0
config_data = testbed_config_data["Virtual Sta Radio Settings"]
for i in config_data:
for j in config_data[i]:
if j == "2.4Ghz RSSI 0 Atten":
for k in config_data[i]["2.4Ghz RSSI 0 Atten"]:
virtual_sta_rssi_0_2["rssi_0_2-" + str(c1)] = k
c1 = c1 + 1
if j == "5Gh RSSI 0 Atten":
for l in config_data[i]["5Gh RSSI 0 Atten"]:
virtual_sta_rssi_0_5["rssi_0_5-" + str(c2)] = l
c2 = c2 + 1
if j == "Attenuator Modules":
for m in config_data[i]["Attenuator Modules"]:
virtual_sta_atten["atten-" + str(c3)] = m
c3 = c3 + 1
logging.info("virtual_sta_rssi_0_2:- " + str(virtual_sta_rssi_0_2))
logging.info("virtual_sta_rssi_0_5:- " + str(virtual_sta_rssi_0_5))
logging.info("virtual_sta_atten:- " + str(virtual_sta_atten))
raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_rssi_0_2.items()]
raw_line.extend(raw_line_list)
raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_rssi_0_5.items()]
raw_line.extend(raw_line_list)
raw_line_list = [[f"{key}: {value}"] for key, value in virtual_sta_atten.items()]
raw_line.extend(raw_line_list)
""" Logic for Ax radio setting """
c1 = 0
c2 = 0
c3 = 0
c4 = 0
config_data = testbed_config_data["802.11AX Settings"]
for i in config_data:
for j in config_data[i]:
if j == "Radios":
for k in config_data[i]["Radios"]:
ax_radios["ax_radio-" + str(c1)] = k
c1 = c1 + 1
if j == "2.4Ghz RSSI 0 Atten":
for l in config_data[i]["2.4Ghz RSSI 0 Atten"]:
ax_rssi_0_2["ax_rssi_0_2-" + str(c2)] = l
c2 = c2 + 1
if j == "5Ghz RSSI 0 Atten":
for m in config_data[i]["5Ghz RSSI 0 Atten"]:
ax_rssi_0_5["ax_rssi_0_5-" + str(c3)] = m
c3 = c3 + 1
if j == "Attenuator Modules":
for m in config_data[i]["Attenuator Modules"]:
if m != "":
ax_atten["ax_atten-" + str(c4)] = m
if c4 >= 12:
c4 = c4 + 2
else:
c4 = c4 + 1
logging.info("ax_radios:- " + str(ax_radios))
logging.info("ax_rssi_0_2:- " + str(ax_rssi_0_2))
logging.info("ax_rssi_0_5:- " + str(ax_rssi_0_5))
logging.info("ax_atten:- " + str(ax_atten))
raw_line_list = [[f"{key}: {value}"] for key, value in ax_radios.items()]
raw_line.extend(raw_line_list)
raw_line_list = [[f"{key}: {value}"] for key, value in ax_rssi_0_2.items()]
raw_line.extend(raw_line_list)
raw_line_list = [[f"{key}: {value}"] for key, value in ax_rssi_0_5.items()]
raw_line.extend(raw_line_list)
raw_line_list = [[f"{key}: {value}"] for key, value in ax_atten.items()]
raw_line.extend(raw_line_list)
# Fetch 2g_dut and 5g_dut
dut_2g = None
dut_5g = None
for i in dut_data[dut_name]['ssid_data']:
self.dut_idx_mapping[str(i)] = list(dut_data[dut_name]['ssid_data'][i].values())
if self.dut_idx_mapping[str(i)][3] == "2G":
dut_2g = dut_name + ' ' + self.dut_idx_mapping[str(i)][0] + ' ' \
'' + self.dut_idx_mapping[str(i)][
4].lower() + f' (1)'
if self.dut_idx_mapping[str(i)][3] == "5G":
dut_5g = dut_name + ' ' + self.dut_idx_mapping[str(i)][0] + ' ' \
'' + \
self.dut_idx_mapping[str(i)][4].lower() + f' (2)'
logging.info("dut_2g:- " + str(dut_2g))
logging.info("dut_5g:- " + str(dut_5g))
skip_twog, skip_fiveg = '1' if skip_2g else '0', '1' if skip_5g else '0'
if mode == "BRIDGE" or mode == "NAT-WAN":
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0]
if mode == "VLAN":
if vlan_id is None:
logging.error("VLAN ID is Unspecified in the VLAN Case")
pytest.skip("VLAN ID is Unspecified in the VLAN Case")
else:
if create_vlan:
vlan_raw_lines = self.add_vlan(vlan_ids=vlan_id, build=True)
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id[0])
logging.info("Upstream data: " + str(upstream_port))
skip_bandv2 = [['Skip 2.4Ghz Tests', f'{skip_twog}'], ['Skip 5Ghz Tests', f'{skip_fiveg}'],
['2.4Ghz Channel', f'{channel_2g}'], ['5Ghz Channel', f'{channel_5g}'], ["use_virtual_ax_sta", "1"],
["Use Issue-3 Behaviour", "0"], ["Skip 6Ghz Tests", "1"]]
enable_tests = [['rxsens: 0'], ['max_cx: 0'], ['max_tput: 0'], ['peak_perf: 0'], ['max_tput_bi: 0'],
['dual_band_tput: 0'], ['multi_band_tput: 0'], ['atf: 0'], ['atf3: 0'], ['qos3: 0'],
['lat3: 0'], ['mcast3: 0'], ['rvr: 0'], ['spatial: 0'], ['multi_sta: 0'], ['reset: 0'],
['mu_mimo: 0'], ['stability: 0'], ['ap_coex: 0'], ['acs: 0']]
for t in test:
if [f"{t}: 0"] in enable_tests:
enable_tests[enable_tests.index([f"{t}: 0"])] = [f"{t}: 1"]
else:
logging.info(f"Unable to find the {t} test in selected run")
raise ValueError(f"Unable to find the {t} test in selected run")
raw_line.extend(enable_tests)
update_cv_dut = {}
try:
for i in dut_data:
update_cv_dut[i] = dict.fromkeys(dut_data[i], {})
for j in dut_data[i]:
if j == 'ssid_data':
for k in dut_data[i][j]:
if (dut_data[i][j][k]['band'] == '5G' and dut_5g != ""
) or (dut_data[i][j][k]['band'] == '2G' and dut_2g != ""):
update_cv_dut[i][j][k] = dut_data[i][j][k].copy()
else:
update_cv_dut[i][j] = dut_data[i][j].copy()
except Exception as e:
logging.error(f"{e}")
logging.info("update cv dut:- " + str(update_cv_dut))
self.update_dut_ssid(dut_data=update_cv_dut)
instance_name = "tr398v2-instance-{}".format(str(random.randint(0, 100000)))
# if not os.path.exists("tr398-test-config.txt"):
# with open("tr398v2-test-config.txt", "wt") as f:
# for i in raw_line:
# f.write(str(i[0]) + "\n")
# f.close()
logging.info("raw lines:- " + str(raw_line))
cvtest_obj = TR398v2Test(lf_host=self.manager_ip,
lf_port=self.manager_http_port,
lf_user="lanforge",
lf_password="lanforge",
instance_name=instance_name,
upstream=upstream_port,
pull_report=True,
local_lf_report_dir=self.local_report_path,
load_old_cfg=False,
dut2=dut_2g,
dut5=dut_5g,
enables=[],
disables=[],
raw_lines=raw_line,
sets=skip_bandv2,
test_rig=dut_name)
# self.cvtest_obj.test_name, self.cvtest_obj.blob_text = "TR-398 Issue 2", "TR-398v2-"
# self.cvtest_obj.result = True
cvtest_obj.setup()
cvtest_obj.run()
# if os.path.exists("tr398v2-test-config.txt"):
# os.remove("tr398v2-test-config.txt")
if move_to_influx:
try:
report_name = "../reports/" + \
cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
influx = CSVtoInflux(influx_host=self.influx_params["influx_host"],
influx_port=self.influx_params["influx_port"],
influx_org=self.influx_params["influx_org"],
influx_token=self.influx_params["influx_token"],
influx_bucket=self.influx_params["influx_bucket"],
path=report_name)
influx.glob()
except Exception as e:
print(e)
pass
report_name = cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
time.sleep(10)
self.attach_report_graphs(report_name=report_name, pdf_name=str(test[0]) + " Test PDF Report")
result = self.read_kpi_file(column_name=["pass/fail"], dir_name=report_name)
self.attach_report_kpi(report_name=report_name)
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
if result[0][0] == "PASS":
return True, "Test Passed"
else:
return False, "Test Failed"
def tr398(self, radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", mode="BRIDGE",
vlan_id=1, skip_2g=True, skip_5g=False, instance_name="", test=None, move_to_influx=False, dut_data={},
ssid_name='', security_key='[BLANK]', security="open", sniff_packets=False, create_vlan=True,