token library

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-06-08 20:37:04 +05:30
parent 0753b3b3cb
commit 58fa879b6a
6 changed files with 347 additions and 288 deletions

23
libs/controller/cert.pem Normal file
View File

@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIIDyzCCArOgAwIBAgIUFH6sVGySAPHbX9TuNTCERV/sLdUwDQYJKoZIhvcNAQEL
BQAwdTELMAkGA1UEBhMCQ0ExCzAJBgNVBAgMAkJDMRIwEAYDVQQHDAlWYW5jb3V2
ZXIxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEiMCAGCSqGSIb3
DQEJARYTc3RlcGhhbmVAYXJpbGlhLmNvbTAeFw0yMTAyMjgwMDU4NDdaFw0zMTAy
MjYwMDU4NDdaMHUxCzAJBgNVBAYTAkNBMQswCQYDVQQIDAJCQzESMBAGA1UEBwwJ
VmFuY291dmVyMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxIjAg
BgkqhkiG9w0BCQEWE3N0ZXBoYW5lQGFyaWxpYS5jb20wggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQDSAT5BOKAR5o/Sfo1CVTQMK6bcXJ+Cgh0MT4pkLi9D
yIPyvuYZTBRxXgR2B6AFRpQODc1ZV3OGHeiTRrxaaebTHPQ3A7abrjY76qdl2Jb2
FJlCAkRQp9+MYfQFIjfWOxH3zSxY/TsEtHInI7/vldV1GraOFKVs+On8HB9/ubGz
eZigyxS+EI/VW014YrjHc9Gv0zr6IZHPAtoY4v5c9Kg4lpQr0QCOKGWPqrbh7LWK
zsmhvvuHJZiRFumxrnO2uiXU+s2A5jX5plVIKsfpY+VlNGLE/ltT0LwnOi39Pe03
Bmmir2a31ATj4AWuG4huKYSUgyZntZ4tl3/e7IqmavVbAgMBAAGjUzBRMB0GA1Ud
DgQWBBQkMn0xl4afjEcZsqnm9r98lWEpVjAfBgNVHSMEGDAWgBQkMn0xl4afjEcZ
sqnm9r98lWEpVjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBo
0sVma03orhnw2b4D6IURr8wmk2qn917KW0kkklzkf28n5Gkj2CmM2cfvI4N22nwu
Cun841oOYE84gD246f1rUO2ebjveUVYHJWKGcLIvHJt50FCxIXSwLDdSbeVjRshg
8Dr2M3om0h4q55TCdT9j/ucbPSwJ1Y3g037iUVVZXbtrPmUMUQj9aAP9KUg+IprJ
rDPcm+smqzOy8bnGpBXg6WlZfdbhaWAKLsJULHuIi4DvAafmOHkRogEyuYmTMJg9
ob9D5kvmS9GMra3AIoMzsibv4sMWLObXk9pzvhDy5a/KPaQmwWmfibcQhahQZOUM
mgYyVxzP9sAyLY7mfL54
-----END CERTIFICATE-----

View File

@@ -3,120 +3,265 @@
Base Library for Ucentral
"""
import json
import ssl
import sys
from urllib.parse import urlparse
import pytest
import allure
import requests
from ucentral_gw import swagger_client
from pathlib import Path
class ConfigureController:
def __init__(self):
self.configuration = swagger_client.Configuration()
def __init__(self, controller_data):
self.username = controller_data["username"]
self.password = controller_data["password"]
self.host = urlparse(controller_data["url"])
self.access_token = ""
self.login()
def set_credentials(self, controller_data=None):
if dict(controller_data).keys().__contains__("username") and dict(controller_data).keys().__contains__(
"password"):
self.configuration.username = "support@example.com"
self.configuration.password = "support"
print("Login Credentials set to default: \n user_id: %s\n password: %s\n" % ("support@example.com",
"support"))
return False
def build_uri(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
return new_uri
def login(self):
uri = self.build_uri("oauth2")
payload = json.dumps({"userId": self.username, "password": self.password})
resp = requests.post(uri, data=payload, verify="")
self.check_response("POST", resp, "", payload, uri)
token = resp.json()
self.access_token = token["access_token"]
def logout(self):
global access_token
uri = self.build_uri('oauth2/%s' % access_token)
resp = requests.delete(uri, headers=self.make_headers(), verify=False)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
print('Logged out:', resp.status_code)
def make_headers(self):
headers = {'Authorization': 'Bearer %s' % self.access_token}
return headers
def check_response(self, cmd, response, headers, data_str, url):
if response.status_code >= 400:
if response.status_code >= 400:
print("check-response: ERROR, url: ", url)
else:
self.configuration.username = controller_data["username"]
self.configuration.password = controller_data["password"]
print("Login Credentials set to custom: \n user_id: %s\n password: %s\n" % (controller_data['userId'],
controller_data['password']))
return True
print("check-response: url: ", url)
print("Command: ", cmd)
print("response-status: ", response.status_code)
print("response-headers: ", response.headers)
print("response-content: ", response.content)
print("headers: ", headers)
print("data-str: ", data_str)
def select_controller_data(self, controller_data=None):
if dict(controller_data).keys().__contains__("url") is None:
print("No controller_data Selected")
exit()
self.sdk_base_url = controller_data["url"]
self.configuration.host = self.sdk_base_url
print("controller_data Selected: %s\n SDK_BASE_URL: %s\n" % (controller_data["url"], self.sdk_base_url))
return True
def set_sdk_base_url(self, sdk_base_url=None):
if sdk_base_url is None:
print("URL is None")
exit()
self.configuration.host = sdk_base_url
if response.status_code >= 400:
# if True:
raise NameError("Invalid response code.")
return True
class Controller(ConfigureController):
def __init__(self, controller_data=None, customer_id=None):
super().__init__()
self.controller_data = controller_data
self.customer_id = customer_id
if customer_id is None:
self.customer_id = 2
print("Setting to default Customer ID 2")
#
# Setting the Controller Client Configuration
self.select_controller_data(controller_data=controller_data)
self.set_credentials(controller_data=controller_data)
# self.configuration.refresh_api_key_hook = self.get_bearer_token
#
# # Connecting to Controller
# self.api_client = swagger_client.ApiClient(self.configuration)
# self.login_client = swagger_client.LoginApi(api_client=self.api_client)
# self.bearer = False
# self.disconnect = False
# # Token expiry in seconds
# self.token_expiry = 1000
# self.token_timestamp = time.time()
# try:
#
# self.bearer = self.get_bearer_token()
# # t1 = threading.Thread(target=self.refresh_instance)
# # t1.start()
# self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token
# self.status_client = swagger_client.StatusApi(api_client=self.api_client)
# self.equipment_client = swagger_client.EquipmentApi(self.api_client)
# self.profile_client = swagger_client.ProfileApi(self.api_client)
# self.api_client.configuration.api_key_prefix = {
# "Authorization": "Bearer " + self.bearer._access_token
# }
# self.api_client.configuration.refresh_api_key_hook = self.refresh_instance
# except Exception as e:
# self.bearer = False
# print(e)
print("Connected to Controller Server")
def __init__(self, controller_data=None):
super().__init__(controller_data)
def get_devices(self):
uri = self.build_uri("devices/")
resp = requests.get(uri, headers=self.make_headers(), verify=False)
self.check_response("GET", resp, self.make_headers(), "", uri)
devices = resp.json()
return devices
def get_device_by_serial_number(self, serial_number=None):
uri = self.build_uri("devices/"+serial_number)
resp = requests.get(uri, headers=self.make_headers(), verify=False)
self.check_response("GET", resp, self.make_headers(), "", uri)
device = resp.json()
return device
class ProfileUtility:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = Controller(controller_data=controller_data)
self.sdk_client=sdk_client
self.base_profile_config = {
"uuid": 1,
"radios": [{},{}],
"interfaces": [{}, {}],
"metrics": {},
"services": {},
}
def set_radio_config(self, radio_config):
for i in radio_config:
self.base_profile_config["radios"][0] = {
"band": "2G",
"country": "US",
"channel-mode": "HE",
"channel-width": 20,
"channel": 11
}
pass
UCENTRAL_BASE_CFG = {
"uuid": 1,
"radios": [
{
"band": "2G",
"country": "US",
"channel-mode": "HE",
"channel-width": 20,
"channel": 11
},
{
"band": "5G",
"country": "US",
"channel-mode": "HE",
"channel-width": 80,
"channel": 36
}
], # Similar to RF Profile
"interfaces": [
{
"name": "WAN",
"role": "upstream",
"services": ["lldp"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
},
"ssids": [
{
"name": "OpenWifi",
"wifi-bands": [
"2G"
],
"bss-mode": "ap",
"encryption": {
"proto": "psk2",
"key": "OpenWifi",
"ieee80211w": "optional"
}
},
{
"name": "OpenWifi",
"wifi-bands": [
"5G"
],
"bss-mode": "ap",
"encryption": {
"proto": "psk2",
"key": "OpenWifi",
"ieee80211w": "optional"
}
}
]
}, # SSID Information is here
{
"name": "LAN",
"role": "downstream",
"services": ["ssh", "lldp"],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/16",
"dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
},
"ssids": [
{
"name": "OpenWifi",
"wifi-bands": [
"2G"
],
"bss-mode": "ap",
"encryption": {
"proto": "psk2",
"key": "OpenWifi",
"ieee80211w": "optional"
}
},
{
"name": "OpenWifi",
"wifi-bands": [
"5G"
],
"bss-mode": "ap",
"encryption": {
"proto": "psk2",
"key": "OpenWifi",
"ieee80211w": "optional"
}
}
]
} # LAN/WAN Information is here
],
"metrics": {
"statistics": {
"interval": 120,
"types": ["ssids", "lldp", "clients"]
},
"health": {
"interval": 120
}
},
"services": {
"lldp": {
"describe": "uCentral",
"location": "universe"
},
"ssh": {
"port": 22
}
}
}
controller = {
'url': "https://wlan-portal-svc-nola-01.cicd.lab.wlan.tip.build", # API base url for the controller
'username': 'support@example.com',
'password': 'support',
'version': "1.1.0-SNAPSHOT",
'commit_date': "2021-04-27"
'url': "https://tip-f34.candelatech.com:16001/api/v1/oauth2", # API base url for the controller
'username': "tip@ucentral.com",
'password': 'openwifi',
# 'version': "1.1.0-SNAPSHOT",
# 'commit_date': "2021-04-27"
}
Controller(controller_data=controller)
obj = Controller(controller_data=controller)
equipments = obj.get_equipment()
print(equipments)
for i in equipments:
for j in equipments[i]:
print(j)
# print(equipments)
#
#
#
@@ -133,135 +278,7 @@ Controller(controller_data=controller)
#
# print(sys.path)
# exit()
# UCENTRAL_BASE_CFG = {
# "uuid": 1,
# "radios": [
# {
# "band": "2G",
# "country": "US",
# "channel-mode": "HE",
# "channel-width": 20,
# "channel": 11
# },
# {
# "band": "5G",
# "country": "US",
# "channel-mode": "HE",
# "channel-width": 80,
# "channel": 36
# }
# ], # Similar to RF Profile
#
# "interfaces": [
# {
# "name": "WAN",
# "role": "upstream",
# "services": ["lldp"],
# "ethernet": [
# {
# "select-ports": [
# "WAN*"
# ]
# }
# ],
# "ipv4": {
# "addressing": "dynamic"
# },
# "ssids": [
# {
# "name": "OpenWifi",
# "wifi-bands": [
# "2G"
# ],
# "bss-mode": "ap",
# "encryption": {
# "proto": "psk2",
# "key": "OpenWifi",
# "ieee80211w": "optional"
# }
# },
# {
# "name": "OpenWifi",
# "wifi-bands": [
# "5G"
# ],
# "bss-mode": "ap",
# "encryption": {
# "proto": "psk2",
# "key": "OpenWifi",
# "ieee80211w": "optional"
# }
# }
# ]
# }, # SSID Information is here
# {
# "name": "LAN",
# "role": "downstream",
# "services": ["ssh", "lldp"],
# "ethernet": [
# {
# "select-ports": [
# "LAN*"
# ]
# }
# ],
# "ipv4": {
# "addressing": "static",
# "subnet": "192.168.1.1/16",
# "dhcp": {
# "lease-first": 10,
# "lease-count": 10000,
# "lease-time": "6h"
# }
# },
# "ssids": [
# {
# "name": "OpenWifi",
# "wifi-bands": [
# "2G"
# ],
# "bss-mode": "ap",
# "encryption": {
# "proto": "psk2",
# "key": "OpenWifi",
# "ieee80211w": "optional"
# }
# },
# {
# "name": "OpenWifi",
# "wifi-bands": [
# "5G"
# ],
# "bss-mode": "ap",
# "encryption": {
# "proto": "psk2",
# "key": "OpenWifi",
# "ieee80211w": "optional"
# }
# }
# ]
#
# } # LAN/WAN Information is here
# ],
# "metrics": {
# "statistics": {
# "interval": 120,
# "types": ["ssids", "lldp", "clients"]
# },
# "health": {
# "interval": 120
# }
# },
# "services": {
# "lldp": {
# "describe": "uCentral",
# "location": "universe"
# },
# "ssh": {
# "port": 22
# }
# }
# }
#
# profile_data = {
# "mode": "BRIDGE",

View File

@@ -89,3 +89,9 @@ class ChamberView:
cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, )
json_response = cli_base.json_get(_req_url=_req_url)
return json_response
def add_stations(self, num_stations=5):
self.CreateChamberview.setup(create_scenario=self.scenario_name,
raw_line=self.raw_line)
return True

View File

@@ -17,7 +17,7 @@ CONFIGURATION = {
'username': "lanforge",
'password': "pumpkin77",
'port': 8803, # 22,
'jumphost_tty': '/dev/ttyAP1',
'jumphost_tty': '/dev/ttyAP2',
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/ecw5410/trunk/ecw5410-1.1.0.tar.gz"
}
],

View File

@@ -135,6 +135,7 @@ class TestSetupBridgeEnterpriseSuiteA(object):
msg='Failed to create profile')
assert False
@pytest.mark.sanity_55
def test_setup_equipment_ap_profile(self, setup_profiles, update_report,
test_cases):
""" Equipment AP Profile Suite A Enterprise """
@@ -149,6 +150,7 @@ class TestSetupBridgeEnterpriseSuiteA(object):
msg='Failed to create profile')
assert False
@pytest.mark.sanity_55
def test_verify_vif_config(self, setup_profiles, update_report,
test_cases):
""" VIF Config Suite A Enterprise """
@@ -163,6 +165,7 @@ class TestSetupBridgeEnterpriseSuiteA(object):
msg='Failed to push profile')
assert False
@pytest.mark.sanity_55
@allure.severity(allure.severity_level.BLOCKER)
def test_verify_vif_state(self, setup_profiles, update_report,
test_cases):

View File

@@ -1,58 +1,68 @@
# """
# Performance : Wifi Capacity Test : NAT Mode
#
# """
#
# import pytest
# import allure
#
# pytestmark = [pytest.mark.wifi_capacity_test, pytest.mark.nat]
#
# setup_params_general = {
# "mode": "NAT",
# "ssid_modes": {
# "wpa2_personal": [
# {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
# {"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["is5GHzU", "is5GHz", "is5GHzL"],
# "security_key": "something"}]},
# "rf": {},
# "radius": False
# }
#
#
# @pytest.mark.basic
# @allure.feature("NAT MODE CLIENT CONNECTIVITY")
# @pytest.mark.parametrize(
# 'setup_profiles',
# [setup_params_general],
# indirect=True,
# scope="class"
# )
# @pytest.mark.usefixtures("setup_profiles")
# class TestWifiCapacityNATMode(object):
#
# @pytest.mark.wpa2_personal
# @pytest.mark.twog
# def test_client_wpa2_personal_2g(self, lf_test, station_names_twog, create_lanforge_chamberview_dut,
# get_configuration):
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"
# mode = "NAT"
# band = "twog"
# vlan = 1
# dut_name = create_lanforge_chamberview_dut
# PASS = lf_test.wifi_capacity(ssid=profile_data["ssid_name"], paswd=profile_data["security_key"],
# security="wpa2", mode="NAT", band="twog",
# instance_name="wct_instance", )
# assert PASS
#
"""
Performance : Wifi Capacity Test : NAT Mode
pytest -m "performance and wifi_capacity_test and nat"
"""
import pytest
import allure
pytestmark = [pytest.mark.wifi_capacity_test, pytest.mark.nat]
setup_params_general = {
"mode": "NAT",
"ssid_modes": {
"wpa2_personal": [
{"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["is2dot4GHz"], "security_key": "something"},
{"ssid_name": "ssid_wpa2_5g", "appliedRadios": ["is5GHzU", "is5GHz", "is5GHzL"],
"security_key": "something"}]},
"rf": {},
"radius": False
}
@allure.feature("NAT MODE CLIENT CONNECTIVITY")
@pytest.mark.parametrize(
'setup_profiles',
[setup_params_general],
indirect=True,
scope="class"
)
@pytest.mark.usefixtures("setup_profiles")
class TestWifiCapacityNATMode(object):
"""wifi_capacity_test
pytest -m "performance and wifi_capacity_test and nat"
"""
@pytest.mark.wpa2_personal
@pytest.mark.twog
def test_client_wpa2_personal_2g(self, lf_test, lf_tools,
station_names_twog, create_lanforge_chamberview_dut,
get_configuration):
"""wifi_capacity_test
pytest -m "performance and wifi_capacity_test and nat and wpa2_personal and twog"
"""
profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0]
ssid_name = profile_data["ssid_name"]
security_key = profile_data["security_key"]
security = "wpa2"
mode = "NAT"
band = "twog"
vlan = 1
dut_name = create_lanforge_chamberview_dut
PASS = lf_test.wifi_capacity(ssid=ssid_name, paswd=security_key,
security=security, mode=mode, band=band,
instance_name="wct_instance" )
assert PASS
# @pytest.mark.wpa2_personal
# @pytest.mark.fiveg
# def test_client_wpa2_personal_2g(self, lf_test, station_names_twog, create_lanforge_chamberview_dut,
# def test_client_wpa2_personal_5g(self, lf_test, lf_tools,
# station_names_twog, create_lanforge_chamberview_dut,
# get_configuration):
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][0]
# """wifi_capacity_test
# pytest -m "performance and wifi_capacity_test and nat and wpa2_personal and fiveg"
# """
# profile_data = setup_params_general["ssid_modes"]["wpa2_personal"][1]
# ssid_name = profile_data["ssid_name"]
# security_key = profile_data["security_key"]
# security = "wpa2"