Resolved merge conflicts

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2022-08-24 14:10:12 +05:30
5 changed files with 90 additions and 41 deletions

View File

@@ -1,5 +1,6 @@
import csv
import importlib
import json
import logging
import os
import sys
@@ -59,10 +60,17 @@ class lf_tests(lf_libs):
security="open", extra_securities=[], sta_mode=0,
num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog",
allure_attach=True, runtime_secs=40):
logging.info("DUT Data:\n" + json.dumps(str(dut_data), indent=2))
allure.attach(name="DUT Data:\n", body=json.dumps(str(dut_data), indent=2),
attachment_type=allure.attachment_type.JSON)
data = self.setup_interfaces(ssid=ssid, bssid=bssid, passkey=passkey, encryption=security,
band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta)
logging.info("Setup interface data" + str(data))
logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2))
allure.attach(name="Interface Info: \n", body=json.dumps(str(data), indent=2),
attachment_type=allure.attachment_type.JSON)
if data == {}:
pytest.skip("Skipping This Test")
sta_connect_obj = []
@@ -100,14 +108,13 @@ class lf_tests(lf_libs):
logging.info("ssid scan data : " + str(result))
if not result:
# Sniffer required
# print("sniff radio", data["sniff_radio"].split(".")[2])
for duts in self.dut_data:
identifier = duts["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)[-1])["2G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["2G"][0]
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
if data[dut]["sniff_radio_2g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_2g"].split(".")[2],
@@ -115,9 +122,9 @@ class lf_tests(lf_libs):
time.sleep(10)
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)[-1])["5G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["5G"][0]
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
if data[dut]["sniff_radio_5g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_5g"].split(".")[2],
@@ -125,9 +132,9 @@ class lf_tests(lf_libs):
time.sleep(10)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)[-1])["6G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["6G"][0]
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
if data[dut]["sniff_radio_6g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_6g"].split(".")[2],
@@ -142,9 +149,9 @@ class lf_tests(lf_libs):
identifier = dut_["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)[-1])["2G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["2G"][0]
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)["radio_data"])["2G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["2G"]["channel"]
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=runtime_secs)
logging.info("started-sniffer")
@@ -155,9 +162,9 @@ class lf_tests(lf_libs):
logging.info("stopping-sniffer")
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)[-1])["5G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["5G"][0]
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)["radio_data"])["5G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=runtime_secs)
for obj in sta_connect_obj:
@@ -166,9 +173,9 @@ class lf_tests(lf_libs):
time.sleep(runtime_secs)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)[-1])["6G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["6G"][0]
if dict(dut_data.get(identifier)["radio_data"]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)["radio_data"])["6G"] is not None:
channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=runtime_secs)
for obj in sta_connect_obj:
@@ -178,9 +185,8 @@ class lf_tests(lf_libs):
self.stop_sniffer()
else:
for obj in sta_connect_obj:
print(obj)
obj.start()
print("napping %f sec" % runtime_secs)
logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs)
pass_fail_result = []
for obj in sta_connect_obj:
@@ -266,7 +272,12 @@ class lf_tests(lf_libs):
else:
logging.info("client connection to" + str(obj.dut_ssid) + "unsuccessful. Test Failed")
result = "FAIL"
for obj in sta_connect_obj:
try:
print("1." + str(obj.resource) + "." + str(obj.radio))
self.get_supplicant_logs(radio=str(obj.radio))
except Exception as e:
logging.error("client_cpnnectivity_tests() -- Error in getting Supplicant Logs:" + str(e))
result = "PASS"
description = ""
for i in pass_fail_result:
@@ -324,7 +335,6 @@ class lf_tests(lf_libs):
for obj in client_connect_obj:
obj.build()
result = obj.wait_for_ip(station_list=obj.sta_list, timeout_sec=50)
# print(self.client_connect.wait_for_ip(station_name))
pass_fail.append(result)
station_data_ = self.get_station_data(sta_name=obj.sta_list, rows=station_data,
allure_attach=False)
@@ -841,11 +851,11 @@ if __name__ == '__main__':
"password": "OpenWifi%123"
},
"device_under_tests": [{
"model": "edgecore_ecw5211",
"supported_bands": ["2G", "5G"],
"model": "cig_wf196",
"supported_bands": ["2G", "5G", "6G"],
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
"wan_port": "1.1.eth2",
"lan_port": "1.1.eth1",
"wan_port": "1.3.eth2",
"lan_port": None,
"ssid": {
"2g-ssid": "OpenWifi",
"5g-ssid": "OpenWifi",
@@ -853,15 +863,15 @@ if __name__ == '__main__':
"2g-password": "OpenWifi",
"5g-password": "OpenWifi",
"6g-password": "OpenWifi",
"2g-encryption": "OPEN",
"5g-encryption": "OPEN",
"6g-encryption": "OPEN",
"2g-encryption": "WPA2",
"5g-encryption": "WPA2",
"6g-encryption": "WPA3",
"2g-bssid": "68:7d:b4:5f:5c:31",
"5g-bssid": "68:7d:b4:5f:5c:3c",
"6g-bssid": "68:7d:b4:5f:5c:38"
},
"mode": "wifi5",
"identifier": "68215fda456d",
"mode": "wifi6e",
"identifier": "824f816011e4",
"method": "serial",
"host_ip": "localhost",
"host_username": "lanforge",
@@ -880,7 +890,7 @@ if __name__ == '__main__':
"ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
"wan_ports": {
"1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"1.3.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
@@ -888,9 +898,7 @@ if __name__ == '__main__':
}
},
"lan_ports": {
"1.1.eth1": {
"addressing": "dynamic"
}
},
"uplink_nat_ports": {
"1.1.eth3": {
@@ -921,7 +929,7 @@ if __name__ == '__main__':
# vlan_id=100, num_sta=5, scan_ssid=True,
# station_data=["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"],
# allure_attach=True)
# obj.add_vlan(vlan_ids=[100])
# obj.add_vlan(vlan_iFds=[100])
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
# obj.get_cx_data()
# obj.chamber_view()