mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 11:18:03 +00:00
pass/fail validations added to multipsk test
Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
@@ -305,99 +305,101 @@ class lf_tests(lf_libs):
|
||||
def multi_psk_test(self, band="twog", mpsk_data=None, ssid="'OpenWifi'", bssid="['BLANK']", passkey="OpenWifi",
|
||||
encryption="wpa", vlan_id=None, mode="BRIDGE", num_sta=1, dut_data=None):
|
||||
if mpsk_data is None:
|
||||
mpsk_data={100: {"num_stations": num_sta, "passkey": "OpenWifi1"}, 200: {"num_stations": num_sta, "passkey": "OpenWifi2"}}
|
||||
mpsk_data = {100: {"num_stations": num_sta, "passkey": "OpenWifi1"},
|
||||
200: {"num_stations": num_sta, "passkey": "OpenWifi2"}}
|
||||
if dut_data is None:
|
||||
dut_data = self.dut_data
|
||||
# setup_data = self.setup_interfaces(ssid="OpenWifi", bssid="", passkey="OpenWifi", encryption="wpa",
|
||||
# vlan_id=list(mpsk_data.keys()), mode="VLAN", band="twog", num_sta=1)
|
||||
|
||||
logging.info("Creating VLAN's")
|
||||
# create VLAN's
|
||||
self.add_vlan(vlan_ids=list(mpsk_data.keys()))
|
||||
# query vlan and fetch Ip Address
|
||||
logging.info("Wait until VLAN's bring up")
|
||||
time.sleep(10)
|
||||
# query and fetch vlan Ip Address
|
||||
port_data=self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces']
|
||||
# Fail if Vlan don't have IP
|
||||
vlan_data = {}
|
||||
for i in port_data:
|
||||
for item in i:
|
||||
if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0':
|
||||
logging.error(f"VLAN Interface - {i[item]['alias']} do not have IP")
|
||||
pytest.fail("VLAN don't have IP")
|
||||
pytest.fail("VLAN do not have IP")
|
||||
break
|
||||
elif i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] != '0.0.0.0':
|
||||
vlan_data[i[item]['alias'].split(".")[1]] = i[item]
|
||||
else:
|
||||
pass
|
||||
|
||||
#create stations
|
||||
for key in range(len(list(mpsk_data.keys()))):
|
||||
passkey = mpsk_data[key]["passkey"]
|
||||
self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band,
|
||||
vlan_id=[vlan_id], num_sta=num_sta, scan_ssid=True,
|
||||
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
|
||||
allure_attach=True)
|
||||
# create stations
|
||||
sta_data = {}
|
||||
for key in list(mpsk_data.keys()):
|
||||
if "passkey" in mpsk_data[key] and mpsk_data[key]["passkey"] is not None:
|
||||
sta_data[key] = self.client_connect(ssid=ssid, passkey=mpsk_data[key]["passkey"], security=encryption, mode=mode, band=band,
|
||||
vlan_id=[None], num_sta=num_sta, scan_ssid=True,
|
||||
station_data=["ip", "alias", "mac", "port type"],
|
||||
allure_attach=True)
|
||||
non_vlan_sta = ""
|
||||
if mode == "BRIDGE":
|
||||
non_vlan_sta = "WAN Upstream"
|
||||
else:
|
||||
non_vlan_sta = "LAN upstream"
|
||||
sta_data[non_vlan_sta] = self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band,
|
||||
vlan_id=[None], num_sta=num_sta, scan_ssid=True,
|
||||
station_data=["ip", "alias", "mac", "port type"],
|
||||
allure_attach=True)
|
||||
logging.info("station data: " + str(sta_data))
|
||||
|
||||
# check Pass/Fail
|
||||
res_data=self.json_get(_req_url='port?fields=alias,port+type,ip,mac', )['interfaces']
|
||||
table_heads=["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address",
|
||||
'pass/fail']
|
||||
table_data=[]
|
||||
temp={'sta100': '', 'sta200': '', 'sta00': ''}
|
||||
for i in res_data:
|
||||
for item in i:
|
||||
if i[item]['port type'] == 'Ethernet' and i[item]['alias'] == self.upstream_port:
|
||||
if mode == 'NAT':
|
||||
temp.update({'sta00': '192.168.1.1'})
|
||||
else:
|
||||
temp.update({'sta00': i[item]['ip']})
|
||||
if i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".100"):
|
||||
temp.update({'sta100': i[item]['ip']})
|
||||
elif i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".200"):
|
||||
temp.update({'sta200': i[item]['ip']})
|
||||
for i in res_data:
|
||||
for item in i:
|
||||
if i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == "sta100":
|
||||
exp1=temp['sta100'].split('.')
|
||||
ip1=i[item]['ip'].split('.')
|
||||
for i in sta_data:
|
||||
if str(i) in vlan_data:
|
||||
for item in sta_data[i]:
|
||||
exp1 = sta_data[i][item]['ip'].split('.')
|
||||
ip1 = vlan_data[str(i)]['ip'].split('.')
|
||||
if exp1[0] == ip1[0] and exp1[1] == ip1[1]:
|
||||
pf='PASS'
|
||||
pf = 'PASS'
|
||||
logging.info(f"PASS: Station got IP from vlan {i}")
|
||||
else:
|
||||
pf='FAIL'
|
||||
result1='Fail'
|
||||
pf = 'FAIL'
|
||||
logging.info(f"FAIL: Station did not got IP from vlan {i}")
|
||||
table_data.append(
|
||||
[i[item]['alias'], '100', f'{exp1[0]}.{exp1[1]}.X.X', i[item]['ip'], i[item]['mac'],
|
||||
[sta_data[i][item]['alias'], sta_data[i][item], f'{exp1[0]}.{exp1[1]}.X.X', sta_data[i][item]['ip'], sta_data[i][item]['mac'],
|
||||
f'{pf}'])
|
||||
elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta200':
|
||||
exp2=temp['sta200'].split('.')
|
||||
ip2=i[item]['ip'].split('.')
|
||||
elif str(i) == "WAN Upstream":
|
||||
for item in sta_data[i]:
|
||||
exp2 = sta_data[i][item]['ip'].split('.')
|
||||
ip2 = vlan_data[str(i)]['ip'].split('.')
|
||||
if exp2[0] == ip2[0] and exp2[1] == ip2[1]:
|
||||
pf='PASS'
|
||||
pf = 'PASS'
|
||||
logging.info(f"PASS: Station got IP from WAN Upstream")
|
||||
else:
|
||||
pf='FAIL'
|
||||
result1='Fail'
|
||||
pf = 'FAIL'
|
||||
logging.info(f"FAIL: Station did not got IP from WAN Upstream")
|
||||
table_data.append(
|
||||
[i[item]['alias'], '200', f'{exp2[0]}.{exp2[1]}.X.X', i[item]['ip'], i[item]['mac'], f'{pf}'])
|
||||
elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta00':
|
||||
exp3=temp['sta00'].split('.')
|
||||
ip2=i[item]['ip'].split('.')
|
||||
if mode == "BRIDGE":
|
||||
if exp3[0] == ip2[0] and exp3[1] == ip2[1]:
|
||||
pf='PASS'
|
||||
else:
|
||||
pf='FAIL'
|
||||
result1='Fail'
|
||||
table_data.append([i[item]['alias'], 'WAN upstream', f'{exp3[0]}.{exp3[1]}.X.X', i[item]['ip'],
|
||||
i[item]['mac'], f'{pf}'])
|
||||
elif mode == "NAT":
|
||||
if exp3[0] == '192' and exp3[1] == '168':
|
||||
pf='PASS'
|
||||
else:
|
||||
pf='FAIL'
|
||||
result1='Fail'
|
||||
table_data.append(
|
||||
[i[item]['alias'], 'LAN upstream', f'192.168.X.X', i[item]['ip'], i[item]['mac'], f'{pf}'])
|
||||
print(table_data)
|
||||
[sta_data[i][item]['alias'], sta_data[i][item], f'{exp2[0]}.{exp2[1]}.X.X',
|
||||
sta_data[i][item]['ip'], sta_data[i][item]['mac'],
|
||||
f'{pf}'])
|
||||
elif str(i) == "LAN Upstream":
|
||||
for item in sta_data[i]:
|
||||
exp3 = sta_data[i][item]['ip'].split('.')
|
||||
ip3=vlan_data[str(i)]['ip'].split('.')
|
||||
if exp3[0] == '192' and exp3[1] == '168':
|
||||
pf = 'PASS'
|
||||
logging.info(f"PASS: Station got IP from LAN Upstream")
|
||||
else:
|
||||
pf = 'FAIL'
|
||||
logging.info(f"FAIL: Station did not got IP from LAN Upstream")
|
||||
table_data.append(
|
||||
[sta_data[i][item]['alias'], 'LAN upstream', f'192.168.X.X', sta_data[i][item]['ip'], sta_data[i][item]['mac'], f'{pf}'])
|
||||
|
||||
# attach test data in a table to allure
|
||||
report_obj=Report()
|
||||
table_info=report_obj.table2(table=table_data, headers=table_heads)
|
||||
allure.attach(name="Test Validation Check", body=table_info)
|
||||
report_obj = Report()
|
||||
table_info = report_obj.table2(table=table_data, headers=table_heads)
|
||||
logging.info(str("\n") + str(table_info))
|
||||
allure.attach(name="Test Results", body=table_info)
|
||||
return table_data
|
||||
|
||||
def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog",
|
||||
vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0,
|
||||
@@ -961,7 +963,7 @@ if __name__ == '__main__':
|
||||
"testbed": "basic",
|
||||
"scenario": "dhcp-bridge",
|
||||
"details": {
|
||||
"manager_ip": "10.28.3.34",
|
||||
"manager_ip": "10.28.3.28",
|
||||
"http_port": 8080,
|
||||
"ssh_port": 22,
|
||||
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
|
||||
@@ -1014,8 +1016,8 @@ if __name__ == '__main__':
|
||||
# vlan_id=[None], num_sta=65, scan_ssid=True,
|
||||
# station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
|
||||
# allure_attach=True)
|
||||
obj.multi_psk_test(band="twog", mpsk_data=None, ssid="['BLANK']", bssid="['BLANK']", passkey="['BLANK']",
|
||||
encryption="wpa2", vlan_id=None, mode=None, num_sta=1)
|
||||
obj.multi_psk_test(band="twog", mpsk_data=None, ssid="OpenWifi", bssid="['00:00:c1:01:88:12']", passkey="OpenWifi",
|
||||
encryption="wpa", vlan_id=None, mode="BRIDGE", num_sta=1)
|
||||
# obj.add_vlan(vlan_iFds=[100])
|
||||
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
|
||||
# obj.get_cx_data()
|
||||
|
||||
Reference in New Issue
Block a user