Files
wlan-testing/libs/controller/controller_2x/controller.py
Haricharan Jaka 07c6e4d78a Wifi 7859 (#516)
* Added Contact Related testcases of Prov UI

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added Prov UI contact related test cases

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added Prov UI contact related function for support

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Changed the test case function names

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>
2022-05-10 13:48:09 +05:30

822 lines
31 KiB
Python

"""
Base Library for Ucentral
"""
import datetime
import json
import ssl
import sys
import time
from urllib.parse import urlparse
import pytest
import allure
import requests
from operator import itemgetter
from pathlib import Path
from requests.adapters import HTTPAdapter
import logging
# logging.basicConfig(level=logging.DEBUG)
# from http.client import HTTPConnection
#
# HTTPConnection.debuglevel = 1
# requests.logging.getLogger()
class ConfigureController:
def __init__(self, controller_data):
self.username = controller_data["username"]
self.password = controller_data["password"]
self.host = urlparse(controller_data["url"])
print(self.host)
self.access_token = ""
# self.session = requests.Session()
self.login_resp = self.login()
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
time.sleep(60)
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
self.logout()
print(self.gw_host, self.fms_host, self.prov_host)
pytest.exit("All Endpoints not available in Controller Service")
sys.exit()
def build_uri_sec(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
print(new_uri)
return new_uri
def build_url_fms(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.fms_host.hostname, self.fms_host.port, path)
print(new_uri)
return new_uri
def build_uri(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path)
print(new_uri)
return new_uri
def build_url_prov(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path)
print(new_uri)
return new_uri
def request(self, service, command, method, params, payload):
if service == "sec":
uri = self.build_uri_sec(command)
elif service == "gw":
uri = self.build_uri(command)
elif service == "fms":
uri = self.build_url_fms(command)
elif service == "prov":
uri = self.build_url_prov(command)
else:
raise NameError("Invalid service code for request.")
print(uri)
params = params
if method == "GET":
resp = requests.get(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
elif method == "POST":
print(uri, payload, params)
resp = requests.post(uri, params=params, data=payload, headers=self.make_headers(), verify=False,
timeout=100)
elif method == "PUT":
resp = requests.put(uri, params=params, data=payload, verify=False, timeout=100)
elif method == "DELETE":
resp = requests.delete(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
self.check_response(method, resp, self.make_headers(), payload, uri)
print(resp)
return resp
def login(self):
uri = self.build_uri_sec("oauth2")
# self.session.mount(uri, HTTPAdapter(max_retries=15))
payload = json.dumps({"userId": self.username, "password": self.password})
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("POST", resp, "", payload, uri)
token = resp.json()
self.access_token = token["access_token"]
print(token)
if resp.status_code != 200:
print(resp.status_code)
print(resp.json())
pytest.exit(str(resp.json()))
# self.session.headers.update({'Authorization': self.access_token})
return resp
def get_gw_endpoint(self):
uri = self.build_uri_sec("systemEndpoints")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("GET", resp, self.make_headers(), "", uri)
services = resp.json()
print(services)
gw_host = ""
fms_host = ""
for service in services['endpoints']:
if service['type'] == "owgw":
gw_host = urlparse(service["uri"])
if service['type'] == "owfms":
fms_host = urlparse(service["uri"])
if service['type'] == "owprov":
prov_host = urlparse(service["uri"])
return gw_host, fms_host, prov_host
def logout(self):
uri = self.build_uri_sec('oauth2/%s' % self.access_token)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
print('Logged out:', resp.status_code)
return resp
def make_headers(self):
headers = {'Authorization': 'Bearer %s' % self.access_token,
"Connection": "keep-alive",
"Content-Type": "application/json",
"Keep-Alive": "timeout=10, max=1000"
}
return headers
def check_response(self, cmd, response, headers, data_str, url):
if response.status_code >= 400:
if response.status_code >= 400:
print("check-response: ERROR, url: ", url)
else:
print("check-response: url: ", url)
print("Command: ", cmd)
print("response-status: ", response.status_code)
print("response-headers: ", response.headers)
print("response-content: ", response.content)
print("headers: ", headers)
print("data-str: ", data_str)
if response.status_code >= 400:
# if True:
raise NameError("Invalid response code.")
return True
class Controller(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def get_devices(self):
uri = self.build_uri("devices/")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
devices = resp.json()
# resp.close()()
return devices
def get_device_by_serial_number(self, serial_number=None):
uri = self.build_uri("device/" + serial_number)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
device = resp.json()
# resp.close()()
return device
def get_sdk_version(self):
uri = self.build_uri("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
version = resp.json()
print(version)
# resp.close()()
return version['version']
def get_system_gw(self):
uri = self.build_uri("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_fms(self):
uri = self.build_url_fms("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_prov(self):
uri = self.build_url_prov("system/?command=info")
allure.attach(name="Url of Prov UI:", body=str(uri))
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_uuid(self, serial_number):
device_info = self.get_device_by_serial_number(serial_number=serial_number)
return device_info["UUID"]
class FMSUtils:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client = sdk_client
def upgrade_firmware(self, serial="", url=""):
response = self.sdk_client.request(service="gw", command="device/" + serial + "/upgrade",
method="POST", params="serialNumber=" + serial,
payload="{ \"serialNumber\" : " + "\"" + serial + "\"" +
" , \"uri\" : " + "\"" + url + "\"" +
", \"when\" : 0" + " }")
print(response.json())
allure.attach(name="REST - firmware upgrade response: ",
body=str(response.status_code) + "\n" +
str(response.json()) + "\n"
)
print(response)
def ap_model_lookup(self, model=""):
devices = self.get_device_set()
model_name = ""
for device in devices['deviceTypes']:
if str(device).__eq__(model):
model_name = device
return model_name
def get_revisions(self):
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params="revisionSet=true",
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_latest_fw(self, model=""):
device_type = self.ap_model_lookup(model=model)
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET",
params="latestOnly=true&deviceType=" + device_type,
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_device_set(self):
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params="deviceSet=true",
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_firmwares(self, limit="10000", model="", latestonly="", branch="", commit_id="", offset="3000"):
deviceType = self.ap_model_lookup(model=model)
params = "limit=" + limit + \
"&deviceType=" + deviceType + \
"&latestonly=" + latestonly + \
"offset=" + offset
command = "firmwares/"
response = self.sdk_client.request(service="fms", command=command, method="GET", params=params, payload="")
allure.attach(name=command + params,
body=str(response.status_code) + "\n" + str(response.json()),
attachment_type=allure.attachment_type.JSON)
if response.status_code == 200:
data = response.json()
newlist = sorted(data['firmwares'], key=itemgetter('created'))
# for i in newlist:
# print(i['uri'])
# print(i['revision'])
# print(newlist)
return newlist
# print(data)
return "error"
class ProvUtils(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def build_url_prov(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path)
print(new_uri)
return new_uri
def get_inventory(self):
uri = self.build_url_prov("inventory")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_inventory_by_device(self, device_name):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_prov(self):
uri = self.build_url_prov("system/?command=info")
allure.attach(name="Url of Prov UI:", body=str(uri))
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_device_to_inventory(self, device_name, payload):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_device_from_inventory(self, device_name):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def get_entity(self):
uri = self.build_url_prov("entity")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_entity_by_id(self,entity_id):
uri = self.build_url_prov("entity/" + entity_id)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_entity(self, payload):
uri = self.build_url_prov("entity/1")
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_entity(self, entity_id):
uri = self.build_url_prov("entity/" + entity_id)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def edit_device_from_inventory(self, device_name, payload):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
def edit_entity(self, payload, entity_id):
uri = self.build_url_prov("entity/" + entity_id)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
def get_contact(self):
uri = self.build_url_prov("contact")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_contact_by_id(self, contact_id):
uri = self.build_url_prov("contact/" + contact_id)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_contact(self, payload):
uri = self.build_url_prov("contact/1")
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_contact(self, contact_id):
uri = self.build_url_prov("contact/" + contact_id)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def edit_contact(self, payload, contact_id):
uri = self.build_url_prov("contact/" + contact_id)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
class UProfileUtility:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client = sdk_client
self.base_profile_config = {
"uuid": 1,
"radios": [],
"interfaces": [{
"name": "WAN",
"role": "upstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
},
{
"name": "LAN",
"role": "downstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/16",
"dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
},
}],
"metrics": {
"statistics": {
"interval": 60,
"types": ["ssids", "lldp", "clients"]
},
"health": {
"interval": 120
},
"wifi-frames": {
"filters": ["probe",
"auth",
"assoc",
"disassoc",
"deauth",
"local-deauth",
"inactive-deauth",
"key-mismatch",
"beacon-report",
"radar-detected"]
},
"dhcp-snooping": {
"filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"]
}
},
"services": {
"lldp": {
"describe": "TIP OpenWiFi",
"location": "QA"
},
"ssh": {
"port": 22
}
}
}
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.mode = None
def set_mesh_services(self):
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
del self.base_profile_config['metrics']['wifi-frames']
del self.base_profile_config['metrics']['dhcp-snooping']
var = {
"filters": ["probe",
"auth"]
}
self.base_profile_config["metrics"]['wifi-frames'] = var
del self.base_profile_config['services']
var2 = {
"lldp": {
"describe": "uCentral",
"location": "universe"
},
"ssh": {
"port": 22
}
}
self.base_profile_config['services'] = var2
def set_express_wifi(self, open_flow=None):
if self.mode == "NAT":
self.base_profile_config["interfaces"][1]["services"] = ["ssh", "lldp", "open-flow"]
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
self.base_profile_config['services']["open-flow"] = open_flow
self.base_profile_config['services']['lldp']['describe'] = "OpenWiFi - expressWiFi"
self.base_profile_config['services']['lldp']['location'] = "Hotspot"
def set_captive_portal(self):
if self.mode == "NAT":
max_client = {
"max-clients": 32
}
# sourceFile = open('captive_config.py', 'w')
self.base_profile_config["interfaces"][1]["name"] = "captive"
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.2.1/24"
self.base_profile_config["interfaces"][1]["captive"] = max_client
del self.base_profile_config["interfaces"][1]["ethernet"]
del self.base_profile_config["interfaces"][1]["services"]
del self.base_profile_config["metrics"]["wifi-frames"]
del self.base_profile_config["metrics"]["dhcp-snooping"]
# print(self.base_profile_config)
# print(self.base_profile_config, file=sourceFile)
# sourceFile.close()
def encryption_lookup(self, encryption="psk"):
encryption_mapping = {
"none": "open",
"psk": "wpa",
"psk2": "wpa2",
"sae": "wpa3",
"psk-mixed": "wpa|wpa2",
"sae-mixed": "wpa3",
"wpa": 'wap',
"wpa2": "eap",
"wpa3": "eap",
"wpa-mixed": "eap",
"wpa3-mixed": "sae"
}
if encryption in encryption_mapping.keys():
return encryption_mapping[encryption]
else:
return False
def get_ssid_info(self):
ssid_info = []
for interfaces in self.base_profile_config["interfaces"]:
if "ssids" in interfaces.keys():
for ssid_data in interfaces["ssids"]:
for band in ssid_data["wifi-bands"]:
temp = [ssid_data["name"]]
if ssid_data["encryption"]["proto"] == "none" or "radius" in ssid_data.keys():
temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"]))
temp.append('[BLANK]')
else:
temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"]))
temp.append(ssid_data["encryption"]["key"])
temp.append(band)
ssid_info.append(temp)
return ssid_info
def set_radio_config(self, radio_config={}):
base_radio_config_2g = {
"band": "2G",
"country": "CA",
"channel-mode": "HE",
"channel": "auto"
}
base_radio_config_5g = {
"band": "5G",
"country": "CA",
"allow-dfs": True,
"channel-mode": "HE",
"channel": "auto"
}
for band in radio_config:
if band == "2G":
for keys in radio_config[band]:
base_radio_config_2g[keys] = radio_config[band][keys]
if band == "5G":
for keys in radio_config[band]:
base_radio_config_5g[keys] = radio_config[band][keys]
# if band == "6G":
# for keys in radio_config[band]:
# base_radio_config_6g[keys] = radio_config[band][keys]
self.base_profile_config["radios"].append(base_radio_config_2g)
self.base_profile_config["radios"].append(base_radio_config_5g)
print(self.base_profile_config)
self.vlan_section["ssids"] = []
self.vlan_ids = []
def set_mode(self, mode, mesh=False):
self.mode = mode
if mode == "NAT":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][1]['ssids'] = []
elif mode == "BRIDGE":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][0]['ssids'] = []
elif mode == "VLAN":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
del self.base_profile_config['interfaces'][1]
self.base_profile_config['interfaces'][0]['ssids'] = []
self.base_profile_config['interfaces'] = []
wan_section_vlan = {
"name": "WAN",
"role": "upstream",
"services": ["lldp", "ssh", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.base_profile_config['interfaces'].append(wan_section_vlan)
else:
print("Invalid Mode")
return 0
def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}):
print("ssid data : ", ssid_data)
ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]}
for options in ssid_data:
if options == "multi-psk":
ssid_info[options] = ssid_data[options]
print("hi", ssid_info)
if options == "rate-limit":
ssid_info[options] = ssid_data[options]
for i in ssid_data["appliedRadios"]:
ssid_info["wifi-bands"].append(i)
ssid_info['encryption'] = {}
ssid_info['encryption']['proto'] = ssid_data["security"]
try:
ssid_info['encryption']['key'] = ssid_data["security_key"]
except Exception as e:
pass
ssid_info['encryption']['ieee80211w'] = "optional"
if radius:
ssid_info["radius"] = {}
ssid_info["radius"]["authentication"] = {
"host": radius_auth_data["ip"],
"port": radius_auth_data["port"],
"secret": radius_auth_data["secret"]
}
ssid_info["radius"]["accounting"] = {
"host": radius_accounting_data["ip"],
"port": radius_accounting_data["port"],
"secret": radius_accounting_data["secret"]
}
if self.mode == "NAT":
self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info)
elif self.mode == "BRIDGE":
self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info)
elif self.mode == "VLAN":
vid = ssid_data["vlan"]
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"services": ["lldp", "dhcp-snooping"],
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
vlan_section = self.vlan_section
if vid in self.vlan_ids:
print("sss", self.vlan_ids)
for i in self.base_profile_config['interfaces']:
if i["name"] == "WANv%s" % (vid):
i["ssids"].append(ssid_info)
else:
print(self.vlan_ids)
self.vlan_ids.append(vid)
vlan_section['name'] = "WANv%s" % (vid)
vlan_section['vlan']['id'] = int(vid)
vlan_section["ssids"] = []
vlan_section["ssids"].append(ssid_info)
self.base_profile_config['interfaces'].append(vlan_section)
print(vlan_section)
vsection = 0
else:
print("invalid mode")
pytest.exit("invalid Operating Mode")
def push_config(self, serial_number):
payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 1}
uri = self.sdk_client.build_uri("device/" + serial_number + "/configure")
basic_cfg_str = json.dumps(payload)
allure.attach(name="ucentral_config: ",
body=str(basic_cfg_str),
attachment_type=allure.attachment_type.JSON)
print("JSON Post Configure: " + str(basic_cfg_str))
print("Sending Configure Command: ", datetime.datetime.utcnow())
resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(),
verify=False, timeout=100)
print(resp.json())
print(resp.status_code)
allure.attach(name="/configure response: " + str(resp.status_code), body=str(resp.json()),
attachment_type=allure.attachment_type.JSON)
self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri)
# print(resp.url)
resp.close()
print(resp)
if __name__ == '__main__':
controller = {
'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller
'username': "tip@ucentral.com",
'password': 'OpenWifi%123',
}
obj = Controller(controller_data=controller)
# up = UProfileUtility(sdk_client=obj, controller_data=controller)
# up.set_mode(mode="BRIDGE")
# up.set_radio_config()
# up.add_ssid(ssid_data={"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something", "security": "psk2"})
# up.push_config(serial_number="3c2c99f44e77")
# print(obj.get_device_by_serial_number(serial_number="3c2c99f44e77"))
# print(datetime.datetime.utcnow())
# fms = FMSUtils(sdk_client=obj)
# new = fms.get_firmwares(model='ecw5410')
# for i in new:
# print(i)
# print(len(new))
# print(profile.get_ssid_info())
# # print(obj.get_devices())
# obj.logout()