multipsk test added to lf_tests

Signed-off-by: anil-tegala <anil.tegala@candelatech.com>
This commit is contained in:
anil-tegala
2022-09-01 12:37:16 +05:30
parent e11e933fda
commit 0225bed576

View File

@@ -302,8 +302,102 @@ class lf_tests(lf_libs):
def multiband_performance_test(self):
pass
def multi_psk_test(self):
pass
def multi_psk_test(self, band="twog", mpsk_data=None, ssid="'OpenWifi'", bssid="['BLANK']", passkey="OpenWifi",
encryption="wpa", vlan_id=None, mode="BRIDGE", num_sta=1, dut_data=None):
if mpsk_data is None:
mpsk_data={100: {"num_stations": num_sta, "passkey": "OpenWifi1"}, 200: {"num_stations": num_sta, "passkey": "OpenWifi2"}}
if dut_data is None:
dut_data = self.dut_data
# setup_data = self.setup_interfaces(ssid="OpenWifi", bssid="", passkey="OpenWifi", encryption="wpa",
# vlan_id=list(mpsk_data.keys()), mode="VLAN", band="twog", num_sta=1)
logging.info("Creating VLAN's")
# create VLAN's
self.add_vlan(vlan_ids=list(mpsk_data.keys()))
# query vlan and fetch Ip Address
port_data=self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces']
# Fail if Vlan don't have IP
for i in port_data:
for item in i:
if i[item]['port type'] == '802.1Q VLAN' and i[item]['ip'] == '0.0.0.0':
logging.error(f"VLAN Interface - {i[item]['alias']} do not have IP")
pytest.fail("VLAN don't have IP")
break
else:
pass
#create stations
for key in range(len(list(mpsk_data.keys()))):
passkey = mpsk_data[key]["passkey"]
self.client_connect(ssid=ssid, passkey=passkey, security=encryption, mode=mode, band=band,
vlan_id=[vlan_id], num_sta=num_sta, scan_ssid=True,
station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
allure_attach=True)
# check Pass/Fail
res_data=self.json_get(_req_url='port?fields=alias,port+type,ip,mac', )['interfaces']
table_heads=["station name", "configured vlan-id", "expected IP Range", "allocated IP", "mac address",
'pass/fail']
table_data=[]
temp={'sta100': '', 'sta200': '', 'sta00': ''}
for i in res_data:
for item in i:
if i[item]['port type'] == 'Ethernet' and i[item]['alias'] == self.upstream_port:
if mode == 'NAT':
temp.update({'sta00': '192.168.1.1'})
else:
temp.update({'sta00': i[item]['ip']})
if i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".100"):
temp.update({'sta100': i[item]['ip']})
elif i[item]['port type'] == '802.1Q VLAN' and i[item]['alias'] == str(self.upstream_port + ".200"):
temp.update({'sta200': i[item]['ip']})
for i in res_data:
for item in i:
if i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == "sta100":
exp1=temp['sta100'].split('.')
ip1=i[item]['ip'].split('.')
if exp1[0] == ip1[0] and exp1[1] == ip1[1]:
pf='PASS'
else:
pf='FAIL'
result1='Fail'
table_data.append(
[i[item]['alias'], '100', f'{exp1[0]}.{exp1[1]}.X.X', i[item]['ip'], i[item]['mac'],
f'{pf}'])
elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta200':
exp2=temp['sta200'].split('.')
ip2=i[item]['ip'].split('.')
if exp2[0] == ip2[0] and exp2[1] == ip2[1]:
pf='PASS'
else:
pf='FAIL'
result1='Fail'
table_data.append(
[i[item]['alias'], '200', f'{exp2[0]}.{exp2[1]}.X.X', i[item]['ip'], i[item]['mac'], f'{pf}'])
elif i[item]['port type'] == 'WIFI-STA' and i[item]['alias'] == 'sta00':
exp3=temp['sta00'].split('.')
ip2=i[item]['ip'].split('.')
if mode == "BRIDGE":
if exp3[0] == ip2[0] and exp3[1] == ip2[1]:
pf='PASS'
else:
pf='FAIL'
result1='Fail'
table_data.append([i[item]['alias'], 'WAN upstream', f'{exp3[0]}.{exp3[1]}.X.X', i[item]['ip'],
i[item]['mac'], f'{pf}'])
elif mode == "NAT":
if exp3[0] == '192' and exp3[1] == '168':
pf='PASS'
else:
pf='FAIL'
result1='Fail'
table_data.append(
[i[item]['alias'], 'LAN upstream', f'192.168.X.X', i[item]['ip'], i[item]['mac'], f'{pf}'])
print(table_data)
# attach test data in a table to allure
report_obj=Report()
table_info=report_obj.table2(table=table_data, headers=table_heads)
allure.attach(name="Test Validation Check", body=table_info)
logging.info(str("\n") + str(table_info))
def client_connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog",
vlan_id=[None], num_sta=None, scan_ssid=True, sta_mode=0,
@@ -318,8 +412,8 @@ class lf_tests(lf_libs):
all_identifier_list.append(dut["identifier"])
print(all_identifier_list)
if identifier not in all_identifier_list:
logging.error("Identifier is missinhg")
pytest.fail("Identifier is missinhg")
logging.error("Identifier is missing")
pytest.fail("Identifier is missing")
data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=security,
band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta)
@@ -867,7 +961,7 @@ if __name__ == '__main__':
"testbed": "basic",
"scenario": "dhcp-bridge",
"details": {
"manager_ip": "10.28.3.12",
"manager_ip": "10.28.3.34",
"http_port": 8080,
"ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
@@ -897,3 +991,60 @@ if __name__ == '__main__':
obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]),
log_level=logging.DEBUG, run_lf=False)
# obj.add_stations()
# obj.add_stations(band="5G")
# obj.chamber_view(raw_lines="custom")
# dut = {'0000c1018812': {"ssid_data": {
# 0: {"ssid": 'TestSSID-2G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '2G',
# "bssid": '00:00:C1:01:88:15'},
# 1: {"ssid": 'TestSSID-5G', "encryption": 'wpa2', "password": 'OpenWifi', "band": '5G',
# "bssid": '00:00:C1:01:88:14'}}, "radio_data": {'2G': [1, 40, 2422], '5G': [36, 80, 5210], '6G': None}}}
# obj.wifi_capacity(instance_name="test_client_wpa2_BRIDGE_udp_bi", mode="BRIDGE",
# vlan_id=[100],
# download_rate="1Gbps", batch_size="1,5,10,20,40,64,128,256",
# influx_tags="Jitu",
# upload_rate="1Gbps", protocol="UDP-IPv4", duration="60000",
# move_to_influx=False, dut_data=dut, ssid_name="OpenWifi",
# num_stations={"2G": 10, "5G": 10})
# A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1)
# print(A)
# obj.setup_relevent_profiles()
# obj.client_connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog",
# vlan_id=[None], num_sta=65, scan_ssid=True,
# station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
# allure_attach=True)
obj.multi_psk_test(band="twog", mpsk_data=None, ssid="['BLANK']", bssid="['BLANK']", passkey="['BLANK']",
encryption="wpa2", vlan_id=None, mode=None, num_sta=1)
# obj.add_vlan(vlan_iFds=[100])
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
# obj.get_cx_data()
# obj.chamber_view()
# dut = {'0000c1018812': [['ssid_open_2g_nat', 'open', 'something', '2G', '6A:21:5F:DA:45:6F'],
# {'2G': [6, 40, 2437], '5G': None, '6G': None}]}
# passes, result = obj.client_connectivity_test(ssid="ssid_open_2g_nat", passkey="something", security="open",
# extra_securities=[],
# num_sta=1, mode="NAT-WAN", dut_data=dut,
# band="twog")
# print(passes == "PASS", result)
# # obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30)
# print("started")
# time.sleep(30)
# obj.stop_sniffer()
# lf_report.pull_reports(hostname="10.28.3.28", port=22, username="lanforge",
# password="lanforge",
# report_location="/home/lanforge/" + "sniff_radio.pcap",
# report_dir=".")
# def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
#
# obj.get_cx_data()
# obj.chamber_view()
# obj.client_connectivity_test(ssid="wpa2_5g", passkey="something", security="wpa2", extra_securities=[],
# num_sta=1, mode="BRIDGE", vlan_id=1,
# # band="fiveg", ssid_channel=36)
# obj.chamber_view()
# obj.setup_relevent_profiles()
# obj.add_vlan(vlan_ids=[100, 200, 300])
# # obj.chamber_view()
# obj.setup_relevent_profiles()