mirror of
				https://github.com/Telecominfraproject/wlan-testing.git
				synced 2025-10-31 19:08:01 +00:00 
			
		
		
		
	 750ce7599c
			
		
	
	750ce7599c
	
	
	
		
			
			Added ap logs and reporting to allure if ap is disconnected from gateway Added Improvements in Connectivity check logs
		
			
				
	
	
		
			758 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			758 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
|     Pytest fixtures: High level Resource Management and base setup fixtures
 | |
| """
 | |
| import datetime
 | |
| import random
 | |
| import string
 | |
| import sys
 | |
| import os
 | |
| import time
 | |
| 
 | |
| import allure
 | |
| import re
 | |
| import logging
 | |
| 
 | |
| from _pytest.fixtures import SubRequest
 | |
| from pyparsing import Optional
 | |
| 
 | |
| ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties'
 | |
| ALLUREDIR_OPTION = '--alluredir'
 | |
| 
 | |
| # if "logs" not in os.listdir():
 | |
| #     os.mkdir("logs/")
 | |
| # logging.basicConfig(level=logging.INFO, filename="logs/" + '{:%Y-%m-%d-%H-%M-%S}.log'.format(datetime.datetime.now()))
 | |
| sys.path.append(
 | |
|     os.path.dirname(
 | |
|         os.path.realpath(__file__)
 | |
|     )
 | |
| )
 | |
| if "libs" not in sys.path:
 | |
|     sys.path.append(f'../libs')
 | |
| for folder in 'py-json', 'py-scripts':
 | |
|     if folder not in sys.path:
 | |
|         sys.path.append(f'../lanforge/lanforge-scripts/{folder}')
 | |
| 
 | |
| sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity")
 | |
| 
 | |
| sys.path.append(f'../libs')
 | |
| sys.path.append(f'../libs/lanforge/')
 | |
| 
 | |
| from LANforge.LFUtils import *
 | |
| 
 | |
| if 'py-json' not in sys.path:
 | |
|     sys.path.append('../py-scripts')
 | |
| from apnos.apnos import APNOS
 | |
| from controller.controller_1x.controller import FirmwareUtility
 | |
| import pytest
 | |
| from lanforge.lf_tests import RunTest
 | |
| from cv_test_manager import cv_test
 | |
| from configuration import CONFIGURATION
 | |
| from configuration import open_flow
 | |
| from configuration import RADIUS_SERVER_DATA
 | |
| from configuration import RADIUS_ACCOUNTING_DATA
 | |
| from configuration import RATE_LIMITING_RADIUS_SERVER_DATA
 | |
| from configuration import RATE_LIMITING_RADIUS_ACCOUNTING_DATA
 | |
| from lanforge.scp_util import SCP_File
 | |
| from testrails.testrail_api import APIClient
 | |
| from testrails.reporting import Reporting
 | |
| from lf_tools import ChamberView
 | |
| from os import path
 | |
| from typing import Any, Callable, Optional
 | |
| 
 | |
| from _pytest.fixtures import SubRequest
 | |
| from pytest import fixture
 | |
| 
 | |
| import fixtures_1x
 | |
| from fixtures_1x import Fixtures_1x
 | |
| import fixtures_2x
 | |
| from fixtures_2x import Fixtures_2x
 | |
| 
 | |
| ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties'
 | |
| ALLUREDIR_OPTION = '--alluredir'
 | |
| 
 | |
| 
 | |
| def pytest_addoption(parser):
 | |
|     """pytest addoption function: contains ini objects and options"""
 | |
|     parser.addini("tr_url", "Test Rail URL")
 | |
|     parser.addini("tr_prefix", "Test Rail Prefix (Generally Testbed_name_)")
 | |
|     parser.addini("tr_user", "Testrail Username")
 | |
|     parser.addini("tr_pass", "Testrail Password")
 | |
|     parser.addini("tr_project_id", "Testrail Project ID")
 | |
|     parser.addini("milestone", "milestone Id")
 | |
| 
 | |
|     parser.addini("influx_host", "Influx Host", default="influx.cicd.lab.wlan.tip.build")
 | |
|     parser.addini("influx_port", "Influx Port", default=80)
 | |
|     parser.addini("influx_token", "Influx Token", default="TCkdATXAbHmNbn4QyNaj43WpGBYxFrzV")
 | |
|     parser.addini("influx_bucket", "influx bucket", default="tip-cicd")
 | |
|     parser.addini("influx_org", "influx organization", default="tip")
 | |
|     parser.addini(name="firmware", type='string', help="AP Firmware build URL", default="0")
 | |
|     parser.addini("cloud_ctlr", "AP Firmware build URL", default="0")
 | |
| 
 | |
|     parser.addini("num_stations", "Number of Stations/Clients for testing")
 | |
| 
 | |
|     # change behaviour
 | |
|     parser.addoption(
 | |
|         "--skip-upgrade",
 | |
|         action="store_true",
 | |
|         default=False,
 | |
|         help="skip updating firmware on the AP (useful for local testing)"
 | |
|     )
 | |
| 
 | |
|     parser.addoption(
 | |
|         "--skip-lanforge",
 | |
|         action="store_true",
 | |
|         default=False,
 | |
|         help="skip to do any interactions on lanforge (to be used in case of interop)"
 | |
|     )
 | |
| 
 | |
|     # change behaviour
 | |
|     parser.addoption(
 | |
|         "--exit-on-fail",
 | |
|         action="store_true",
 | |
|         default=False,
 | |
|         help="skip updating firmware on the AP (useful for local testing)"
 | |
|     )
 | |
| 
 | |
|     # change to Ucentral Ctlr
 | |
|     parser.addoption(
 | |
|         "--1.x",
 | |
|         action="store_true",
 | |
|         default=False,
 | |
|         help="Option to run Test Cases on 1.x SDK"
 | |
|     )
 | |
| 
 | |
|     # change behaviour
 | |
|     parser.addoption(
 | |
|         "--force-upgrade",
 | |
|         action="store_true",
 | |
|         default=False,
 | |
|         help="force Upgrading Firmware even if it is already latest version"
 | |
|     )
 | |
|     parser.addoption(
 | |
|         "--force-upload",
 | |
|         action="store_true",
 | |
|         default=False,
 | |
|         help="force Uploading Firmware even if it is already latest version"
 | |
|     )
 | |
|     # this has to be the last argument
 | |
|     # example: --access-points ECW5410 EA8300-EU
 | |
|     parser.addoption(
 | |
|         "--testbed",
 | |
|         # nargs="+",
 | |
|         default="basic-01",
 | |
|         help="AP Model which is needed to test"
 | |
|     )
 | |
|     parser.addoption(
 | |
|         "--use-testrail",
 | |
|         action="store_false",
 | |
|         default=True,
 | |
|         help="Stop using Testrails"
 | |
|     )
 | |
| 
 | |
|     # Perfecto Parameters
 | |
|     parser.addini("perfectoURL", "Cloud URL")
 | |
|     parser.addini("securityToken", "Security Token")
 | |
|     parser.addini("platformName-iOS", "iOS Platform")
 | |
|     parser.addini("platformName-android", "Android Platform")
 | |
|     parser.addini("model-iOS", "iOS Devices")
 | |
|     parser.addini("model-android", "Android Devices")
 | |
|     parser.addini("bundleId-iOS", "iOS Devices")
 | |
|     parser.addini("bundleId-iOS-Settings", "iOS Settings App")
 | |
|     parser.addini("appPackage-android", "Android Devices")
 | |
|     parser.addini("bundleId-iOS-Safari", "Safari BundleID")
 | |
|     parser.addini("wifi-SSID-2g-Pwd", "Wifi 2g Password")
 | |
|     parser.addini("Default-SSID-5gl-perfecto-b", "Wifi 5g AP Name")
 | |
|     parser.addini("Default-SSID-2g-perfecto-b", "Wifi 2g AP Name")
 | |
|     parser.addini("Default-SSID-perfecto-b", "Wifi AP Name")
 | |
|     parser.addini("bundleId-iOS-Ping", "Ping Bundle ID")
 | |
|     parser.addini("browserType-iOS", "Mobile Browser Name")
 | |
|     parser.addini("projectName", "Project Name")
 | |
|     parser.addini("projectVersion", "Project Version")
 | |
|     parser.addini("jobName", "CI Job Name")
 | |
|     parser.addini("jobNumber", "CI Job Number")
 | |
|     parser.addini("reportTags", "Report Tags")
 | |
|     parser.addoption(
 | |
|         "--access-points-perfecto",
 | |
|         # nargs="+",
 | |
|         default=["Perfecto"],
 | |
|         help="list of access points to test"
 | |
|     )
 | |
| 
 | |
| 
 | |
| """
 | |
| Test session base fixture
 | |
| """
 | |
| 
 | |
| 
 | |
| # To be depreciated as testrails will go
 | |
| @pytest.fixture(scope="session")
 | |
| def test_cases():
 | |
|     """Yields the test cases from configuration.py: will be depreciated"""
 | |
|     yield []
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def testbed(request):
 | |
|     """yields the testbed option selection"""
 | |
|     var = request.config.getoption("--testbed")
 | |
|     yield var
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def should_upload_firmware(request):
 | |
|     """yields the --force-upload option for firmware upload selection"""
 | |
|     yield request.config.getoption("--force-upload")
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def should_upgrade_firmware(request):
 | |
|     """yields the --force-upgrade option  for firmware upgrade selection"""
 | |
|     yield request.config.getoption("--force-upgrade")
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def exit_on_fail(request):
 | |
|     """yields the --exit-on-fail option for exiting the test case if it fails without teardown"""
 | |
|     yield request.config.getoption("--exit-on-fail")
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def radius_info():
 | |
|     """yields the radius server information from lab info file"""
 | |
|     yield RADIUS_SERVER_DATA
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def radius_accounting_info():
 | |
|     """yields the radius accounting information from lab info file"""
 | |
|     yield RADIUS_ACCOUNTING_DATA
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def rate_radius_info():
 | |
|     """yields the radius server information from lab info file"""
 | |
|     yield RATE_LIMITING_RADIUS_SERVER_DATA
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def rate_radius_accounting_info():
 | |
|     """yields the radius accounting information from lab info file"""
 | |
|     yield RATE_LIMITING_RADIUS_ACCOUNTING_DATA
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_configuration(testbed, request):
 | |
|     """yields the selected testbed information from lab info file (configuration.py)"""
 | |
|     if request.config.getini("cloud_ctlr") != "0":
 | |
|         CONFIGURATION[testbed]["controller"]["url"] = request.config.getini("cloud_ctlr")
 | |
|     if request.config.getini("firmware") != "0":
 | |
|         version = request.config.getini("firmware")
 | |
|         version_list = version.split(",")
 | |
|         for i in range(len(CONFIGURATION[testbed]["access_point"])):
 | |
|             print("i", i)
 | |
|             print(version_list)
 | |
|             CONFIGURATION[testbed]["access_point"][i]["version"] = version_list[0]
 | |
|     yield CONFIGURATION[testbed]
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_apnos():
 | |
|     """yields the LIBRARY for APNOS, Reduces the use of imports across files"""
 | |
|     yield APNOS
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_equipment_ref(request, setup_controller, testbed, get_configuration):
 | |
|     """"""
 | |
|     if request.config.getoption("1.x"):
 | |
|         equipment_id_list = []
 | |
|         for i in get_configuration['access_point']:
 | |
|             equipment_id_list.append(setup_controller.get_equipment_id(
 | |
|                 serial_number=i['serial']))
 | |
|     else:
 | |
|         equipment_id_list = []
 | |
|         for i in get_configuration['access_point']:
 | |
|             equipment_id_list.append(i['serial'])
 | |
|     yield equipment_id_list
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_sdk_version(fixtures_ver):
 | |
|     version = fixtures_ver.get_sdk_version()
 | |
|     yield version
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_uci_show(fixtures_ver, get_apnos, get_configuration):
 | |
|     uci_show = fixtures_ver.get_uci_show(get_apnos, get_configuration)
 | |
|     yield uci_show
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def skip_lf(request):
 | |
|     yield request.config.getoption("--skip-lanforge")
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_openflow():
 | |
|     yield open_flow
 | |
| 
 | |
| 
 | |
| # Controller Fixture
 | |
| @pytest.fixture(scope="session")
 | |
| def setup_controller(request, get_configuration, add_env_properties, fixtures_ver):
 | |
|     """sets up the controller connection and yields the sdk_client object"""
 | |
|     sdk_client = fixtures_ver.controller_obj
 | |
|     request.addfinalizer(fixtures_ver.disconnect)
 | |
|     yield sdk_client
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def setup_firmware(setup_controller):
 | |
|     """ Fixture to Setup Firmware with the selected sdk """
 | |
|     setup_controller.instantiate_firmware()
 | |
|     yield True
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def instantiate_firmware(request, setup_controller, get_configuration):
 | |
|     """sets up firmware utility and yields the object for firmware upgrade"""
 | |
|     if request.config.getoption("--1.x"):
 | |
|         firmware_client_obj = []
 | |
| 
 | |
|         for access_point_info in get_configuration['access_point']:
 | |
|             version = access_point_info["version"]
 | |
|             if request.config.getini("build").__contains__("https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/"):
 | |
|                 version = request.config.getini("build")
 | |
|             firmware_client = FirmwareUtility(sdk_client=setup_controller,
 | |
|                                               model=access_point_info["model"],
 | |
|                                               version_url=version)
 | |
|             firmware_client_obj.append(firmware_client)
 | |
|         yield firmware_client_obj
 | |
|     else:
 | |
|         # 2.x
 | |
|         pass
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_latest_firmware(request, instantiate_firmware):
 | |
|     """yields the list of firmware version"""
 | |
|     if request.config.getoption("--1.x"):
 | |
|         fw_version_list = []
 | |
|         try:
 | |
| 
 | |
|             for fw_obj in instantiate_firmware:
 | |
|                 latest_firmware = fw_obj.get_fw_version()
 | |
|                 latest_firmware = latest_firmware.replace(".tar.gz", "")
 | |
|                 fw_version_list.append(latest_firmware)
 | |
|         except Exception as e:
 | |
|             print(e)
 | |
|             fw_version_list = []
 | |
| 
 | |
|         yield fw_version_list
 | |
|     else:
 | |
|         # 2.x
 | |
|         pass
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def upload_firmware(request, should_upload_firmware, instantiate_firmware):
 | |
|     """yields the firmware_id that is uploaded to cloud"""
 | |
|     if request.config.getoption("--1.x"):
 | |
|         firmware_id_list = []
 | |
|         for i in range(0, len(instantiate_firmware)):
 | |
|             firmware_id = instantiate_firmware[i].upload_fw_on_cloud(force_upload=should_upload_firmware)
 | |
|             firmware_id_list.append(firmware_id)
 | |
|         yield firmware_id_list
 | |
|     else:
 | |
|         # 2.x release
 | |
|         yield True
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def upgrade_firmware(request, instantiate_firmware, get_equipment_ref, check_ap_firmware_cloud, get_latest_firmware,
 | |
|                      should_upgrade_firmware, should_upload_firmware, get_apnos, get_configuration):
 | |
|     """yields the status of upgrade of firmware. waits for 300 sec after each upgrade request"""
 | |
|     print(should_upgrade_firmware, should_upload_firmware)
 | |
|     if request.config.getoption("--1.x"):
 | |
|         status_list = []
 | |
|         active_fw_list = []
 | |
|         try:
 | |
|             for access_point in get_configuration['access_point']:
 | |
|                 ap_ssh = get_apnos(access_point, sdk="1.x")
 | |
|                 active_fw = ap_ssh.get_active_firmware()
 | |
|                 active_fw_list.append(active_fw)
 | |
|         except Exception as e:
 | |
|             print(e)
 | |
|             active_fw_list = []
 | |
|         print(active_fw_list, get_latest_firmware)
 | |
|         if get_latest_firmware != active_fw_list:
 | |
|             if request.config.getoption("--skip-upgrade"):
 | |
|                 status = "skip-upgrade"
 | |
|                 status_list.append(status)
 | |
|             else:
 | |
| 
 | |
|                 for i in range(0, len(instantiate_firmware)):
 | |
|                     status = instantiate_firmware[i].upgrade_fw(equipment_id=get_equipment_ref[i], force_upload=True,
 | |
|                                                                 force_upgrade=should_upgrade_firmware)
 | |
|                     status_list.append(status)
 | |
|         else:
 | |
|             if should_upgrade_firmware:
 | |
|                 for i in range(0, len(instantiate_firmware)):
 | |
|                     status = instantiate_firmware[i].upgrade_fw(equipment_id=get_equipment_ref[i],
 | |
|                                                                 force_upload=should_upload_firmware,
 | |
|                                                                 force_upgrade=should_upgrade_firmware)
 | |
|                     status_list.append(status)
 | |
|             else:
 | |
|                 status = "skip-upgrade Version Already Available"
 | |
|                 status_list.append(status)
 | |
|         yield status_list
 | |
|     else:
 | |
|         # 2.x release
 | |
|         pass
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def check_ap_firmware_cloud(request, setup_controller, get_equipment_ref):
 | |
|     """yields the active version of firmware on cloud"""
 | |
|     if request.config.getoption("--1.x"):
 | |
|         ap_fw_list = []
 | |
|         for i in get_equipment_ref:
 | |
|             ap_fw_list.append(setup_controller.get_ap_firmware_old_method(equipment_id=i))
 | |
|         yield ap_fw_list
 | |
|     else:
 | |
|         # 2.x
 | |
|         pass
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def check_ap_firmware_ssh(get_configuration, request):
 | |
|     """yields the active version of firmware on ap"""
 | |
|     if request.config.getoption("--1.x"):
 | |
|         active_fw_list = []
 | |
|         try:
 | |
|             for access_point in get_configuration['access_point']:
 | |
|                 ap_ssh = APNOS(access_point)
 | |
|                 active_fw = ap_ssh.get_active_firmware()
 | |
|                 active_fw_list.append(active_fw)
 | |
|         except Exception as e:
 | |
|             print(e)
 | |
|             active_fw_list = []
 | |
|         yield active_fw_list
 | |
|     else:
 | |
|         # 2.x
 | |
|         pass
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def setup_test_run(setup_controller, request, upgrade_firmware, get_configuration,
 | |
|                    get_equipment_ref, get_latest_firmware,
 | |
|                    get_apnos):
 | |
|     """used to upgrade the firmware on AP and should be called on each test case on a module level"""
 | |
|     if request.config.getoption("--1.x"):
 | |
|         active_fw_list = []
 | |
|         try:
 | |
|             for access_point in get_configuration['access_point']:
 | |
|                 ap_ssh = get_apnos(access_point, sdk="1.x")
 | |
|                 active_fw = ap_ssh.get_active_firmware()
 | |
|                 active_fw_list.append(active_fw)
 | |
|         except Exception as e:
 | |
|             print(e)
 | |
|             active_fw_list = []
 | |
|         print(active_fw_list, get_latest_firmware)
 | |
|         if active_fw_list == get_latest_firmware:
 | |
|             yield True
 | |
|         else:
 | |
|             pytest.exit("AP is not Upgraded tp Target Firmware versions")
 | |
|     else:
 | |
|         # 2.x
 | |
|         pass
 | |
| 
 | |
| 
 | |
| """
 | |
| Instantiate Reporting
 | |
| """
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def update_report(request, testbed, get_configuration):
 | |
|     """used to update the test report on testrail/allure"""
 | |
|     if request.config.getoption("--use-testrail"):
 | |
|         tr_client = Reporting()
 | |
|     else:
 | |
|         tr_client = APIClient(request.config.getini("tr_url"), request.config.getini("tr_user"),
 | |
|                               request.config.getini("tr_pass"), request.config.getini("tr_project_id"))
 | |
|     if request.config.getoption("--use-testrail"):
 | |
|         tr_client.rid = "skip testrails"
 | |
|     else:
 | |
|         projId = tr_client.get_project_id(project_name=request.config.getini("tr_project_id"))
 | |
|         test_run_name = request.config.getini("tr_prefix") + testbed + "_" + str(
 | |
|             datetime.date.today()) + "_" + get_configuration['access_point'][0]['version']
 | |
|         tr_client.create_testrun(name=test_run_name, case_ids=list(TEST_CASES.values()), project_id=projId,
 | |
|                                  milestone_id=request.config.getini("milestone"),
 | |
|                                  description="Automated Nightly Sanity test run for new firmware build")
 | |
|         rid = tr_client.get_run_id(test_run_name=test_run_name)
 | |
|         tr_client.rid = rid
 | |
|     yield tr_client
 | |
| 
 | |
| 
 | |
| """
 | |
| FRAMEWORK MARKER LOGIC
 | |
| """
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_security_flags():
 | |
|     """used to get the essential markers on security and band"""
 | |
|     # Add more classifications as we go
 | |
|     security = ["open", "wpa", "wep", "wpa2_personal", "wpa3_personal", "wpa3_personal_mixed",
 | |
|                 "wpa_wpa2_enterprise_mixed", "wpa2_eap", "wpa2_only_eap",
 | |
|                 "wpa_wpa2_personal_mixed", "wpa_enterprise", "wpa2_enterprise", "wpa3_enterprise_mixed",
 | |
|                 "wpa3_enterprise", "twog", "fiveg", "radius"]
 | |
|     yield security
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def get_markers(request, get_security_flags):
 | |
|     """used to get the markers on the selected test case class, used in setup_profiles"""
 | |
|     session = request.node
 | |
|     markers = list()
 | |
|     security = get_security_flags
 | |
|     security_dict = dict().fromkeys(security)
 | |
|     for item in session.items:
 | |
|         for j in item.iter_markers():
 | |
|             markers.append(j.name)
 | |
|     for i in security:
 | |
|         if set(markers).__contains__(i):
 | |
|             security_dict[i] = True
 | |
|         else:
 | |
|             security_dict[i] = False
 | |
| 
 | |
|     yield security_dict
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def test_access_point(fixtures_ver, request, get_configuration, get_apnos):
 | |
|     """used to check the manager status of AP, should be used as a setup to verify if ap can reach cloud"""
 | |
|     status = fixtures_ver.get_ap_cloud_connectivity_status(get_configuration, get_apnos)
 | |
| 
 | |
|     def teardown_session():
 | |
|         data = []
 | |
|         data.append(False)
 | |
|         for s in status:
 | |
|             data.append(s[0])
 | |
|         print(data)
 | |
|         if False not in data:
 | |
|             pytest.exit("AP is Not connected to ucentral gw")
 | |
|         allure.attach(name=str(status), body="")
 | |
| 
 | |
|     request.addfinalizer(teardown_session)
 | |
|     yield status
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def test_ap_connection_status(fixtures_ver, request, get_configuration, get_apnos):
 | |
|     """used to check the manager status of AP, should be used as a setup to verify if ap can reach cloud"""
 | |
|     connection, redirector_value = fixtures_ver.get_ap_status_logs(get_configuration, get_apnos)
 | |
|     yield connection, redirector_value
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def traffic_generator_connectivity(testbed, get_configuration):
 | |
|     """Verify if traffic generator is reachable"""
 | |
|     if get_configuration['traffic_generator']['name'] == "lanforge":
 | |
|         lanforge_ip = get_configuration['traffic_generator']['details']['ip']
 | |
|         lanforge_port = get_configuration['traffic_generator']['details']['port']
 | |
|         # Condition :
 | |
|         #   if gui connection is not available
 | |
|         #   yield False
 | |
|         # Condition :
 | |
|         # If Gui Connection is available
 | |
|         # yield the gui version
 | |
|         try:
 | |
|             cv = cv_test(lanforge_ip, lanforge_port)
 | |
|             url_data = cv.get_ports("/")
 | |
|             lanforge_GUI_version = url_data["VersionInfo"]["BuildVersion"]
 | |
|             lanforge_gui_git_version = url_data["VersionInfo"]["GitVersion"]
 | |
|             lanforge_gui_build_date = url_data["VersionInfo"]["BuildDate"]
 | |
|             print(lanforge_GUI_version, lanforge_gui_build_date, lanforge_gui_git_version)
 | |
|             if not (lanforge_GUI_version or lanforge_gui_build_date or lanforge_gui_git_version):
 | |
|                 yield False
 | |
|             else:
 | |
|                 yield lanforge_GUI_version
 | |
|         except:
 | |
|             yield False
 | |
|     else:
 | |
|         yield True
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def create_lanforge_chamberview_dut(lf_tools, skip_lf):
 | |
|     dut_name = ""
 | |
|     if not skip_lf:
 | |
|         dut_object, dut_name = lf_tools.Create_Dut()
 | |
|     return dut_name
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def lf_tools(get_configuration, testbed, skip_lf):
 | |
|     """ Create a DUT on LANforge"""
 | |
|     if not skip_lf:
 | |
|         obj = ChamberView(lanforge_data=get_configuration["traffic_generator"]["details"],
 | |
|                           testbed=testbed, access_point_data=get_configuration["access_point"])
 | |
|     else:
 | |
|         obj = False
 | |
|     yield obj
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def lf_test(get_configuration, setup_influx, request, skip_lf):
 | |
|     if not skip_lf:
 | |
|         if request.config.getoption("--exit-on-fail"):
 | |
|             obj = RunTest(lanforge_data=get_configuration['traffic_generator']['details'], influx_params=setup_influx,
 | |
|                           debug=True)
 | |
|         if request.config.getoption("--exit-on-fail") is False:
 | |
|             obj = RunTest(lanforge_data=get_configuration['traffic_generator']['details'], influx_params=setup_influx,
 | |
|                           debug=False)
 | |
|     yield obj
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def setup_influx(request, testbed, get_configuration):
 | |
|     """ Setup Influx Parameters: Used in CV Automation"""
 | |
|     influx_params = {
 | |
|         "influx_host": request.config.getini("influx_host"),
 | |
|         "influx_port": request.config.getini("influx_port"),
 | |
|         "influx_token": request.config.getini("influx_token"),
 | |
|         "influx_bucket": request.config.getini("influx_bucket"),
 | |
|         "influx_org": request.config.getini("influx_org"),
 | |
|         "influx_tag": [testbed, get_configuration["access_point"][0]["model"]],
 | |
|     }
 | |
|     yield influx_params
 | |
| 
 | |
| 
 | |
| # Need for Perforce Mobile Device Execution
 | |
| def pytest_sessionstart(session):
 | |
|     session.results = dict()
 | |
| 
 | |
| 
 | |
| @fixture(scope='session', autouse=True)
 | |
| def add_allure_environment_property(request: SubRequest) -> Optional[Callable]:
 | |
|     environment_properties = dict()
 | |
| 
 | |
|     def maker(key: str, value: Any):
 | |
|         environment_properties.update({key: value})
 | |
| 
 | |
|     yield maker
 | |
| 
 | |
|     alluredir = request.config.getoption(ALLUREDIR_OPTION)
 | |
| 
 | |
|     if not alluredir or not os.path.isdir(alluredir) or not environment_properties:
 | |
|         return
 | |
| 
 | |
|     allure_env_path = path.join(alluredir, ALLURE_ENVIRONMENT_PROPERTIES_FILE)
 | |
| 
 | |
|     with open(allure_env_path, 'w') as _f:
 | |
|         data = '\n'.join([f'{variable}={value}' for variable, value in environment_properties.items()])
 | |
|         _f.write(data)
 | |
| 
 | |
| 
 | |
| @fixture(scope='session')
 | |
| def add_env_properties(get_configuration, get_sdk_version, get_apnos, fixtures_ver,
 | |
|                        add_allure_environment_property: Callable) -> None:
 | |
|     add_allure_environment_property('Access-Point-Model', get_configuration["access_point"][0]["model"])
 | |
|     add_allure_environment_property('SDK-Version', get_sdk_version)
 | |
|     try:
 | |
|         add_allure_environment_property('Access-Point-Firmware-Version',
 | |
|                                         fixtures_ver.get_ap_version(get_apnos, get_configuration)[0].split("\n")[1])
 | |
|     except Exception as e:
 | |
|         print(e)
 | |
|         pass
 | |
|     add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"])
 | |
|     add_allure_environment_property('AP-Serial-Number', get_configuration["access_point"][0]["serial"] + "\n")
 | |
| 
 | |
| 
 | |
| @fixture(scope="session")
 | |
| def add_firmware_property_after_upgrade(add_allure_environment_property, fixtures_ver, get_apnos,
 | |
|                                         get_configuration):
 | |
|     # try:
 | |
|     add_allure_environment_property('Access-Point-Firmware-Version',
 | |
|                                         fixtures_ver.get_ap_version(get_apnos, get_configuration)[0].split("\n")[1])
 | |
|     # except Exception as e:
 | |
|     #     print(e)
 | |
|     #     pass
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def fixtures_ver(request, get_configuration):
 | |
|     if request.config.getoption("1.x") is False:
 | |
|         print("2.x")
 | |
|         obj = Fixtures_2x(configuration=get_configuration)
 | |
|     if request.config.getoption("1.x"):
 | |
|         print("1.x")
 | |
|         obj = Fixtures_1x(configuration=get_configuration)
 | |
|     yield obj
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="session")
 | |
| def firmware_upgrade(fixtures_ver, get_apnos, get_configuration):
 | |
|     upgrade_status = fixtures_ver.setup_firmware(get_apnos, get_configuration)
 | |
|     yield upgrade_status
 | |
| 
 | |
| 
 | |
| """
 | |
| Logs related Fixtures
 | |
| """
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="function")
 | |
| def get_ap_logs(request, get_apnos, get_configuration):
 | |
|     S = 9
 | |
|     instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S))
 | |
|     for ap in get_configuration['access_point']:
 | |
|         ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
 | |
|         ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name)
 | |
| 
 | |
|     def collect_logs():
 | |
|         for ap in get_configuration['access_point']:
 | |
|             ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
 | |
|             ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name)
 | |
|             ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name,
 | |
|                                          stop_ref="stop testcase: " + instance_name)
 | |
|             allure.attach(name='logread', body=str(ap_logs))
 | |
|         pass
 | |
| 
 | |
|     request.addfinalizer(collect_logs)
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="function")
 | |
| def get_lf_logs(request, get_apnos, get_configuration):
 | |
|     ip = get_configuration["traffic_generator"]["details"]["ip"]
 | |
|     port = get_configuration["traffic_generator"]["details"]["ssh_port"]
 | |
| 
 | |
|     def collect_logs_lf():
 | |
|         log_0 = "/home/lanforge/lanforge_log_0.txt"
 | |
|         log_1 = "/home/lanforge/lanforge_log_1.txt"
 | |
|         obj = SCP_File(ip=ip, port=port, username="root", password="lanforge", remote_path=log_0,
 | |
|                        local_path=".")
 | |
|         obj.pull_file()
 | |
|         allure.attach.file(source="lanforge_log_0.txt",
 | |
|                            name="lanforge_log_0")
 | |
|         obj = SCP_File(ip=ip, port=port, username="root", password="lanforge", remote_path=log_1,
 | |
|                        local_path=".")
 | |
|         obj.pull_file()
 | |
|         allure.attach.file(source="lanforge_log_1.txt",
 | |
|                            name="lanforge_log_1")
 | |
| 
 | |
|     request.addfinalizer(collect_logs_lf)
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="function")
 | |
| def get_apnos_logs(get_apnos, get_configuration):
 | |
|     all_logs = []
 | |
|     for ap in get_configuration['access_point']:
 | |
|         ap_ssh = get_apnos(ap, pwd="../libs/apnos/", sdk="2.x")
 | |
|         logs = ap_ssh.logread()
 | |
|         all_logs.append(logs)
 | |
|     yield all_logs
 |