Merge branch 'WIFI-1321-create-a-lan-forge-pip-module' of https://github.com/Telecominfraproject/wlan-testing into WIFI-1321-create-a-lan-forge-pip-module

This commit is contained in:
jitendracandela
2022-09-07 09:06:33 +05:30
17 changed files with 1903 additions and 32 deletions

View File

View File

@@ -0,0 +1,81 @@
import logging
import paramiko
from scp import SCPClient
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
class SetupLibrary:
def __init__(self, remote_ip="",
remote_ssh_port=22,
remote_ssh_username="lanforge",
remote_ssh_password="lanforge",
pwd="",
):
self.pwd = pwd
self.remote_ip = remote_ip
self.remote_ssh_username = remote_ssh_username
self.remote_ssh_password = remote_ssh_password
self.remote_ssh_port = remote_ssh_port
def setup_serial_environment(self):
client = self.ssh_cli_connect()
cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"'
stdin, stdout, stderr = client.exec_command(cmd)
output = str(stdout.read())
if output.__contains__("False"):
cmd = 'mkdir ~/cicd-git/'
client.exec_command(cmd)
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
stdin, stdout, stderr = client.exec_command(cmd)
output = str(stdout.read())
if output.__contains__("False"):
logging.info("Copying openwrt_ctl serial control Script...")
with SCPClient(client.get_transport()) as scp:
scp.put(self.pwd + 'openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
stdin, stdout, stderr = client.exec_command(cmd)
var = str(stdout.read())
client.close()
def ssh_cli_connect(self):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
logging.info("Trying SSH Connection to: " + str(self.remote_ip) +
" on port: " + str(self.remote_ssh_port) +
" with username: " + str(self.remote_ssh_username) +
" and password: " + str(self.remote_ssh_password))
client.connect(self.remote_ip, username=self.remote_ssh_username, password=self.remote_ssh_password,
port=self.remote_ssh_port, timeout=10, allow_agent=False, banner_timeout=200)
return client
def check_serial_connection(self, tty="/dev/ttyUSB0"):
client = self.ssh_cli_connect()
cmd = 'ls /dev/tty*'
stdin, stdout, stderr = client.exec_command(cmd)
output = str(stdout.read().decode('utf-8'))
client.close()
available_tty_ports = output.split("\n")
if tty in available_tty_ports:
logging.info("Expected Serial Console Port, " + tty + " is available on " + self.remote_ip)
else:
logging.error("Expected Serial Console Port, " + tty + " is not available on " + self.remote_ip)
return tty in available_tty_ports
def kill_all_minicom_process(self, tty="/dev/ttyUSB0"):
client = self.ssh_cli_connect()
stdin, stdout, stderr = client.exec_command("fuser -k " + tty)
# print(stdout.read())
client.close()
if __name__ == '__main__':
obj = SetupLibrary(remote_ip="192.168.52.89",
remote_ssh_port=22,
pwd="")
obj.setup_serial_environment()
obj.check_serial_connection(tty="/dev/ttyUSB0")
obj.kill_all_minicom_process(tty="/dev/ttyUSB0")

View File

176
dut_lib_template/ap_lib.py Normal file
View File

@@ -0,0 +1,176 @@
import importlib
import json
import logging
import allure
import paramiko
import pytest
setup_lib = importlib.import_module("SetupLibrary")
SetupLibrary = setup_lib.SetupLibrary
class APLIBS:
setup_library_objects = list()
device_under_tests_data = None
def __init__(self, dut_data=None):
if dut_data is None:
logging.error("Device Under Test Data is Not Specified, Please provide valid DUT Data")
pytest.exit("Device Under Test Data is Not Specified, Please provide valid DUT Data")
if type(dut_data) is not list:
logging.error("Device Under Test Data is Not in list format, Please provide valid DUT Data in list format")
pytest.exit("Device Under Test Data is Not in list format, Please provide valid DUT Data in list format")
self.device_under_tests_data = dut_data
for dut in self.device_under_tests_data:
self.setup_library_objects.append(SetupLibrary(remote_ip=dut["host_ip"],
remote_ssh_port=dut["host_ssh_port"],
remote_ssh_username=dut["host_username"],
remote_ssh_password=dut["host_password"],
pwd=""))
def ssh_cli_connect(self, ip="",
port=22,
username="",
password="",
timeout=10,
allow_agent=False,
banner_timeout=200):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=username, password=password,
port=port, timeout=timeout, allow_agent=allow_agent, banner_timeout=banner_timeout)
return client
def run_generic_command(self, cmd="", idx=0, print_log=True, attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT):
input_command = cmd
logging.info("Executing Command on AP: " + cmd)
try:
self.setup_library_objects[idx].kill_all_minicom_process(
tty=self.device_under_tests_data[idx]["serial_tty"])
client = self.ssh_cli_connect(ip=self.device_under_tests_data[idx]["host_ip"],
port=self.device_under_tests_data[idx]["host_ssh_port"],
username=self.device_under_tests_data[idx]["host_username"],
password=self.device_under_tests_data[idx]["host_password"],
timeout=10,
allow_agent=False,
banner_timeout=200)
if self.device_under_tests_data[idx]["method"] == "serial":
owrt_args = "--prompt root@" + self.device_under_tests_data[idx][
"identifier"] + " -s serial --log stdout --user root --passwd openwifi"
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {owrt_args} -t {self.device_under_tests_data[idx]['serial_tty']} --action " \
f"cmd --value \"{cmd}\" "
if print_log:
logging.info(cmd)
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read()
final_output = str(output)
if not output.__contains__(b"BOOTLOADER-CONSOLE-IPQ6018#"):
status = output.decode('utf-8').splitlines()
status.pop(0)
final_output = '\n'.join(status)
if print_log:
logging.info(cmd)
logging.info("Output for command: " + input_command + "\n" + final_output)
if attach_allure:
allure.attach(name=input_command, body=output, attachment_type=expected_attachment_type)
client.close()
except Exception as e:
logging.error(e)
final_output = "Error: " + str(e)
return final_output
def example_function(self, idx=0, print_log=True, attach_allure=True):
output = self.run_generic_command(cmd="example_command", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
return output
if __name__ == '__main__':
basic_05 = {
"target": "dut_lib_template",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
"password": "OpenWifi%123"
},
"device_under_tests": [{
"model": "cig_wf188n",
"supported_bands": ["2G", "5G"],
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
"wan_port": "1.1.eth2",
"lan_port": None,
"ssid": {
"2g-ssid": "OpenWifi",
"5g-ssid": "OpenWifi",
"6g-ssid": "OpenWifi",
"2g-password": "OpenWifi",
"5g-password": "OpenWifi",
"6g-password": "OpenWifi",
"2g-encryption": "WPA2",
"5g-encryption": "WPA2",
"6g-encryption": "WPA3",
"2g-bssid": "68:7d:b4:5f:5c:31",
"5g-bssid": "68:7d:b4:5f:5c:3c",
"6g-bssid": "68:7d:b4:5f:5c:38"
},
"mode": "wifi6",
"identifier": "0000c1018812",
"method": "serial",
"host_ip": "localhost",
"host_username": "lanforge",
"host_password": "pumpkin77",
"host_ssh_port": 8842,
"serial_tty": "/dev/ttyAP1",
"firmware_version": "next-latest"
}],
"traffic_generator": {
"name": "lanforge",
"testbed": "basic",
"scenario": "dhcp-bridge",
"details": {
"manager_ip": "localhost",
"http_port": 8840,
"ssh_port": 8841,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
"wan_ports": {
"1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
}
},
"lan_ports": {
},
"uplink_nat_ports": {
"1.1.eth1": {
"addressing": "static",
"ip": "10.28.2.16",
"gateway_ip": "10.28.2.1/24",
"ip_mask": "255.255.255.0",
"dns_servers": "BLANK"
}
}
}
}
}
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.NOTSET)
obj = APLIBS(dut_data=basic_05["device_under_tests"])
# obj.exit_from_uboot()
# obj.setup_serial_environment()
# obj.verify_certificates()
# obj.get_dut_logs()
# l = obj.get_latest_config_recieved()
# a = obj.get_active_config()
# if a == l:
# print("a = l")
# print(obj.get_ap_version())

View File

@@ -0,0 +1,585 @@
"""
Telecom Infra Project OpenWifi 2.X (Ucentral libraries for Test Automation)
"""
import importlib
import json
import random
import string
import time
import allure
import pytest
import requests
logging = importlib.import_module("logging")
ap_lib = importlib.import_module("ap_lib")
controller = importlib.import_module("controller")
"""
Custom Class Imports needed for OpenWifi 2.X
"""
ConfigureController = controller.ConfigureController
Controller = controller.Controller
FMSUtils = controller.FMSUtils
ProvUtils = controller.ProvUtils
UProfileUtility = controller.UProfileUtility
APLIBS = ap_lib.APLIBS
class dut_lib_template:
"""
Standard OpenWifi wlan-testing specific variables
"""
controller_data = {}
device_under_tests_info = []
"""
OpenWifi 2.x Specific Variables that will be only scoped in dut_lib_template Library
"""
ow_sec_url = ""
ow_sec_login_username = ""
ow_sec_login_password = ""
target = "dut_lib_template"
controller_library_object = object()
prov_library_object = object()
firmware_library_object = object()
dut_library_object = object()
supported_bands = ["2G", "5G", "6G", "5G-lower", "5G-upper"]
supported_modes = ["BRIDGE", "NAT", "VLAN"]
supported_encryption = ["open",
"wpa",
"wpa2_personal",
"wpa3_personal",
"wpa_wpa2_personal_mixed",
"wpa3_personal_mixed",
"wpa_enterprise",
"wpa2_enterprise",
"wpa3_enterprise",
"wpa_wpa2_enterprise_mixed",
"wpa3_enterprise_mixed",
"wpa3_enterprise_192"
]
tip_2x_specific_encryption_translation = {"open": "none",
"wpa": "psk",
"wpa2_personal": "psk2",
"wpa3_personal": "sae",
"wpa3_personal_mixed": "sae-mixed",
"wpa_wpa2_personal_mixed": "psk-mixed",
"wpa_enterprise": "wpa",
"wpa2_enterprise": "wpa2",
"wpa3_enterprise": "wpa3",
"wpa_wpa2_enterprise_mixed": "wpa-mixed",
"wpa3_enterprise_mixed": "wpa3-mixed",
"wpa3_enterprise_192": "wpa3-192"
}
def __init__(self, controller_data=None, target=None,
device_under_tests_info=[], logging_level=logging.INFO):
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging_level)
if target != self.target:
logging.error("Target version is : " + target + " Expected target is dut_lib_template")
pytest.exit("Target should be 'dut_lib_template', Current Target is : " + target)
if controller_data is None:
controller_data = {}
self.controller_data = controller_data
self.device_under_tests_info = device_under_tests_info
self.setup_metadata()
self.setup_objects()
self.setup_environment_properties()
"""
Controller and Access Point specific metadata that is related to OpenWifi 2.x
"""
def setup_metadata(self):
logging.info(
"Setting up the Controller metadata for dut_lib_template Library: " + str(
json.dumps(self.controller_data, indent=2)))
logging.info("Setting up the DUT metadata for dut_lib_template Library: " + str(
json.dumps(self.device_under_tests_info, indent=2)))
logging.info("Number of DUT's in lab_info.json: " + str(len(self.device_under_tests_info)))
self.ow_sec_url = self.controller_data["url"]
self.ow_sec_login_username = self.controller_data["username"]
self.ow_sec_login_password = self.controller_data["password"]
def setup_objects(self):
try:
self.controller_library_object = Controller(controller_data=self.controller_data)
self.prov_library_object = ProvUtils(sdk_client=self.controller_library_object)
self.firmware_library_object = FMSUtils(sdk_client=self.controller_library_object)
except Exception as e:
pytest.fail("Unable to setup Controller Objects")
logging.error("Exception in setting up Controller objects:" + str(e))
try:
self.dut_library_object = APLIBS(dut_data=self.device_under_tests_info)
except Exception as e:
logging.error("Exception in setting up Access Point Library object:" + str(e))
pytest.fail("Unable to setup AP Objects")
def teardown_objects(self):
self.controller_library_object.logout()
""" Standard getter methods. Should be available for all type of libraries. Commonly used by wlan-testing"""
def get_dut_library_object(self):
return self.dut_library_object
def get_controller_library_object(self):
return self.controller_library_object
def get_controller_data(self):
return self.controller_data
def get_device_under_tests_info(self):
return self.device_under_tests_info
def get_number_of_dut(self):
return len(self.device_under_tests_info)
def get_dut_logs(self, dut_idx=0):
return self.dut_library_object.get_logs(idx=0)
def get_controller_logs(self):
pass
def get_dut_max_clients(self):
pass
def setup_configuration_data(self, configuration=None,
requested_combination=None):
"""Predefined function for getting configuration data for applied data"""
c_data = configuration.copy()
if c_data is None:
pytest.exit("No Configuration Received")
if requested_combination is None:
pytest.exit("No requested_combination Received")
rf_data = None
if c_data.keys().__contains__("rf"):
rf_data = c_data["rf"]
# base_band_keys = ["2G", "5G", "6G", "5G-lower", "5G-upper"]
base_dict = dict.fromkeys(self.supported_bands)
for i in base_dict:
base_dict[i] = []
for i in requested_combination:
if i[0] in self.supported_bands:
base_dict[i[0]].append(self.tip_2x_specific_encryption_translation[i[1]])
if i[1] in self.supported_bands:
base_dict[i[1]].append((self.tip_2x_specific_encryption_translation[i[0]]))
temp = []
for i in list(base_dict.values()):
for j in i:
temp.append(j)
temp_conf = c_data["ssid_modes"].copy()
for i in temp_conf:
if self.tip_2x_specific_encryption_translation[i] not in temp:
c_data["ssid_modes"].pop(i)
temp_conf = c_data["ssid_modes"].copy()
print(self.tip_2x_specific_encryption_translation)
for i in temp_conf:
for j in range(len(temp_conf[i])):
for k in temp_conf[i][j]["appliedRadios"]:
if self.tip_2x_specific_encryption_translation[i] not in base_dict[k]:
c_data["ssid_modes"][i][j]["appliedRadios"].remove(k)
if c_data["ssid_modes"][i][j]["appliedRadios"] == []:
c_data["ssid_modes"][i][j] = {} # .popi.popitem()) # .popitem()
for i in c_data["ssid_modes"]:
c_data["ssid_modes"][i] = [x for x in c_data["ssid_modes"][i] if x != {}]
for ssids in c_data["ssid_modes"]:
for i in c_data["ssid_modes"][ssids]:
if i is not {}:
i["security"] = self.tip_2x_specific_encryption_translation[ssids]
temp_conf = c_data.copy()
for i in range(0, len(self.device_under_tests_info)):
if c_data["mode"] not in self.device_under_tests_info[i]["supported_modes"]:
pytest.skip(c_data["mode"] + " is not Supported by DUT")
for enc in c_data["ssid_modes"]:
for idx in c_data["ssid_modes"][enc]:
check = all(
item in self.device_under_tests_info[i]["supported_bands"] for item in idx["appliedRadios"])
if not check:
temp_conf["ssid_modes"][enc].remove(idx)
for key in c_data["rf"]:
if key not in self.device_under_tests_info[i]["supported_bands"]:
print(key)
temp_conf["rf"][key] = None
return temp_conf
"""
setup_basic_configuration - Method to configure AP in basic operating modes with multiple SSID's and multiple AP's
This covers, basic and advanced test cases
"""
def setup_basic_configuration(self, configuration=None,
requested_combination=None):
f_conf = self.setup_configuration_data(configuration=configuration,
requested_combination=requested_combination)
logging.info("Selected Configuration: " + str(json.dumps(f_conf, indent=2)))
final_configuration = f_conf.copy()
# Setup Mode
# Setup Radio Scenario
# Setup Config Apply on all AP's
ret_val = dict()
for i in range(0, len(self.device_under_tests_info)):
self.pre_apply_check(idx=i) # Do check AP before pushing the configuration
self.post_apply_check(idx=i) # Do check AP after pushing the configuration
ret_val[self.device_under_tests_info[i]["identifier"]] = {}
temp_data = ret_val.copy()
for dut in temp_data:
ret_val[dut] = dict.fromkeys(["ssid_data", "radio_data"])
ret_val[dut]["radio_data"] = temp_data[dut][-1]
temp_data[dut].pop(-1)
n = len(temp_data[dut])
lst = list(range(0, n))
ret_val[dut]["ssid_data"] = dict.fromkeys(lst)
for j in ret_val[dut]["ssid_data"]:
a = temp_data[dut][j].copy()
a = dict.fromkeys(["ssid", "encryption", "password", "band", "bssid"])
a["ssid"] = temp_data[dut][j][0]
a["encryption"] = temp_data[dut][j][1]
a["password"] = temp_data[dut][j][2]
a["band"] = temp_data[dut][j][3]
a["bssid"] = temp_data[dut][j][4]
ret_val[dut]["ssid_data"][j] = a
temp = ret_val[dut]["radio_data"].copy()
for j in temp:
a = dict.fromkeys(["channel", "bandwidth", "frequency"])
if temp[j] != None:
a["channel"] = temp[j][0]
a["bandwidth"] = temp[j][1]
a["frequency"] = temp[j][2]
ret_val[dut]["radio_data"][j] = a
return ret_val
"""
setup_special_configuration - Method to configure APs in mesh operating modes with multiple SSID's and multiple AP's
This covers, mesh and other roaming scenarios which includes any special type of modes
multiple AP's with WDS and Wifi Steering scenarios are also covered here
"""
def setup_special_configuration(self, configuration=None,
requested_combination=None):
final_configuration = self.setup_configuration_data(configuration=configuration,
requested_combination=requested_combination)
logging.info("Selected Configuration: " + str(json.dumps(final_configuration, indent=2)))
profile_object = UProfileUtility(sdk_client=self.controller_library_object)
if final_configuration["mode"] in self.supported_modes:
profile_object.set_mode(mode=final_configuration["mode"])
else:
pytest.skip(final_configuration["mode"] + " Mode is not supported")
# Setup Radio Scenario
if final_configuration["rf"] != {}:
profile_object.set_radio_config(radio_config=final_configuration["rf"])
else:
profile_object.set_radio_config()
for ssid in final_configuration["ssid_modes"]:
for ssid_data in final_configuration["ssid_modes"][ssid]:
profile_object.add_ssid(ssid_data=ssid_data)
logging.info(
"Configuration That is getting pushed: " + json.dumps(profile_object.base_profile_config, indent=2))
r_val = False
# Do check AP before pushing the configuration
# TODO
self.dut_library_object.check_serial_connection()
"""
serial connection check
ubus call ucentral status
save the current uuid
uci show ucentral
ifconfig
wifi status
start logger to collect ap logs before config apply
Timestamp before doing config apply
"""
for dut in self.device_under_tests_info:
resp = profile_object.push_config(serial_number=dut["identifier"])
logging.info("Response for Config apply: " + str(resp.status_code))
if resp.status_code != 200:
logging.info("Failed to apply Configuration to AP. Response Code" +
str(resp.status_code) +
"Retrying in 5 Seconds... ")
time.sleep(5)
resp = profile_object.push_config(serial_number=dut["identifier"])
if resp.status_code != 200:
logging.info("Failed to apply Config, Response code:" + str(resp.status_code))
pytest.fail("Failed to apply Config, Response code :" + str(resp.status_code))
if resp.status_code == 200:
r_val = True
# TODO
"""
serial connection check
ubus call ucentral status
save the current uuid and compare with the one before config apply
save the active config and compare with the latest apply
uci show
ifconfig
iwinfo
wifi status
start logger to collect ap logs before config apply
Timestamp after doing config apply
"""
return r_val
def get_dut_channel_data(self, idx):
try:
d = self.dut_library_object.run_generic_command(cmd="iw dev | grep channel", idx=idx)
d = d.replace("\n", "").replace("\t", "").replace(" ", "").split("channel")
d.pop(0)
d = list(set(d))
data = dict.fromkeys(["2G", "5G", "6G"])
for i in d:
channel = int(i.split("(")[0])
bandwidth = int(i.split(":")[1].split("MHz")[0])
center_freq = int(i.split(":")[-1].replace("MHz", ""))
if 2401 < center_freq < 2495:
data["2G"] = [channel, bandwidth, center_freq]
elif center_freq in [5955, 5975, 5995] and channel <= 9:
data["6G"] = [channel, bandwidth, center_freq]
elif 5030 < center_freq < 5990:
data["5G"] = [channel, bandwidth, center_freq]
elif 5995 < center_freq < 7125:
data["6G"] = [channel, bandwidth, center_freq]
else:
pass
except Exception as e:
logging.error("Exception in getting DUT Channel and bw data, Retrying again!")
try:
d = self.dut_library_object.run_generic_command(cmd="iw dev | grep channel", idx=idx)
d = d.replace("\n", "").replace("\t", "").replace(" ", "").split("channel")
d.pop(0)
data = dict.fromkeys(["2G", "5G", "6G"])
for i in d:
channel = int(i.split("(")[0])
bandwidth = int(i.split(":")[1].split("MHz")[0])
center_freq = int(i.split(":")[-1].replace("MHz", ""))
if 2401 < center_freq < 2495:
data["2G"] = [channel, bandwidth, center_freq]
elif center_freq in [5955, 5975, 5995] and channel <= 9:
data["6G"] = [channel, bandwidth, center_freq]
elif 5030 < center_freq < 5990:
data["5G"] = [channel, bandwidth, center_freq]
elif 5995 < center_freq < 7125:
data["6G"] = [channel, bandwidth, center_freq]
else:
pass
except Exception as e:
logging.error("Exception in getting DUT Channel and bw data.")
return data
def get_applied_ssid_info(self, profile_object=None, idx=0):
if profile_object is None:
logging.error("Profile object is None, Unable to fetch ssid info from AP")
return None
ssid_info_sdk = profile_object.get_ssid_info()
ap_wifi_data = self.dut_library_object.get_iwinfo(idx=idx)
channel_info = self.get_dut_channel_data(idx=idx)
o = ap_wifi_data.split()
iwinfo_bssid_data = {}
for i in range(len(o)):
if o[i].__contains__("ESSID"):
if o[i + 9].__contains__("2.4"):
band = "2G"
else:
band = "5G"
iwinfo_bssid_data[o[i - 1]] = [o[i + 1].replace('"', ''), o[i + 4], band]
for p in iwinfo_bssid_data:
for q in ssid_info_sdk:
if iwinfo_bssid_data[p][0] == q[0] and iwinfo_bssid_data[p][2] == q[3]:
q.append(iwinfo_bssid_data[p][1])
ssid_info_sdk.append(channel_info)
return ssid_info_sdk
def get_dut_version(self):
"""
get_dut_version
write your code to get ap version in
self.dut_library_object.get_ap_version()
"""
version_info = []
for ap in range(len(self.device_under_tests_info)):
version_info.append(self.dut_library_object.get_ap_version(idx=ap, print_log=True))
return version_info
def get_controller_version(self):
version_info = dict()
# version_info["ow_fms"] = self.controller_library_object.get_sdk_version_fms()
# version_info["ow_gw"] = self.controller_library_object.get_sdk_version_gw()
# version_info["ow_sec"] = self.controller_library_object.get_sdk_version_sec()
# version_info["ow_prov"] = self.controller_library_object.get_sdk_version_prov()
# version_info["ow_rrm"] = self.controller_library_object.get_sdk_version_owrrm()
# version_info["ow_analytics"] = self.controller_library_object.get_sdk_version_ow_analytics()
# version_info["ow_sub"] = self.controller_library_object.get_sdk_version_owsub()
return version_info
def pre_apply_check(self, idx=0):
"""
Write your code that you need to use
"""
check = "PASSED"
if check is "FAILED":
pytest.fail("Write the message here for why precheck is failed")
def post_apply_check(self, idx=0):
check = "PASSED"
if check is "FAILED":
pytest.fail("Write the message here for why post apply is failed")
def setup_environment_properties(self, add_allure_environment_property=None):
if add_allure_environment_property is None:
return
add_allure_environment_property('Cloud-Controller-SDK-URL', self.controller_data.get("url"))
sdk_version_data = self.get_controller_version()
for microservice in sdk_version_data:
add_allure_environment_property(microservice + '-version',
str(sdk_version_data.get(microservice)))
dut_versions = self.get_dut_version()
for i in range(len(self.device_under_tests_info)):
add_allure_environment_property("Firmware-Version_" + self.device_under_tests_info[i]["identifier"],
str(dut_versions[i]))
for dut in self.device_under_tests_info:
models = []
identifiers = []
models.append(dut["model"])
identifiers.append(dut["identifier"])
add_allure_environment_property('DUT-Model/s', ", ".join(models))
add_allure_environment_property('Serial-Number/s', ", ".join(identifiers))
def setup_firmware(self):
# Query AP Firmware
upgrade_status = []
return upgrade_status
def simulate_radar(self, idx=0):
"""Simulate radar command for DFS"""
ret = self.dut_library_object.dfs(idx=idx)
return ret
def get_dfs_logs(self, idx=0):
"""Get the ap logs after Simulate radar command"""
logs = self.dut_library_object.dfs_logread(idx=idx)
return logs
def reboot(self, idx=0):
"""Reboot the AP"""
ret = self.dut_library_object.reboot(idx=idx)
return ret
if __name__ == '__main__':
basic_05 = {
"target": "dut_lib_template",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
"password": "OpenWifi%123"
},
"device_under_tests": [{
"model": "cig_wf188n",
"supported_bands": ["2G", "5G", "6G"],
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
"wan_port": "1.1.eth2",
"ssid": {
"mode": "BRIDGE",
"2g-ssid": "OpenWifi",
"5g-ssid": "OpenWifi",
"6g-ssid": "OpenWifi",
"2g-password": "OpenWifi",
"5g-password": "OpenWifi",
"6g-password": "OpenWifi",
"2g-encryption": "WPA2",
"5g-encryption": "WPA2",
"6g-encryption": "WPA3",
"2g-bssid": "68:7d:b4:5f:5c:31",
"5g-bssid": "68:7d:b4:5f:5c:3c",
"6g-bssid": "68:7d:b4:5f:5c:38"
},
"mode": "wifi6",
"identifier": "0000c1018812",
"method": "serial",
"host_ip": "10.28.3.103",
"host_username": "lanforge",
"host_password": "pumpkin77",
"host_ssh_port": 22,
"serial_tty": "/dev/ttyAP1",
"firmware_version": "next-latest"
}],
"traffic_generator": {
"name": "lanforge",
"testbed": "basic",
"scenario": "dhcp-bridge",
"details": {
"manager_ip": "10.28.3.28",
"http_port": 8080,
"ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
"wan_ports": {
"1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
}
},
"lan_ports": {
"1.1.eth1": {"addressing": "dynamic"}
},
"uplink_nat_ports": {
"1.1.eth1": {"addressing": "static",
"subnet": "10.28.2.16/24",
"gateway_ip": "10.28.2.1",
"ip_mask": "255.255.255.0"
}
}
}
}
}
var = dut_lib_template(controller_data=basic_05["controller"],
device_under_tests_info=basic_05["device_under_tests"],
target=basic_05["target"])
# var.setup_objects()
setup_params_general_two = {
"mode": "BRIDGE",
"ssid_modes": {
"wpa3_personal": [
{"ssid_name": "ssid_wpa3_p_2g_br", "appliedRadios": ["2G"], "security_key": "something"},
{"ssid_name": "ssid_wpa3_p_5g_br", "appliedRadios": ["5G"], "security_key": "something"},
{"ssid_name": "ssid_wpa3_p_6g_br", "appliedRadios": ["6G"], "security_key": "something"}],
"wpa3_personal_mixed": [
{"ssid_name": "ssid_wpa3_p_m_2g_br", "appliedRadios": ["2G"], "security_key": "something"},
{"ssid_name": "ssid_wpa3_p_m_5g_br", "appliedRadios": ["5G"], "security_key": "something"}],
"wpa_wpa2_personal_mixed": [
{"ssid_name": "ssid_wpa_wpa2_p_m_2g_br", "appliedRadios": ["2G"], "security_key": "something"},
{"ssid_name": "ssid_wpa_wpa2_p_m_5g_br", "appliedRadios": ["5G"], "security_key": "something"}]
},
"rf": {},
"radius": False
}
x = setup_params_general_two.copy()
target = [['6G', 'wpa3_personal']]
d = var.setup_configuration_data(configuration=setup_params_general_two, requested_combination=target)
# d = var.setup_basic_configuration(configuration=setup_params_general_two, requested_combination=target)
print(x)
# var.setup_firmware()
# var.teardown_objects()

388
dut_lib_template/openwrt_ctl.py Executable file
View File

@@ -0,0 +1,388 @@
#!/usr/bin/python3
'''
make sure pexpect is installed:
$ sudo yum install python3-pexpect
You might need to install pexpect-serial using pip:
$ pip3 install serial
$ pip3 install pexpect-serial
./openwrt_ctl.py -l stdout -u root -p TIP -s serial --tty ttyUSB0
# Set up reverse ssh tunnel
./openwrt_ctl.py --tty /dev/ttyAP1 --action ssh-tunnel \
--value "ssh -y -y -f -N -T -M -R 9999:localhost:22 lanforge@10.28.3.100" \
--value2 password-for-10.28.3.100 --log stdout --scheme serial --prompt root@Open
'''
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit()
try:
import importlib
re = importlib.import_module("re")
logging = importlib.import_module("logging")
time = importlib.import_module("time")
sleep = time.sleep
pexpect = importlib.import_module("pexpect")
serial = importlib.import_module("serial")
pexpect_serial = importlib.import_module("pexpect_serial")
SerialSpawn = pexpect_serial.SerialSpawn
pprint = importlib.import_module("pprint")
telnetlib = importlib.import_module("telnetlib")
argparse = importlib.import_module("argparse")
except ImportError as e:
logging.error(e)
sys.exit("Python Import Error: " + str(e))
default_host = "localhost"
default_ports = {
"serial": None,
"ssh": 22,
"telnet": 23
}
NL = "\n"
CR = "\r\n"
Q = '"'
A = "'"
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
prompt = "root@OpenWrt:"
def usage():
print("$0 used connect to OpenWrt AP or similar Linux machine:")
print("-d|--dest IP address of the OpenWrt AP, for ssh/telnet scheme")
print("-o|--port IP port of the OpenWrt AP, for ssh/telnet scheme")
print("-t|--tty Serial port, if using serial scheme")
print("-u|--user login name")
print("-p|--pass password")
print("--prompt Prompt to look for when commands are done (default: root@OpenWrt)")
print("-s|--scheme (serial|telnet|ssh): connect via serial, ssh or telnet")
print("-l|--log file log messages here")
print("--action (logread | journalctl | lurk | sysupgrade | download | upload | reboot | cmd | ssh-tunnel")
print("--value (option to help complete the action")
print("--value2 (option to help complete the action, dest filename for download, passwd for ssh-tunnel")
print("-h|--help")
# see https://stackoverflow.com/a/13306095/11014343
class FileAdapter(object):
def __init__(self, logger):
self.logger = logger
def write(self, data):
# NOTE: data can be a partial line, multiple lines
data = data.strip() # ignore leading/trailing whitespace
if data: # non-blank
self.logger.info(data)
def flush(self):
pass # leave it to logging to flush properly
def main():
global prompt
parser = argparse.ArgumentParser(description="OpenWrt AP Control Script")
parser.add_argument("-d", "--dest", type=str, help="address of the cisco controller_tests")
parser.add_argument("-o", "--port", type=int, help="control port on the controller_tests")
parser.add_argument("-u", "--user", type=str, help="credential login/username")
parser.add_argument("-p", "--passwd", type=str, help="credential password")
parser.add_argument("-P", "--prompt", type=str, help="Prompt to look for")
parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"],
help="Connect via serial, ssh or telnet")
parser.add_argument("-t", "--tty", type=str, help="tty serial device")
parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console")
parser.add_argument("--action", type=str, help="perform action",
choices=["logread", "journalctl", "lurk", "sysupgrade", "sysupgrade-n", "download", "upload",
"reboot", "cmd", "ssh-tunnel"])
parser.add_argument("--value", type=str, help="set value")
parser.add_argument("--value2", type=str, help="set value2")
tty = None
args = None
try:
args = parser.parse_args()
host = args.dest
scheme = args.scheme
port = args.port
# port = (default_ports[scheme], args.port)[args.port != None]
user = args.user
passwd = args.passwd
logfile = args.log
tty = args.tty;
if (args.prompt != None):
prompt = args.prompt
filehandler = None
except Exception as e:
logging.exception(e);
usage()
exit(2);
console_handler = logging.StreamHandler()
formatter = logging.Formatter(FORMAT)
logg = logging.getLogger(__name__)
logg.setLevel(logging.DEBUG)
file_handler = None
if (logfile is not None):
if (logfile != "stdout"):
file_handler = logging.FileHandler(logfile, "w")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logg.addHandler(file_handler)
logging.basicConfig(format=FORMAT, handlers=[file_handler])
else:
# stdout logging
logging.basicConfig(format=FORMAT, handlers=[console_handler])
CCPROMPT = prompt
ser = None
egg = None # think "eggpect"
try:
if (scheme == "serial"):
# eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer)
ser = serial.Serial(tty, 115200, timeout=5)
egg = SerialSpawn(ser);
egg.logfile = FileAdapter(logg)
egg.sendline(NL)
has_reset = False
if args.value == "reset-from-console":
has_reset = True
try:
logg.info("prompt: %s user: %s passwd: %s" % (prompt, user, passwd))
while True:
i = egg.expect([prompt, "Please press Enter to activate", "login:", "Password:", "IPQ6018#"],
timeout=3)
logg.info("expect-0: %i" % (i))
if (i == 0):
logg.info("Found prompt, login complete.")
break
if (i == 1):
logg.info("Sending newline")
egg.setdline(NL)
if (i == 2):
logg.info("Sending username: %s" % (user))
egg.sendline(user)
if (i == 3):
logg.info("Sending password: %s" % (passwd))
egg.sendline(passwd)
if (i == 4): # in bootloader
if has_reset:
logg.info("In boot loader, will reset and sleep 30 seconds")
egg.sendline("reset")
time.sleep(30)
egg.sendline(NL)
sys.exit(1)
else:
print("BOOTLOADER-CONSOLE-IPQ6018#")
break
else:
logg.info("Found prompt, login complete.")
break
except Exception as e:
# maybe something like 'logread -f' is running?
# send ctrl-c
egg.send(chr(3))
elif (scheme == "ssh"):
# Not implemented/tested currently. --Ben
if (port is None):
port = 22
cmd = "ssh -p%d %s@%s" % (port, user, host)
logg.info("Spawn: " + cmd + NL)
egg = pexpect.spawn(cmd)
# egg.logfile_read = sys.stdout.buffer
egg.logfile = FileAdapter(logg)
i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3)
time.sleep(0.1)
if i == 1:
egg.sendline('yes')
egg.expect('password:')
sleep(0.1)
egg.sendline(passwd)
elif (scheme == "telnet"):
# Not implemented/tested currently. --Ben
if (port is None):
port = 23
cmd = "telnet %s %d" % (host, port)
logg.info("Spawn: " + cmd + NL)
egg = pexpect.spawn(cmd)
egg.logfile = FileAdapter(logg)
time.sleep(0.1)
egg.sendline(' ')
egg.expect('User\:')
egg.sendline(user)
egg.expect('Password\:')
egg.sendline(passwd)
egg.sendline('config paging disable')
else:
usage()
exit(1)
except Exception as e:
logging.exception(e);
command = None
CLOSEDBYREMOTE = "closed by remote host."
CLOSEDCX = "Connection to .* closed."
try:
egg.expect(CCPROMPT)
except Exception as e:
egg.sendline(NL)
TO = 10
wait_forever = False
# Clean pending output
egg.sendline("echo __hello__")
egg.expect("__hello__")
egg.expect(CCPROMPT)
logg.info("Action[%s] Value[%s] Value2[%s]" % (args.action, args.value, args.value2))
if (args.action == "reboot"):
command = "reboot"
TO = 60
if (args.action == "cmd"):
if (args.value is None):
raise Exception("cmd requires value to be set.")
command = "%s" % (args.value)
if (args.action == "logread"):
command = "logread -f"
TO = 1
wait_forever = True
if (args.action == "journalctl"):
command = "journalctl -f"
TO = 1
wait_forever = True
if (args.action == "lurk"):
command = "date"
TO = 1
wait_forever = True
if (args.action == "ssh-tunnel"):
command = "%s" % (args.value)
passwd = "%s" % (args.value2)
logg.info("Command[%s]" % command)
egg.sendline(command);
i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5)
if i == 1:
egg.sendline("y")
egg.expect("password:", timeout=5)
egg.sendline(passwd)
egg.expect(CCPROMPT, timeout=20)
return
if ((args.action == "sysupgrade") or (args.action == "sysupgrade-n")):
command = "scp %s /tmp/new_img.bin" % (args.value)
logg.info("Command[%s]" % command)
egg.sendline(command);
i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5)
if i == 1:
egg.sendline("y")
egg.expect("password:", timeout=5)
egg.sendline("lanforge")
egg.expect(CCPROMPT, timeout=20)
if (args.action == "sysupgrade-n"):
egg.sendline("sysupgrade -n /tmp/new_img.bin")
else:
egg.sendline("sysupgrade /tmp/new_img.bin")
egg.expect("link becomes ready", timeout=100)
return
if (args.action == "download"):
command = "scp %s /tmp/%s" % (args.value, args.value2)
logg.info("Command[%s]" % command)
egg.sendline(command);
i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
if i == 2:
print("Network unreachable, wait 15 seconds and try again.")
time.sleep(15)
command = "scp %s /tmp/%s" % (args.value, args.value2)
logg.info("Command[%s]" % command)
egg.sendline(command);
i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
if i == 2:
print("ERROR: Could not connect to LANforge to get download file")
exit(2)
if i == 1:
egg.sendline("y")
egg.expect("password:", timeout=5)
egg.sendline("lanforge")
egg.expect(CCPROMPT, timeout=20)
return
if (args.action == "upload"):
command = "scp %s %s" % (args.value, args.value2)
logg.info("Command[%s]" % command)
egg.sendline(command);
i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
if i == 2:
print("Network unreachable, wait 15 seconds and try again.")
time.sleep(15)
command = "scp /tmp/%s %s" % (args.value, args.value2)
logg.info("Command[%s]" % command)
egg.sendline(command);
i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
if i == 2:
print("ERROR: Could not connect to LANforge to put upload file")
exit(2)
if i == 1:
egg.sendline("y")
egg.expect("password:", timeout=5)
egg.sendline("lanforge")
egg.expect(CCPROMPT, timeout=20)
return
if (command is None):
logg.info("No command specified, going to log out.")
else:
logg.info("Command[%s]" % command)
egg.sendline(command);
while True:
try:
i = egg.expect([CCPROMPT, "kmodloader: done loading kernel", "\n"], timeout=TO)
print(egg.before.decode('utf-8', 'ignore'))
if i == 1:
egg.sendline(' ')
egg.expect(CCPROMPT, timeout=20)
print(egg.before.decode('utf-8', 'ignore'))
if i == 2: # new line of text, just print and continue
continue
# wait_forever = False
if not wait_forever:
break
except Exception as e:
# Some commands take a long time (logread -f)
if not wait_forever:
logging.exception(e)
break
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
if __name__ == '__main__':
main()
####
####
####

View File

@@ -0,0 +1,603 @@
"""
This class consists of functions which checks the schema of the configuration for lab
Whether the schema contains all the necessary key-value pairs or not
If not it will inform the required key-value pair
End of the program called every function that checks all the schema data
"""
import logging
import re
class SchemaCheck:
"""
Global variables are declared for easy modification and checks of some data
"""
global target_var, dut_keys, tg_keys, testbed_name
target_var = "dut_lib_template"
testbed_name = 'basic'
def __init__(self, configuration=None):
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
self.configuration = configuration
self.testbed_list = None
self.len_testbed_list = None
self.key_check_arr = ['target', 'controller', 'device_under_tests', 'traffic_generator']
def set_data(self):
"""
This Function sets the value of how many testbeds are there in the schema input file and stores the number of it
"""
testbed_list = []
for key in self.configuration:
print(key)
testbed_list.append(key)
print(testbed_list)
self.testbed_list = testbed_list
self.len_testbed_list = len(testbed_list)
def key_check(self):
"""
This fun checks the keys of the testbeds present in schema such as target, controller, DUT, traffic generator
"""
arr = []
for a in range(self.len_testbed_list):
for key in self.configuration[self.testbed_list[a]]:
print(key)
arr.append(key)
print(arr)
if arr == self.key_check_arr:
arr.clear()
print("All keys are present in the schema for Testbed")
logging.info("All keys are present in the schema for Testbed")
else:
arr.clear()
logging.error("Not all the keys required present in schema for Testbed")
def target_check(self):
"""
This function checks the global target we have declared is matching in the schema or not
"""
global target_var
for a in range(self.len_testbed_list):
if self.configuration[self.testbed_list[a]]['target'] == target_var:
logging.info("Target is matching")
else:
logging.error("Target variable is not matching")
def controller_check(self):
"""
This func checks the keys of Controller such as Url, Username and password
"""
arr = ['url', 'username', 'password']
lis = []
for a in range(self.len_testbed_list):
for key in self.configuration[self.testbed_list[a]]['controller']:
lis.append(key)
print(self.testbed_list[a], "->", lis)
if lis == arr:
lis.clear()
print("All keys are present in the Controller data of schema")
logging.info("All keys are present in the Controller data of schema")
else:
lis.clear()
logging.error("Not all the Controller keys required present in schema")
def dut_keys_check(self):
"""
This func checks DUT keys if every key is present in the schema or not
"""
global dut_keys
arr, arr2, arr3 = [], [], []
dut_keys = ['model', 'supported_bands', 'supported_modes', 'wan_port', 'lan_port', 'ssid', 'mode', 'identifier',
'method', 'host_ip', 'host_username', 'host_password', 'host_ssh_port', 'serial_tty',
'firmware_version']
for a in range(self.len_testbed_list):
for b in range(len(self.configuration[self.testbed_list[a]]['device_under_tests'])):
for key in self.configuration[self.testbed_list[a]]['device_under_tests'][b]:
arr.append(key)
arr2 = list(set(dut_keys) - set(arr))
arr3.append(arr2)
# print(arr3)
arr.clear()
for a in range(len(arr3)):
if len(arr3[a]) == 0:
logging.info("All keys of DUT are present")
self.dut_values_check()
elif len(arr3[a]) == 1:
if arr3[a][0] == 'ssid':
logging.warning("Ssid key is not present" + str(self.testbed_list[a]) + "->" + str(arr3[a]))
self.dut_values_check()
else:
logging.error(
"Required keys of DUT are not present, Please include those" + str(self.testbed_list[a])
+ "->" + str(arr3[a]))
else:
logging.error(
"Not all Keys of DUT required are present" + str(self.testbed_list[a]) + "->" + str(arr3[a]))
def dut_values_check(self):
"""
This func checks whether all the values of DUT Keys are valid or not. Use it after dut_keys_check()
"""
global dut_keys
print("DUT Key->value Check")
for a in range(self.len_testbed_list):
for b in range(len(self.configuration[self.testbed_list[a]]['device_under_tests'])):
for key, value in self.configuration[self.testbed_list[a]]['device_under_tests'][b].items():
# print(key, value)
# print(type(value))
if key == 'model':
if type(value) == str:
logging.info("Model key->values are present and eligible")
else:
logging.error(
"Model key->values which are present are not eligible" + str(key) + "->" + str(value))
elif key == 'supported_bands':
if type(value) == list:
logging.info("Supported bands key->values are present and eligible")
else:
logging.error(
"Supported bands key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'supported_modes':
if type(value) == list:
logging.info("Supported modes key->values are present and eligible")
else:
logging.error(
"Supported modes key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'wan_port':
if type(value) == str:
x = re.search("\d.\d.", value)
if x is not None:
logging.info("Wan port key->values are present and eligible")
else:
logging.error(
"Wan port key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'lan_port':
if value is None or type(value) == str:
if type(value) == str:
x = re.search("\d.\d.", value)
if x is not None:
logging.info("Lan port key->values are present and eligible")
else:
logging.error("Lan port key->values are present and not eligible")
else:
logging.info("Lan port is null or None")
else:
logging.error("Lan port key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'ssid':
if type(value) == dict:
self.ssid_data_check()
logging.info("Ssid key->values are present and eligible")
else:
logging.error("Ssid key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'mode':
if type(value) == str:
logging.info("Mode key->values are present and eligible")
else:
logging.error("Mode key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'identifier':
if type(value) == str and type(value) is not None:
logging.info("Identifier key->values are present and eligible")
else:
logging.error(
"Identifier key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'method':
if type(value) == str and (value == 'serial' or value == 'ssh' or value == 'telnet'):
logging.info("Method key->values are present and eligible")
else:
logging.error("Method key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'host_ip':
if type(value) == str:
logging.info("Host IP key->values are present and eligible")
else:
logging.error("Host IP key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'host_username':
if type(value) == str:
logging.info("Host Username key->values are present and eligible")
else:
logging.error(
"Host Username key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'host_password':
if type(value) == str:
logging.info("Host Password key->values are present and eligible")
else:
logging.error(
"Host Password key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'host_ssh_port':
if type(value) == int:
logging.info("Host ssh Port key->values are present and eligible")
else:
logging.error(
"Host ssh Port key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'serial_tty':
if type(value) == str:
logging.info("Serial tty key->values are present and eligible")
else:
logging.error(
"Serial tty key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'firmware_version':
if type(value) == str:
logging.info("Firmware version key->values are present and eligible")
else:
logging.error(
"Firmware version key->values which are present are not eligible" + str(key) + "->" +
str(value))
def traffic_generator_keys_check(self):
"""
THis Func checks the Traffic generator keys are present in the schema or not. It should be called after
dut_values_check()
"""
global tg_keys
tg_keys = ['name', 'testbed', 'scenario', 'details']
lis = []
for count in range(self.len_testbed_list):
for key in self.configuration[self.testbed_list[count]]['traffic_generator']:
lis.append(key)
print(self.testbed_list[count], "->", lis)
if lis == tg_keys:
lis.clear()
print("All keys are present in the Traffic generator data of schema")
logging.info("All keys are present in the Traffic generator data of schema")
self.traffic_generator_values_check(count)
else:
lis.clear()
logging.error("Not all the Traffic generator keys required are present in schema")
def ssid_data_check(self):
"""
This func has to check the Ssid data check in DUT values if SSid key is present in it
"""
pass
def traffic_generator_values_check(self, count):
"""
This func validates the traffic generator values and is called from traffic_generator_keys_check() after
keys are checked
"""
global testbed_name
logging.info("Traffic generator Key->value check")
for key, value in self.configuration[self.testbed_list[count]]['traffic_generator'].items():
if key == 'name':
if type(value) == str:
logging.info("Name key->value are present and Eligible")
else:
logging.error("Name key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'testbed':
if type(value) == str and value == testbed_name:
logging.info("Testbed key->value are present and Eligible")
else:
logging.error("Testbed key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'scenario':
if type(value) == str and (value == 'dhcp-bridge' or value == 'dhcp-external'):
logging.info("Scenario key->value are present and Eligible")
else:
logging.error("Scenario key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'details':
if type(value) == dict:
self.tg_details_data_keys_check(count)
logging.info("Details key->value are present and Eligible")
else:
logging.error("Details key->values which are present are not eligible" + str(key) + "->" +
str(value))
def tg_details_data_keys_check(self, count):
"""
This Func checks the Details data keys of Traffic generator and is called in traffic_generator_values_check()
after details key is validated there for further validation of details dict
"""
global tg_details_keys
tg_details_keys = ['manager_ip', 'http_port', 'ssh_port', 'setup', 'wan_ports', 'lan_ports', 'uplink_nat_ports']
lis = []
for key in self.configuration[self.testbed_list[count]]['traffic_generator']['details']:
lis.append(key)
print(self.testbed_list[count], "->", lis)
if lis == tg_details_keys:
lis.clear()
print("All keys are present in the Traffic generator Details data of schema")
logging.info("All keys are present in the Traffic generator Details data of schema")
self.tg_details_values_check(count)
else:
lis.clear()
logging.error("Not all the Traffic generator Details keys required are present in schema")
def tg_details_values_check(self, count):
"""
This Func validates the Details data Values of Traffic generator and is called in tg_details_data_keys_check()
after details keys are validated
"""
logging.info("Traffic generator Key->value check")
for key, value in self.configuration[self.testbed_list[count]]['traffic_generator']['details'].items():
if key == 'manager_ip':
if type(value) == str:
logging.info("Manager ip key->value are present and Eligible")
else:
logging.error("Manager ip key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'http_port':
if type(value) == int:
logging.info("Http port key->value are present and Eligible")
else:
logging.error("Http port key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'ssh_port':
if type(value) == int:
logging.info("Ssh port key->value are present and Eligible")
else:
logging.error("Ssh port key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'setup':
if type(value) == dict:
key2 = self.configuration[self.testbed_list[count]]['traffic_generator']['details']['setup']
value2 = self.configuration[self.testbed_list[count]]['traffic_generator']['details']['setup'][
'method']
if key2 == 'method':
if type(value2) == str:
if value2 == 'build':
logging.info("Method - Build key->value are present and Eligible")
elif value2 == 'load':
if key2['DB'] == str:
logging.info("Method - Load key->value are present and Eligible")
else:
logging.error("Method key->values which are present are not eligible" + str(key) +
"->" +
str(value))
logging.info("Setup key->value are present and Eligible")
else:
logging.error("Setup key->values which are present are not eligible" + str(key) + "->" +
str(value))
elif key == 'wan_ports':
if type(value) == dict:
self.tg_ports_data_keys_check(key, count)
else:
logging.error("Wan Ports data is not eligible")
elif key == 'lan_ports':
if type(value) == dict:
# self.tg_ports_data_keys_check(key, count)
pass
else:
logging.error("Lan Ports data is not eligible")
elif key == 'uplink_nat_ports':
if type(value) == dict:
self.tg_ports_data_keys_check(key, count)
else:
logging.error("Uplink nat Ports data is not eligible")
def tg_ports_data_keys_check(self, key, count):
"""
This Func validates the Ports data Values of Traffic generator and is called in tg_details_values_check()
after details values are validated. It will check for patterns like 1.1.eth2
"""
ports = self.configuration[self.testbed_list[count]]['traffic_generator']['details'][key]
print("Data of ---------------", key)
print(ports)
for key1, value1 in ports.items():
if type(key1) == str and type(value1) == dict:
x = re.search("\d.\d.", key1)
if x is not None:
logging.info("Key of" + str(key) + "->" + str(key1) + "is eligible")
self.tg_ports_addressing_check(value1)
else:
logging.error("Key of" + str(key) + "->" + str(key1) + "is not eligible")
else:
logging.error("Key of" + str(key) + "->" + str(key1) + "is not eligible and is not a string")
@staticmethod
def tg_ports_addressing_check(value):
"""
This function checks the addressing data if values present has ip address pattern or not. It is called in
tg_ports_data_keys_check()
"""
print("Value--------------")
print(value)
if value['addressing'] == 'static':
for key, value2 in value.items():
if key == 'ip':
value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2)
if value2 is not None:
logging.info("Ip is present and eligible in ports")
elif key == 'gateway_ip':
value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d*$", value2)
if value2 is not None:
logging.info("Gateway Ip is present and eligible in ports")
elif key == 'ip_mask':
value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2)
if value2 is not None:
logging.info("Ip Mask is present and eligible in ports")
elif key == 'dns_servers' and type(value2) == str:
logging.info("DNS server is present and eligible in ports")
elif key == 'addressing':
logging.info("Skipping Addressing ,As it is already verified")
else:
logging.error("Please look into the Ports data")
elif value['addressing'] == 'dynamic':
pass
elif value['addressing'] == 'dhcp-server':
for key, value2 in value.items():
if key == 'ip':
value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2)
if value2 is not None:
logging.info("Ip is present and eligible in ports")
elif key == 'gateway_ip':
value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d*$", value2)
if value2 is not None:
logging.info("Gateway Ip is present and eligible in ports")
elif key == 'ip_mask':
value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2)
if value2 is not None:
logging.info("Ip Mask is present and eligible in ports")
elif key == 'dns_servers' and type(value2) == str:
logging.info("DNS server is present and eligible in ports")
elif key == 'addressing':
logging.info("Skipping Addressing ,As it is already verified")
else:
logging.error("Please look into the Ports data")
if __name__ == '__main__':
var = {
"CONFIGURATION": {
'basic-06': {
'target': 'dut_lib_template',
'controller': {
'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001',
'username': 'tip@ucentral.com',
'password': 'OpenWifi%123'
},
'device_under_tests': [{
'model': 'edgecore_eap102', # Will be string
'supported_bands': ['2G', '5G'], # ['2G', '5G', '6G']
'supported_modes': ['BRIDGE', 'NAT', 'VLAN'], # Will remain same
'wan_port': '1.1.eth2', # Has to be
'lan_port': '1.1.eth2', # Has to be null or none
'ssid': { # Has to be seperate func
'2g-ssid': 'OpenWifi',
'5g-ssid': 'OpenWifi',
'6g-ssid': 'OpenWifi',
'2g-password': 'OpenWifi',
'5g-password': 'OpenWifi',
'6g-password': 'OpenWifi',
'2g-encryption': 'WPA2',
'5g-encryption': 'WPA2',
'6g-encryption': 'WPA3',
'2g-bssid': '68:7d:b4:5f:5c:31',
'5g-bssid': '68:7d:b4:5f:5c:3c',
'6g-bssid': '68:7d:b4:5f:5c:38'
},
'mode': 'wifi6', # ['wifi5', 'wifi6', 'wifi6e']
'identifier': '903cb39d6918', # Has to be not Null
'method': 'serial', # Has to be serial, ssh, telnet
'host_ip': 'localhost', # Ip or localhost
'host_username': 'lanforge', # Str
'host_password': 'pumpkin77', # Str
'host_ssh_port': 8852, # Int
'serial_tty': '/dev/ttyAP2', # Str
'firmware_version': 'next-latest' # Str
}],
'traffic_generator': {
'name': 'lanforge', # STR
'testbed': 'basic', # [basic, ]
'scenario': 'dhcp-bridge', # dhcp-bridge, dhcp-external
'details': {
'manager_ip': 'localhost', # Str or ip
'http_port': 8850, # int
'ssh_port': 8851, # int
'setup': {'method': 'build', 'DB': 'Test_Scenario_Automation'},
# Method-> build/load, if load-> DB
'wan_ports': { # addressing(dhcp-server, static, dynamic) Subnet-> ip/ cannot be eth2(1.1.eth2)
'1.1.eth2': {'addressing': 'dhcp-server', 'subnet': '172.16.0.1/16', 'dhcp': { # DICT
'lease-first': 10, # int
'lease-count': 10000, # int
'lease-time': '6h' # str
}
}
},
'lan_ports': {
},
'uplink_nat_ports': {
'1.1.eth3': {
'addressing': 'static', # If static -> need ip, g_ip, ip_mask, dns
'ip': '10.28.2.17',
'gateway_ip': '10.28.2.1/24',
'ip_mask': '255.255.255.0',
'dns_servers': 'BLANK'
}
}
}
}
},
"advance-03": {
"target": "dut_lib_template",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
"password": "OpenWifi%123"
},
"device_under_tests": [{
"model": "cig_wf196",
"supported_bands": ["2G", "5G", "6G"],
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
"wan_port": "1.3.eth2",
"lan_port": None,
"ssid": {
"2g-ssid": "OpenWifi",
"5g-ssid": "OpenWifi",
"6g-ssid": "OpenWifi",
"2g-password": "OpenWifi",
"5g-password": "OpenWifi",
"6g-password": "OpenWifi",
"2g-encryption": "WPA2",
"5g-encryption": "WPA2",
"6g-encryption": "WPA3",
"2g-bssid": "68:7d:b4:5f:5c:31",
"5g-bssid": "68:7d:b4:5f:5c:3c",
"6g-bssid": "68:7d:b4:5f:5c:38"
},
"mode": "wifi6e",
"identifier": "824f816011e4",
"method": "serial",
"host_ip": "localhost",
"host_username": "lanforge",
"host_password": "pumpkin77",
"host_ssh_port": 8902,
"serial_tty": "/dev/ttyAP0",
"firmware_version": "next-latest"
}],
"traffic_generator": {
"name": "lanforge",
"testbed": "basic",
"scenario": "dhcp-bridge",
"details": {
"manager_ip": "10.28.3.117",
"http_port": 8900,
"ssh_port": 8901,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
"wan_ports": {
"1.3.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
}
},
"lan_ports": {
},
"uplink_nat_ports": {
"1.3.eth3": {
"addressing": "static",
"ip": "10.28.2.39",
"gateway_ip": "10.28.2.1/24",
"ip_mask": "255.255.255.0",
"dns_servers": "BLANK"
}
}
}
}
}
}
}
obj = SchemaCheck(var["CONFIGURATION"])
obj.set_data()
obj.key_check()
obj.target_check()
obj.controller_check()
obj.dut_values_check()
obj.traffic_generator_keys_check()

21
dut_lib_template/setup.py Normal file
View File

@@ -0,0 +1,21 @@
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name='dut_lib_template',
version='0.1',
scripts=['dut_lib_template.py', 'controller.py', 'ap_lib.py', 'SetupLibrary.py', 'openwrt_ctl.py'],
author="Shivam Thakur",
author_email="shivam.thakur@candelatech.com",
description="TIP OpenWIFI 2.X Library",
long_description=long_description,
long_description_content_type="text/markdown",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
)

View File

@@ -430,7 +430,7 @@ class APLIBS:
if __name__ == '__main__':
basic_05 = {
"target": "tip_2x",
"target": "dut_lib_template",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",

View File

@@ -1455,7 +1455,6 @@ class UProfileUtility:
for keys in radio_config[band]:
base_radio_config_6g[keys] = radio_config[band][keys]
self.base_profile_config["radios"].append(base_radio_config_2g)
self.base_profile_config["radios"].append(base_radio_config_5g)
# self.base_profile_config["radios"].append(base_radio_config_6g)

View File

@@ -13,7 +13,7 @@ class SchemaCheck:
Global variables are declared for easy modification and checks of some data
"""
global target_var, dut_keys, tg_keys, testbed_name
target_var = "tip_2x"
target_var = "dut_lib_template"
testbed_name = 'basic'
def __init__(self, configuration=None):
@@ -454,7 +454,7 @@ if __name__ == '__main__':
var = {
"CONFIGURATION": {
'basic-06': {
'target': 'tip_2x',
'target': 'dut_lib_template',
'controller': {
'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001',
'username': 'tip@ucentral.com',
@@ -524,7 +524,7 @@ if __name__ == '__main__':
}
},
"advance-03": {
"target": "tip_2x",
"target": "dut_lib_template",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",

View File

@@ -4,9 +4,9 @@ with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name='tip_2x',
name='dut_lib_template',
version='0.1',
scripts=['tip_2x.py', 'controller.py', 'ap_lib.py', 'SetupLibrary.py', 'openwrt_ctl.py'],
scripts=['dut_lib_template.py', 'controller.py', 'ap_lib.py', 'SetupLibrary.py', 'openwrt_ctl.py'],
author="Shivam Thakur",
author_email="shivam.thakur@candelatech.com",
description="TIP OpenWIFI 2.X Library",

View File

@@ -38,13 +38,13 @@ class tip_2x:
controller_data = {}
device_under_tests_info = []
"""
OpenWifi 2.x Specific Variables that will be only scoped in tip_2x Library
OpenWifi 2.x Specific Variables that will be only scoped in dut_lib_template Library
"""
ow_sec_url = ""
ow_sec_login_username = ""
ow_sec_login_password = ""
target = "tip_2x"
target = "dut_lib_template"
controller_library_object = object()
prov_library_object = object()
firmware_library_object = object()
@@ -82,8 +82,8 @@ class tip_2x:
device_under_tests_info=[], logging_level=logging.INFO):
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging_level)
if target != self.target:
logging.error("Target version is : " + target + " Expected target is tip_2x")
pytest.exit("Target should be 'tip_2x', Current Target is : " + target)
logging.error("Target version is : " + target + " Expected target is dut_lib_template")
pytest.exit("Target should be 'dut_lib_template', Current Target is : " + target)
if controller_data is None:
controller_data = {}
self.controller_data = controller_data
@@ -98,8 +98,8 @@ class tip_2x:
def setup_metadata(self):
logging.info(
"Setting up the Controller metadata for tip_2x Library: " + str(json.dumps(self.controller_data, indent=2)))
logging.info("Setting up the DUT metadata for tip_2x Library: " + str(
"Setting up the Controller metadata for dut_lib_template Library: " + str(json.dumps(self.controller_data, indent=2)))
logging.info("Setting up the DUT metadata for dut_lib_template Library: " + str(
json.dumps(self.device_under_tests_info, indent=2)))
logging.info("Number of DUT's in lab_info.json: " + str(len(self.device_under_tests_info)))
self.ow_sec_url = self.controller_data["url"]
@@ -547,6 +547,8 @@ class tip_2x:
self.dut_library_object.setup_serial_environment(idx=idx)
self.dut_library_object.verify_certificates(idx=idx)
ret_val = self.dut_library_object.ubus_call_ucentral_status(idx=idx, attach_allure=False)
wifi_status = self.dut_library_object.get_wifi_status(idx=idx, attach_allure=True)
allure.attach(name="wifi_status_before_apply: ", body=wifi_status, attachment_type=allure.attachment_type.TEXT)
if not ret_val["connected"] or ret_val["connected"] is None:
# TODO: check the connectivity (if it is not connected, then check the lanforge wan port and bring it
# up if lanforge eth is in down state. Also check the link state of eth port with ip address
@@ -573,6 +575,7 @@ class tip_2x:
self.dut_library_object.check_connectivity(idx=idx, attach_allure=False)
self.dut_library_object.check_connectivity(idx=idx)
r_data = self.dut_library_object.get_wifi_status(idx=idx, attach_allure=False)
allure.attach(name="wifi_status_after_apply: ", body=r_data, attachment_type=allure.attachment_type.TEXT)
logging.info("Checking Wifi Status after Config Apply...")
for radio in r_data:
if not r_data[radio]["up"]:
@@ -897,7 +900,7 @@ class tip_2x:
if __name__ == '__main__':
basic_05 = {
"target": "tip_2x",
"target": "dut_lib_template",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",

View File

@@ -2,7 +2,7 @@
# Setup python environment variable and pip environment variable like
# export PYTHON=/usr/bin/python3
# export PIP=/usr/bin/pip3
#sh setup_env.bash -t tip_2x -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library"
#sh setup_env.bash -t dut_lib_template -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library"
set -e
helpFunction()
@@ -85,7 +85,7 @@ then
touch tests/imports.py
if [ $target == "tip_2x" ]
then
cd libs/tip_2x
cd libs/dut_lib_template
$PYTHON setup.py bdist_wheel
$PIP install dist/*.whl --force-reinstall
cd ../../

View File

@@ -2,7 +2,7 @@
# Setup python environment variable and pip environment variable like
# export PYTHON=/usr/bin/python3
# export PIP=/usr/bin/pip3
#sh setup_env.bash -t tip_2x -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library"
#sh setup_env.bash -t dut_lib_template -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library"
helpFunction()
{
echo "Setup SSH Tunnel for TIP Labs"
@@ -76,7 +76,7 @@ Create_lab_info_json()
"CONFIGURATION" : {
"basic-01" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -145,7 +145,7 @@ Create_lab_info_json()
}
},
"basic-02" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -214,7 +214,7 @@ Create_lab_info_json()
}
},
"basic-03" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -285,7 +285,7 @@ Create_lab_info_json()
}
},
"basic-03a" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -356,7 +356,7 @@ Create_lab_info_json()
}
},
"basic-04" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -427,7 +427,7 @@ Create_lab_info_json()
}
},
"basic-04a" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -498,7 +498,7 @@ Create_lab_info_json()
}
},
"basic-05" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -567,7 +567,7 @@ Create_lab_info_json()
}
},
"basic-06" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
@@ -636,7 +636,7 @@ Create_lab_info_json()
}
},
"advance-03" : {
"target": "tip_2x",
"target": "dut_lib_template",
"controller" : {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",

View File

@@ -59,6 +59,12 @@ def pytest_addoption(parser):
default="lanforge",
help="Test Traffic Generator which can be used, lanforge | perfecto"
)
parser.addoption(
"--run-lf",
action="store_true",
default=False,
help="Test Traffic Generator which can be used, lanforge | perfecto"
)
parser.addoption(
"--skip-env",
@@ -87,6 +93,13 @@ def get_lab_info():
yield configuration
@pytest.fixture(scope="session")
def run_lf(request):
"""yields the testbed option selection"""
run_lf = request.config.getoption("--run-lf")
yield run_lf
@pytest.fixture(scope="session")
def selected_testbed(request):
"""yields the testbed option selection"""
@@ -155,7 +168,7 @@ def get_target_object(request, get_testbed_details, add_allure_environment_prope
t_object = target(controller_data=get_testbed_details["controller"], target=get_testbed_details["target"],
device_under_tests_info=get_testbed_details["device_under_tests"])
if not request.config.getoption("--skip-env"):
if get_testbed_details["target"] == "tip_2x":
if get_testbed_details["target"] == "dut_lib_template":
t_object.setup_environment_properties(add_allure_environment_property=
add_allure_environment_property)
@@ -219,7 +232,7 @@ def is_test_library_perfecto_ios(request):
@pytest.fixture(scope="session")
def get_test_library(get_testbed_details, is_test_library_perfecto_android, is_test_library_perfecto_ios):
def get_test_library(get_testbed_details, is_test_library_perfecto_android, is_test_library_perfecto_ios, run_lf):
if is_test_library_perfecto_android:
obj = android_tests()
elif is_test_library_perfecto_ios:
@@ -228,7 +241,7 @@ def get_test_library(get_testbed_details, is_test_library_perfecto_android, is_t
obj = lf_tests(lf_data=get_testbed_details["traffic_generator"],
dut_data=get_testbed_details["device_under_tests"],
log_level=logging.DEBUG,
run_lf=False,
run_lf=run_lf,
influx_params=None)
yield obj

View File

@@ -2,7 +2,7 @@ import pytest
@pytest.fixture(scope="class")
def setup_configuration(request, get_markers, get_target_object):
def setup_configuration(request, get_markers, get_target_object, run_lf):
# Predefined selected markers and selected configuration
conf = dict(request.param)
@@ -13,7 +13,9 @@ def setup_configuration(request, get_markers, get_target_object):
requested_combination.append(get_markers[key])
# Method to setup the basic configuration
data = get_target_object.setup_basic_configuration(configuration=configuration,
requested_combination=requested_combination)
data = {}
if not run_lf:
data = get_target_object.setup_basic_configuration(configuration=configuration,
requested_combination=requested_combination)
yield data