From 911812b682671e3ff4a3032113c7841586dcd747 Mon Sep 17 00:00:00 2001 From: anil-tegala Date: Mon, 5 Sep 2022 03:03:29 +0530 Subject: [PATCH] pass/fail validations added to multipsk test Signed-off-by: anil-tegala --- lf_libs/lf_tests.py | 136 ++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 67 deletions(-) diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index d546a38c..940cbf85 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -305,99 +305,101 @@ class lf_tests(lf_libs): def multi_psk_test(self, band="twog", mpsk_data=None, ssid="'OpenWifi'", bssid="['BLANK']", passkey="OpenWifi", encryption="wpa", vlan_id=None, mode="BRIDGE", num_sta=1, dut_data=None): if mpsk_data is None: - mpsk_data={100: {"num_stations": num_sta, "passkey": "OpenWifi1"}, 200: {"num_stations": num_sta, "passkey": "OpenWifi2"}} + mpsk_data = {100: {"num_stations": num_sta, "passkey": "OpenWifi1"}, + 200: {"num_stations": num_sta, "passkey": "OpenWifi2"}} if dut_data is None: dut_data = self.dut_data - # setup_data = self.setup_interfaces(ssid="OpenWifi", bssid="", passkey="OpenWifi", encryption="wpa", - # vlan_id=list(mpsk_data.keys()), mode="VLAN", band="twog", num_sta=1) + logging.info("Creating VLAN's") # create VLAN's self.add_vlan(vlan_ids=list(mpsk_data.keys())) - # query vlan and fetch Ip Address + logging.info("Wait until VLAN's bring up") + time.sleep(10) + # query and fetch vlan Ip Address port_data=self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces'] # Fail if Vlan don't have IP + vlan_data = {} for i in port_data: for item in i: if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0': logging.error(f"VLAN Interface - {i[item]['alias']} do not have IP") - pytest.fail("VLAN don't have IP") + pytest.fail("VLAN do not have IP") break + elif i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] != '0.0.0.0': + vlan_data[i[item]['alias'].split(".")[1]] = i[item] else: pass - #create stations - for key in range(len(list(mpsk_data.keys()))): - passkey = mpsk_data[key]["passkey"] - self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band, - vlan_id=[vlan_id], num_sta=num_sta, scan_ssid=True, - station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], - allure_attach=True) + # create stations + sta_data = {} + for key in list(mpsk_data.keys()): + if "passkey" in mpsk_data[key] and mpsk_data[key]["passkey"] is not None: + sta_data[key] = self.client_connect(ssid=ssid, passkey=mpsk_data[key]["passkey"], security=encryption, mode=mode, band=band, + vlan_id=[None], num_sta=num_sta, scan_ssid=True, + station_data=["ip", "alias", "mac", "port type"], + allure_attach=True) + non_vlan_sta = "" + if mode == "BRIDGE": + non_vlan_sta = "WAN Upstream" + else: + non_vlan_sta = "LAN upstream" + sta_data[non_vlan_sta] = self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band, + vlan_id=[None], num_sta=num_sta, scan_ssid=True, + station_data=["ip", "alias", "mac", "port type"], + allure_attach=True) + logging.info("station data: " + str(sta_data)) # check Pass/Fail - res_data=self.json_get(_req_url='port?fields=alias,port+type,ip,mac', )['interfaces'] table_heads=["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address", 'pass/fail'] table_data=[] - temp={'sta100': '', 'sta200': '', 'sta00': ''} - for i in res_data: - for item in i: - if i[item]['port type'] == 'Ethernet' and i[item]['alias'] == self.upstream_port: - if mode == 'NAT': - temp.update({'sta00': '192.168.1.1'}) - else: - temp.update({'sta00': i[item]['ip']}) - if i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".100"): - temp.update({'sta100': i[item]['ip']}) - elif i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".200"): - temp.update({'sta200': i[item]['ip']}) - for i in res_data: - for item in i: - if i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == "sta100": - exp1=temp['sta100'].split('.') - ip1=i[item]['ip'].split('.') + for i in sta_data: + if str(i) in vlan_data: + for item in sta_data[i]: + exp1 = sta_data[i][item]['ip'].split('.') + ip1 = vlan_data[str(i)]['ip'].split('.') if exp1[0] == ip1[0] and exp1[1] == ip1[1]: - pf='PASS' + pf = 'PASS' + logging.info(f"PASS: Station got IP from vlan {i}") else: - pf='FAIL' - result1='Fail' + pf = 'FAIL' + logging.info(f"FAIL: Station did not got IP from vlan {i}") table_data.append( - [i[item]['alias'], '100', f'{exp1[0]}.{exp1[1]}.X.X', i[item]['ip'], i[item]['mac'], + [sta_data[i][item]['alias'], sta_data[i][item], f'{exp1[0]}.{exp1[1]}.X.X', sta_data[i][item]['ip'], sta_data[i][item]['mac'], f'{pf}']) - elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta200': - exp2=temp['sta200'].split('.') - ip2=i[item]['ip'].split('.') + elif str(i) == "WAN Upstream": + for item in sta_data[i]: + exp2 = sta_data[i][item]['ip'].split('.') + ip2 = vlan_data[str(i)]['ip'].split('.') if exp2[0] == ip2[0] and exp2[1] == ip2[1]: - pf='PASS' + pf = 'PASS' + logging.info(f"PASS: Station got IP from WAN Upstream") else: - pf='FAIL' - result1='Fail' + pf = 'FAIL' + logging.info(f"FAIL: Station did not got IP from WAN Upstream") table_data.append( - [i[item]['alias'], '200', f'{exp2[0]}.{exp2[1]}.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) - elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta00': - exp3=temp['sta00'].split('.') - ip2=i[item]['ip'].split('.') - if mode == "BRIDGE": - if exp3[0] == ip2[0] and exp3[1] == ip2[1]: - pf='PASS' - else: - pf='FAIL' - result1='Fail' - table_data.append([i[item]['alias'], 'WAN upstream', f'{exp3[0]}.{exp3[1]}.X.X', i[item]['ip'], - i[item]['mac'], f'{pf}']) - elif mode == "NAT": - if exp3[0] == '192' and exp3[1] == '168': - pf='PASS' - else: - pf='FAIL' - result1='Fail' - table_data.append( - [i[item]['alias'], 'LAN upstream', f'192.168.X.X', i[item]['ip'], i[item]['mac'], f'{pf}']) - print(table_data) + [sta_data[i][item]['alias'], sta_data[i][item], f'{exp2[0]}.{exp2[1]}.X.X', + sta_data[i][item]['ip'], sta_data[i][item]['mac'], + f'{pf}']) + elif str(i) == "LAN Upstream": + for item in sta_data[i]: + exp3 = sta_data[i][item]['ip'].split('.') + ip3=vlan_data[str(i)]['ip'].split('.') + if exp3[0] == '192' and exp3[1] == '168': + pf = 'PASS' + logging.info(f"PASS: Station got IP from LAN Upstream") + else: + pf = 'FAIL' + logging.info(f"FAIL: Station did not got IP from LAN Upstream") + table_data.append( + [sta_data[i][item]['alias'], 'LAN upstream', f'192.168.X.X', sta_data[i][item]['ip'], sta_data[i][item]['mac'], f'{pf}']) + # attach test data in a table to allure - report_obj=Report() - table_info=report_obj.table2(table=table_data, headers=table_heads) - allure.attach(name="Test Validation Check", body=table_info) + report_obj = Report() + table_info = report_obj.table2(table=table_data, headers=table_heads) logging.info(str("\n") + str(table_info)) + allure.attach(name="Test Results", body=table_info) + return table_data def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog", vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0, @@ -961,7 +963,7 @@ if __name__ == '__main__': "testbed": "basic", "scenario": "dhcp-bridge", "details": { - "manager_ip": "10.28.3.34", + "manager_ip": "10.28.3.28", "http_port": 8080, "ssh_port": 22, "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, @@ -1014,8 +1016,8 @@ if __name__ == '__main__': # vlan_id=[None], num_sta=65, scan_ssid=True, # station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"], # allure_attach=True) - obj.multi_psk_test(band="twog", mpsk_data=None, ssid="['BLANK']", bssid="['BLANK']", passkey="['BLANK']", - encryption="wpa2", vlan_id=None, mode=None, num_sta=1) + obj.multi_psk_test(band="twog", mpsk_data=None, ssid="OpenWifi", bssid="['00:00:c1:01:88:12']", passkey="OpenWifi", + encryption="wpa", vlan_id=None, mode="BRIDGE", num_sta=1) # obj.add_vlan(vlan_iFds=[100]) # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.get_cx_data()