mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-02 20:07:57 +00:00
Added Environment Variable Generic Method for tip_2x
Signed-off-by: shivam <shivam.thakur@candelatech.com>
This commit is contained in:
@@ -99,7 +99,7 @@ class APLIBS:
|
||||
|
||||
if output.__contains__("INCOMPLETE") or output.__contains__("FAILED"):
|
||||
logging.error(output)
|
||||
pytest.exit("up0v0 interface is failed to have connectivity!!!")
|
||||
pytest.fail("up0v0 interface is failed to have connectivity!!!")
|
||||
|
||||
def get_uci_show(self, param="ucentral", idx=0, print_log=True, attach_allure=True):
|
||||
output = self.run_generic_command(cmd="uci show " + param, idx=idx,
|
||||
@@ -140,7 +140,7 @@ class APLIBS:
|
||||
ret_val["active"] = data.get("active")
|
||||
return ret_val
|
||||
|
||||
def get_latest_config_recieved(self, idx=0):
|
||||
def get_latest_config_recieved(self, idx=0, print_log=True, attach_allure=True):
|
||||
r_val = self.ubus_call_ucentral_status(idx=idx)
|
||||
latest_json = {}
|
||||
if r_val["latest"] is None:
|
||||
@@ -150,8 +150,8 @@ class APLIBS:
|
||||
return False
|
||||
latest_uuid = r_val["latest"]
|
||||
output = self.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest_uuid), idx=idx,
|
||||
print_log=True,
|
||||
attach_allure=False,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
try:
|
||||
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
|
||||
@@ -161,7 +161,7 @@ class APLIBS:
|
||||
try_again = True
|
||||
return latest_json
|
||||
|
||||
def get_active_config(self, idx=0):
|
||||
def get_active_config(self, idx=0, print_log=True, attach_allure=True):
|
||||
r_val = self.ubus_call_ucentral_status(idx=idx)
|
||||
active_json = {}
|
||||
if r_val["active"] is None:
|
||||
@@ -171,8 +171,8 @@ class APLIBS:
|
||||
return False
|
||||
active_uuid = r_val["active"]
|
||||
output = self.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(active_uuid), idx=idx,
|
||||
print_log=True,
|
||||
attach_allure=False,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
try:
|
||||
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
|
||||
@@ -182,19 +182,51 @@ class APLIBS:
|
||||
try_again = True
|
||||
return active_json
|
||||
|
||||
def get_iwinfo(self, idx=0):
|
||||
pass
|
||||
def get_iwinfo(self, idx=0, print_log=True, attach_allure=True):
|
||||
|
||||
# [['ssid_wpa2_2g', 'wpa', 'something', '2G'], ['ssid_wpa2_2g', 'wpa', 'something', '5G']] {'wlan0': [
|
||||
# '"ssid_wpa3_p_5g"', '12:34:56:78:90:12', '5G'], 'wlan1': ['"ssid_wpa3_p_2g"','00:03:7F:12:34:56', '5G']}
|
||||
iwinfo_output = self.run_generic_command(cmd="iwinfo", idx=idx,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.TEXT)
|
||||
|
||||
return iwinfo_output
|
||||
|
||||
def get_bssid_band_mapping(self, idx=0):
|
||||
data = self.get_iwinfo(idx=idx)
|
||||
data = str(data).replace(" ", "").split("\n")
|
||||
band_info = []
|
||||
for i in data:
|
||||
tmp = []
|
||||
if i.__contains__("AccessPoint"):
|
||||
bssid = i.replace("AccessPoint:", "")
|
||||
tmp.append(bssid.casefold())
|
||||
elif i.__contains__("MasterChannel"):
|
||||
if i.split(":")[2].__contains__("2.4"):
|
||||
tmp.append("2G")
|
||||
else:
|
||||
tmp.append("5G")
|
||||
else:
|
||||
tmp = []
|
||||
if tmp != []:
|
||||
band_info.append(tmp)
|
||||
bssi_band_mapping = {}
|
||||
for i in range(len(band_info)):
|
||||
if (i % 2) == 0:
|
||||
bssi_band_mapping[band_info[i][0]] = band_info[i + 1][0]
|
||||
return bssi_band_mapping
|
||||
|
||||
def get_ifconfig(self, idx=0):
|
||||
pass
|
||||
|
||||
def verify_certificates(self, idx=0):
|
||||
def verify_certificates(self, idx=0, print_log=False, attach_allure=False):
|
||||
cert_files_name = ["cas.pem", "dev-id", "key.pem", "cert.pem"]
|
||||
for cert in cert_files_name:
|
||||
output = self.run_generic_command(cmd='[ -f /etc/ucentral/' + cert + ' ] && echo "True" || echo "False"',
|
||||
idx=idx,
|
||||
print_log=True,
|
||||
attach_allure=False,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
if output == "False":
|
||||
logging.error("Certificate " + cert + "is missing from /etc/ucentral/ directory. "
|
||||
@@ -274,6 +306,37 @@ class APLIBS:
|
||||
attach_allure=False,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
|
||||
def get_wifi_status(self, idx=0, print_log=True, attach_allure=True):
|
||||
output = self.run_generic_command(cmd="wifi status", idx=idx,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
|
||||
try_again = False
|
||||
try:
|
||||
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
|
||||
except Exception as e:
|
||||
logging.error("error in converting the ubus call ucentral status output to json" + output)
|
||||
try_again = True
|
||||
if try_again or len(data.keys()) != 3:
|
||||
output = self.run_generic_command(cmd="wifi status", idx=idx,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
try:
|
||||
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
|
||||
except Exception as e:
|
||||
logging.error("error in converting the ubus call ucentral status output to json" + output)
|
||||
ret_val = data
|
||||
return ret_val
|
||||
|
||||
def get_ap_version(self, idx=0, print_log=False, attach_allure=False):
|
||||
output = self.run_generic_command(cmd="cat /tmp/ucentral.version", idx=idx,
|
||||
print_log=print_log,
|
||||
attach_allure=attach_allure,
|
||||
expected_attachment_type=allure.attachment_type.JSON)
|
||||
return output
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
basic_1 = {
|
||||
@@ -284,29 +347,67 @@ if __name__ == '__main__':
|
||||
"password": "OpenWifi%123"
|
||||
},
|
||||
"device_under_tests": [{
|
||||
"model": "edgecore_eap101",
|
||||
"model": "cig_wf188n",
|
||||
"supported_bands": ["2G", "5G"],
|
||||
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
|
||||
"ssid": {
|
||||
"2g-ssid": "OpenWifi",
|
||||
"5g-ssid": "OpenWifi",
|
||||
"6g-ssid": "candela6ghz",
|
||||
"2g-password": "OpenWifi",
|
||||
"5g-password": "OpenWifi",
|
||||
"6g-password": "hello123",
|
||||
"2g-encryption": "WPA2",
|
||||
"5g-encryption": "open",
|
||||
"6g-encryption": "WPA3",
|
||||
"2g-bssid": "68:7d:b4:5f:5c:31 ",
|
||||
"5g-bssid": "68:7d:b4:5f:5c:3c",
|
||||
"6g-bssid": "68:7d:b4:5f:5c:38"
|
||||
},
|
||||
"mode": "wifi6",
|
||||
"identifier": "903cb36c44f0",
|
||||
"method": "serial", # serial/ssh/telnet
|
||||
"host_ip": "192.168.200.101",
|
||||
"identifier": "0000c1018812",
|
||||
"method": "serial",
|
||||
"host_ip": "10.28.3.103",
|
||||
"host_username": "lanforge",
|
||||
"host_password": "Endurance@123", # Endurance@123
|
||||
"host_password": "pumpkin77",
|
||||
"host_ssh_port": 22,
|
||||
"serial_tty": "/dev/ttyUSB0",
|
||||
"serial_tty": "/dev/ttyAP1",
|
||||
"firmware_version": "next-latest"
|
||||
}],
|
||||
"traffic_generator": {}
|
||||
"traffic_generator": {
|
||||
"name": "lanforge",
|
||||
"testbed": "basic",
|
||||
"scenario": "dhcp-bridge",
|
||||
"details": {
|
||||
"manager_ip": "10.28.3.28",
|
||||
"http_port": 8080,
|
||||
"ssh_port": 22,
|
||||
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
|
||||
"wan_ports": {
|
||||
"1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
|
||||
"lease-first": 10,
|
||||
"lease-count": 10000,
|
||||
"lease-time": "6h"
|
||||
}}
|
||||
},
|
||||
"lan_ports": {
|
||||
|
||||
},
|
||||
"uplink_nat_ports": {
|
||||
"1.1.eth1": {"addressing": "static", "subnet": "10.28.2.16/24", "gateway_ip": "10.28.2.1"}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.NOTSET)
|
||||
obj = APLIBS(dut_data=basic_1["device_under_tests"])
|
||||
obj.check_serial_connection()
|
||||
obj.setup_serial_environment()
|
||||
obj.run_generic_command("uci show ucentral")
|
||||
obj.verify_certificates()
|
||||
obj.get_dut_logs()
|
||||
l = obj.get_latest_config_recieved()
|
||||
a = obj.get_active_config()
|
||||
if a == l:
|
||||
print("a = l")
|
||||
# obj.run_generic_command("uci show ucentral")
|
||||
# obj.verify_certificates()
|
||||
# obj.get_dut_logs()
|
||||
# l = obj.get_latest_config_recieved()
|
||||
# a = obj.get_active_config()
|
||||
# if a == l:
|
||||
# print("a = l")
|
||||
print(obj.get_ap_version())
|
||||
|
||||
@@ -32,7 +32,9 @@ class ConfigureController:
|
||||
self.access_token = ""
|
||||
# self.session = requests.Session()
|
||||
self.login_resp = self.login()
|
||||
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
|
||||
self.gw_host, self.fms_host, \
|
||||
self.prov_host, self.owrrm_host, \
|
||||
self.owanalytics_host, self.owsub_host = self.get_gw_endpoint()
|
||||
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
|
||||
time.sleep(60)
|
||||
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
|
||||
@@ -51,7 +53,6 @@ class ConfigureController:
|
||||
return new_uri
|
||||
|
||||
def build_uri(self, path):
|
||||
|
||||
new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path)
|
||||
return new_uri
|
||||
|
||||
@@ -59,6 +60,18 @@ class ConfigureController:
|
||||
new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path)
|
||||
return new_uri
|
||||
|
||||
def build_url_owrrm(self, path):
|
||||
new_uri = 'https://%s:%d/api/v1/%s' % (self.owrrm_host.hostname, self.owrrm_host.port, path)
|
||||
return new_uri
|
||||
|
||||
def build_url_owanalytics(self, path):
|
||||
new_uri = 'https://%s:%d/api/v1/%s' % (self.owanalytics_host.hostname, self.owanalytics_host.port, path)
|
||||
return new_uri
|
||||
|
||||
def build_url_owsub(self, path):
|
||||
new_uri = 'https://%s:%d/api/v1/%s' % (self.owsub_host.hostname, self.owsub_host.port, path)
|
||||
return new_uri
|
||||
|
||||
def request(self, service, command, method, params, payload):
|
||||
if service == "sec":
|
||||
uri = self.build_uri_sec(command)
|
||||
@@ -68,6 +81,12 @@ class ConfigureController:
|
||||
uri = self.build_url_fms(command)
|
||||
elif service == "prov":
|
||||
uri = self.build_url_prov(command)
|
||||
elif service == "rrm":
|
||||
uri = self.build_url_owrrm(command)
|
||||
elif service == "analytics":
|
||||
uri = self.build_url_owanalytics(command)
|
||||
elif service == "sub":
|
||||
uri = self.build_url_owsub(command)
|
||||
else:
|
||||
raise NameError("Invalid service code for request.")
|
||||
params = params
|
||||
@@ -131,7 +150,13 @@ class ConfigureController:
|
||||
fms_host = urlparse(service["uri"])
|
||||
if service['type'] == "owprov":
|
||||
prov_host = urlparse(service["uri"])
|
||||
return gw_host, fms_host, prov_host
|
||||
if service['type'] == "owrrm":
|
||||
owrrm_host = urlparse(service["uri"])
|
||||
if service['type'] == "owanalytics":
|
||||
owanalytics_host = urlparse(service["uri"])
|
||||
if service['type'] == "owsub":
|
||||
owsub_host = urlparse(service["uri"])
|
||||
return gw_host, fms_host, prov_host, owrrm_host, owanalytics_host, owsub_host
|
||||
|
||||
def logout(self):
|
||||
uri = self.build_uri_sec('oauth2/%s' % self.access_token)
|
||||
@@ -211,7 +236,7 @@ class Controller(ConfigureController):
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
return resp
|
||||
|
||||
def get_sdk_version(self):
|
||||
def get_sdk_version_gw(self):
|
||||
uri = self.build_uri("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
@@ -226,6 +251,96 @@ class Controller(ConfigureController):
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_sdk_version_fms(self):
|
||||
uri = self.build_url_fms("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_sdk_version_prov(self):
|
||||
uri = self.build_url_prov("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_sdk_version_owrrm(self):
|
||||
uri = self.build_url_owrrm("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_sdk_version_ow_analytics(self):
|
||||
uri = self.build_url_owanalytics("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_sdk_version_owsub(self):
|
||||
uri = self.build_url_owsub("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_sdk_version_sec(self):
|
||||
uri = self.build_uri_sec("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
version = resp.json()
|
||||
return version['version']
|
||||
|
||||
def get_system_gw(self):
|
||||
uri = self.build_uri("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
@@ -268,6 +383,48 @@ class Controller(ConfigureController):
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
return resp
|
||||
|
||||
def get_system_ow_rrm(self):
|
||||
uri = self.build_url_owrrm("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
return resp
|
||||
|
||||
def get_system_ow_analytics(self):
|
||||
uri = self.build_url_owanalytics("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
return resp
|
||||
|
||||
def get_system_ow_sub(self):
|
||||
uri = self.build_url_owsub("system?command=info")
|
||||
logging.info("Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
allure.attach(name="Sending Command:", body="Sending Command: " + "\n" +
|
||||
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
|
||||
"URI: " + str(uri) + "\n" +
|
||||
"Headers: " + str(self.make_headers()))
|
||||
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
|
||||
self.check_response("GET", resp, self.make_headers(), "", uri)
|
||||
return resp
|
||||
|
||||
def get_device_uuid(self, serial_number):
|
||||
device_info = self.get_device_by_serial_number(serial_number=serial_number)
|
||||
device_info = device_info.json()
|
||||
|
||||
@@ -216,11 +216,17 @@ class tip_2x:
|
||||
profile_object.add_ssid(ssid_data=ssid_data)
|
||||
logging.info(
|
||||
"Configuration That is getting pushed: " + json.dumps(profile_object.base_profile_config, indent=2))
|
||||
r_val = []
|
||||
self.pre_apply_check() # Do check AP before pushing the configuration
|
||||
|
||||
# Setup Config Apply on all AP's
|
||||
ret_val = dict()
|
||||
for i in range(0, len(self.device_under_tests_info)):
|
||||
self.pre_apply_check(idx=i) # Do check AP before pushing the configuration
|
||||
|
||||
# Check the latest uuid
|
||||
r_data = self.dut_library_object.ubus_call_ucentral_status(idx=i, print_log=True, attach_allure=False)
|
||||
uuid_before_apply = r_data["latest"]
|
||||
|
||||
# Apply the Config
|
||||
resp = profile_object.push_config(serial_number=self.device_under_tests_info[i]["identifier"])
|
||||
logging.info("Response for Config apply: " + str(resp.status_code))
|
||||
if resp.status_code != 200:
|
||||
@@ -232,22 +238,57 @@ class tip_2x:
|
||||
if resp.status_code != 200:
|
||||
logging.error("Failed to apply Config, Response code:" + str(resp.status_code))
|
||||
pytest.fail("Failed to apply Config, Response code :" + str(resp.status_code))
|
||||
if resp.status_code == 200:
|
||||
r_val.append(True)
|
||||
|
||||
"""
|
||||
serial connection check
|
||||
ubus call ucentral status
|
||||
save the current uuid and compare with the one before config apply
|
||||
save the active config and compare with the latest apply
|
||||
uci show
|
||||
ifconfig
|
||||
iwinfo
|
||||
wifi status
|
||||
start logger to collect ap logs before config apply
|
||||
Timestamp after doing config apply
|
||||
"""
|
||||
return r_val
|
||||
r_data = self.dut_library_object.ubus_call_ucentral_status(idx=i, print_log=True, attach_allure=False)
|
||||
uuid_after_apply = r_data["latest"]
|
||||
x = 0
|
||||
while uuid_before_apply == uuid_after_apply:
|
||||
time.sleep(10)
|
||||
x += 1
|
||||
logging.info("uuid_before_apply: ", uuid_before_apply)
|
||||
logging.info("uuid_after_apply: ", uuid_after_apply)
|
||||
r_data = self.dut_library_object.ubus_call_ucentral_status(idx=i, print_log=False, attach_allure=False)
|
||||
uuid_after_apply = r_data["latest"]
|
||||
if x == 5:
|
||||
break
|
||||
if uuid_after_apply == uuid_before_apply:
|
||||
logging.error("Config is not received by AP")
|
||||
logging.info("uuid_before_apply: ", uuid_before_apply)
|
||||
logging.info("uuid_after_apply: ", uuid_after_apply)
|
||||
pytest.fail("Config sent from Gateway is not received by AP")
|
||||
self.dut_library_object.get_latest_config_recieved(idx=i, print_log=True, attach_allure=True)
|
||||
|
||||
r_data = self.dut_library_object.ubus_call_ucentral_status(idx=i, print_log=False, attach_allure=False)
|
||||
latest_uuid = r_data["latest"]
|
||||
|
||||
r_data = self.dut_library_object.ubus_call_ucentral_status(idx=i, print_log=False, attach_allure=False)
|
||||
active_uuid = r_data["active"]
|
||||
|
||||
x = 0
|
||||
while latest_uuid == active_uuid:
|
||||
time.sleep(10)
|
||||
x += 1
|
||||
logging.info("active_uuid: " + str(active_uuid))
|
||||
logging.info("latest_uuid: " + str(latest_uuid))
|
||||
r_data = self.dut_library_object.ubus_call_ucentral_status(idx=i, print_log=False, attach_allure=False)
|
||||
active_uuid = r_data["active"]
|
||||
if x == 5:
|
||||
break
|
||||
if latest_uuid != active_uuid:
|
||||
logging.error("Config is not received by AP")
|
||||
logging.info("uuid_before_apply: ", uuid_before_apply)
|
||||
logging.info("uuid_after_apply: ", uuid_after_apply)
|
||||
pytest.fail("Config sent from Gateway is not received by AP")
|
||||
self.dut_library_object.get_active_config(idx=i, print_log=True, attach_allure=True)
|
||||
|
||||
logging.info("Config is Properly Applied on AP, Waiting for 30 Seconds for All interfaces to come up")
|
||||
# wait time interfaces to come up
|
||||
time.sleep(30)
|
||||
|
||||
self.post_apply_check(idx=i) # Do check AP after pushing the configuration
|
||||
ret_val[self.device_under_tests_info[i]["identifier"]] = self.get_applied_ssid_info(idx=i,
|
||||
profile_object=profile_object)
|
||||
return ret_val
|
||||
|
||||
"""
|
||||
setup_special_configuration - Method to configure APs in mesh operating modes with multiple SSID's and multiple AP's
|
||||
@@ -323,11 +364,43 @@ class tip_2x:
|
||||
"""
|
||||
return r_val
|
||||
|
||||
def get_applied_ssid_info(self, profile_object=None, idx=0):
|
||||
if profile_object is None:
|
||||
logging.error("Profile object is None, Unable to fetch ssid info from AP")
|
||||
return None
|
||||
ssid_info_sdk = profile_object.get_ssid_info()
|
||||
ap_wifi_data = self.dut_library_object.get_iwinfo(idx=idx)
|
||||
o = ap_wifi_data.split()
|
||||
iwinfo_bssid_data = {}
|
||||
for i in range(len(o)):
|
||||
if o[i].__contains__("ESSID"):
|
||||
if o[i + 9].__contains__("2.4"):
|
||||
band = "2G"
|
||||
else:
|
||||
band = "5G"
|
||||
iwinfo_bssid_data[o[i - 1]] = [o[i + 1].replace('"', ''), o[i + 4], band]
|
||||
for p in iwinfo_bssid_data:
|
||||
for q in ssid_info_sdk:
|
||||
if iwinfo_bssid_data[p][0] == q[0] and iwinfo_bssid_data[p][2] == q[3]:
|
||||
q.append(iwinfo_bssid_data[p][1])
|
||||
return ssid_info_sdk
|
||||
|
||||
def get_dut_version(self):
|
||||
pass
|
||||
version_info = []
|
||||
for ap in range(len(self.device_under_tests_info)):
|
||||
version_info.append(self.dut_library_object.get_ap_version(idx=ap, print_log=True))
|
||||
return version_info
|
||||
|
||||
def get_controller_version(self):
|
||||
pass
|
||||
version_info = dict()
|
||||
version_info["ow_fms"] = self.controller_library_object.get_sdk_version_fms()
|
||||
version_info["ow_gw"] = self.controller_library_object.get_sdk_version_gw()
|
||||
version_info["ow_sec"] = self.controller_library_object.get_sdk_version_sec()
|
||||
version_info["ow_prov"] = self.controller_library_object.get_sdk_version_prov()
|
||||
version_info["ow_rrm"] = self.controller_library_object.get_sdk_version_owrrm()
|
||||
version_info["ow_analytics"] = self.controller_library_object.get_sdk_version_ow_analytics()
|
||||
version_info["ow_sub"] = self.controller_library_object.get_sdk_version_owsub()
|
||||
return version_info
|
||||
|
||||
# TODO: Get the vlans info such as vlan-ids
|
||||
# Jitendra
|
||||
@@ -338,10 +411,7 @@ class tip_2x:
|
||||
# TODO: Get the wireless info data structure such as (ssid, bssid, passkey, encryption, band, channel)
|
||||
# Jitendra
|
||||
|
||||
def wireless_info(self):
|
||||
pass
|
||||
|
||||
def pre_apply_check(self):
|
||||
def pre_apply_check(self, idx=0):
|
||||
"""
|
||||
serial connection check
|
||||
ubus call ucentral status
|
||||
@@ -352,11 +422,11 @@ class tip_2x:
|
||||
start logger to collect ap logs before config apply
|
||||
Timestamp before doing config apply
|
||||
"""
|
||||
for i in range(0, len(self.device_under_tests_info)):
|
||||
self.dut_library_object.check_serial_connection(idx=i)
|
||||
self.dut_library_object.setup_serial_environment(idx=i)
|
||||
self.dut_library_object.verify_certificates(idx=i)
|
||||
ret_val = self.dut_library_object.ubus_call_ucentral_status(idx=i)
|
||||
|
||||
self.dut_library_object.check_serial_connection(idx=idx)
|
||||
self.dut_library_object.setup_serial_environment(idx=idx)
|
||||
self.dut_library_object.verify_certificates(idx=idx)
|
||||
ret_val = self.dut_library_object.ubus_call_ucentral_status(idx=idx)
|
||||
if not ret_val["connected"] or ret_val["connected"] is None:
|
||||
# TODO: check the connectivity (if it is not connected, then check the lanforge wan port and bring it
|
||||
# up if lanforge eth is in down state. Also check the link state of eth port with ip address
|
||||
@@ -366,6 +436,53 @@ class tip_2x:
|
||||
# Jitendra
|
||||
pytest.fail("AP is in disconnected state from Ucentral gateway!!!")
|
||||
|
||||
def post_apply_check(self, idx=0):
|
||||
"""
|
||||
ubus call ucentral status
|
||||
ifconfig - check if up0v0 has ip address
|
||||
wifi status - check if all phy radios are up
|
||||
"""
|
||||
ret_val = self.dut_library_object.ubus_call_ucentral_status(idx=idx)
|
||||
if not ret_val["connected"] or ret_val["connected"] is None:
|
||||
logging.error(" AP Went to Disconnected State after Applying Config, Checking again after 30 Seconds")
|
||||
time.sleep(30)
|
||||
ret_val = self.dut_library_object.ubus_call_ucentral_status(idx=idx)
|
||||
if not ret_val["connected"] or ret_val["connected"] is None:
|
||||
logging.error("Dang !!!, AP is still in Disconnected State. Your Config Messed up.")
|
||||
logging.error("Failed the post apply check on: " + self.device_under_tests_info[idx]["identifier"])
|
||||
self.dut_library_object.check_connectivity(idx=idx)
|
||||
self.dut_library_object.check_connectivity(idx=idx)
|
||||
r_data = self.dut_library_object.get_wifi_status(idx=idx)
|
||||
logging.info("Checking Wifi Status after Config Apply...")
|
||||
for radio in r_data:
|
||||
if not r_data[radio]["up"]:
|
||||
logging.error(radio + " is in down State...")
|
||||
pytest.fail(radio + " is in down State after config apply")
|
||||
else:
|
||||
logging.info(radio + " is up and running")
|
||||
|
||||
def setup_environment_properties(self, add_allure_environment_property=None):
|
||||
if add_allure_environment_property is None:
|
||||
return
|
||||
add_allure_environment_property('Cloud-Controller-SDK-URL', self.controller_data.get("url"))
|
||||
sdk_version_data = self.get_controller_version()
|
||||
for microservice in sdk_version_data:
|
||||
add_allure_environment_property(microservice + '-version',
|
||||
str(sdk_version_data.get(microservice)))
|
||||
dut_versions = self.get_dut_version()
|
||||
for i in range(len(self.device_under_tests_info)):
|
||||
add_allure_environment_property("Firmware-Version_" + self.device_under_tests_info[i]["identifier"],
|
||||
str(dut_versions[i]))
|
||||
|
||||
for dut in self.device_under_tests_info:
|
||||
models = []
|
||||
identifiers = []
|
||||
models.append(dut["model"])
|
||||
identifiers.append(dut["identifier"])
|
||||
add_allure_environment_property('DUT-Model/s', ", ".join(models))
|
||||
add_allure_environment_property('Serial-Number/s', ", ".join(identifiers))
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
basic_1 = {
|
||||
@@ -380,13 +497,13 @@ if __name__ == '__main__':
|
||||
"supported_bands": ["2G", "5G"],
|
||||
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
|
||||
"mode": "wifi6",
|
||||
"identifier": "c44bd1005b30",
|
||||
"serial_port": True,
|
||||
"host_ip": "10.28.3.100",
|
||||
"identifier": "903cb36c44f0",
|
||||
"method": "serial",
|
||||
"host_ip": "192.168.200.101",
|
||||
"host_username": "lanforge",
|
||||
"host_password": "pumpkin77",
|
||||
"host_password": "Endurance@123",
|
||||
"host_ssh_port": 22,
|
||||
"serial_tty": "/dev/ttyAP8",
|
||||
"serial_tty": "/dev/ttyUSB0",
|
||||
"firmware_version": "next-latest"
|
||||
}],
|
||||
"traffic_generator": {}
|
||||
@@ -405,12 +522,13 @@ if __name__ == '__main__':
|
||||
{"ssid_name": "ssid_wpa_5g_br", "appliedRadios": ["5G"],
|
||||
"security_key": "something"}],
|
||||
"wpa2_personal": [
|
||||
{"ssid_name": "ssid_wpa2_2g_br", "appliedRadios": ["2G"], "security_key": "something"},
|
||||
{"ssid_name": "ssid_wpa2_5g_br", "appliedRadios": ["5G"],
|
||||
"security_key": "something"}]},
|
||||
{"ssid_name": "TestSSID-2G", "appliedRadios": ["2G"], "security_key": "OpenWifi"},
|
||||
{"ssid_name": "TestSSID-5G", "appliedRadios": ["5G"],
|
||||
"security_key": "OpenWifi"}]},
|
||||
"rf": {"2G": {}, "5G": {}, "6G": {}},
|
||||
"radius": False
|
||||
}
|
||||
target = [['2G', 'wpa'], ['5G', 'open'], ['5G', 'wpa']]
|
||||
var.setup_basic_configuration(configuration=setup_params_general, requested_combination=target)
|
||||
target = [['2G', 'wpa2_personal'], ['5G', 'wpa2_personal']]
|
||||
# var.setup_basic_configuration(configuration=setup_params_general, requested_combination=target)
|
||||
var.get_dut_version()
|
||||
var.teardown_objects()
|
||||
|
||||
@@ -361,12 +361,6 @@ def get_device_configuration(device, request):
|
||||
yield PERFECTO_DETAILS[device]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def get_apnos():
|
||||
"""yields the LIBRARY for APNOS, Reduces the use of imports across files"""
|
||||
yield APNOS
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def get_equipment_ref(request, setup_controller, testbed, get_configuration):
|
||||
""""""
|
||||
|
||||
@@ -732,7 +732,7 @@ class Fixtures_2x:
|
||||
connected, latest, active = ap_ssh.get_ucentral_status()
|
||||
if x == 5:
|
||||
break
|
||||
onnected, latest, active = ap_ssh.get_ucentral_status()
|
||||
connected, latest, active = ap_ssh.get_ucentral_status()
|
||||
if latest == latest_old:
|
||||
latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest))
|
||||
allure.attach(name="Latest Config Received by AP: ",
|
||||
|
||||
Reference in New Issue
Block a user