diff --git a/lanforge/lanforge-scripts b/lanforge/lanforge-scripts index 7fe8579ce..16276e080 160000 --- a/lanforge/lanforge-scripts +++ b/lanforge/lanforge-scripts @@ -1 +1 @@ -Subproject commit 7fe8579cebaa4a7559049e4cbaa30f805098555e +Subproject commit 16276e080cce82cac7d6d27737893f872d4c7a96 diff --git a/libs/controller/controller_1x/__init__.py b/libs/controller/controller_1x/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/controller/controller_1x/controller.py b/libs/controller/controller_1x/controller.py new file mode 100644 index 000000000..0f5d20ed6 --- /dev/null +++ b/libs/controller/controller_1x/controller.py @@ -0,0 +1,1419 @@ +# !/usr/local/lib64/python3.8 +""" + Controller Library + 1. controller_data/sdk_base_url + 2. login credentials +""" +import base64 +import datetime +import json +import re +import ssl +import time +import urllib + +import requests +import swagger_client +from swagger_client import FirmwareManagementApi +from swagger_client import EquipmentGatewayApi +from bs4 import BeautifulSoup +import threading + + +class ConfigureController: + + def __init__(self): + self.configuration = swagger_client.Configuration() + + def set_credentials(self, controller_data=None): + if dict(controller_data).keys().__contains__("username") and dict(controller_data).keys().__contains__( + "password"): + self.configuration.username = controller_data["username"] + self.configuration.password = controller_data["password"] + print("Login Credentials set to custom: \n user_id: %s\n password: %s\n" % (controller_data["username"], + controller_data["password"])) + return True + else: + self.configuration.username = "support@example.com" + self.configuration.password = "support" + print("Login Credentials set to default: \n user_id: %s\n password: %s\n" % ("support@example.com", + "support")) + return False + + def select_controller_data(self, controller_data=None): + if dict(controller_data).keys().__contains__("url") is None: + print("No controller_data Selected") + exit() + self.sdk_base_url = controller_data["url"] + self.configuration.host = self.sdk_base_url + print("controller_data Selected: %s\n SDK_BASE_URL: %s\n" % (controller_data["url"], self.sdk_base_url)) + return True + + def set_sdk_base_url(self, sdk_base_url=None): + if sdk_base_url is None: + print("URL is None") + exit() + self.configuration.host = sdk_base_url + return True + + +""" + Library for cloud_controller_tests generic usages, it instantiate the bearer and credentials. + It provides the connectivity to the cloud. + Instantiate the Object by providing the controller_data=controller_url, customer_id=2 +""" + + +class Controller(ConfigureController): + """ + constructor for cloud_controller_tests library : can be used from pytest framework + """ + + def __init__(self, controller_data=None, customer_id=None): + super().__init__() + self.controller_data = controller_data + self.customer_id = customer_id + if customer_id is None: + self.customer_id = 2 + print("Setting to default Customer ID 2") + # + # Setting the Controller Client Configuration + self.select_controller_data(controller_data=controller_data) + self.set_credentials(controller_data=controller_data) + self.configuration.refresh_api_key_hook = self.get_bearer_token + + # Connecting to Controller + self.api_client = swagger_client.ApiClient(self.configuration) + self.login_client = swagger_client.LoginApi(api_client=self.api_client) + self.bearer = False + self.disconnect = False + # Token expiry in seconds + self.token_expiry = 1000 + self.token_timestamp = time.time() + try: + + self.bearer = self.get_bearer_token() + # t1 = threading.Thread(target=self.refresh_instance) + # t1.start() + self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token + self.status_client = swagger_client.StatusApi(api_client=self.api_client) + self.equipment_client = swagger_client.EquipmentApi(self.api_client) + self.profile_client = swagger_client.ProfileApi(self.api_client) + self.api_client.configuration.api_key_prefix = { + "Authorization": "Bearer " + self.bearer._access_token + } + self.api_client.configuration.refresh_api_key_hook = self.refresh_instance + self.ping_response = self.portal_ping() + print("Portal details :: \n", self.ping_response) + except Exception as e: + self.bearer = False + print(e) + + print("Connected to Controller Server") + + def get_bearer_token(self): + request_body = { + "userId": self.configuration.username, + "password": self.configuration.password + } + return self.login_client.get_access_token(request_body) + + def refresh_instance(self): + # Refresh token 10 seconds before it's expiry + if time.time() - self.token_timestamp > self.token_expiry - 10: + self.token_timestamp = time.time() + print("Refreshing the controller API token") + self.disconnect_Controller() + self.api_client = swagger_client.ApiClient(self.configuration) + self.login_client = swagger_client.LoginApi(api_client=self.api_client) + self.bearer = self.get_bearer_token() + self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token + self.status_client = swagger_client.StatusApi(api_client=self.api_client) + self.equipment_client = swagger_client.EquipmentApi(self.api_client) + self.profile_client = swagger_client.ProfileApi(self.api_client) + self.api_client.configuration.api_key_prefix = { + "Authorization": "Bearer " + self.bearer._access_token + } + self.api_client.configuration.refresh_api_key_hook = self.refresh_instance + self.ping_response = self.portal_ping() + print("Portal details :: \n", self.ping_response) + if self.ping_response._application_name != 'PortalServer': + print("Server not Reachable") + exit() + print("Connected to Controller Server") + + def portal_ping(self): + self.refresh_instance() + return self.login_client.portal_ping() + + def disconnect_Controller(self): + self.refresh_instance() + self.disconnect = True + self.api_client.__del__() + + # Returns a List of All the Equipments that are available in the cloud instances + def get_equipment_by_customer_id(self, max_items=10): + self.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": """ + str(max_items) + """ + }""" + self.refresh_instance() + equipment_data = self.equipment_client.get_equipment_by_customer_id(customer_id=self.customer_id, + pagination_context=pagination_context) + return equipment_data._items + + # check if equipment with the given equipment_id is available in cloud instance or not + def validate_equipment_availability(self, equipment_id=None): + self.refresh_instance() + data = self.get_equipment_by_customer_id() + for i in data: + if i._id == equipment_id: + return i._id + return -1 + + # Need to be added in future + def request_ap_reboot(self): + self.refresh_instance() + pass + + # Get the equipment id, of a equipment with a serial number + def get_equipment_id(self, serial_number=None): + self.refresh_instance() + equipment_data = self.get_equipment_by_customer_id(max_items=100) + # print(len(equipment_data)) + for equipment in equipment_data: + if equipment._serial == serial_number: + return equipment._id + + # Get the equipment model name of a given equipment_id + def get_model_name(self, equipment_id=None): + self.refresh_instance() + if equipment_id is None: + return None + self.refresh_instance() + data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id) + print(str(data._details._equipment_model)) + return str(data._details._equipment_model) + + # Needs Bug fix from swagger code generation side + def get_ap_firmware_new_method(self, equipment_id=None): + self.refresh_instance() + response = self.status_client.get_status_by_customer_equipment(customer_id=self.customer_id, + equipment_id=equipment_id) + print(response[2]) + + # Old Method, will be depreciated in future + def get_ap_firmware_old_method(self, equipment_id=None): + self.refresh_instance() + url = self.configuration.host + "/portal/status/forEquipment?customerId=" + str( + self.customer_id) + "&equipmentId=" + str(equipment_id) + payload = {} + headers = self.configuration.api_key_prefix + response = requests.request("GET", url, headers=headers, data=payload) + + if response.status_code == 200: + status_data = response.json() + # print(status_data) + try: + current_ap_fw = status_data[2]['details']['reportedSwVersion'] + # print(current_ap_fw) + return current_ap_fw + except Exception as e: + print(e) + current_ap_fw = "error" + return e + + else: + return False + + """ + Profile Utilities + """ + + def get_current_profile_on_equipment(self, equipment_id=None): + self.refresh_instance() + default_equipment_data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id, async_req=False) + return default_equipment_data._profile_id + + # Get the ssid's that are used by the equipment + def get_ssids_on_equipment(self, equipment_id=None): + self.refresh_instance() + profile_id = self.get_current_profile_on_equipment(equipment_id=equipment_id) + all_profiles = self.profile_client.get_profile_with_children(profile_id=profile_id) + ssid_name_list = [] + for i in all_profiles: + if i._profile_type == "ssid": + ssid_name_list.append(i._details['ssid']) + return all_profiles + + # Get the child ssid profiles that are used by equipment ap profile of given profile id + def get_ssid_profiles_from_equipment_profile(self, profile_id=None): + self.refresh_instance() + equipment_ap_profile = self.profile_client.get_profile_by_id(profile_id=profile_id) + ssid_name_list = [] + child_profile_ids = equipment_ap_profile.child_profile_ids + for i in child_profile_ids: + profile = self.profile_client.get_profile_by_id(profile_id=i) + if profile._profile_type == "ssid": + ssid_name_list.append(profile._details['ssid']) + return ssid_name_list + + +""" + Library for Profile Utility, Creating Profiles and Pushing and Deleting them + Steps to create a Profile on Controller: + create a RF Profile + create a Radius Profile + create ssid profiles, and add the radius profile in them, if needed (only used by eap ssid's) + + create equipment_ap profile, and add the rf profile and ssid profiles + Now using push profile method, equipment_ap profile will be pushed to an AP of given equipment_id + +""" + + +class ProfileUtility: + """ + constructor for Access Point Utility library : can be used from pytest framework + to control Access Points + """ + + def __init__(self, sdk_client=None, controller_data=None, customer_id=None): + if sdk_client is None: + sdk_client = Controller(controller_data=controller_data, customer_id=customer_id) + self.sdk_client = sdk_client + self.sdk_client.refresh_instance() + self.profile_client = swagger_client.ProfileApi(api_client=self.sdk_client.api_client) + self.profile_creation_ids = { + "ssid": [], + "ap": [], + "radius": [], + "rf": [], + "passpoint_osu_id_provider": [], + "passpoint_operator": [], + "passpoint_venue": [], + "passpoint": [] + } + self.profile_name_with_id = {} + self.default_profiles = {} + self.profile_ids = [] + + def cleanup_objects(self): + self.sdk_client.refresh_instance() + self.profile_creation_ids = { + "ssid": [], + "ap": [], + "radius": [], + "rf": [], + "passpoint_osu_id_provider": [], + "passpoint_operator": [], + "passpoint_venue": [], + "passpoint": [] + } + self.profile_name_with_id = {} + self.default_profiles = {} + self.profile_ids = [] + + def get_profile_by_name(self, profile_name=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 1000 + }""" + profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + + for i in profiles._items: + if i._name == profile_name: + return i + return None + + def get_ssid_name_by_profile_id(self, profile_id=None): + self.sdk_client.refresh_instance() + profiles = self.profile_client.get_profile_by_id(profile_id=profile_id) + return profiles._details["ssid"] + + """ + default templates are as follows : + profile_name= TipWlan-rf/ + Radius-Profile/ + TipWlan-2-Radios/ + TipWlan-3-Radios/ + TipWlan-Cloud-Wifi/ + Captive-Portal + """ + + def get_default_profiles(self): + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 100 + }""" + self.sdk_client.refresh_instance() + items = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + + for i in items._items: + # print(i._name, i._id) + if i._name == "TipWlan-Cloud-Wifi": + self.default_profiles['ssid'] = i + if i._name == "TipWlan-3-Radios": + self.default_profiles['equipment_ap_3_radios'] = i + if i._name == "TipWlan-2-Radios": + self.default_profiles['equipment_ap_2_radios'] = i + if i._name == "Captive-Portal": + self.default_profiles['captive_portal'] = i + if i._name == "Radius-Profile": + self.default_profiles['radius'] = i + if i._name == "TipWlan-rf": + self.default_profiles['rf'] = i + # print(i) + + # This will delete the Profiles associated with an equipment of givwn equipment_id, and associate it to default + # equipment_ap profile + def delete_current_profile(self, equipment_id=None): + self.sdk_client.refresh_instance() + equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=equipment_id) + + data = self.profile_client.get_profile_with_children(profile_id=equipment_data._profile_id) + delete_ids = [] + for i in data: + if i._name == "TipWlan-rf": + continue + else: + delete_ids.append(i._id) + # print(delete_ids) + self.get_default_profiles() + self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_3_radios']._id + # print(self.profile_creation_ids) + self.push_profile_old_method(equipment_id=equipment_id) + self.delete_profile(profile_id=delete_ids) + + # This will delete all the profiles on an controller instance, except the default profiles + def cleanup_profiles(self): + self.sdk_client.refresh_instance() + try: + self.get_default_profiles() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 10000 + }""" + skip_delete_id = [] + for i in self.default_profiles: + skip_delete_id.append(self.default_profiles[i]._id) + + all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + + delete_ids = [] + for i in all_profiles._items: + delete_ids.append(i._id) + skip_delete_id = [] + for i in self.default_profiles: + skip_delete_id.append(self.default_profiles[i]._id) + delete_ids = list(set(delete_ids) - set(delete_ids).intersection(set(skip_delete_id))) + print(delete_ids) + for i in delete_ids: + self.set_equipment_to_profile(profile_id=i) + self.delete_profile(profile_id=delete_ids) + status = True + except Exception as e: + print(e) + status = False + return status + + # Delete any profile with the given name + def delete_profile_by_name(self, profile_name=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 5000 + }""" + all_profiles = self.profile_client.get_profiles_by_customer_id(customer_id=self.sdk_client.customer_id, + pagination_context=pagination_context) + for i in all_profiles._items: + if i._name == profile_name: + counts = self.profile_client.get_counts_of_equipment_that_use_profiles([i._id])[0] + if counts._value2: + self.set_equipment_to_profile(profile_id=i._id) + self.delete_profile(profile_id=[i._id]) + else: + self.delete_profile(profile_id=[i._id]) + + # This method will set all the equipments to default equipment_ap profile, those having the profile_id passed in + # argument + def set_equipment_to_profile(self, profile_id=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 5000 + }""" + equipment_data = self.sdk_client.equipment_client. \ + get_equipment_by_customer_id(customer_id=2, + pagination_context=pagination_context) + self.get_default_profiles() + for i in equipment_data._items: + if i._profile_id == profile_id: + self.profile_creation_ids['ap'] = self.default_profiles['equipment_ap_2_radios']._id + self.push_profile_old_method(equipment_id=i._id) + time.sleep(2) + + """ + method call: used to create the rf profile and push set the parameters accordingly and update + Library method to create a new rf profile: Now using default profile + """ + + def set_rf_profile(self, profile_data=None, mode=None): + self.sdk_client.refresh_instance() + self.get_default_profiles() + if mode == "wifi5": + default_profile = self.default_profiles['rf'] + default_profile._name = profile_data["name"] + + default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"] + for i in default_profile._details["rfConfigMap"]: + for j in profile_data: + if i == j: + for k in default_profile._details["rfConfigMap"][i]: + for l in profile_data[j]: + if l == k: + default_profile._details["rfConfigMap"][i][l] = profile_data[j][l] + profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['rf'].append(profile._id) + return profile + if mode == "wifi6": + default_profile = self.default_profiles['rf'] + default_profile._name = profile_data["name"] + default_profile._details["rfConfigMap"]["is2dot4GHz"]["activeScanSettings"]["enabled"] = False + default_profile._details["rfConfigMap"]["is2dot4GHz"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is5GHz"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is5GHzL"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is5GHzU"]["radioMode"] = 'modeAX' + default_profile._details["rfConfigMap"]["is2dot4GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHz"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzL"]["rf"] = profile_data["name"] + default_profile._details["rfConfigMap"]["is5GHzU"]["rf"] = profile_data["name"] + default_profile._name = profile_data["name"] + for i in default_profile._details["rfConfigMap"]: + for j in profile_data: + if i == j: + for k in default_profile._details["rfConfigMap"][i]: + for l in profile_data[j]: + if l == k: + default_profile._details["rfConfigMap"][i][l] = profile_data[j][l] + profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['rf'].append(profile._id) + return profile + + """ + method call: used to create a ssid profile with the given parameters + """ + + # Open + def create_open_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'open' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + self.profile_name_with_id[profile_data["ssid_name"]] = profile_id + except Exception as e: + print(e) + profile = "error" + + return profile + + # wpa personal + def create_wpa_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + self.get_default_profiles() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpaPSK' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa2 personal + def create_wpa2_personal_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa2OnlyPSK' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 personal + def create_wpa3_personal_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa3OnlySAE' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 personal mixed mode + def create_wpa3_personal_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa3MixedSAE' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa wpa2 personal mixed mode + def create_wpa_wpa2_personal_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['keyStr'] = profile_data['security_key'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wpa2PSK' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa enterprise done + def create_wpa_enterprise_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpaRadius' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa wpa2 enterprise mixed mode done + def create_wpa_wpa2_enterprise_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa2Radius' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa2 enterprise mode ssid profile + def create_wpa2_enterprise_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa2OnlyRadius' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 enterprise mode + def create_wpa3_enterprise_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa3OnlyEAP' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 enterprise mixed mode done + def create_wpa3_enterprise_mixed_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details['secureMode'] = 'wpa3MixedEAP' + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # wpa3 enterprise mixed mode done + def create_wep_ssid_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + try: + if profile_data is None: + return False + default_profile = self.default_profiles['ssid'] + default_profile._details['appliedRadios'] = profile_data["appliedRadios"] + default_profile._name = profile_data['profile_name'] + default_profile._details['vlanId'] = profile_data['vlan'] + default_profile._details['ssid'] = profile_data['ssid_name'] + default_profile._details['forwardMode'] = profile_data['mode'] + default_profile._details['secureMode'] = 'wep' + default_profile._details['wepConfig'] = {} + default_profile._details['wepConfig']["model_type"] = "WepConfiguration" + default_profile._details['wepConfig']["wepAuthType"] = "open" + default_profile._details['wepConfig']["primaryTxKeyId"] = profile_data["default_key_id"] + default_profile._details['wepConfig']["wepKeys"] = [{'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}, + {'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}, + {'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}, + {'model_type': 'WepKey', + 'txKey': profile_data["wep_key"], + 'txKeyConverted': None, + 'txKeyType': 'wep64'}] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids['ssid'].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + def __get_boolean(self, flag): + return 'true' if flag in ["Enabled", "True"] else 'false' + + # wpa eap general method + def __create_wpa_eap_passpoint_ssid_profiles(self, profile_data=None, secure_mode=None): + try: + if profile_data is None or secure_mode is None: + return False + default_profile = self.default_profiles["ssid"] + default_profile._details["appliedRadios"] = profile_data["appliedRadios"] + default_profile._name = profile_data["profile_name"] + default_profile._details["vlanId"] = profile_data["vlan"] + default_profile._details["ssid"] = profile_data["ssid_name"] + default_profile._details["forwardMode"] = profile_data["mode"] + default_profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + default_profile._child_profile_ids = self.profile_creation_ids["radius"] + default_profile._details["secureMode"] = secure_mode + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["ssid"].append(profile_id) + self.profile_ids.append(profile_id) + self.profile_name_with_id[profile_data["ssid_name"]] = profile_id + except Exception as e: + print(e) + profile = False + return profile + + # wpa eap passpoint + def create_wpa_eap_passpoint_ssid_profile(self, profile_data=None): + if profile_data is None: + return False + return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpaEAP") + + # wpa2 eap passpoint + def create_wpa2_eap_passpoint_ssid_profile(self, profile_data=None): + if profile_data is None: + return False + return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpa2EAP") + + # wpa2only eap passpoint + def create_wpa2_only_eap_passpoint_ssid_profile(self, profile_data=None): + if profile_data is None: + return False + return self.__create_wpa_eap_passpoint_ssid_profiles(profile_data, "wpa2OnlyEAP") + + # passpoint osu id provider profile + def create_passpoint_osu_id_provider_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint_osu_id_provider" + default_profile["name"] = profile_data["profile_name"] + details = dict() + details["model_type"] = "PasspointOsuProviderProfile" + mcc_mnc = dict() + if (profile_data["mcc"] and profile_data["mnc"]) is not None: + mcc_mnc = {"mcc": profile_data["mcc"], "mnc": profile_data["mnc"]} + if profile_data["network"] is not None: + mcc_mnc["network"] = profile_data["network"] + if mcc_mnc: + details["mccMncList"] = [mcc_mnc] + if (profile_data["mcc"] and profile_data["mnc"]) is not None: + details["mccMncList"] = [{"mcc": profile_data["mcc"], "mnc": profile_data["mnc"]}] + if profile_data["osu_nai_standalone"] is not None: + details["osuNaiStandalone"] = profile_data["osu_nai_standalone"] + if profile_data["osu_nai_shared"] is not None: + details["osuNaiShared"] = profile_data["osu_nai_shared"] + if profile_data["nai_realms"] is not None: + details["naiRealmList"] = [{"naiRealms": [profile_data["nai_realms"]["domain"]], + "encoding": profile_data["nai_realms"]["encoding"], + "eapMap": profile_data["nai_realms"]["eap_map"] + }] + details["roamingOi"] = profile_data["roaming_oi"] + default_profile['details'] = details + default_profile['childProfileIds'] = [] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint_osu_id_provider"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # passpoint operator profile + def create_passpoint_operator_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint_operator" + default_profile["name"] = profile_data["profile_name"] + + default_profile["details"] = dict() + default_profile["details"]["model_type"] = "PasspointOperatorProfile" + default_profile["details"]["serverOnlyAuthenticatedL2EncryptionNetwork"] = \ + self.__get_boolean(profile_data["osen"]) + operator_names = [] + operators = profile_data["operator_names"] + for operator in profile_data["operator_names"]: + operator_temp = dict() + for key in operator.keys(): + if key == "name": + operator_temp["dupleName"] = operator["name"] + else: + operator_temp[key] = operator[key] + operator_names.append(operator_temp) + default_profile["details"]["operatorFriendlyName"] = operator_names + default_profile["details"]["domainNameList"] = profile_data["domain_name_list"] + default_profile["childProfileIds"] = [] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint_operator"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + profile = False + return profile + + # passpoint venue profile + def create_passpoint_venue_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint_venue" + default_profile["name"] = profile_data["profile_name"] + default_profile["details"] = dict() + default_profile["details"]["model_type"] = "PasspointVenueProfile" + venue_names = [] + for venue in profile_data["venue_names"]: + venue_temp = dict() + for key in venue.keys(): + if key == "name": + venue_temp["dupleName"] = venue["name"] + if key == "url": + venue_temp["venueUrl"] = venue["url"] + venue_names.append(venue_temp) + default_profile["details"]["venueNameSet"] = venue_names + allowed_venue_groups = {"Unspecified": 0, "Assembly": 1, "Business": 2, "Educational": 3, + "Factory and Industrial": 4, "Institutional": 5, "Mercantile": 6, "Residential": 7} + allowed_venue_types = {"Unspecified Assembly": 0, "Areana": 1, "Stadium": 2, "Passenger Terminal": 3, + "Amphitheatre": 4, "Amusement Park": 5, "Place of Worship": 6, + "Convention Center": 7, + "Library": 8, "Museum": 9, "Restaurant": 10, "Theatre": 11, "Bar": 12, + "Coffee Shop": 13, + "Zoo or Aquarium": 14, "Emergency Coordination Center": 15, + "Unspecified Business": 0, "Doctor or Dentist office": 1, "Bank": 2, + "Fire Station": 3, + "Police Station": 4, "Post Office": 5, "Professional Office": 6, + "Research and Development Facility": 7, "Attorney Office": 8, + "Unspecified Educational": 0, "School, Primary": 1, "School, Secondary": 2, + "University or College": 3, "Unspecified Factory and Industrial": 0, "Factory": 1, + "Unspecified Institutional": 0, "Hospital": 1, "Long-Term Care Facility": 2, + "Alcohol and Drug Re-habilitation Center": 3, "Group Home": 4, "Prison or Jail": 5, + "Unspecified Mercantile": 0, "Retail Store": 1, "Grocery Market": 2, + "Automotive Service Station": 3, "Shopping Mall": 4, "Gas Station": 5, + "Unspecified Residential": 0, "Pivate Residence": 1, "Hotel or Model": 2, + "Dormitory": 3, "Boarding House": 4} + default_profile["details"]["venueTypeAssignment"] = {"venueGroupId": + allowed_venue_groups[ + profile_data["venue_type"]["group"]], + "venueTypeId": + allowed_venue_types[ + profile_data["venue_type"]["type"]]} + default_profile["childProfileIds"] = [] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint_venue"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + # passpoint profile + def create_passpoint_profile(self, profile_data=None): + try: + if profile_data is None: + return False + default_profile = dict() + default_profile["model_type"] = "Profile" + default_profile["customerId"] = self.sdk_client.customer_id + default_profile["profileType"] = "passpoint" + default_profile["name"] = profile_data["profile_name"] + + default_profile["details"] = dict() + default_profile["details"]["model_type"] = "PasspointProfile" + default_profile["details"]["enableInterworkingAndHs20"] = self.__get_boolean( + profile_data["interworking_hs2dot0"]) + if profile_data["hessid"] is not None: + default_profile["details"]["hessid"] = dict() + default_profile["details"]["hessid"]["addressAsString"] = profile_data["hessid"] + default_profile["details"]["passpointAccessNetworkType"] = \ + profile_data["access_network"]["Access Network Type"].replace(' ', '_').lower() + default_profile["details"]["passpointNetworkAuthenticationType"] = \ + profile_data["access_network"]["Authentication Type"].replace('&', 'and').replace(' ', '_').lower() + default_profile["details"]["emergencyServicesReachable"] = self.__get_boolean( + profile_data["access_network"][ + "Emergency Services Reachable"]) + default_profile["details"]["unauthenticatedEmergencyServiceAccessible"] = self.__get_boolean( + profile_data["access_network"][ + "Unauthenticated Emergency Service"]) + default_profile["details"]["internetConnectivity"] = self.__get_boolean(profile_data["ip_connectivity"][ + "Internet Connectivity"]) + capability_set = [] + for cap in profile_data["ip_connectivity"]["Connection Capability"]: + capability_info = dict() + capability_info["connectionCapabilitiesPortNumber"] = cap["port"] + capability_info["connectionCapabilitiesIpProtocol"] = cap["protocol"] + capability_info["connectionCapabilitiesStatus"] = cap["status"] + capability_set.append(capability_info) + default_profile["details"]["connectionCapabilitySet"] = capability_set + default_profile["details"]["ipAddressTypeAvailability"] = profile_data["ip_connectivity"]["IP Address Type"] + allowed_gas_address_behavior = {"P2P Spec Workaround From Request": "p2pSpecWorkaroundFromRequest", + "forceNonCompliantBehaviourFromRequest": "forceNonCompliantBehaviourFromRequest", + "IEEE 80211 Standard Compliant Only": "ieee80211StandardCompliantOnly"} + default_profile["details"]["gasAddr3Behaviour"] = allowed_gas_address_behavior[ + profile_data["ip_connectivity"] + ["GAS Address 3 Behaviour"]] + default_profile["details"]["anqpDomainId"] = profile_data["ip_connectivity"]["ANQP Domain ID"] + default_profile["details"]["disableDownstreamGroupAddressedForwarding"] = self.__get_boolean( + profile_data["ip_connectivity"][ + "Disable DGAF"]) + default_profile["details"]["associatedAccessSsidProfileIds"] = profile_data["allowed_ssids"] + default_profile["details"]["passpointOperatorProfileId"] = self.profile_creation_ids["passpoint_operator"][0] + default_profile["details"]["passpointVenueProfileId"] = self.profile_creation_ids["passpoint_venue"][0] + default_profile["details"]["passpointOsuProviderProfileIds"] = self.profile_creation_ids[ + "passpoint_osu_id_provider"] + default_profile["details"]["accessNetworkType"] = \ + profile_data["access_network"]["Access Network Type"].replace(' ', '_').lower() + # osuSsidProfileId is needed for R2 + default_profile["details"]["networkAuthenticationType"] = \ + profile_data["access_network"]["Authentication Type"].replace('&', 'and').replace(' ', '_').lower() + default_profile["childProfileIds"] = self.profile_creation_ids["passpoint_venue"] + \ + self.profile_creation_ids["passpoint_operator"] + \ + self.profile_creation_ids["passpoint_osu_id_provider"] + profile = self.profile_client.create_profile(body=default_profile) + profile_id = profile._id + self.profile_creation_ids["passpoint"].append(profile_id) + self.profile_ids.append(profile_id) + except Exception as e: + print(e) + profile = False + return profile + + """ + method call: used to create a ap profile that contains the given ssid profiles + """ + + def set_ap_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + if profile_data is None: + return False + default_profile = self.default_profiles['equipment_ap_2_radios'] + default_profile._child_profile_ids = [] + for i in self.profile_creation_ids: + if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", + "radius"]: + for j in self.profile_creation_ids[i]: + default_profile._child_profile_ids.append(j) + + default_profile._name = profile_data['profile_name'] + # print(default_profile) + default_profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['ap'] = default_profile._id + self.profile_ids.append(default_profile._id) + return default_profile + + """ + method call: used to create a ap profile that contains the given ssid profiles + """ + + def set_ap_profile_custom(self, profile_data=None): + self.sdk_client.refresh_instance() + if profile_data is None: + return False + default_profile = self.default_profiles['equipment_ap_2_radios'] + default_profile._child_profile_ids = [] + for i in self.profile_creation_ids: + if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", + "radius", "ssid"]: + for j in self.profile_creation_ids[i]: + default_profile._child_profile_ids.append(j) + for ssid in profile_data["ssid_names"]: + default_profile._child_profile_ids.append(self.profile_name_with_id[ssid]) + default_profile._name = profile_data['profile_name'] + default_profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['ap'] = default_profile._id + self.profile_ids.append(default_profile._id) + return default_profile + + """ + method call: used to create a ap profile that contains the specific ssid profiles + """ + + def update_ap_profile(self, profile_data=None): + self.sdk_client.refresh_instance() + if profile_data is None: + print("profile info is None, Please specify the profile info that you want to update") + return False + + child_profiles_to_apply = [] + try: + for ssid in profile_data["ssid_names"]: + child_profiles_to_apply.append(self.profile_name_with_id[ssid]) + default_profile = self.get_profile_by_name(profile_name=profile_data["profile_name"]) + for i in self.profile_creation_ids: + if i not in ["ap", "passpoint_osu_id_provider", "passpoint_operator", "passpoint_venue", "passpoint", + "radius", "ssid"]: + for j in self.profile_creation_ids[i]: + child_profiles_to_apply.append(j) + default_profile._child_profile_ids = child_profiles_to_apply + default_profile = self.profile_client.update_profile(default_profile) + return True + except Exception as e: + print(e) + return False + + """ + method call: used to create a radius profile with the settings given + """ + + def create_radius_profile(self, radius_info=None, radius_accounting_info=None): + self.sdk_client.refresh_instance() + default_profile = self.default_profiles['radius'] + default_profile._name = radius_info['name'] + default_profile._details['primaryRadiusAuthServer'] = {} + default_profile._details['primaryRadiusAuthServer']['ipAddress'] = radius_info['ip'] + default_profile._details['primaryRadiusAuthServer']['port'] = radius_info['port'] + default_profile._details['primaryRadiusAuthServer']['secret'] = radius_info['secret'] + if radius_accounting_info is not None: + default_profile._details["primaryRadiusAccountingServer"] = {} + default_profile._details["primaryRadiusAccountingServer"]["ipAddress"] = radius_accounting_info["ip"] + default_profile._details["primaryRadiusAccountingServer"]["port"] = radius_accounting_info["port"] + default_profile._details["primaryRadiusAccountingServer"]["secret"] = radius_accounting_info["secret"] + default_profile = self.profile_client.create_profile(body=default_profile) + self.profile_creation_ids['radius'] = [default_profile._id] + self.profile_ids.append(default_profile._id) + return default_profile + + """ + method to push the profile to the given equipment + """ + + # Under a Bug, depreciated until resolved, should be used primarily + def push_profile(self, equipment_id=None): + self.sdk_client.refresh_instance() + pagination_context = """{ + "model_type": "PaginationContext", + "maxItemsPerPage": 100 + }""" + default_equipment_data = self.sdk_client.equipment_client.get_equipment_by_id(equipment_id=11, async_req=False) + # default_equipment_data._details[] = self.profile_creation_ids['ap'] + # print(default_equipment_data) + # print(self.sdk_client.equipment_client.update_equipment(body=default_equipment_data, async_req=True)) + + """ + method to verify if the expected ssid's are loaded in the ap vif config + """ + + def update_ssid_name(self, profile_name=None, new_profile_name=None): + self.sdk_client.refresh_instance() + if profile_name is None: + print("profile name is None, Please specify the ssid profile name that you want to modify") + return False + if new_profile_name is None: + print("Please specify the new name for ssid profile that you want to make to") + return False + + try: + profile = self.get_profile_by_name(profile_name=profile_name) + profile._details['ssid'] = new_profile_name + self.profile_client.update_profile(profile) + return True + except Exception as e: + return False + + def update_ssid_profile(self, profile_info=None): + self.sdk_client.refresh_instance() + if profile_info is None: + print("profile info is None, Please specify the profile info that you want to update") + return False + + try: + profile = self.get_profile_by_name(profile_name=profile_info["ssid_profile_name"]) + profile._details["radiusServiceId"] = self.profile_creation_ids["radius"][0] + profile._child_profile_ids = self.profile_creation_ids["radius"] + self.profile_creation_ids["passpoint"] + if "radius_configuration" in profile_info.keys(): + if "radius_acounting_service_interval" in profile_info["radius_configuration"].keys(): + profile._details["radiusAcountingServiceInterval"] = profile_info["radius_configuration"]["radius_acounting_service_interval"] + if "user_defined_nas_id" in profile_info["radius_configuration"].keys(): + profile._details["radiusClientConfiguration"]["userDefinedNasId"] = profile_info["radius_configuration"]["user_defined_nas_id"] + if "operator_id" in profile_info["radius_configuration"].keys(): + profile._details["radiusClientConfiguration"]["operatorId"] = profile_info["radius_configuration"]["operator_id"] + self.profile_client.update_profile(profile) + return True + except Exception as e: + print(e) + return False + + def clear_ssid_profile(self, profile_name=None): + if profile_name is None: + print("profile name is None, Please specify the ssid profile name that you want to update") + return False + + try: + profile = self.get_profile_by_name(profile_name=profile_name) + profile._details["radiusServiceId"] = None + profile._child_profile_ids = [] + self.profile_client.update_profile(profile) + return True + except Exception as e: + print(e) + return False + + """ + method to delete a profile by its id + """ + + def delete_profile(self, profile_id=None): + self.sdk_client.refresh_instance() + for i in profile_id: + self.profile_client.delete_profile(profile_id=i) + + # Need to be depreciated by using push_profile method + def push_profile_old_method(self, equipment_id=None): + self.sdk_client.refresh_instance() + if equipment_id is None: + return 0 + url = self.sdk_client.configuration.host + "/portal/equipment?equipmentId=" + str(equipment_id) + payload = {} + headers = self.sdk_client.configuration.api_key_prefix + response = requests.request("GET", url, headers=headers, data=payload) + equipment_info = response.json() + equipment_info['profileId'] = self.profile_creation_ids['ap'] + url = self.sdk_client.configuration.host + "/portal/equipment" + headers = { + 'Content-Type': 'application/json', + 'Authorization': self.sdk_client.configuration.api_key_prefix['Authorization'] + } + + response = requests.request("PUT", url, headers=headers, data=json.dumps(equipment_info)) + return response + + +""" + + FirmwareUtility class + uses JfrogUtility base class + sdk_client [ controller_tests instance ] + controller_data [ sdk_base_url ] needed only if sdk_instance is not passed + customer_id [ 2 ] needed only if sdk_instance is not passed +""" + + +class FirmwareUtility: + + def __init__(self, + sdk_client=None, + jfrog_credentials=None, + controller_data=None, + customer_id=2, + model=None, + version_url=None): + # super().__init__(credentials=jfrog_credentials) + if sdk_client is None: + sdk_client = Controller(controller_data=controller_data, customer_id=customer_id) + self.sdk_client = sdk_client + self.sdk_client.refresh_instance() + self.firmware_client = FirmwareManagementApi(api_client=sdk_client.api_client) + # self.jfrog_client = JFrogUtility(credentials=jfrog_credentials) + self.equipment_gateway_client = EquipmentGatewayApi(api_client=sdk_client.api_client) + self.model = model + self.fw_version = version_url + + def get_fw_version(self): + fw_version = self.fw_version.split("/")[-1] + return fw_version + + def upload_fw_on_cloud(self, force_upload=False): + self.sdk_client.refresh_instance() + fw_version = self.fw_version.split("/")[-1] + print("Upload fw version :", self.fw_version) + fw_id = self.is_fw_available(fw_version=fw_version) + if fw_id and not force_upload: + print("Skipping upload, Firmware Already Available", "Force Upload :", force_upload) + # Don't Upload the fw + return fw_id + else: + if fw_id and force_upload: + print("Firmware Version Already Available, Deleting and Uploading Again", + " Force Upload :", force_upload) + self.firmware_client.delete_firmware_version(firmware_version_id=fw_id) + print("Deleted Firmware Image from cloud, uploading again") + time.sleep(2) + # if force_upload is true and latest image available, then delete the image + firmware_data = { + "id": 0, + "equipmentType": "AP", + "modelId": str(self.model).upper(), + "versionName": fw_version, + "description": fw_version + " FW VERSION", + "filename": self.fw_version, + } + firmware_id = self.firmware_client.create_firmware_version(body=firmware_data) + print("Uploaded the Image: ", fw_version) + return firmware_id._id + + def upgrade_fw(self, equipment_id=None, force_upgrade=False, force_upload=False): + self.sdk_client.refresh_instance() + if equipment_id is None: + print("No Equipment Id Given") + exit() + if (force_upgrade is True) or (self.should_upgrade_ap_fw(equipment_id=equipment_id)): + firmware_id = self.upload_fw_on_cloud(force_upload=force_upload) + time.sleep(5) + try: + obj = self.equipment_gateway_client.request_firmware_update(equipment_id=equipment_id, + firmware_version_id=firmware_id) + print("Request firmware upgrade Success! waiting for 300 sec") + time.sleep(400) + except Exception as e: + print(e) + obj = False + return obj + # Write the upgrade fw logic here + + def should_upgrade_ap_fw(self, equipment_id=None): + self.sdk_client.refresh_instance() + current_fw = self.sdk_client.get_ap_firmware_old_method(equipment_id=equipment_id) + latest_fw = self.get_fw_version() + print(self.model, current_fw, latest_fw) + if current_fw == latest_fw: + return False + else: + return True + + def is_fw_available(self, fw_version=None): + self.sdk_client.refresh_instance() + if fw_version is None: + exit() + try: + firmware_version = self.firmware_client.get_firmware_version_by_name( + firmware_version_name=fw_version) + firmware_version = firmware_version._id + print("Firmware ID: ", firmware_version) + except Exception as e: + print(e) + firmware_version = False + print("firmware not available: ", firmware_version) + return firmware_version + + +# This is for Unit tests on Controller Library +if __name__ == '__main__': + controller = { + 'url': "https://wlan-portal-svc-nola-ext-04.cicd.lab.wlan.tip.build", # API base url for the controller + 'username': 'support@example.com', + 'password': 'support', + 'version': "1.1.0-SNAPSHOT", + 'commit_date': "2021-04-27" + } + api = Controller(controller_data=controller) + profile = ProfileUtility(sdk_client=api) + profile_data = { + "name": "test-rf-wifi-6", + "is2dot4GHz": {}, + "is5GHz": {"channelBandwidth": "is20MHz"}, + "is5GHzL": {"channelBandwidth": "is20MHz"}, + "is5GHzU": {"channelBandwidth": "is20MHz"} + } + profile.set_rf_profile(profile_data=profile_data, mode="wifi6") + print(profile.default_profiles["rf"]) + # profile.cleanup_profiles() + + # profile.get_default_profiles() + # profile_data = { + # "profile_name": "ssid_wep_2g", + # "ssid_name": "ssid_wep_2g", + # "appliedRadios": ["is2dot4GHz"], + # "default_key_id" : 1, + # "wep_key" : 1234567890, + # "vlan": 1, + # "mode": "BRIDGE" + # } + # profile.create_wep_ssid_profile(profile_data=profile_data) + # print(profile.get_profile_by_name(profile_name="wpa_wpa2_eap")) + # profile.get_default_profiles() + api.disconnect_Controller() diff --git a/libs/controller/controller_2x/__init__.py b/libs/controller/controller_2x/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/controller/ucentral_ctlr.py b/libs/controller/ucentral_ctlr.py deleted file mode 100644 index 328b02956..000000000 --- a/libs/controller/ucentral_ctlr.py +++ /dev/null @@ -1,380 +0,0 @@ -""" - - Base Library for Ucentral - -""" -import json -import ssl -import sys -import time -from urllib.parse import urlparse -import pytest -import allure -import requests -from pathlib import Path - -from requests.adapters import HTTPAdapter -import logging - - -# logging.basicConfig(level=logging.DEBUG) -# from http.client import HTTPConnection -# -# HTTPConnection.debuglevel = 1 -# requests.logging.getLogger() - - -class ConfigureController: - - def __init__(self, controller_data): - self.username = controller_data["username"] - self.password = controller_data["password"] - self.host = urlparse(controller_data["url"]) - print(self.host) - self.access_token = "" - # self.session = requests.Session() - self.login_resp = self.login() - self.gw_host = self.get_endpoint() - - def build_uri_sec(self, path): - new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path) - print(new_uri) - return new_uri - - def build_uri(self, path): - - new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path) - print(new_uri) - return new_uri - - def login(self): - uri = self.build_uri_sec("oauth2") - # self.session.mount(uri, HTTPAdapter(max_retries=15)) - payload = json.dumps({"userId": self.username, "password": self.password}) - resp = requests.post(uri, data=payload, verify=False, timeout=100) - self.check_response("POST", resp, "", payload, uri) - token = resp.json() - self.access_token = token["access_token"] - print(token) - - # self.session.headers.update({'Authorization': self.access_token}) - return resp - - def get_endpoint(self): - uri = self.build_uri_sec("systemEndpoints") - print(uri) - resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) - print(resp) - self.check_response("GET", resp, self.make_headers(), "", uri) - devices = resp.json() - print(devices["endpoints"][0]["uri"]) - gw_host = urlparse(devices["endpoints"][0]["uri"]) - return gw_host - - def logout(self): - uri = self.build_uri_sec('oauth2/%s' % self.access_token) - resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100) - self.check_response("DELETE", resp, self.make_headers(), "", uri) - print('Logged out:', resp.status_code) - return resp - - def make_headers(self): - headers = {'Authorization': 'Bearer %s' % self.access_token, - "Connection": "keep-alive", - "Keep-Alive": "timeout=10, max=1000" - } - return headers - - def check_response(self, cmd, response, headers, data_str, url): - if response.status_code >= 400: - if response.status_code >= 400: - print("check-response: ERROR, url: ", url) - else: - print("check-response: url: ", url) - print("Command: ", cmd) - print("response-status: ", response.status_code) - print("response-headers: ", response.headers) - print("response-content: ", response.content) - print("headers: ", headers) - print("data-str: ", data_str) - - if response.status_code >= 400: - # if True: - raise NameError("Invalid response code.") - return True - - -class UController(ConfigureController): - - def __init__(self, controller_data=None): - super().__init__(controller_data) - - def get_devices(self): - uri = self.build_uri("devices/") - resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) - self.check_response("GET", resp, self.make_headers(), "", uri) - devices = resp.json() - # resp.close()() - return devices - - def get_device_by_serial_number(self, serial_number=None): - uri = self.build_uri("device/" + serial_number) - resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100) - self.check_response("GET", resp, self.make_headers(), "", uri) - device = resp.json() - # resp.close()() - return device - - def get_device_uuid(self, serial_number): - device_info = self.get_device_by_serial_number(serial_number=serial_number) - return device_info["UUID"] - - -class UProfileUtility: - - def __init__(self, sdk_client=None, controller_data=None): - if sdk_client is None: - self.sdk_client = UController(controller_data=controller_data) - self.sdk_client = sdk_client - self.base_profile_config = { - "uuid": 1, - "radios": [], - "interfaces": [{ - "name": "WAN", - "role": "upstream", - "services": ["lldp", "dhcp-snooping"], - "ethernet": [ - { - "select-ports": [ - "WAN*" - ] - } - ], - "ipv4": { - "addressing": "dynamic" - } - }, - { - "name": "LAN", - "role": "downstream", - "services": ["ssh", "lldp", "dhcp-snooping"], - "ethernet": [ - { - "select-ports": [ - "LAN*" - ] - } - ], - "ipv4": { - "addressing": "static", - "subnet": "192.168.1.1/16", - "dhcp": { - "lease-first": 10, - "lease-count": 10000, - "lease-time": "6h" - } - }, - }], - "metrics": { - "statistics": { - "interval": 60, - "types": ["ssids", "lldp", "clients"] - }, - "health": { - "interval": 120 - }, - "wifi-frames": { - "filters": ["probe", - "auth", - "assoc", - "disassoc", - "deauth", - "local-deauth", - "inactive-deauth", - "key-mismatch", - "beacon-report", - "radar-detected"] - }, - "dhcp-snooping": { - "filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"] - } - }, - "services": { - "lldp": { - "describe": "TIP OpenWiFi", - "location": "QA" - }, - "ssh": { - "port": 22 - } - } - } - self.vlan_section = { - "name": "WAN100", - "role": "upstream", - "vlan": { - "id": 100 - }, - "ethernet": [ - { - "select-ports": [ - "WAN*" - ] - } - ], - "ipv4": { - "addressing": "dynamic" - } - } - self.mode = None - - def set_radio_config(self, radio_config=None): - self.base_profile_config["radios"].append({ - "band": "2G", - "country": "US", - # "channel-mode": "HE", - "channel-width": 20, - # "channel": 11 - }) - self.base_profile_config["radios"].append({ - "band": "5G", - "country": "US", - # "channel-mode": "HE", - "channel-width": 80, - # "channel": "auto" - }) - - self.vlan_section["ssids"] = [] - self.vlan_ids = [] - - def set_mode(self, mode): - self.mode = mode - if mode == "NAT": - self.base_profile_config['interfaces'][1]['ssids'] = [] - - elif mode == "BRIDGE": - del self.base_profile_config['interfaces'][1] - self.base_profile_config['interfaces'][0]['ssids'] = [] - elif mode == "VLAN": - del self.base_profile_config['interfaces'][1] - self.base_profile_config['interfaces'][0]['ssids'] = [] - self.base_profile_config['interfaces'] = [] - wan_section_vlan = { - "name": "WAN", - "role": "upstream", - "services": ["lldp"], - "ethernet": [ - { - "select-ports": [ - "WAN*" - ] - } - ], - "ipv4": { - "addressing": "dynamic" - } - } - self.base_profile_config['interfaces'].append(wan_section_vlan) - else: - print("Invalid Mode") - return 0 - - def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}): - print("ssid data : ", ssid_data) - ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]} - for i in ssid_data["appliedRadios"]: - ssid_info["wifi-bands"].append(i) - ssid_info['encryption'] = {} - ssid_info['encryption']['proto'] = ssid_data["security"] - try: - ssid_info['encryption']['key'] = ssid_data["security_key"] - except Exception as e: - pass - ssid_info['encryption']['ieee80211w'] = "optional" - if radius: - ssid_info["radius"] = {} - ssid_info["radius"]["authentication"] = { - "host": radius_auth_data["ip"], - "port": radius_auth_data["port"], - "secret": radius_auth_data["secret"] - } - ssid_info["radius"]["accounting"] = { - "host": radius_accounting_data["ip"], - "port": radius_accounting_data["port"], - "secret": radius_accounting_data["secret"] - } - if self.mode == "NAT": - self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info) - elif self.mode == "BRIDGE": - self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info) - elif self.mode == "VLAN": - vid = ssid_data["vlan"] - self.vlan_section = { - "name": "WAN100", - "role": "upstream", - "services": ["lldp", "dhcp-snooping"], - "vlan": { - "id": 100 - }, - "ethernet": [ - { - "select-ports": [ - "WAN*" - ] - } - ], - "ipv4": { - "addressing": "dynamic" - } - } - vlan_section = self.vlan_section - if vid in self.vlan_ids: - print("sss", self.vlan_ids) - for i in self.base_profile_config['interfaces']: - if i["name"] == "WANv%s" % (vid): - i["ssids"].append(ssid_info) - else: - print(self.vlan_ids) - self.vlan_ids.append(vid) - vlan_section['name'] = "WANv%s" % (vid) - vlan_section['vlan']['id'] = int(vid) - vlan_section["ssids"] = [] - vlan_section["ssids"].append(ssid_info) - self.base_profile_config['interfaces'].append(vlan_section) - print(vlan_section) - vsection = 0 - else: - print("invalid mode") - pytest.exit("invalid Operating Mode") - - def push_config(self, serial_number): - payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 0} - - uri = self.sdk_client.build_uri("device/" + serial_number + "/configure") - basic_cfg_str = json.dumps(payload) - allure.attach(name="ucentral_config: ", body=str(self.base_profile_config)) - print(self.base_profile_config) - resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(), - verify=False, timeout=100) - self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri) - print(resp.url) - resp.close() - print(resp) - - -if __name__ == '__main__': - controller = { - 'url': 'https://sec-ucentral-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller - 'username': "tip@ucentral.com", - 'password': 'openwifi', - } - obj = UController(controller_data=controller) - profile = UProfileUtility(sdk_client=obj) - profile.set_mode(mode="BRIDGE") - profile.set_radio_config() - ssid = {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security": "psk", "security_key": "something", - "vlan": 100} - profile.add_ssid(ssid_data=ssid) - profile.push_config(serial_number="903cb39d6918") - # print(obj.get_devices()) - obj.logout() diff --git a/libs/pytest_fixtures/__init__.py b/libs/pytest_fixtures/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/conftest.py b/tests/conftest.py index 2f1feaccc..97950f3cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,7 @@ import allure import re import logging + from _pytest.fixtures import SubRequest from pyparsing import Optional @@ -41,7 +42,6 @@ if 'py-json' not in sys.path: sys.path.append('../py-scripts') from apnos.apnos import APNOS from controller.controller import Controller -from controller.ucentral_ctlr import UController from controller.controller import FirmwareUtility import pytest from cv_test_manager import cv_test @@ -59,6 +59,14 @@ from typing import Any, Callable, Optional from _pytest.fixtures import SubRequest from pytest import fixture +import fixtures_1x +from fixtures_1x import Fixtures_1x +import fixtures_2x +from fixtures_2x import Fixtures_2x + +ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties' +ALLUREDIR_OPTION = '--alluredir' + def pytest_addoption(parser): """pytest addoption function: contains ini objects and options""" @@ -201,14 +209,12 @@ def exit_on_fail(request): @pytest.fixture(scope="session") def radius_info(): """yields the radius server information from lab info file""" - allure.attach(body=str(RADIUS_SERVER_DATA), name="Radius server Info: ") yield RADIUS_SERVER_DATA @pytest.fixture(scope="session") def radius_accounting_info(): """yields the radius accounting information from lab info file""" - allure.attach(body=str(RADIUS_ACCOUNTING_DATA), name="Radius server Info: ") yield RADIUS_ACCOUNTING_DATA @@ -255,40 +261,19 @@ def instantiate_access_point(testbed, get_apnos, get_configuration): # Controller Fixture @pytest.fixture(scope="session") -def setup_controller(request, get_configuration, test_access_point, add_env_properties): +def setup_controller(request, get_configuration, test_access_point, add_env_properties, fixtures_ver): """sets up the controller connection and yields the sdk_client object""" - try: - if request.config.getoption("1.x"): - sdk_client = Controller(controller_data=get_configuration["controller"]) - - def teardown_controller(): - print("\nTest session Completed") - sdk_client.disconnect_Controller() - - request.addfinalizer(teardown_controller) - - else: - sdk_client = UController(controller_data=get_configuration["controller"]) - - - def teardown_ucontroller(): - print("\nTest session Completed") - sdk_client.logout() - try: - sdk_client.logout() - except Exception as e: - print(e) - - request.addfinalizer(teardown_ucontroller) - - except Exception as e: - print(e) - allure.attach(body=str(e), name="Controller Instantiation Failed: ") - sdk_client = False - pytest.exit("unable to communicate to Controller" + str(e)) + sdk_client = fixtures_ver.controller_obj + request.addfinalizer(fixtures_ver.disconnect) yield sdk_client +@pytest.fixture(scope="session") +def setup_firmware(fixtures_ver): + """ Fixture to Setup Firmware with the selected sdk """ + yield True + + @pytest.fixture(scope="session") def instantiate_firmware(request, setup_controller, get_configuration): """sets up firmware utility and yields the object for firmware upgrade""" @@ -609,12 +594,6 @@ def lf_tools(get_configuration, testbed): yield obj -# @pytest.fixture(scope="class") -# def create_vlan(request, testbed, get_configuration, lf_tools): -# """Create a vlan on lanforge""" -# - - @pytest.fixture(scope="session") def setup_influx(request, testbed, get_configuration): """ Setup Influx Parameters: Used in CV Automation""" @@ -634,10 +613,6 @@ def pytest_sessionstart(session): session.results = dict() -ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties' -ALLUREDIR_OPTION = '--alluredir' - - @fixture(scope='session', autouse=True) def add_allure_environment_property(request: SubRequest) -> Optional[Callable]: environment_properties = dict() @@ -660,17 +635,18 @@ def add_allure_environment_property(request: SubRequest) -> Optional[Callable]: @fixture(scope='session') -def get_uc_ap_version(get_apnos, get_configuration): - version_list = [] - for access_point_info in get_configuration['access_point']: - ap_ssh = get_apnos(access_point_info) - version = ap_ssh.get_ap_version_ucentral() - version_list.append(version) - yield version_list - - -@fixture(scope='session') -def add_env_properties(get_configuration, get_uc_ap_version, add_allure_environment_property: Callable) -> None: +def add_env_properties(get_configuration, get_apnos, fixtures_ver, add_allure_environment_property: Callable) -> None: add_allure_environment_property('Access-Point-Model', get_configuration["access_point"][0]["model"]) - add_allure_environment_property('Access-Point-Firmware-Version', get_uc_ap_version[0].split("\n")[1]) + add_allure_environment_property('Access-Point-Firmware-Version', + fixtures_ver.get_ap_version(get_apnos, get_configuration)[0].split("\n")[1]) add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"]) + add_allure_environment_property('AP-Serial-Number', get_configuration["access_point"][0]["serial"]) + + +@pytest.fixture(scope="session") +def fixtures_ver(request, get_configuration): + if request.config.getoption("1.x") is False: + obj = Fixtures_2x(configuration=get_configuration) + if request.config.getoption("1.x"): + obj = Fixtures_1x(configuration=get_configuration) + yield obj diff --git a/tests/e2e/basic/conftest.py b/tests/e2e/basic/conftest.py index e0aa4197b..9e3e0337f 100644 --- a/tests/e2e/basic/conftest.py +++ b/tests/e2e/basic/conftest.py @@ -11,7 +11,7 @@ if "libs" not in sys.path: sys.path.append(f'../libs') from controller.controller import ProfileUtility -from controller.ucentral_ctlr import UProfileUtility +from controller.controller_2x.controller import UProfileUtility import time from lanforge.lf_tests import RunTest from lanforge.lf_tools import ChamberView @@ -50,15 +50,17 @@ def create_lanforge_chamberview_dut(lf_tools): @pytest.fixture(scope="session") def setup_vlan(): vlan_id = [100] - allure.attach(body=str(vlan_id), name="VLAN Created: ") yield vlan_id[0] @pytest.fixture(scope="class") -def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id, +def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id, fixtures_ver, instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info): lf_tools.reset_scenario() + param = dict(request.param) + + # VLAN Setup if request.param["mode"] == "VLAN": vlan_list = list() @@ -78,833 +80,19 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment vlan_list.pop(i) if request.param["mode"] == "VLAN": lf_tools.add_vlan(vlan_ids=vlan_list) - if request.config.getoption("1.x"): - instantiate_profile = instantiate_profile(sdk_client=setup_controller) - vlan_id, mode = 0, 0 - instantiate_profile.cleanup_objects() - parameter = dict(request.param) - print(parameter) - test_cases = {} - profile_data = {} - if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: - print("Invalid Mode: ", parameter['mode']) - allure.attach(body=parameter['mode'], name="Invalid Mode: ") - yield test_cases - if parameter['mode'] == "NAT": - mode = "NAT" - vlan_id = 1 - if parameter['mode'] == "BRIDGE": - mode = "BRIDGE" - vlan_id = 1 - if parameter['mode'] == "VLAN": - mode = "BRIDGE" - vlan_id = setup_vlan - - instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Equipment-AP-" + parameter['mode']) - - profile_data["equipment_ap"] = {"profile_name": testbed + "-Equipment-AP-" + parameter['mode']} - profile_data["ssid"] = {} - for i in parameter["ssid_modes"]: - profile_data["ssid"][i] = [] - for j in range(len(parameter["ssid_modes"][i])): - profile_name = testbed + "-SSID-" + i + "-" + str(j) + "-" + parameter['mode'] - data = parameter["ssid_modes"][i][j] - data["profile_name"] = profile_name - if "mode" not in dict(data).keys(): - data["mode"] = mode - if "vlan" not in dict(data).keys(): - data["vlan"] = vlan_id - instantiate_profile.delete_profile_by_name(profile_name=profile_name) - profile_data["ssid"][i].append(data) - # print(profile_name) - # print(profile_data) - - instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode) - time.sleep(10) - """ - Setting up rf profile - """ - rf_profile_data = { - "name": "RF-Profile-" + testbed + "-" + parameter['mode'] + "-" + - get_configuration['access_point'][0]['mode'] - } - - for i in parameter["rf"]: - rf_profile_data[i] = parameter['rf'][i] - # print(rf_profile_data) - - try: - instantiate_profile.delete_profile_by_name(profile_name=rf_profile_data['name']) - instantiate_profile.set_rf_profile(profile_data=rf_profile_data, - mode=get_configuration['access_point'][0]['mode']) - allure.attach(body=str(rf_profile_data), - name="RF Profile Created : " + get_configuration['access_point'][0]['mode']) - except Exception as e: - print(e) - allure.attach(body=str(e), name="Exception ") - - # Radius Profile Creation - if parameter["radius"]: - radius_info = radius_info - radius_info["name"] = testbed + "-Automation-Radius-Profile-" + mode - instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode) - try: - instantiate_profile.create_radius_profile(radius_info=radius_info) - allure.attach(body=str(radius_info), - name="Radius Profile Created") - test_cases['radius_profile'] = True - except Exception as e: - print(e) - test_cases['radius_profile'] = False - - # SSID Profile Creation - lf_dut_data = [] - for mode in profile_data['ssid']: - if mode == "open": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_open_ssid_profile(profile_data=j) - test_cases["open_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["open_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa_ssid_profile(profile_data=j) - test_cases["wpa_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa2_personal": - for j in profile_data["ssid"][mode]: - # print(j) - - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa2_personal_ssid_profile(profile_data=j) - test_cases["wpa2_personal_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa2_personal_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa_wpa2_personal_mixed": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - - creates_profile = instantiate_profile.create_wpa_wpa2_personal_mixed_ssid_profile( - profile_data=j) - test_cases["wpa_wpa2_personal_mixed_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_wpa2_personal_mixed_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa3_personal": - for j in profile_data["ssid"][mode]: - print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - - creates_profile = instantiate_profile.create_wpa3_personal_ssid_profile(profile_data=j) - test_cases["wpa3_personal_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_personal_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa3_personal_mixed": - for j in profile_data["ssid"][mode]: - print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa3_personal_mixed_ssid_profile( - profile_data=j) - test_cases["wpa3_personal_mixed_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_personal_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa_enterprise": - for j in profile_data["ssid"][mode]: - - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa_enterprise_ssid_profile(profile_data=j) - test_cases["wpa_enterprise_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - - except Exception as e: - print(e) - test_cases["wpa_enterprise_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa2_enterprise": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa2_enterprise_ssid_profile(profile_data=j) - test_cases["wpa2_enterprise_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa2_enterprise_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa3_enterprise": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa3_enterprise_ssid_profile(profile_data=j) - test_cases["wpa3_enterprise_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_enterprise_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wpa_wpa2_enterprise_mixed": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa_wpa2_enterprise_mixed_ssid_profile( - profile_data=j) - test_cases["wpa_wpa2_enterprise_mixed_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa_wpa2_enterprise_mixed_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - if mode == "wpa3_enterprise_mixed": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wpa3_enterprise_mixed_ssid_profile( - profile_data=j) - test_cases["wpa3_enterprise_mixed_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_enterprise_mixed_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - - if mode == "wep": - for j in profile_data["ssid"][mode]: - # print(j) - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - for i in range(len(j["appliedRadios"])): - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") - j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") - creates_profile = instantiate_profile.create_wep_ssid_profile(profile_data=j) - test_cases["wpa3_enterprise_2g"] = True - allure.attach(body=str(creates_profile), - name="SSID Profile Created") - except Exception as e: - print(e) - test_cases["wpa3_enterprise_2g"] = False - allure.attach(body=str(e), - name="SSID Profile Creation Failed") - # Equipment AP Profile Creation - try: - instantiate_profile.set_ap_profile(profile_data=profile_data['equipment_ap']) - test_cases["equipment_ap"] = True - allure.attach(body=str(profile_data['equipment_ap']), - name="Equipment AP Profile Created") - except Exception as e: - print(e) - test_cases["equipment_ap"] = False - allure.attach(body=str(e), - name="Equipment AP Profile Creation Failed") - - # Push the Equipment AP Profile to AP - try: - for i in get_equipment_id: - instantiate_profile.push_profile_old_method(equipment_id=i) - except Exception as e: - print(e) - print("failed to create AP Profile") - - ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/") - # ssid_names = [] - # for i in instantiate_profile.profile_creation_ids["ssid"]: - # ssid_names.append(instantiate_profile.get_ssid_name_by_profile_id(profile_id=i)) - # ssid_names.sort() - ssid_names = [] - for i in lf_dut_data: - ssid_names.append(i["ssid_name"]) - ssid_names.sort() - # This loop will check the VIF Config with cloud profile - vif_config = [] - test_cases['vifc'] = False - for i in range(0, 18): - vif_config = list(ap_ssh.get_vif_config_ssids()) - vif_config.sort() - print(vif_config) - print(ssid_names) - if ssid_names == vif_config: - test_cases['vifc'] = True - break - time.sleep(10) - allure.attach( - body=str("VIF Config: " + str(vif_config) + "\n" + "SSID Pushed from Controller: " + str(ssid_names)), - name="SSID Profiles in VIF Config and Controller: ") - ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/") - - # This loop will check the VIF Config with VIF State - test_cases['vifs'] = False - for i in range(0, 18): - vif_state = list(ap_ssh.get_vif_state_ssids()) - vif_state.sort() - vif_config = list(ap_ssh.get_vif_config_ssids()) - vif_config.sort() - print(vif_config) - print(vif_state) - if vif_state == vif_config: - test_cases['vifs'] = True - break - time.sleep(10) - allure.attach(body=str("VIF Config: " + str(vif_config) + "\n" + "VIF State: " + str(vif_state)), - name="SSID Profiles in VIF Config and VIF State: ") - - ap_logs = ap_ssh.logread() - allure.attach(body=ap_logs, name="AP LOgs: ") - ssid_info = ap_ssh.get_ssid_info() - ssid_data = [] - print(ssid_info) - band_mapping = ap_ssh.get_bssid_band_mapping() - print(band_mapping) - idx_mapping = {} - for i in range(0, len(ssid_info)): - if ssid_info[i][1] == "OPEN": - ssid_info[i].append("") - if ssid_info[i][1] == "OPEN": - ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=OPEN" + - " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] - idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], - ssid_info[i][0]] - - if ssid_info[i][1] == "WPA": - ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA" + - " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] - idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], - ssid_info[i][0]] - if ssid_info[i][1] == "WPA2": - ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA2" + - " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] - idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], - ssid_info[i][0]] - if ssid_info[i][1] == "WPA3_PERSONAL": - ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA3" + - " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] - idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], - ssid_info[i][0]] - - if ssid_info[i][1] == "WPA | WPA2": - ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA|WPA2" + - " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] - idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], - ssid_info[i][0]] - - if ssid_info[i][1] == "EAP-TTLS": - ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=EAP-TTLS" + - " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] - idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], - ssid_info[i][0]] - ssid_data.append(ssid) - lf_tools.dut_idx_mapping = idx_mapping - # Add bssid password and security from iwinfo data - # Format SSID Data in the below format - # ssid_data = [ - # ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'], - # ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59'] - # ] - allure.attach(name="SSID DATA IN LF DUT", body=str(ssid_data)) - lf_tools.update_ssid(ssid_data=ssid_data) - - def teardown_session(): - print("\nRemoving Profiles") - instantiate_profile.delete_profile_by_name(profile_name=profile_data['equipment_ap']['profile_name']) - instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["ssid"]) - instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["radius"]) - instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["rf"]) - allure.attach(body=str(profile_data['equipment_ap']['profile_name'] + "\n"), - name="Tear Down in Profiles ") - time.sleep(20) - - request.addfinalizer(teardown_session) - yield test_cases - else: - instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller) - print(1, instantiate_profile_obj.sdk_client) - vlan_id, mode = 0, 0 - parameter = dict(request.param) - print(parameter) - test_cases = {} - profile_data = {} - if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: - print("Invalid Mode: ", parameter['mode']) - yield test_cases - instantiate_profile_obj.set_radio_config() - if parameter['mode'] == "NAT": - mode = "NAT" - instantiate_profile_obj.set_mode(mode=mode) - vlan_id = 1 - if parameter['mode'] == "BRIDGE": - mode = "BRIDGE" - instantiate_profile_obj.set_mode(mode=mode) - vlan_id = 1 - if parameter['mode'] == "VLAN": - mode = "VLAN" - instantiate_profile_obj.set_mode(mode=mode) - vlan_id = setup_vlan - profile_data["ssid"] = {} - for i in parameter["ssid_modes"]: - profile_data["ssid"][i] = [] - for j in range(len(parameter["ssid_modes"][i])): - data = parameter["ssid_modes"][i][j] - profile_data["ssid"][i].append(data) - lf_dut_data = [] - for mode in profile_data['ssid']: - if mode == "open": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'none' - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa_2g"] = False - if mode == "wpa": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'psk' - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa_2g"] = False - if mode == "wpa2_personal": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'psk2' - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa2_personal"] = False - if mode == "wpa_wpa2_personal_mixed": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'psk-mixed' - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa2_personal"] = False - if mode == "wpa3_personal": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'sae' - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa2_personal"] = False - if mode == "wpa3_personal_mixed": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'sae-mixed' - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa2_personal"] = False - - # EAP SSID Modes - if mode == "wpa2_enterprise": - for j in profile_data["ssid"][mode]: - if mode in get_markers.keys() and get_markers[mode]: - try: - if j["appliedRadios"].__contains__("2G"): - lf_dut_data.append(j) - if j["appliedRadios"].__contains__("5G"): - lf_dut_data.append(j) - j["appliedRadios"] = list(set(j["appliedRadios"])) - j['security'] = 'wpa2' - RADIUS_SERVER_DATA = radius_info - RADIUS_ACCOUNTING_DATA = radius_accounting_info - creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j, radius=True, - radius_auth_data=RADIUS_SERVER_DATA, - radius_accounting_data=RADIUS_ACCOUNTING_DATA) - test_cases["wpa_2g"] = True - except Exception as e: - print(e) - test_cases["wpa2_personal"] = False - ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x") - connected, latest, active = ap_ssh.get_ucentral_status() - if connected == False: - pytest.exit("AP is disconnected from UC Gateway") - instantiate_profile_obj.push_config(serial_number=get_equipment_id[0]) - config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"')) - config["uuid"] = 0 - ap_config_latest = ap_ssh.get_uc_latest_config() - try: - ap_config_latest["uuid"] = 0 - except Exception as e: - print(e) - pass - x = 1 - while ap_config_latest != config: - time.sleep(5) - x += 1 - ap_config_latest = ap_ssh.get_uc_latest_config() - ap_config_latest["uuid"] = 0 - print("latest config: ", ap_config_latest) - print("config: ", config) - if x == 19: - break - if x < 19: - print("Config properly applied into AP", config) - ap_config_latest = ap_ssh.get_uc_latest_config() - ap_config_latest["uuid"] = 0 - - ap_config_active = ap_ssh.get_uc_active_config() - ap_config_active["uuid"] = 0 - x = 1 - while ap_config_active != ap_config_latest: - time.sleep(5) - x += 1 - ap_config_latest = ap_ssh.get_uc_latest_config() - ap_config_latest["uuid"] = 0 - - ap_config_active = ap_ssh.get_uc_active_config() - print("latest config: ", ap_config_latest) - print("Active config: ", ap_config_active) - ap_config_active["uuid"] = 0 - if x == 19: - break - allure_body = "AP config status: \n" + \ - "Active Config: " + str(ap_ssh.get_uc_active_config()) + "\n" \ - "Latest Config: ", str( - ap_ssh.get_uc_latest_config()) + "\n" \ - "Applied Config: ", str(config) - if x < 19: - print("AP is Broadcasting Applied Config") - allure.attach(name="AP is Broadcasting Applied Config", body="") - allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active)) - allure.attach(name="Config Info", body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config())) - allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body)) - - else: - print("AP is Not Broadcasting Applied Config") - allure.attach(name="AP is Not Broadcasting Applied Config", body="") - allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active)) - allure.attach(name="Config Info", - body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config())) - allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body)) - ap_logs = ap_ssh.logread() - allure.attach(body=ap_logs, name="AP LOgs: ") - # ap_wifi_data = ap_ssh.get_interface_details() - # idx_mapping = {} - # ssid_data = [] - # ap_interfaces = list(ap_wifi_data.keys()) - # for interface in range(len(ap_interfaces)): - # if ap_wifi_data[ap_interfaces[interface]][1] == "none": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=OPEN" + - # " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # if ap_wifi_data[ap_interfaces[interface]][1] == "psk": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=WPA" + - # " password=" + ap_wifi_data[ap_interfaces[interface]][2] + - # " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # if ap_wifi_data[ap_interfaces[interface]][1] == "psk-mixed": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=WPA|WPA2" + - # " password=" + ap_wifi_data[ap_interfaces[interface]][2] + - # " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # if ap_wifi_data[ap_interfaces[interface]][1] == "psk2": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=WPA2" + - # " password=" + ap_wifi_data[ap_interfaces[interface]][2] + - # " bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower() - # ] - # print(ssid) - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # if ap_wifi_data[ap_interfaces[interface]][1] == "sae": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=WPA3" + - # " password=" + ap_wifi_data[ap_interfaces[interface]][2] + - # " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # if ap_wifi_data[ap_interfaces[interface]][1] == "sae-mixed": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=WPA3" + - # " password=" + ap_wifi_data[ap_interfaces[interface]][2] + - # " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # if ap_wifi_data[ap_interfaces[interface]][1] == "wpa2": - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + - # " security=EAP-TTLS" + - # " bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower() - # ] - # - # idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], - # ap_wifi_data[ap_interfaces[interface]][2], - # ap_wifi_data[ap_interfaces[interface]][1], - # ap_wifi_data[ap_interfaces[interface]][3][1], - # ap_wifi_data[ap_interfaces[interface]][3][0] - # ] - # # pass - # ssid_data.append(ssid) - # lf_tools.ssid_list.append(ap_wifi_data[ap_interfaces[interface]][0]) - # lf_tools.dut_idx_mapping = idx_mapping - # print(ssid_data) - # lf_tools.reset_scenario() - # lf_tools.update_ssid(ssid_data=ssid_data) - yield test_cases + # call this, if 1.x + return_1x = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, setup_vlan, get_equipment_id, + instantiate_profile, + get_markers, create_lanforge_chamberview_dut, lf_tools, + get_security_flags, get_configuration, radius_info, get_apnos, + radius_accounting_info) + yield return_1x @pytest.fixture(scope="session") def lf_test(get_configuration, setup_influx): - # print(get_configuration) obj = RunTest(lanforge_data=get_configuration['traffic_generator']['details'], influx_params=setup_influx) - # pytest.exit("") yield obj diff --git a/tests/e2e/basic/validation_of_operating_modes/bridge_mode/client_connectivity/test_general_security_modes.py b/tests/e2e/basic/validation_of_operating_modes/bridge_mode/client_connectivity/test_general_security_modes.py index 9c8e0496d..0166308a5 100644 --- a/tests/e2e/basic/validation_of_operating_modes/bridge_mode/client_connectivity/test_general_security_modes.py +++ b/tests/e2e/basic/validation_of_operating_modes/bridge_mode/client_connectivity/test_general_security_modes.py @@ -9,7 +9,7 @@ import allure import pytest pytestmark = [pytest.mark.client_connectivity, pytest.mark.bridge, pytest.mark.general, pytest.mark.ucentral, - pytest.mark.sanity] # pytest.mark.usefixtures("setup_test_run")] + pytest.mark.sanity, pytest.mark.uc_sanity] # pytest.mark.usefixtures("setup_test_run")] setup_params_general = { "mode": "BRIDGE", @@ -29,7 +29,6 @@ setup_params_general = { } -@pytest.mark.uc_sanity @pytest.mark.suiteA @pytest.mark.sudo @allure.feature("BRIDGE MODE CLIENT CONNECTIVITY") diff --git a/tests/e2e/basic/validation_of_operating_modes/nat_mode/client_connectivity/test_general_security_modes.py b/tests/e2e/basic/validation_of_operating_modes/nat_mode/client_connectivity/test_general_security_modes.py index e662583bb..98260c454 100644 --- a/tests/e2e/basic/validation_of_operating_modes/nat_mode/client_connectivity/test_general_security_modes.py +++ b/tests/e2e/basic/validation_of_operating_modes/nat_mode/client_connectivity/test_general_security_modes.py @@ -9,7 +9,7 @@ import allure import pytest pytestmark = [pytest.mark.client_connectivity, pytest.mark.nat, pytest.mark.general, pytest.mark.sanity, - pytest.mark.ucentral] + pytest.mark.uc_sanity, pytest.mark.ucentral] setup_params_general = { "mode": "NAT", @@ -29,7 +29,7 @@ setup_params_general = { } -@pytest.mark.uc_sanity + @pytest.mark.suiteA @pytest.mark.sanity_ucentral @allure.feature("NAT MODE CLIENT CONNECTIVITY") diff --git a/tests/fixtures_1x.py b/tests/fixtures_1x.py new file mode 100644 index 000000000..cfa932821 --- /dev/null +++ b/tests/fixtures_1x.py @@ -0,0 +1,576 @@ +import sys +import os +if "libs" not in sys.path: + sys.path.append(f'../libs') +for folder in 'py-json', 'py-scripts': + if folder not in sys.path: + sys.path.append(f'../lanforge/lanforge-scripts/{folder}') + +sys.path.append( + os.path.dirname( + os.path.realpath(__file__) + ) +) +sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") + +sys.path.append(f'../libs') +sys.path.append(f'../libs/lanforge/') + +from LANforge.LFUtils import * + +if 'py-json' not in sys.path: + sys.path.append('../py-scripts') +from apnos.apnos import APNOS +from controller.controller import Controller +from controller.controller import FirmwareUtility +import pytest +from cv_test_manager import cv_test +from configuration import CONFIGURATION +from configuration import RADIUS_SERVER_DATA +from configuration import RADIUS_ACCOUNTING_DATA +from configuration import TEST_CASES +from testrails.testrail_api import APIClient +from testrails.reporting import Reporting +from lf_tools import ChamberView +from sta_connect2 import StaConnect2 +from os import path +from typing import Any, Callable, Optional + +import time +import allure +import pytest + +class Fixtures_1x: + + def __init__(self, configuration={}): + self.lab_info = configuration + print(self.lab_info) + print("1.X") + try: + self.controller_obj = Controller(controller_data=self.lab_info["controller"]) + except Exception as e: + print(e) + allure.attach(body=str(e), name="Controller Instantiation Failed: ") + sdk_client = False + pytest.exit("unable to communicate to Controller" + str(e)) + + def disconnect(self): + self.controller_obj.disconnect_Controller() + + def setup_firmware(self): + pass + + def get_ap_version(self, get_apnos, get_configuration): + version_list = [] + for access_point_info in get_configuration['access_point']: + ap_ssh = get_apnos(access_point_info) + version = ap_ssh.get_ap_version_ucentral() + version_list.append(version) + return version_list + + def setup_profiles(self, request, param, setup_controller, testbed, setup_vlan, get_equipment_id, instantiate_profile, + get_markers, create_lanforge_chamberview_dut, lf_tools, + get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info): + + instantiate_profile = instantiate_profile(sdk_client=setup_controller) + vlan_id, mode = 0, 0 + instantiate_profile.cleanup_objects() + parameter = dict(param) + print(parameter) + test_cases = {} + profile_data = {} + if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: + print("Invalid Mode: ", parameter['mode']) + allure.attach(body=parameter['mode'], name="Invalid Mode: ") + return test_cases + + if parameter['mode'] == "NAT": + mode = "NAT" + vlan_id = 1 + if parameter['mode'] == "BRIDGE": + mode = "BRIDGE" + vlan_id = 1 + if parameter['mode'] == "VLAN": + mode = "BRIDGE" + vlan_id = setup_vlan + + instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Equipment-AP-" + parameter['mode']) + + profile_data["equipment_ap"] = {"profile_name": testbed + "-Equipment-AP-" + parameter['mode']} + profile_data["ssid"] = {} + for i in parameter["ssid_modes"]: + profile_data["ssid"][i] = [] + for j in range(len(parameter["ssid_modes"][i])): + profile_name = testbed + "-SSID-" + i + "-" + str(j) + "-" + parameter['mode'] + data = parameter["ssid_modes"][i][j] + data["profile_name"] = profile_name + if "mode" not in dict(data).keys(): + data["mode"] = mode + if "vlan" not in dict(data).keys(): + data["vlan"] = vlan_id + instantiate_profile.delete_profile_by_name(profile_name=profile_name) + profile_data["ssid"][i].append(data) + # print(profile_name) + # print(profile_data) + + instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode) + time.sleep(10) + """ + Setting up rf profile + """ + rf_profile_data = { + "name": "RF-Profile-" + testbed + "-" + parameter['mode'] + "-" + + get_configuration['access_point'][0]['mode'] + } + + for i in parameter["rf"]: + rf_profile_data[i] = parameter['rf'][i] + # print(rf_profile_data) + + try: + instantiate_profile.delete_profile_by_name(profile_name=rf_profile_data['name']) + instantiate_profile.set_rf_profile(profile_data=rf_profile_data, + mode=get_configuration['access_point'][0]['mode']) + allure.attach(body=str(rf_profile_data), + name="RF Profile Created : " + get_configuration['access_point'][0]['mode']) + except Exception as e: + print(e) + allure.attach(body=str(e), name="Exception ") + + # Radius Profile Creation + if parameter["radius"]: + radius_info = radius_info + radius_info["name"] = testbed + "-Automation-Radius-Profile-" + mode + instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode) + try: + instantiate_profile.create_radius_profile(radius_info=radius_info) + allure.attach(body=str(radius_info), + name="Radius Profile Created") + test_cases['radius_profile'] = True + except Exception as e: + print(e) + test_cases['radius_profile'] = False + + # SSID Profile Creation + lf_dut_data = [] + for mode in profile_data['ssid']: + if mode == "open": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_open_ssid_profile(profile_data=j) + test_cases["open_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["open_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wpa": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa_ssid_profile(profile_data=j) + test_cases["wpa_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wpa2_personal": + for j in profile_data["ssid"][mode]: + # print(j) + + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa2_personal_ssid_profile(profile_data=j) + test_cases["wpa2_personal_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa2_personal_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wpa_wpa2_personal_mixed": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + + creates_profile = instantiate_profile.create_wpa_wpa2_personal_mixed_ssid_profile( + profile_data=j) + test_cases["wpa_wpa2_personal_mixed_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa_wpa2_personal_mixed_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wpa3_personal": + for j in profile_data["ssid"][mode]: + print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + + creates_profile = instantiate_profile.create_wpa3_personal_ssid_profile(profile_data=j) + test_cases["wpa3_personal_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa3_personal_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + if mode == "wpa3_personal_mixed": + for j in profile_data["ssid"][mode]: + print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa3_personal_mixed_ssid_profile( + profile_data=j) + test_cases["wpa3_personal_mixed_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa3_personal_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wpa_enterprise": + for j in profile_data["ssid"][mode]: + + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa_enterprise_ssid_profile(profile_data=j) + test_cases["wpa_enterprise_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + + except Exception as e: + print(e) + test_cases["wpa_enterprise_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + if mode == "wpa2_enterprise": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa2_enterprise_ssid_profile(profile_data=j) + test_cases["wpa2_enterprise_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa2_enterprise_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + if mode == "wpa3_enterprise": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa3_enterprise_ssid_profile(profile_data=j) + test_cases["wpa3_enterprise_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa3_enterprise_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wpa_wpa2_enterprise_mixed": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa_wpa2_enterprise_mixed_ssid_profile( + profile_data=j) + test_cases["wpa_wpa2_enterprise_mixed_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa_wpa2_enterprise_mixed_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + if mode == "wpa3_enterprise_mixed": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wpa3_enterprise_mixed_ssid_profile( + profile_data=j) + test_cases["wpa3_enterprise_mixed_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa3_enterprise_mixed_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + + if mode == "wep": + for j in profile_data["ssid"][mode]: + # print(j) + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + for i in range(len(j["appliedRadios"])): + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz") + j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz") + creates_profile = instantiate_profile.create_wep_ssid_profile(profile_data=j) + test_cases["wpa3_enterprise_2g"] = True + allure.attach(body=str(creates_profile), + name="SSID Profile Created") + except Exception as e: + print(e) + test_cases["wpa3_enterprise_2g"] = False + allure.attach(body=str(e), + name="SSID Profile Creation Failed") + # Equipment AP Profile Creation + try: + instantiate_profile.set_ap_profile(profile_data=profile_data['equipment_ap']) + test_cases["equipment_ap"] = True + allure.attach(body=str(profile_data['equipment_ap']), + name="Equipment AP Profile Created") + except Exception as e: + print(e) + test_cases["equipment_ap"] = False + allure.attach(body=str(e), + name="Equipment AP Profile Creation Failed") + + # Push the Equipment AP Profile to AP + try: + for i in get_equipment_id: + instantiate_profile.push_profile_old_method(equipment_id=i) + except Exception as e: + print(e) + print("failed to create AP Profile") + + ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/") + # ssid_names = [] + # for i in instantiate_profile.profile_creation_ids["ssid"]: + # ssid_names.append(instantiate_profile.get_ssid_name_by_profile_id(profile_id=i)) + # ssid_names.sort() + ssid_names = [] + for i in lf_dut_data: + ssid_names.append(i["ssid_name"]) + ssid_names.sort() + # This loop will check the VIF Config with cloud profile + vif_config = [] + test_cases['vifc'] = False + for i in range(0, 18): + vif_config = list(ap_ssh.get_vif_config_ssids()) + vif_config.sort() + print(vif_config) + print(ssid_names) + if ssid_names == vif_config: + test_cases['vifc'] = True + break + time.sleep(10) + allure.attach( + body=str("VIF Config: " + str(vif_config) + "\n" + "SSID Pushed from Controller: " + str(ssid_names)), + name="SSID Profiles in VIF Config and Controller: ") + ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/") + + # This loop will check the VIF Config with VIF State + test_cases['vifs'] = False + for i in range(0, 18): + vif_state = list(ap_ssh.get_vif_state_ssids()) + vif_state.sort() + vif_config = list(ap_ssh.get_vif_config_ssids()) + vif_config.sort() + print(vif_config) + print(vif_state) + if vif_state == vif_config: + test_cases['vifs'] = True + break + time.sleep(10) + allure.attach(body=str("VIF Config: " + str(vif_config) + "\n" + "VIF State: " + str(vif_state)), + name="SSID Profiles in VIF Config and VIF State: ") + + ap_logs = ap_ssh.logread() + allure.attach(body=ap_logs, name="AP LOgs: ") + ssid_info = ap_ssh.get_ssid_info() + ssid_data = [] + print(ssid_info) + band_mapping = ap_ssh.get_bssid_band_mapping() + print(band_mapping) + idx_mapping = {} + for i in range(0, len(ssid_info)): + if ssid_info[i][1] == "OPEN": + ssid_info[i].append("") + if ssid_info[i][1] == "OPEN": + ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=OPEN" + + " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] + idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], + ssid_info[i][0]] + + if ssid_info[i][1] == "WPA": + ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA" + + " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] + idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], + ssid_info[i][0]] + if ssid_info[i][1] == "WPA2": + ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA2" + + " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] + idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], + ssid_info[i][0]] + if ssid_info[i][1] == "WPA3_PERSONAL": + ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA3" + + " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] + idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], + ssid_info[i][0]] + + if ssid_info[i][1] == "WPA | WPA2": + ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA|WPA2" + + " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] + idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], + ssid_info[i][0]] + + if ssid_info[i][1] == "EAP-TTLS": + ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=EAP-TTLS" + + " password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]] + idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]], + ssid_info[i][0]] + ssid_data.append(ssid) + lf_tools.dut_idx_mapping = idx_mapping + # Add bssid password and security from iwinfo data + # Format SSID Data in the below format + # ssid_data = [ + # ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'], + # ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59'] + # ] + allure.attach(name="SSID DATA IN LF DUT", body=str(ssid_data)) + lf_tools.update_ssid(ssid_data=ssid_data) + + def teardown_session(): + print("\nRemoving Profiles") + instantiate_profile.delete_profile_by_name(profile_name=profile_data['equipment_ap']['profile_name']) + instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["ssid"]) + instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["radius"]) + instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["rf"]) + allure.attach(body=str(profile_data['equipment_ap']['profile_name'] + "\n"), + name="Tear Down in Profiles ") + time.sleep(20) + + request.addfinalizer(teardown_session) + return test_cases diff --git a/tests/fixtures_2x.py b/tests/fixtures_2x.py new file mode 100644 index 000000000..26345d1e5 --- /dev/null +++ b/tests/fixtures_2x.py @@ -0,0 +1,414 @@ +""" Python Inbuilt Libraries """ +import allure +import pytest +import sys +import os +import json +import time + +""" Environment Paths """ +if "libs" not in sys.path: + sys.path.append(f'../libs') +for folder in 'py-json', 'py-scripts': + if folder not in sys.path: + sys.path.append(f'../lanforge/lanforge-scripts/{folder}') + +sys.path.append( + os.path.dirname( + os.path.realpath(__file__) + ) +) +sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") + +sys.path.append(f'../libs') +sys.path.append(f'../libs/lanforge/') + +from LANforge.LFUtils import * + +if 'py-json' not in sys.path: + sys.path.append('../py-scripts') +from apnos.apnos import APNOS +from controller.controller_2x.controller import Controller +from configuration import CONFIGURATION +from configuration import RADIUS_SERVER_DATA +from configuration import RADIUS_ACCOUNTING_DATA + + +class Fixtures_2x: + + def __init__(self, configuration={}): + self.lab_info = configuration + print(self.lab_info) + print("2.X") + try: + self.controller_obj = Controller(controller_data=self.lab_info["controller"]) + except Exception as e: + print(e) + allure.attach(body=str(e), name="Controller Instantiation Failed: ") + sdk_client = False + pytest.exit("unable to communicate to Controller" + str(e)) + + def disconnect(self): + self.controller_obj.logout() + + def setup_firmware(self): + pass + + def get_ap_version(self, get_apnos, get_configuration): + version_list = [] + for access_point_info in get_configuration['access_point']: + ap_ssh = get_apnos(access_point_info) + version = ap_ssh.get_ap_version_ucentral() + version_list.append(version) + return version_list + + def setup_profiles(self, request, param, setup_controller, testbed, setup_vlan, get_equipment_id, + instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools, + get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info): + print("inside conftest_2x") + if not request.config.getoption("1.x"): + instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller) + print("garbage") + print(1, instantiate_profile_obj.sdk_client) + vlan_id, mode = 0, 0 + parameter = dict(param) + print("hola", parameter) + test_cases = {} + profile_data = {} + + if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: + print("Invalid Mode: ", parameter['mode']) + return test_cases + + instantiate_profile_obj.set_radio_config() + + if parameter['mode'] == "NAT": + mode = "NAT" + instantiate_profile_obj.set_mode(mode=mode) + vlan_id = 1 + if parameter['mode'] == "BRIDGE": + mode = "BRIDGE" + instantiate_profile_obj.set_mode(mode=mode) + vlan_id = 1 + if parameter['mode'] == "VLAN": + mode = "VLAN" + instantiate_profile_obj.set_mode(mode=mode) + vlan_id = setup_vlan + profile_data["ssid"] = {} + + for i in parameter["ssid_modes"]: + profile_data["ssid"][i] = [] + for j in range(len(parameter["ssid_modes"][i])): + data = parameter["ssid_modes"][i][j] + profile_data["ssid"][i].append(data) + lf_dut_data = [] + for mode in profile_data['ssid']: + if mode == "open": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'none' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa_2g"] = False + if mode == "wpa": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'psk' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa_2g"] = False + if mode == "wpa2_personal": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'psk2' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + if mode == "wpa_wpa2_personal_mixed": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'psk-mixed' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + if mode == "wpa3_personal": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'sae' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + if mode == "wpa3_personal_mixed": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'sae-mixed' + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + # EAP SSID Modes + if mode == "wpa2_enterprise": + for j in profile_data["ssid"][mode]: + if mode in get_markers.keys() and get_markers[mode]: + try: + if j["appliedRadios"].__contains__("2G"): + lf_dut_data.append(j) + if j["appliedRadios"].__contains__("5G"): + lf_dut_data.append(j) + j["appliedRadios"] = list(set(j["appliedRadios"])) + j['security'] = 'wpa2' + RADIUS_SERVER_DATA = radius_info + RADIUS_ACCOUNTING_DATA = radius_accounting_info + creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j, radius=True, + radius_auth_data=RADIUS_SERVER_DATA, + radius_accounting_data=RADIUS_ACCOUNTING_DATA) + test_cases["wpa_2g"] = True + except Exception as e: + print(e) + test_cases["wpa2_personal"] = False + ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x") + connected, latest, active = ap_ssh.get_ucentral_status() + if connected == False: + pytest.exit("AP is disconnected from UC Gateway") + if latest != active: + allure.attach(name="FAIL : ubus call ucentral status: ", + body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active)) + ap_logs = ap_ssh.logread() + allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ") + pytest.fail("AP is disconnected from UC Gateway") + instantiate_profile_obj.push_config(serial_number=get_equipment_id[0]) + time_1 = time.time() + config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"')) + config["uuid"] = 0 + ap_config_latest = ap_ssh.get_uc_latest_config() + try: + ap_config_latest["uuid"] = 0 + except Exception as e: + print(e) + pass + x = 1 + old_config = latest + connected, latest, active = ap_ssh.get_ucentral_status() + while old_config == latest: + time.sleep(5) + x += 1 + print("old config: " , old_config) + print("latest: " , latest) + connected, latest, active = ap_ssh.get_ucentral_status() + if x == 19: + break + connected, latest, active = ap_ssh.get_ucentral_status() + x = 1 + while active != latest: + connected, latest, active = ap_ssh.get_ucentral_status() + time.sleep(10) + x += 1 + print("active: ", active) + print("latest: ", latest) + if x == 19: + break + if x < 19: + print("Config properly applied into AP", config) + ap_config_latest = ap_ssh.get_uc_latest_config() + ap_config_latest["uuid"] = 0 + + ap_config_active = ap_ssh.get_uc_active_config() + ap_config_active["uuid"] = 0 + x = 1 + while ap_config_active != ap_config_latest: + time.sleep(5) + x += 1 + ap_config_latest = ap_ssh.get_uc_latest_config() + ap_config_latest["uuid"] = 0 + + ap_config_active = ap_ssh.get_uc_active_config() + print("latest config: ", ap_config_latest) + print("Active config: ", ap_config_active) + ap_config_active["uuid"] = 0 + if x == 19: + break + allure_body = "AP config status: \n" + \ + "Active Config: " + str(ap_ssh.get_uc_active_config()) + "\n" \ + "Latest Config: ", str( + ap_ssh.get_uc_latest_config()) + "\n" \ + "Applied Config: ", str(config) + + if x < 19: + print("AP is Broadcasting Applied Config") + allure.attach(name="AP is Broadcasting Applied Config", body="") + allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active)) + allure.attach(name="Config Info", + body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config())) + allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body)) + + else: + print("AP is Not Broadcasting Applied Config") + allure.attach(name="AP is Not Broadcasting Applied Config", body="") + allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active)) + allure.attach(name="Config Info", + body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config())) + allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body)) + time_2 = time.time() + time_interval = time_2 - time_1 + allure.attach(name="Time Took to apply Config: " + str(time_interval), body="") + ap_logs = ap_ssh.logread() + allure.attach(body=ap_logs, name="AP LOgs: ") + ap_wifi_data = ap_ssh.get_interface_details() + idx_mapping = {} + ssid_data = [] + ap_interfaces = list(ap_wifi_data.keys()) + for interface in range(len(ap_interfaces)): + if ap_wifi_data[ap_interfaces[interface]][1] == "none": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=OPEN" + + " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + if ap_wifi_data[ap_interfaces[interface]][1] == "psk": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=WPA" + + " password=" + ap_wifi_data[ap_interfaces[interface]][2] + + " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + if ap_wifi_data[ap_interfaces[interface]][1] == "psk-mixed": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=WPA|WPA2" + + " password=" + ap_wifi_data[ap_interfaces[interface]][2] + + " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + if ap_wifi_data[ap_interfaces[interface]][1] == "psk2": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=WPA2" + + " password=" + ap_wifi_data[ap_interfaces[interface]][2] + + " bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower() + ] + print(ssid) + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + if ap_wifi_data[ap_interfaces[interface]][1] == "sae": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=WPA3" + + " password=" + ap_wifi_data[ap_interfaces[interface]][2] + + " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + if ap_wifi_data[ap_interfaces[interface]][1] == "sae-mixed": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=WPA3" + + " password=" + ap_wifi_data[ap_interfaces[interface]][2] + + " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + if ap_wifi_data[ap_interfaces[interface]][1] == "wpa2": + ssid = ["ssid_idx=" + str(interface) + + " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] + + " security=EAP-TTLS" + + " bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower() + ] + + idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0], + ap_wifi_data[ap_interfaces[interface]][2], + ap_wifi_data[ap_interfaces[interface]][1], + ap_wifi_data[ap_interfaces[interface]][3][1], + ap_wifi_data[ap_interfaces[interface]][3][0] + ] + # pass + ssid_data.append(ssid) + lf_tools.ssid_list.append(ap_wifi_data[ap_interfaces[interface]][0]) + lf_tools.dut_idx_mapping = idx_mapping + print(ssid_data) + lf_tools.reset_scenario() + lf_tools.update_ssid(ssid_data=ssid_data) + return test_cases + else: + return False