diff --git a/dut_lib_template/README.md b/dut_lib_template/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/dut_lib_template/SetupLibrary.py b/dut_lib_template/SetupLibrary.py new file mode 100644 index 000000000..c3b75ecef --- /dev/null +++ b/dut_lib_template/SetupLibrary.py @@ -0,0 +1,81 @@ +import logging + +import paramiko +from scp import SCPClient + +logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) + + +class SetupLibrary: + + def __init__(self, remote_ip="", + remote_ssh_port=22, + remote_ssh_username="lanforge", + remote_ssh_password="lanforge", + pwd="", + ): + self.pwd = pwd + self.remote_ip = remote_ip + self.remote_ssh_username = remote_ssh_username + self.remote_ssh_password = remote_ssh_password + self.remote_ssh_port = remote_ssh_port + + def setup_serial_environment(self): + client = self.ssh_cli_connect() + cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read()) + if output.__contains__("False"): + cmd = 'mkdir ~/cicd-git/' + client.exec_command(cmd) + cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read()) + if output.__contains__("False"): + logging.info("Copying openwrt_ctl serial control Script...") + with SCPClient(client.get_transport()) as scp: + scp.put(self.pwd + 'openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server + cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + var = str(stdout.read()) + client.close() + + def ssh_cli_connect(self): + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + logging.info("Trying SSH Connection to: " + str(self.remote_ip) + + " on port: " + str(self.remote_ssh_port) + + " with username: " + str(self.remote_ssh_username) + + " and password: " + str(self.remote_ssh_password)) + client.connect(self.remote_ip, username=self.remote_ssh_username, password=self.remote_ssh_password, + port=self.remote_ssh_port, timeout=10, allow_agent=False, banner_timeout=200) + return client + + def check_serial_connection(self, tty="/dev/ttyUSB0"): + client = self.ssh_cli_connect() + cmd = 'ls /dev/tty*' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read().decode('utf-8')) + client.close() + available_tty_ports = output.split("\n") + if tty in available_tty_ports: + logging.info("Expected Serial Console Port, " + tty + " is available on " + self.remote_ip) + else: + logging.error("Expected Serial Console Port, " + tty + " is not available on " + self.remote_ip) + return tty in available_tty_ports + + def kill_all_minicom_process(self, tty="/dev/ttyUSB0"): + client = self.ssh_cli_connect() + stdin, stdout, stderr = client.exec_command("fuser -k " + tty) + # print(stdout.read()) + client.close() + + +if __name__ == '__main__': + obj = SetupLibrary(remote_ip="192.168.52.89", + remote_ssh_port=22, + pwd="") + + obj.setup_serial_environment() + obj.check_serial_connection(tty="/dev/ttyUSB0") + obj.kill_all_minicom_process(tty="/dev/ttyUSB0") diff --git a/dut_lib_template/__init__.py b/dut_lib_template/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dut_lib_template/ap_lib.py b/dut_lib_template/ap_lib.py new file mode 100644 index 000000000..d9585673e --- /dev/null +++ b/dut_lib_template/ap_lib.py @@ -0,0 +1,176 @@ +import importlib +import json +import logging + +import allure +import paramiko +import pytest + +setup_lib = importlib.import_module("SetupLibrary") +SetupLibrary = setup_lib.SetupLibrary + + +class APLIBS: + setup_library_objects = list() + device_under_tests_data = None + + def __init__(self, dut_data=None): + if dut_data is None: + logging.error("Device Under Test Data is Not Specified, Please provide valid DUT Data") + pytest.exit("Device Under Test Data is Not Specified, Please provide valid DUT Data") + if type(dut_data) is not list: + logging.error("Device Under Test Data is Not in list format, Please provide valid DUT Data in list format") + pytest.exit("Device Under Test Data is Not in list format, Please provide valid DUT Data in list format") + self.device_under_tests_data = dut_data + for dut in self.device_under_tests_data: + self.setup_library_objects.append(SetupLibrary(remote_ip=dut["host_ip"], + remote_ssh_port=dut["host_ssh_port"], + remote_ssh_username=dut["host_username"], + remote_ssh_password=dut["host_password"], + pwd="")) + + def ssh_cli_connect(self, ip="", + port=22, + username="", + password="", + timeout=10, + allow_agent=False, + banner_timeout=200): + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(ip, username=username, password=password, + port=port, timeout=timeout, allow_agent=allow_agent, banner_timeout=banner_timeout) + return client + + def run_generic_command(self, cmd="", idx=0, print_log=True, attach_allure=False, + expected_attachment_type=allure.attachment_type.TEXT): + input_command = cmd + logging.info("Executing Command on AP: " + cmd) + try: + self.setup_library_objects[idx].kill_all_minicom_process( + tty=self.device_under_tests_data[idx]["serial_tty"]) + client = self.ssh_cli_connect(ip=self.device_under_tests_data[idx]["host_ip"], + port=self.device_under_tests_data[idx]["host_ssh_port"], + username=self.device_under_tests_data[idx]["host_username"], + password=self.device_under_tests_data[idx]["host_password"], + timeout=10, + allow_agent=False, + banner_timeout=200) + if self.device_under_tests_data[idx]["method"] == "serial": + owrt_args = "--prompt root@" + self.device_under_tests_data[idx][ + "identifier"] + " -s serial --log stdout --user root --passwd openwifi" + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {owrt_args} -t {self.device_under_tests_data[idx]['serial_tty']} --action " \ + f"cmd --value \"{cmd}\" " + if print_log: + logging.info(cmd) + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + final_output = str(output) + if not output.__contains__(b"BOOTLOADER-CONSOLE-IPQ6018#"): + status = output.decode('utf-8').splitlines() + status.pop(0) + final_output = '\n'.join(status) + if print_log: + logging.info(cmd) + logging.info("Output for command: " + input_command + "\n" + final_output) + if attach_allure: + allure.attach(name=input_command, body=output, attachment_type=expected_attachment_type) + client.close() + except Exception as e: + logging.error(e) + final_output = "Error: " + str(e) + return final_output + + def example_function(self, idx=0, print_log=True, attach_allure=True): + output = self.run_generic_command(cmd="example_command", idx=idx, + print_log=print_log, + attach_allure=attach_allure, + expected_attachment_type=allure.attachment_type.TEXT) + return output + + + + + +if __name__ == '__main__': + basic_05 = { + "target": "dut_lib_template", + "controller": { + "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", + "username": "tip@ucentral.com", + "password": "OpenWifi%123" + }, + "device_under_tests": [{ + "model": "cig_wf188n", + "supported_bands": ["2G", "5G"], + "supported_modes": ["BRIDGE", "NAT", "VLAN"], + "wan_port": "1.1.eth2", + "lan_port": None, + "ssid": { + "2g-ssid": "OpenWifi", + "5g-ssid": "OpenWifi", + "6g-ssid": "OpenWifi", + "2g-password": "OpenWifi", + "5g-password": "OpenWifi", + "6g-password": "OpenWifi", + "2g-encryption": "WPA2", + "5g-encryption": "WPA2", + "6g-encryption": "WPA3", + "2g-bssid": "68:7d:b4:5f:5c:31", + "5g-bssid": "68:7d:b4:5f:5c:3c", + "6g-bssid": "68:7d:b4:5f:5c:38" + }, + "mode": "wifi6", + "identifier": "0000c1018812", + "method": "serial", + "host_ip": "localhost", + "host_username": "lanforge", + "host_password": "pumpkin77", + "host_ssh_port": 8842, + "serial_tty": "/dev/ttyAP1", + "firmware_version": "next-latest" + }], + "traffic_generator": { + "name": "lanforge", + "testbed": "basic", + "scenario": "dhcp-bridge", + "details": { + "manager_ip": "localhost", + "http_port": 8840, + "ssh_port": 8841, + "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, + "wan_ports": { + "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + } + } + }, + "lan_ports": { + + }, + "uplink_nat_ports": { + "1.1.eth1": { + "addressing": "static", + "ip": "10.28.2.16", + "gateway_ip": "10.28.2.1/24", + "ip_mask": "255.255.255.0", + "dns_servers": "BLANK" + } + } + } + } + } + logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.NOTSET) + obj = APLIBS(dut_data=basic_05["device_under_tests"]) + # obj.exit_from_uboot() + # obj.setup_serial_environment() + + # obj.verify_certificates() + # obj.get_dut_logs() + # l = obj.get_latest_config_recieved() + # a = obj.get_active_config() + # if a == l: + # print("a = l") + # print(obj.get_ap_version()) diff --git a/dut_lib_template/dut_lib_template.py b/dut_lib_template/dut_lib_template.py new file mode 100644 index 000000000..d785a6ce2 --- /dev/null +++ b/dut_lib_template/dut_lib_template.py @@ -0,0 +1,585 @@ +""" + Telecom Infra Project OpenWifi 2.X (Ucentral libraries for Test Automation) + + +""" +import importlib +import json +import random +import string +import time + +import allure +import pytest +import requests + +logging = importlib.import_module("logging") + +ap_lib = importlib.import_module("ap_lib") +controller = importlib.import_module("controller") + +""" + Custom Class Imports needed for OpenWifi 2.X +""" + +ConfigureController = controller.ConfigureController +Controller = controller.Controller +FMSUtils = controller.FMSUtils +ProvUtils = controller.ProvUtils +UProfileUtility = controller.UProfileUtility +APLIBS = ap_lib.APLIBS + + +class dut_lib_template: + """ + Standard OpenWifi wlan-testing specific variables + + """ + controller_data = {} + device_under_tests_info = [] + """ + OpenWifi 2.x Specific Variables that will be only scoped in dut_lib_template Library + + """ + ow_sec_url = "" + ow_sec_login_username = "" + ow_sec_login_password = "" + target = "dut_lib_template" + controller_library_object = object() + prov_library_object = object() + firmware_library_object = object() + dut_library_object = object() + supported_bands = ["2G", "5G", "6G", "5G-lower", "5G-upper"] + supported_modes = ["BRIDGE", "NAT", "VLAN"] + supported_encryption = ["open", + "wpa", + "wpa2_personal", + "wpa3_personal", + "wpa_wpa2_personal_mixed", + "wpa3_personal_mixed", + "wpa_enterprise", + "wpa2_enterprise", + "wpa3_enterprise", + "wpa_wpa2_enterprise_mixed", + "wpa3_enterprise_mixed", + "wpa3_enterprise_192" + ] + tip_2x_specific_encryption_translation = {"open": "none", + "wpa": "psk", + "wpa2_personal": "psk2", + "wpa3_personal": "sae", + "wpa3_personal_mixed": "sae-mixed", + "wpa_wpa2_personal_mixed": "psk-mixed", + "wpa_enterprise": "wpa", + "wpa2_enterprise": "wpa2", + "wpa3_enterprise": "wpa3", + "wpa_wpa2_enterprise_mixed": "wpa-mixed", + "wpa3_enterprise_mixed": "wpa3-mixed", + "wpa3_enterprise_192": "wpa3-192" + } + + def __init__(self, controller_data=None, target=None, + device_under_tests_info=[], logging_level=logging.INFO): + logging.basicConfig(format='%(asctime)s - %(message)s', level=logging_level) + if target != self.target: + logging.error("Target version is : " + target + " Expected target is dut_lib_template") + pytest.exit("Target should be 'dut_lib_template', Current Target is : " + target) + if controller_data is None: + controller_data = {} + self.controller_data = controller_data + self.device_under_tests_info = device_under_tests_info + self.setup_metadata() + self.setup_objects() + self.setup_environment_properties() + + """ + Controller and Access Point specific metadata that is related to OpenWifi 2.x + """ + + def setup_metadata(self): + logging.info( + "Setting up the Controller metadata for dut_lib_template Library: " + str( + json.dumps(self.controller_data, indent=2))) + logging.info("Setting up the DUT metadata for dut_lib_template Library: " + str( + json.dumps(self.device_under_tests_info, indent=2))) + logging.info("Number of DUT's in lab_info.json: " + str(len(self.device_under_tests_info))) + self.ow_sec_url = self.controller_data["url"] + self.ow_sec_login_username = self.controller_data["username"] + self.ow_sec_login_password = self.controller_data["password"] + + def setup_objects(self): + try: + self.controller_library_object = Controller(controller_data=self.controller_data) + self.prov_library_object = ProvUtils(sdk_client=self.controller_library_object) + self.firmware_library_object = FMSUtils(sdk_client=self.controller_library_object) + except Exception as e: + pytest.fail("Unable to setup Controller Objects") + logging.error("Exception in setting up Controller objects:" + str(e)) + try: + self.dut_library_object = APLIBS(dut_data=self.device_under_tests_info) + except Exception as e: + logging.error("Exception in setting up Access Point Library object:" + str(e)) + pytest.fail("Unable to setup AP Objects") + + def teardown_objects(self): + self.controller_library_object.logout() + + """ Standard getter methods. Should be available for all type of libraries. Commonly used by wlan-testing""" + + def get_dut_library_object(self): + return self.dut_library_object + + def get_controller_library_object(self): + return self.controller_library_object + + def get_controller_data(self): + return self.controller_data + + def get_device_under_tests_info(self): + return self.device_under_tests_info + + def get_number_of_dut(self): + return len(self.device_under_tests_info) + + def get_dut_logs(self, dut_idx=0): + return self.dut_library_object.get_logs(idx=0) + + def get_controller_logs(self): + pass + + def get_dut_max_clients(self): + pass + + def setup_configuration_data(self, configuration=None, + requested_combination=None): + """Predefined function for getting configuration data for applied data""" + c_data = configuration.copy() + if c_data is None: + pytest.exit("No Configuration Received") + if requested_combination is None: + pytest.exit("No requested_combination Received") + rf_data = None + if c_data.keys().__contains__("rf"): + rf_data = c_data["rf"] + # base_band_keys = ["2G", "5G", "6G", "5G-lower", "5G-upper"] + base_dict = dict.fromkeys(self.supported_bands) + for i in base_dict: + base_dict[i] = [] + for i in requested_combination: + if i[0] in self.supported_bands: + base_dict[i[0]].append(self.tip_2x_specific_encryption_translation[i[1]]) + if i[1] in self.supported_bands: + base_dict[i[1]].append((self.tip_2x_specific_encryption_translation[i[0]])) + temp = [] + for i in list(base_dict.values()): + for j in i: + temp.append(j) + temp_conf = c_data["ssid_modes"].copy() + for i in temp_conf: + if self.tip_2x_specific_encryption_translation[i] not in temp: + c_data["ssid_modes"].pop(i) + + temp_conf = c_data["ssid_modes"].copy() + print(self.tip_2x_specific_encryption_translation) + for i in temp_conf: + for j in range(len(temp_conf[i])): + for k in temp_conf[i][j]["appliedRadios"]: + if self.tip_2x_specific_encryption_translation[i] not in base_dict[k]: + c_data["ssid_modes"][i][j]["appliedRadios"].remove(k) + if c_data["ssid_modes"][i][j]["appliedRadios"] == []: + c_data["ssid_modes"][i][j] = {} # .popi.popitem()) # .popitem() + + for i in c_data["ssid_modes"]: + c_data["ssid_modes"][i] = [x for x in c_data["ssid_modes"][i] if x != {}] + for ssids in c_data["ssid_modes"]: + for i in c_data["ssid_modes"][ssids]: + if i is not {}: + i["security"] = self.tip_2x_specific_encryption_translation[ssids] + temp_conf = c_data.copy() + for i in range(0, len(self.device_under_tests_info)): + if c_data["mode"] not in self.device_under_tests_info[i]["supported_modes"]: + pytest.skip(c_data["mode"] + " is not Supported by DUT") + for enc in c_data["ssid_modes"]: + for idx in c_data["ssid_modes"][enc]: + check = all( + item in self.device_under_tests_info[i]["supported_bands"] for item in idx["appliedRadios"]) + if not check: + temp_conf["ssid_modes"][enc].remove(idx) + for key in c_data["rf"]: + if key not in self.device_under_tests_info[i]["supported_bands"]: + print(key) + temp_conf["rf"][key] = None + + return temp_conf + + """ + setup_basic_configuration - Method to configure AP in basic operating modes with multiple SSID's and multiple AP's + This covers, basic and advanced test cases + """ + + def setup_basic_configuration(self, configuration=None, + requested_combination=None): + f_conf = self.setup_configuration_data(configuration=configuration, + requested_combination=requested_combination) + + logging.info("Selected Configuration: " + str(json.dumps(f_conf, indent=2))) + final_configuration = f_conf.copy() + # Setup Mode + + # Setup Radio Scenario + + # Setup Config Apply on all AP's + ret_val = dict() + for i in range(0, len(self.device_under_tests_info)): + self.pre_apply_check(idx=i) # Do check AP before pushing the configuration + + self.post_apply_check(idx=i) # Do check AP after pushing the configuration + + ret_val[self.device_under_tests_info[i]["identifier"]] = {} + + temp_data = ret_val.copy() + for dut in temp_data: + ret_val[dut] = dict.fromkeys(["ssid_data", "radio_data"]) + ret_val[dut]["radio_data"] = temp_data[dut][-1] + temp_data[dut].pop(-1) + n = len(temp_data[dut]) + lst = list(range(0, n)) + ret_val[dut]["ssid_data"] = dict.fromkeys(lst) + for j in ret_val[dut]["ssid_data"]: + a = temp_data[dut][j].copy() + a = dict.fromkeys(["ssid", "encryption", "password", "band", "bssid"]) + a["ssid"] = temp_data[dut][j][0] + a["encryption"] = temp_data[dut][j][1] + a["password"] = temp_data[dut][j][2] + a["band"] = temp_data[dut][j][3] + a["bssid"] = temp_data[dut][j][4] + ret_val[dut]["ssid_data"][j] = a + temp = ret_val[dut]["radio_data"].copy() + for j in temp: + a = dict.fromkeys(["channel", "bandwidth", "frequency"]) + if temp[j] != None: + a["channel"] = temp[j][0] + a["bandwidth"] = temp[j][1] + a["frequency"] = temp[j][2] + ret_val[dut]["radio_data"][j] = a + return ret_val + + """ + setup_special_configuration - Method to configure APs in mesh operating modes with multiple SSID's and multiple AP's + This covers, mesh and other roaming scenarios which includes any special type of modes + multiple AP's with WDS and Wifi Steering scenarios are also covered here + """ + + def setup_special_configuration(self, configuration=None, + requested_combination=None): + final_configuration = self.setup_configuration_data(configuration=configuration, + requested_combination=requested_combination) + + logging.info("Selected Configuration: " + str(json.dumps(final_configuration, indent=2))) + + profile_object = UProfileUtility(sdk_client=self.controller_library_object) + if final_configuration["mode"] in self.supported_modes: + profile_object.set_mode(mode=final_configuration["mode"]) + else: + pytest.skip(final_configuration["mode"] + " Mode is not supported") + + # Setup Radio Scenario + if final_configuration["rf"] != {}: + profile_object.set_radio_config(radio_config=final_configuration["rf"]) + else: + profile_object.set_radio_config() + for ssid in final_configuration["ssid_modes"]: + for ssid_data in final_configuration["ssid_modes"][ssid]: + profile_object.add_ssid(ssid_data=ssid_data) + logging.info( + "Configuration That is getting pushed: " + json.dumps(profile_object.base_profile_config, indent=2)) + r_val = False + + # Do check AP before pushing the configuration + # TODO + self.dut_library_object.check_serial_connection() + """ + serial connection check + ubus call ucentral status + save the current uuid + uci show ucentral + ifconfig + wifi status + start logger to collect ap logs before config apply + Timestamp before doing config apply + """ + + for dut in self.device_under_tests_info: + resp = profile_object.push_config(serial_number=dut["identifier"]) + logging.info("Response for Config apply: " + str(resp.status_code)) + if resp.status_code != 200: + logging.info("Failed to apply Configuration to AP. Response Code" + + str(resp.status_code) + + "Retrying in 5 Seconds... ") + time.sleep(5) + resp = profile_object.push_config(serial_number=dut["identifier"]) + if resp.status_code != 200: + logging.info("Failed to apply Config, Response code:" + str(resp.status_code)) + pytest.fail("Failed to apply Config, Response code :" + str(resp.status_code)) + if resp.status_code == 200: + r_val = True + # TODO + """ + serial connection check + ubus call ucentral status + save the current uuid and compare with the one before config apply + save the active config and compare with the latest apply + uci show + ifconfig + iwinfo + wifi status + start logger to collect ap logs before config apply + Timestamp after doing config apply + """ + return r_val + + def get_dut_channel_data(self, idx): + try: + d = self.dut_library_object.run_generic_command(cmd="iw dev | grep channel", idx=idx) + d = d.replace("\n", "").replace("\t", "").replace(" ", "").split("channel") + d.pop(0) + d = list(set(d)) + data = dict.fromkeys(["2G", "5G", "6G"]) + for i in d: + channel = int(i.split("(")[0]) + bandwidth = int(i.split(":")[1].split("MHz")[0]) + center_freq = int(i.split(":")[-1].replace("MHz", "")) + if 2401 < center_freq < 2495: + data["2G"] = [channel, bandwidth, center_freq] + elif center_freq in [5955, 5975, 5995] and channel <= 9: + data["6G"] = [channel, bandwidth, center_freq] + elif 5030 < center_freq < 5990: + data["5G"] = [channel, bandwidth, center_freq] + elif 5995 < center_freq < 7125: + data["6G"] = [channel, bandwidth, center_freq] + else: + pass + except Exception as e: + logging.error("Exception in getting DUT Channel and bw data, Retrying again!") + try: + d = self.dut_library_object.run_generic_command(cmd="iw dev | grep channel", idx=idx) + d = d.replace("\n", "").replace("\t", "").replace(" ", "").split("channel") + d.pop(0) + data = dict.fromkeys(["2G", "5G", "6G"]) + for i in d: + channel = int(i.split("(")[0]) + bandwidth = int(i.split(":")[1].split("MHz")[0]) + center_freq = int(i.split(":")[-1].replace("MHz", "")) + if 2401 < center_freq < 2495: + data["2G"] = [channel, bandwidth, center_freq] + elif center_freq in [5955, 5975, 5995] and channel <= 9: + data["6G"] = [channel, bandwidth, center_freq] + elif 5030 < center_freq < 5990: + data["5G"] = [channel, bandwidth, center_freq] + elif 5995 < center_freq < 7125: + data["6G"] = [channel, bandwidth, center_freq] + else: + pass + except Exception as e: + logging.error("Exception in getting DUT Channel and bw data.") + return data + + def get_applied_ssid_info(self, profile_object=None, idx=0): + if profile_object is None: + logging.error("Profile object is None, Unable to fetch ssid info from AP") + return None + ssid_info_sdk = profile_object.get_ssid_info() + ap_wifi_data = self.dut_library_object.get_iwinfo(idx=idx) + channel_info = self.get_dut_channel_data(idx=idx) + o = ap_wifi_data.split() + iwinfo_bssid_data = {} + for i in range(len(o)): + if o[i].__contains__("ESSID"): + if o[i + 9].__contains__("2.4"): + band = "2G" + else: + band = "5G" + iwinfo_bssid_data[o[i - 1]] = [o[i + 1].replace('"', ''), o[i + 4], band] + for p in iwinfo_bssid_data: + for q in ssid_info_sdk: + if iwinfo_bssid_data[p][0] == q[0] and iwinfo_bssid_data[p][2] == q[3]: + q.append(iwinfo_bssid_data[p][1]) + ssid_info_sdk.append(channel_info) + return ssid_info_sdk + + def get_dut_version(self): + """ + get_dut_version + write your code to get ap version in + self.dut_library_object.get_ap_version() + """ + version_info = [] + for ap in range(len(self.device_under_tests_info)): + version_info.append(self.dut_library_object.get_ap_version(idx=ap, print_log=True)) + return version_info + + def get_controller_version(self): + version_info = dict() + # version_info["ow_fms"] = self.controller_library_object.get_sdk_version_fms() + # version_info["ow_gw"] = self.controller_library_object.get_sdk_version_gw() + # version_info["ow_sec"] = self.controller_library_object.get_sdk_version_sec() + # version_info["ow_prov"] = self.controller_library_object.get_sdk_version_prov() + # version_info["ow_rrm"] = self.controller_library_object.get_sdk_version_owrrm() + # version_info["ow_analytics"] = self.controller_library_object.get_sdk_version_ow_analytics() + # version_info["ow_sub"] = self.controller_library_object.get_sdk_version_owsub() + return version_info + + def pre_apply_check(self, idx=0): + """ + Write your code that you need to use + + """ + check = "PASSED" + if check is "FAILED": + pytest.fail("Write the message here for why precheck is failed") + + def post_apply_check(self, idx=0): + + check = "PASSED" + if check is "FAILED": + pytest.fail("Write the message here for why post apply is failed") + + def setup_environment_properties(self, add_allure_environment_property=None): + if add_allure_environment_property is None: + return + add_allure_environment_property('Cloud-Controller-SDK-URL', self.controller_data.get("url")) + sdk_version_data = self.get_controller_version() + for microservice in sdk_version_data: + add_allure_environment_property(microservice + '-version', + str(sdk_version_data.get(microservice))) + dut_versions = self.get_dut_version() + for i in range(len(self.device_under_tests_info)): + add_allure_environment_property("Firmware-Version_" + self.device_under_tests_info[i]["identifier"], + str(dut_versions[i])) + + for dut in self.device_under_tests_info: + models = [] + identifiers = [] + models.append(dut["model"]) + identifiers.append(dut["identifier"]) + add_allure_environment_property('DUT-Model/s', ", ".join(models)) + add_allure_environment_property('Serial-Number/s', ", ".join(identifiers)) + + def setup_firmware(self): + # Query AP Firmware + upgrade_status = [] + return upgrade_status + + def simulate_radar(self, idx=0): + """Simulate radar command for DFS""" + ret = self.dut_library_object.dfs(idx=idx) + return ret + + def get_dfs_logs(self, idx=0): + """Get the ap logs after Simulate radar command""" + logs = self.dut_library_object.dfs_logread(idx=idx) + return logs + + def reboot(self, idx=0): + """Reboot the AP""" + ret = self.dut_library_object.reboot(idx=idx) + return ret + + +if __name__ == '__main__': + basic_05 = { + "target": "dut_lib_template", + "controller": { + "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", + "username": "tip@ucentral.com", + "password": "OpenWifi%123" + }, + "device_under_tests": [{ + "model": "cig_wf188n", + "supported_bands": ["2G", "5G", "6G"], + "supported_modes": ["BRIDGE", "NAT", "VLAN"], + "wan_port": "1.1.eth2", + "ssid": { + "mode": "BRIDGE", + "2g-ssid": "OpenWifi", + "5g-ssid": "OpenWifi", + "6g-ssid": "OpenWifi", + "2g-password": "OpenWifi", + "5g-password": "OpenWifi", + "6g-password": "OpenWifi", + "2g-encryption": "WPA2", + "5g-encryption": "WPA2", + "6g-encryption": "WPA3", + "2g-bssid": "68:7d:b4:5f:5c:31", + "5g-bssid": "68:7d:b4:5f:5c:3c", + "6g-bssid": "68:7d:b4:5f:5c:38" + }, + "mode": "wifi6", + "identifier": "0000c1018812", + "method": "serial", + "host_ip": "10.28.3.103", + "host_username": "lanforge", + "host_password": "pumpkin77", + "host_ssh_port": 22, + "serial_tty": "/dev/ttyAP1", + "firmware_version": "next-latest" + }], + "traffic_generator": { + "name": "lanforge", + "testbed": "basic", + "scenario": "dhcp-bridge", + "details": { + "manager_ip": "10.28.3.28", + "http_port": 8080, + "ssh_port": 22, + "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, + "wan_ports": { + "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + } + } + }, + "lan_ports": { + "1.1.eth1": {"addressing": "dynamic"} + }, + "uplink_nat_ports": { + "1.1.eth1": {"addressing": "static", + "subnet": "10.28.2.16/24", + "gateway_ip": "10.28.2.1", + "ip_mask": "255.255.255.0" + } + } + } + } + } + var = dut_lib_template(controller_data=basic_05["controller"], + device_under_tests_info=basic_05["device_under_tests"], + target=basic_05["target"]) + + # var.setup_objects() + setup_params_general_two = { + "mode": "BRIDGE", + "ssid_modes": { + "wpa3_personal": [ + {"ssid_name": "ssid_wpa3_p_2g_br", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "ssid_wpa3_p_5g_br", "appliedRadios": ["5G"], "security_key": "something"}, + {"ssid_name": "ssid_wpa3_p_6g_br", "appliedRadios": ["6G"], "security_key": "something"}], + "wpa3_personal_mixed": [ + {"ssid_name": "ssid_wpa3_p_m_2g_br", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "ssid_wpa3_p_m_5g_br", "appliedRadios": ["5G"], "security_key": "something"}], + "wpa_wpa2_personal_mixed": [ + {"ssid_name": "ssid_wpa_wpa2_p_m_2g_br", "appliedRadios": ["2G"], "security_key": "something"}, + {"ssid_name": "ssid_wpa_wpa2_p_m_5g_br", "appliedRadios": ["5G"], "security_key": "something"}] + }, + "rf": {}, + "radius": False + } + x = setup_params_general_two.copy() + target = [['6G', 'wpa3_personal']] + d = var.setup_configuration_data(configuration=setup_params_general_two, requested_combination=target) + # d = var.setup_basic_configuration(configuration=setup_params_general_two, requested_combination=target) + print(x) + # var.setup_firmware() + # var.teardown_objects() diff --git a/dut_lib_template/openwrt_ctl.py b/dut_lib_template/openwrt_ctl.py new file mode 100755 index 000000000..c793c0ba8 --- /dev/null +++ b/dut_lib_template/openwrt_ctl.py @@ -0,0 +1,388 @@ +#!/usr/bin/python3 +''' + +make sure pexpect is installed: +$ sudo yum install python3-pexpect + +You might need to install pexpect-serial using pip: +$ pip3 install serial +$ pip3 install pexpect-serial + +./openwrt_ctl.py -l stdout -u root -p TIP -s serial --tty ttyUSB0 + +# Set up reverse ssh tunnel +./openwrt_ctl.py --tty /dev/ttyAP1 --action ssh-tunnel \ + --value "ssh -y -y -f -N -T -M -R 9999:localhost:22 lanforge@10.28.3.100" \ + --value2 password-for-10.28.3.100 --log stdout --scheme serial --prompt root@Open +''' + +import sys + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit() + +try: + import importlib + re = importlib.import_module("re") + logging = importlib.import_module("logging") + time = importlib.import_module("time") + sleep = time.sleep + pexpect = importlib.import_module("pexpect") + serial = importlib.import_module("serial") + pexpect_serial = importlib.import_module("pexpect_serial") + SerialSpawn = pexpect_serial.SerialSpawn + pprint = importlib.import_module("pprint") + telnetlib = importlib.import_module("telnetlib") + argparse = importlib.import_module("argparse") +except ImportError as e: + logging.error(e) + sys.exit("Python Import Error: " + str(e)) + +default_host = "localhost" +default_ports = { + "serial": None, + "ssh": 22, + "telnet": 23 +} +NL = "\n" +CR = "\r\n" +Q = '"' +A = "'" +FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s' +prompt = "root@OpenWrt:" + + +def usage(): + print("$0 used connect to OpenWrt AP or similar Linux machine:") + print("-d|--dest IP address of the OpenWrt AP, for ssh/telnet scheme") + print("-o|--port IP port of the OpenWrt AP, for ssh/telnet scheme") + print("-t|--tty Serial port, if using serial scheme") + print("-u|--user login name") + print("-p|--pass password") + print("--prompt Prompt to look for when commands are done (default: root@OpenWrt)") + print("-s|--scheme (serial|telnet|ssh): connect via serial, ssh or telnet") + print("-l|--log file log messages here") + print("--action (logread | journalctl | lurk | sysupgrade | download | upload | reboot | cmd | ssh-tunnel") + print("--value (option to help complete the action") + print("--value2 (option to help complete the action, dest filename for download, passwd for ssh-tunnel") + print("-h|--help") + + +# see https://stackoverflow.com/a/13306095/11014343 +class FileAdapter(object): + def __init__(self, logger): + self.logger = logger + + def write(self, data): + # NOTE: data can be a partial line, multiple lines + data = data.strip() # ignore leading/trailing whitespace + if data: # non-blank + self.logger.info(data) + + def flush(self): + pass # leave it to logging to flush properly + + +def main(): + global prompt + + parser = argparse.ArgumentParser(description="OpenWrt AP Control Script") + parser.add_argument("-d", "--dest", type=str, help="address of the cisco controller_tests") + parser.add_argument("-o", "--port", type=int, help="control port on the controller_tests") + parser.add_argument("-u", "--user", type=str, help="credential login/username") + parser.add_argument("-p", "--passwd", type=str, help="credential password") + parser.add_argument("-P", "--prompt", type=str, help="Prompt to look for") + parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"], + help="Connect via serial, ssh or telnet") + parser.add_argument("-t", "--tty", type=str, help="tty serial device") + parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console") + parser.add_argument("--action", type=str, help="perform action", + choices=["logread", "journalctl", "lurk", "sysupgrade", "sysupgrade-n", "download", "upload", + "reboot", "cmd", "ssh-tunnel"]) + parser.add_argument("--value", type=str, help="set value") + parser.add_argument("--value2", type=str, help="set value2") + tty = None + + args = None + try: + args = parser.parse_args() + host = args.dest + scheme = args.scheme + port = args.port + # port = (default_ports[scheme], args.port)[args.port != None] + user = args.user + passwd = args.passwd + logfile = args.log + tty = args.tty; + if (args.prompt != None): + prompt = args.prompt + filehandler = None + except Exception as e: + logging.exception(e); + usage() + exit(2); + + console_handler = logging.StreamHandler() + formatter = logging.Formatter(FORMAT) + logg = logging.getLogger(__name__) + logg.setLevel(logging.DEBUG) + file_handler = None + if (logfile is not None): + if (logfile != "stdout"): + file_handler = logging.FileHandler(logfile, "w") + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(formatter) + logg.addHandler(file_handler) + logging.basicConfig(format=FORMAT, handlers=[file_handler]) + else: + # stdout logging + logging.basicConfig(format=FORMAT, handlers=[console_handler]) + + CCPROMPT = prompt + + ser = None + egg = None # think "eggpect" + try: + if (scheme == "serial"): + # eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer) + + ser = serial.Serial(tty, 115200, timeout=5) + + egg = SerialSpawn(ser); + egg.logfile = FileAdapter(logg) + egg.sendline(NL) + has_reset = False + if args.value == "reset-from-console": + has_reset = True + try: + logg.info("prompt: %s user: %s passwd: %s" % (prompt, user, passwd)) + while True: + i = egg.expect([prompt, "Please press Enter to activate", "login:", "Password:", "IPQ6018#"], + timeout=3) + logg.info("expect-0: %i" % (i)) + if (i == 0): + logg.info("Found prompt, login complete.") + break + if (i == 1): + logg.info("Sending newline") + egg.setdline(NL) + if (i == 2): + logg.info("Sending username: %s" % (user)) + egg.sendline(user) + if (i == 3): + logg.info("Sending password: %s" % (passwd)) + egg.sendline(passwd) + if (i == 4): # in bootloader + if has_reset: + logg.info("In boot loader, will reset and sleep 30 seconds") + egg.sendline("reset") + time.sleep(30) + egg.sendline(NL) + sys.exit(1) + else: + print("BOOTLOADER-CONSOLE-IPQ6018#") + break + else: + logg.info("Found prompt, login complete.") + break + except Exception as e: + # maybe something like 'logread -f' is running? + # send ctrl-c + egg.send(chr(3)) + + elif (scheme == "ssh"): + # Not implemented/tested currently. --Ben + if (port is None): + port = 22 + cmd = "ssh -p%d %s@%s" % (port, user, host) + logg.info("Spawn: " + cmd + NL) + egg = pexpect.spawn(cmd) + # egg.logfile_read = sys.stdout.buffer + egg.logfile = FileAdapter(logg) + i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3) + time.sleep(0.1) + if i == 1: + egg.sendline('yes') + egg.expect('password:') + sleep(0.1) + egg.sendline(passwd) + + elif (scheme == "telnet"): + # Not implemented/tested currently. --Ben + if (port is None): + port = 23 + cmd = "telnet %s %d" % (host, port) + logg.info("Spawn: " + cmd + NL) + egg = pexpect.spawn(cmd) + egg.logfile = FileAdapter(logg) + time.sleep(0.1) + egg.sendline(' ') + egg.expect('User\:') + egg.sendline(user) + egg.expect('Password\:') + egg.sendline(passwd) + egg.sendline('config paging disable') + else: + usage() + exit(1) + except Exception as e: + logging.exception(e); + + command = None + + CLOSEDBYREMOTE = "closed by remote host." + CLOSEDCX = "Connection to .* closed." + + try: + egg.expect(CCPROMPT) + except Exception as e: + egg.sendline(NL) + + TO = 10 + wait_forever = False + + # Clean pending output + egg.sendline("echo __hello__") + egg.expect("__hello__") + egg.expect(CCPROMPT) + + logg.info("Action[%s] Value[%s] Value2[%s]" % (args.action, args.value, args.value2)) + + if (args.action == "reboot"): + command = "reboot" + TO = 60 + + if (args.action == "cmd"): + if (args.value is None): + raise Exception("cmd requires value to be set.") + command = "%s" % (args.value) + + if (args.action == "logread"): + command = "logread -f" + TO = 1 + wait_forever = True + + if (args.action == "journalctl"): + command = "journalctl -f" + TO = 1 + wait_forever = True + + if (args.action == "lurk"): + command = "date" + TO = 1 + wait_forever = True + + if (args.action == "ssh-tunnel"): + command = "%s" % (args.value) + passwd = "%s" % (args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline(passwd) + egg.expect(CCPROMPT, timeout=20) + return + + if ((args.action == "sysupgrade") or (args.action == "sysupgrade-n")): + command = "scp %s /tmp/new_img.bin" % (args.value) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + if (args.action == "sysupgrade-n"): + egg.sendline("sysupgrade -n /tmp/new_img.bin") + else: + egg.sendline("sysupgrade /tmp/new_img.bin") + egg.expect("link becomes ready", timeout=100) + return + + if (args.action == "download"): + command = "scp %s /tmp/%s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("Network unreachable, wait 15 seconds and try again.") + time.sleep(15) + command = "scp %s /tmp/%s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("ERROR: Could not connect to LANforge to get download file") + exit(2) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + return + + if (args.action == "upload"): + command = "scp %s %s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("Network unreachable, wait 15 seconds and try again.") + time.sleep(15) + command = "scp /tmp/%s %s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("ERROR: Could not connect to LANforge to put upload file") + exit(2) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + return + + if (command is None): + logg.info("No command specified, going to log out.") + else: + logg.info("Command[%s]" % command) + egg.sendline(command); + while True: + try: + i = egg.expect([CCPROMPT, "kmodloader: done loading kernel", "\n"], timeout=TO) + print(egg.before.decode('utf-8', 'ignore')) + if i == 1: + egg.sendline(' ') + egg.expect(CCPROMPT, timeout=20) + print(egg.before.decode('utf-8', 'ignore')) + if i == 2: # new line of text, just print and continue + continue + # wait_forever = False + if not wait_forever: + break + + except Exception as e: + # Some commands take a long time (logread -f) + if not wait_forever: + logging.exception(e) + break + + +# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- +if __name__ == '__main__': + main() + +#### +#### +#### diff --git a/dut_lib_template/schemacheck.py b/dut_lib_template/schemacheck.py new file mode 100644 index 000000000..62307392a --- /dev/null +++ b/dut_lib_template/schemacheck.py @@ -0,0 +1,603 @@ +""" +This class consists of functions which checks the schema of the configuration for lab +Whether the schema contains all the necessary key-value pairs or not +If not it will inform the required key-value pair +End of the program called every function that checks all the schema data +""" +import logging +import re + + +class SchemaCheck: + """ + Global variables are declared for easy modification and checks of some data + """ + global target_var, dut_keys, tg_keys, testbed_name + target_var = "dut_lib_template" + testbed_name = 'basic' + + def __init__(self, configuration=None): + logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) + self.configuration = configuration + self.testbed_list = None + self.len_testbed_list = None + self.key_check_arr = ['target', 'controller', 'device_under_tests', 'traffic_generator'] + + def set_data(self): + """ + This Function sets the value of how many testbeds are there in the schema input file and stores the number of it + """ + testbed_list = [] + for key in self.configuration: + print(key) + testbed_list.append(key) + print(testbed_list) + self.testbed_list = testbed_list + self.len_testbed_list = len(testbed_list) + + def key_check(self): + """ + This fun checks the keys of the testbeds present in schema such as target, controller, DUT, traffic generator + """ + arr = [] + for a in range(self.len_testbed_list): + for key in self.configuration[self.testbed_list[a]]: + print(key) + arr.append(key) + print(arr) + if arr == self.key_check_arr: + arr.clear() + print("All keys are present in the schema for Testbed") + logging.info("All keys are present in the schema for Testbed") + else: + arr.clear() + logging.error("Not all the keys required present in schema for Testbed") + + def target_check(self): + """ + This function checks the global target we have declared is matching in the schema or not + """ + global target_var + for a in range(self.len_testbed_list): + if self.configuration[self.testbed_list[a]]['target'] == target_var: + logging.info("Target is matching") + else: + logging.error("Target variable is not matching") + + def controller_check(self): + """ + This func checks the keys of Controller such as Url, Username and password + """ + arr = ['url', 'username', 'password'] + lis = [] + for a in range(self.len_testbed_list): + for key in self.configuration[self.testbed_list[a]]['controller']: + lis.append(key) + print(self.testbed_list[a], "->", lis) + if lis == arr: + lis.clear() + print("All keys are present in the Controller data of schema") + logging.info("All keys are present in the Controller data of schema") + else: + lis.clear() + logging.error("Not all the Controller keys required present in schema") + + def dut_keys_check(self): + """ + This func checks DUT keys if every key is present in the schema or not + """ + global dut_keys + arr, arr2, arr3 = [], [], [] + dut_keys = ['model', 'supported_bands', 'supported_modes', 'wan_port', 'lan_port', 'ssid', 'mode', 'identifier', + 'method', 'host_ip', 'host_username', 'host_password', 'host_ssh_port', 'serial_tty', + 'firmware_version'] + for a in range(self.len_testbed_list): + for b in range(len(self.configuration[self.testbed_list[a]]['device_under_tests'])): + for key in self.configuration[self.testbed_list[a]]['device_under_tests'][b]: + arr.append(key) + arr2 = list(set(dut_keys) - set(arr)) + arr3.append(arr2) + # print(arr3) + arr.clear() + for a in range(len(arr3)): + if len(arr3[a]) == 0: + logging.info("All keys of DUT are present") + self.dut_values_check() + elif len(arr3[a]) == 1: + if arr3[a][0] == 'ssid': + logging.warning("Ssid key is not present" + str(self.testbed_list[a]) + "->" + str(arr3[a])) + self.dut_values_check() + else: + logging.error( + "Required keys of DUT are not present, Please include those" + str(self.testbed_list[a]) + + "->" + str(arr3[a])) + else: + logging.error( + "Not all Keys of DUT required are present" + str(self.testbed_list[a]) + "->" + str(arr3[a])) + + def dut_values_check(self): + """ + This func checks whether all the values of DUT Keys are valid or not. Use it after dut_keys_check() + """ + global dut_keys + print("DUT Key->value Check") + for a in range(self.len_testbed_list): + for b in range(len(self.configuration[self.testbed_list[a]]['device_under_tests'])): + for key, value in self.configuration[self.testbed_list[a]]['device_under_tests'][b].items(): + # print(key, value) + # print(type(value)) + if key == 'model': + if type(value) == str: + logging.info("Model key->values are present and eligible") + else: + logging.error( + "Model key->values which are present are not eligible" + str(key) + "->" + str(value)) + elif key == 'supported_bands': + if type(value) == list: + logging.info("Supported bands key->values are present and eligible") + else: + logging.error( + "Supported bands key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'supported_modes': + if type(value) == list: + logging.info("Supported modes key->values are present and eligible") + else: + logging.error( + "Supported modes key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'wan_port': + if type(value) == str: + x = re.search("\d.\d.", value) + if x is not None: + logging.info("Wan port key->values are present and eligible") + else: + logging.error( + "Wan port key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'lan_port': + if value is None or type(value) == str: + if type(value) == str: + x = re.search("\d.\d.", value) + if x is not None: + logging.info("Lan port key->values are present and eligible") + else: + logging.error("Lan port key->values are present and not eligible") + else: + logging.info("Lan port is null or None") + else: + logging.error("Lan port key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'ssid': + if type(value) == dict: + self.ssid_data_check() + logging.info("Ssid key->values are present and eligible") + else: + logging.error("Ssid key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'mode': + if type(value) == str: + logging.info("Mode key->values are present and eligible") + else: + logging.error("Mode key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'identifier': + if type(value) == str and type(value) is not None: + logging.info("Identifier key->values are present and eligible") + else: + logging.error( + "Identifier key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'method': + if type(value) == str and (value == 'serial' or value == 'ssh' or value == 'telnet'): + logging.info("Method key->values are present and eligible") + else: + logging.error("Method key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'host_ip': + if type(value) == str: + logging.info("Host IP key->values are present and eligible") + else: + logging.error("Host IP key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'host_username': + if type(value) == str: + logging.info("Host Username key->values are present and eligible") + else: + logging.error( + "Host Username key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'host_password': + if type(value) == str: + logging.info("Host Password key->values are present and eligible") + else: + logging.error( + "Host Password key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'host_ssh_port': + if type(value) == int: + logging.info("Host ssh Port key->values are present and eligible") + else: + logging.error( + "Host ssh Port key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'serial_tty': + if type(value) == str: + logging.info("Serial tty key->values are present and eligible") + else: + logging.error( + "Serial tty key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'firmware_version': + if type(value) == str: + logging.info("Firmware version key->values are present and eligible") + else: + logging.error( + "Firmware version key->values which are present are not eligible" + str(key) + "->" + + str(value)) + + def traffic_generator_keys_check(self): + """ + THis Func checks the Traffic generator keys are present in the schema or not. It should be called after + dut_values_check() + """ + global tg_keys + tg_keys = ['name', 'testbed', 'scenario', 'details'] + lis = [] + for count in range(self.len_testbed_list): + for key in self.configuration[self.testbed_list[count]]['traffic_generator']: + lis.append(key) + print(self.testbed_list[count], "->", lis) + if lis == tg_keys: + lis.clear() + print("All keys are present in the Traffic generator data of schema") + logging.info("All keys are present in the Traffic generator data of schema") + self.traffic_generator_values_check(count) + else: + lis.clear() + logging.error("Not all the Traffic generator keys required are present in schema") + + def ssid_data_check(self): + """ + This func has to check the Ssid data check in DUT values if SSid key is present in it + """ + pass + + def traffic_generator_values_check(self, count): + """ + This func validates the traffic generator values and is called from traffic_generator_keys_check() after + keys are checked + """ + global testbed_name + logging.info("Traffic generator Key->value check") + for key, value in self.configuration[self.testbed_list[count]]['traffic_generator'].items(): + if key == 'name': + if type(value) == str: + logging.info("Name key->value are present and Eligible") + else: + logging.error("Name key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'testbed': + if type(value) == str and value == testbed_name: + logging.info("Testbed key->value are present and Eligible") + else: + logging.error("Testbed key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'scenario': + if type(value) == str and (value == 'dhcp-bridge' or value == 'dhcp-external'): + logging.info("Scenario key->value are present and Eligible") + else: + logging.error("Scenario key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'details': + if type(value) == dict: + self.tg_details_data_keys_check(count) + logging.info("Details key->value are present and Eligible") + else: + logging.error("Details key->values which are present are not eligible" + str(key) + "->" + + str(value)) + + def tg_details_data_keys_check(self, count): + """ + This Func checks the Details data keys of Traffic generator and is called in traffic_generator_values_check() + after details key is validated there for further validation of details dict + """ + global tg_details_keys + tg_details_keys = ['manager_ip', 'http_port', 'ssh_port', 'setup', 'wan_ports', 'lan_ports', 'uplink_nat_ports'] + lis = [] + for key in self.configuration[self.testbed_list[count]]['traffic_generator']['details']: + lis.append(key) + print(self.testbed_list[count], "->", lis) + if lis == tg_details_keys: + lis.clear() + print("All keys are present in the Traffic generator Details data of schema") + logging.info("All keys are present in the Traffic generator Details data of schema") + self.tg_details_values_check(count) + else: + lis.clear() + logging.error("Not all the Traffic generator Details keys required are present in schema") + + def tg_details_values_check(self, count): + """ + This Func validates the Details data Values of Traffic generator and is called in tg_details_data_keys_check() + after details keys are validated + """ + logging.info("Traffic generator Key->value check") + for key, value in self.configuration[self.testbed_list[count]]['traffic_generator']['details'].items(): + if key == 'manager_ip': + if type(value) == str: + logging.info("Manager ip key->value are present and Eligible") + else: + logging.error("Manager ip key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'http_port': + if type(value) == int: + logging.info("Http port key->value are present and Eligible") + else: + logging.error("Http port key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'ssh_port': + if type(value) == int: + logging.info("Ssh port key->value are present and Eligible") + else: + logging.error("Ssh port key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'setup': + if type(value) == dict: + key2 = self.configuration[self.testbed_list[count]]['traffic_generator']['details']['setup'] + value2 = self.configuration[self.testbed_list[count]]['traffic_generator']['details']['setup'][ + 'method'] + if key2 == 'method': + if type(value2) == str: + if value2 == 'build': + logging.info("Method - Build key->value are present and Eligible") + elif value2 == 'load': + if key2['DB'] == str: + logging.info("Method - Load key->value are present and Eligible") + else: + logging.error("Method key->values which are present are not eligible" + str(key) + + "->" + + str(value)) + logging.info("Setup key->value are present and Eligible") + else: + logging.error("Setup key->values which are present are not eligible" + str(key) + "->" + + str(value)) + elif key == 'wan_ports': + if type(value) == dict: + self.tg_ports_data_keys_check(key, count) + else: + logging.error("Wan Ports data is not eligible") + elif key == 'lan_ports': + if type(value) == dict: + # self.tg_ports_data_keys_check(key, count) + pass + else: + logging.error("Lan Ports data is not eligible") + elif key == 'uplink_nat_ports': + if type(value) == dict: + self.tg_ports_data_keys_check(key, count) + else: + logging.error("Uplink nat Ports data is not eligible") + + def tg_ports_data_keys_check(self, key, count): + """ + This Func validates the Ports data Values of Traffic generator and is called in tg_details_values_check() + after details values are validated. It will check for patterns like 1.1.eth2 + """ + ports = self.configuration[self.testbed_list[count]]['traffic_generator']['details'][key] + print("Data of ---------------", key) + print(ports) + for key1, value1 in ports.items(): + if type(key1) == str and type(value1) == dict: + x = re.search("\d.\d.", key1) + if x is not None: + logging.info("Key of" + str(key) + "->" + str(key1) + "is eligible") + self.tg_ports_addressing_check(value1) + else: + logging.error("Key of" + str(key) + "->" + str(key1) + "is not eligible") + else: + logging.error("Key of" + str(key) + "->" + str(key1) + "is not eligible and is not a string") + + @staticmethod + def tg_ports_addressing_check(value): + """ + This function checks the addressing data if values present has ip address pattern or not. It is called in + tg_ports_data_keys_check() + """ + print("Value--------------") + print(value) + if value['addressing'] == 'static': + for key, value2 in value.items(): + if key == 'ip': + value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2) + if value2 is not None: + logging.info("Ip is present and eligible in ports") + elif key == 'gateway_ip': + value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d*$", value2) + if value2 is not None: + logging.info("Gateway Ip is present and eligible in ports") + elif key == 'ip_mask': + value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2) + if value2 is not None: + logging.info("Ip Mask is present and eligible in ports") + elif key == 'dns_servers' and type(value2) == str: + logging.info("DNS server is present and eligible in ports") + elif key == 'addressing': + logging.info("Skipping Addressing ,As it is already verified") + else: + logging.error("Please look into the Ports data") + elif value['addressing'] == 'dynamic': + pass + elif value['addressing'] == 'dhcp-server': + for key, value2 in value.items(): + if key == 'ip': + value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2) + if value2 is not None: + logging.info("Ip is present and eligible in ports") + elif key == 'gateway_ip': + value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d*$", value2) + if value2 is not None: + logging.info("Gateway Ip is present and eligible in ports") + elif key == 'ip_mask': + value2 = re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value2) + if value2 is not None: + logging.info("Ip Mask is present and eligible in ports") + elif key == 'dns_servers' and type(value2) == str: + logging.info("DNS server is present and eligible in ports") + elif key == 'addressing': + logging.info("Skipping Addressing ,As it is already verified") + else: + logging.error("Please look into the Ports data") + + +if __name__ == '__main__': + var = { + "CONFIGURATION": { + 'basic-06': { + 'target': 'dut_lib_template', + 'controller': { + 'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001', + 'username': 'tip@ucentral.com', + 'password': 'OpenWifi%123' + }, + 'device_under_tests': [{ + 'model': 'edgecore_eap102', # Will be string + 'supported_bands': ['2G', '5G'], # ['2G', '5G', '6G'] + 'supported_modes': ['BRIDGE', 'NAT', 'VLAN'], # Will remain same + 'wan_port': '1.1.eth2', # Has to be + 'lan_port': '1.1.eth2', # Has to be null or none + 'ssid': { # Has to be seperate func + '2g-ssid': 'OpenWifi', + '5g-ssid': 'OpenWifi', + '6g-ssid': 'OpenWifi', + '2g-password': 'OpenWifi', + '5g-password': 'OpenWifi', + '6g-password': 'OpenWifi', + '2g-encryption': 'WPA2', + '5g-encryption': 'WPA2', + '6g-encryption': 'WPA3', + '2g-bssid': '68:7d:b4:5f:5c:31', + '5g-bssid': '68:7d:b4:5f:5c:3c', + '6g-bssid': '68:7d:b4:5f:5c:38' + }, + 'mode': 'wifi6', # ['wifi5', 'wifi6', 'wifi6e'] + 'identifier': '903cb39d6918', # Has to be not Null + 'method': 'serial', # Has to be serial, ssh, telnet + 'host_ip': 'localhost', # Ip or localhost + 'host_username': 'lanforge', # Str + 'host_password': 'pumpkin77', # Str + 'host_ssh_port': 8852, # Int + 'serial_tty': '/dev/ttyAP2', # Str + 'firmware_version': 'next-latest' # Str + }], + 'traffic_generator': { + 'name': 'lanforge', # STR + 'testbed': 'basic', # [basic, ] + 'scenario': 'dhcp-bridge', # dhcp-bridge, dhcp-external + 'details': { + 'manager_ip': 'localhost', # Str or ip + 'http_port': 8850, # int + 'ssh_port': 8851, # int + 'setup': {'method': 'build', 'DB': 'Test_Scenario_Automation'}, + # Method-> build/load, if load-> DB + 'wan_ports': { # addressing(dhcp-server, static, dynamic) Subnet-> ip/ cannot be eth2(1.1.eth2) + '1.1.eth2': {'addressing': 'dhcp-server', 'subnet': '172.16.0.1/16', 'dhcp': { # DICT + 'lease-first': 10, # int + 'lease-count': 10000, # int + 'lease-time': '6h' # str + } + } + }, + 'lan_ports': { + + }, + 'uplink_nat_ports': { + '1.1.eth3': { + 'addressing': 'static', # If static -> need ip, g_ip, ip_mask, dns + 'ip': '10.28.2.17', + 'gateway_ip': '10.28.2.1/24', + 'ip_mask': '255.255.255.0', + 'dns_servers': 'BLANK' + } + } + } + } + }, + "advance-03": { + "target": "dut_lib_template", + "controller": { + "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", + "username": "tip@ucentral.com", + "password": "OpenWifi%123" + }, + "device_under_tests": [{ + "model": "cig_wf196", + "supported_bands": ["2G", "5G", "6G"], + "supported_modes": ["BRIDGE", "NAT", "VLAN"], + "wan_port": "1.3.eth2", + "lan_port": None, + "ssid": { + "2g-ssid": "OpenWifi", + "5g-ssid": "OpenWifi", + "6g-ssid": "OpenWifi", + "2g-password": "OpenWifi", + "5g-password": "OpenWifi", + "6g-password": "OpenWifi", + "2g-encryption": "WPA2", + "5g-encryption": "WPA2", + "6g-encryption": "WPA3", + "2g-bssid": "68:7d:b4:5f:5c:31", + "5g-bssid": "68:7d:b4:5f:5c:3c", + "6g-bssid": "68:7d:b4:5f:5c:38" + }, + "mode": "wifi6e", + "identifier": "824f816011e4", + "method": "serial", + "host_ip": "localhost", + "host_username": "lanforge", + "host_password": "pumpkin77", + "host_ssh_port": 8902, + "serial_tty": "/dev/ttyAP0", + "firmware_version": "next-latest" + }], + "traffic_generator": { + "name": "lanforge", + "testbed": "basic", + "scenario": "dhcp-bridge", + "details": { + "manager_ip": "10.28.3.117", + "http_port": 8900, + "ssh_port": 8901, + "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, + "wan_ports": { + "1.3.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "lease-first": 10, + "lease-count": 10000, + "lease-time": "6h" + } + } + }, + "lan_ports": { + + }, + "uplink_nat_ports": { + "1.3.eth3": { + "addressing": "static", + "ip": "10.28.2.39", + "gateway_ip": "10.28.2.1/24", + "ip_mask": "255.255.255.0", + "dns_servers": "BLANK" + } + } + } + } + } + } + } + obj = SchemaCheck(var["CONFIGURATION"]) + obj.set_data() + obj.key_check() + obj.target_check() + obj.controller_check() + obj.dut_values_check() + obj.traffic_generator_keys_check() diff --git a/dut_lib_template/setup.py b/dut_lib_template/setup.py new file mode 100644 index 000000000..a121d9d23 --- /dev/null +++ b/dut_lib_template/setup.py @@ -0,0 +1,21 @@ +import setuptools + +with open("README.md", "r") as fh: + long_description = fh.read() + +setuptools.setup( + name='dut_lib_template', + version='0.1', + scripts=['dut_lib_template.py', 'controller.py', 'ap_lib.py', 'SetupLibrary.py', 'openwrt_ctl.py'], + author="Shivam Thakur", + author_email="shivam.thakur@candelatech.com", + description="TIP OpenWIFI 2.X Library", + long_description=long_description, + long_description_content_type="text/markdown", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + ], +) diff --git a/libs/tip_2x/ap_lib.py b/libs/tip_2x/ap_lib.py index 2c6f53a36..40774cebf 100644 --- a/libs/tip_2x/ap_lib.py +++ b/libs/tip_2x/ap_lib.py @@ -430,7 +430,7 @@ class APLIBS: if __name__ == '__main__': basic_05 = { - "target": "tip_2x", + "target": "dut_lib_template", "controller": { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", diff --git a/libs/tip_2x/schemacheck.py b/libs/tip_2x/schemacheck.py index e4d537500..62307392a 100644 --- a/libs/tip_2x/schemacheck.py +++ b/libs/tip_2x/schemacheck.py @@ -13,7 +13,7 @@ class SchemaCheck: Global variables are declared for easy modification and checks of some data """ global target_var, dut_keys, tg_keys, testbed_name - target_var = "tip_2x" + target_var = "dut_lib_template" testbed_name = 'basic' def __init__(self, configuration=None): @@ -454,7 +454,7 @@ if __name__ == '__main__': var = { "CONFIGURATION": { 'basic-06': { - 'target': 'tip_2x', + 'target': 'dut_lib_template', 'controller': { 'url': 'https://sec-qa01.cicd.lab.wlan.tip.build:16001', 'username': 'tip@ucentral.com', @@ -524,7 +524,7 @@ if __name__ == '__main__': } }, "advance-03": { - "target": "tip_2x", + "target": "dut_lib_template", "controller": { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", diff --git a/libs/tip_2x/setup.py b/libs/tip_2x/setup.py index b308b7711..a121d9d23 100644 --- a/libs/tip_2x/setup.py +++ b/libs/tip_2x/setup.py @@ -4,9 +4,9 @@ with open("README.md", "r") as fh: long_description = fh.read() setuptools.setup( - name='tip_2x', + name='dut_lib_template', version='0.1', - scripts=['tip_2x.py', 'controller.py', 'ap_lib.py', 'SetupLibrary.py', 'openwrt_ctl.py'], + scripts=['dut_lib_template.py', 'controller.py', 'ap_lib.py', 'SetupLibrary.py', 'openwrt_ctl.py'], author="Shivam Thakur", author_email="shivam.thakur@candelatech.com", description="TIP OpenWIFI 2.X Library", diff --git a/libs/tip_2x/tip_2x.py b/libs/tip_2x/tip_2x.py index ee2b55cad..c1ba3ad6d 100644 --- a/libs/tip_2x/tip_2x.py +++ b/libs/tip_2x/tip_2x.py @@ -38,13 +38,13 @@ class tip_2x: controller_data = {} device_under_tests_info = [] """ - OpenWifi 2.x Specific Variables that will be only scoped in tip_2x Library + OpenWifi 2.x Specific Variables that will be only scoped in dut_lib_template Library """ ow_sec_url = "" ow_sec_login_username = "" ow_sec_login_password = "" - target = "tip_2x" + target = "dut_lib_template" controller_library_object = object() prov_library_object = object() firmware_library_object = object() @@ -82,8 +82,8 @@ class tip_2x: device_under_tests_info=[], logging_level=logging.INFO): logging.basicConfig(format='%(asctime)s - %(message)s', level=logging_level) if target != self.target: - logging.error("Target version is : " + target + " Expected target is tip_2x") - pytest.exit("Target should be 'tip_2x', Current Target is : " + target) + logging.error("Target version is : " + target + " Expected target is dut_lib_template") + pytest.exit("Target should be 'dut_lib_template', Current Target is : " + target) if controller_data is None: controller_data = {} self.controller_data = controller_data @@ -98,8 +98,8 @@ class tip_2x: def setup_metadata(self): logging.info( - "Setting up the Controller metadata for tip_2x Library: " + str(json.dumps(self.controller_data, indent=2))) - logging.info("Setting up the DUT metadata for tip_2x Library: " + str( + "Setting up the Controller metadata for dut_lib_template Library: " + str(json.dumps(self.controller_data, indent=2))) + logging.info("Setting up the DUT metadata for dut_lib_template Library: " + str( json.dumps(self.device_under_tests_info, indent=2))) logging.info("Number of DUT's in lab_info.json: " + str(len(self.device_under_tests_info))) self.ow_sec_url = self.controller_data["url"] @@ -900,7 +900,7 @@ class tip_2x: if __name__ == '__main__': basic_05 = { - "target": "tip_2x", + "target": "dut_lib_template", "controller": { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", diff --git a/setup_env.bash b/setup_env.bash index 78bfa7d1a..99396b6a3 100755 --- a/setup_env.bash +++ b/setup_env.bash @@ -2,7 +2,7 @@ # Setup python environment variable and pip environment variable like # export PYTHON=/usr/bin/python3 # export PIP=/usr/bin/pip3 -#sh setup_env.bash -t tip_2x -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library" +#sh setup_env.bash -t dut_lib_template -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library" set -e helpFunction() @@ -85,7 +85,7 @@ then touch tests/imports.py if [ $target == "tip_2x" ] then - cd libs/tip_2x + cd libs/dut_lib_template $PYTHON setup.py bdist_wheel $PIP install dist/*.whl --force-reinstall cd ../../ diff --git a/setup_tunnel.bash b/setup_tunnel.bash index 9dca51aa9..1d686700b 100644 --- a/setup_tunnel.bash +++ b/setup_tunnel.bash @@ -2,7 +2,7 @@ # Setup python environment variable and pip environment variable like # export PYTHON=/usr/bin/python3 # export PIP=/usr/bin/pip3 -#sh setup_env.bash -t tip_2x -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library" +#sh setup_env.bash -t dut_lib_template -d all -n "Shivam Thakur" -o TIP -e shivam.thakur@candelatech.com -i "TIP OpenWIFI 2.X Library" helpFunction() { echo "Setup SSH Tunnel for TIP Labs" @@ -76,7 +76,7 @@ Create_lab_info_json() "CONFIGURATION" : { "basic-01" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -145,7 +145,7 @@ Create_lab_info_json() } }, "basic-02" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -214,7 +214,7 @@ Create_lab_info_json() } }, "basic-03" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -285,7 +285,7 @@ Create_lab_info_json() } }, "basic-03a" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -356,7 +356,7 @@ Create_lab_info_json() } }, "basic-04" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -427,7 +427,7 @@ Create_lab_info_json() } }, "basic-04a" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -498,7 +498,7 @@ Create_lab_info_json() } }, "basic-05" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -567,7 +567,7 @@ Create_lab_info_json() } }, "basic-06" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", @@ -636,7 +636,7 @@ Create_lab_info_json() } }, "advance-03" : { - "target": "tip_2x", + "target": "dut_lib_template", "controller" : { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "username": "tip@ucentral.com", diff --git a/tests/conftest.py b/tests/conftest.py index 1eaabd803..dd67fb554 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -168,7 +168,7 @@ def get_target_object(request, get_testbed_details, add_allure_environment_prope t_object = target(controller_data=get_testbed_details["controller"], target=get_testbed_details["target"], device_under_tests_info=get_testbed_details["device_under_tests"]) if not request.config.getoption("--skip-env"): - if get_testbed_details["target"] == "tip_2x": + if get_testbed_details["target"] == "dut_lib_template": t_object.setup_environment_properties(add_allure_environment_property= add_allure_environment_property)