From 3bd86de755ba3023bca100858d70764863f2e1e8 Mon Sep 17 00:00:00 2001 From: shivam Date: Wed, 20 Jul 2022 15:13:25 +0530 Subject: [PATCH] Base setup for tip_2x package and standard format for wlan-testing Signed-off-by: shivam --- libs/SetupLibrary.py | 79 ++ libs/__init__.py | 0 libs/openwrt_ctl.py | 385 ++++++++++ libs/tip_1x/__init__.py | 0 libs/tip_1x/controller.py | 1418 +++++++++++++++++++++++++++++++++++ libs/tip_2x/__init__.py | 120 +++ libs/tip_2x/ap_lib.py | 11 + libs/tip_2x/controller.py | 1461 +++++++++++++++++++++++++++++++++++++ 8 files changed, 3474 insertions(+) create mode 100644 libs/SetupLibrary.py create mode 100644 libs/__init__.py create mode 100755 libs/openwrt_ctl.py create mode 100644 libs/tip_1x/__init__.py create mode 100644 libs/tip_1x/controller.py create mode 100644 libs/tip_2x/__init__.py create mode 100644 libs/tip_2x/ap_lib.py create mode 100644 libs/tip_2x/controller.py diff --git a/libs/SetupLibrary.py b/libs/SetupLibrary.py new file mode 100644 index 000000000..7f3220161 --- /dev/null +++ b/libs/SetupLibrary.py @@ -0,0 +1,79 @@ +import logging + +import paramiko +from scp import SCPClient + +logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) + + +class SetupLibrary: + + def __init__(self, remote_ip="", + remote_ssh_port=22, + remote_ssh_username="lanforge", + remote_ssh_password="lanforge", + pwd="", + ): + self.pwd = pwd + self.remote_ip = remote_ip + self.remote_ssh_username = remote_ssh_username + self.remote_ssh_password = remote_ssh_password + self.remote_ssh_port = remote_ssh_port + + def setup_serial_environment(self): + client = self.ssh_cli_connect() + cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read()) + if output.__contains__("False"): + cmd = 'mkdir ~/cicd-git/' + client.exec_command(cmd) + cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read()) + if output.__contains__("False"): + print("Copying openwrt_ctl serial control Script...") + with SCPClient(client.get_transport()) as scp: + scp.put(self.pwd + 'openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server + cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + var = str(stdout.read()) + client.close() + + def ssh_cli_connect(self): + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + logging.info("Trying SSH Connection to: " + str(self.remote_ip) + + " on port: " + str(self.remote_ssh_port) + + " with username: " + str(self.remote_ssh_username) + + " and password: " + str(self.remote_ssh_password)) + client.connect(self.remote_ip, username=self.remote_ssh_username, password=self.remote_ssh_password, + port=self.remote_ssh_port, timeout=10, allow_agent=False, banner_timeout=200) + return client + + def check_serial_connection(self, tty="/dev/ttyUSB0"): + client = self.ssh_cli_connect() + cmd = 'ls /dev/tty*' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read().decode('utf-8')) + client.close() + available_tty_ports = output.split("\n") + if tty in available_tty_ports: + logging.info("Expected Serial Console Port is available on Remote system: " + tty) + return tty in available_tty_ports + + def kill_all_minicom_process(self, tty="/dev/ttyUSB0"): + client = self.ssh_cli_connect() + stdin, stdout, stderr = client.exec_command("fuser -k " + tty) + print(stdout.read()) + client.close() + + +if __name__ == '__main__': + obj = SetupLibrary(remote_ip="192.168.52.89", + remote_ssh_port=22, + pwd="") + + obj.setup_serial_environment() + obj.check_serial_connection(tty="/dev/ttyUSB0") + obj.kill_all_minicom_process(tty="/dev/ttyUSB0") diff --git a/libs/__init__.py b/libs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/openwrt_ctl.py b/libs/openwrt_ctl.py new file mode 100755 index 000000000..ec3f18174 --- /dev/null +++ b/libs/openwrt_ctl.py @@ -0,0 +1,385 @@ +#!/usr/bin/python3 +''' + +make sure pexpect is installed: +$ sudo yum install python3-pexpect + +You might need to install pexpect-serial using pip: +$ pip3 install serial +$ pip3 install pexpect-serial + +./openwrt_ctl.py -l stdout -u root -p TIP -s serial --tty ttyUSB0 + +# Set up reverse ssh tunnel +./openwrt_ctl.py --tty /dev/ttyAP1 --action ssh-tunnel \ + --value "ssh -y -y -f -N -T -M -R 9999:localhost:22 lanforge@10.28.3.100" \ + --value2 password-for-10.28.3.100 --log stdout --scheme serial --prompt root@Open +''' + +import sys + + + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit() + +try: + import importlib + re = importlib.import_module("re") + logging = importlib.import_module("logging") + time = importlib.import_module("time") + sleep = time.sleep + pexpect = importlib.import_module("pexpect") + serial = importlib.import_module("serial") + pexpect_serial = importlib.import_module("pexpect_serial") + SerialSpawn = pexpect_serial.SerialSpawn + pprint = importlib.import_module("pprint") + telnetlib = importlib.import_module("telnetlib") + argparse = importlib.import_module("argparse") +except ImportError as e: + print(e) + sys.exit("Python Import Error: " + str(e)) + +default_host = "localhost" +default_ports = { + "serial": None, + "ssh": 22, + "telnet": 23 +} +NL = "\n" +CR = "\r\n" +Q = '"' +A = "'" +FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s' +prompt = "root@OpenWrt:" + + +def usage(): + print("$0 used connect to OpenWrt AP or similar Linux machine:") + print("-d|--dest IP address of the OpenWrt AP, for ssh/telnet scheme") + print("-o|--port IP port of the OpenWrt AP, for ssh/telnet scheme") + print("-t|--tty Serial port, if using serial scheme") + print("-u|--user login name") + print("-p|--pass password") + print("--prompt Prompt to look for when commands are done (default: root@OpenWrt)") + print("-s|--scheme (serial|telnet|ssh): connect via serial, ssh or telnet") + print("-l|--log file log messages here") + print("--action (logread | journalctl | lurk | sysupgrade | download | upload | reboot | cmd | ssh-tunnel") + print("--value (option to help complete the action") + print("--value2 (option to help complete the action, dest filename for download, passwd for ssh-tunnel") + print("-h|--help") + + +# see https://stackoverflow.com/a/13306095/11014343 +class FileAdapter(object): + def __init__(self, logger): + self.logger = logger + + def write(self, data): + # NOTE: data can be a partial line, multiple lines + data = data.strip() # ignore leading/trailing whitespace + if data: # non-blank + self.logger.info(data) + + def flush(self): + pass # leave it to logging to flush properly + + +def main(): + global prompt + + parser = argparse.ArgumentParser(description="OpenWrt AP Control Script") + parser.add_argument("-d", "--dest", type=str, help="address of the cisco controller_tests") + parser.add_argument("-o", "--port", type=int, help="control port on the controller_tests") + parser.add_argument("-u", "--user", type=str, help="credential login/username") + parser.add_argument("-p", "--passwd", type=str, help="credential password") + parser.add_argument("-P", "--prompt", type=str, help="Prompt to look for") + parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"], + help="Connect via serial, ssh or telnet") + parser.add_argument("-t", "--tty", type=str, help="tty serial device") + parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console") + parser.add_argument("--action", type=str, help="perform action", + choices=["logread", "journalctl", "lurk", "sysupgrade", "sysupgrade-n", "download", "upload", + "reboot", "cmd", "ssh-tunnel"]) + parser.add_argument("--value", type=str, help="set value") + parser.add_argument("--value2", type=str, help="set value2") + tty = None + + args = None + try: + args = parser.parse_args() + host = args.dest + scheme = args.scheme + port = args.port + # port = (default_ports[scheme], args.port)[args.port != None] + user = args.user + passwd = args.passwd + logfile = args.log + tty = args.tty; + if (args.prompt != None): + prompt = args.prompt + filehandler = None + except Exception as e: + logging.exception(e); + usage() + exit(2); + + console_handler = logging.StreamHandler() + formatter = logging.Formatter(FORMAT) + logg = logging.getLogger(__name__) + logg.setLevel(logging.DEBUG) + file_handler = None + if (logfile is not None): + if (logfile != "stdout"): + file_handler = logging.FileHandler(logfile, "w") + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(formatter) + logg.addHandler(file_handler) + logging.basicConfig(format=FORMAT, handlers=[file_handler]) + else: + # stdout logging + logging.basicConfig(format=FORMAT, handlers=[console_handler]) + + CCPROMPT = prompt + + ser = None + egg = None # think "eggpect" + try: + if (scheme == "serial"): + # eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer) + + ser = serial.Serial(tty, 115200, timeout=5) + + egg = SerialSpawn(ser); + egg.logfile = FileAdapter(logg) + egg.sendline(NL) + has_reset = False + try: + logg.info("prompt: %s user: %s passwd: %s" % (prompt, user, passwd)) + while True: + i = egg.expect([prompt, "Please press Enter to activate", "login:", "Password:", "IPQ6018#"], + timeout=3) + logg.info("expect-0: %i" % (i)) + if (i == 0): + logg.info("Found prompt, login complete.") + break + if (i == 1): + logg.info("Sending newline") + egg.setdline(NL) + if (i == 2): + logg.info("Sending username: %s" % (user)) + egg.sendline(user) + if (i == 3): + logg.info("Sending password: %s" % (passwd)) + egg.sendline(passwd) + if (i == 4): # in bootloader + if has_reset: + logg.info("ERROR: Have reset once already, back in bootloader?") + sys.exit(1) + has_reset = True + logg.info("In boot loader, will reset and sleep 30 seconds") + egg.sendline("reset") + time.sleep(30) + egg.sendline(NL) + + except Exception as e: + # maybe something like 'logread -f' is running? + # send ctrl-c + egg.send(chr(3)) + + elif (scheme == "ssh"): + # Not implemented/tested currently. --Ben + if (port is None): + port = 22 + cmd = "ssh -p%d %s@%s" % (port, user, host) + logg.info("Spawn: " + cmd + NL) + egg = pexpect.spawn(cmd) + # egg.logfile_read = sys.stdout.buffer + egg.logfile = FileAdapter(logg) + i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3) + time.sleep(0.1) + if i == 1: + egg.sendline('yes') + egg.expect('password:') + sleep(0.1) + egg.sendline(passwd) + + elif (scheme == "telnet"): + # Not implemented/tested currently. --Ben + if (port is None): + port = 23 + cmd = "telnet %s %d" % (host, port) + logg.info("Spawn: " + cmd + NL) + egg = pexpect.spawn(cmd) + egg.logfile = FileAdapter(logg) + time.sleep(0.1) + egg.sendline(' ') + egg.expect('User\:') + egg.sendline(user) + egg.expect('Password\:') + egg.sendline(passwd) + egg.sendline('config paging disable') + else: + usage() + exit(1) + except Exception as e: + logging.exception(e); + + command = None + + CLOSEDBYREMOTE = "closed by remote host." + CLOSEDCX = "Connection to .* closed." + + try: + egg.expect(CCPROMPT) + except Exception as e: + egg.sendline(NL) + + TO = 10 + wait_forever = False + + # Clean pending output + egg.sendline("echo __hello__") + egg.expect("__hello__") + egg.expect(CCPROMPT) + + logg.info("Action[%s] Value[%s] Value2[%s]" % (args.action, args.value, args.value2)) + + if (args.action == "reboot"): + command = "reboot" + TO = 60 + + if (args.action == "cmd"): + if (args.value is None): + raise Exception("cmd requires value to be set.") + command = "%s" % (args.value) + + if (args.action == "logread"): + command = "logread -f" + TO = 1 + wait_forever = True + + if (args.action == "journalctl"): + command = "journalctl -f" + TO = 1 + wait_forever = True + + if (args.action == "lurk"): + command = "date" + TO = 1 + wait_forever = True + + if (args.action == "ssh-tunnel"): + command = "%s" % (args.value) + passwd = "%s" % (args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline(passwd) + egg.expect(CCPROMPT, timeout=20) + return + + if ((args.action == "sysupgrade") or (args.action == "sysupgrade-n")): + command = "scp %s /tmp/new_img.bin" % (args.value) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + if (args.action == "sysupgrade-n"): + egg.sendline("sysupgrade -n /tmp/new_img.bin") + else: + egg.sendline("sysupgrade /tmp/new_img.bin") + egg.expect("link becomes ready", timeout=100) + return + + if (args.action == "download"): + command = "scp %s /tmp/%s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("Network unreachable, wait 15 seconds and try again.") + time.sleep(15) + command = "scp %s /tmp/%s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("ERROR: Could not connect to LANforge to get download file") + exit(2) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + return + + if (args.action == "upload"): + command = "scp %s %s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("Network unreachable, wait 15 seconds and try again.") + time.sleep(15) + command = "scp /tmp/%s %s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("ERROR: Could not connect to LANforge to put upload file") + exit(2) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + return + + if (command is None): + logg.info("No command specified, going to log out.") + else: + logg.info("Command[%s]" % command) + egg.sendline(command); + while True: + try: + i = egg.expect([CCPROMPT, "kmodloader: done loading kernel", "\n"], timeout=TO) + print(egg.before.decode('utf-8', 'ignore')) + if i == 1: + egg.sendline(' ') + egg.expect(CCPROMPT, timeout=20) + print(egg.before.decode('utf-8', 'ignore')) + if i == 2: # new line of text, just print and continue + continue + + if not wait_forever: + break + + except Exception as e: + # Some commands take a long time (logread -f) + if not wait_forever: + logging.exception(e) + break + + +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +if __name__ == '__main__': + main() + +#### +#### +#### diff --git a/libs/tip_1x/__init__.py b/libs/tip_1x/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/tip_1x/controller.py b/libs/tip_1x/controller.py new file mode 100644 index 000000000..7bcf74844 --- /dev/null +++ b/libs/tip_1x/controller.py @@ -0,0 +1,1418 @@ +# !/usr/local/lib64/python3.8 +""" + Controller Library + 1. controller_data/sdk_base_url + 2. login credentials +""" +import base64 +import datetime +import json +import re +import ssl +import time +import urllib + +import requests +import swagger_client +from swagger_client import FirmwareManagementApi +from swagger_client import EquipmentGatewayApi +from bs4 import BeautifulSoup +import threading + + +class ConfigureController: + + def __init__(self): + self.configuration = swagger_client.Configuration() + + def set_credentials(self, controller_data=None): + if dict(controller_data).keys().__contains__("username") and dict(controller_data).keys().__contains__( + "password"): + self.configuration.username = controller_data["username"] + self.configuration.password = controller_data["password"] + print("Login Credentials set to custom: \n user_id: %s\n password: %s\n" % (controller_data["username"], + controller_data["password"])) + return True + else: + self.configuration.username = "support@example.com" + self.configuration.password = "support" + print("Login Credentials set to default: \n user_id: %s\n password: %s\n" % ("support@example.com", + "support")) + return False + + def select_controller_data(self, controller_data=None): + if dict(controller_data).keys().__contains__("url") is None: + print("No controller_data Selected") + exit() + self.sdk_base_url = controller_data["url"] + self.configuration.host = self.sdk_base_url + print("controller_data Selected: %s\n SDK_BASE_URL: %s\n" % (controller_data["url"], self.sdk_base_url)) + return True + + def set_sdk_base_url(self, sdk_base_url=None): + if sdk_base_url is None: + print("URL is None") + exit() + self.configuration.host = sdk_base_url + return True + + +""" + Library for cloud_controller_tests generic usages, it instantiate the bearer and credentials. + It provides the connectivity to the cloud. + Instantiate the Object by providing the controller_data=controller_url, customer_id=2 +""" + + +class Controller(ConfigureController): + """ + constructor for cloud_controller_tests library + """ + + def __init__(self, controller_data=None, customer_id=None): + super().__init__() + self.controller_data = controller_data + self.customer_id = customer_id + if customer_id is None: + self.customer_id = 2 + print("Setting to default Customer ID 2") + # + # Setting the Controller Client Configuration + self.select_controller_data(controller_data=controller_data) + self.set_credentials(controller_data=controller_data) + self.configuration.refresh_api_key_hook = self.get_bearer_token + + # Connecting to Controller + self.api_client = swagger_client.ApiClient(self.configuration) + self.login_client = swagger_client.LoginApi(api_client=self.api_client) + self.bearer = False + self.disconnect = False + # Token expiry in seconds + self.token_expiry = 1000 + self.token_timestamp = time.time() + try: + + self.bearer = self.get_bearer_token() + # t1 = threading.Thread(target=self.refresh_instance) + # t1.start() + self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token + self.status_client = swagger_client.StatusApi(api_client=self.api_client) + self.equipment_client = swagger_client.EquipmentApi(self.api_client) + self.profile_client = swagger_client.ProfileApi(self.api_client) + self.api_client.configuration.api_key_prefix = { + "Authorization": "Bearer " + self.bearer._access_token + } + self.api_client.configuration.refresh_api_key_hook = self.refresh_instance + self.ping_response = self.portal_ping() + print("Portal details :: \n", self.ping_response) + except Exception as e: + self.bearer = False + print(e) + + print("Connected to Controller Server") + + def get_bearer_token(self): + request_body = { + "userId": self.configuration.username, + "password": self.configuration.password + } + return self.login_client.get_access_token(request_body) + + def refresh_instance(self): + # Refresh token 10 seconds before it's expiry + if time.time() - self.token_timestamp > self.token_expiry - 10: + self.token_timestamp = time.time() + print("Refreshing the controller API token") + self.disconnect_Controller() + self.api_client = swagger_client.ApiClient(self.configuration) + self.login_client = swagger_client.LoginApi(api_client=self.api_client) + self.bearer = self.get_bearer_token() + self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token + self.status_client = swagger_client.StatusApi(api_client=self.api_client) + self.equipment_client = swagger_client.EquipmentApi(self.api_client) + self.profile_client = swagger_client.ProfileApi(self.api_client) + self.api_client.configuration.api_key_prefix = { + "Authorization": "Bearer " + self.bearer._access_token + } + self.api_client.configuration.refresh_api_key_hook = self.refresh_instance + self.ping_response = self.portal_ping() + print("Portal details :: \n", self.ping_response) + if self.ping_response._application_name != 'PortalServer': + print("Server not Reachable") + exit() + print("Connected to Controller Server") + + def portal_ping(self): + self.refresh_instance() + return self.login_client.portal_ping() + + def disconnect_Controller(self): + self.refresh_instance() + self.disconnect = True + self.api_client.__del__() + + # Returns a List of All the Equipments that are available in the cloud instances + def get_equipment_by_customer_id(self, max_items=10): + self.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": """ + str(max_items) + """ + }""" + self.refresh_instance() + equipment_data = self.equipment_client.get_equipment_by_customer_id(customer_id=self.customer_id, + pagination_context=pagination_context) + return equipment_data._items + + # check if equipment with the given equipment_id is available in cloud instance or not + def validate_equipment_availability(self, equipment_id=None): + self.refresh_instance() + data = self.get_equipment_by_customer_id() + for i in data: + if i._id == equipment_id: + return i._id + return -1 + + # Need to be added in future + def request_ap_reboot(self): + self.refresh_instance() + pass + + # Get the equipment id, of a equipment with a serial number + def get_equipment_id(self, serial_number=None): + self.refresh_instance() + equipment_data = self.get_equipment_by_customer_id(max_items=100) + # print(len(equipment_data)) + for equipment in equipment_data: + if equipment._serial == serial_number: + return equipment._id + + # Get the equipment model name of a given equipment_id + def get_model_name(self, equipment_id=None): + self.refresh_instance() + if equipment_id is None: + return None + self.refresh_instance() + data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id) + print(str(data._details._equipment_model)) + return str(data._details._equipment_model) + + # Needs Bug fix from swagger code generation side + def get_ap_firmware_new_method(self, equipment_id=None): + self.refresh_instance() + response = self.status_client.get_status_by_customer_equipment(customer_id=self.customer_id, + equipment_id=equipment_id) + print(response[2]) + + # Old Method, will be depreciated in future + def get_ap_firmware_old_method(self, equipment_id=None): + self.refresh_instance() + url = self.configuration.host + "/portal/status/forEquipment?customerId=" + str( + self.customer_id) + "&equipmentId=" + str(equipment_id) + payload = {} + headers = self.configuration.api_key_prefix + response = requests.request("GET", url, headers=headers, data=payload) + + if response.status_code == 200: + status_data = response.json() + # print(status_data) + try: + current_ap_fw = status_data[2]['details']['reportedSwVersion'] + # print(current_ap_fw) + return current_ap_fw + except Exception as e: + print(e) + current_ap_fw = "error" + return e + + else: + return False + + """ + Profile Utilities + """ + + def get_current_profile_on_equipment(self, equipment_id=None): + self.refresh_instance() + default_equipment_data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id, async_req=False) + return default_equipment_data._profile_id + + # Get the ssid's that are used by the equipment + def get_ssids_on_equipment(self, equipment_id=None): + self.refresh_instance() + profile_id = self.get_current_profile_on_equipment(equipment_id=equipment_id) + all_profiles = self.profile_client.get_profile_with_children(profile_id=profile_id) + ssid_name_list = [] + for i in all_profiles: + if i._profile_type == "ssid": + ssid_name_list.append(i._details['ssid']) + return all_profiles + + # Get the child ssid profiles that are used by equipment ap profile of given profile id + def get_ssid_profiles_from_equipment_profile(self, profile_id=None): + self.refresh_instance() + equipment_ap_profile = self.profile_client.get_profile_by_id(profile_id=profile_id) + ssid_name_list = [] + child_profile_ids = equipment_ap_profile.child_profile_ids + for i in child_profile_ids: + profile = self.profile_client.get_profile_by_id(profile_id=i) + if profile._profile_type == "ssid": + ssid_name_list.append(profile._details['ssid']) + return ssid_name_list + + +""" + Library for Profile Utility, Creating Profiles and Pushing and Deleting them + Steps to create a Profile on Controller: + create a RF Profile + create a Radius Profile + create ssid profiles, and add the radius profile in them, if needed (only used by eap ssid's) + + create equipment_ap profile, and add the rf profile and ssid profiles + Now using push profile method, equipment_ap profile will be pushed to an AP of given equipment_id + +""" + + +class ProfileUtility: + """ + constructor for Access Point Utility library + """ + + def __init__(self, sdk_client=None, controller_data=None, customer_id=None): + if sdk_client is None: + sdk_client = Controller(controller_data=controller_data, customer_id=customer_id) + self.sdk_client = sdk_client + self.sdk_client.refresh_instance() + self.profile_client = swagger_client.ProfileApi(api_client=self.sdk_client.api_client) + self.profile_creation_ids = { + "ssid": [], + "ap": [], + "radius": [], + "rf": [], + "passpoint_osu_id_provider": [], + "passpoint_operator": [], + "passpoint_venue": [], + "passpoint": [] + } + self.profile_name_with_id = {} + self.default_profiles = {} + self.profile_ids = [] + + def cleanup_objects(self): + self.sdk_client.refresh_instance() + self.profile_creation_ids = { + "ssid": [], + "ap": [], + "radius": [], + "rf": [], + "passpoint_osu_id_provider": [], + "passpoint_operator": [], + "passpoint_venue": [], + "passpoint": [] + } + self.profile_name_with_id = {} + self.default_profiles = {} + self.profile_ids = [] + + def get_profile_by_name(self, profile_name=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 1000 + }""" + profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + + for i in profiles._items: + if i._name == profile_name: + return i + return None + + def get_ssid_name_by_profile_id(self, profile_id=None): + self.sdk_client.refresh_instance() + profiles = self.profile_client.get_profile_by_id(profile_id=profile_id) + return profiles._details["ssid"] + + """ + default templates are as follows : + profile_name= TipWlan-rf/ + Radius-Profile/ + TipWlan-2-Radios/ + TipWlan-3-Radios/ + TipWlan-Cloud-Wifi/ + Captive-Portal + """ + + def get_default_profiles(self): + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 100 + }""" + self.sdk_client.refresh_instance() + items = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + + for i in items._items: + # print(i._name, i._id) + if i._name == "TipWlan-Cloud-Wifi": + self.default_profiles['ssid'] = i + if i._name == "TipWlan-3-Radios": + self.default_profiles['equipment_ap_3_radios'] = i + if i._name == "TipWlan-2-Radios": + self.default_profiles['equipment_ap_2_radios'] = i + if i._name == "Captive-Portal": + self.default_profiles['captive_portal'] = i + if i._name == "Radius-Profile": + self.default_profiles['radius'] = i + if i._name == "TipWlan-rf": + self.default_profiles['rf'] = i + # print(i) + + # This will delete the Profiles associated with an equipment of givwn equipment_id, and associate it to default + # equipment_ap profile + def delete_current_profile(self, equipment_id=None): + self.sdk_client.refresh_instance() + equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=equipment_id) + + data = self.profile_client.get_profile_with_children(profile_id=equipment_data._profile_id) + delete_ids = [] + for i in data: + if i._name == "TipWlan-rf": + continue + else: + delete_ids.append(i._id) + # print(delete_ids) + self.get_default_profiles() + self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_3_radios']._id + # print(self.profile_creation_ids) + self.push_profile_old_method(equipment_id=equipment_id) + self.delete_profile(profile_id=delete_ids) + + # This will delete all the profiles on an controller instance, except the default profiles + def cleanup_profiles(self): + self.sdk_client.refresh_instance() + try: + self.get_default_profiles() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 10000 + }""" + skip_delete_id = [] + for i in self.default_profiles: + skip_delete_id.append(self.default_profiles[i]._id) + + all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + + delete_ids = [] + for i in all_profiles._items: + delete_ids.append(i._id) + skip_delete_id = [] + for i in self.default_profiles: + skip_delete_id.append(self.default_profiles[i]._id) + delete_ids = list(set(delete_ids) - set(delete_ids).intersection(set(skip_delete_id))) + print(delete_ids) + for i in delete_ids: + self.set_equipment_to_profile(profile_id=i) + self.delete_profile(profile_id=delete_ids) + status = True + except Exception as e: + print(e) + status = False + return status + + # Delete any profile with the given name + def delete_profile_by_name(self, profile_name=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 5000 + }""" + all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + for i in all_profiles._items: + if i._name == profile_name: + counts = self.profile_client.get_counts_of_equipment_that_use_profiles([i._id])[0] + if counts._value2: + self.set_equipment_to_profile(profile_id=i._id) + self.delete_profile(profile_id=[i._id]) + else: + self.delete_profile(profile_id=[i._id]) + + # This method will set all the equipments to default equipment_ap profile, those having the profile_id passed in + # argument + def set_equipment_to_profile(self, profile_id=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 5000 + }""" + equipment_data = self.sdk_client.equipment_client. \ + get_equipment_by_customer_id(customer_id=2, + pagination_context=pagination_context) + self.get_default_profiles() + for i in equipment_data._items: + if i._profile_id == profile_id: + self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_2_radios']._id + self.push_profile_old_method(equipment_id=i._id) + time.sleep(2) + + """ + method call: used to create the rf profile and push set the parameters accordingly and update + Library method to create a new rf profile: Now using default profile + """ + + def set_rf_profile(self, profile_data=None, mode=None): + self.sdk_client.refresh_instance() + self.get_default_profiles() + if mode == "wifi5": + default_profile = self.default_profiles['rf'] + default_profile._name = profile_data["name"] + + default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"] + for i in default_profile._details["rfConfigMap"]: + for j in profile_data: + if i == j: + for k in default_profile._details["rfConfigMap"][i]: + for l in profile_data[j]: + if l == k: + default_profile._details["rfConfigMap"][i][l] = profile_data[j][l] + profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['rf'].append(profile._id) + return profile + if mode == "wifi6": + default_profile = self.default_profiles['rf'] + default_profile._name = profile_data["name"] + default_profile._details["rfConfigMap"]["is2dot4GHz"]["activeScanSettings"]["enabled"] = False + default_profile._details["rfConfigMap"]["is2dot4GHz"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is5GHz"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is5GHzL"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is5GHzU"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"] + default_profile._name = profile_data["name"] + for i in default_profile._details["rfConfigMap"]: + for j in profile_data: + if i == j: + for k in default_profile._details["rfConfigMap"][i]: + for l in profile_data[j]: + if l == k: + default_profile._details["rfConfigMap"][i][l] = profile_data[j][l] + profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['rf'].append(profile._id) + return profile + + """ + method call: used to create a ssid profile with the given parameters + """ + + # Open + def create_open_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'open' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + self.profile_name_with_id[profile_data["ssid_name"]] = profile_id + except Exception as e: + print(e) + profile = "error" + + return profile + + # wpa personal + def create_wpa_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + self.get_default_profiles() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpaPSK' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa2 personal + def create_wpa2_personal_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa2OnlyPSK' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 personal + def create_wpa3_personal_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa3OnlySAE' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 personal mixed mode + def create_wpa3_personal_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa3MixedSAE' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa wpa2 personal mixed mode + def create_wpa_wpa2_personal_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa2PSK' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa enterprise done + def create_wpa_enterprise_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpaRadius' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa wpa2 enterprise mixed mode done + def create_wpa_wpa2_enterprise_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa2Radius' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa2 enterprise mode ssid profile + def create_wpa2_enterprise_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa2OnlyRadius' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 enterprise mode + def create_wpa3_enterprise_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa3OnlyEAP' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 enterprise mixed mode done + def create_wpa3_enterprise_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa3MixedEAP' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 enterprise mixed mode done + def create_wep_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wep' + default_profile._details['wepConfig'] = {} + default_profile._details['wepConfig']["model_type"] = "WepConfiguration" + default_profile._details['wepConfig']["wepAuthType"] = "open" + default_profile._details['wepConfig']["primaryTxKeyId"] = profile_data["default_key_id"] + default_profile._details['wepConfig']["wepKeys"] = [{'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}, + {'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}, + {'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}, + {'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + def __get_boolean(self, flag): + return 'true' if flag in ["Enabled", "True"] else 'false' + + # wpa eap general method + def __create_wpa_eap_passpoint_ssid_profiles(self, profile_data=None, secure_mode=None): + try: + if profile_data is None or secure_mode is None: + return False + default_profile = self.default_profiles["ssid"] + default_profile._details["appliedRadios"] = profile_data["appliedRadios"] + default_profile._name = profile_data["profile_name"] + default_profile._details["vlanId"] = profile_data["vlan"] + default_profile._details["ssid"] = profile_data["ssid_name"] + default_profile._details["forwardMode"] = profile_data["mode"] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details["secureMode"] = secure_mode + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["ssid"].append(profile_id) + self.profile_ids.append(profile_id) + self.profile_name_with_id[profile_data["ssid_name"]] = profile_id + except Exception as e: + print(e) + profile = False + return profile + + # wpa eap passpoint + def create_wpa_eap_passpoint_ssid_profile(self, profile_data=None): + if profile_data is None: + return False + return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpaEAP") + + # wpa2 eap passpoint + def create_wpa2_eap_passpoint_ssid_profile(self, profile_data=None): + if profile_data is None: + return False + return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpa2EAP") + + # wpa2only eap passpoint + def create_wpa2_only_eap_passpoint_ssid_profile(self, profile_data=None): + if profile_data is None: + return False + return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpa2OnlyEAP") + + # passpoint osu id provider profile + def create_passpoint_osu_id_provider_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint_osu_id_provider" + default_profile["name"] = profile_data["profile_name"] + details = dict() + details["model_type"] = "PasspointOsuProviderProfile" + mcc_mnc = dict() + if (profile_data["mcc"] and profile_data["mnc"]) is not None: + mcc_mnc = {"mcc": profile_data["mcc"], "mnc": profile_data["mnc"]} + if profile_data["network"] is not None: + mcc_mnc["network"] = profile_data["network"] + if mcc_mnc: + details["mccMncList"] = [mcc_mnc] + if (profile_data["mcc"] and profile_data["mnc"]) is not None: + details["mccMncList"] = [{"mcc": profile_data["mcc"], "mnc": profile_data["mnc"]}] + if profile_data["osu_nai_standalone"] is not None: + details["osuNaiStandalone"] = profile_data["osu_nai_standalone"] + if profile_data["osu_nai_shared"] is not None: + details["osuNaiShared"] = profile_data["osu_nai_shared"] + if profile_data["nai_realms"] is not None: + details["naiRealmList"] = [{"naiRealms": [profile_data["nai_realms"]["domain"]], + "encoding": profile_data["nai_realms"]["encoding"], + "eapMap": profile_data["nai_realms"]["eap_map"] + }] + details["roamingOi"] = profile_data["roaming_oi"] + default_profile['details'] = details + default_profile['childProfileIds'] = [] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint_osu_id_provider"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # passpoint operator profile + def create_passpoint_operator_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint_operator" + default_profile["name"] = profile_data["profile_name"] + + default_profile["details"] = dict() + default_profile["details"]["model_type"] = "PasspointOperatorProfile" + default_profile["details"]["serverOnlyAuthenticatedL2EncryptionNetwork"] = \ + self.__get_boolean(profile_data["osen"]) + operator_names = [] + operators = profile_data["operator_names"] + for operator in profile_data["operator_names"]: + operator_temp = dict() + for key in operator.keys(): + if key == "name": + operator_temp["dupleName"] = operator["name"] + else: + operator_temp[key] = operator[key] + operator_names.append(operator_temp) + default_profile["details"]["operatorFriendlyName"] = operator_names + default_profile["details"]["domainNameList"] = profile_data["domain_name_list"] + default_profile["childProfileIds"] = [] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint_operator"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + profile = False + return profile + + # passpoint venue profile + def create_passpoint_venue_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint_venue" + default_profile["name"] = profile_data["profile_name"] + default_profile["details"] = dict() + default_profile["details"]["model_type"] = "PasspointVenueProfile" + venue_names = [] + for venue in profile_data["venue_names"]: + venue_temp = dict() + for key in venue.keys(): + if key == "name": + venue_temp["dupleName"] = venue["name"] + if key == "url": + venue_temp["venueUrl"] = venue["url"] + venue_names.append(venue_temp) + default_profile["details"]["venueNameSet"] = venue_names + allowed_venue_groups = {"Unspecified": 0, "Assembly": 1, "Business": 2, "Educational": 3, + "Factory and Industrial": 4, "Institutional": 5, "Mercantile": 6, "Residential": 7} + allowed_venue_types = {"Unspecified Assembly": 0, "Areana": 1, "Stadium": 2, "Passenger Terminal": 3, + "Amphitheatre": 4, "Amusement Park": 5, "Place of Worship": 6, + "Convention Center": 7, + "Library": 8, "Museum": 9, "Restaurant": 10, "Theatre": 11, "Bar": 12, + "Coffee Shop": 13, + "Zoo or Aquarium": 14, "Emergency Coordination Center": 15, + "Unspecified Business": 0, "Doctor or Dentist office": 1, "Bank": 2, + "Fire Station": 3, + "Police Station": 4, "Post Office": 5, "Professional Office": 6, + "Research and Development Facility": 7, "Attorney Office": 8, + "Unspecified Educational": 0, "School, Primary": 1, "School, Secondary": 2, + "University or College": 3, "Unspecified Factory and Industrial": 0, "Factory": 1, + "Unspecified Institutional": 0, "Hospital": 1, "Long-Term Care Facility": 2, + "Alcohol and Drug Re-habilitation Center": 3, "Group Home": 4, "Prison or Jail": 5, + "Unspecified Mercantile": 0, "Retail Store": 1, "Grocery Market": 2, + "Automotive Service Station": 3, "Shopping Mall": 4, "Gas Station": 5, + "Unspecified Residential": 0, "Pivate Residence": 1, "Hotel or Model": 2, + "Dormitory": 3, "Boarding House": 4} + default_profile["details"]["venueTypeAssignment"] = {"venueGroupId": + allowed_venue_groups[ + profile_data["venue_type"]["group"]], + "venueTypeId": + allowed_venue_types[ + profile_data["venue_type"]["type"]]} + default_profile["childProfileIds"] = [] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint_venue"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # passpoint profile + def create_passpoint_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint" + default_profile["name"] = profile_data["profile_name"] + + default_profile["details"] = dict() + default_profile["details"]["model_type"] = "PasspointProfile" + default_profile["details"]["enableInterworkingAndHs20"] = self.__get_boolean( + profile_data["interworking_hs2dot0"]) + if profile_data["hessid"] is not None: + default_profile["details"]["hessid"] = dict() + default_profile["details"]["hessid"]["addressAsString"] = profile_data["hessid"] + default_profile["details"]["passpointAccessNetworkType"] = \ + profile_data["access_network"]["Access Network Type"].replace(' ', '_').lower() + default_profile["details"]["passpointNetworkAuthenticationType"] = \ + profile_data["access_network"]["Authentication Type"].replace('&', 'and').replace(' ', '_').lower() + default_profile["details"]["emergencyServicesReachable"] = self.__get_boolean( + profile_data["access_network"][ + "Emergency Services Reachable"]) + default_profile["details"]["unauthenticatedEmergencyServiceAccessible"] = self.__get_boolean( + profile_data["access_network"][ + "Unauthenticated Emergency Service"]) + default_profile["details"]["internetConnectivity"] = self.__get_boolean(profile_data["ip_connectivity"][ + "Internet Connectivity"]) + capability_set = [] + for cap in profile_data["ip_connectivity"]["Connection Capability"]: + capability_info = dict() + capability_info["connectionCapabilitiesPortNumber"] = cap["port"] + capability_info["connectionCapabilitiesIpProtocol"] = cap["protocol"] + capability_info["connectionCapabilitiesStatus"] = cap["status"] + capability_set.append(capability_info) + default_profile["details"]["connectionCapabilitySet"] = capability_set + default_profile["details"]["ipAddressTypeAvailability"] = profile_data["ip_connectivity"]["IP Address Type"] + allowed_gas_address_behavior = {"P2P Spec Workaround From Request": "p2pSpecWorkaroundFromRequest", + "forceNonCompliantBehaviourFromRequest": "forceNonCompliantBehaviourFromRequest", + "IEEE 80211 Standard Compliant Only": "ieee80211StandardCompliantOnly"} + default_profile["details"]["gasAddr3Behaviour"] = allowed_gas_address_behavior[ + profile_data["ip_connectivity"] + ["GAS Address 3 Behaviour"]] + default_profile["details"]["anqpDomainId"] = profile_data["ip_connectivity"]["ANQP Domain ID"] + default_profile["details"]["disableDownstreamGroupAddressedForwarding"] = self.__get_boolean( + profile_data["ip_connectivity"][ + "Disable DGAF"]) + default_profile["details"]["associatedAccessSsidProfileIds"] = profile_data["allowed_ssids"] + default_profile["details"]["passpointOperatorProfileId"] = self.profile_creation_ids["passpoint_operator"][0] + default_profile["details"]["passpointVenueProfileId"] = self.profile_creation_ids["passpoint_venue"][0] + default_profile["details"]["passpointOsuProviderProfileIds"] = self.profile_creation_ids[ + "passpoint_osu_id_provider"] + default_profile["details"]["accessNetworkType"] = \ + profile_data["access_network"]["Access Network Type"].replace(' ', '_').lower() + # osuSsidProfileId is needed for R2 + default_profile["details"]["networkAuthenticationType"] = \ + profile_data["access_network"]["Authentication Type"].replace('&', 'and').replace(' ', '_').lower() + default_profile["childProfileIds"] = self.profile_creation_ids["passpoint_venue"] + \ + self.profile_creation_ids["passpoint_operator"] + \ + self.profile_creation_ids["passpoint_osu_id_provider"] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + """ + method call: used to create a ap profile that contains the given ssid profiles + """ + + def set_ap_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + if profile_data is None: + return False + default_profile = self.default_profiles['equipment_ap_2_radios'] + default_profile._child_profile_ids = [] + for i in self.profile_creation_ids: + if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", + "radius"]: + for j in self.profile_creation_ids[i]: + default_profile._child_profile_ids.append(j) + + default_profile._name = profile_data['profile_name'] + # print(default_profile) + default_profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['ap'] = default_profile._id + self.profile_ids.append(default_profile._id) + return default_profile + + """ + method call: used to create a ap profile that contains the given ssid profiles + """ + + def set_ap_profile_custom(self, profile_data=None): + self.sdk_client.refresh_instance() + if profile_data is None: + return False + default_profile = self.default_profiles['equipment_ap_2_radios'] + default_profile._child_profile_ids = [] + for i in self.profile_creation_ids: + if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", + "radius", "ssid"]: + for j in self.profile_creation_ids[i]: + default_profile._child_profile_ids.append(j) + for ssid in profile_data["ssid_names"]: + default_profile._child_profile_ids.append(self.profile_name_with_id[ssid]) + default_profile._name = profile_data['profile_name'] + default_profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['ap'] = default_profile._id + self.profile_ids.append(default_profile._id) + return default_profile + + """ + method call: used to create a ap profile that contains the specific ssid profiles + """ + + def update_ap_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + if profile_data is None: + print("profile info is None, Please specify the profile info that you want to update") + return False + + child_profiles_to_apply = [] + try: + for ssid in profile_data["ssid_names"]: + child_profiles_to_apply.append(self.profile_name_with_id[ssid]) + default_profile = self.get_profile_by_name(profile_name=profile_data["profile_name"]) + for i in self.profile_creation_ids: + if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", + "radius", "ssid"]: + for j in self.profile_creation_ids[i]: + child_profiles_to_apply.append(j) + default_profile._child_profile_ids = child_profiles_to_apply + default_profile = self.profile_client.update_profile(default_profile) + return True + except Exception as e: + print(e) + return False + + """ + method call: used to create a radius profile with the settings given + """ + + def create_radius_profile(self, radius_info=None, radius_accounting_info=None): + self.sdk_client.refresh_instance() + default_profile = self.default_profiles['radius'] + default_profile._name = radius_info['name'] + default_profile._details['primaryRadiusAuthServer'] = {} + default_profile._details['primaryRadiusAuthServer']['ipAddress'] = radius_info['ip'] + default_profile._details['primaryRadiusAuthServer']['port'] = radius_info['port'] + default_profile._details['primaryRadiusAuthServer']['secret'] = radius_info['secret'] + if radius_accounting_info is not None: + default_profile._details["primaryRadiusAccountingServer"] = {} + default_profile._details["primaryRadiusAccountingServer"]["ipAddress"] = radius_accounting_info["ip"] + default_profile._details["primaryRadiusAccountingServer"]["port"] = radius_accounting_info["port"] + default_profile._details["primaryRadiusAccountingServer"]["secret"] = radius_accounting_info["secret"] + default_profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['radius'] = [default_profile._id] + self.profile_ids.append(default_profile._id) + return default_profile + + """ + method to push the profile to the given equipment + """ + + # Under a Bug, depreciated until resolved, should be used primarily + def push_profile(self, equipment_id=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 100 + }""" + default_equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=11, async_req=False) + # default_equipment_data._details[] = self.profile_creation_ids['ap'] + # print(default_equipment_data) + # print(self.sdk_client.equipment_client.update_equipment(body=default_equipment_data, async_req=True)) + + """ + method to verify if the expected ssid's are loaded in the ap vif config + """ + + def update_ssid_name(self, profile_name=None, new_profile_name=None): + self.sdk_client.refresh_instance() + if profile_name is None: + print("profile name is None, Please specify the ssid profile name that you want to modify") + return False + if new_profile_name is None: + print("Please specify the new name for ssid profile that you want to make to") + return False + + try: + profile = self.get_profile_by_name(profile_name=profile_name) + profile._details['ssid'] = new_profile_name + self.profile_client.update_profile(profile) + return True + except Exception as e: + return False + + def update_ssid_profile(self, profile_info=None): + self.sdk_client.refresh_instance() + if profile_info is None: + print("profile info is None, Please specify the profile info that you want to update") + return False + + try: + profile = self.get_profile_by_name(profile_name=profile_info["ssid_profile_name"]) + profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + profile._child_profile_ids = self.profile_creation_ids["radius"] + self.profile_creation_ids["passpoint"] + if "radius_configuration" in profile_info.keys(): + if "radius_acounting_service_interval" in profile_info["radius_configuration"].keys(): + profile._details["radiusAcountingServiceInterval"] = profile_info["radius_configuration"]["radius_acounting_service_interval"] + if "user_defined_nas_id" in profile_info["radius_configuration"].keys(): + profile._details["radiusClientConfiguration"]["userDefinedNasId"] = profile_info["radius_configuration"]["user_defined_nas_id"] + if "operator_id" in profile_info["radius_configuration"].keys(): + profile._details["radiusClientConfiguration"]["operatorId"] = profile_info["radius_configuration"]["operator_id"] + self.profile_client.update_profile(profile) + return True + except Exception as e: + print(e) + return False + + def clear_ssid_profile(self, profile_name=None): + if profile_name is None: + print("profile name is None, Please specify the ssid profile name that you want to update") + return False + + try: + profile = self.get_profile_by_name(profile_name=profile_name) + profile._details["radiusServiceId"] = None + profile._child_profile_ids = [] + self.profile_client.update_profile(profile) + return True + except Exception as e: + print(e) + return False + + """ + method to delete a profile by its id + """ + + def delete_profile(self, profile_id=None): + self.sdk_client.refresh_instance() + for i in profile_id: + self.profile_client.delete_profile(profile_id=i) + + # Need to be depreciated by using push_profile method + def push_profile_old_method(self, equipment_id=None): + self.sdk_client.refresh_instance() + if equipment_id is None: + return 0 + url = self.sdk_client.configuration.host + "/portal/equipment?equipmentId=" + str(equipment_id) + payload = {} + headers = self.sdk_client.configuration.api_key_prefix + response = requests.request("GET", url, headers=headers, data=payload) + equipment_info = response.json() + equipment_info['profileId'] = self.profile_creation_ids['ap'] + url = self.sdk_client.configuration.host + "/portal/equipment" + headers = { + 'Content-Type': 'application/json', + 'Authorization': self.sdk_client.configuration.api_key_prefix['Authorization'] + } + + response = requests.request("PUT", url, headers=headers, data=json.dumps(equipment_info)) + return response + + +""" + + FirmwareUtility class + uses JfrogUtility base class + sdk_client [ controller_tests instance ] + controller_data [ sdk_base_url ] needed only if sdk_instance is not passed + customer_id [ 2 ] needed only if sdk_instance is not passed +""" + + +class FirmwareUtility: + + def __init__(self, + sdk_client=None, + jfrog_credentials=None, + controller_data=None, + customer_id=2, + model=None, + version_url=None): + # super().__init__(credentials=jfrog_credentials) + if sdk_client is None: + sdk_client = Controller(controller_data=controller_data, customer_id=customer_id) + self.sdk_client = sdk_client + self.sdk_client.refresh_instance() + self.firmware_client = FirmwareManagementApi(api_client=sdk_client.api_client) + # self.jfrog_client = JFrogUtility(credentials=jfrog_credentials) + self.equipment_gateway_client = EquipmentGatewayApi(api_client=sdk_client.api_client) + self.model = model + self.fw_version = version_url + + def get_fw_version(self): + fw_version = self.fw_version.split("/")[-1] + return fw_version + + def upload_fw_on_cloud(self, force_upload=False): + self.sdk_client.refresh_instance() + fw_version = self.fw_version.split("/")[-1] + print("Upload fw version :", self.fw_version) + fw_id = self.is_fw_available(fw_version=fw_version) + if fw_id and not force_upload: + print("Skipping upload, Firmware Already Available", "Force Upload :", force_upload) + # Don't Upload the fw + return fw_id + else: + if fw_id and force_upload: + print("Firmware Version Already Available, Deleting and Uploading Again", + " Force Upload :", force_upload) + self.firmware_client.delete_firmware_version(firmware_version_id=fw_id) + print("Deleted Firmware Image from cloud, uploading again") + time.sleep(2) + # if force_upload is true and latest image available, then delete the image + firmware_data = { + "id": 0, + "equipmentType": "AP", + "modelId": str(self.model).upper(), + "versionName": fw_version, + "description": fw_version + " FW VERSION", + "filename": self.fw_version, + } + firmware_id = self.firmware_client.create_firmware_version(body=firmware_data) + print("Uploaded the Image: ", fw_version) + return firmware_id._id + + def upgrade_fw(self, equipment_id=None, force_upgrade=False, force_upload=False): + self.sdk_client.refresh_instance() + if equipment_id is None: + print("No Equipment Id Given") + exit() + if (force_upgrade is True) or (self.should_upgrade_ap_fw(equipment_id=equipment_id)): + firmware_id = self.upload_fw_on_cloud(force_upload=force_upload) + time.sleep(5) + try: + obj = self.equipment_gateway_client.request_firmware_update(equipment_id=equipment_id, + firmware_version_id=firmware_id) + print("Request firmware upgrade Success! waiting for 300 sec") + time.sleep(400) + except Exception as e: + print(e) + obj = False + return obj + # Write the upgrade fw logic here + + def should_upgrade_ap_fw(self, equipment_id=None): + self.sdk_client.refresh_instance() + current_fw = self.sdk_client.get_ap_firmware_old_method(equipment_id=equipment_id) + latest_fw = self.get_fw_version() + print(self.model, current_fw, latest_fw) + if current_fw == latest_fw: + return False + else: + return True + + def is_fw_available(self, fw_version=None): + self.sdk_client.refresh_instance() + if fw_version is None: + exit() + try: + firmware_version = self.firmware_client.get_firmware_version_by_name( + firmware_version_name=fw_version) + firmware_version = firmware_version._id + print("Firmware ID: ", firmware_version) + except Exception as e: + print(e) + firmware_version = False + print("firmware not available: ", firmware_version) + return firmware_version + + +# This is for Unit tests on Controller Library +if __name__ == '__main__': + controller = { + 'url': "https://wlan-portal-svc-nola-ext-04.cicd.lab.wlan.tip.build", # API base url for the controller + 'username': 'support@example.com', + 'password': 'support', + 'version': "1.1.0-SNAPSHOT", + 'commit_date': "2021-04-27" + } + api = Controller(controller_data=controller) + profile = ProfileUtility(sdk_client=api) + profile_data = { + "name": "test-rf-wifi-6", + "is2dot4GHz": {}, + "is5GHz": {"channelBandwidth": "is20MHz"}, + "is5GHzL": {"channelBandwidth": "is20MHz"}, + "is5GHzU": {"channelBandwidth": "is20MHz"} + } + profile.set_rf_profile(profile_data=profile_data, mode="wifi6") + print(profile.default_profiles["rf"]) + # profile.cleanup_profiles() + + # profile.get_default_profiles() + # profile_data = { + # "profile_name": "ssid_wep_2g", + # "ssid_name": "ssid_wep_2g", + # "appliedRadios": ["is2dot4GHz"], + # "default_key_id" : 1, + # "wep_key" : 1234567890, + # "vlan": 1, + # "mode": "BRIDGE" + # } + # profile.create_wep_ssid_profile(profile_data=profile_data) + # print(profile.get_profile_by_name(profile_name="wpa_wpa2_eap")) + # profile.get_default_profiles() + api.disconnect_Controller() diff --git a/libs/tip_2x/__init__.py b/libs/tip_2x/__init__.py new file mode 100644 index 000000000..ef60ae1ad --- /dev/null +++ b/libs/tip_2x/__init__.py @@ -0,0 +1,120 @@ +""" + Telecom Infra Project OpenWifi 2.X (Ucentral libraries for Test Automation) + + +""" +import importlib + +logging = importlib.import_module("logging") + +ap_lib = importlib.import_module("ap_lib") +controller = importlib.import_module("controller") + +""" + Custom Class Imports needed for OpenWifi 2.X +""" + +ConfigureController = controller.ConfigureController +Controller = controller.Controller +FMSUtils = controller.FMSUtils +ProvUtils = controller.ProvUtils +UProfileUtility = controller.UProfileUtility + +APLIBS = ap_lib.APLIBS + +""" + Setting default Logging +""" + + +class tip_2x: + """ + Standard OpenWifi wlan-testing specific variables + + """ + controller_data = {} + device_under_tests_info = [] + + """ + OpenWifi 2.x Specific Variables that will be only scoped in tip_2x Library + + """ + ow_sec_url = "" + ow_sec_login_username = "" + ow_sec_login_password = "" + + controller_library_object = object() + dut_library_object = object() + + def __init__(self, controller_data=None, + device_under_tests_info=[], logging_level=logging.INFO): + logging.basicConfig(format='%(asctime)s - %(message)s', level=logging_level) + if controller_data is None: + controller_data = {} + self.controller_data = controller_data + self.device_under_tests_info = device_under_tests_info + self.setup_metadata() + + """ + Controller and Access Point specific metadata that is related to OpenWifi 2.x + """ + + def setup_metadata(self): + logging.info("setting up the Controller metadata for tip_2x Library: " + str(self.controller_data)) + logging.info("setting up the DUT metadata for tip_2x Library: " + str(self.device_under_tests_info)) + logging.info("Number of DUT's configured: " + str(len(self.device_under_tests_info))) + self.ow_sec_url = self.controller_data["url"] + self.ow_sec_login_username = self.controller_data["username"] + self.ow_sec_login_password = self.controller_data["password"] + + def setup_objects(self): + self.controller_library_object = Controller() + self.dut_library_object = APLIBS() + + """ Standard getter methods. Should be available for all type of libraries. Commonly used by wlan-testing""" + + def get_dut_library_object(self): + return self.dut_library_object + + def get_controller_library_object(self): + return self.controller_library_object + + def get_controller_data(self): + return self.controller_data + + def get_device_under_tests_info(self): + return self.device_under_tests_info + + def get_number_of_dut(self): + return len(self.device_under_tests_info) + + def get_dut_logs(self, dut_idx=0): + return self.dut_library_object.get_logs(idx=0) + + def get_controller_logs(self): + return "" + + +if __name__ == '__main__': + basic_1 = { + "target": "tip_2x", + "controller": { + "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", + "username": "tip@ucentral.com", + "password": "OpenWifi%123" + }, + "device_under_tests": [{ + "model": "wallys_dr40x9", + "mode": "wifi5", + "serial": "c44bd1005b30", + "jumphost": True, + "ip": "10.28.3.100", + "username": "lanforge", + "password": "pumpkin77", + "port": 22, + "serial_tty": "/dev/ttyAP8", + "version": "next-latest" + }], + "traffic_generator": {} + } + var = tip_2x(controller_data=basic_1["controller"], device_under_tests_info=basic_1["device_under_tests"]) diff --git a/libs/tip_2x/ap_lib.py b/libs/tip_2x/ap_lib.py new file mode 100644 index 000000000..952df0209 --- /dev/null +++ b/libs/tip_2x/ap_lib.py @@ -0,0 +1,11 @@ +import importlib +import sys + +sys.path.append('..') +SetupLibrary = importlib.import_module("SetupLibrary") + + +class APLIBS: + + def __init__(self): + pass diff --git a/libs/tip_2x/controller.py b/libs/tip_2x/controller.py new file mode 100644 index 000000000..9292d2021 --- /dev/null +++ b/libs/tip_2x/controller.py @@ -0,0 +1,1461 @@ +""" + + Base Library for Ucentral + +""" +import datetime +import json +import sys +import time +from operator import itemgetter +from urllib.parse import urlparse + +import allure +import pytest +import requests + + +# logging.basicConfig(level=logging.DEBUG) +# from http.client import HTTPConnection +# +# HTTPConnection.debuglevel = 1 +# requests.logging.getLogger() + + +class ConfigureController: + + def __init__(self, controller_data): + self.username = controller_data["username"] + self.password = controller_data["password"] + self.host = urlparse(controller_data["url"]) + print(self.host) + self.access_token = "" + # self.session = requests.Session() + self.login_resp = self.login() + self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint() + if self.gw_host == "" or self.fms_host == "" or self.prov_host == "": + time.sleep(60) + self.gw_host, self.fms_host, self.prov_host = self.get_gw_endpoint() + if self.gw_host == "" or self.fms_host == "" or self.prov_host == "": + self.logout() + print(self.gw_host, self.fms_host, self.prov_host) + pytest.exit("All Endpoints not available in Controller Service") + sys.exit() + + def build_uri_sec(self, path): + new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path) + print(new_uri) + return new_uri + + def build_url_fms(self, path): + new_uri = 'https://%s:%d/api/v1/%s' % (self.fms_host.hostname, self.fms_host.port, path) + print(new_uri) + return new_uri + + def build_uri(self, path): + + new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path) + print(new_uri) + return new_uri + + def build_url_prov(self, path): + new_uri = 'https://%s:%d/api/v1/%s' % (self.prov_host.hostname, self.prov_host.port, path) + print(new_uri) + return new_uri + + def request(self, service, command, method, params, payload): + if service == "sec": + uri = self.build_uri_sec(command) + elif service == "gw": + uri = self.build_uri(command) + elif service == "fms": + uri = self.build_url_fms(command) + elif service == "prov": + uri = self.build_url_prov(command) + else: + raise NameError("Invalid service code for request.") + + print(uri) + params = params + if method == "GET": + resp = requests.get(uri, headers=self.make_headers(), params=params, verify=False, timeout=100) + elif method == "POST": + print(uri, payload, params) + resp = requests.post(uri, params=params, data=payload, headers=self.make_headers(), verify=False, + timeout=100) + elif method == "PUT": + resp = requests.put(uri, params=params, data=payload, verify=False, timeout=100) + elif method == "DELETE": + resp = requests.delete(uri, headers=self.make_headers(), params=params, verify=False, timeout=100) + + self.check_response(method, resp, self.make_headers(), payload, uri) + + return resp + + def login(self): + uri = self.build_uri_sec("oauth2") + # self.session.mount(uri, HTTPAdapter(max_retries=15)) + payload = json.dumps({"userId": self.username, "password": self.password}) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, "", payload, uri) + token = resp + self.access_token = token.json()['access_token'] + resp.close() + if resp.status_code != 200: + pytest.exit(str(resp.json())) + # self.session.headers.update({'Authorization': self.access_token}) + return token + + def get_gw_endpoint(self): + uri = self.build_uri_sec("systemEndpoints") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + services = resp.json() + gw_host = "" + fms_host = "" + for service in services['endpoints']: + if service['type'] == "owgw": + gw_host = urlparse(service["uri"]) + if service['type'] == "owfms": + fms_host = urlparse(service["uri"]) + if service['type'] == "owprov": + prov_host = urlparse(service["uri"]) + return gw_host, fms_host, prov_host + + def logout(self): + uri = self.build_uri_sec('oauth2/%s' % self.access_token) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("DELETE", resp, self.make_headers(), "", uri) + r = resp + resp.close() + return r + + def make_headers(self): + headers = {'Authorization': 'Bearer %s' % self.access_token, + "Connection": "keep-alive", + "Content-Type": "application/json", + "Keep-Alive": "timeout=10, max=1000" + } + return headers + + def check_response(self, cmd, response, headers, data_str, url): + try: + print("Command Response: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "Response Code: " + str(response.status_code) + "\n" + + "Response Body: " + str(response.json())) + allure.attach(name="Command Response: ", body="Command Response: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "Response Code: " + str(response.status_code) + "\n" + + "Response Body: " + str(response.json())) + except: + pass + return True + + +class Controller(ConfigureController): + + def __init__(self, controller_data=None): + super().__init__(controller_data) + + def get_devices(self): + uri = self.build_uri("devices") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_by_serial_number(self, serial_number): + uri = self.build_uri("device/" + serial_number) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_sdk_version(self): + uri = self.build_uri("system?command=info") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + version = resp.json() + return version['version'] + + def get_system_gw(self): + uri = self.build_uri("system?command=info") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_system_fms(self): + uri = self.build_url_fms("system?command=info") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_system_prov(self): + uri = self.build_url_prov("system?command=info") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_uuid(self, serial_number): + device_info = self.get_device_by_serial_number(serial_number=serial_number) + device_info = device_info.json() + return device_info["UUID"] + + def add_device_to_gw(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def delete_device_from_gw(self, device_name): + uri = self.build_uri("device/" + device_name) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("DELETE", resp, self.make_headers(), "", uri) + return resp + + def get_commands(self): + uri = self.build_uri("commands") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_logs(self, serial_number): + uri = self.build_uri("device/" + serial_number + "/logs") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_health_checks(self, serial_number): + uri = self.build_uri("device/" + serial_number + "/healthchecks") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_capabilities(self, serial_number): + uri = self.build_uri("device/" + serial_number + "/capabilities") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_statistics(self, serial_number): + uri = self.build_uri("device/" + serial_number + "/statistics") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def get_device_status(self, serial_number): + uri = self.build_uri("device/" + serial_number + "/status") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def ap_reboot(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/reboot") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def ap_factory_reset(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/factory") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def ping_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/ping") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def led_blink_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/leds") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def trace_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/trace") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def wifi_scan_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/wifiscan") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def request_specific_msg_from_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/request") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def event_queue(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/eventqueue") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def telemetry(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/telemetry") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def get_rtty_params(self, serial_number): + uri = self.build_uri("device/" + serial_number + "/rtty") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + return resp + + def edit_device_on_gw(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.make_headers())) + resp = requests.put(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + + self.check_response("PUT", resp, self.make_headers(), payload, uri) + return resp + + +class FMSUtils: + + def __init__(self, sdk_client=None, controller_data=None): + if sdk_client is None: + self.sdk_client = Controller(controller_data=controller_data) + self.sdk_client = sdk_client + + def upgrade_firmware(self, serial="", url=""): + payload = "{ \"serialNumber\" : " + "\"" + \ + serial + "\"" + " , \"uri\" : " \ + + "\"" + url \ + + "\"" + ", \"when\" : 0" \ + + " }" + command = "device/" + serial + "/upgrade" + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(command) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(command) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + response = self.sdk_client.request(service="gw", command="device/" + serial + "/upgrade", + method="POST", params="serialNumber=" + serial, + payload="{ \"serialNumber\" : " + "\"" + serial + "\"" + + " , \"uri\" : " + "\"" + url + "\"" + + ", \"when\" : 0" + " }") + + def ap_model_lookup(self, model=""): + devices = self.get_device_set() + model_name = "" + for device in devices['deviceTypes']: + if str(device).__eq__(model): + model_name = device + return model_name + + def get_revisions(self): + response = self.sdk_client.request(service="fms", command="firmwares", method="GET", params="revisionSet=true", + payload="") + if response.status_code == 200: + return response.json() + else: + return {} + + def get_latest_fw(self, model=""): + + device_type = self.ap_model_lookup(model=model) + + response = self.sdk_client.request(service="fms", command="firmwares", method="GET", + params="latestOnly=true&deviceType=" + device_type, + payload="") + if response.status_code == 200: + return response.json() + else: + return {} + + def get_device_set(self): + response = self.sdk_client.request(service="fms", command="firmwares", method="GET", params="deviceSet=true", + payload="") + if response.status_code == 200: + return response.json() + else: + return {} + + def get_firmwares(self, limit="10000", model="", latestonly="", branch="", commit_id="", offset="3000"): + + deviceType = self.ap_model_lookup(model=model) + params = "limit=" + limit + \ + "&deviceType=" + deviceType + \ + "&latestonly=" + latestonly + \ + "offset=" + offset + command = "firmwares" + response = self.sdk_client.request(service="fms", command=command, method="GET", params=params, payload="") + allure.attach(name=command + params, + body=str(response.status_code) + "\n" + str(response.json()), + attachment_type=allure.attachment_type.JSON) + if response.status_code == 200: + data = response.json() + newlist = sorted(data['firmwares'], key=itemgetter('created')) + # for i in newlist: + # print(i['uri']) + # print(i['revision']) + # print(newlist) + + return newlist + # print(data) + + return "error" + + +class ProvUtils: + + def __init__(self, sdk_client=None, controller_data=None): + if sdk_client is None: + self.sdk_client = Controller(controller_data=controller_data) + self.sdk_client = sdk_client + + def get_inventory(self): + uri = self.sdk_client.build_url_prov("inventory") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_inventory_by_device(self, device_name): + uri = self.sdk_client.build_url_prov("inventory/" + device_name) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_system_prov(self): + uri = self.sdk_client.build_url_prov("system?command=info") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def add_device_to_inventory(self, device_name, payload): + uri = self.sdk_client.build_url_prov("inventory/" + device_name) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + + resp = requests.post(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def delete_device_from_inventory(self, device_name): + uri = self.sdk_client.build_url_prov("inventory/" + device_name) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.delete(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("DELETE", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_entity(self): + uri = self.sdk_client.build_url_prov("entity") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_entity_by_id(self, entity_id): + uri = self.sdk_client.build_url_prov("entity/" + entity_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def add_entity(self, payload): + uri = self.sdk_client.build_url_prov("entity/1") + + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.post(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def delete_entity(self, entity_id): + uri = self.sdk_client.build_url_prov("entity/" + entity_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.delete(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("DELETE", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def edit_device_from_inventory(self, device_name, payload): + uri = self.sdk_client.build_url_prov("inventory/" + device_name) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.put(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("PUT", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def edit_entity(self, payload, entity_id): + uri = self.sdk_client.build_url_prov("entity/" + entity_id) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.put(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("PUT", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def get_contact(self): + uri = self.sdk_client.build_url_prov("contact") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_contact_by_id(self, contact_id): + uri = self.sdk_client.build_url_prov("contact/" + contact_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def add_contact(self, payload): + uri = self.sdk_client.build_url_prov("contact/1") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.post(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def delete_contact(self, contact_id): + uri = self.sdk_client.build_url_prov("contact/" + contact_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.delete(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("DELETE", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def edit_contact(self, payload, contact_id): + uri = self.sdk_client.build_url_prov("contact/" + contact_id) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.put(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("PUT", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def get_location(self): + uri = self.sdk_client.build_url_prov("location") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_location_by_id(self, location_id): + uri = self.sdk_client.build_url_prov("location/" + location_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def add_location(self, payload): + uri = self.sdk_client.build_url_prov("location/1") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.post(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def delete_location(self, location_id): + uri = self.sdk_client.build_url_prov("location/" + location_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.delete(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("DELETE", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def edit_location(self, payload, location_id): + uri = self.sdk_client.build_url_prov("location/" + location_id) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.put(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + + self.sdk_client.check_response("PUT", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def get_venue(self): + uri = self.sdk_client.build_url_prov("venue") + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def get_venue_by_id(self, venue_id): + uri = self.sdk_client.build_url_prov("venue/" + venue_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.get(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("GET", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def add_venue(self, payload): + uri = self.sdk_client.build_url_prov("venue/0") + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.post(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + def delete_venue(self, venue_id): + uri = self.sdk_client.build_url_prov("venue/" + venue_id) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.delete(uri, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("DELETE", resp, self.sdk_client.make_headers(), "", uri) + return resp + + def edit_venue(self, payload, venue_id): + uri = self.sdk_client.build_url_prov("venue/" + venue_id) + payload = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + resp = requests.put(uri, data=payload, headers=self.sdk_client.make_headers(), verify=False, timeout=100) + self.sdk_client.check_response("PUT", resp, self.sdk_client.make_headers(), payload, uri) + return resp + + +class UProfileUtility: + + def __init__(self, sdk_client=None, controller_data=None): + if sdk_client is None: + self.sdk_client = Controller(controller_data=controller_data) + self.sdk_client = sdk_client + self.base_profile_config = { + "uuid": 1, + "radios": [], + "interfaces": [{ + "name": "WAN", + "role": "upstream", + "services": ["ssh", "lldp", "dhcp-snooping"], + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + }, + { + "name": "LAN", + "role": "downstream", + "services": ["ssh", "lldp", "dhcp-snooping"], + "ethernet": [ + { + "select-ports": [ + "LAN*" + ] + } + ], + "ipv4": { + "addressing": "static", + "subnet": "192.168.1.1/16", + "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + } + }, + }], + "metrics": { + "statistics": { + "interval": 60, + "types": ["ssids", "lldp", "clients"] + }, + "health": { + "interval": 120 + }, + "wifi-frames": { + "filters": ["probe", + "auth", + "assoc", + "disassoc", + "deauth", + "local-deauth", + "inactive-deauth", + "key-mismatch", + "beacon-report", + "radar-detected"] + }, + "dhcp-snooping": { + "filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"] + } + }, + "services": { + "lldp": { + "describe": "TIP OpenWiFi", + "location": "QA" + }, + "ssh": { + "port": 22 + } + } + } + self.vlan_section = { + "name": "WAN100", + "role": "upstream", + "vlan": { + "id": 100 + }, + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + } + self.mode = None + + def set_mesh_services(self): + self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24" + self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100 + del self.base_profile_config['metrics']['wifi-frames'] + del self.base_profile_config['metrics']['dhcp-snooping'] + var = { + "filters": ["probe", + "auth"] + } + self.base_profile_config["metrics"]['wifi-frames'] = var + del self.base_profile_config['services'] + var2 = { + "lldp": { + "describe": "uCentral", + "location": "universe" + }, + "ssh": { + "port": 22 + } + } + self.base_profile_config['services'] = var2 + + def set_express_wifi(self, open_flow=None): + if self.mode == "NAT": + self.base_profile_config["interfaces"][1]["services"] = ["ssh", "lldp", "open-flow"] + self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.97.1/24" + self.base_profile_config["interfaces"][1]["ipv4"]["dhcp"]["lease-count"] = 100 + self.base_profile_config['services']["open-flow"] = open_flow + self.base_profile_config['services']['lldp']['describe'] = "OpenWiFi - expressWiFi" + self.base_profile_config['services']['lldp']['location'] = "Hotspot" + + def set_captive_portal(self): + + if self.mode == "NAT": + max_client = { + "max-clients": 32 + } + # sourceFile = open('captive_config.py', 'w') + + self.base_profile_config["interfaces"][1]["name"] = "captive" + self.base_profile_config["interfaces"][1]["ipv4"]["subnet"] = "192.168.2.1/24" + self.base_profile_config["interfaces"][1]["captive"] = max_client + del self.base_profile_config["interfaces"][1]["ethernet"] + del self.base_profile_config["interfaces"][1]["services"] + del self.base_profile_config["metrics"]["wifi-frames"] + del self.base_profile_config["metrics"]["dhcp-snooping"] + # print(self.base_profile_config) + # print(self.base_profile_config, file=sourceFile) + # sourceFile.close() + + def encryption_lookup(self, encryption="psk"): + encryption_mapping = { + "none": "open", + "psk": "wpa", + "psk2": "wpa2", + "sae": "wpa3", + "psk-mixed": "wpa|wpa2", + "sae-mixed": "wpa3", + "wpa": 'wap', + "wpa2": "eap", + "wpa3": "eap", + "wpa-mixed": "eap", + "wpa3-mixed": "sae" + } + if encryption in encryption_mapping.keys(): + return encryption_mapping[encryption] + else: + return False + + def get_ssid_info(self): + ssid_info = [] + for interfaces in self.base_profile_config["interfaces"]: + if "ssids" in interfaces.keys(): + for ssid_data in interfaces["ssids"]: + for band in ssid_data["wifi-bands"]: + temp = [ssid_data["name"]] + if ssid_data["encryption"]["proto"] == "none" or "radius" in ssid_data.keys(): + temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"])) + temp.append('[BLANK]') + else: + temp.append(self.encryption_lookup(encryption=ssid_data["encryption"]["proto"])) + temp.append(ssid_data["encryption"]["key"]) + temp.append(band) + ssid_info.append(temp) + return ssid_info + + def set_radio_config(self, radio_config={}): + base_radio_config_2g = { + "band": "2G", + "country": "CA", + "channel-mode": "HE", + "channel": "auto" + } + base_radio_config_5g = { + "band": "5G", + "country": "CA", + "allow-dfs": True, + "channel-mode": "HE", + "channel": "auto" + } + for band in radio_config: + if band == "2G": + for keys in radio_config[band]: + base_radio_config_2g[keys] = radio_config[band][keys] + if band == "5G": + for keys in radio_config[band]: + base_radio_config_5g[keys] = radio_config[band][keys] + # if band == "6G": + # for keys in radio_config[band]: + # base_radio_config_6g[keys] = radio_config[band][keys] + + self.base_profile_config["radios"].append(base_radio_config_2g) + self.base_profile_config["radios"].append(base_radio_config_5g) + print(self.base_profile_config) + self.vlan_section["ssids"] = [] + self.vlan_ids = [] + + def set_mode(self, mode, mesh=False): + self.mode = mode + if mode == "NAT": + if mesh: + self.base_profile_config['interfaces'][0]['tunnel'] = { + "proto": "mesh" + } + self.base_profile_config['interfaces'][1]['ssids'] = [] + elif mode == "BRIDGE": + if mesh: + self.base_profile_config['interfaces'][0]['tunnel'] = { + "proto": "mesh" + } + self.base_profile_config['interfaces'][0]['ssids'] = [] + elif mode == "VLAN": + if mesh: + self.base_profile_config['interfaces'][0]['tunnel'] = { + "proto": "mesh" + } + del self.base_profile_config['interfaces'][1] + self.base_profile_config['interfaces'][0]['ssids'] = [] + self.base_profile_config['interfaces'] = [] + wan_section_vlan = { + "name": "WAN", + "role": "upstream", + "services": ["lldp", "ssh", "dhcp-snooping"], + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + } + self.base_profile_config['interfaces'].append(wan_section_vlan) + else: + print("Invalid Mode") + return 0 + + def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}): + print("ssid data : ", ssid_data) + ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]} + for options in ssid_data: + if options == "multi-psk": + ssid_info[options] = ssid_data[options] + print("hi", ssid_info) + if options == "rate-limit": + ssid_info[options] = ssid_data[options] + for i in ssid_data["appliedRadios"]: + ssid_info["wifi-bands"].append(i) + ssid_info['encryption'] = {} + ssid_info['encryption']['proto'] = ssid_data["security"] + try: + ssid_info['encryption']['key'] = ssid_data["security_key"] + except Exception as e: + pass + ssid_info['encryption']['ieee80211w'] = "optional" + if radius: + ssid_info["radius"] = {} + ssid_info["radius"]["authentication"] = { + "host": radius_auth_data["ip"], + "port": radius_auth_data["port"], + "secret": radius_auth_data["secret"] + } + ssid_info["radius"]["accounting"] = { + "host": radius_accounting_data["ip"], + "port": radius_accounting_data["port"], + "secret": radius_accounting_data["secret"] + } + if self.mode == "NAT": + self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info) + elif self.mode == "BRIDGE": + self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info) + elif self.mode == "VLAN": + vid = ssid_data["vlan"] + self.vlan_section = { + "name": "WAN100", + "role": "upstream", + "services": ["lldp", "dhcp-snooping"], + "vlan": { + "id": 100 + }, + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + } + vlan_section = self.vlan_section + if vid in self.vlan_ids: + print("sss", self.vlan_ids) + for i in self.base_profile_config['interfaces']: + if i["name"] == "WANv%s" % (vid): + i["ssids"].append(ssid_info) + else: + print(self.vlan_ids) + self.vlan_ids.append(vid) + vlan_section['name'] = "WANv%s" % (vid) + vlan_section['vlan']['id'] = int(vid) + vlan_section["ssids"] = [] + vlan_section["ssids"].append(ssid_info) + self.base_profile_config['interfaces'].append(vlan_section) + print(vlan_section) + vsection = 0 + else: + print("invalid mode") + pytest.exit("invalid Operating Mode") + + def push_config(self, serial_number): + payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 1} + uri = self.sdk_client.build_uri("device/" + serial_number + "/configure") + basic_cfg_str = json.dumps(payload) + print("Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + allure.attach(name="Sending Command:", body="Sending Command: " + "\n" + + "TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" + + "URI: " + str(uri) + "\n" + + "Data: " + str(payload) + "\n" + + "Headers: " + str(self.sdk_client.make_headers())) + + resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(), + verify=False, timeout=100) + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri) + resp.close() + return resp + + +if __name__ == '__main__': + controller = { + 'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller + 'username': "tip@ucentral.com", + 'password': 'OpenWifi%123', + } + obj = Controller(controller_data=controller) + # po = ProvUtils(sdk_client=obj) + # print(po.get_inventory()) + # up = UProfileUtility(sdk_client=obj, controller_data=controller) + # up.set_mode(mode="BRIDGE") + # up.set_radio_config() + # up.add_ssid(ssid_data={"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["5G"], "security_key": "something", "security": "psk2"}) + # up.push_config(serial_number="3c2c99f44e77") + # print(obj.get_device_by_serial_number(serial_number="3c2c99f44e77")) + # print(datetime.datetime.utcnow()) + # fms = FMSUtils(sdk_client=obj) + # new = fms.get_firmwares(model='ecw5410') + # for i in new: + # print(i) + # print(len(new)) + + # print(profile.get_ssid_info()) + # # print(obj.get_devices()) + obj.logout()