From 53edeaa71db913016e75c9de4dbbf5fac13a74c4 Mon Sep 17 00:00:00 2001 From: shivamcandela Date: Sun, 8 Aug 2021 01:08:12 +0530 Subject: [PATCH] changed path of ucentral ctlr library Signed-off-by: shivamcandela --- libs/controller/controller_2x/controller.py | 380 ++++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 libs/controller/controller_2x/controller.py diff --git a/libs/controller/controller_2x/controller.py b/libs/controller/controller_2x/controller.py new file mode 100644 index 000000000..fa09938d6 --- /dev/null +++ b/libs/controller/controller_2x/controller.py @@ -0,0 +1,380 @@ +""" + + Base Library for Ucentral + +""" +import json +import ssl +import sys +import time +from urllib.parse import urlparse +import pytest +import allure +import requests +from pathlib import Path + +from requests.adapters import HTTPAdapter +import logging + + +# logging.basicConfig(level=logging.DEBUG) +# from http.client import HTTPConnection +# +# HTTPConnection.debuglevel = 1 +# requests.logging.getLogger() + + +class ConfigureController: + + def __init__(self, controller_data): + self.username = controller_data["username"] + self.password = controller_data["password"] + self.host = urlparse(controller_data["url"]) + print(self.host) + self.access_token = "" + # self.session = requests.Session() + self.login_resp = self.login() + self.gw_host = self.get_endpoint() + + def build_uri_sec(self, path): + new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path) + print(new_uri) + return new_uri + + def build_uri(self, path): + + new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path) + print(new_uri) + return new_uri + + def login(self): + uri = self.build_uri_sec("oauth2") + # self.session.mount(uri, HTTPAdapter(max_retries=15)) + payload = json.dumps({"userId": self.username, "password": self.password}) + resp = requests.post(uri, data=payload, verify=False, timeout=100) + self.check_response("POST", resp, "", payload, uri) + token = resp.json() + self.access_token = token["access_token"] + print(token) + + # self.session.headers.update({'Authorization': self.access_token}) + return resp + + def get_endpoint(self): + uri = self.build_uri_sec("systemEndpoints") + print(uri) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("GET", resp, self.make_headers(), "", uri) + devices = resp.json() + print(devices["endpoints"][0]["uri"]) + gw_host = urlparse(devices["endpoints"][0]["uri"]) + return gw_host + + def logout(self): + uri = self.build_uri_sec('oauth2/%s' % self.access_token) + resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("DELETE", resp, self.make_headers(), "", uri) + print('Logged out:', resp.status_code) + return resp + + def make_headers(self): + headers = {'Authorization': 'Bearer %s' % self.access_token, + "Connection": "keep-alive", + "Keep-Alive": "timeout=10, max=1000" + } + return headers + + def check_response(self, cmd, response, headers, data_str, url): + if response.status_code >= 400: + if response.status_code >= 400: + print("check-response: ERROR, url: ", url) + else: + print("check-response: url: ", url) + print("Command: ", cmd) + print("response-status: ", response.status_code) + print("response-headers: ", response.headers) + print("response-content: ", response.content) + print("headers: ", headers) + print("data-str: ", data_str) + + if response.status_code >= 400: + # if True: + raise NameError("Invalid response code.") + return True + + +class Controller(ConfigureController): + + def __init__(self, controller_data=None): + super().__init__(controller_data) + + def get_devices(self): + uri = self.build_uri("devices/") + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + devices = resp.json() + # resp.close()() + return devices + + def get_device_by_serial_number(self, serial_number=None): + uri = self.build_uri("device/" + serial_number) + resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) + self.check_response("GET", resp, self.make_headers(), "", uri) + device = resp.json() + # resp.close()() + return device + + def get_device_uuid(self, serial_number): + device_info = self.get_device_by_serial_number(serial_number=serial_number) + return device_info["UUID"] + + +class UProfileUtility: + + def __init__(self, sdk_client=None, controller_data=None): + if sdk_client is None: + self.sdk_client = UController(controller_data=controller_data) + self.sdk_client = sdk_client + self.base_profile_config = { + "uuid": 1, + "radios": [], + "interfaces": [{ + "name": "WAN", + "role": "upstream", + "services": ["lldp", "dhcp-snooping"], + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + }, + { + "name": "LAN", + "role": "downstream", + "services": ["ssh", "lldp", "dhcp-snooping"], + "ethernet": [ + { + "select-ports": [ + "LAN*" + ] + } + ], + "ipv4": { + "addressing": "static", + "subnet": "192.168.1.1/16", + "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + } + }, + }], + "metrics": { + "statistics": { + "interval": 60, + "types": ["ssids", "lldp", "clients"] + }, + "health": { + "interval": 120 + }, + "wifi-frames": { + "filters": ["probe", + "auth", + "assoc", + "disassoc", + "deauth", + "local-deauth", + "inactive-deauth", + "key-mismatch", + "beacon-report", + "radar-detected"] + }, + "dhcp-snooping": { + "filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"] + } + }, + "services": { + "lldp": { + "describe": "TIP OpenWiFi", + "location": "QA" + }, + "ssh": { + "port": 22 + } + } + } + self.vlan_section = { + "name": "WAN100", + "role": "upstream", + "vlan": { + "id": 100 + }, + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + } + self.mode = None + + def set_radio_config(self, radio_config=None): + self.base_profile_config["radios"].append({ + "band": "2G", + "country": "US", + # "channel-mode": "HE", + "channel-width": 20, + # "channel": 11 + }) + self.base_profile_config["radios"].append({ + "band": "5G", + "country": "US", + # "channel-mode": "HE", + "channel-width": 80, + # "channel": "auto" + }) + + self.vlan_section["ssids"] = [] + self.vlan_ids = [] + + def set_mode(self, mode): + self.mode = mode + if mode == "NAT": + self.base_profile_config['interfaces'][1]['ssids'] = [] + + elif mode == "BRIDGE": + del self.base_profile_config['interfaces'][1] + self.base_profile_config['interfaces'][0]['ssids'] = [] + elif mode == "VLAN": + del self.base_profile_config['interfaces'][1] + self.base_profile_config['interfaces'][0]['ssids'] = [] + self.base_profile_config['interfaces'] = [] + wan_section_vlan = { + "name": "WAN", + "role": "upstream", + "services": ["lldp"], + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + } + self.base_profile_config['interfaces'].append(wan_section_vlan) + else: + print("Invalid Mode") + return 0 + + def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}): + print("ssid data : ", ssid_data) + ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]} + for i in ssid_data["appliedRadios"]: + ssid_info["wifi-bands"].append(i) + ssid_info['encryption'] = {} + ssid_info['encryption']['proto'] = ssid_data["security"] + try: + ssid_info['encryption']['key'] = ssid_data["security_key"] + except Exception as e: + pass + ssid_info['encryption']['ieee80211w'] = "optional" + if radius: + ssid_info["radius"] = {} + ssid_info["radius"]["authentication"] = { + "host": radius_auth_data["ip"], + "port": radius_auth_data["port"], + "secret": radius_auth_data["secret"] + } + ssid_info["radius"]["accounting"] = { + "host": radius_accounting_data["ip"], + "port": radius_accounting_data["port"], + "secret": radius_accounting_data["secret"] + } + if self.mode == "NAT": + self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info) + elif self.mode == "BRIDGE": + self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info) + elif self.mode == "VLAN": + vid = ssid_data["vlan"] + self.vlan_section = { + "name": "WAN100", + "role": "upstream", + "services": ["lldp", "dhcp-snooping"], + "vlan": { + "id": 100 + }, + "ethernet": [ + { + "select-ports": [ + "WAN*" + ] + } + ], + "ipv4": { + "addressing": "dynamic" + } + } + vlan_section = self.vlan_section + if vid in self.vlan_ids: + print("sss", self.vlan_ids) + for i in self.base_profile_config['interfaces']: + if i["name"] == "WANv%s" % (vid): + i["ssids"].append(ssid_info) + else: + print(self.vlan_ids) + self.vlan_ids.append(vid) + vlan_section['name'] = "WANv%s" % (vid) + vlan_section['vlan']['id'] = int(vid) + vlan_section["ssids"] = [] + vlan_section["ssids"].append(ssid_info) + self.base_profile_config['interfaces'].append(vlan_section) + print(vlan_section) + vsection = 0 + else: + print("invalid mode") + pytest.exit("invalid Operating Mode") + + def push_config(self, serial_number): + payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 0} + + uri = self.sdk_client.build_uri("device/" + serial_number + "/configure") + basic_cfg_str = json.dumps(payload) + allure.attach(name="ucentral_config: ", body=str(self.base_profile_config)) + print(self.base_profile_config) + resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(), + verify=False, timeout=100) + self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri) + print(resp.url) + resp.close() + print(resp) + + +if __name__ == '__main__': + controller = { + 'url': 'https://sec-ucentral-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller + 'username': "tip@ucentral.com", + 'password': 'openwifi', + } + obj = Controller(controller_data=controller) + # profile = UProfileUtility(sdk_client=obj) + # profile.set_mode(mode="BRIDGE") + # profile.set_radio_config() + # ssid = {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security": "psk", "security_key": "something", + # "vlan": 100} + # profile.add_ssid(ssid_data=ssid) + # profile.push_config(serial_number="903cb39d6918") + # print(obj.get_devices()) + obj.logout()