run test in tr398/tr398v2

Signed-off-by: karthikaeyetea <karthika.subramani@candelatech.com>
This commit is contained in:
karthikaeyetea
2022-12-19 11:28:12 +05:30
parent c0f519f8d5
commit 0eb5a5715d
4 changed files with 108 additions and 34 deletions

View File

@@ -1538,7 +1538,6 @@ class lf_libs:
def attenuator_serial_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", atn_val=400, def attenuator_serial_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", atn_val=400,
vlan_id=100, sta_mode=0, station_name=[], radio='1.1.wiphy0', timeout=20): vlan_id=100, sta_mode=0, station_name=[], radio='1.1.wiphy0', timeout=20):
radio = radio
# index 0 of atten_serial_radio will ser no of 1st 2g/5g radio and index 1 will ser no of 2nd and 3rd 2g/5g radio # index 0 of atten_serial_radio will ser no of 1st 2g/5g radio and index 1 will ser no of 2nd and 3rd 2g/5g radio
atten_serial_radio = [] atten_serial_radio = []
atten_serial = self.attenuator_serial() atten_serial = self.attenuator_serial()

View File

@@ -56,6 +56,8 @@ tr398v2test = importlib.import_module("py-scripts.lf_tr398v2_test")
TR398v2Test = tr398v2test.TR398v2Test TR398v2Test = tr398v2test.TR398v2Test
rvr = importlib.import_module("py-scripts.lf_rvr_test") rvr = importlib.import_module("py-scripts.lf_rvr_test")
rvr_test = rvr.RvrTest rvr_test = rvr.RvrTest
lf_pcap = importlib.import_module("py-scripts.lf_pcap")
LfPcap = lf_pcap.LfPcap
class lf_tests(lf_libs): class lf_tests(lf_libs):
@@ -1437,7 +1439,8 @@ class lf_tests(lf_libs):
def tr398(self,radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", mode="BRIDGE", def tr398(self,radios_2g=[], radios_5g=[], radios_ax=[], dut_name="TIP", dut_5g="", dut_2g="", mode="BRIDGE",
vlan_id=1, skip_2g=True, skip_5g=False, instance_name="", test=None, move_to_influx=False,dut_data={}, vlan_id=1, skip_2g=True, skip_5g=False, instance_name="", test=None, move_to_influx=False,dut_data={},
ssid_name='', security_key='[BLANK]', security="open"): ssid_name='', security_key='[BLANK]', security="open", sniff_packets=False, create_vlan=True, tr398v2=False,
tr398=False):
#User can select one or more TR398 tests #User can select one or more TR398 tests
try: try:
if type(test) == str: if type(test) == str:
@@ -1445,19 +1448,38 @@ class lf_tests(lf_libs):
self.client_disconnect(clean_l3_traffic=True) self.client_disconnect(clean_l3_traffic=True)
raw_line = [] raw_line = []
skip_twog, skip_fiveg = '1' if skip_2g else '0', '1' if skip_5g else '0' skip_twog, skip_fiveg = '1' if skip_2g else '0', '1' if skip_5g else '0'
if mode == "BRIDGE" or mode == "NAT": channel = 149 if skip_twog else 11
sniff_radio = 'wiphy0'
if mode == "BRIDGE" or mode == "NAT-WAN":
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] upstream_port = list(self.lanforge_data['wan_ports'].keys())[0]
else: if mode == "VLAN":
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id) if vlan_id is None:
atten_serial = self.attenuator_serial_radio(ssid=ssid_name, passkey=security_key, security=security, sta_mode=0, logging.error("VLAN ID is Unspecified in the VLAN Case")
station_name=['sta0000']) pytest.skip("VLAN ID is Unspecified in the VLAN Case")
else:
if create_vlan:
vlan_raw_lines = self.add_vlan(vlan_ids=vlan_id, build=True)
upstream_port = list(self.lanforge_data['wan_ports'].keys())[0] + "." + str(vlan_id[0])
logging.info("Upstream data: " + str(upstream_port))
enable_tests = [['rxsens: 0'], ['max_cx: 0'], ['max_tput: 0'], ['peak_perf: 0'], ['max_tput_bi: 0'], atten_serial = self.attenuator_serial_radio(ssid=ssid_name, passkey=security_key, security=security, sta_mode=0,
['dual_band_tput: 0'], station_name=['sta0000'],
['multi_band_tput: 0'], ['atf: 0'], ['atf3: 0'], ['qos3: 0'], ['lat3: 0'], ['mcast3: 0'], radio=self.wave2_2g_radios[0] if skip_5g else self.wave2_5g_radios[0])
['rvr: 0'],
['spatial: 0'], ['multi_sta: 0'], ['reset: 0'], ['mu_mimo: 0'], ['stability: 0'], if tr398v2:
['ap_coex: 0'], ['acs: 0']] enable_tests = [['rxsens: 0'], ['max_cx: 0'], ['max_tput: 0'], ['peak_perf: 0'], ['max_tput_bi: 0'],
['dual_band_tput: 0'], ['multi_band_tput: 0'], ['atf: 0'], ['atf3: 0'], ['qos3: 0'],
['lat3: 0'], ['mcast3: 0'], ['rvr: 0'], ['spatial: 0'], ['multi_sta: 0'], ['reset: 0'],
['mu_mimo: 0'], ['stability: 0'], ['ap_coex: 0'], ['acs: 0']]
elif tr398:
enable_tests = [['rxsens: 0'], ['max_cx: 0'], ['max_tput: 0'], ['atf: 0'], ['rvr: 0'], ['spatial: 0'],
['multi_sta: 0'], ['reset: 0'], ['mu_mimo: 0'], ['stability: 0'], ['ap_coex: 0']]
for t in test:
if [f"{t}: 0"] in enable_tests:
enable_tests[enable_tests.index([f"{t}: 0"])] = [f"{t}: 1"]
else:
logging.info(f"Unable to find the {t} test in selected run")
raise ValueError(f"Unable to find the {t} test in selected run")
rad_atten = [[f'atten-0: {atten_serial[0]}.0'], [f'atten-1: {atten_serial[0]}.1'], [f'atten-2: {atten_serial[0]}.2'], rad_atten = [[f'atten-0: {atten_serial[0]}.0'], [f'atten-1: {atten_serial[0]}.1'], [f'atten-2: {atten_serial[0]}.2'],
[f'atten-3: {atten_serial[0]}.3'], [f'atten-4: {atten_serial[1]}.0'], [f'atten-5: {atten_serial[1]}.1'], [f'atten-3: {atten_serial[0]}.3'], [f'atten-4: {atten_serial[1]}.0'], [f'atten-5: {atten_serial[1]}.1'],
@@ -1465,9 +1487,7 @@ class lf_tests(lf_libs):
skip_band = [['Skip 2.4Ghz Tests', f'{skip_twog}'], ['Skip 5Ghz Tests', f'{skip_fiveg}'], skip_band = [['Skip 2.4Ghz Tests', f'{skip_twog}'], ['Skip 5Ghz Tests', f'{skip_fiveg}'],
['2.4Ghz Channel', 'AUTO'], ['5Ghz Channel', 'AUTO'], ['Skip AX Tests', '1']] ['2.4Ghz Channel', 'AUTO'], ['5Ghz Channel', 'AUTO'], ['Skip AX Tests', '1']]
for t in test:
if [f"{t}: 0"] in enable_tests:
enable_tests[enable_tests.index([f"{t}: 0"])] = [f"{t}: 1"]
if len(radios_2g) >= 3 and len(radios_5g) >= 3: if len(radios_2g) >= 3 and len(radios_5g) >= 3:
for i in range(6): for i in range(6):
if i == 0 or i == 2: if i == 0 or i == 2:
@@ -1476,20 +1496,61 @@ class lf_tests(lf_libs):
raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}']) raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}'])
if i == 4 or i == 5: if i == 4 or i == 5:
raw_line.append([f'radio-{i}: {radios_5g[2] if i == 4 else radios_2g[2]}']) raw_line.append([f'radio-{i}: {radios_5g[2] if i == 4 else radios_2g[2]}'])
if sniff_packets:
if len(radios_ax) >= 1:
temp_ax = str(radios_ax[0]).split(" ")
if len(temp_ax) == 2:
sniff_radio = str(temp_ax[1])
elif skip_2g:
temp = str(radios_5g[0]).split(" ")
if len(temp) == 2:
sniff_radio = str(temp[1])
elif skip_5g:
temp = str(radios_2g[0]).split(" ")
if len(temp) == 2:
sniff_radio = str(temp[1])
elif len(radios_2g) >= 2 and len(radios_5g) >= 2 and len(radios_ax) >= 2: elif len(radios_2g) >= 2 and len(radios_5g) >= 2 and len(radios_ax) >= 2:
for i in range(6): if len(radios_2g) >= 3 and len(radios_5g) >= 3:
if i == 0 or i == 2: for i in range(6):
raw_line.append([f'radio-{i}: {radios_5g[0] if i == 0 else radios_5g[1]}']) if i == 0 or i == 2:
if i == 1 or i == 3: raw_line.append([f'radio-{i}: {radios_5g[0] if i == 0 else radios_5g[1]}'])
raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}']) if i == 1 or i == 3:
if i == 4 or i == 5: raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}'])
raw_line.append([f'radio-{i}: {radios_ax[0] if i == 4 else radios_ax[1]}']) if i == 4 or i == 5:
raw_line.append([f'radio-{i}: {radios_5g[2] if i == 4 else radios_2g[2]}'])
if sniff_packets:
if len(radios_ax) >= 1:
temp_ax = str(radios_ax[0]).split(" ")
if len(temp_ax) == 2:
sniff_radio = str(temp_ax[1])
else:
for i in range(6):
if i == 0 or i == 2:
raw_line.append([f'radio-{i}: {radios_5g[0] if i == 0 else radios_5g[1]}'])
if i == 1 or i == 3:
raw_line.append([f'radio-{i}: {radios_2g[0] if i == 1 else radios_2g[1]}'])
if i == 4 or i == 5:
raw_line.append([f'radio-{i}: {radios_ax[0] if i == 4 else radios_ax[1]}'])
if len(raw_line) != 6: if len(raw_line) != 6:
raw_line = [['radio-0: 1.1.5 wiphy1'], ['radio-1: 1.1.4 wiphy0'], ['radio-2: 1.1.7 wiphy3'], raw_line = [['radio-0: 1.1.5 wiphy1'], ['radio-1: 1.1.4 wiphy0'], ['radio-2: 1.1.7 wiphy3'],
['radio-3: 1.1.6 wiphy2'], ['radio-4: 1.1.8 wiphy4'], ['radio-5: 1.1.9 wiphy5']] ['radio-3: 1.1.6 wiphy2'], ['radio-4: 1.1.8 wiphy4'], ['radio-5: 1.1.9 wiphy5']]
raw_line.extend(enable_tests + rad_atten) raw_line.extend(enable_tests + rad_atten)
self.update_dut_ssid(dut_data=dut_data) update_cv_dut = {}
try:
for i in dut_data:
update_cv_dut[i] = dict.fromkeys(dut_data[i], {})
for j in dut_data[i]:
if j == 'ssid_data':
for k in dut_data[i][j]:
if (dut_data[i][j][k]['band'] == '5G' and dut_5g != ""
) or (dut_data[i][j][k]['band'] == '2G' and dut_2g != ""):
update_cv_dut[i][j][k] = dut_data[i][j][k].copy()
else:
update_cv_dut[i][j] = dut_data[i][j].copy()
except Exception as e:
logging.error(f"{e}")
self.update_dut_ssid(dut_data=update_cv_dut)
instance_name = "tr398-instance-{}".format(str(random.randint(0, 100000))) instance_name = "tr398-instance-{}".format(str(random.randint(0, 100000)))
# if not os.path.exists("tr398-test-config.txt"): # if not os.path.exists("tr398-test-config.txt"):
@@ -1515,12 +1576,25 @@ class lf_tests(lf_libs):
disables=[], disables=[],
raw_lines=[], raw_lines=[],
sets=skip_band, sets=skip_band,
test_rig=dut_name test_rig=dut_name)
) if tr398v2:
self.cvtest_obj.test_name = "TR-398 Issue 2" self.cvtest_obj.test_name, self.cvtest_obj.blob_text = "TR-398 Issue 2", "TR-398v2-"
elif tr398:
self.cvtest_obj.test_name, self.cvtest_obj.blob_text = "TR-398", "TR-398-"
self.cvtest_obj.result = True self.cvtest_obj.result = True
self.cvtest_obj.setup() self.cvtest_obj.setup()
self.cvtest_obj.run() if sniff_packets:
self.pcap_obj = LfPcap(host=self.manager_ip, port=self.manager_http_port)
t1 = threading.Thread(target=self.cvtest_obj.run)
t1.start()
t2 = threading.Thread(target=self.pcap_obj.sniff_packets, args=(sniff_radio, "mu-mimo", channel, 40))
if t1.is_alive():
time.sleep(375)
t2.start()
while t1.is_alive():
time.sleep(1)
else:
self.cvtest_obj.run()
if os.path.exists("tr398-test-config.txt"): if os.path.exists("tr398-test-config.txt"):
os.remove("tr398-test-config.txt") os.remove("tr398-test-config.txt")
@@ -1538,7 +1612,7 @@ class lf_tests(lf_libs):
except Exception as e: except Exception as e:
print(e) print(e)
pass pass
report_name = self.cvtest_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" report_name = self.cvtest_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
self.attach_report_graphs(report_name=report_name) self.attach_report_graphs(report_name=report_name)
result = self.read_kpi_file(column_name=["pass/fail"], dir_name=report_name) result = self.read_kpi_file(column_name=["pass/fail"], dir_name=report_name)
allure.attach.file(source="../reports/" + report_name + "/kpi.csv", allure.attach.file(source="../reports/" + report_name + "/kpi.csv",

View File

@@ -25,8 +25,8 @@ from datetime import datetime
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
wifi_monitor = importlib.import_module("py-json.wifi_monitor_profile") wifi_monitor_profile = importlib.import_module("py-json.wifi_monitor_profile")
WiFiMonitor = wifi_monitor.WifiMonitor WiFiMonitor = wifi_monitor_profile.WifiMonitor
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
LFCliBase = lfcli_base.LFCliBase LFCliBase = lfcli_base.LFCliBase
realm = importlib.import_module("py-json.realm") realm = importlib.import_module("py-json.realm")
@@ -48,7 +48,7 @@ class LfPcap(Realm):
_debug_on=False _debug_on=False
): ):
# uncomment the follwoing line to use wifi monitor functions # uncomment the follwoing line to use wifi monitor functions
# super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on) super().__init__(lfclient_host=host, lfclient_port=port, debug_=_debug_on)
self.host = host, self.host = host,
self.port = port self.port = port
self.debug = _debug_on self.debug = _debug_on
@@ -63,7 +63,7 @@ class LfPcap(Realm):
self.remote_cap_host = _live_remote_cap_host self.remote_cap_host = _live_remote_cap_host
self.remote_cap_interface = _live_remote_cap_interface self.remote_cap_interface = _live_remote_cap_interface
# uncomment the follwoing line to use wifi monitor functions # uncomment the follwoing line to use wifi monitor functions
# self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug) self.wifi_monitor = WiFiMonitor(self.lfclient_url, local_realm=self, debug_=self.debug)
def read_pcap(self, pcap_file, apply_filter=None): def read_pcap(self, pcap_file, apply_filter=None):
self.pcap_file = pcap_file self.pcap_file = pcap_file

View File

@@ -324,6 +324,7 @@ class TR398v2Test(cvtest):
self.sets = sets self.sets = sets
self.local_lf_report_dir = local_lf_report_dir self.local_lf_report_dir = local_lf_report_dir
self.test_rig = test_rig self.test_rig = test_rig
self.blob_text = "TR-398v2-"
def setup(self): def setup(self):
# Nothing to do at this time. # Nothing to do at this time.
@@ -334,7 +335,7 @@ class TR398v2Test(cvtest):
time.sleep(2) time.sleep(2)
self.sync_cv() self.sync_cv()
blob_test = "TR398v2-" blob_test = self.blob_text
self.rm_text_blob(self.config_name, blob_test) # To delete old config with same name self.rm_text_blob(self.config_name, blob_test) # To delete old config with same name
self.show_text_blob(None, None, False) self.show_text_blob(None, None, False)