seperate fixture files for 2.x and 1.x

Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
shivamcandela
2021-08-08 01:05:49 +05:30
parent 55e8e528ed
commit 2e8515e148
12 changed files with 2456 additions and 1264 deletions

Submodule lanforge/lanforge-scripts updated: 7fe8579ceb...16276e080c

File diff suppressed because it is too large Load Diff

View File

@@ -1,380 +0,0 @@
"""
Base Library for Ucentral
"""
import json
import ssl
import sys
import time
from urllib.parse import urlparse
import pytest
import allure
import requests
from pathlib import Path
from requests.adapters import HTTPAdapter
import logging
# logging.basicConfig(level=logging.DEBUG)
# from http.client import HTTPConnection
#
# HTTPConnection.debuglevel = 1
# requests.logging.getLogger()
class ConfigureController:
def __init__(self, controller_data):
self.username = controller_data["username"]
self.password = controller_data["password"]
self.host = urlparse(controller_data["url"])
print(self.host)
self.access_token = ""
# self.session = requests.Session()
self.login_resp = self.login()
self.gw_host = self.get_endpoint()
def build_uri_sec(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
print(new_uri)
return new_uri
def build_uri(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.gw_host.hostname, self.gw_host.port, path)
print(new_uri)
return new_uri
def login(self):
uri = self.build_uri_sec("oauth2")
# self.session.mount(uri, HTTPAdapter(max_retries=15))
payload = json.dumps({"userId": self.username, "password": self.password})
resp = requests.post(uri, data=payload, verify=False, timeout=100)
self.check_response("POST", resp, "", payload, uri)
token = resp.json()
self.access_token = token["access_token"]
print(token)
# self.session.headers.update({'Authorization': self.access_token})
return resp
def get_endpoint(self):
uri = self.build_uri_sec("systemEndpoints")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("GET", resp, self.make_headers(), "", uri)
devices = resp.json()
print(devices["endpoints"][0]["uri"])
gw_host = urlparse(devices["endpoints"][0]["uri"])
return gw_host
def logout(self):
uri = self.build_uri_sec('oauth2/%s' % self.access_token)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("DELETE", resp, self.make_headers(), "", uri)
print('Logged out:', resp.status_code)
return resp
def make_headers(self):
headers = {'Authorization': 'Bearer %s' % self.access_token,
"Connection": "keep-alive",
"Keep-Alive": "timeout=10, max=1000"
}
return headers
def check_response(self, cmd, response, headers, data_str, url):
if response.status_code >= 400:
if response.status_code >= 400:
print("check-response: ERROR, url: ", url)
else:
print("check-response: url: ", url)
print("Command: ", cmd)
print("response-status: ", response.status_code)
print("response-headers: ", response.headers)
print("response-content: ", response.content)
print("headers: ", headers)
print("data-str: ", data_str)
if response.status_code >= 400:
# if True:
raise NameError("Invalid response code.")
return True
class UController(ConfigureController):
def __init__(self, controller_data=None):
super().__init__(controller_data)
def get_devices(self):
uri = self.build_uri("devices/")
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
devices = resp.json()
# resp.close()()
return devices
def get_device_by_serial_number(self, serial_number=None):
uri = self.build_uri("device/" + serial_number)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
self.check_response("GET", resp, self.make_headers(), "", uri)
device = resp.json()
# resp.close()()
return device
def get_device_uuid(self, serial_number):
device_info = self.get_device_by_serial_number(serial_number=serial_number)
return device_info["UUID"]
class UProfileUtility:
def __init__(self, sdk_client=None, controller_data=None):
if sdk_client is None:
self.sdk_client = UController(controller_data=controller_data)
self.sdk_client = sdk_client
self.base_profile_config = {
"uuid": 1,
"radios": [],
"interfaces": [{
"name": "WAN",
"role": "upstream",
"services": ["lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
},
{
"name": "LAN",
"role": "downstream",
"services": ["ssh", "lldp", "dhcp-snooping"],
"ethernet": [
{
"select-ports": [
"LAN*"
]
}
],
"ipv4": {
"addressing": "static",
"subnet": "192.168.1.1/16",
"dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
},
}],
"metrics": {
"statistics": {
"interval": 60,
"types": ["ssids", "lldp", "clients"]
},
"health": {
"interval": 120
},
"wifi-frames": {
"filters": ["probe",
"auth",
"assoc",
"disassoc",
"deauth",
"local-deauth",
"inactive-deauth",
"key-mismatch",
"beacon-report",
"radar-detected"]
},
"dhcp-snooping": {
"filters": ["ack", "discover", "offer", "request", "solicit", "reply", "renew"]
}
},
"services": {
"lldp": {
"describe": "TIP OpenWiFi",
"location": "QA"
},
"ssh": {
"port": 22
}
}
}
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.mode = None
def set_radio_config(self, radio_config=None):
self.base_profile_config["radios"].append({
"band": "2G",
"country": "US",
# "channel-mode": "HE",
"channel-width": 20,
# "channel": 11
})
self.base_profile_config["radios"].append({
"band": "5G",
"country": "US",
# "channel-mode": "HE",
"channel-width": 80,
# "channel": "auto"
})
self.vlan_section["ssids"] = []
self.vlan_ids = []
def set_mode(self, mode):
self.mode = mode
if mode == "NAT":
self.base_profile_config['interfaces'][1]['ssids'] = []
elif mode == "BRIDGE":
del self.base_profile_config['interfaces'][1]
self.base_profile_config['interfaces'][0]['ssids'] = []
elif mode == "VLAN":
del self.base_profile_config['interfaces'][1]
self.base_profile_config['interfaces'][0]['ssids'] = []
self.base_profile_config['interfaces'] = []
wan_section_vlan = {
"name": "WAN",
"role": "upstream",
"services": ["lldp"],
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
self.base_profile_config['interfaces'].append(wan_section_vlan)
else:
print("Invalid Mode")
return 0
def add_ssid(self, ssid_data, radius=False, radius_auth_data={}, radius_accounting_data={}):
print("ssid data : ", ssid_data)
ssid_info = {'name': ssid_data["ssid_name"], "bss-mode": "ap", "wifi-bands": [], "services": ["wifi-frames"]}
for i in ssid_data["appliedRadios"]:
ssid_info["wifi-bands"].append(i)
ssid_info['encryption'] = {}
ssid_info['encryption']['proto'] = ssid_data["security"]
try:
ssid_info['encryption']['key'] = ssid_data["security_key"]
except Exception as e:
pass
ssid_info['encryption']['ieee80211w'] = "optional"
if radius:
ssid_info["radius"] = {}
ssid_info["radius"]["authentication"] = {
"host": radius_auth_data["ip"],
"port": radius_auth_data["port"],
"secret": radius_auth_data["secret"]
}
ssid_info["radius"]["accounting"] = {
"host": radius_accounting_data["ip"],
"port": radius_accounting_data["port"],
"secret": radius_accounting_data["secret"]
}
if self.mode == "NAT":
self.base_profile_config['interfaces'][1]['ssids'].append(ssid_info)
elif self.mode == "BRIDGE":
self.base_profile_config['interfaces'][0]['ssids'].append(ssid_info)
elif self.mode == "VLAN":
vid = ssid_data["vlan"]
self.vlan_section = {
"name": "WAN100",
"role": "upstream",
"services": ["lldp", "dhcp-snooping"],
"vlan": {
"id": 100
},
"ethernet": [
{
"select-ports": [
"WAN*"
]
}
],
"ipv4": {
"addressing": "dynamic"
}
}
vlan_section = self.vlan_section
if vid in self.vlan_ids:
print("sss", self.vlan_ids)
for i in self.base_profile_config['interfaces']:
if i["name"] == "WANv%s" % (vid):
i["ssids"].append(ssid_info)
else:
print(self.vlan_ids)
self.vlan_ids.append(vid)
vlan_section['name'] = "WANv%s" % (vid)
vlan_section['vlan']['id'] = int(vid)
vlan_section["ssids"] = []
vlan_section["ssids"].append(ssid_info)
self.base_profile_config['interfaces'].append(vlan_section)
print(vlan_section)
vsection = 0
else:
print("invalid mode")
pytest.exit("invalid Operating Mode")
def push_config(self, serial_number):
payload = {"configuration": self.base_profile_config, "serialNumber": serial_number, "UUID": 0}
uri = self.sdk_client.build_uri("device/" + serial_number + "/configure")
basic_cfg_str = json.dumps(payload)
allure.attach(name="ucentral_config: ", body=str(self.base_profile_config))
print(self.base_profile_config)
resp = requests.post(uri, data=basic_cfg_str, headers=self.sdk_client.make_headers(),
verify=False, timeout=100)
self.sdk_client.check_response("POST", resp, self.sdk_client.make_headers(), basic_cfg_str, uri)
print(resp.url)
resp.close()
print(resp)
if __name__ == '__main__':
controller = {
'url': 'https://sec-ucentral-qa01.cicd.lab.wlan.tip.build:16001', # API base url for the controller
'username': "tip@ucentral.com",
'password': 'openwifi',
}
obj = UController(controller_data=controller)
profile = UProfileUtility(sdk_client=obj)
profile.set_mode(mode="BRIDGE")
profile.set_radio_config()
ssid = {"ssid_name": "ssid_wpa2_2g", "appliedRadios": ["2G"], "security": "psk", "security_key": "something",
"vlan": 100}
profile.add_ssid(ssid_data=ssid)
profile.push_config(serial_number="903cb39d6918")
# print(obj.get_devices())
obj.logout()

View File

View File

@@ -10,6 +10,7 @@ import allure
import re
import logging
from _pytest.fixtures import SubRequest
from pyparsing import Optional
@@ -41,7 +42,6 @@ if 'py-json' not in sys.path:
sys.path.append('../py-scripts')
from apnos.apnos import APNOS
from controller.controller import Controller
from controller.ucentral_ctlr import UController
from controller.controller import FirmwareUtility
import pytest
from cv_test_manager import cv_test
@@ -59,6 +59,14 @@ from typing import Any, Callable, Optional
from _pytest.fixtures import SubRequest
from pytest import fixture
import fixtures_1x
from fixtures_1x import Fixtures_1x
import fixtures_2x
from fixtures_2x import Fixtures_2x
ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties'
ALLUREDIR_OPTION = '--alluredir'
def pytest_addoption(parser):
"""pytest addoption function: contains ini objects and options"""
@@ -201,14 +209,12 @@ def exit_on_fail(request):
@pytest.fixture(scope="session")
def radius_info():
"""yields the radius server information from lab info file"""
allure.attach(body=str(RADIUS_SERVER_DATA), name="Radius server Info: ")
yield RADIUS_SERVER_DATA
@pytest.fixture(scope="session")
def radius_accounting_info():
"""yields the radius accounting information from lab info file"""
allure.attach(body=str(RADIUS_ACCOUNTING_DATA), name="Radius server Info: ")
yield RADIUS_ACCOUNTING_DATA
@@ -255,40 +261,19 @@ def instantiate_access_point(testbed, get_apnos, get_configuration):
# Controller Fixture
@pytest.fixture(scope="session")
def setup_controller(request, get_configuration, test_access_point, add_env_properties):
def setup_controller(request, get_configuration, test_access_point, add_env_properties, fixtures_ver):
"""sets up the controller connection and yields the sdk_client object"""
try:
if request.config.getoption("1.x"):
sdk_client = Controller(controller_data=get_configuration["controller"])
def teardown_controller():
print("\nTest session Completed")
sdk_client.disconnect_Controller()
request.addfinalizer(teardown_controller)
else:
sdk_client = UController(controller_data=get_configuration["controller"])
def teardown_ucontroller():
print("\nTest session Completed")
sdk_client.logout()
try:
sdk_client.logout()
except Exception as e:
print(e)
request.addfinalizer(teardown_ucontroller)
except Exception as e:
print(e)
allure.attach(body=str(e), name="Controller Instantiation Failed: ")
sdk_client = False
pytest.exit("unable to communicate to Controller" + str(e))
sdk_client = fixtures_ver.controller_obj
request.addfinalizer(fixtures_ver.disconnect)
yield sdk_client
@pytest.fixture(scope="session")
def setup_firmware(fixtures_ver):
""" Fixture to Setup Firmware with the selected sdk """
yield True
@pytest.fixture(scope="session")
def instantiate_firmware(request, setup_controller, get_configuration):
"""sets up firmware utility and yields the object for firmware upgrade"""
@@ -609,12 +594,6 @@ def lf_tools(get_configuration, testbed):
yield obj
# @pytest.fixture(scope="class")
# def create_vlan(request, testbed, get_configuration, lf_tools):
# """Create a vlan on lanforge"""
#
@pytest.fixture(scope="session")
def setup_influx(request, testbed, get_configuration):
""" Setup Influx Parameters: Used in CV Automation"""
@@ -634,10 +613,6 @@ def pytest_sessionstart(session):
session.results = dict()
ALLURE_ENVIRONMENT_PROPERTIES_FILE = 'environment.properties'
ALLUREDIR_OPTION = '--alluredir'
@fixture(scope='session', autouse=True)
def add_allure_environment_property(request: SubRequest) -> Optional[Callable]:
environment_properties = dict()
@@ -660,17 +635,18 @@ def add_allure_environment_property(request: SubRequest) -> Optional[Callable]:
@fixture(scope='session')
def get_uc_ap_version(get_apnos, get_configuration):
version_list = []
for access_point_info in get_configuration['access_point']:
ap_ssh = get_apnos(access_point_info)
version = ap_ssh.get_ap_version_ucentral()
version_list.append(version)
yield version_list
@fixture(scope='session')
def add_env_properties(get_configuration, get_uc_ap_version, add_allure_environment_property: Callable) -> None:
def add_env_properties(get_configuration, get_apnos, fixtures_ver, add_allure_environment_property: Callable) -> None:
add_allure_environment_property('Access-Point-Model', get_configuration["access_point"][0]["model"])
add_allure_environment_property('Access-Point-Firmware-Version', get_uc_ap_version[0].split("\n")[1])
add_allure_environment_property('Access-Point-Firmware-Version',
fixtures_ver.get_ap_version(get_apnos, get_configuration)[0].split("\n")[1])
add_allure_environment_property('Cloud-Controller-SDK-URL', get_configuration["controller"]["url"])
add_allure_environment_property('AP-Serial-Number', get_configuration["access_point"][0]["serial"])
@pytest.fixture(scope="session")
def fixtures_ver(request, get_configuration):
if request.config.getoption("1.x") is False:
obj = Fixtures_2x(configuration=get_configuration)
if request.config.getoption("1.x"):
obj = Fixtures_1x(configuration=get_configuration)
yield obj

View File

@@ -11,7 +11,7 @@ if "libs" not in sys.path:
sys.path.append(f'../libs')
from controller.controller import ProfileUtility
from controller.ucentral_ctlr import UProfileUtility
from controller.controller_2x.controller import UProfileUtility
import time
from lanforge.lf_tests import RunTest
from lanforge.lf_tools import ChamberView
@@ -50,15 +50,17 @@ def create_lanforge_chamberview_dut(lf_tools):
@pytest.fixture(scope="session")
def setup_vlan():
vlan_id = [100]
allure.attach(body=str(vlan_id), name="VLAN Created: ")
yield vlan_id[0]
@pytest.fixture(scope="class")
def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id,
def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment_id, fixtures_ver,
instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools,
get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info):
lf_tools.reset_scenario()
param = dict(request.param)
# VLAN Setup
if request.param["mode"] == "VLAN":
vlan_list = list()
@@ -78,833 +80,19 @@ def setup_profiles(request, setup_controller, testbed, setup_vlan, get_equipment
vlan_list.pop(i)
if request.param["mode"] == "VLAN":
lf_tools.add_vlan(vlan_ids=vlan_list)
if request.config.getoption("1.x"):
instantiate_profile = instantiate_profile(sdk_client=setup_controller)
vlan_id, mode = 0, 0
instantiate_profile.cleanup_objects()
parameter = dict(request.param)
print(parameter)
test_cases = {}
profile_data = {}
if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]:
print("Invalid Mode: ", parameter['mode'])
allure.attach(body=parameter['mode'], name="Invalid Mode: ")
yield test_cases
if parameter['mode'] == "NAT":
mode = "NAT"
vlan_id = 1
if parameter['mode'] == "BRIDGE":
mode = "BRIDGE"
vlan_id = 1
if parameter['mode'] == "VLAN":
mode = "BRIDGE"
vlan_id = setup_vlan
instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Equipment-AP-" + parameter['mode'])
profile_data["equipment_ap"] = {"profile_name": testbed + "-Equipment-AP-" + parameter['mode']}
profile_data["ssid"] = {}
for i in parameter["ssid_modes"]:
profile_data["ssid"][i] = []
for j in range(len(parameter["ssid_modes"][i])):
profile_name = testbed + "-SSID-" + i + "-" + str(j) + "-" + parameter['mode']
data = parameter["ssid_modes"][i][j]
data["profile_name"] = profile_name
if "mode" not in dict(data).keys():
data["mode"] = mode
if "vlan" not in dict(data).keys():
data["vlan"] = vlan_id
instantiate_profile.delete_profile_by_name(profile_name=profile_name)
profile_data["ssid"][i].append(data)
# print(profile_name)
# print(profile_data)
instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode)
time.sleep(10)
"""
Setting up rf profile
"""
rf_profile_data = {
"name": "RF-Profile-" + testbed + "-" + parameter['mode'] + "-" +
get_configuration['access_point'][0]['mode']
}
for i in parameter["rf"]:
rf_profile_data[i] = parameter['rf'][i]
# print(rf_profile_data)
try:
instantiate_profile.delete_profile_by_name(profile_name=rf_profile_data['name'])
instantiate_profile.set_rf_profile(profile_data=rf_profile_data,
mode=get_configuration['access_point'][0]['mode'])
allure.attach(body=str(rf_profile_data),
name="RF Profile Created : " + get_configuration['access_point'][0]['mode'])
except Exception as e:
print(e)
allure.attach(body=str(e), name="Exception ")
# Radius Profile Creation
if parameter["radius"]:
radius_info = radius_info
radius_info["name"] = testbed + "-Automation-Radius-Profile-" + mode
instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode)
try:
instantiate_profile.create_radius_profile(radius_info=radius_info)
allure.attach(body=str(radius_info),
name="Radius Profile Created")
test_cases['radius_profile'] = True
except Exception as e:
print(e)
test_cases['radius_profile'] = False
# SSID Profile Creation
lf_dut_data = []
for mode in profile_data['ssid']:
if mode == "open":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_open_ssid_profile(profile_data=j)
test_cases["open_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["open_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_ssid_profile(profile_data=j)
test_cases["wpa_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa2_personal":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa2_personal_ssid_profile(profile_data=j)
test_cases["wpa2_personal_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa2_personal_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa_wpa2_personal_mixed":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_wpa2_personal_mixed_ssid_profile(
profile_data=j)
test_cases["wpa_wpa2_personal_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_wpa2_personal_mixed_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_personal":
for j in profile_data["ssid"][mode]:
print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_personal_ssid_profile(profile_data=j)
test_cases["wpa3_personal_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_personal_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_personal_mixed":
for j in profile_data["ssid"][mode]:
print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_personal_mixed_ssid_profile(
profile_data=j)
test_cases["wpa3_personal_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_personal_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa_enterprise":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_enterprise_ssid_profile(profile_data=j)
test_cases["wpa_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa2_enterprise":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa2_enterprise_ssid_profile(profile_data=j)
test_cases["wpa2_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa2_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_enterprise":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_enterprise_ssid_profile(profile_data=j)
test_cases["wpa3_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa_wpa2_enterprise_mixed":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_wpa2_enterprise_mixed_ssid_profile(
profile_data=j)
test_cases["wpa_wpa2_enterprise_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_wpa2_enterprise_mixed_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_enterprise_mixed":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_enterprise_mixed_ssid_profile(
profile_data=j)
test_cases["wpa3_enterprise_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_enterprise_mixed_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wep":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wep_ssid_profile(profile_data=j)
test_cases["wpa3_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
# Equipment AP Profile Creation
try:
instantiate_profile.set_ap_profile(profile_data=profile_data['equipment_ap'])
test_cases["equipment_ap"] = True
allure.attach(body=str(profile_data['equipment_ap']),
name="Equipment AP Profile Created")
except Exception as e:
print(e)
test_cases["equipment_ap"] = False
allure.attach(body=str(e),
name="Equipment AP Profile Creation Failed")
# Push the Equipment AP Profile to AP
try:
for i in get_equipment_id:
instantiate_profile.push_profile_old_method(equipment_id=i)
except Exception as e:
print(e)
print("failed to create AP Profile")
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/")
# ssid_names = []
# for i in instantiate_profile.profile_creation_ids["ssid"]:
# ssid_names.append(instantiate_profile.get_ssid_name_by_profile_id(profile_id=i))
# ssid_names.sort()
ssid_names = []
for i in lf_dut_data:
ssid_names.append(i["ssid_name"])
ssid_names.sort()
# This loop will check the VIF Config with cloud profile
vif_config = []
test_cases['vifc'] = False
for i in range(0, 18):
vif_config = list(ap_ssh.get_vif_config_ssids())
vif_config.sort()
print(vif_config)
print(ssid_names)
if ssid_names == vif_config:
test_cases['vifc'] = True
break
time.sleep(10)
allure.attach(
body=str("VIF Config: " + str(vif_config) + "\n" + "SSID Pushed from Controller: " + str(ssid_names)),
name="SSID Profiles in VIF Config and Controller: ")
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/")
# This loop will check the VIF Config with VIF State
test_cases['vifs'] = False
for i in range(0, 18):
vif_state = list(ap_ssh.get_vif_state_ssids())
vif_state.sort()
vif_config = list(ap_ssh.get_vif_config_ssids())
vif_config.sort()
print(vif_config)
print(vif_state)
if vif_state == vif_config:
test_cases['vifs'] = True
break
time.sleep(10)
allure.attach(body=str("VIF Config: " + str(vif_config) + "\n" + "VIF State: " + str(vif_state)),
name="SSID Profiles in VIF Config and VIF State: ")
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="AP LOgs: ")
ssid_info = ap_ssh.get_ssid_info()
ssid_data = []
print(ssid_info)
band_mapping = ap_ssh.get_bssid_band_mapping()
print(band_mapping)
idx_mapping = {}
for i in range(0, len(ssid_info)):
if ssid_info[i][1] == "OPEN":
ssid_info[i].append("")
if ssid_info[i][1] == "OPEN":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=OPEN" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA2":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA2" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA3_PERSONAL":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA3" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA | WPA2":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA|WPA2" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "EAP-TTLS":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=EAP-TTLS" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
ssid_data.append(ssid)
lf_tools.dut_idx_mapping = idx_mapping
# Add bssid password and security from iwinfo data
# Format SSID Data in the below format
# ssid_data = [
# ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'],
# ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59']
# ]
allure.attach(name="SSID DATA IN LF DUT", body=str(ssid_data))
lf_tools.update_ssid(ssid_data=ssid_data)
def teardown_session():
print("\nRemoving Profiles")
instantiate_profile.delete_profile_by_name(profile_name=profile_data['equipment_ap']['profile_name'])
instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["ssid"])
instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["radius"])
instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["rf"])
allure.attach(body=str(profile_data['equipment_ap']['profile_name'] + "\n"),
name="Tear Down in Profiles ")
time.sleep(20)
request.addfinalizer(teardown_session)
yield test_cases
else:
instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller)
print(1, instantiate_profile_obj.sdk_client)
vlan_id, mode = 0, 0
parameter = dict(request.param)
print(parameter)
test_cases = {}
profile_data = {}
if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]:
print("Invalid Mode: ", parameter['mode'])
yield test_cases
instantiate_profile_obj.set_radio_config()
if parameter['mode'] == "NAT":
mode = "NAT"
instantiate_profile_obj.set_mode(mode=mode)
vlan_id = 1
if parameter['mode'] == "BRIDGE":
mode = "BRIDGE"
instantiate_profile_obj.set_mode(mode=mode)
vlan_id = 1
if parameter['mode'] == "VLAN":
mode = "VLAN"
instantiate_profile_obj.set_mode(mode=mode)
vlan_id = setup_vlan
profile_data["ssid"] = {}
for i in parameter["ssid_modes"]:
profile_data["ssid"][i] = []
for j in range(len(parameter["ssid_modes"][i])):
data = parameter["ssid_modes"][i][j]
profile_data["ssid"][i].append(data)
lf_dut_data = []
for mode in profile_data['ssid']:
if mode == "open":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'none'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
if mode == "wpa":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
if mode == "wpa2_personal":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk2'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa_wpa2_personal_mixed":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk-mixed'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa3_personal":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'sae'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa3_personal_mixed":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'sae-mixed'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
# EAP SSID Modes
if mode == "wpa2_enterprise":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'wpa2'
RADIUS_SERVER_DATA = radius_info
RADIUS_ACCOUNTING_DATA = radius_accounting_info
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j, radius=True,
radius_auth_data=RADIUS_SERVER_DATA,
radius_accounting_data=RADIUS_ACCOUNTING_DATA)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x")
connected, latest, active = ap_ssh.get_ucentral_status()
if connected == False:
pytest.exit("AP is disconnected from UC Gateway")
instantiate_profile_obj.push_config(serial_number=get_equipment_id[0])
config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
config["uuid"] = 0
ap_config_latest = ap_ssh.get_uc_latest_config()
try:
ap_config_latest["uuid"] = 0
except Exception as e:
print(e)
pass
x = 1
while ap_config_latest != config:
time.sleep(5)
x += 1
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_latest["uuid"] = 0
print("latest config: ", ap_config_latest)
print("config: ", config)
if x == 19:
break
if x < 19:
print("Config properly applied into AP", config)
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_latest["uuid"] = 0
ap_config_active = ap_ssh.get_uc_active_config()
ap_config_active["uuid"] = 0
x = 1
while ap_config_active != ap_config_latest:
time.sleep(5)
x += 1
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_latest["uuid"] = 0
ap_config_active = ap_ssh.get_uc_active_config()
print("latest config: ", ap_config_latest)
print("Active config: ", ap_config_active)
ap_config_active["uuid"] = 0
if x == 19:
break
allure_body = "AP config status: \n" + \
"Active Config: " + str(ap_ssh.get_uc_active_config()) + "\n" \
"Latest Config: ", str(
ap_ssh.get_uc_latest_config()) + "\n" \
"Applied Config: ", str(config)
if x < 19:
print("AP is Broadcasting Applied Config")
allure.attach(name="AP is Broadcasting Applied Config", body="")
allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active))
allure.attach(name="Config Info", body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config()))
allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body))
else:
print("AP is Not Broadcasting Applied Config")
allure.attach(name="AP is Not Broadcasting Applied Config", body="")
allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active))
allure.attach(name="Config Info",
body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config()))
allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body))
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="AP LOgs: ")
# ap_wifi_data = ap_ssh.get_interface_details()
# idx_mapping = {}
# ssid_data = []
# ap_interfaces = list(ap_wifi_data.keys())
# for interface in range(len(ap_interfaces)):
# if ap_wifi_data[ap_interfaces[interface]][1] == "none":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=OPEN" +
# " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# if ap_wifi_data[ap_interfaces[interface]][1] == "psk":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=WPA" +
# " password=" + ap_wifi_data[ap_interfaces[interface]][2] +
# " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# if ap_wifi_data[ap_interfaces[interface]][1] == "psk-mixed":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=WPA|WPA2" +
# " password=" + ap_wifi_data[ap_interfaces[interface]][2] +
# " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# if ap_wifi_data[ap_interfaces[interface]][1] == "psk2":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=WPA2" +
# " password=" + ap_wifi_data[ap_interfaces[interface]][2] +
# " bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower()
# ]
# print(ssid)
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# if ap_wifi_data[ap_interfaces[interface]][1] == "sae":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=WPA3" +
# " password=" + ap_wifi_data[ap_interfaces[interface]][2] +
# " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# if ap_wifi_data[ap_interfaces[interface]][1] == "sae-mixed":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=WPA3" +
# " password=" + ap_wifi_data[ap_interfaces[interface]][2] +
# " bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# if ap_wifi_data[ap_interfaces[interface]][1] == "wpa2":
# ssid = ["ssid_idx=" + str(interface) +
# " ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
# " security=EAP-TTLS" +
# " bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower()
# ]
#
# idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
# ap_wifi_data[ap_interfaces[interface]][2],
# ap_wifi_data[ap_interfaces[interface]][1],
# ap_wifi_data[ap_interfaces[interface]][3][1],
# ap_wifi_data[ap_interfaces[interface]][3][0]
# ]
# # pass
# ssid_data.append(ssid)
# lf_tools.ssid_list.append(ap_wifi_data[ap_interfaces[interface]][0])
# lf_tools.dut_idx_mapping = idx_mapping
# print(ssid_data)
# lf_tools.reset_scenario()
# lf_tools.update_ssid(ssid_data=ssid_data)
yield test_cases
# call this, if 1.x
return_1x = fixtures_ver.setup_profiles(request, param, setup_controller, testbed, setup_vlan, get_equipment_id,
instantiate_profile,
get_markers, create_lanforge_chamberview_dut, lf_tools,
get_security_flags, get_configuration, radius_info, get_apnos,
radius_accounting_info)
yield return_1x
@pytest.fixture(scope="session")
def lf_test(get_configuration, setup_influx):
# print(get_configuration)
obj = RunTest(lanforge_data=get_configuration['traffic_generator']['details'], influx_params=setup_influx)
# pytest.exit("")
yield obj

View File

@@ -9,7 +9,7 @@ import allure
import pytest
pytestmark = [pytest.mark.client_connectivity, pytest.mark.bridge, pytest.mark.general, pytest.mark.ucentral,
pytest.mark.sanity] # pytest.mark.usefixtures("setup_test_run")]
pytest.mark.sanity, pytest.mark.uc_sanity] # pytest.mark.usefixtures("setup_test_run")]
setup_params_general = {
"mode": "BRIDGE",
@@ -29,7 +29,6 @@ setup_params_general = {
}
@pytest.mark.uc_sanity
@pytest.mark.suiteA
@pytest.mark.sudo
@allure.feature("BRIDGE MODE CLIENT CONNECTIVITY")

View File

@@ -9,7 +9,7 @@ import allure
import pytest
pytestmark = [pytest.mark.client_connectivity, pytest.mark.nat, pytest.mark.general, pytest.mark.sanity,
pytest.mark.ucentral]
pytest.mark.uc_sanity, pytest.mark.ucentral]
setup_params_general = {
"mode": "NAT",
@@ -29,7 +29,7 @@ setup_params_general = {
}
@pytest.mark.uc_sanity
@pytest.mark.suiteA
@pytest.mark.sanity_ucentral
@allure.feature("NAT MODE CLIENT CONNECTIVITY")

576
tests/fixtures_1x.py Normal file
View File

@@ -0,0 +1,576 @@
import sys
import os
if "libs" not in sys.path:
sys.path.append(f'../libs')
for folder in 'py-json', 'py-scripts':
if folder not in sys.path:
sys.path.append(f'../lanforge/lanforge-scripts/{folder}')
sys.path.append(
os.path.dirname(
os.path.realpath(__file__)
)
)
sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity")
sys.path.append(f'../libs')
sys.path.append(f'../libs/lanforge/')
from LANforge.LFUtils import *
if 'py-json' not in sys.path:
sys.path.append('../py-scripts')
from apnos.apnos import APNOS
from controller.controller import Controller
from controller.controller import FirmwareUtility
import pytest
from cv_test_manager import cv_test
from configuration import CONFIGURATION
from configuration import RADIUS_SERVER_DATA
from configuration import RADIUS_ACCOUNTING_DATA
from configuration import TEST_CASES
from testrails.testrail_api import APIClient
from testrails.reporting import Reporting
from lf_tools import ChamberView
from sta_connect2 import StaConnect2
from os import path
from typing import Any, Callable, Optional
import time
import allure
import pytest
class Fixtures_1x:
def __init__(self, configuration={}):
self.lab_info = configuration
print(self.lab_info)
print("1.X")
try:
self.controller_obj = Controller(controller_data=self.lab_info["controller"])
except Exception as e:
print(e)
allure.attach(body=str(e), name="Controller Instantiation Failed: ")
sdk_client = False
pytest.exit("unable to communicate to Controller" + str(e))
def disconnect(self):
self.controller_obj.disconnect_Controller()
def setup_firmware(self):
pass
def get_ap_version(self, get_apnos, get_configuration):
version_list = []
for access_point_info in get_configuration['access_point']:
ap_ssh = get_apnos(access_point_info)
version = ap_ssh.get_ap_version_ucentral()
version_list.append(version)
return version_list
def setup_profiles(self, request, param, setup_controller, testbed, setup_vlan, get_equipment_id, instantiate_profile,
get_markers, create_lanforge_chamberview_dut, lf_tools,
get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info):
instantiate_profile = instantiate_profile(sdk_client=setup_controller)
vlan_id, mode = 0, 0
instantiate_profile.cleanup_objects()
parameter = dict(param)
print(parameter)
test_cases = {}
profile_data = {}
if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]:
print("Invalid Mode: ", parameter['mode'])
allure.attach(body=parameter['mode'], name="Invalid Mode: ")
return test_cases
if parameter['mode'] == "NAT":
mode = "NAT"
vlan_id = 1
if parameter['mode'] == "BRIDGE":
mode = "BRIDGE"
vlan_id = 1
if parameter['mode'] == "VLAN":
mode = "BRIDGE"
vlan_id = setup_vlan
instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Equipment-AP-" + parameter['mode'])
profile_data["equipment_ap"] = {"profile_name": testbed + "-Equipment-AP-" + parameter['mode']}
profile_data["ssid"] = {}
for i in parameter["ssid_modes"]:
profile_data["ssid"][i] = []
for j in range(len(parameter["ssid_modes"][i])):
profile_name = testbed + "-SSID-" + i + "-" + str(j) + "-" + parameter['mode']
data = parameter["ssid_modes"][i][j]
data["profile_name"] = profile_name
if "mode" not in dict(data).keys():
data["mode"] = mode
if "vlan" not in dict(data).keys():
data["vlan"] = vlan_id
instantiate_profile.delete_profile_by_name(profile_name=profile_name)
profile_data["ssid"][i].append(data)
# print(profile_name)
# print(profile_data)
instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode)
time.sleep(10)
"""
Setting up rf profile
"""
rf_profile_data = {
"name": "RF-Profile-" + testbed + "-" + parameter['mode'] + "-" +
get_configuration['access_point'][0]['mode']
}
for i in parameter["rf"]:
rf_profile_data[i] = parameter['rf'][i]
# print(rf_profile_data)
try:
instantiate_profile.delete_profile_by_name(profile_name=rf_profile_data['name'])
instantiate_profile.set_rf_profile(profile_data=rf_profile_data,
mode=get_configuration['access_point'][0]['mode'])
allure.attach(body=str(rf_profile_data),
name="RF Profile Created : " + get_configuration['access_point'][0]['mode'])
except Exception as e:
print(e)
allure.attach(body=str(e), name="Exception ")
# Radius Profile Creation
if parameter["radius"]:
radius_info = radius_info
radius_info["name"] = testbed + "-Automation-Radius-Profile-" + mode
instantiate_profile.delete_profile_by_name(profile_name=testbed + "-Automation-Radius-Profile-" + mode)
try:
instantiate_profile.create_radius_profile(radius_info=radius_info)
allure.attach(body=str(radius_info),
name="Radius Profile Created")
test_cases['radius_profile'] = True
except Exception as e:
print(e)
test_cases['radius_profile'] = False
# SSID Profile Creation
lf_dut_data = []
for mode in profile_data['ssid']:
if mode == "open":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_open_ssid_profile(profile_data=j)
test_cases["open_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["open_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_ssid_profile(profile_data=j)
test_cases["wpa_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa2_personal":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa2_personal_ssid_profile(profile_data=j)
test_cases["wpa2_personal_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa2_personal_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa_wpa2_personal_mixed":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_wpa2_personal_mixed_ssid_profile(
profile_data=j)
test_cases["wpa_wpa2_personal_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_wpa2_personal_mixed_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_personal":
for j in profile_data["ssid"][mode]:
print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_personal_ssid_profile(profile_data=j)
test_cases["wpa3_personal_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_personal_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_personal_mixed":
for j in profile_data["ssid"][mode]:
print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_personal_mixed_ssid_profile(
profile_data=j)
test_cases["wpa3_personal_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_personal_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa_enterprise":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_enterprise_ssid_profile(profile_data=j)
test_cases["wpa_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa2_enterprise":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa2_enterprise_ssid_profile(profile_data=j)
test_cases["wpa2_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa2_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_enterprise":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_enterprise_ssid_profile(profile_data=j)
test_cases["wpa3_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa_wpa2_enterprise_mixed":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa_wpa2_enterprise_mixed_ssid_profile(
profile_data=j)
test_cases["wpa_wpa2_enterprise_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa_wpa2_enterprise_mixed_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wpa3_enterprise_mixed":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wpa3_enterprise_mixed_ssid_profile(
profile_data=j)
test_cases["wpa3_enterprise_mixed_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_enterprise_mixed_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
if mode == "wep":
for j in profile_data["ssid"][mode]:
# print(j)
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
for i in range(len(j["appliedRadios"])):
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GU", "is5GHzU")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5GL", "is5GHzL")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("5G", "is5GHz")
j["appliedRadios"][i] = j["appliedRadios"][i].replace("2G", "is2dot4GHz")
creates_profile = instantiate_profile.create_wep_ssid_profile(profile_data=j)
test_cases["wpa3_enterprise_2g"] = True
allure.attach(body=str(creates_profile),
name="SSID Profile Created")
except Exception as e:
print(e)
test_cases["wpa3_enterprise_2g"] = False
allure.attach(body=str(e),
name="SSID Profile Creation Failed")
# Equipment AP Profile Creation
try:
instantiate_profile.set_ap_profile(profile_data=profile_data['equipment_ap'])
test_cases["equipment_ap"] = True
allure.attach(body=str(profile_data['equipment_ap']),
name="Equipment AP Profile Created")
except Exception as e:
print(e)
test_cases["equipment_ap"] = False
allure.attach(body=str(e),
name="Equipment AP Profile Creation Failed")
# Push the Equipment AP Profile to AP
try:
for i in get_equipment_id:
instantiate_profile.push_profile_old_method(equipment_id=i)
except Exception as e:
print(e)
print("failed to create AP Profile")
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/")
# ssid_names = []
# for i in instantiate_profile.profile_creation_ids["ssid"]:
# ssid_names.append(instantiate_profile.get_ssid_name_by_profile_id(profile_id=i))
# ssid_names.sort()
ssid_names = []
for i in lf_dut_data:
ssid_names.append(i["ssid_name"])
ssid_names.sort()
# This loop will check the VIF Config with cloud profile
vif_config = []
test_cases['vifc'] = False
for i in range(0, 18):
vif_config = list(ap_ssh.get_vif_config_ssids())
vif_config.sort()
print(vif_config)
print(ssid_names)
if ssid_names == vif_config:
test_cases['vifc'] = True
break
time.sleep(10)
allure.attach(
body=str("VIF Config: " + str(vif_config) + "\n" + "SSID Pushed from Controller: " + str(ssid_names)),
name="SSID Profiles in VIF Config and Controller: ")
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/")
# This loop will check the VIF Config with VIF State
test_cases['vifs'] = False
for i in range(0, 18):
vif_state = list(ap_ssh.get_vif_state_ssids())
vif_state.sort()
vif_config = list(ap_ssh.get_vif_config_ssids())
vif_config.sort()
print(vif_config)
print(vif_state)
if vif_state == vif_config:
test_cases['vifs'] = True
break
time.sleep(10)
allure.attach(body=str("VIF Config: " + str(vif_config) + "\n" + "VIF State: " + str(vif_state)),
name="SSID Profiles in VIF Config and VIF State: ")
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="AP LOgs: ")
ssid_info = ap_ssh.get_ssid_info()
ssid_data = []
print(ssid_info)
band_mapping = ap_ssh.get_bssid_band_mapping()
print(band_mapping)
idx_mapping = {}
for i in range(0, len(ssid_info)):
if ssid_info[i][1] == "OPEN":
ssid_info[i].append("")
if ssid_info[i][1] == "OPEN":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=OPEN" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA2":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA2" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA3_PERSONAL":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA3" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "WPA | WPA2":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=WPA|WPA2" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
if ssid_info[i][1] == "EAP-TTLS":
ssid = ["ssid_idx=" + str(i) + " ssid=" + ssid_info[i][3] + " security=EAP-TTLS" +
" password=" + ssid_info[i][2] + " bssid=" + ssid_info[i][0]]
idx_mapping[str(i)] = [ssid_info[i][3], ssid_info[i][2], ssid_info[i][1], band_mapping[ssid_info[i][0]],
ssid_info[i][0]]
ssid_data.append(ssid)
lf_tools.dut_idx_mapping = idx_mapping
# Add bssid password and security from iwinfo data
# Format SSID Data in the below format
# ssid_data = [
# ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'],
# ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59']
# ]
allure.attach(name="SSID DATA IN LF DUT", body=str(ssid_data))
lf_tools.update_ssid(ssid_data=ssid_data)
def teardown_session():
print("\nRemoving Profiles")
instantiate_profile.delete_profile_by_name(profile_name=profile_data['equipment_ap']['profile_name'])
instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["ssid"])
instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["radius"])
instantiate_profile.delete_profile(instantiate_profile.profile_creation_ids["rf"])
allure.attach(body=str(profile_data['equipment_ap']['profile_name'] + "\n"),
name="Tear Down in Profiles ")
time.sleep(20)
request.addfinalizer(teardown_session)
return test_cases

414
tests/fixtures_2x.py Normal file
View File

@@ -0,0 +1,414 @@
""" Python Inbuilt Libraries """
import allure
import pytest
import sys
import os
import json
import time
""" Environment Paths """
if "libs" not in sys.path:
sys.path.append(f'../libs')
for folder in 'py-json', 'py-scripts':
if folder not in sys.path:
sys.path.append(f'../lanforge/lanforge-scripts/{folder}')
sys.path.append(
os.path.dirname(
os.path.realpath(__file__)
)
)
sys.path.append(f"../lanforge/lanforge-scripts/py-scripts/tip-cicd-sanity")
sys.path.append(f'../libs')
sys.path.append(f'../libs/lanforge/')
from LANforge.LFUtils import *
if 'py-json' not in sys.path:
sys.path.append('../py-scripts')
from apnos.apnos import APNOS
from controller.controller_2x.controller import Controller
from configuration import CONFIGURATION
from configuration import RADIUS_SERVER_DATA
from configuration import RADIUS_ACCOUNTING_DATA
class Fixtures_2x:
def __init__(self, configuration={}):
self.lab_info = configuration
print(self.lab_info)
print("2.X")
try:
self.controller_obj = Controller(controller_data=self.lab_info["controller"])
except Exception as e:
print(e)
allure.attach(body=str(e), name="Controller Instantiation Failed: ")
sdk_client = False
pytest.exit("unable to communicate to Controller" + str(e))
def disconnect(self):
self.controller_obj.logout()
def setup_firmware(self):
pass
def get_ap_version(self, get_apnos, get_configuration):
version_list = []
for access_point_info in get_configuration['access_point']:
ap_ssh = get_apnos(access_point_info)
version = ap_ssh.get_ap_version_ucentral()
version_list.append(version)
return version_list
def setup_profiles(self, request, param, setup_controller, testbed, setup_vlan, get_equipment_id,
instantiate_profile, get_markers, create_lanforge_chamberview_dut, lf_tools,
get_security_flags, get_configuration, radius_info, get_apnos, radius_accounting_info):
print("inside conftest_2x")
if not request.config.getoption("1.x"):
instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller)
print("garbage")
print(1, instantiate_profile_obj.sdk_client)
vlan_id, mode = 0, 0
parameter = dict(param)
print("hola", parameter)
test_cases = {}
profile_data = {}
if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]:
print("Invalid Mode: ", parameter['mode'])
return test_cases
instantiate_profile_obj.set_radio_config()
if parameter['mode'] == "NAT":
mode = "NAT"
instantiate_profile_obj.set_mode(mode=mode)
vlan_id = 1
if parameter['mode'] == "BRIDGE":
mode = "BRIDGE"
instantiate_profile_obj.set_mode(mode=mode)
vlan_id = 1
if parameter['mode'] == "VLAN":
mode = "VLAN"
instantiate_profile_obj.set_mode(mode=mode)
vlan_id = setup_vlan
profile_data["ssid"] = {}
for i in parameter["ssid_modes"]:
profile_data["ssid"][i] = []
for j in range(len(parameter["ssid_modes"][i])):
data = parameter["ssid_modes"][i][j]
profile_data["ssid"][i].append(data)
lf_dut_data = []
for mode in profile_data['ssid']:
if mode == "open":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'none'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
if mode == "wpa":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa_2g"] = False
if mode == "wpa2_personal":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk2'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa_wpa2_personal_mixed":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'psk-mixed'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa3_personal":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'sae'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
if mode == "wpa3_personal_mixed":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'sae-mixed'
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
# EAP SSID Modes
if mode == "wpa2_enterprise":
for j in profile_data["ssid"][mode]:
if mode in get_markers.keys() and get_markers[mode]:
try:
if j["appliedRadios"].__contains__("2G"):
lf_dut_data.append(j)
if j["appliedRadios"].__contains__("5G"):
lf_dut_data.append(j)
j["appliedRadios"] = list(set(j["appliedRadios"]))
j['security'] = 'wpa2'
RADIUS_SERVER_DATA = radius_info
RADIUS_ACCOUNTING_DATA = radius_accounting_info
creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j, radius=True,
radius_auth_data=RADIUS_SERVER_DATA,
radius_accounting_data=RADIUS_ACCOUNTING_DATA)
test_cases["wpa_2g"] = True
except Exception as e:
print(e)
test_cases["wpa2_personal"] = False
ap_ssh = get_apnos(get_configuration['access_point'][0], pwd="../libs/apnos/", sdk="2.x")
connected, latest, active = ap_ssh.get_ucentral_status()
if connected == False:
pytest.exit("AP is disconnected from UC Gateway")
if latest != active:
allure.attach(name="FAIL : ubus call ucentral status: ",
body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active))
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ")
pytest.fail("AP is disconnected from UC Gateway")
instantiate_profile_obj.push_config(serial_number=get_equipment_id[0])
time_1 = time.time()
config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"'))
config["uuid"] = 0
ap_config_latest = ap_ssh.get_uc_latest_config()
try:
ap_config_latest["uuid"] = 0
except Exception as e:
print(e)
pass
x = 1
old_config = latest
connected, latest, active = ap_ssh.get_ucentral_status()
while old_config == latest:
time.sleep(5)
x += 1
print("old config: " , old_config)
print("latest: " , latest)
connected, latest, active = ap_ssh.get_ucentral_status()
if x == 19:
break
connected, latest, active = ap_ssh.get_ucentral_status()
x = 1
while active != latest:
connected, latest, active = ap_ssh.get_ucentral_status()
time.sleep(10)
x += 1
print("active: ", active)
print("latest: ", latest)
if x == 19:
break
if x < 19:
print("Config properly applied into AP", config)
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_latest["uuid"] = 0
ap_config_active = ap_ssh.get_uc_active_config()
ap_config_active["uuid"] = 0
x = 1
while ap_config_active != ap_config_latest:
time.sleep(5)
x += 1
ap_config_latest = ap_ssh.get_uc_latest_config()
ap_config_latest["uuid"] = 0
ap_config_active = ap_ssh.get_uc_active_config()
print("latest config: ", ap_config_latest)
print("Active config: ", ap_config_active)
ap_config_active["uuid"] = 0
if x == 19:
break
allure_body = "AP config status: \n" + \
"Active Config: " + str(ap_ssh.get_uc_active_config()) + "\n" \
"Latest Config: ", str(
ap_ssh.get_uc_latest_config()) + "\n" \
"Applied Config: ", str(config)
if x < 19:
print("AP is Broadcasting Applied Config")
allure.attach(name="AP is Broadcasting Applied Config", body="")
allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active))
allure.attach(name="Config Info",
body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config()))
allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body))
else:
print("AP is Not Broadcasting Applied Config")
allure.attach(name="AP is Not Broadcasting Applied Config", body="")
allure.attach(name="Config Info", body="Applied Config: " + str(ap_config_active))
allure.attach(name="Config Info",
body="AP is Broadc3asting Applied Config: " + str(ap_ssh.get_uc_active_config()))
allure.attach(name="Config Info", body="AP is Broadcasting Applied Config: " + str(allure_body))
time_2 = time.time()
time_interval = time_2 - time_1
allure.attach(name="Time Took to apply Config: " + str(time_interval), body="")
ap_logs = ap_ssh.logread()
allure.attach(body=ap_logs, name="AP LOgs: ")
ap_wifi_data = ap_ssh.get_interface_details()
idx_mapping = {}
ssid_data = []
ap_interfaces = list(ap_wifi_data.keys())
for interface in range(len(ap_interfaces)):
if ap_wifi_data[ap_interfaces[interface]][1] == "none":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=OPEN" +
" bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
]
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
if ap_wifi_data[ap_interfaces[interface]][1] == "psk":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=WPA" +
" password=" + ap_wifi_data[ap_interfaces[interface]][2] +
" bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
]
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
if ap_wifi_data[ap_interfaces[interface]][1] == "psk-mixed":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=WPA|WPA2" +
" password=" + ap_wifi_data[ap_interfaces[interface]][2] +
" bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
]
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
if ap_wifi_data[ap_interfaces[interface]][1] == "psk2":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=WPA2" +
" password=" + ap_wifi_data[ap_interfaces[interface]][2] +
" bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower()
]
print(ssid)
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
if ap_wifi_data[ap_interfaces[interface]][1] == "sae":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=WPA3" +
" password=" + ap_wifi_data[ap_interfaces[interface]][2] +
" bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
]
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
if ap_wifi_data[ap_interfaces[interface]][1] == "sae-mixed":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=WPA3" +
" password=" + ap_wifi_data[ap_interfaces[interface]][2] +
" bssid=" + ap_wifi_data[ap_interfaces[interface]][3][0]
]
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
if ap_wifi_data[ap_interfaces[interface]][1] == "wpa2":
ssid = ["ssid_idx=" + str(interface) +
" ssid=" + ap_wifi_data[ap_interfaces[interface]][0] +
" security=EAP-TTLS" +
" bssid=" + str(ap_wifi_data[ap_interfaces[interface]][3][0]).lower()
]
idx_mapping[str(interface)] = [ap_wifi_data[ap_interfaces[interface]][0],
ap_wifi_data[ap_interfaces[interface]][2],
ap_wifi_data[ap_interfaces[interface]][1],
ap_wifi_data[ap_interfaces[interface]][3][1],
ap_wifi_data[ap_interfaces[interface]][3][0]
]
# pass
ssid_data.append(ssid)
lf_tools.ssid_list.append(ap_wifi_data[ap_interfaces[interface]][0])
lf_tools.dut_idx_mapping = idx_mapping
print(ssid_data)
lf_tools.reset_scenario()
lf_tools.update_ssid(ssid_data=ssid_data)
return test_cases
else:
return False