Files
wlan-testing/libs/controller/controller_2x/controller.py
Shivam Thakur 516358d1b5 Wifi 7868 restructuring sanity suite to cover golden coverage of suites (#529)
* WIFI-7868: Added ow_sanity_lf suite for sanity restructuring

Added client connectivity tests under the ow_sanity_lf

Signed-off-by: shivam <shivam.thakur@candelatech.com>

* [WIFI-7869] Fix: regression testing AWS variables placement

Signed-off-by: Dmitry Dunaev <dmitry@opsfleet.com>

* [WIFI-7779] Add: helm deploy support for mailer envs (#517)

Signed-off-by: Dmitry Dunaev <dmitry@opsfleet.com>

* Also pin kubectl version when destroying namespace

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* [WIFI-7604] Add workflows to test deployment upgrade paths (#471)

* Create and switch to composite action for Docker Compose in Kubernetes deployment

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add missing inputs to composite action

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add input for hashed password and adapt variable names

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix input variable names

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add Docker Compose upgrade test workflow

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Get deploy script according to deployment_version input

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Hard-code branch for testing

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Re-add RTTY_TOKEN for testing previous release upgrades

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* [WIFI-7697] Move deploy related files from wlan-testing to wlan-cloud-ucentral-deploy (#472)

* Change composite actions and workflows to use wlan-cloud-ucentral-deploy repo

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Checkout target branch matching with deployment version

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Remove working-directory option

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Rename image version variables to enable search and replace in deploy script

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Remove defaults from action inputs

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Try removing owsub and owanalytics ports for 2.4 deployment

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Actually remove ports since commenting out doesn't work

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* [WIFI-7766] Switch Docker Compose testing workflow to deployment on EC2 (#499)

* Switch from Kubernetes pod to EC2 instance deployment

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix indentation

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add VPC subnet ID to use existing security group

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Move env variable definition to executed SSH command

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Remove defaults from inputs to avoid overwriting deployment_version

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix syntax

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix variable syntax

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add my own SSH key for debugging

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Revert "Add my own SSH key for debugging"

This reverts commit e4a9fee140.

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Increase AWS instance size

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Update package lists before installing

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add curl insecure option and append self-signed REST API cert to AP cert trust store

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Pass FLAGS variable to clustersysteminfo Docker container

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Switch to github.run_number for uniqueness

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Start Docker manually on instance

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Install Docker using the repository

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Do not destroy created instances for debugging

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Quote selfsigned_restapi_ca

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Split SSH commands into multiple lines for better readability

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix syntax

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix typo

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Revert "Split SSH commands into multiple lines for better readability"

This reverts commit c08ec026cfe3915e36cef3215226b41dd0319ed8.

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Revert "Fix syntax"

This reverts commit f408cfd19f5cf01d6b1d1f78c930897f9442c7de.

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add debugging output for REST API CA cert

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add: RTTY_TOKEN env for older versions support

Signed-off-by: Dmitry Dunaev <dmitry@opsfleet.com>

* Download self-signed REST API CA cert directly to virtual AP

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Switch to self-signed certs

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix variable name

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix step name

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Change hostname and use sudo to add hosts entry

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix syntax

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Actually add IP to hosts entry

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix missing hostname in AP hosts entry

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Adapt Docker Compose upgrade workflow to EC2 instance deployment

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Configure Docker to log to logstash input

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix Docker daemon config

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Add missing EOF

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix indentation

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix syntax

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Try to fix syntax again

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Use different delimiter

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix conflict after rebase

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix workflow syntax

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Disable StrictHostKeyChecking in workflow

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

Co-authored-by: Dmitry Dunaev <dmitry@opsfleet.com>

* Remove obsolete inputs and add RTTY_TOKEN to other workflows as well

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Fix value file names

Signed-off-by: Johann Hoffmann <johann.hoffmann@mailbox.org>

* Wifi 7858 (#518)

* Crated a new file for FMS related testcases

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Removed Fms system ifo test case from gateway service file

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added Location related test cases of Prov UI

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Added Location related functions for support

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Changed the function name

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* Changed the allure attachment names on edit functions

Signed-off-by: haricharan-jaka <haricharan.jaka@candelatech.com>

* WIFI-7868: Added multivlan scenarios in ow_sanity and removed Some of the rate limiting as they take a long time to complete

Added few up, dw, up+dw in rate limit for ow_sanity_lf

Signed-off-by: shivam <shivam.thakur@candelatech.com>

* WIFI-7868 : Added dvlan and rate limit from radius in sanity suite and reduced the rate limiting tests as they are taking alot of time

Signed-off-by: shivam <shivam.thakur@candelatech.com>

* WIFI-7868 Adding some marker improvements to reduce number of test cases and increase coverage

optimised rate limiting, added dvlan, added rate limit with radius

Signed-off-by: shivam <shivam.thakur@candelatech.com>

Co-authored-by: Dmitry Dunaev <dmitry@opsfleet.com>
Co-authored-by: Dmitry Dunaev <83591011+dunaev-opsfleet@users.noreply.github.com>
Co-authored-by: Johann Hoffmann <johann.hoffmann@mailbox.org>
Co-authored-by: Haricharan Jaka <87761091+haricharan-jaka@users.noreply.github.com>
2022-05-17 01:32:12 +05:30

964 lines
37 KiB
Python

"""
Base Library for Ucentral
"""
import datetime
import json
import ssl
import sys
import time
from urllib.parse import urlparse
import pytest
import allure
import requests
from operator import itemgetter
from pathlib import Path
from requests.adapters import HTTPAdapter
import logging
# logging.basicConfig(level=logging.DEBUG)
# from http.client import HTTPConnection
#
# HTTPConnection.debuglevel = 1
# requests.logging.getLogger()
class ConfigureController:
def __init__(self, controller_data):
self.username = controller_data["username"]
self.password = controller_data["password"]
self.host = urlparse(controller_data["url"])
print(self.host)
self.access_token = ""
# self.session = requests.Session()
self.login_resp = self.login()
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
time.sleep(60)
self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint()
if self.gw_host == "" or self.fms_host == "" or self.prov_host == "":
self.logout()
print(self.gw_host, self.fms_host, self.prov_host)
pytest.exit("All Endpoints not available in Controller Service")
sys.exit()
def build_uri_sec(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
print(new_uri)
return new_uri
def build_url_fms(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.fms_host.hostname, self.fms_host.port, path)
print(new_uri)
return new_uri
def build_uri(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path)
print(new_uri)
return new_uri
def build_url_prov(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path)
print(new_uri)
return new_uri
def request(self, service, command, method, params, payload):
if service == "sec":
uri = self.build_uri_sec(command)
elif service == "gw":
uri = self.build_uri(command)
elif service == "fms":
uri = self.build_url_fms(command)
elif service == "prov":
uri = self.build_url_prov(command)
else:
raise NameError("Invalid service code for request.")
print(uri)
params = params
if method == "GET":
resp = requests.get(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
elif method == "POST":
print(uri, payload, params)
resp = requests.post(uri, params=params, data=payload, headers=self.make_headers(), verify=False,
timeout=100)
elif method == "PUT":
resp = requests.put(uri, params=params, data=payload, verify=False, timeout=100)
elif method == "DELETE":
resp = requests.delete(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
self.check_response(method, resp, self.make_headers(), payload, uri)
print(resp)
return resp
def login(self):
uri = self.build_uri_sec("oauth2")
# self.session.mount(uri, HTTPAdapter(max_retries=15))
payload = json.dumps({"userId": self.username, "password": self.password})
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("POST", resp, "", payload, uri)
token = resp.json()
self.access_token = token["access_token"]
print(token)
if resp.status_code != 200:
print(resp.status_code)
print(resp.json())
pytest.exit(str(resp.json()))
# self.session.headers.update({'Authorization': self.access_token})
return resp
def get_gw_endpoint(self):
uri = self.build_uri_sec("systemEndpoints")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("GET", resp, self.make_headers(), "", uri)
services = resp.json()
print(services)
gw_host = ""
fms_host = ""
for service in services['endpoints']:
if service['type'] == "owgw":
gw_host = urlparse(service["uri"])
if service['type'] == "owfms":
fms_host = urlparse(service["uri"])
if service['type'] == "owprov":
prov_host = urlparse(service["uri"])
return gw_host, fms_host, prov_host
def logout(self):
uri = self.build_uri_sec('oauth2/%s' % self.access_token)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
print('Logged out:', resp.status_code)
return resp
def make_headers(self):
headers = {'Authorization': 'Bearer %s' % self.access_token,
"Connection": "keep-alive",
"Content-Type": "application/json",
"Keep-Alive": "timeout=10, max=1000"
}
return headers
def check_response(self, cmd, response, headers, data_str, url):
if response.status_code >= 400:
if response.status_code >= 400:
print("check-response: ERROR, url: ", url)
else:
print("check-response: url: ", url)
print("Command: ", cmd)
print("response-status: ", response.status_code)
print("response-headers: ", response.headers)
print("response-content: ", response.content)
print("headers: ", headers)
print("data-str: ", data_str)
if response.status_code >= 400:
# if True:
raise NameError("Invalid response code.")
return True
class Controller(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def get_devices(self):
uri = self.build_uri("devices/")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_by_serial_number(self, serial_number):
uri = self.build_uri("device/" + serial_number)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_sdk_version(self):
uri = self.build_uri("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
version = resp.json()
print(version)
# resp.close()()
return version['version']
def get_system_gw(self):
uri = self.build_uri("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_fms(self):
uri = self.build_url_fms("system/?command=info")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_prov(self):
uri = self.build_url_prov("system/?command=info")
allure.attach(name="Url of Prov UI:", body=str(uri))
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_uuid(self, serial_number):
device_info = self.get_device_by_serial_number(serial_number=serial_number)
return device_info["UUID"]
def add_device_to_gw(self, serial_number, payload):
uri = self.build_uri("device/" + serial_number)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_device_from_gw(self, device_name):
uri = self.build_uri("device/" + device_name)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def get_commands(self):
uri = self.build_uri("commands")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_logs(self, serial_number):
uri = self.build_uri("device/" + serial_number + "/logs")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_health_checks(self, serial_number):
uri = self.build_uri("device/" + serial_number + "/healthchecks")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_capabilities(self, serial_number):
uri = self.build_uri("device/" + serial_number + "/capabilities")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_statistics(self, serial_number):
uri = self.build_uri("device/" + serial_number + "/statistics")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_device_status(self, serial_number):
uri = self.build_uri("device/" + serial_number + "/status")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
class FMSUtils:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client = sdk_client
def upgrade_firmware(self, serial="", url=""):
response = self.sdk_client.request(service="gw", command="device/" + serial + "/upgrade",
method="POST", params="serialNumber=" + serial,
payload="{ \"serialNumber\" : " + "\"" + serial + "\"" +
" , \"uri\" : " + "\"" + url + "\"" +
", \"when\" : 0" + " }")
print(response.json())
allure.attach(name="REST - firmware upgrade response: ",
body=str(response.status_code) + "\n" +
str(response.json()) + "\n"
)
print(response)
def ap_model_lookup(self, model=""):
devices = self.get_device_set()
model_name = ""
for device in devices['deviceTypes']:
if str(device).__eq__(model):
model_name = device
return model_name
def get_revisions(self):
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params="revisionSet=true",
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_latest_fw(self, model=""):
device_type = self.ap_model_lookup(model=model)
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET",
params="latestOnly=true&deviceType=" + device_type,
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_device_set(self):
response = self.sdk_client.request(service="fms", command="firmwares/", method="GET", params="deviceSet=true",
payload="")
if response.status_code == 200:
return response.json()
else:
return {}
def get_firmwares(self, limit="10000", model="", latestonly="", branch="", commit_id="", offset="3000"):
deviceType = self.ap_model_lookup(model=model)
params = "limit=" + limit + \
"&deviceType=" + deviceType + \
"&latestonly=" + latestonly + \
"offset=" + offset
command = "firmwares/"
response = self.sdk_client.request(service="fms", command=command, method="GET", params=params, payload="")
allure.attach(name=command + params,
body=str(response.status_code) + "\n" + str(response.json()),
attachment_type=allure.attachment_type.JSON)
if response.status_code == 200:
data = response.json()
newlist = sorted(data['firmwares'], key=itemgetter('created'))
# for i in newlist:
# print(i['uri'])
# print(i['revision'])
# print(newlist)
return newlist
# print(data)
return "error"
class ProvUtils(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def build_url_prov(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path)
print(new_uri)
return new_uri
def get_inventory(self):
uri = self.build_url_prov("inventory")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_inventory_by_device(self, device_name):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_system_prov(self):
uri = self.build_url_prov("system/?command=info")
allure.attach(name="Url of Prov UI:", body=str(uri))
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_device_to_inventory(self, device_name, payload):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_device_from_inventory(self, device_name):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def get_entity(self):
uri = self.build_url_prov("entity")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_entity_by_id(self,entity_id):
uri = self.build_url_prov("entity/" + entity_id)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_entity(self, payload):
uri = self.build_url_prov("entity/1")
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_entity(self, entity_id):
uri = self.build_url_prov("entity/" + entity_id)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def edit_device_from_inventory(self, device_name, payload):
uri = self.build_url_prov("inventory/" + device_name)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
def edit_entity(self, payload, entity_id):
uri = self.build_url_prov("entity/" + entity_id)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
def get_contact(self):
uri = self.build_url_prov("contact")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_contact_by_id(self, contact_id):
uri = self.build_url_prov("contact/" + contact_id)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_contact(self, payload):
uri = self.build_url_prov("contact/1")
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_contact(self, contact_id):
uri = self.build_url_prov("contact/" + contact_id)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def edit_contact(self, payload, contact_id):
uri = self.build_url_prov("contact/" + contact_id)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
def get_location(self):
uri = self.build_url_prov("location")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_location_by_id(self, location_id):
uri = self.build_url_prov("location/" + location_id)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_location(self, payload):
uri = self.build_url_prov("location/1")
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_location(self, location_id):
uri = self.build_url_prov("location/" + location_id)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def edit_location(self, payload, location_id):
uri = self.build_url_prov("location/" + location_id)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
def get_venue(self):
uri = self.build_url_prov("venue/")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def get_venue_by_id(self, venue_id):
uri = self.build_url_prov("venue/" + venue_id)
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
return resp
def add_venue(self, payload):
uri = self.build_url_prov("venue/0")
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("POST", resp, self.make_headers(), payload, uri)
return resp
def delete_venue(self, venue_id):
uri = self.build_url_prov("venue/" + venue_id)
print(uri)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
return resp
def edit_venue(self, payload, venue_id):
uri = self.build_url_prov("venue/" + venue_id)
print(uri)
print(payload)
payload = json.dumps(payload)
resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("PUT", resp, self.make_headers(), payload, uri)
return resp
class UProfileUtility:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client = sdk_client
self.base_profile_config = {
"uuid": 1,
"radios": [],
"interfaces": [{
"name": "WAN",
"role": "upstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
},
{
"name": "LAN",
"role": "downstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/16",
"dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
},
}],
"metrics": {
"statistics": {
"interval": 60,
"types": ["ssids", "lldp", "clients"]
},
"health": {
"interval": 120
},
"wifi-frames": {
"filters": ["probe",
"auth",
"assoc",
"disassoc",
"deauth",
"local-deauth",
"inactive-deauth",
"key-mismatch",
"beacon-report",
"radar-detected"]
},
"dhcp-snooping": {
"filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"]
}
},
"services": {
"lldp": {
"describe": "TIP OpenWiFi",
"location": "QA"
},
"ssh": {
"port": 22
}
}
}
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.mode = None
def set_mesh_services(self):
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
del self.base_profile_config['metrics']['wifi-frames']
del self.base_profile_config['metrics']['dhcp-snooping']
var = {
"filters": ["probe",
"auth"]
}
self.base_profile_config["metrics"]['wifi-frames'] = var
del self.base_profile_config['services']
var2 = {
"lldp": {
"describe": "uCentral",
"location": "universe"
},
"ssh": {
"port": 22
}
}
self.base_profile_config['services'] = var2
def set_express_wifi(self, open_flow=None):
if self.mode == "NAT":
self.base_profile_config["interfaces"][1]["services"] = ["ssh", "lldp", "open-flow"]
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24"
self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100
self.base_profile_config['services']["open-flow"] = open_flow
self.base_profile_config['services']['lldp']['describe'] = "OpenWiFi - expressWiFi"
self.base_profile_config['services']['lldp']['location'] = "Hotspot"
def set_captive_portal(self):
if self.mode == "NAT":
max_client = {
"max-clients": 32
}
# sourceFile = open('captive_config.py', 'w')
self.base_profile_config["interfaces"][1]["name"] = "captive"
self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.2.1/24"
self.base_profile_config["interfaces"][1]["captive"] = max_client
del self.base_profile_config["interfaces"][1]["ethernet"]
del self.base_profile_config["interfaces"][1]["services"]
del self.base_profile_config["metrics"]["wifi-frames"]
del self.base_profile_config["metrics"]["dhcp-snooping"]
# print(self.base_profile_config)
# print(self.base_profile_config, file=sourceFile)
# sourceFile.close()
def encryption_lookup(self, encryption="psk"):
encryption_mapping = {
"none": "open",
"psk": "wpa",
"psk2": "wpa2",
"sae": "wpa3",
"psk-mixed": "wpa|wpa2",
"sae-mixed": "wpa3",
"wpa": 'wap',
"wpa2": "eap",
"wpa3": "eap",
"wpa-mixed": "eap",
"wpa3-mixed": "sae"
}
if encryption in encryption_mapping.keys():
return encryption_mapping[encryption]
else:
return False
def get_ssid_info(self):
ssid_info = []
for interfaces in self.base_profile_config["interfaces"]:
if "ssids" in interfaces.keys():
for ssid_data in interfaces["ssids"]:
for band in ssid_data["wifi-bands"]:
temp = [ssid_data["name"]]
if ssid_data["encryption"]["proto"] == "none" or "radius" in ssid_data.keys():
temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"]))
temp.append('[BLANK]')
else:
temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"]))
temp.append(ssid_data["encryption"]["key"])
temp.append(band)
ssid_info.append(temp)
return ssid_info
def set_radio_config(self, radio_config={}):
base_radio_config_2g = {
"band": "2G",
"country": "CA",
"channel-mode": "HE",
"channel": "auto"
}
base_radio_config_5g = {
"band": "5G",
"country": "CA",
"allow-dfs": True,
"channel-mode": "HE",
"channel": "auto"
}
for band in radio_config:
if band == "2G":
for keys in radio_config[band]:
base_radio_config_2g[keys] = radio_config[band][keys]
if band == "5G":
for keys in radio_config[band]:
base_radio_config_5g[keys] = radio_config[band][keys]
# if band == "6G":
# for keys in radio_config[band]:
# base_radio_config_6g[keys] = radio_config[band][keys]
self.base_profile_config["radios"].append(base_radio_config_2g)
self.base_profile_config["radios"].append(base_radio_config_5g)
print(self.base_profile_config)
self.vlan_section["ssids"] = []
self.vlan_ids = []
def set_mode(self, mode, mesh=False):
self.mode = mode
if mode == "NAT":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][1]['ssids'] = []
elif mode == "BRIDGE":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
self.base_profile_config['interfaces'][0]['ssids'] = []
elif mode == "VLAN":
if mesh:
self.base_profile_config['interfaces'][0]['tunnel'] = {
"proto": "mesh"
}
del self.base_profile_config['interfaces'][1]
self.base_profile_config['interfaces'][0]['ssids'] = []
self.base_profile_config['interfaces'] = []
wan_section_vlan = {
"name": "WAN",
"role": "upstream",
"services": ["lldp", "ssh", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.base_profile_config['interfaces'].append(wan_section_vlan)
else:
print("Invalid Mode")
return 0
def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}):
print("ssid data : ", ssid_data)
ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]}
for options in ssid_data:
if options == "multi-psk":
ssid_info[options] = ssid_data[options]
print("hi", ssid_info)
if options == "rate-limit":
ssid_info[options] = ssid_data[options]
for i in ssid_data["appliedRadios"]:
ssid_info["wifi-bands"].append(i)
ssid_info['encryption'] = {}
ssid_info['encryption']['proto'] = ssid_data["security"]
try:
ssid_info['encryption']['key'] = ssid_data["security_key"]
except Exception as e:
pass
ssid_info['encryption']['ieee80211w'] = "optional"
if radius:
ssid_info["radius"] = {}
ssid_info["radius"]["authentication"] = {
"host": radius_auth_data["ip"],
"port": radius_auth_data["port"],
"secret": radius_auth_data["secret"]
}
ssid_info["radius"]["accounting"] = {
"host": radius_accounting_data["ip"],
"port": radius_accounting_data["port"],
"secret": radius_accounting_data["secret"]
}
if self.mode == "NAT":
self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info)
elif self.mode == "BRIDGE":
self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info)
elif self.mode == "VLAN":
vid = ssid_data["vlan"]
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"services": ["lldp", "dhcp-snooping"],
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
vlan_section = self.vlan_section
if vid in self.vlan_ids:
print("sss", self.vlan_ids)
for i in self.base_profile_config['interfaces']:
if i["name"] == "WANv%s" % (vid):
i["ssids"].append(ssid_info)
else:
print(self.vlan_ids)
self.vlan_ids.append(vid)
vlan_section['name'] = "WANv%s" % (vid)
vlan_section['vlan']['id'] = int(vid)
vlan_section["ssids"] = []
vlan_section["ssids"].append(ssid_info)
self.base_profile_config['interfaces'].append(vlan_section)
print(vlan_section)
vsection = 0
else:
print("invalid mode")
pytest.exit("invalid Operating Mode")
def push_config(self, serial_number):
payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 1}
uri = self.sdk_client.build_uri("device/" + serial_number + "/configure")
basic_cfg_str = json.dumps(payload)
allure.attach(name="ucentral_config: ",
body=str(basic_cfg_str),
attachment_type=allure.attachment_type.JSON)
print("JSON Post Configure: " + str(basic_cfg_str))
print("Sending Configure Command: ", datetime.datetime.utcnow())
resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(),
verify=False, timeout=100)
print(resp.json())
print(resp.status_code)
allure.attach(name="/configure response: " + str(resp.status_code), body=str(resp.json()),
attachment_type=allure.attachment_type.JSON)
self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri)
print(resp.url)
resp.close()
print(resp)
return resp
if __name__ == '__main__':
controller = {
'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller
'username': "tip@ucentral.com",
'password': 'OpenWifi%123',
}
obj = Controller(controller_data=controller)
# up = UProfileUtility(sdk_client=obj, controller_data=controller)
# up.set_mode(mode="BRIDGE")
# up.set_radio_config()
# up.add_ssid(ssid_data={"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something", "security": "psk2"})
# up.push_config(serial_number="3c2c99f44e77")
# print(obj.get_device_by_serial_number(serial_number="3c2c99f44e77"))
# print(datetime.datetime.utcnow())
# fms = FMSUtils(sdk_client=obj)
# new = fms.get_firmwares(model='ecw5410')
# for i in new:
# print(i)
# print(len(new))
# print(profile.get_ssid_info())
# # print(obj.get_devices())
obj.logout()