""" Pytest fixtures: High level Resource Management and base setup fixtures """ import datetime import random import string import sys import os import time import allure import re import logging from _pytest.fixtures import SubRequest from pyparsing import Optional ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties' ALLUREDIR_OPTION = '--alluredir' # if "logs" not in os.listdir(): # os.mkdir("logs/") # logging.basicConfig(level=logging.INFO, filename="logs/" + '{:%Y-%m-%d-%H-%M-%S}.log'.format(datetime.datetime.now())) sys.path.append( os.path.dirname( os.path.realpath(__file__) ) ) if "libs" not in sys.path: sys.path.append(f'../libs') for folder in 'py-json', 'py-scripts': if folder not in sys.path: sys.path.append(f'../lanforge/lanforge-scripts/{folder}') sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity") sys.path.append(f'../libs') sys.path.append(f'../libs/lanforge/') from LANforge.LFUtils import * if 'py-json' not in sys.path: sys.path.append('../py-scripts') from apnos.apnos import APNOS from controller.controller_1x.controller import FirmwareUtility import pytest from lanforge.lf_tests import RunTest from cv_test_manager import cv_test from configuration import CONFIGURATION from configuration import RADIUS_SERVER_DATA from configuration import RADIUS_ACCOUNTING_DATA from configuration import RATE_LIMITING_RADIUS_SERVER_DATA from configuration import RATE_LIMITING_RADIUS_ACCOUNTING_DATA from lanforge.scp_util import SCP_File from testrails.testrail_api import APIClient from testrails.reporting import Reporting from lf_tools import ChamberView from os import path from typing import Any, Callable, Optional from _pytest.fixtures import SubRequest from pytest import fixture import fixtures_1x from fixtures_1x import Fixtures_1x import fixtures_2x from fixtures_2x import Fixtures_2x ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties' ALLUREDIR_OPTION = '--alluredir' def pytest_addoption(parser): """pytest addoption function: contains ini objects and options""" parser.addini("tr_url", "Test Rail URL") parser.addini("tr_prefix", "Test Rail Prefix (Generally Testbed_name_)") parser.addini("tr_user", "Testrail Username") parser.addini("tr_pass", "Testrail Password") parser.addini("tr_project_id", "Testrail Project ID") parser.addini("milestone", "milestone Id") parser.addini("influx_host", "Influx Host", default="influx.cicd.lab.wlan.tip.build") parser.addini("influx_port", "Influx Port", default=80) parser.addini("influx_token", "Influx Token", default="TCkdATXAbHmNbn4QyNaj43WpGBYxFrzV") parser.addini("influx_bucket", "influx bucket", default="tip-cicd") parser.addini("influx_org", "influx organization", default="tip") parser.addini(name="firmware", type='string', help="AP Firmware build URL", default="0") parser.addini("cloud_ctlr", "AP Firmware build URL", default="0") parser.addini("num_stations", "Number of Stations/Clients for testing") # change behaviour parser.addoption( "--skip-upgrade", action="store_true", default=False, help="skip updating firmware on the AP (useful for local testing)" ) parser.addoption( "--skip-lanforge", action="store_true", default=False, help="skip to do any interactions on lanforge (to be used in case of interop)" ) # change behaviour parser.addoption( "--exit-on-fail", action="store_true", default=False, help="skip updating firmware on the AP (useful for local testing)" ) # change to Ucentral Ctlr parser.addoption( "--1.x", action="store_true", default=False, help="Option to run Test Cases on 1.x SDK" ) # change behaviour parser.addoption( "--force-upgrade", action="store_true", default=False, help="force Upgrading Firmware even if it is already latest version" ) parser.addoption( "--force-upload", action="store_true", default=False, help="force Uploading Firmware even if it is already latest version" ) # this has to be the last argument # example: --access-points ECW5410 EA8300-EU parser.addoption( "--testbed", # nargs="+", default="basic-01", help="AP Model which is needed to test" ) parser.addoption( "--use-testrail", action="store_false", default=True, help="Stop using Testrails" ) # Perfecto Parameters parser.addini("perfectoURL", "Cloud URL") parser.addini("securityToken", "Security Token") parser.addini("platformName-iOS", "iOS Platform") parser.addini("platformName-android", "Android Platform") parser.addini("model-iOS", "iOS Devices") parser.addini("model-android", "Android Devices") parser.addini("bundleId-iOS", "iOS Devices") parser.addini("bundleId-iOS-Settings", "iOS Settings App") parser.addini("appPackage-android", "Android Devices") parser.addini("bundleId-iOS-Safari", "Safari BundleID") parser.addini("wifi-SSID-2g-Pwd", "Wifi 2g Password") parser.addini("Default-SSID-5gl-perfecto-b", "Wifi 5g AP Name") parser.addini("Default-SSID-2g-perfecto-b", "Wifi 2g AP Name") parser.addini("Default-SSID-perfecto-b", "Wifi AP Name") parser.addini("bundleId-iOS-Ping", "Ping Bundle ID") parser.addini("browserType-iOS", "Mobile Browser Name") parser.addini("projectName", "Project Name") parser.addini("projectVersion", "Project Version") parser.addini("jobName", "CI Job Name") parser.addini("jobNumber", "CI Job Number") parser.addini("reportTags", "Report Tags") parser.addoption( "--access-points-perfecto", # nargs="+", default=["Perfecto"], help="list of access points to test" ) """ Test session base fixture """ # To be depreciated as testrails will go @pytest.fixture(scope="session") def test_cases(): """Yields the test cases from configuration.py: will be depreciated""" yield [] @pytest.fixture(scope="session") def testbed(request): """yields the testbed option selection""" var = request.config.getoption("--testbed") yield var @pytest.fixture(scope="session") def should_upload_firmware(request): """yields the --force-upload option for firmware upload selection""" yield request.config.getoption("--force-upload") @pytest.fixture(scope="session") def should_upgrade_firmware(request): """yields the --force-upgrade option for firmware upgrade selection""" yield request.config.getoption("--force-upgrade") @pytest.fixture(scope="session") def exit_on_fail(request): """yields the --exit-on-fail option for exiting the test case if it fails without teardown""" yield request.config.getoption("--exit-on-fail") @pytest.fixture(scope="session") def radius_info(): """yields the radius server information from lab info file""" yield RADIUS_SERVER_DATA @pytest.fixture(scope="session") def radius_accounting_info(): """yields the radius accounting information from lab info file""" yield RADIUS_ACCOUNTING_DATA @pytest.fixture(scope="session") def rate_radius_info(): """yields the radius server information from lab info file""" yield RATE_LIMITING_RADIUS_SERVER_DATA @pytest.fixture(scope="session") def rate_radius_accounting_info(): """yields the radius accounting information from lab info file""" yield RATE_LIMITING_RADIUS_ACCOUNTING_DATA @pytest.fixture(scope="session") def get_configuration(testbed, request): """yields the selected testbed information from lab info file (configuration.py)""" if request.config.getini("cloud_ctlr") != "0": CONFIGURATION[testbed]["controller"]["url"] = request.config.getini("cloud_ctlr") if request.config.getini("firmware") != "0": version = request.config.getini("firmware") version_list = version.split(",") for i in range(len(CONFIGURATION[testbed]["access_point"])): CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[i] yield CONFIGURATION[testbed] @pytest.fixture(scope="session") def get_apnos(): """yields the LIBRARY for APNOS, Reduces the use of imports across files""" yield APNOS @pytest.fixture(scope="session") def get_equipment_ref(request, setup_controller, testbed, get_configuration): """""" if request.config.getoption("1.x"): equipment_id_list = [] for i in get_configuration['access_point']: equipment_id_list.append(setup_controller.get_equipment_id( serial_number=i['serial'])) else: equipment_id_list = [] for i in get_configuration['access_point']: equipment_id_list.append(i['serial']) yield equipment_id_list @pytest.fixture(scope="session") def get_sdk_version(fixtures_ver): version = fixtures_ver.get_sdk_version() yield version @pytest.fixture(scope="session") def get_uci_show(fixtures_ver, get_apnos, get_configuration): uci_show = fixtures_ver.get_uci_show(get_apnos, get_configuration) yield uci_show @pytest.fixture(scope="session") def skip_lf(request): yield request.config.getoption("--skip-lanforge") # Controller Fixture @pytest.fixture(scope="session") def setup_controller(request, get_configuration, add_env_properties, fixtures_ver): """sets up the controller connection and yields the sdk_client object""" sdk_client = fixtures_ver.controller_obj request.addfinalizer(fixtures_ver.disconnect) yield sdk_client @pytest.fixture(scope="session") def setup_firmware(setup_controller): """ Fixture to Setup Firmware with the selected sdk """ setup_controller.instantiate_firmware() yield True @pytest.fixture(scope="session") def instantiate_firmware(request, setup_controller, get_configuration): """sets up firmware utility and yields the object for firmware upgrade""" if request.config.getoption("--1.x"): firmware_client_obj = [] for access_point_info in get_configuration['access_point']: version = access_point_info["version"] if request.config.getini("build").__contains__("https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/"): version = request.config.getini("build") firmware_client = FirmwareUtility(sdk_client=setup_controller, model=access_point_info["model"], version_url=version) firmware_client_obj.append(firmware_client) yield firmware_client_obj else: # 2.x pass @pytest.fixture(scope="session") def get_latest_firmware(request, instantiate_firmware): """yields the list of firmware version""" if request.config.getoption("--1.x"): fw_version_list = [] try: for fw_obj in instantiate_firmware: latest_firmware = fw_obj.get_fw_version() latest_firmware = latest_firmware.replace(".tar.gz", "") fw_version_list.append(latest_firmware) except Exception as e: print(e) fw_version_list = [] yield fw_version_list else: # 2.x pass @pytest.fixture(scope="session") def upload_firmware(request, should_upload_firmware, instantiate_firmware): """yields the firmware_id that is uploaded to cloud""" if request.config.getoption("--1.x"): firmware_id_list = [] for i in range(0, len(instantiate_firmware)): firmware_id = instantiate_firmware[i].upload_fw_on_cloud(force_upload=should_upload_firmware) firmware_id_list.append(firmware_id) yield firmware_id_list else: # 2.x release yield True @pytest.fixture(scope="session") def upgrade_firmware(request, instantiate_firmware, get_equipment_ref, check_ap_firmware_cloud, get_latest_firmware, should_upgrade_firmware, should_upload_firmware, get_apnos, get_configuration): """yields the status of upgrade of firmware. waits for 300 sec after each upgrade request""" print(should_upgrade_firmware, should_upload_firmware) if request.config.getoption("--1.x"): status_list = [] active_fw_list = [] try: for access_point in get_configuration['access_point']: ap_ssh = get_apnos(access_point, sdk="1.x") active_fw = ap_ssh.get_active_firmware() active_fw_list.append(active_fw) except Exception as e: print(e) active_fw_list = [] print(active_fw_list, get_latest_firmware) if get_latest_firmware != active_fw_list: if request.config.getoption("--skip-upgrade"): status = "skip-upgrade" status_list.append(status) else: for i in range(0, len(instantiate_firmware)): status = instantiate_firmware[i].upgrade_fw(equipment_id=get_equipment_ref[i], force_upload=True, force_upgrade=should_upgrade_firmware) status_list.append(status) else: if should_upgrade_firmware: for i in range(0, len(instantiate_firmware)): status = instantiate_firmware[i].upgrade_fw(equipment_id=get_equipment_ref[i], force_upload=should_upload_firmware, force_upgrade=should_upgrade_firmware) status_list.append(status) else: status = "skip-upgrade Version Already Available" status_list.append(status) yield status_list else: # 2.x release pass @pytest.fixture(scope="session") def check_ap_firmware_cloud(request, setup_controller, get_equipment_ref): """yields the active version of firmware on cloud""" if request.config.getoption("--1.x"): ap_fw_list = [] for i in get_equipment_ref: ap_fw_list.append(setup_controller.get_ap_firmware_old_method(equipment_id=i)) yield ap_fw_list else: # 2.x pass @pytest.fixture(scope="session") def check_ap_firmware_ssh(get_configuration, request): """yields the active version of firmware on ap""" if request.config.getoption("--1.x"): active_fw_list = [] try: for access_point in get_configuration['access_point']: ap_ssh = APNOS(access_point) active_fw = ap_ssh.get_active_firmware() active_fw_list.append(active_fw) except Exception as e: print(e) active_fw_list = [] yield active_fw_list else: # 2.x pass @pytest.fixture(scope="session") def setup_test_run(setup_controller, request, upgrade_firmware, get_configuration, get_equipment_ref, get_latest_firmware, get_apnos): """used to upgrade the firmware on AP and should be called on each test case on a module level""" if request.config.getoption("--1.x"): active_fw_list = [] try: for access_point in get_configuration['access_point']: ap_ssh = get_apnos(access_point, sdk="1.x") active_fw = ap_ssh.get_active_firmware() active_fw_list.append(active_fw) except Exception as e: print(e) active_fw_list = [] print(active_fw_list, get_latest_firmware) if active_fw_list == get_latest_firmware: yield True else: pytest.exit("AP is not Upgraded tp Target Firmware versions") else: # 2.x pass """ Instantiate Reporting """ @pytest.fixture(scope="session") def update_report(request, testbed, get_configuration): """used to update the test report on testrail/allure""" if request.config.getoption("--use-testrail"): tr_client = Reporting() else: tr_client = APIClient(request.config.getini("tr_url"), request.config.getini("tr_user"), request.config.getini("tr_pass"), request.config.getini("tr_project_id")) if request.config.getoption("--use-testrail"): tr_client.rid = "skip testrails" else: projId = tr_client.get_project_id(project_name=request.config.getini("tr_project_id")) test_run_name = request.config.getini("tr_prefix") + testbed + "_" + str( datetime.date.today()) + "_" + get_configuration['access_point'][0]['version'] tr_client.create_testrun(name=test_run_name, case_ids=list(TEST_CASES.values()), project_id=projId, milestone_id=request.config.getini("milestone"), description="Automated Nightly Sanity test run for new firmware build") rid = tr_client.get_run_id(test_run_name=test_run_name) tr_client.rid = rid yield tr_client """ FRAMEWORK MARKER LOGIC """ @pytest.fixture(scope="session") def get_security_flags(): """used to get the essential markers on security and band""" # Add more classifications as we go security = ["open", "wpa", "wep", "wpa2_personal", "wpa3_personal", "wpa3_personal_mixed", "wpa_wpa2_enterprise_mixed", "wpa2_eap", "wpa2_only_eap", "wpa_wpa2_personal_mixed", "wpa_enterprise", "wpa2_enterprise","wpa2_enterprise_rate", "wpa3_enterprise_mixed", "wpa3_enterprise", "twog", "fiveg", "radius"] yield security @pytest.fixture(scope="session") def get_markers(request, get_security_flags): """used to get the markers on the selected test case class, used in setup_profiles""" session = request.node markers = list() security = get_security_flags security_dict = dict().fromkeys(security) for item in session.items: for j in item.iter_markers(): markers.append(j.name) for i in security: if set(markers).__contains__(i): security_dict[i] = True else: security_dict[i] = False yield security_dict @pytest.fixture(scope="session") def test_access_point(fixtures_ver, request, get_configuration, get_apnos): """used to check the manager status of AP, should be used as a setup to verify if ap can reach cloud""" status = fixtures_ver.get_ap_cloud_connectivity_status(get_configuration, get_apnos) def teardown_session(): data = [] data.append(False) for s in status: data.append(s[0]) print(data) if False not in data: pytest.exit("AP is Not connected to ucentral gw") allure.attach(name=str(status), body="") request.addfinalizer(teardown_session) yield status @pytest.fixture(scope="session") def test_ap_connection_status(fixtures_ver, request, get_configuration, get_apnos): """used to check the manager status of AP, should be used as a setup to verify if ap can reach cloud""" connection, redirector_value = fixtures_ver.get_ap_status_logs(get_configuration, get_apnos) yield connection, redirector_value @pytest.fixture(scope="session") def traffic_generator_connectivity(testbed, get_configuration): """Verify if traffic generator is reachable""" if get_configuration['traffic_generator']['name'] == "lanforge": lanforge_ip = get_configuration['traffic_generator']['details']['ip'] lanforge_port = get_configuration['traffic_generator']['details']['port'] # Condition : # if gui connection is not available # yield False # Condition : # If Gui Connection is available # yield the gui version try: cv = cv_test(lanforge_ip, lanforge_port) url_data = cv.get_ports("/") lanforge_GUI_version = url_data["VersionInfo"]["BuildVersion"] lanforge_gui_git_version = url_data["VersionInfo"]["GitVersion"] lanforge_gui_build_date = url_data["VersionInfo"]["BuildDate"] print(lanforge_GUI_version, lanforge_gui_build_date, lanforge_gui_git_version) if not (lanforge_GUI_version or lanforge_gui_build_date or lanforge_gui_git_version): yield False else: yield lanforge_GUI_version except: yield False else: yield True @pytest.fixture(scope="session") def create_lanforge_chamberview_dut(lf_tools, skip_lf): dut_name = "" if not skip_lf: dut_object, dut_name = lf_tools.Create_Dut() return dut_name @pytest.fixture(scope="session") def lf_tools(get_configuration, testbed, skip_lf): """ Create a DUT on LANforge""" if not skip_lf: obj = ChamberView(lanforge_data=get_configuration["traffic_generator"]["details"], testbed=testbed, access_point_data=get_configuration["access_point"]) else: obj = False yield obj @pytest.fixture(scope="session") def lf_test(get_configuration, setup_influx, request, skip_lf): if not skip_lf: if request.config.getoption("--exit-on-fail"): obj = RunTest(lanforge_data=get_configuration['traffic_generator']['details'], influx_params=setup_influx, debug=True) if request.config.getoption("--exit-on-fail") is False: obj = RunTest(lanforge_data=get_configuration['traffic_generator']['details'], influx_params=setup_influx, debug=False) yield obj @pytest.fixture(scope="session") def setup_influx(request, testbed, get_configuration): """ Setup Influx Parameters: Used in CV Automation""" influx_params = { "influx_host": request.config.getini("influx_host"), "influx_port": request.config.getini("influx_port"), "influx_token": request.config.getini("influx_token"), "influx_bucket": request.config.getini("influx_bucket"), "influx_org": request.config.getini("influx_org"), "influx_tag": [testbed, get_configuration["access_point"][0]["model"]], } yield influx_params # Need for Perforce Mobile Device Execution def pytest_sessionstart(session): session.results = dict() @fixture(scope='session', autouse=True) def add_allure_environment_property(request: SubRequest) -> Optional[Callable]: environment_properties = dict() def maker(key: str, value: Any): environment_properties.update({key: value}) yield maker alluredir = request.config.getoption(ALLUREDIR_OPTION) if not alluredir or not os.path.isdir(alluredir) or not environment_properties: return allure_env_path = path.join(alluredir, ALLURE_ENVIRONMENT_PROPERTIES_FILE) with open(allure_env_path, 'w') as _f: data = '\n'.join([f'{variable}={value}' for variable, value in environment_properties.items()]) _f.write(data) @fixture(scope='session') def add_env_properties(get_configuration, get_sdk_version, get_apnos, fixtures_ver, add_allure_environment_property: Callable) -> None: add_allure_environment_property('Access-Point-Model', get_configuration["access_point"][0]["model"]) add_allure_environment_property('SDK-Version', get_sdk_version) try: add_allure_environment_property('Access-Point-Firmware-Version', fixtures_ver.get_ap_version(get_apnos, get_configuration)[0].split("\n")[1]) except Exception as e: print(e) pass add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"]) add_allure_environment_property('AP-Serial-Number', get_configuration["access_point"][0]["serial"] + "\n") @pytest.fixture(scope="session") def fixtures_ver(request, get_configuration): if request.config.getoption("1.x") is False: print("2.x") obj = Fixtures_2x(configuration=get_configuration) if request.config.getoption("1.x"): print("1.x") obj = Fixtures_1x(configuration=get_configuration) yield obj @pytest.fixture(scope="session") def firmware_upgrade(fixtures_ver, get_apnos, get_configuration): upgrade_status = fixtures_ver.setup_firmware(get_apnos, get_configuration) yield upgrade_status """ Logs related Fixtures """ @pytest.fixture(scope="function") def get_ap_logs(request, get_apnos, get_configuration): S = 9 instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) for ap in get_configuration['access_point']: ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x") ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name) def collect_logs(): for ap in get_configuration['access_point']: ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x") ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, stop_ref="stop testcase: " + instance_name) allure.attach(name='logread', body=str(ap_logs)) pass request.addfinalizer(collect_logs) @pytest.fixture(scope="function") def get_lf_logs(request, get_apnos, get_configuration): ip = get_configuration["traffic_generator"]["details"]["ip"] port = get_configuration["traffic_generator"]["details"]["ssh_port"] def collect_logs_lf(): log_0 = "/home/lanforge/lanforge_log_0.txt" log_1 = "/home/lanforge/lanforge_log_1.txt" obj = SCP_File(ip=ip, port=port, username="root", password="lanforge", remote_path=log_0, local_path=".") obj.pull_file() allure.attach.file(source="lanforge_log_0.txt", name="lanforge_log_0") obj = SCP_File(ip=ip, port=port, username="root", password="lanforge", remote_path=log_1, local_path=".") obj.pull_file() allure.attach.file(source="lanforge_log_1.txt", name="lanforge_log_1") request.addfinalizer(collect_logs_lf)