diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index 1e0e12f5..3ad466ed 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -1132,6 +1132,7 @@ class lf_libs: return data[radio]["max_vifs"] def add_vlan(self, vlan_ids=[]): + self.temp_raw_lines = self.default_scenario_raw_lines.copy() data = self.json_get("/port/all") flag = 0 profile_name = "" diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 9b89c231..293289fb 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -544,16 +544,17 @@ class lf_tests(lf_libs): if mpsk_data is None: mpsk_data = {100: {"num_stations": num_sta, "passkey": "OpenWifi1"}, 200: {"num_stations": num_sta, "passkey": "OpenWifi2"}} - if dut_data is None: - dut_data = self.dut_data - logging.info("Creating VLAN's") # create VLAN's + vlans = list(mpsk_data.keys()) + if "default" in vlans: + vlans.remove("default") self.add_vlan(vlan_ids=list(mpsk_data.keys())) + logging.info("Wait until VLAN's bring up") time.sleep(10) # query and fetch vlan Ip Address - port_data=self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces'] + port_data = self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces'] # Fail if Vlan don't have IP vlan_data = {} for i in port_data: @@ -569,34 +570,37 @@ class lf_tests(lf_libs): # create stations sta_data = {} - for key in list(mpsk_data.keys()): - if "passkey" in mpsk_data[key] and mpsk_data[key]["passkey"] is not None: - sta_data[key] = self.client_connect(ssid=ssid, passkey=mpsk_data[key]["passkey"], security=encryption, mode=mode, band=band, - vlan_id=[None], num_sta=num_sta, scan_ssid=True, - station_data=["ip", "alias", "mac", "port type"], - allure_attach=True) non_vlan_sta = "" if mode == "BRIDGE" or mode == "NAT-WAN": - # setup_data = self.setup_interfaces(ssid=ssid, bssid=bssid, passkey=bssid, encryption=encryption, - # band=band, vlan_id=None, mode=mode, num_sta=None) logging.info(f"---------- \n setup data : { - # setup_data} \n") non_vlan_sta = "WAN Upstream" - upstream_port = dut_data[0]["wan_port"] + upstream_port = self.dut_data[0]["wan_port"] vlan_data[non_vlan_sta] = self.wan_ports[upstream_port] if mode == "NAT-LAN": non_vlan_sta = "LAN upstream" - upstream_port=dut_data[0]["lan_port"] - vlan_data[non_vlan_sta]=self.lan_ports[upstream_port] - sta_data[non_vlan_sta] = self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band, - vlan_id=[None], num_sta=num_sta, scan_ssid=True, - station_data=["ip", "alias", "mac", "port type"], - allure_attach=True) + upstream_port = self.dut_data[0]["lan_port"] + vlan_data[non_vlan_sta] = self.lan_ports[upstream_port] + for key in list(mpsk_data.keys()): + if key == "default": + sta_data[non_vlan_sta] = self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, + band=band, + vlan_id=[None], num_sta=num_sta, scan_ssid=True, + station_data=["ip", "alias", "mac", "port type"], + allure_attach=True) + self.client_disconnect(station_name=list(sta_data[non_vlan_sta].keys())) + if "passkey" in mpsk_data[key] and mpsk_data[key]["passkey"] is not None: + sta_data[key] = self.client_connect(ssid=ssid, passkey=mpsk_data[key]["passkey"], security=encryption, + mode=mode, band=band, + vlan_id=[None], num_sta=num_sta, scan_ssid=True, + station_data=["ip", "alias", "mac", "port type"], + allure_attach=True) + self.client_disconnect(station_name=list(sta_data[key].keys())) + logging.info("station data: " + str(sta_data)) # check Pass/Fail - table_heads=["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address", - 'pass/fail'] - table_data=[] + table_heads = ["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address", + 'pass/fail'] + table_data = [] pf = 'PASS' for i in sta_data: if (str(i) in vlan_data) and (str(i) != 'WAN Upstream' and str(i) != 'LAN Upstream'): @@ -610,7 +614,8 @@ class lf_tests(lf_libs): pf = 'FAIL' logging.info(f"FAIL: Station did not got IP from vlan {i}") table_data.append( - [sta_data[i][item]['alias'], str(i), f'{exp1[0]}.{exp1[1]}.X.X', sta_data[i][item]['ip'], sta_data[i][item]['mac'], + [sta_data[i][item]['alias'], str(i), f'{exp1[0]}.{exp1[1]}.X.X', sta_data[i][item]['ip'], + sta_data[i][item]['mac'], f'{pf}']) elif str(i) == "WAN Upstream": for item in sta_data[i]: @@ -629,7 +634,7 @@ class lf_tests(lf_libs): elif str(i) == "LAN Upstream": for item in sta_data[i]: exp3 = sta_data[i][item]['ip'].split('.') - ip3=vlan_data[str(i)]['ip'].split('.') + ip3 = vlan_data[str(i)]['ip'].split('.') if exp3[0] == '192' and exp3[1] == '168': pf = 'PASS' logging.info(f"PASS: Station got IP from LAN Upstream") @@ -637,7 +642,8 @@ class lf_tests(lf_libs): pf = 'FAIL' logging.info(f"FAIL: Station did not got IP from LAN Upstream") table_data.append( - [sta_data[i][item]['alias'], 'LAN upstream', f'192.168.X.X', sta_data[i][item]['ip'], sta_data[i][item]['mac'], f'{pf}']) + [sta_data[i][item]['alias'], 'LAN upstream', f'192.168.X.X', sta_data[i][item]['ip'], + sta_data[i][item]['mac'], f'{pf}']) # attach test data in a table to allure report_obj = Report() @@ -1271,7 +1277,14 @@ if __name__ == '__main__': # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.get_cx_data() # obj.chamber_view() - dut = {'903cb36c4301': {'ssid_data': {0: {'ssid': 'ssid_wpa_2g_br', 'encryption': 'wpa', 'password': 'something', 'band': '2G', 'bssid': '90:3C:B3:6C:43:04'}}, 'radio_data': {'2G': {'channel': 6, 'bandwidth': 20, 'frequency': 2437}, '5G': {'channel': None, 'bandwidth': None, 'frequency': None}, '6G': {'channel': None, 'bandwidth': None, 'frequency': None}}}} + dut = {'903cb36c4301': + {'ssid_data': { + 0: {'ssid': 'ssid_wpa_2g_br', 'encryption': 'wpa', 'password': 'something', 'band': '2G', + 'bssid': '90:3C:B3:6C:43:04'}}, 'radio_data': {'2G': {'channel': 6, 'bandwidth': 20, 'frequency': 2437}, + '5G': {'channel': None, 'bandwidth': None, + 'frequency': None}, + '6G': {'channel': None, 'bandwidth': None, + 'frequency': None}}}} passes, result = obj.client_connectivity_test(ssid="ssid_wpa_2g_br", passkey="something", security="wpa", extra_securities=[],