Files
wlan-testing/libs/controller/controller.py
shivamcandela 26d55fdcc2 unwanted print changes
Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
2021-05-10 17:13:15 +05:30

875 lines
38 KiB
Python

# !/usr/local/lib64/python3.8
"""
Controller Library
1. controller_data/sdk_base_url
2. login credentials
"""
import json
import re
import ssl
import time
import urllib
import requests
import swagger_client
from bs4 import BeautifulSoup
from swagger_client import EquipmentGatewayApi
from swagger_client import FirmwareManagementApi
import allure
class ConfigureController:
def __init__(self):
self.configuration = swagger_client.Configuration()
def set_credentials(self, controller_data=None):
if dict(controller_data).keys().__contains__("username") and dict(controller_data).keys().__contains__(
"password"):
self.configuration.username = "support@example.com"
self.configuration.password = "support"
print("Login Credentials set to default: \n user_id: %s\n password: %s\n" % ("support@example.com",
"support"))
return False
else:
self.configuration.username = controller_data["username"]
self.configuration.password = controller_data["password"]
print("Login Credentials set to custom: \n user_id: %s\n password: %s\n" % (controller_data['userId'],
controller_data['password']))
return True
def select_controller_data(self, controller_data=None):
if dict(controller_data).keys().__contains__("url") is None:
print("No controller_data Selected")
exit()
self.sdk_base_url = controller_data["url"]
self.configuration.host = self.sdk_base_url
print("controller_data Selected: %s\n SDK_BASE_URL: %s\n" % (controller_data["url"], self.sdk_base_url))
return True
def set_sdk_base_url(self, sdk_base_url=None):
if sdk_base_url is None:
print("URL is None")
exit()
self.configuration.host = sdk_base_url
return True
"""
Library for cloud_controller_tests generic usages, it instantiate the bearer and credentials.
It provides the connectivity to the cloud.
Instantiate the Object by providing the controller_data=controller_url, customer_id=2
"""
class Controller(ConfigureController):
"""
constructor for cloud_controller_tests library : can be used from pytest framework
"""
def __init__(self, controller_data=None, customer_id=None):
super().__init__()
self.controller_data = controller_data
self.customer_id = customer_id
if customer_id is None:
self.customer_id = 2
print("Setting to default Customer ID 2")
# Setting the Controller Client Configuration
self.select_controller_data(controller_data=controller_data)
self.set_credentials(controller_data=controller_data)
# self.configuration.refresh_api_key_hook = self.get_bearer_token
# Connecting to Controller
self.api_client = swagger_client.ApiClient(self.configuration)
self.login_client = swagger_client.LoginApi(api_client=self.api_client)
self.bearer = self.get_bearer_token()
self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token
self.status_client = swagger_client.StatusApi(api_client=self.api_client)
self.equipment_client = swagger_client.EquipmentApi(self.api_client)
self.profile_client = swagger_client.ProfileApi(self.api_client)
self.api_client.configuration.api_key_prefix = {
"Authorization": "Bearer " + self.bearer._access_token
}
self.api_client.configuration.refresh_api_key_hook = self.get_bearer_token()
self.ping_response = self.portal_ping()
self.default_profiles = {}
# print(self.bearer)
if self.ping_response._application_name != 'PortalServer':
print("Server not Reachable")
exit()
print("Connected to Controller Server")
def get_bearer_token(self):
request_body = {
"userId": self.configuration.username,
"password": self.configuration.password
}
return self.login_client.get_access_token(request_body)
def refresh_instance(self):
# Connecting to Controller
self.api_client = swagger_client.ApiClient(self.configuration)
self.login_client = swagger_client.LoginApi(api_client=self.api_client)
self.bearer = self.get_bearer_token()
self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token
self.status_client = swagger_client.StatusApi(api_client=self.api_client)
self.equipment_client = swagger_client.EquipmentApi(self.api_client)
self.profile_client = swagger_client.ProfileApi(self.api_client)
self.api_client.configuration.api_key_prefix = {
"Authorization": "Bearer " + self.bearer._access_token
}
self.api_client.configuration.refresh_api_key_hook = self.get_bearer_token()
self.ping_response = self.portal_ping()
self.default_profiles = {}
# print(self.bearer)
if self.ping_response._application_name != 'PortalServer':
print("Server not Reachable")
exit()
print("Connected to Controller Server")
def portal_ping(self):
return self.login_client.portal_ping()
def disconnect_Controller(self):
self.api_client.__del__()
# Returns a List of All the Equipments that are available in the cloud instances
def get_equipment_by_customer_id(self, max_items=10):
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": """ + str(max_items) + """
}"""
equipment_data = self.equipment_client.get_equipment_by_customer_id(customer_id=self.customer_id,
pagination_context=pagination_context)
return equipment_data._items
# check if equipment with the given equipment_id is available in cloud instance or not
def validate_equipment_availability(self, equipment_id=None):
data = self.get_equipment_by_customer_id()
for i in data:
if i._id == equipment_id:
return i._id
return -1
# Need to be added in future
def request_ap_reboot(self):
pass
# Get the equipment id, of a equipment with a serial number
def get_equipment_id(self, serial_number=None):
equipment_data = self.get_equipment_by_customer_id(max_items=100)
# print(equipment_data)
for equipment in equipment_data:
print(equipment._id)
if equipment._serial == serial_number:
return equipment._id
# Get the equipment model name of a given equipment_id
def get_model_name(self, equipment_id=None):
if equipment_id is None:
return None
data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id)
print(str(data._details._equipment_model))
return str(data._details._equipment_model)
# Needs Bug fix from swagger code generation side
def get_ap_firmware_new_method(self, equipment_id=None):
response = self.status_client.get_status_by_customer_equipment(customer_id=self.customer_id,
equipment_id=equipment_id)
print(response[2])
# Old Method, will be depreciated in future
def get_ap_firmware_old_method(self, equipment_id=None):
url = self.configuration.host + "/portal/status/forEquipment?customerId=" + str(
self.customer_id) + "&equipmentId=" + str(equipment_id)
payload = {}
headers = self.configuration.api_key_prefix
response = requests.request("GET", url, headers=headers, data=payload)
if response.status_code == 200:
status_data = response.json()
# print(status_data)
try:
current_ap_fw = status_data[2]['details']['reportedSwVersion']
print(current_ap_fw)
return current_ap_fw
except Exception as e:
current_ap_fw = "error"
return False
else:
return False
"""
Profile Utilities
"""
def get_current_profile_on_equipment(self, equipment_id=None):
default_equipment_data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id, async_req=False)
return default_equipment_data._profile_id
# Get the ssid's that are used by the equipment
def get_ssids_on_equipment(self, equipment_id=None):
profile_id = self.get_current_profile_on_equipment(equipment_id=equipment_id)
all_profiles = self.profile_client.get_profile_with_children(profile_id=profile_id)
ssid_name_list = []
for i in all_profiles:
if i._profile_type == "ssid":
ssid_name_list.append(i._details['ssid'])
return all_profiles
# Get the child ssid profiles that are used by equipment ap profile of given profile id
def get_ssid_profiles_from_equipment_profile(self, profile_id=None):
equipment_ap_profile = self.profile_client.get_profile_by_id(profile_id=profile_id)
ssid_name_list = []
child_profile_ids = equipment_ap_profile.child_profile_ids
for i in child_profile_ids:
profile = self.profile_client.get_profile_by_id(profile_id=i)
if profile._profile_type == "ssid":
ssid_name_list.append(profile._details['ssid'])
return ssid_name_list
"""
Library for Profile Utility, Creating Profiles and Pushing and Deleting them
Steps to create a Profile on Controller:
create a RF Profile
create a Radius Profile
create ssid profiles, and add the radius profile in them, if needed (only used by eap ssid's)
create equipment_ap profile, and add the rf profile and ssid profiles
Now using push profile method, equipment_ap profile will be pushed to an AP of given equipment_id
"""
class ProfileUtility:
"""
constructor for Access Point Utility library : can be used from pytest framework
to control Access Points
"""
def __init__(self, sdk_client=None, controller_data=None, customer_id=None):
if sdk_client is None:
sdk_client = Controller(controller_data=controller_data, customer_id=customer_id)
self.sdk_client = sdk_client
self.profile_client = swagger_client.ProfileApi(api_client=self.sdk_client.api_client)
self.profile_creation_ids = {
"ssid": [],
"ap": [],
"radius": [],
"rf": []
}
self.default_profiles = {}
self.profile_ids = []
def cleanup_objects(self):
self.profile_creation_ids = {
"ssid": [],
"ap": [],
"radius": [],
"rf": []
}
self.default_profiles = {}
self.profile_ids = []
def get_profile_by_name(self, profile_name=None):
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": 1000
}"""
profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id,
pagination_context=pagination_context)
for i in profiles._items:
if i._name == profile_name:
return i
return None
def get_ssid_name_by_profile_id(self, profile_id=None):
profiles = self.profile_client.get_profile_by_id(profile_id=profile_id)
return profiles._details["ssid"]
"""
default templates are as follows :
profile_name= TipWlan-rf/
Radius-Profile/
TipWlan-2-Radios/
TipWlan-3-Radios/
TipWlan-Cloud-Wifi/
Captive-Portal
"""
def get_default_profiles(self):
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": 100
}"""
items = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id,
pagination_context=pagination_context)
for i in items._items:
# print(i._name, i._id)
if i._name == "TipWlan-Cloud-Wifi":
self.default_profiles['ssid'] = i
if i._name == "TipWlan-3-Radios":
self.default_profiles['equipment_ap_3_radios'] = i
if i._name == "TipWlan-2-Radios":
self.default_profiles['equipment_ap_2_radios'] = i
if i._name == "Captive-Portal":
self.default_profiles['captive_portal'] = i
if i._name == "Radius-Profile":
self.default_profiles['radius'] = i
if i._name == "TipWlan-rf":
self.default_profiles['rf'] = i
# This will delete the Profiles associated with an equipment of givwn equipment_id, and associate it to default
# equipment_ap profile
def delete_current_profile(self, equipment_id=None):
equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=equipment_id)
data = self.profile_client.get_profile_with_children(profile_id=equipment_data._profile_id)
delete_ids = []
for i in data:
if i._name == "TipWlan-rf":
continue
else:
delete_ids.append(i._id)
# print(delete_ids)
self.get_default_profiles()
self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_3_radios']._id
# print(self.profile_creation_ids)
self.push_profile_old_method(equipment_id=equipment_id)
self.delete_profile(profile_id=delete_ids)
# This will delete all the profiles on an controller instance, except the default profiles
def cleanup_profiles(self):
try:
self.get_default_profiles()
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": 10000
}"""
skip_delete_id = []
for i in self.default_profiles:
skip_delete_id.append(self.default_profiles[i]._id)
all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id,
pagination_context=pagination_context)
delete_ids = []
for i in all_profiles._items:
delete_ids.append(i._id)
skip_delete_id = []
for i in self.default_profiles:
skip_delete_id.append(self.default_profiles[i]._id)
delete_ids = list(set(delete_ids) - set(delete_ids).intersection(set(skip_delete_id)))
print(delete_ids)
for i in delete_ids:
self.set_equipment_to_profile(profile_id=i)
self.delete_profile(profile_id=delete_ids)
status = True
except Exception as e:
print(e)
status = False
return status
# Delete any profile with the given name
def delete_profile_by_name(self, profile_name=None):
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": 5000
}"""
all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id,
pagination_context=pagination_context)
for i in all_profiles._items:
if i._name == profile_name:
counts = self.profile_client.get_counts_of_equipment_that_use_profiles([i._id])[0]
if counts._value2:
self.set_equipment_to_profile(profile_id=i._id)
self.delete_profile(profile_id=[i._id])
else:
self.delete_profile(profile_id=[i._id])
# This method will set all the equipments to default equipment_ap profile, those having the profile_id passed in
# argument
def set_equipment_to_profile(self, profile_id=None):
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": 5000
}"""
equipment_data = self.sdk_client.equipment_client.get_equipment_by_customer_id(customer_id=2,
pagination_context=pagination_context)
self.get_default_profiles()
for i in equipment_data._items:
if i._profile_id == profile_id:
self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_2_radios']._id
self.push_profile_old_method(equipment_id=i._id)
time.sleep(2)
"""
method call: used to create the rf profile and push set the parameters accordingly and update
Library method to create a new rf profile: Now using default profile
"""
def set_rf_profile(self, profile_data=None, mode=None):
self.get_default_profiles()
if mode == "wifi5":
default_profile = self.default_profiles['rf']
default_profile._name = profile_data["name"]
default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"]
default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"]
default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"]
default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"]
profile = self.profile_client.create_profile(body=default_profile)
self.profile_creation_ids['rf'].append(profile._id)
return profile
if mode == "wifi6":
default_profile = self.default_profiles['rf']
default_profile._name = profile_data["name"]
default_profile._details["rfConfigMap"]["is2dot4GHz"]["activeScanSettings"]["enabled"] = False
default_profile._details["rfConfigMap"]["is2dot4GHz"]["radioMode"] = 'modeAX'
default_profile._details["rfConfigMap"]["is5GHz"]["radioMode"] = 'modeAX'
default_profile._details["rfConfigMap"]["is5GHzL"]["radioMode"] = 'modeAX'
default_profile._details["rfConfigMap"]["is5GHzU"]["radioMode"] = 'modeAX'
default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"]
default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"]
default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"]
default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"]
default_profile._name = profile_data["name"]
profile = self.profile_client.create_profile(body=default_profile)
self.profile_creation_ids['rf'].append(profile._id)
return profile
"""
method call: used to create a ssid profile with the given parameters
"""
def create_open_ssid_profile(self, two4g=True, fiveg=True, profile_data=None):
try:
if profile_data is None:
return False
default_profile = self.default_profiles['ssid']
default_profile._details['appliedRadios'] = []
if two4g is True:
default_profile._details['appliedRadios'].append("is2dot4GHz")
if fiveg is True:
default_profile._details['appliedRadios'].append("is5GHzU")
default_profile._details['appliedRadios'].append("is5GHz")
default_profile._details['appliedRadios'].append("is5GHzL")
default_profile._name = profile_data['profile_name']
default_profile._details['vlanId'] = profile_data['vlan']
default_profile._details['ssid'] = profile_data['ssid_name']
default_profile._details['forwardMode'] = profile_data['mode']
default_profile._details['secureMode'] = 'open'
profile = self.profile_client.create_profile(body=default_profile)
profile_id = profile._id
self.profile_creation_ids['ssid'].append(profile_id)
self.profile_ids.append(profile_id)
except Exception as e:
profile = "error"
return profile
def create_wpa_ssid_profile(self, two4g=True, fiveg=True, profile_data=None):
try:
if profile_data is None:
return False
default_profile = self.default_profiles['ssid']
default_profile._details['appliedRadios'] = []
if two4g is True:
default_profile._details['appliedRadios'].append("is2dot4GHz")
if fiveg is True:
default_profile._details['appliedRadios'].append("is5GHzU")
default_profile._details['appliedRadios'].append("is5GHz")
default_profile._details['appliedRadios'].append("is5GHzL")
default_profile._name = profile_data['profile_name']
default_profile._details['vlanId'] = profile_data['vlan']
default_profile._details['ssid'] = profile_data['ssid_name']
default_profile._details['keyStr'] = profile_data['security_key']
default_profile._details['forwardMode'] = profile_data['mode']
default_profile._details['secureMode'] = 'wpaPSK'
profile = self.profile_client.create_profile(body=default_profile)
profile_id = profile._id
self.profile_creation_ids['ssid'].append(profile_id)
self.profile_ids.append(profile_id)
except Exception as e:
profile = False
return profile
def create_wpa2_personal_ssid_profile(self, two4g=True, fiveg=True, profile_data=None):
try:
if profile_data is None:
return False
default_profile = self.default_profiles['ssid']
default_profile._details['appliedRadios'] = []
if two4g is True:
default_profile._details['appliedRadios'].append("is2dot4GHz")
if fiveg is True:
default_profile._details['appliedRadios'].append("is5GHzU")
default_profile._details['appliedRadios'].append("is5GHz")
default_profile._details['appliedRadios'].append("is5GHzL")
default_profile._name = profile_data['profile_name']
default_profile._details['vlanId'] = profile_data['vlan']
default_profile._details['ssid'] = profile_data['ssid_name']
default_profile._details['keyStr'] = profile_data['security_key']
default_profile._details['forwardMode'] = profile_data['mode']
default_profile._details['secureMode'] = 'wpa2OnlyPSK'
profile = self.profile_client.create_profile(body=default_profile)
profile_id = profile._id
self.profile_creation_ids['ssid'].append(profile_id)
self.profile_ids.append(profile_id)
except Exception as e:
profile = False
return profile
def create_wpa3_personal_ssid_profile(self, two4g=True, fiveg=True, profile_data=None):
if profile_data is None:
return False
default_profile = self.default_profiles['ssid']
default_profile._details['appliedRadios'] = []
if two4g is True:
default_profile._details['appliedRadios'].append("is2dot4GHz")
if fiveg is True:
default_profile._details['appliedRadios'].append("is5GHzU")
default_profile._details['appliedRadios'].append("is5GHz")
default_profile._details['appliedRadios'].append("is5GHzL")
default_profile._name = profile_data['profile_name']
default_profile._details['vlanId'] = profile_data['vlan']
default_profile._details['ssid'] = profile_data['ssid_name']
default_profile._details['keyStr'] = profile_data['security_key']
default_profile._details['forwardMode'] = profile_data['mode']
default_profile._details['secureMode'] = 'wpa3OnlyPSK'
profile_id = self.profile_client.create_profile(body=default_profile)._id
self.profile_creation_ids['ssid'].append(profile_id)
self.profile_ids.append(profile_id)
return True
def create_wpa2_enterprise_ssid_profile(self, two4g=True, fiveg=True, profile_data=None):
try:
if profile_data is None:
return False
default_profile = self.default_profiles['ssid']
default_profile._details['appliedRadios'] = []
if two4g is True:
default_profile._details['appliedRadios'].append("is2dot4GHz")
if fiveg is True:
default_profile._details['appliedRadios'].append("is5GHzU")
default_profile._details['appliedRadios'].append("is5GHz")
default_profile._details['appliedRadios'].append("is5GHzL")
default_profile._name = profile_data['profile_name']
default_profile._details['vlanId'] = profile_data['vlan']
default_profile._details['ssid'] = profile_data['ssid_name']
default_profile._details['forwardMode'] = profile_data['mode']
default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0]
default_profile._child_profile_ids = self.profile_creation_ids["radius"]
default_profile._details['secureMode'] = 'wpa2OnlyRadius'
profile = self.profile_client.create_profile(body=default_profile)
profile_id = profile._id
self.profile_creation_ids['ssid'].append(profile_id)
self.profile_ids.append(profile_id)
except Exception as e:
print(e)
profile = False
return profile
def create_wpa3_enterprise_ssid_profile(self, two4g=True, fiveg=True, profile_data=None):
if profile_data is None:
return False
default_profile = self.default_profiles['ssid']
default_profile._details['appliedRadios'] = []
if two4g is True:
default_profile._details['appliedRadios'].append("is2dot4GHz")
if fiveg is True:
default_profile._details['appliedRadios'].append("is5GHzU")
default_profile._details['appliedRadios'].append("is5GHz")
default_profile._details['appliedRadios'].append("is5GHzL")
default_profile._name = profile_data['profile_name']
default_profile._details['vlanId'] = profile_data['vlan']
default_profile._details['ssid'] = profile_data['ssid_name']
default_profile._details['keyStr'] = profile_data['security_key']
default_profile._details['forwardMode'] = profile_data['mode']
default_profile._details['secureMode'] = 'wpa3OnlyRadius'
default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0]
default_profile._child_profile_ids = self.profile_creation_ids["radius"]
profile_id = self.profile_client.create_profile(body=default_profile)._id
self.profile_creation_ids['ssid'].append(profile_id)
self.profile_ids.append(profile_id)
return True
"""
method call: used to create a ap profile that contains the given ssid profiles
"""
def set_ap_profile(self, profile_data=None):
if profile_data is None:
return False
default_profile = self.default_profiles['equipment_ap_2_radios']
default_profile._child_profile_ids = []
for i in self.profile_creation_ids:
if i != 'ap':
for j in self.profile_creation_ids[i]:
default_profile._child_profile_ids.append(j)
default_profile._name = profile_data['profile_name']
# print(default_profile)
default_profile = self.profile_client.create_profile(body=default_profile)
self.profile_creation_ids['ap'] = default_profile._id
self.profile_ids.append(default_profile._id)
return default_profile
"""
method call: used to create a radius profile with the settings given
"""
def create_radius_profile(self, radius_info=None):
default_profile = self.default_profiles['radius']
default_profile._name = radius_info['name']
default_profile._details['primaryRadiusAuthServer']['ipAddress'] = radius_info['ip']
default_profile._details['primaryRadiusAuthServer']['port'] = radius_info['port']
default_profile._details['primaryRadiusAuthServer']['secret'] = radius_info['secret']
default_profile = self.profile_client.create_profile(body=default_profile)
self.profile_creation_ids['radius'] = [default_profile._id]
self.profile_ids.append(default_profile._id)
return default_profile
"""
method to push the profile to the given equipment
"""
# Under a Bug, depreciated until resolved, should be used primarily
def push_profile(self, equipment_id=None):
pagination_context = """{
"model_type": "PaginationContext",
"maxItemsPerPage": 100
}"""
default_equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=11, async_req=False)
# default_equipment_data._details[] = self.profile_creation_ids['ap']
# print(default_equipment_data)
# print(self.sdk_client.equipment_client.update_equipment(body=default_equipment_data, async_req=True))
"""
method to verify if the expected ssid's are loaded in the ap vif config
"""
def update_ssid_name(self, profile_name=None, new_profile_name=None):
if profile_name is None:
print("profile name is None, Please specify the ssid profile name that you want to modify")
return False
if new_profile_name is None:
print("Please specify the new name for ssid profile that you want to make to")
return False
try:
profile = self.get_profile_by_name(profile_name=profile_name)
profile._details['ssid'] = new_profile_name
self.profile_client.update_profile(profile)
return True
except Exception as e:
return False
"""
method to delete a profile by its id
"""
def delete_profile(self, profile_id=None):
for i in profile_id:
self.profile_client.delete_profile(profile_id=i)
# Need to be depreciated by using push_profile method
def push_profile_old_method(self, equipment_id=None):
if equipment_id is None:
return 0
url = self.sdk_client.configuration.host + "/portal/equipment?equipmentId=" + str(equipment_id)
payload = {}
headers = self.sdk_client.configuration.api_key_prefix
response = requests.request("GET", url, headers=headers, data=payload)
equipment_info = response.json()
equipment_info['profileId'] = self.profile_creation_ids['ap']
url = self.sdk_client.configuration.host + "/portal/equipment"
headers = {
'Content-Type': 'application/json',
'Authorization': self.sdk_client.configuration.api_key_prefix['Authorization']
}
response = requests.request("PUT", url, headers=headers, data=json.dumps(equipment_info))
return response
"""
JfrogUtility base class used for FirmwareUtility for Artifactory Management
Used for getting the latest firmware image
credentials required are as follows:
username
password
jfrog-base-url
build [pending]
branch [dev, trunk]
"""
class JFrogUtility:
def __init__(self, credentials=None):
if credentials is None:
exit()
self.jfrog_url = credentials["jfrog-base-url"]
self.build = credentials["build"]
self.branch = credentials["branch"]
ssl._create_default_https_context = ssl._create_unverified_context
def get_build(self, model=None, version=None):
jfrog_url = self.jfrog_url + "/" + model + "/" + self.branch + "/"
''' FIND THE LATEST FILE NAME'''
print(jfrog_url)
req = urllib.request.Request(jfrog_url)
response = urllib.request.urlopen(req)
# print(response)
html = response.read()
soup = BeautifulSoup(html, features="html.parser")
if self.branch == "trunk":
self.build = model
last_link = soup.find_all('a', href=re.compile(self.build))
latest_fw = None
for i in last_link:
if str(i['href']).__contains__(version):
latest_fw = i['href']
latest_fw = latest_fw.replace('.tar.gz', '')
print("Using Firmware Image: ", latest_fw)
return latest_fw
"""
FirmwareUtility class
uses JfrogUtility base class
sdk_client [ controller_tests instance ]
controller_data [ sdk_base_url ] needed only if sdk_instance is not passed
customer_id [ 2 ] needed only if sdk_instance is not passed
"""
class FirmwareUtility(JFrogUtility):
def __init__(self,
sdk_client=None,
jfrog_credentials=None,
controller_data=None,
customer_id=None,
model=None,
version=None):
super().__init__(credentials=jfrog_credentials)
if sdk_client is None:
sdk_client = Controller(controller_data=controller_data, customer_id=customer_id)
self.sdk_client = sdk_client
self.firmware_client = FirmwareManagementApi(api_client=sdk_client.api_client)
self.jfrog_client = JFrogUtility(credentials=jfrog_credentials)
self.equipment_gateway_client = EquipmentGatewayApi(api_client=sdk_client.api_client)
self.model = model
self.fw_version = version
def get_fw_version(self):
# Get The equipment model
self.latest_fw = self.get_build(model=self.model, version=self.fw_version)
return self.latest_fw
def upload_fw_on_cloud(self, fw_version=None, force_upload=False):
print("Upload fw version :", fw_version)
# force_upload = True
# if fw_latest available and force upload is False -- Don't upload
# if fw_latest available and force upload is True -- Upload
# if fw_latest is not available -- Upload
fw_id = self.is_fw_available(fw_version=fw_version)
if fw_id and not force_upload:
print("Firmware Version Already Available, Skipping upload", "Force Upload :", force_upload)
# Don't Upload the fw
return fw_id
else:
if fw_id and force_upload:
print("Firmware Version Already Available, Deleting and Uploading Again")
self.firmware_client.delete_firmware_version(firmware_version_id=fw_id)
print("Force Upload :", force_upload, " Deleted current Image")
time.sleep(2)
# if force_upload is true and latest image available, then delete the image
firmware_data = {
"id": 0,
"equipmentType": "AP",
"modelId": fw_version.split("-")[0],
"versionName": fw_version + ".tar.gz",
"description": fw_version + " FW VERSION",
"filename": self.jfrog_url + "/" + self.model + "/" + self.branch + "/" + fw_version + ".tar.gz",
}
print(firmware_data["filename"])
firmware_id = self.firmware_client.create_firmware_version(body=firmware_data)
print("Force Upload :", force_upload, " Uploaded the Image")
return firmware_id._id
def upgrade_fw(self, equipment_id=None, force_upgrade=False, force_upload=False):
if equipment_id is None:
print("No Equipment Id Given")
exit()
if (force_upgrade is True) or (self.should_upgrade_ap_fw(equipment_id=equipment_id)):
model = self.sdk_client.get_model_name(equipment_id=equipment_id).lower()
latest_fw = self.get_fw_version()
firmware_id = self.upload_fw_on_cloud(fw_version=latest_fw, force_upload=force_upload)
time.sleep(5)
try:
obj = self.equipment_gateway_client.request_firmware_update(equipment_id=equipment_id,
firmware_version_id=firmware_id)
print("Request firmware upgrade Success! waiting for 300 sec")
time.sleep(300)
except Exception as e:
print(e)
obj = False
return obj
# Write the upgrade fw logic here
def should_upgrade_ap_fw(self, equipment_id=None):
current_fw = self.sdk_client.get_ap_firmware_old_method(equipment_id=equipment_id)
latest_fw = self.get_fw_version()
print(self.model, current_fw, latest_fw)
if current_fw == latest_fw:
return False
else:
return True
def is_fw_available(self, fw_version=None):
if fw_version is None:
exit()
try:
firmware_version = self.firmware_client.get_firmware_version_by_name(
firmware_version_name=fw_version + ".tar.gz")
firmware_version = firmware_version._id
print("Firmware ID: ", firmware_version)
except Exception as e:
print(e)
firmware_version = False
print("firmware not available: ", firmware_version)
return firmware_version
if __name__ == '__main__':
"""
Examples to Try Out
"""
controller = {
'url': "https://wlan-portal-svc-nola-ext-03.cicd.lab.wlan.tip.build", # API base url for the controller
'username': 'support@example.com',
'password': 'support',
'version': "1.1.0-SNAPSHOT",
'commit_date': "2021-04-27"
}
sdk_client = Controller(controller_data=controller)
# Use Library/ Method Here
sdk_client.disconnect_Controller()