Files
wlan-testing/libs/controller/controller_2x/controller.py
Haricharan Jaka 837ba81d68 Added system info test case of Provision UI (#474)
* Added system info test case of Provision UI

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added url of Provision UI in allure

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added Prov UI test cases for creating device in Inventory

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Repalced = with : in allure of Prov UI url

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* changed the test case name

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Changed the payload parameters to match Prov UI

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added configuration in Prov class

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Removed the class of ProvUtils

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added the prov UI api testcases for adding and deleting a device

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Removed the configuration file

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added prov ui fixture

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added Prov UI controller object and fixture

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added prov controller obj

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Changed the allure report name for some attchments

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>
2022-04-25 21:28:54 +05:30

717 lines
27 KiB
Python

"""
Base Library for Ucentral
"""
import datetime
import json
import ssl
import sys
import time
from urllib.parse import urlparse
import pytest
import allure
import requests
from operator import itemgetter
from pathlib import Path
from requests.adapters import HTTPAdapter
import logging
# logging.basicConfig(level=logging.DEBUG)
# from http.client import HTTPConnection
#
# HTTPConnection.debuglevel = 1
# requests.logging.getLogger()
class ConfigureController:
def __init__(self, controller_data):
self.username = controller_data["username"]
self.password = controller_data["password"]
self.host = urlparse(controller_data["url"])
print(self.host)
self.access_token = ""
# self.session = requests.Session()
self.login_resp = self.login()
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
time.sleep(60)
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
self.logout()
print(self.gw_host, self.fms_host, self.prov_host)
pytest.exit("All Endpoints not available in Controller Service")
sys.exit()
def build_uri_sec(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
print(new_uri)
return new_uri
def build_url_fms(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.fms_host.hostname, self.fms_host.port, path)
print(new_uri)
return new_uri
def build_uri(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path)
print(new_uri)
return new_uri
def request(self, service, command, method, params, payload):
if service == "sec":
uri = self.build_uri_sec(command)
elif service == "gw":
uri = self.build_uri(command)
elif service == "fms":
uri = self.build_url_fms(command)
elif service == "prov":
uri = self.build_url_prov(command)
else:
raise NameError("Invalid service code for request.")
print(uri)
params = params
if method == "GET":
resp = requests.get(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
elif method == "POST":
print(uri, payload, params)
resp = requests.post(uri, params=params, data=payload, headers=self.make_headers(), verify=False,
timeout=100)
elif method == "PUT":
resp = requests.put(uri, params=params, data=payload, verify=False, timeout=100)
elif method == "DELETE":
resp = requests.delete(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
self.check_response(method, resp, self.make_headers(), payload, uri)
print(resp)
return resp
def login(self):
uri = self.build_uri_sec("oauth2")
# self.session.mount(uri, HTTPAdapter(max_retries=15))
payload = json.dumps({"userId": self.username, "password": self.password})
resp = requests.post(uri, data=payload, verify=False, timeout=100)
self.check_response("POST", resp, "", payload, uri)
token = resp.json()
self.access_token = token["access_token"]
print(token)
if resp.status_code != 200:
print(resp.status_code)
print(resp.json())
pytest.exit(str(resp.json()))
# self.session.headers.update({'Authorization': self.access_token})
return resp
def get_gw_endpoint(self):
uri = self.build_uri_sec("systemEndpoints")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("GET", resp, self.make_headers(), "", uri)
services = resp.json()
print(services)
gw_host = ""
fms_host = ""
for service in services['endpoints']:
if service['type'] == "owgw":
gw_host = urlparse(service["uri"])
if service['type'] == "owfms":
fms_host = urlparse(service["uri"])
if service['type'] == "owprov":
prov_host = urlparse(service["uri"])
return gw_host, fms_host, prov_host
def logout(self):
uri = self.build_uri_sec('oauth2/%s' % self.access_token)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
print('Logged out:', resp.status_code)
return resp
def make_headers(self):
headers = {'Authorization': 'Bearer %s' % self.access_token,
"Connection": "keep-alive",
"Keep-Alive": "timeout=10, max=1000"
}
return headers
def check_response(self, cmd, response, headers, data_str, url):
if response.status_code >= 400:
if response.status_code >= 400:
print("check-response: ERROR, url: ", url)
else:
print("check-response: url: ", url)
print("Command: ", cmd)
print("response-status: ", response.status_code)
print("response-headers: ", response.headers)
print("response-content: ", response.content)
print("headers: ", headers)
print("data-str: ", data_str)
if response.status_code >= 400:
# if True:
raise NameError("Invalid response code.")
return True
class Controller(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def get_devices(self):
uri = self.build_uri("devices/")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
devices = resp.json()
# resp.close()()
return devices
def get_device_by_serial_number(self, serial_number=None):
uri = self.build_uri("device/" + serial_number)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
device = resp.json()
# resp.close()()
return device
def get_sdk_version(self):
uri = self.build_uri("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
version = resp.json()
print(version)
# resp.close()()
return version['version']
def get_system_gw(self):
uri = self.build_uri("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_fms(self):
uri = self.build_url_fms("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_uuid(self, serial_number):
device_info = self.get_device_by_serial_number(serial_number=serial_number)
return device_info["UUID"]
class FMSUtils:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client = sdk_client
def upgrade_firmware(self, serial="", url=""):
response = self.sdk_client.request(service="gw", command="device/" + serial + "/upgrade",
method="POST", params="serialNumber=" + serial,
payload="{ \"serialNumber\" : " + "\"" + serial + "\"" +
" , \"uri\" : " + "\"" + url + "\"" +
", \"when\" : 0" + " }")
print(response.json())
allure.attach(name="REST - firmware upgrade response: ",
body=str(response.status_code) + "\n" +
str(response.json()) + "\n"
)
print(response)
def ap_model_lookup(self, model=""):
devices = self.get_device_set()
model_name = ""
for device in devices['deviceTypes']:
if str(device).__eq__(model):
model_name = device
return model_name
def get_revisions(self):
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params="revisionSet=true",
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_latest_fw(self, model=""):
device_type = self.ap_model_lookup(model=model)
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET",
params="latestOnly=true&deviceType=" + device_type,
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_device_set(self):
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params="deviceSet=true",
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_firmwares(self, limit="10000", model="", latestonly="", branch="", commit_id="", offset="3000"):
deviceType = self.ap_model_lookup(model=model)
params = "limit=" + limit + \
"&deviceType=" + deviceType + \
"&latestonly=" + latestonly + \
"offset=" + offset
command = "firmwares/"
response = self.sdk_client.request(service="fms", command=command, method="GET", params=params, payload="")
allure.attach(name=command + params,
body=str(response.status_code) + "\n" + str(response.json()),
attachment_type=allure.attachment_type.JSON)
if response.status_code == 200:
data = response.json()
newlist = sorted(data['firmwares'], key=itemgetter('created'))
# for i in newlist:
# print(i['uri'])
# print(i['revision'])
# print(newlist)
return newlist
# print(data)
return "error"
class ProvUtils(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def build_url_prov(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path)
print(new_uri)
return new_uri
def get_inventory(self):
uri = self.build_url_prov("inventory")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_inventory_by_device(self, device_name):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_prov(self):
uri = self.build_url_prov("system/?command=info")
allure.attach(name="Url of Prov UI:", body=str(uri))
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_device_to_inventory(self, device_name, payload):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_device_from_inventory(self, device_name):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
class UProfileUtility:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client = sdk_client
self.base_profile_config = {
"uuid": 1,
"radios": [],
"interfaces": [{
"name": "WAN",
"role": "upstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
},
{
"name": "LAN",
"role": "downstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/16",
"dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
},
}],
"metrics": {
"statistics": {
"interval": 60,
"types": ["ssids", "lldp", "clients"]
},
"health": {
"interval": 120
},
"wifi-frames": {
"filters": ["probe",
"auth",
"assoc",
"disassoc",
"deauth",
"local-deauth",
"inactive-deauth",
"key-mismatch",
"beacon-report",
"radar-detected"]
},
"dhcp-snooping": {
"filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"]
}
},
"services": {
"lldp": {
"describe": "TIP OpenWiFi",
"location": "QA"
},
"ssh": {
"port": 22
}
}
}
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.mode = None
def set_mesh_services(self):
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
del self.base_profile_config['metrics']['wifi-frames']
del self.base_profile_config['metrics']['dhcp-snooping']
var = {
"filters": ["probe",
"auth"]
}
self.base_profile_config["metrics"]['wifi-frames'] = var
del self.base_profile_config['services']
var2 = {
"lldp": {
"describe": "uCentral",
"location": "universe"
},
"ssh": {
"port": 22
}
}
self.base_profile_config['services'] = var2
def set_express_wifi(self, open_flow=None):
if self.mode == "NAT":
self.base_profile_config["interfaces"][1]["services"] = ["ssh", "lldp", "open-flow"]
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
self.base_profile_config['services']["open-flow"] = open_flow
self.base_profile_config['services']['lldp']['describe'] = "OpenWiFi - expressWiFi"
self.base_profile_config['services']['lldp']['location'] = "Hotspot"
def set_captive_portal(self):
if self.mode == "NAT":
max_client = {
"max-clients": 32
}
# sourceFile = open('captive_config.py', 'w')
self.base_profile_config["interfaces"][1]["name"] = "captive"
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.2.1/24"
self.base_profile_config["interfaces"][1]["captive"] = max_client
del self.base_profile_config["interfaces"][1]["ethernet"]
del self.base_profile_config["interfaces"][1]["services"]
del self.base_profile_config["metrics"]["wifi-frames"]
del self.base_profile_config["metrics"]["dhcp-snooping"]
# print(self.base_profile_config)
# print(self.base_profile_config, file=sourceFile)
# sourceFile.close()
def encryption_lookup(self, encryption="psk"):
encryption_mapping = {
"none": "open",
"psk": "wpa",
"psk2": "wpa2",
"sae": "wpa3",
"psk-mixed": "wpa|wpa2",
"sae-mixed": "wpa3",
"wpa": 'wap',
"wpa2": "eap",
"wpa3": "eap",
"wpa-mixed": "eap",
"wpa3-mixed": "sae"
}
if encryption in encryption_mapping.keys():
return encryption_mapping[encryption]
else:
return False
def get_ssid_info(self):
ssid_info = []
for interfaces in self.base_profile_config["interfaces"]:
if "ssids" in interfaces.keys():
for ssid_data in interfaces["ssids"]:
for band in ssid_data["wifi-bands"]:
temp = [ssid_data["name"]]
if ssid_data["encryption"]["proto"] == "none" or "radius" in ssid_data.keys():
temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"]))
temp.append('[BLANK]')
else:
temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"]))
temp.append(ssid_data["encryption"]["key"])
temp.append(band)
ssid_info.append(temp)
return ssid_info
def set_radio_config(self, radio_config={}):
base_radio_config_2g = {
"band": "2G",
"country": "CA",
"channel-mode": "HE",
"channel": "auto"
}
base_radio_config_5g = {
"band": "5G",
"country": "CA",
"allow-dfs": True,
"channel-mode": "HE",
"channel": "auto"
}
for band in radio_config:
if band == "2G":
for keys in radio_config[band]:
base_radio_config_2g[keys] = radio_config[band][keys]
if band == "5G":
for keys in radio_config[band]:
base_radio_config_5g[keys] = radio_config[band][keys]
# if band == "6G":
# for keys in radio_config[band]:
# base_radio_config_6g[keys] = radio_config[band][keys]
self.base_profile_config["radios"].append(base_radio_config_2g)
self.base_profile_config["radios"].append(base_radio_config_5g)
print(self.base_profile_config)
self.vlan_section["ssids"] = []
self.vlan_ids = []
def set_mode(self, mode, mesh=False):
self.mode = mode
if mode == "NAT":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][1]['ssids'] = []
elif mode == "BRIDGE":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][0]['ssids'] = []
elif mode == "VLAN":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
del self.base_profile_config['interfaces'][1]
self.base_profile_config['interfaces'][0]['ssids'] = []
self.base_profile_config['interfaces'] = []
wan_section_vlan = {
"name": "WAN",
"role": "upstream",
"services": ["lldp", "ssh", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.base_profile_config['interfaces'].append(wan_section_vlan)
else:
print("Invalid Mode")
return 0
def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}):
print("ssid data : ", ssid_data)
ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]}
for options in ssid_data:
if options == "multi-psk":
ssid_info[options] = ssid_data[options]
print("hi", ssid_info)
if options == "rate-limit":
ssid_info[options] = ssid_data[options]
for i in ssid_data["appliedRadios"]:
ssid_info["wifi-bands"].append(i)
ssid_info['encryption'] = {}
ssid_info['encryption']['proto'] = ssid_data["security"]
try:
ssid_info['encryption']['key'] = ssid_data["security_key"]
except Exception as e:
pass
ssid_info['encryption']['ieee80211w'] = "optional"
if radius:
ssid_info["radius"] = {}
ssid_info["radius"]["authentication"] = {
"host": radius_auth_data["ip"],
"port": radius_auth_data["port"],
"secret": radius_auth_data["secret"]
}
ssid_info["radius"]["accounting"] = {
"host": radius_accounting_data["ip"],
"port": radius_accounting_data["port"],
"secret": radius_accounting_data["secret"]
}
if self.mode == "NAT":
self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info)
elif self.mode == "BRIDGE":
self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info)
elif self.mode == "VLAN":
vid = ssid_data["vlan"]
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"services": ["lldp", "dhcp-snooping"],
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
vlan_section = self.vlan_section
if vid in self.vlan_ids:
print("sss", self.vlan_ids)
for i in self.base_profile_config['interfaces']:
if i["name"] == "WANv%s" % (vid):
i["ssids"].append(ssid_info)
else:
print(self.vlan_ids)
self.vlan_ids.append(vid)
vlan_section['name'] = "WANv%s" % (vid)
vlan_section['vlan']['id'] = int(vid)
vlan_section["ssids"] = []
vlan_section["ssids"].append(ssid_info)
self.base_profile_config['interfaces'].append(vlan_section)
print(vlan_section)
vsection = 0
else:
print("invalid mode")
pytest.exit("invalid Operating Mode")
def push_config(self, serial_number):
payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 1}
uri = self.sdk_client.build_uri("device/" + serial_number + "/configure")
basic_cfg_str = json.dumps(payload)
allure.attach(name="ucentral_config: ",
body=str(basic_cfg_str),
attachment_type=allure.attachment_type.JSON)
print("JSON Post Configure: " + str(basic_cfg_str))
print("Sending Configure Command: ", datetime.datetime.utcnow())
resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(),
verify=False, timeout=100)
print(resp.json())
print(resp.status_code)
allure.attach(name="/configure response: " + str(resp.status_code), body=str(resp.json()),
attachment_type=allure.attachment_type.JSON)
self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri)
# print(resp.url)
resp.close()
print(resp)
if __name__ == '__main__':
controller = {
'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller
'username': "tip@ucentral.com",
'password': 'OpenWifi%123',
}
obj = Controller(controller_data=controller)
up = UProfileUtility(sdk_client=obj, controller_data=controller)
up.set_mode(mode="BRIDGE")
up.set_radio_config()
up.add_ssid(ssid_data={"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something", "security": "psk2"})
up.push_config(serial_number="3c2c99f44e77")
print(obj.get_device_by_serial_number(serial_number="3c2c99f44e77"))
# print(datetime.datetime.utcnow())
# fms = FMSUtils(sdk_client=obj)
# new = fms.get_firmwares(model='ecw5410')
# for i in new:
# print(i)
# print(len(new))
# print(profile.get_ssid_info())
# # print(obj.get_devices())
# obj.logout()