diff --git a/libs/controller/controller.py b/libs/controller/controller.py deleted file mode 100644 index 0f5d20ed6..000000000 --- a/libs/controller/controller.py +++ /dev/null @@ -1,1419 +0,0 @@ -# !/usr/local/lib64/python3.8 -""" - Controller Library - 1. controller_data/sdk_base_url - 2. login credentials -""" -import base64 -import datetime -import json -import re -import ssl -import time -import urllib - -import requests -import swagger_client -from swagger_client import FirmwareManagementApi -from swagger_client import EquipmentGatewayApi -from bs4 import BeautifulSoup -import threading - - -class ConfigureController: - - def __init__(self): - self.configuration = swagger_client.Configuration() - - def set_credentials(self, controller_data=None): - if dict(controller_data).keys().__contains__("username") and dict(controller_data).keys().__contains__( - "password"): - self.configuration.username = controller_data["username"] - self.configuration.password = controller_data["password"] - print("Login Credentials set to custom: \n user_id: %s\n password: %s\n" % (controller_data["username"], - controller_data["password"])) - return True - else: - self.configuration.username = "support@example.com" - self.configuration.password = "support" - print("Login Credentials set to default: \n user_id: %s\n password: %s\n" % ("support@example.com", - "support")) - return False - - def select_controller_data(self, controller_data=None): - if dict(controller_data).keys().__contains__("url") is None: - print("No controller_data Selected") - exit() - self.sdk_base_url = controller_data["url"] - self.configuration.host = self.sdk_base_url - print("controller_data Selected: %s\n SDK_BASE_URL: %s\n" % (controller_data["url"], self.sdk_base_url)) - return True - - def set_sdk_base_url(self, sdk_base_url=None): - if sdk_base_url is None: - print("URL is None") - exit() - self.configuration.host = sdk_base_url - return True - - -""" - Library for cloud_controller_tests generic usages, it instantiate the bearer and credentials. - It provides the connectivity to the cloud. - Instantiate the Object by providing the controller_data=controller_url, customer_id=2 -""" - - -class Controller(ConfigureController): - """ - constructor for cloud_controller_tests library : can be used from pytest framework - """ - - def __init__(self, controller_data=None, customer_id=None): - super().__init__() - self.controller_data = controller_data - self.customer_id = customer_id - if customer_id is None: - self.customer_id = 2 - print("Setting to default Customer ID 2") - # - # Setting the Controller Client Configuration - self.select_controller_data(controller_data=controller_data) - self.set_credentials(controller_data=controller_data) - self.configuration.refresh_api_key_hook = self.get_bearer_token - - # Connecting to Controller - self.api_client = swagger_client.ApiClient(self.configuration) - self.login_client = swagger_client.LoginApi(api_client=self.api_client) - self.bearer = False - self.disconnect = False - # Token expiry in seconds - self.token_expiry = 1000 - self.token_timestamp = time.time() - try: - - self.bearer = self.get_bearer_token() - # t1 = threading.Thread(target=self.refresh_instance) - # t1.start() - self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token - self.status_client = swagger_client.StatusApi(api_client=self.api_client) - self.equipment_client = swagger_client.EquipmentApi(self.api_client) - self.profile_client = swagger_client.ProfileApi(self.api_client) - self.api_client.configuration.api_key_prefix = { - "Authorization": "Bearer " + self.bearer._access_token - } - self.api_client.configuration.refresh_api_key_hook = self.refresh_instance - self.ping_response = self.portal_ping() - print("Portal details :: \n", self.ping_response) - except Exception as e: - self.bearer = False - print(e) - - print("Connected to Controller Server") - - def get_bearer_token(self): - request_body = { - "userId": self.configuration.username, - "password": self.configuration.password - } - return self.login_client.get_access_token(request_body) - - def refresh_instance(self): - # Refresh token 10 seconds before it's expiry - if time.time() - self.token_timestamp > self.token_expiry - 10: - self.token_timestamp = time.time() - print("Refreshing the controller API token") - self.disconnect_Controller() - self.api_client = swagger_client.ApiClient(self.configuration) - self.login_client = swagger_client.LoginApi(api_client=self.api_client) - self.bearer = self.get_bearer_token() - self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token - self.status_client = swagger_client.StatusApi(api_client=self.api_client) - self.equipment_client = swagger_client.EquipmentApi(self.api_client) - self.profile_client = swagger_client.ProfileApi(self.api_client) - self.api_client.configuration.api_key_prefix = { - "Authorization": "Bearer " + self.bearer._access_token - } - self.api_client.configuration.refresh_api_key_hook = self.refresh_instance - self.ping_response = self.portal_ping() - print("Portal details :: \n", self.ping_response) - if self.ping_response._application_name != 'PortalServer': - print("Server not Reachable") - exit() - print("Connected to Controller Server") - - def portal_ping(self): - self.refresh_instance() - return self.login_client.portal_ping() - - def disconnect_Controller(self): - self.refresh_instance() - self.disconnect = True - self.api_client.__del__() - - # Returns a List of All the Equipments that are available in the cloud instances - def get_equipment_by_customer_id(self, max_items=10): - self.refresh_instance() - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": """ + str(max_items) + """ - }""" - self.refresh_instance() - equipment_data = self.equipment_client.get_equipment_by_customer_id(customer_id=self.customer_id, - pagination_context=pagination_context) - return equipment_data._items - - # check if equipment with the given equipment_id is available in cloud instance or not - def validate_equipment_availability(self, equipment_id=None): - self.refresh_instance() - data = self.get_equipment_by_customer_id() - for i in data: - if i._id == equipment_id: - return i._id - return -1 - - # Need to be added in future - def request_ap_reboot(self): - self.refresh_instance() - pass - - # Get the equipment id, of a equipment with a serial number - def get_equipment_id(self, serial_number=None): - self.refresh_instance() - equipment_data = self.get_equipment_by_customer_id(max_items=100) - # print(len(equipment_data)) - for equipment in equipment_data: - if equipment._serial == serial_number: - return equipment._id - - # Get the equipment model name of a given equipment_id - def get_model_name(self, equipment_id=None): - self.refresh_instance() - if equipment_id is None: - return None - self.refresh_instance() - data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id) - print(str(data._details._equipment_model)) - return str(data._details._equipment_model) - - # Needs Bug fix from swagger code generation side - def get_ap_firmware_new_method(self, equipment_id=None): - self.refresh_instance() - response = self.status_client.get_status_by_customer_equipment(customer_id=self.customer_id, - equipment_id=equipment_id) - print(response[2]) - - # Old Method, will be depreciated in future - def get_ap_firmware_old_method(self, equipment_id=None): - self.refresh_instance() - url = self.configuration.host + "/portal/status/forEquipment?customerId=" + str( - self.customer_id) + "&equipmentId=" + str(equipment_id) - payload = {} - headers = self.configuration.api_key_prefix - response = requests.request("GET", url, headers=headers, data=payload) - - if response.status_code == 200: - status_data = response.json() - # print(status_data) - try: - current_ap_fw = status_data[2]['details']['reportedSwVersion'] - # print(current_ap_fw) - return current_ap_fw - except Exception as e: - print(e) - current_ap_fw = "error" - return e - - else: - return False - - """ - Profile Utilities - """ - - def get_current_profile_on_equipment(self, equipment_id=None): - self.refresh_instance() - default_equipment_data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id, async_req=False) - return default_equipment_data._profile_id - - # Get the ssid's that are used by the equipment - def get_ssids_on_equipment(self, equipment_id=None): - self.refresh_instance() - profile_id = self.get_current_profile_on_equipment(equipment_id=equipment_id) - all_profiles = self.profile_client.get_profile_with_children(profile_id=profile_id) - ssid_name_list = [] - for i in all_profiles: - if i._profile_type == "ssid": - ssid_name_list.append(i._details['ssid']) - return all_profiles - - # Get the child ssid profiles that are used by equipment ap profile of given profile id - def get_ssid_profiles_from_equipment_profile(self, profile_id=None): - self.refresh_instance() - equipment_ap_profile = self.profile_client.get_profile_by_id(profile_id=profile_id) - ssid_name_list = [] - child_profile_ids = equipment_ap_profile.child_profile_ids - for i in child_profile_ids: - profile = self.profile_client.get_profile_by_id(profile_id=i) - if profile._profile_type == "ssid": - ssid_name_list.append(profile._details['ssid']) - return ssid_name_list - - -""" - Library for Profile Utility, Creating Profiles and Pushing and Deleting them - Steps to create a Profile on Controller: - create a RF Profile - create a Radius Profile - create ssid profiles, and add the radius profile in them, if needed (only used by eap ssid's) - - create equipment_ap profile, and add the rf profile and ssid profiles - Now using push profile method, equipment_ap profile will be pushed to an AP of given equipment_id - -""" - - -class ProfileUtility: - """ - constructor for Access Point Utility library : can be used from pytest framework - to control Access Points - """ - - def __init__(self, sdk_client=None, controller_data=None, customer_id=None): - if sdk_client is None: - sdk_client = Controller(controller_data=controller_data, customer_id=customer_id) - self.sdk_client = sdk_client - self.sdk_client.refresh_instance() - self.profile_client = swagger_client.ProfileApi(api_client=self.sdk_client.api_client) - self.profile_creation_ids = { - "ssid": [], - "ap": [], - "radius": [], - "rf": [], - "passpoint_osu_id_provider": [], - "passpoint_operator": [], - "passpoint_venue": [], - "passpoint": [] - } - self.profile_name_with_id = {} - self.default_profiles = {} - self.profile_ids = [] - - def cleanup_objects(self): - self.sdk_client.refresh_instance() - self.profile_creation_ids = { - "ssid": [], - "ap": [], - "radius": [], - "rf": [], - "passpoint_osu_id_provider": [], - "passpoint_operator": [], - "passpoint_venue": [], - "passpoint": [] - } - self.profile_name_with_id = {} - self.default_profiles = {} - self.profile_ids = [] - - def get_profile_by_name(self, profile_name=None): - self.sdk_client.refresh_instance() - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": 1000 - }""" - profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, - pagination_context=pagination_context) - - for i in profiles._items: - if i._name == profile_name: - return i - return None - - def get_ssid_name_by_profile_id(self, profile_id=None): - self.sdk_client.refresh_instance() - profiles = self.profile_client.get_profile_by_id(profile_id=profile_id) - return profiles._details["ssid"] - - """ - default templates are as follows : - profile_name= TipWlan-rf/ - Radius-Profile/ - TipWlan-2-Radios/ - TipWlan-3-Radios/ - TipWlan-Cloud-Wifi/ - Captive-Portal - """ - - def get_default_profiles(self): - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": 100 - }""" - self.sdk_client.refresh_instance() - items = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, - pagination_context=pagination_context) - - for i in items._items: - # print(i._name, i._id) - if i._name == "TipWlan-Cloud-Wifi": - self.default_profiles['ssid'] = i - if i._name == "TipWlan-3-Radios": - self.default_profiles['equipment_ap_3_radios'] = i - if i._name == "TipWlan-2-Radios": - self.default_profiles['equipment_ap_2_radios'] = i - if i._name == "Captive-Portal": - self.default_profiles['captive_portal'] = i - if i._name == "Radius-Profile": - self.default_profiles['radius'] = i - if i._name == "TipWlan-rf": - self.default_profiles['rf'] = i - # print(i) - - # This will delete the Profiles associated with an equipment of givwn equipment_id, and associate it to default - # equipment_ap profile - def delete_current_profile(self, equipment_id=None): - self.sdk_client.refresh_instance() - equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=equipment_id) - - data = self.profile_client.get_profile_with_children(profile_id=equipment_data._profile_id) - delete_ids = [] - for i in data: - if i._name == "TipWlan-rf": - continue - else: - delete_ids.append(i._id) - # print(delete_ids) - self.get_default_profiles() - self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_3_radios']._id - # print(self.profile_creation_ids) - self.push_profile_old_method(equipment_id=equipment_id) - self.delete_profile(profile_id=delete_ids) - - # This will delete all the profiles on an controller instance, except the default profiles - def cleanup_profiles(self): - self.sdk_client.refresh_instance() - try: - self.get_default_profiles() - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": 10000 - }""" - skip_delete_id = [] - for i in self.default_profiles: - skip_delete_id.append(self.default_profiles[i]._id) - - all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, - pagination_context=pagination_context) - - delete_ids = [] - for i in all_profiles._items: - delete_ids.append(i._id) - skip_delete_id = [] - for i in self.default_profiles: - skip_delete_id.append(self.default_profiles[i]._id) - delete_ids = list(set(delete_ids) - set(delete_ids).intersection(set(skip_delete_id))) - print(delete_ids) - for i in delete_ids: - self.set_equipment_to_profile(profile_id=i) - self.delete_profile(profile_id=delete_ids) - status = True - except Exception as e: - print(e) - status = False - return status - - # Delete any profile with the given name - def delete_profile_by_name(self, profile_name=None): - self.sdk_client.refresh_instance() - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": 5000 - }""" - all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, - pagination_context=pagination_context) - for i in all_profiles._items: - if i._name == profile_name: - counts = self.profile_client.get_counts_of_equipment_that_use_profiles([i._id])[0] - if counts._value2: - self.set_equipment_to_profile(profile_id=i._id) - self.delete_profile(profile_id=[i._id]) - else: - self.delete_profile(profile_id=[i._id]) - - # This method will set all the equipments to default equipment_ap profile, those having the profile_id passed in - # argument - def set_equipment_to_profile(self, profile_id=None): - self.sdk_client.refresh_instance() - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": 5000 - }""" - equipment_data = self.sdk_client.equipment_client. \ - get_equipment_by_customer_id(customer_id=2, - pagination_context=pagination_context) - self.get_default_profiles() - for i in equipment_data._items: - if i._profile_id == profile_id: - self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_2_radios']._id - self.push_profile_old_method(equipment_id=i._id) - time.sleep(2) - - """ - method call: used to create the rf profile and push set the parameters accordingly and update - Library method to create a new rf profile: Now using default profile - """ - - def set_rf_profile(self, profile_data=None, mode=None): - self.sdk_client.refresh_instance() - self.get_default_profiles() - if mode == "wifi5": - default_profile = self.default_profiles['rf'] - default_profile._name = profile_data["name"] - - default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"] - default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"] - default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"] - default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"] - for i in default_profile._details["rfConfigMap"]: - for j in profile_data: - if i == j: - for k in default_profile._details["rfConfigMap"][i]: - for l in profile_data[j]: - if l == k: - default_profile._details["rfConfigMap"][i][l] = profile_data[j][l] - profile = self.profile_client.create_profile(body=default_profile) - self.profile_creation_ids['rf'].append(profile._id) - return profile - if mode == "wifi6": - default_profile = self.default_profiles['rf'] - default_profile._name = profile_data["name"] - default_profile._details["rfConfigMap"]["is2dot4GHz"]["activeScanSettings"]["enabled"] = False - default_profile._details["rfConfigMap"]["is2dot4GHz"]["radioMode"] = 'modeAX' - default_profile._details["rfConfigMap"]["is5GHz"]["radioMode"] = 'modeAX' - default_profile._details["rfConfigMap"]["is5GHzL"]["radioMode"] = 'modeAX' - default_profile._details["rfConfigMap"]["is5GHzU"]["radioMode"] = 'modeAX' - default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"] - default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"] - default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"] - default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"] - default_profile._name = profile_data["name"] - for i in default_profile._details["rfConfigMap"]: - for j in profile_data: - if i == j: - for k in default_profile._details["rfConfigMap"][i]: - for l in profile_data[j]: - if l == k: - default_profile._details["rfConfigMap"][i][l] = profile_data[j][l] - profile = self.profile_client.create_profile(body=default_profile) - self.profile_creation_ids['rf'].append(profile._id) - return profile - - """ - method call: used to create a ssid profile with the given parameters - """ - - # Open - def create_open_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'open' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - self.profile_name_with_id[profile_data["ssid_name"]] = profile_id - except Exception as e: - print(e) - profile = "error" - - return profile - - # wpa personal - def create_wpa_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - self.get_default_profiles() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['keyStr'] = profile_data['security_key'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'wpaPSK' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa2 personal - def create_wpa2_personal_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['keyStr'] = profile_data['security_key'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'wpa2OnlyPSK' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa3 personal - def create_wpa3_personal_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['keyStr'] = profile_data['security_key'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'wpa3OnlySAE' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa3 personal mixed mode - def create_wpa3_personal_mixed_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['keyStr'] = profile_data['security_key'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'wpa3MixedSAE' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa wpa2 personal mixed mode - def create_wpa_wpa2_personal_mixed_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['keyStr'] = profile_data['security_key'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'wpa2PSK' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa enterprise done - def create_wpa_enterprise_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - default_profile._child_profile_ids = self.profile_creation_ids["radius"] - default_profile._details['secureMode'] = 'wpaRadius' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa wpa2 enterprise mixed mode done - def create_wpa_wpa2_enterprise_mixed_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - default_profile._child_profile_ids = self.profile_creation_ids["radius"] - default_profile._details['secureMode'] = 'wpa2Radius' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa2 enterprise mode ssid profile - def create_wpa2_enterprise_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - default_profile._child_profile_ids = self.profile_creation_ids["radius"] - default_profile._details['secureMode'] = 'wpa2OnlyRadius' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa3 enterprise mode - def create_wpa3_enterprise_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - default_profile._child_profile_ids = self.profile_creation_ids["radius"] - default_profile._details['secureMode'] = 'wpa3OnlyEAP' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa3 enterprise mixed mode done - def create_wpa3_enterprise_mixed_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - default_profile._child_profile_ids = self.profile_creation_ids["radius"] - default_profile._details['secureMode'] = 'wpa3MixedEAP' - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # wpa3 enterprise mixed mode done - def create_wep_ssid_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - try: - if profile_data is None: - return False - default_profile = self.default_profiles['ssid'] - default_profile._details['appliedRadios'] = profile_data["appliedRadios"] - default_profile._name = profile_data['profile_name'] - default_profile._details['vlanId'] = profile_data['vlan'] - default_profile._details['ssid'] = profile_data['ssid_name'] - default_profile._details['forwardMode'] = profile_data['mode'] - default_profile._details['secureMode'] = 'wep' - default_profile._details['wepConfig'] = {} - default_profile._details['wepConfig']["model_type"] = "WepConfiguration" - default_profile._details['wepConfig']["wepAuthType"] = "open" - default_profile._details['wepConfig']["primaryTxKeyId"] = profile_data["default_key_id"] - default_profile._details['wepConfig']["wepKeys"] = [{'model_type': 'WepKey', - 'txKey': profile_data["wep_key"], - 'txKeyConverted': None, - 'txKeyType': 'wep64'}, - {'model_type': 'WepKey', - 'txKey': profile_data["wep_key"], - 'txKeyConverted': None, - 'txKeyType': 'wep64'}, - {'model_type': 'WepKey', - 'txKey': profile_data["wep_key"], - 'txKeyConverted': None, - 'txKeyType': 'wep64'}, - {'model_type': 'WepKey', - 'txKey': profile_data["wep_key"], - 'txKeyConverted': None, - 'txKeyType': 'wep64'}] - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids['ssid'].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - def __get_boolean(self, flag): - return 'true' if flag in ["Enabled", "True"] else 'false' - - # wpa eap general method - def __create_wpa_eap_passpoint_ssid_profiles(self, profile_data=None, secure_mode=None): - try: - if profile_data is None or secure_mode is None: - return False - default_profile = self.default_profiles["ssid"] - default_profile._details["appliedRadios"] = profile_data["appliedRadios"] - default_profile._name = profile_data["profile_name"] - default_profile._details["vlanId"] = profile_data["vlan"] - default_profile._details["ssid"] = profile_data["ssid_name"] - default_profile._details["forwardMode"] = profile_data["mode"] - default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - default_profile._child_profile_ids = self.profile_creation_ids["radius"] - default_profile._details["secureMode"] = secure_mode - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids["ssid"].append(profile_id) - self.profile_ids.append(profile_id) - self.profile_name_with_id[profile_data["ssid_name"]] = profile_id - except Exception as e: - print(e) - profile = False - return profile - - # wpa eap passpoint - def create_wpa_eap_passpoint_ssid_profile(self, profile_data=None): - if profile_data is None: - return False - return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpaEAP") - - # wpa2 eap passpoint - def create_wpa2_eap_passpoint_ssid_profile(self, profile_data=None): - if profile_data is None: - return False - return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpa2EAP") - - # wpa2only eap passpoint - def create_wpa2_only_eap_passpoint_ssid_profile(self, profile_data=None): - if profile_data is None: - return False - return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpa2OnlyEAP") - - # passpoint osu id provider profile - def create_passpoint_osu_id_provider_profile(self, profile_data=None): - try: - if profile_data is None: - return False - default_profile = dict() - default_profile["model_type"] = "Profile" - default_profile["customerId"] = self.sdk_client.customer_id - default_profile["profileType"] = "passpoint_osu_id_provider" - default_profile["name"] = profile_data["profile_name"] - details = dict() - details["model_type"] = "PasspointOsuProviderProfile" - mcc_mnc = dict() - if (profile_data["mcc"] and profile_data["mnc"]) is not None: - mcc_mnc = {"mcc": profile_data["mcc"], "mnc": profile_data["mnc"]} - if profile_data["network"] is not None: - mcc_mnc["network"] = profile_data["network"] - if mcc_mnc: - details["mccMncList"] = [mcc_mnc] - if (profile_data["mcc"] and profile_data["mnc"]) is not None: - details["mccMncList"] = [{"mcc": profile_data["mcc"], "mnc": profile_data["mnc"]}] - if profile_data["osu_nai_standalone"] is not None: - details["osuNaiStandalone"] = profile_data["osu_nai_standalone"] - if profile_data["osu_nai_shared"] is not None: - details["osuNaiShared"] = profile_data["osu_nai_shared"] - if profile_data["nai_realms"] is not None: - details["naiRealmList"] = [{"naiRealms": [profile_data["nai_realms"]["domain"]], - "encoding": profile_data["nai_realms"]["encoding"], - "eapMap": profile_data["nai_realms"]["eap_map"] - }] - details["roamingOi"] = profile_data["roaming_oi"] - default_profile['details'] = details - default_profile['childProfileIds'] = [] - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids["passpoint_osu_id_provider"].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # passpoint operator profile - def create_passpoint_operator_profile(self, profile_data=None): - try: - if profile_data is None: - return False - default_profile = dict() - default_profile["model_type"] = "Profile" - default_profile["customerId"] = self.sdk_client.customer_id - default_profile["profileType"] = "passpoint_operator" - default_profile["name"] = profile_data["profile_name"] - - default_profile["details"] = dict() - default_profile["details"]["model_type"] = "PasspointOperatorProfile" - default_profile["details"]["serverOnlyAuthenticatedL2EncryptionNetwork"] = \ - self.__get_boolean(profile_data["osen"]) - operator_names = [] - operators = profile_data["operator_names"] - for operator in profile_data["operator_names"]: - operator_temp = dict() - for key in operator.keys(): - if key == "name": - operator_temp["dupleName"] = operator["name"] - else: - operator_temp[key] = operator[key] - operator_names.append(operator_temp) - default_profile["details"]["operatorFriendlyName"] = operator_names - default_profile["details"]["domainNameList"] = profile_data["domain_name_list"] - default_profile["childProfileIds"] = [] - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids["passpoint_operator"].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - profile = False - return profile - - # passpoint venue profile - def create_passpoint_venue_profile(self, profile_data=None): - try: - if profile_data is None: - return False - default_profile = dict() - default_profile["model_type"] = "Profile" - default_profile["customerId"] = self.sdk_client.customer_id - default_profile["profileType"] = "passpoint_venue" - default_profile["name"] = profile_data["profile_name"] - default_profile["details"] = dict() - default_profile["details"]["model_type"] = "PasspointVenueProfile" - venue_names = [] - for venue in profile_data["venue_names"]: - venue_temp = dict() - for key in venue.keys(): - if key == "name": - venue_temp["dupleName"] = venue["name"] - if key == "url": - venue_temp["venueUrl"] = venue["url"] - venue_names.append(venue_temp) - default_profile["details"]["venueNameSet"] = venue_names - allowed_venue_groups = {"Unspecified": 0, "Assembly": 1, "Business": 2, "Educational": 3, - "Factory and Industrial": 4, "Institutional": 5, "Mercantile": 6, "Residential": 7} - allowed_venue_types = {"Unspecified Assembly": 0, "Areana": 1, "Stadium": 2, "Passenger Terminal": 3, - "Amphitheatre": 4, "Amusement Park": 5, "Place of Worship": 6, - "Convention Center": 7, - "Library": 8, "Museum": 9, "Restaurant": 10, "Theatre": 11, "Bar": 12, - "Coffee Shop": 13, - "Zoo or Aquarium": 14, "Emergency Coordination Center": 15, - "Unspecified Business": 0, "Doctor or Dentist office": 1, "Bank": 2, - "Fire Station": 3, - "Police Station": 4, "Post Office": 5, "Professional Office": 6, - "Research and Development Facility": 7, "Attorney Office": 8, - "Unspecified Educational": 0, "School, Primary": 1, "School, Secondary": 2, - "University or College": 3, "Unspecified Factory and Industrial": 0, "Factory": 1, - "Unspecified Institutional": 0, "Hospital": 1, "Long-Term Care Facility": 2, - "Alcohol and Drug Re-habilitation Center": 3, "Group Home": 4, "Prison or Jail": 5, - "Unspecified Mercantile": 0, "Retail Store": 1, "Grocery Market": 2, - "Automotive Service Station": 3, "Shopping Mall": 4, "Gas Station": 5, - "Unspecified Residential": 0, "Pivate Residence": 1, "Hotel or Model": 2, - "Dormitory": 3, "Boarding House": 4} - default_profile["details"]["venueTypeAssignment"] = {"venueGroupId": - allowed_venue_groups[ - profile_data["venue_type"]["group"]], - "venueTypeId": - allowed_venue_types[ - profile_data["venue_type"]["type"]]} - default_profile["childProfileIds"] = [] - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids["passpoint_venue"].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - # passpoint profile - def create_passpoint_profile(self, profile_data=None): - try: - if profile_data is None: - return False - default_profile = dict() - default_profile["model_type"] = "Profile" - default_profile["customerId"] = self.sdk_client.customer_id - default_profile["profileType"] = "passpoint" - default_profile["name"] = profile_data["profile_name"] - - default_profile["details"] = dict() - default_profile["details"]["model_type"] = "PasspointProfile" - default_profile["details"]["enableInterworkingAndHs20"] = self.__get_boolean( - profile_data["interworking_hs2dot0"]) - if profile_data["hessid"] is not None: - default_profile["details"]["hessid"] = dict() - default_profile["details"]["hessid"]["addressAsString"] = profile_data["hessid"] - default_profile["details"]["passpointAccessNetworkType"] = \ - profile_data["access_network"]["Access Network Type"].replace(' ', '_').lower() - default_profile["details"]["passpointNetworkAuthenticationType"] = \ - profile_data["access_network"]["Authentication Type"].replace('&', 'and').replace(' ', '_').lower() - default_profile["details"]["emergencyServicesReachable"] = self.__get_boolean( - profile_data["access_network"][ - "Emergency Services Reachable"]) - default_profile["details"]["unauthenticatedEmergencyServiceAccessible"] = self.__get_boolean( - profile_data["access_network"][ - "Unauthenticated Emergency Service"]) - default_profile["details"]["internetConnectivity"] = self.__get_boolean(profile_data["ip_connectivity"][ - "Internet Connectivity"]) - capability_set = [] - for cap in profile_data["ip_connectivity"]["Connection Capability"]: - capability_info = dict() - capability_info["connectionCapabilitiesPortNumber"] = cap["port"] - capability_info["connectionCapabilitiesIpProtocol"] = cap["protocol"] - capability_info["connectionCapabilitiesStatus"] = cap["status"] - capability_set.append(capability_info) - default_profile["details"]["connectionCapabilitySet"] = capability_set - default_profile["details"]["ipAddressTypeAvailability"] = profile_data["ip_connectivity"]["IP Address Type"] - allowed_gas_address_behavior = {"P2P Spec Workaround From Request": "p2pSpecWorkaroundFromRequest", - "forceNonCompliantBehaviourFromRequest": "forceNonCompliantBehaviourFromRequest", - "IEEE 80211 Standard Compliant Only": "ieee80211StandardCompliantOnly"} - default_profile["details"]["gasAddr3Behaviour"] = allowed_gas_address_behavior[ - profile_data["ip_connectivity"] - ["GAS Address 3 Behaviour"]] - default_profile["details"]["anqpDomainId"] = profile_data["ip_connectivity"]["ANQP Domain ID"] - default_profile["details"]["disableDownstreamGroupAddressedForwarding"] = self.__get_boolean( - profile_data["ip_connectivity"][ - "Disable DGAF"]) - default_profile["details"]["associatedAccessSsidProfileIds"] = profile_data["allowed_ssids"] - default_profile["details"]["passpointOperatorProfileId"] = self.profile_creation_ids["passpoint_operator"][0] - default_profile["details"]["passpointVenueProfileId"] = self.profile_creation_ids["passpoint_venue"][0] - default_profile["details"]["passpointOsuProviderProfileIds"] = self.profile_creation_ids[ - "passpoint_osu_id_provider"] - default_profile["details"]["accessNetworkType"] = \ - profile_data["access_network"]["Access Network Type"].replace(' ', '_').lower() - # osuSsidProfileId is needed for R2 - default_profile["details"]["networkAuthenticationType"] = \ - profile_data["access_network"]["Authentication Type"].replace('&', 'and').replace(' ', '_').lower() - default_profile["childProfileIds"] = self.profile_creation_ids["passpoint_venue"] + \ - self.profile_creation_ids["passpoint_operator"] + \ - self.profile_creation_ids["passpoint_osu_id_provider"] - profile = self.profile_client.create_profile(body=default_profile) - profile_id = profile._id - self.profile_creation_ids["passpoint"].append(profile_id) - self.profile_ids.append(profile_id) - except Exception as e: - print(e) - profile = False - return profile - - """ - method call: used to create a ap profile that contains the given ssid profiles - """ - - def set_ap_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - if profile_data is None: - return False - default_profile = self.default_profiles['equipment_ap_2_radios'] - default_profile._child_profile_ids = [] - for i in self.profile_creation_ids: - if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", - "radius"]: - for j in self.profile_creation_ids[i]: - default_profile._child_profile_ids.append(j) - - default_profile._name = profile_data['profile_name'] - # print(default_profile) - default_profile = self.profile_client.create_profile(body=default_profile) - self.profile_creation_ids['ap'] = default_profile._id - self.profile_ids.append(default_profile._id) - return default_profile - - """ - method call: used to create a ap profile that contains the given ssid profiles - """ - - def set_ap_profile_custom(self, profile_data=None): - self.sdk_client.refresh_instance() - if profile_data is None: - return False - default_profile = self.default_profiles['equipment_ap_2_radios'] - default_profile._child_profile_ids = [] - for i in self.profile_creation_ids: - if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", - "radius", "ssid"]: - for j in self.profile_creation_ids[i]: - default_profile._child_profile_ids.append(j) - for ssid in profile_data["ssid_names"]: - default_profile._child_profile_ids.append(self.profile_name_with_id[ssid]) - default_profile._name = profile_data['profile_name'] - default_profile = self.profile_client.create_profile(body=default_profile) - self.profile_creation_ids['ap'] = default_profile._id - self.profile_ids.append(default_profile._id) - return default_profile - - """ - method call: used to create a ap profile that contains the specific ssid profiles - """ - - def update_ap_profile(self, profile_data=None): - self.sdk_client.refresh_instance() - if profile_data is None: - print("profile info is None, Please specify the profile info that you want to update") - return False - - child_profiles_to_apply = [] - try: - for ssid in profile_data["ssid_names"]: - child_profiles_to_apply.append(self.profile_name_with_id[ssid]) - default_profile = self.get_profile_by_name(profile_name=profile_data["profile_name"]) - for i in self.profile_creation_ids: - if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", - "radius", "ssid"]: - for j in self.profile_creation_ids[i]: - child_profiles_to_apply.append(j) - default_profile._child_profile_ids = child_profiles_to_apply - default_profile = self.profile_client.update_profile(default_profile) - return True - except Exception as e: - print(e) - return False - - """ - method call: used to create a radius profile with the settings given - """ - - def create_radius_profile(self, radius_info=None, radius_accounting_info=None): - self.sdk_client.refresh_instance() - default_profile = self.default_profiles['radius'] - default_profile._name = radius_info['name'] - default_profile._details['primaryRadiusAuthServer'] = {} - default_profile._details['primaryRadiusAuthServer']['ipAddress'] = radius_info['ip'] - default_profile._details['primaryRadiusAuthServer']['port'] = radius_info['port'] - default_profile._details['primaryRadiusAuthServer']['secret'] = radius_info['secret'] - if radius_accounting_info is not None: - default_profile._details["primaryRadiusAccountingServer"] = {} - default_profile._details["primaryRadiusAccountingServer"]["ipAddress"] = radius_accounting_info["ip"] - default_profile._details["primaryRadiusAccountingServer"]["port"] = radius_accounting_info["port"] - default_profile._details["primaryRadiusAccountingServer"]["secret"] = radius_accounting_info["secret"] - default_profile = self.profile_client.create_profile(body=default_profile) - self.profile_creation_ids['radius'] = [default_profile._id] - self.profile_ids.append(default_profile._id) - return default_profile - - """ - method to push the profile to the given equipment - """ - - # Under a Bug, depreciated until resolved, should be used primarily - def push_profile(self, equipment_id=None): - self.sdk_client.refresh_instance() - pagination_context = """{ - "model_type": "PaginationContext", - "maxItemsPerPage": 100 - }""" - default_equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=11, async_req=False) - # default_equipment_data._details[] = self.profile_creation_ids['ap'] - # print(default_equipment_data) - # print(self.sdk_client.equipment_client.update_equipment(body=default_equipment_data, async_req=True)) - - """ - method to verify if the expected ssid's are loaded in the ap vif config - """ - - def update_ssid_name(self, profile_name=None, new_profile_name=None): - self.sdk_client.refresh_instance() - if profile_name is None: - print("profile name is None, Please specify the ssid profile name that you want to modify") - return False - if new_profile_name is None: - print("Please specify the new name for ssid profile that you want to make to") - return False - - try: - profile = self.get_profile_by_name(profile_name=profile_name) - profile._details['ssid'] = new_profile_name - self.profile_client.update_profile(profile) - return True - except Exception as e: - return False - - def update_ssid_profile(self, profile_info=None): - self.sdk_client.refresh_instance() - if profile_info is None: - print("profile info is None, Please specify the profile info that you want to update") - return False - - try: - profile = self.get_profile_by_name(profile_name=profile_info["ssid_profile_name"]) - profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] - profile._child_profile_ids = self.profile_creation_ids["radius"] + self.profile_creation_ids["passpoint"] - if "radius_configuration" in profile_info.keys(): - if "radius_acounting_service_interval" in profile_info["radius_configuration"].keys(): - profile._details["radiusAcountingServiceInterval"] = profile_info["radius_configuration"]["radius_acounting_service_interval"] - if "user_defined_nas_id" in profile_info["radius_configuration"].keys(): - profile._details["radiusClientConfiguration"]["userDefinedNasId"] = profile_info["radius_configuration"]["user_defined_nas_id"] - if "operator_id" in profile_info["radius_configuration"].keys(): - profile._details["radiusClientConfiguration"]["operatorId"] = profile_info["radius_configuration"]["operator_id"] - self.profile_client.update_profile(profile) - return True - except Exception as e: - print(e) - return False - - def clear_ssid_profile(self, profile_name=None): - if profile_name is None: - print("profile name is None, Please specify the ssid profile name that you want to update") - return False - - try: - profile = self.get_profile_by_name(profile_name=profile_name) - profile._details["radiusServiceId"] = None - profile._child_profile_ids = [] - self.profile_client.update_profile(profile) - return True - except Exception as e: - print(e) - return False - - """ - method to delete a profile by its id - """ - - def delete_profile(self, profile_id=None): - self.sdk_client.refresh_instance() - for i in profile_id: - self.profile_client.delete_profile(profile_id=i) - - # Need to be depreciated by using push_profile method - def push_profile_old_method(self, equipment_id=None): - self.sdk_client.refresh_instance() - if equipment_id is None: - return 0 - url = self.sdk_client.configuration.host + "/portal/equipment?equipmentId=" + str(equipment_id) - payload = {} - headers = self.sdk_client.configuration.api_key_prefix - response = requests.request("GET", url, headers=headers, data=payload) - equipment_info = response.json() - equipment_info['profileId'] = self.profile_creation_ids['ap'] - url = self.sdk_client.configuration.host + "/portal/equipment" - headers = { - 'Content-Type': 'application/json', - 'Authorization': self.sdk_client.configuration.api_key_prefix['Authorization'] - } - - response = requests.request("PUT", url, headers=headers, data=json.dumps(equipment_info)) - return response - - -""" - - FirmwareUtility class - uses JfrogUtility base class - sdk_client [ controller_tests instance ] - controller_data [ sdk_base_url ] needed only if sdk_instance is not passed - customer_id [ 2 ] needed only if sdk_instance is not passed -""" - - -class FirmwareUtility: - - def __init__(self, - sdk_client=None, - jfrog_credentials=None, - controller_data=None, - customer_id=2, - model=None, - version_url=None): - # super().__init__(credentials=jfrog_credentials) - if sdk_client is None: - sdk_client = Controller(controller_data=controller_data, customer_id=customer_id) - self.sdk_client = sdk_client - self.sdk_client.refresh_instance() - self.firmware_client = FirmwareManagementApi(api_client=sdk_client.api_client) - # self.jfrog_client = JFrogUtility(credentials=jfrog_credentials) - self.equipment_gateway_client = EquipmentGatewayApi(api_client=sdk_client.api_client) - self.model = model - self.fw_version = version_url - - def get_fw_version(self): - fw_version = self.fw_version.split("/")[-1] - return fw_version - - def upload_fw_on_cloud(self, force_upload=False): - self.sdk_client.refresh_instance() - fw_version = self.fw_version.split("/")[-1] - print("Upload fw version :", self.fw_version) - fw_id = self.is_fw_available(fw_version=fw_version) - if fw_id and not force_upload: - print("Skipping upload, Firmware Already Available", "Force Upload :", force_upload) - # Don't Upload the fw - return fw_id - else: - if fw_id and force_upload: - print("Firmware Version Already Available, Deleting and Uploading Again", - " Force Upload :", force_upload) - self.firmware_client.delete_firmware_version(firmware_version_id=fw_id) - print("Deleted Firmware Image from cloud, uploading again") - time.sleep(2) - # if force_upload is true and latest image available, then delete the image - firmware_data = { - "id": 0, - "equipmentType": "AP", - "modelId": str(self.model).upper(), - "versionName": fw_version, - "description": fw_version + " FW VERSION", - "filename": self.fw_version, - } - firmware_id = self.firmware_client.create_firmware_version(body=firmware_data) - print("Uploaded the Image: ", fw_version) - return firmware_id._id - - def upgrade_fw(self, equipment_id=None, force_upgrade=False, force_upload=False): - self.sdk_client.refresh_instance() - if equipment_id is None: - print("No Equipment Id Given") - exit() - if (force_upgrade is True) or (self.should_upgrade_ap_fw(equipment_id=equipment_id)): - firmware_id = self.upload_fw_on_cloud(force_upload=force_upload) - time.sleep(5) - try: - obj = self.equipment_gateway_client.request_firmware_update(equipment_id=equipment_id, - firmware_version_id=firmware_id) - print("Request firmware upgrade Success! waiting for 300 sec") - time.sleep(400) - except Exception as e: - print(e) - obj = False - return obj - # Write the upgrade fw logic here - - def should_upgrade_ap_fw(self, equipment_id=None): - self.sdk_client.refresh_instance() - current_fw = self.sdk_client.get_ap_firmware_old_method(equipment_id=equipment_id) - latest_fw = self.get_fw_version() - print(self.model, current_fw, latest_fw) - if current_fw == latest_fw: - return False - else: - return True - - def is_fw_available(self, fw_version=None): - self.sdk_client.refresh_instance() - if fw_version is None: - exit() - try: - firmware_version = self.firmware_client.get_firmware_version_by_name( - firmware_version_name=fw_version) - firmware_version = firmware_version._id - print("Firmware ID: ", firmware_version) - except Exception as e: - print(e) - firmware_version = False - print("firmware not available: ", firmware_version) - return firmware_version - - -# This is for Unit tests on Controller Library -if __name__ == '__main__': - controller = { - 'url': "https://wlan-portal-svc-nola-ext-04.cicd.lab.wlan.tip.build", # API base url for the controller - 'username': 'support@example.com', - 'password': 'support', - 'version': "1.1.0-SNAPSHOT", - 'commit_date': "2021-04-27" - } - api = Controller(controller_data=controller) - profile = ProfileUtility(sdk_client=api) - profile_data = { - "name": "test-rf-wifi-6", - "is2dot4GHz": {}, - "is5GHz": {"channelBandwidth": "is20MHz"}, - "is5GHzL": {"channelBandwidth": "is20MHz"}, - "is5GHzU": {"channelBandwidth": "is20MHz"} - } - profile.set_rf_profile(profile_data=profile_data, mode="wifi6") - print(profile.default_profiles["rf"]) - # profile.cleanup_profiles() - - # profile.get_default_profiles() - # profile_data = { - # "profile_name": "ssid_wep_2g", - # "ssid_name": "ssid_wep_2g", - # "appliedRadios": ["is2dot4GHz"], - # "default_key_id" : 1, - # "wep_key" : 1234567890, - # "vlan": 1, - # "mode": "BRIDGE" - # } - # profile.create_wep_ssid_profile(profile_data=profile_data) - # print(profile.get_profile_by_name(profile_name="wpa_wpa2_eap")) - # profile.get_default_profiles() - api.disconnect_Controller() diff --git a/libs/pytest_fixtures/__init__.py b/libs/pytest_fixtures/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/conftest.py b/tests/conftest.py index 97950f3cd..59e666d9d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,8 +41,8 @@ from LANforge.LFUtils import * if 'py-json' not in sys.path: sys.path.append('../py-scripts') from apnos.apnos import APNOS -from controller.controller import Controller -from controller.controller import FirmwareUtility +from controller.controller_1x.controller import Controller +from controller.controller_1x.controller import FirmwareUtility import pytest from cv_test_manager import cv_test from configuration import CONFIGURATION diff --git a/tests/e2e/advanced/conftest.py b/tests/e2e/advanced/conftest.py index da719e185..cc0d86943 100644 --- a/tests/e2e/advanced/conftest.py +++ b/tests/e2e/advanced/conftest.py @@ -9,7 +9,7 @@ sys.path.append( if "libs" not in sys.path: sys.path.append(f'../libs') -from controller.controller import ProfileUtility +from controller.controller_1x.controller import ProfileUtility import time from lanforge.lf_tests import RunTest from lanforge.lf_tools import ChamberView diff --git a/tests/e2e/basic/conftest.py b/tests/e2e/basic/conftest.py index 9e3e0337f..41fd09f61 100644 --- a/tests/e2e/basic/conftest.py +++ b/tests/e2e/basic/conftest.py @@ -10,7 +10,7 @@ sys.path.append( if "libs" not in sys.path: sys.path.append(f'../libs') -from controller.controller import ProfileUtility +from controller.controller_1x.controller import ProfileUtility from controller.controller_2x.controller import UProfileUtility import time from lanforge.lf_tests import RunTest diff --git a/tests/e2e/interOp/android/OpenRoamingPassPoint/conftest.py b/tests/e2e/interOp/android/OpenRoamingPassPoint/conftest.py index a1c92b1e1..9a3acae77 100644 --- a/tests/e2e/interOp/android/OpenRoamingPassPoint/conftest.py +++ b/tests/e2e/interOp/android/OpenRoamingPassPoint/conftest.py @@ -20,7 +20,7 @@ from configuration import PASSPOINT_PROVIDER_INFO from configuration import PASSPOINT_OPERATOR_INFO from configuration import PASSPOINT_VENUE_INFO from configuration import PASSPOINT_PROFILE_INFO -from controller.controller import ProfileUtility +from controller.controller_1x.controller import ProfileUtility @allure.feature("PASSPOINT CONNECTIVITY SETUP") @@ -37,7 +37,6 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment profile_data = {} if parameter["mode"] not in ["BRIDGE", "NAT", "VLAN"]: print("Invalid Mode: ", parameter["mode"]) - allure.attach(body=parameter["mode"], name="Invalid Mode: ") yield test_cases if parameter["mode"] == "NAT": diff --git a/tests/e2e/interOp/conftest.py b/tests/e2e/interOp/conftest.py index ae5a00a2c..3c43d1989 100644 --- a/tests/e2e/interOp/conftest.py +++ b/tests/e2e/interOp/conftest.py @@ -20,9 +20,9 @@ if "libs" not in sys.path: sys.path.append(f'../libs') import allure from apnos.apnos import APNOS -from controller.controller import Controller -from controller.controller import ProfileUtility -from controller.controller import FirmwareUtility +from controller.controller_1x.controller import Controller +from controller.controller_1x.controller import ProfileUtility +from controller.controller_1x.controller import FirmwareUtility import pytest import logging from configuration import RADIUS_SERVER_DATA @@ -68,7 +68,7 @@ def get_APToMobileDevice_data(request): "UploadMbps": "//*[@id='knowledge-verticals-internetspeedtest__upload']/P[@class='spiqle']", #Android "platformName-android": request.config.getini("platformName-android"), - "appPackage-android": request.config.getini("appPackage-android") + "appPackage-android": request.config.getini("appPackage-android") } yield passPoint_data @@ -93,7 +93,7 @@ def get_ToggleAirplaneMode_data(request): "UploadMbps": "//*[@id='knowledge-verticals-internetspeedtest__upload']/P[@class='spiqle']", #Android "platformName-android": request.config.getini("platformName-android"), - "appPackage-android": request.config.getini("appPackage-android") + "appPackage-android": request.config.getini("appPackage-android") } yield passPoint_data @@ -104,7 +104,7 @@ def get_ToggleWifiMode_data(request): "bundleId-iOS-Settings": request.config.getini("bundleId-iOS-Settings"), #Android "platformName-android": request.config.getini("platformName-android"), - "appPackage-android": request.config.getini("appPackage-android") + "appPackage-android": request.config.getini("appPackage-android") } yield passPoint_data @@ -189,401 +189,45 @@ def get_current_profile_cloud(instantiate_profile): @pytest.fixture(scope="session") def setup_vlan(): vlan_id = [100] - allure.attach(body=str(vlan_id), name="VLAN Created: ") yield vlan_id[0] -@allure.feature("CLIENT CONNECTIVITY SETUP") @pytest.fixture(scope="class") -def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id, - instantiate_profile, get_markers, - get_security_flags, get_configuration, radius_info, get_apnos): - instantiate_profile = instantiate_profile(sdk_client=setup_controller) - vlan_id, mode = 0, 0 - instantiate_profile.cleanup_objects() - parameter = dict(request.param) - print(parameter) - test_cases = {} - profile_data = {} - if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: - print("Invalid Mode: ", parameter['mode']) - allure.attach(body=parameter['mode'], name="Invalid Mode: ") - yield test_cases +def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id, fixtures_ver, + instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, + get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info): + lf_tools.reset_scenario() + param = dict(request.param) - if parameter['mode'] == "NAT": - mode = "NAT" - vlan_id = 1 - if parameter['mode'] == "BRIDGE": - mode = "BRIDGE" - vlan_id = 1 - if parameter['mode'] == "VLAN": - mode = "BRIDGE" - vlan_id = setup_vlan + # VLAN Setup + if request.param["mode"] == "VLAN": - instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Equipment-AP-" + parameter['mode']) + vlan_list = list() + refactored_vlan_list = list() + ssid_modes = request.param["ssid_modes"].keys() + for mode in ssid_modes: + for ssid in range(len(request.param["ssid_modes"][mode])): + if "vlan" in request.param["ssid_modes"][mode][ssid]: + vlan_list.append(request.param["ssid_modes"][mode][ssid]["vlan"]) + else: + pass + if vlan_list: + [refactored_vlan_list.append(x) for x in vlan_list if x not in refactored_vlan_list] + vlan_list = refactored_vlan_list + for i in range(len(vlan_list)): + if vlan_list[i] > 4095 or vlan_list[i] < 1: + vlan_list.pop(i) + if request.param["mode"] == "VLAN": + lf_tools.add_vlan(vlan_ids=vlan_list) - profile_data["equipment_ap"] = {"profile_name": testbed + "-Equipment-AP-" + parameter['mode']} - profile_data["ssid"] = {} - for i in parameter["ssid_modes"]: - profile_data["ssid"][i] = [] - for j in range(len(parameter["ssid_modes"][i])): - profile_name = testbed + "-SSID-" + i + "-" + str(j) + "-" + parameter['mode'] - data = parameter["ssid_modes"][i][j] - data["profile_name"] = profile_name - if "mode" not in dict(data).keys(): - data["mode"] = mode - if "vlan" not in dict(data).keys(): - data["vlan"] = vlan_id - instantiate_profile.delete_profile_by_name(profile_name=profile_name) - profile_data["ssid"][i].append(data) - # print(profile_name) - # print(profile_data) + # call this, if 1.x + return_1x = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, setup_vlan, get_equipment_id, + instantiate_profile, + get_markers, create_lanforge_chamberview_dut, lf_tools, + get_security_flags, get_configuration, radius_info, get_apnos, + radius_accounting_info) + yield return_1x - instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode) - time.sleep(10) - """ - Setting up rf profile - """ - rf_profile_data = { - "name": "RF-Profile-" + testbed + "-" + parameter['mode'] + "-" + - get_configuration['access_point'][0]['mode'] - } - - for i in parameter["rf"]: - rf_profile_data[i] = parameter['rf'][i] - # print(rf_profile_data) - - try: - instantiate_profile.delete_profile_by_name(profile_name=rf_profile_data['name']) - instantiate_profile.set_rf_profile(profile_data=rf_profile_data, - mode=get_configuration['access_point'][0]['mode']) - allure.attach(body=str(rf_profile_data), - name="RF Profile Created : " + get_configuration['access_point'][0]['mode']) - except Exception as e: - print(e) - allure.attach(body=str(e), name="Exception ") - - # Radius Profile Creation - if parameter["radius"]: - radius_info = radius_info - radius_info["name"] = testbed + "-Automation-Radius-Profile-" + testbed - instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + testbed) - try: - # pass - instantiate_profile.create_radius_profile(radius_info=radius_info) - allure.attach(body=str(radius_info), - name="Radius Profile Created") - test_cases['radius_profile'] = True - except Exception as e: - print(e) - test_cases['radius_profile'] = False - - # SSID Profile Creation - print(get_markers) - for mode in profile_data['ssid']: - if mode == "open": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_open_ssid_profile(profile_data=j) - test_cases["open_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["open_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_open_ssid_profile(profile_data=j) - test_cases["open_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["open_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa_ssid_profile(profile_data=j) - test_cases["wpa_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa_ssid_profile(profile_data=j) - test_cases["wpa_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa2_personal": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa2_personal_ssid_profile(profile_data=j) - test_cases["wpa2_personal_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa2_personal_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa2_personal_ssid_profile(profile_data=j) - test_cases["wpa2_personal_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa2_personal_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa_wpa2_personal_mixed": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa_wpa2_personal_mixed_ssid_profile(profile_data=j) - test_cases["wpa_wpa2_personal_mixed_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_wpa2_personal_mixed_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa_wpa2_personal_mixed_ssid_profile(profile_data=j) - test_cases["wpa_wpa2_personal_mixed_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_wpa2_personal_mixed_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa3_personal": - for j in profile_data["ssid"][mode]: - print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa3_personal_ssid_profile(profile_data=j) - test_cases["wpa3_personal_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_personal_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa3_personal_ssid_profile(profile_data=j) - test_cases["wpa3_personal_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_personal_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa3_personal_mixed": - for j in profile_data["ssid"][mode]: - print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa3_personal_mixed_ssid_profile( - profile_data=j) - test_cases["wpa3_personal_mixed_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_personal_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa3_personal_mixed_ssid_profile( - profile_data=j) - test_cases["wpa3_personal_mixed_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_personal_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa2_enterprise": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa2_enterprise_ssid_profile(profile_data=j) - test_cases["wpa2_enterprise_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa2_enterprise_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa2_enterprise_ssid_profile(profile_data=j) - test_cases["wpa2_enterprise_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa2_enterprise_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa3_enterprise": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if "twog" in get_markers.keys() and get_markers["twog"] and "is2dot4GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa3_enterprise_ssid_profile(profile_data=j) - test_cases["wpa3_enterprise_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_enterprise_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - try: - if "fiveg" in get_markers.keys() and get_markers["fiveg"] and "is5GHz" in list( - j["appliedRadios"]): - creates_profile = instantiate_profile.create_wpa3_enterprise_ssid_profile(profile_data=j) - test_cases["wpa3_enterprise_5g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_enterprise_5g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - # Equipment AP Profile Creation - try: - instantiate_profile.set_ap_profile(profile_data=profile_data['equipment_ap']) - test_cases["equipment_ap"] = True - allure.attach(body=str(profile_data['equipment_ap']), - name="Equipment AP Profile Created") - except Exception as e: - print(e) - test_cases["equipment_ap"] = False - allure.attach(body=str(e), - name="Equipment AP Profile Creation Failed") - - # Push the Equipment AP Profile to AP - try: - for i in get_equipment_id: - instantiate_profile.push_profile_old_method(equipment_id=i) - except Exception as e: - print(e) - print("failed to create AP Profile") - - ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/") - ssid_names = [] - for i in instantiate_profile.profile_creation_ids["ssid"]: - ssid_names.append(instantiate_profile.get_ssid_name_by_profile_id(profile_id=i)) - ssid_names.sort() - - # This loop will check the VIF Config with cloud profile - vif_config = [] - test_cases['vifc'] = False - for i in range(0, 18): - vif_config = list(ap_ssh.get_vif_config_ssids()) - vif_config.sort() - print(vif_config) - print(ssid_names) - if ssid_names == vif_config: - test_cases['vifc'] = True - break - time.sleep(10) - allure.attach(body=str("VIF Config: " + str(vif_config) + "\n" + "SSID Pushed from Controller: " + str(ssid_names)), - name="SSID Profiles in VIF Config and Controller: ") - ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/") - - # This loop will check the VIF Config with VIF State - test_cases['vifs'] = False - for i in range(0, 18): - vif_state = list(ap_ssh.get_vif_state_ssids()) - vif_state.sort() - vif_config = list(ap_ssh.get_vif_config_ssids()) - vif_config.sort() - print(vif_config) - print(vif_state) - if vif_state == vif_config: - test_cases['vifs'] = True - break - time.sleep(10) - allure.attach(body=str("VIF Config: " + str(vif_config) + "\n" + "VIF State: " + str(vif_state)), - name="SSID Profiles in VIF Config and VIF State: ") - print(test_cases) - - def teardown_session(): - print("\nRemoving Profiles") - instantiate_profile.delete_profile_by_name(profile_name=profile_data['equipment_ap']['profile_name']) - instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["ssid"]) - instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["radius"]) - instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["rf"]) - allure.attach(body=str(profile_data['equipment_ap']['profile_name'] + "\n"), - name="Tear Down in Profiles ") - time.sleep(20) - - request.addfinalizer(teardown_session) - yield test_cases @@ -610,7 +254,7 @@ def failure_tracking_fixture(request): print("tests_failed_during_module: ") print(tests_failed_during_module) yield tests_failed_during_module - + @pytest.fixture(scope="class") def get_vif_state(get_apnos, get_configuration): @@ -641,7 +285,7 @@ def pytest_runtest_makereport(item, call): TestCaseFullName = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] nCurrentTestMethodNameSplit = re.sub(r'\[.*?\]\ *', "", TestCaseFullName) #print("TestCasefullNameTEST: " + TestCaseFullName) - try: + try: #TestCaseName = nCurrentTestMethodNameSplit.removeprefix('test_') TestCaseName = nCurrentTestMethodNameSplit.replace('test_', '') #print ("\nTestCaseName: " + TestCaseName) @@ -653,7 +297,7 @@ def pytest_runtest_makereport(item, call): #exception = call.excinfo.value #exception_class = call.excinfo.type #exception_class_name = call.excinfo.typename - + #exception_traceback = call.excinfo.traceback if result.outcome == "failed": @@ -663,7 +307,7 @@ def pytest_runtest_makereport(item, call): testCaseNameList.append(TestCaseName) testCaseStatusList.append(testCaseFailedStatusValue) testCaseErrorMsg.append(exception_type_and_message_formatted) - testCaseReportURL.append(reporting_client.report_url()) + testCaseReportURL.append(reporting_client.report_url()) print("\n TestStatus: " + testCaseFailedStatusValue) print(" FailureMsg: " + str(testCaseErrorMsg)) @@ -687,9 +331,9 @@ def pytest_runtest_makereport(item, call): testCaseErrorMsg.append(str(exception_type_Skipped_message_formatted)) testCaseReportURL.append(reporting_client.report_url()) print("\n TestStatus: " + testCaseSkippedStatusValue) - print(" FailureMsg: " + str(testCaseErrorMsg)) - reportPerfecto(TestCaseName, testCaseSkippedStatusValue, testCaseErrorMsg, str(reporting_client.report_url())) - + print(" FailureMsg: " + str(testCaseErrorMsg)) + reportPerfecto(TestCaseName, testCaseSkippedStatusValue, testCaseErrorMsg, str(reporting_client.report_url())) + def pytest_sessionfinish(session, exitstatus): @@ -699,17 +343,17 @@ def pytest_sessionfinish(session, exitstatus): passed_amount = sum(1 for result in session.results.values() if result.passed) failed_amount = sum(1 for result in session.results.values() if result.failed) skipped_amount = sum(1 for result in session.results.values() if result.skipped) - # print(f'There are {passed_amount} passed and {failed_amount} failed tests') + # print(f'There are {passed_amount} passed and {failed_amount} failed tests') TotalExecutedCount = failed_amount + passed_amount + skipped_amount print('\n------------------------------------') - print('Interop Perfecto TestCase Execution Summary') + print('Interop Perfecto TestCase Execution Summary') print('------------------------------------') - print('Total TestCase Executed: ' + str(TotalExecutedCount)) + print('Total TestCase Executed: ' + str(TotalExecutedCount)) print('Total Passed: ' + str(passed_amount)) print('Total Failed: ' + str(failed_amount)) print('Total Skipped: ' + str(skipped_amount) + "\n") - + try: for index in range(len(testCaseNameList)): print(str(index+1) + ") " + str(testCaseNameList[index]) + " : " + str(testCaseStatusList[index])) @@ -725,16 +369,16 @@ def setup_perfectoMobile_android(request): from appium import webdriver driver = None reporting_client = None - + warnings.simplefilter("ignore", ResourceWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - + capabilities = { 'platformName': request.config.getini("platformName-android"), 'model': request.config.getini("model-android"), 'browserName': 'mobileOS', #'automationName' : 'Appium', - 'securityToken' : request.config.getini("securityToken"), + 'securityToken' : request.config.getini("securityToken"), 'useAppiumForWeb' : 'false', 'useAppiumForHybrid' : 'false', #'bundleId' : request.config.getini("appPackage-android"), @@ -742,7 +386,7 @@ def setup_perfectoMobile_android(request): driver = webdriver.Remote('https://'+request.config.getini("perfectoURL")+'.perfectomobile.com/nexperience/perfectomobile/wd/hub', capabilities) driver.implicitly_wait(35) - + TestCaseFullName = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] nCurrentTestMethodNameSplit = re.sub(r'\[.*?\]\ *', "", TestCaseFullName) try: @@ -753,7 +397,7 @@ def setup_perfectoMobile_android(request): TestCaseName = nCurrentTestMethodNameSplit print("\nUpgrade Python to 3.9 to avoid test_ string in your test case name, see below URL") #print("https://www.andreagrandi.it/2020/10/11/python39-introduces-removeprefix-removesuffix/") - + projectname = request.config.getini("projectName") projectversion = request.config.getini("projectVersion") jobname = request.config.getini("jobName") @@ -774,14 +418,14 @@ def setup_perfectoMobile_android(request): print("----------------------------------------------------------\n\n\n\n") driver.close() except Exception as e: - print(" -- Exception While Tear Down --") + print(" -- Exception While Tear Down --") driver.close() print (e) finally: try: driver.quit() except Exception as e: - print(" -- Exception Not Able To Quit --") + print(" -- Exception Not Able To Quit --") print (e) request.addfinalizer(teardown) @@ -789,7 +433,7 @@ def setup_perfectoMobile_android(request): if driver is None: yield -1 else: - yield driver,reporting_client + yield driver,reporting_client def reportClient(value): global reporting_client # declare a to be a global @@ -811,7 +455,7 @@ def setup_perfectoMobileWeb(request): from selenium import webdriver rdriver = None reporting_client = None - + warnings.simplefilter("ignore", ResourceWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -839,18 +483,18 @@ def setup_perfectoMobileWeb(request): def teardown(): try: - print(" -- Tear Down --") + print(" -- Tear Down --") reporting_client.test_stop(TestResultFactory.create_success()) print('Report-Url: ' + reporting_client.report_url() + '\n') rdriver.close() except Exception as e: - print(" -- Exception Not Able To close --") + print(" -- Exception Not Able To close --") print (e.message) finally: try: rdriver.quit() except Exception as e: - print(" -- Exception Not Able To Quit --") + print(" -- Exception Not Able To Quit --") print (e.message) request.addfinalizer(teardown) @@ -858,24 +502,24 @@ def setup_perfectoMobileWeb(request): if rdriver is None: yield -1 else: - yield rdriver,reporting_client + yield rdriver,reporting_client @pytest.fixture(scope="function") def setup_perfectoMobile_iOS(request): from appium import webdriver driver = None reporting_client = None - + warnings.simplefilter("ignore", ResourceWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - + capabilities = { 'platformName': request.config.getini("platformName-iOS"), 'model': request.config.getini("model-iOS"), 'browserName': 'safari', #'automationName' : 'Appium', - 'securityToken' : request.config.getini("securityToken"), - 'useAppiumForWeb' : 'false', + 'securityToken' : request.config.getini("securityToken"), + 'useAppiumForWeb' : 'false', 'autoAcceptAlerts' : 'true', #'bundleId' : request.config.getini("bundleId-iOS"), 'useAppiumForHybrid' : 'false', @@ -883,7 +527,7 @@ def setup_perfectoMobile_iOS(request): driver = webdriver.Remote('https://'+request.config.getini("perfectoURL")+'.perfectomobile.com/nexperience/perfectomobile/wd/hub', capabilities) driver.implicitly_wait(35) - + TestCaseFullName = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] nCurrentTestMethodNameSplit = re.sub(r'\[.*?\]\ *', "", TestCaseFullName) try: @@ -894,7 +538,7 @@ def setup_perfectoMobile_iOS(request): TestCaseName = nCurrentTestMethodNameSplit print("\nUpgrade Python to 3.9 to avoid test_ string in your test case name, see below URL") #print("https://www.andreagrandi.it/2020/10/11/python39-introduces-removeprefix-removesuffix/") - + projectname = request.config.getini("projectName") projectversion = request.config.getini("projectVersion") jobname = request.config.getini("jobName") @@ -915,20 +559,20 @@ def setup_perfectoMobile_iOS(request): print("----------------------------------------------------------\n\n\n\n") driver.close() except Exception as e: - print(" -- Exception While Tear Down --") + print(" -- Exception While Tear Down --") driver.close() print (e) finally: try: driver.quit() except Exception as e: - print(" -- Exception Not Able To Quit --") + print(" -- Exception Not Able To Quit --") print (e) - + request.addfinalizer(teardown) if driver is None: yield -1 else: - yield driver,reporting_client + yield driver,reporting_client diff --git a/tests/e2e/interOp/iOS/OpenRoamingPassPoint/conftest.py b/tests/e2e/interOp/iOS/OpenRoamingPassPoint/conftest.py index a1c92b1e1..31b125cf4 100644 --- a/tests/e2e/interOp/iOS/OpenRoamingPassPoint/conftest.py +++ b/tests/e2e/interOp/iOS/OpenRoamingPassPoint/conftest.py @@ -20,7 +20,7 @@ from configuration import PASSPOINT_PROVIDER_INFO from configuration import PASSPOINT_OPERATOR_INFO from configuration import PASSPOINT_VENUE_INFO from configuration import PASSPOINT_PROFILE_INFO -from controller.controller import ProfileUtility +from controller.controller_1x.controller import ProfileUtility @allure.feature("PASSPOINT CONNECTIVITY SETUP") diff --git a/tests/fixtures_1x.py b/tests/fixtures_1x.py index cfa932821..d51539fe3 100644 --- a/tests/fixtures_1x.py +++ b/tests/fixtures_1x.py @@ -21,8 +21,8 @@ from LANforge.LFUtils import * if 'py-json' not in sys.path: sys.path.append('../py-scripts') from apnos.apnos import APNOS -from controller.controller import Controller -from controller.controller import FirmwareUtility +from controller.controller_1x.controller import Controller +from controller.controller_1x.controller import FirmwareUtility import pytest from cv_test_manager import cv_test from configuration import CONFIGURATION