Files
wlan-testing/libs/tip_2x/ap_lib.py
bhargavi-ct 5f4652d76b Wifi 15117 (#1072)
* Modify ASB Test Cases for Cybertan AP Models

Signed-off-by: bhargavi-ct <bhargavimamidipaka@candelatech.com>

* Update script to handle output after remove restrictions

Signed-off-by: bhargavi-ct <bhargavimamidipaka@candelatech.com>

---------

Signed-off-by: bhargavi-ct <bhargavimamidipaka@candelatech.com>
Co-authored-by: bhargavi-ct <bhargavimamidipaka@candelatech.com>
2025-10-01 13:15:34 +05:30

881 lines
48 KiB
Python

import importlib
import json
import logging
import random
import string
import time
import datetime
import allure
import paramiko
import pytest
setup_lib = importlib.import_module("SetupLibrary")
SetupLibrary = setup_lib.SetupLibrary
class APLIBS:
setup_library_objects = list()
device_under_tests_data = None
def __init__(self, dut_data=None):
if dut_data is None:
logging.error("Device Under Test Data is Not Specified, Please provide valid DUT Data")
pytest.exit("Device Under Test Data is Not Specified, Please provide valid DUT Data")
if type(dut_data) is not list:
logging.error("Device Under Test Data is Not in list format, Please provide valid DUT Data in list format")
pytest.exit("Device Under Test Data is Not in list format, Please provide valid DUT Data in list format")
self.device_under_tests_data = dut_data
for dut in self.device_under_tests_data:
self.setup_library_objects.append(SetupLibrary(remote_ip=dut["host_ip"],
remote_ssh_port=dut["host_ssh_port"],
remote_ssh_username=dut["host_username"],
remote_ssh_password=dut["host_password"],
pwd=""))
def ssh_cli_connect(self, ip="",
port=22,
username="",
password="",
timeout=10,
allow_agent=False,
banner_timeout=200):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=username, password=password,
port=port, timeout=timeout, allow_agent=allow_agent, banner_timeout=banner_timeout)
return client
def check_serial_connection(self, idx=0, print_log=True, attach_allure=True):
status = False
if self.device_under_tests_data[idx]["method"] == "serial":
status = self.setup_library_objects[idx].check_serial_connection(
tty=self.device_under_tests_data[idx]["serial_tty"])
if not status:
logging.error("Serial port not available. Exiting the Test")
pytest.exit("Serial port not available. Please check your serial port connection")
def setup_serial_environment(self, idx=0):
status = []
for dut in self.device_under_tests_data:
if dut["method"] == "serial":
status.append(self.setup_library_objects[idx].setup_serial_environment())
if False in status:
logging.error("Serial port not available. Exiting the Test")
pytest.exit("Serial port not available. Please check your serial port connection")
self.exit_from_uboot(idx=idx)
self.reset_to_default(idx=idx)
def get_dut_logs(self, idx=0, print_log=True, attach_allure=True):
output = self.run_generic_command(cmd="logread", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
return output
def check_connectivity(self, idx=0, print_log=True, attach_allure=True):
maverick_status = self.run_generic_command(cmd="/etc/init.d/maverick status", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
if maverick_status.__contains__("running"):
logging.error("Maverick is running!!!")
# Maverick check is happening
# TODO: add the steps to diagnose the maverick reason, check certificates, wan ip address,
# check the partition for certificates and check the /certificates dir. Also check if any of the cert is
# missing, if anything is missing, then report that. if everything looks good, please check md5sum of certs
# also, try to do a reboot in case everything looks good and post reboot, check the maverick status.
# Jitendra
# pytest.exit("Maverick is running, Please check the wan connection and certificates")
check_iface = self.run_generic_command(cmd="ifconfig up0v0", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
if check_iface.__contains__("error fetching interface information: Device not found"):
logging.error(check_iface)
pytest.exit("up0v0 interface is not available!!!")
output = self.run_generic_command(cmd="ip neigh show dev up0v0 REACHABLE", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
if output.__contains__("INCOMPLETE") or output.__contains__("FAILED"):
logging.error(output)
ap_logs = self.get_dut_logs(idx=idx, print_log=False, attach_allure=False)
allure.attach(body=ap_logs, name="AP logs")
pytest.fail("up0v0 interface is failed to have connectivity!!!")
def get_uci_show(self, param="ucentral", idx=0, print_log=True, attach_allure=True):
command = "uci show " + param
if "server" in param:
output = self.run_generic_command(cmd=command, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
ret_val = str(output).split("=")[1]
if "cicd.lab.wlan.tip.build" in ret_val:
logging.info(f"we are fetching server url with the command: {command}")
else:
command = "cat /etc/ucentral/gateway.json"
logging.info(f"we are fetching server url with the command: {command}")
output = self.run_generic_command(cmd=command, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
else:
logging.info(f"command:{command}")
output = self.run_generic_command(cmd=command, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
return output
def restart_ucentral_service(self, idx=0, print_log=True, attach_allure=True):
output = self.run_generic_command(cmd="/etc/init.d/ucentral restart", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
return output
def ubus_call_ucentral_status(self, idx=0, print_log=True, attach_allure=True, retry=5):
ret_val = dict.fromkeys(["connected", "latest", "active"])
for i in range(0, retry):
ret_val = dict.fromkeys(["connected", "latest", "active"])
output = self.run_generic_command(cmd="ubus call ucentral status", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
try:
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
except Exception as e:
logging.error("error in converting the ubus call ucentral status output to json" + output)
data = {}
if (data.keys().__contains__("connected") or data.keys().__contains__("disconnected")) and \
data.keys().__contains__("latest") and \
data.keys().__contains__("active"):
break
else:
logging.error("Error in ubus call ucentral status: " + str(output))
if data.keys().__contains__("connected"):
ret_val["connected"] = True
if data.keys().__contains__("disconnected"):
ret_val["connected"] = False
if data.keys().__contains__("latest"):
ret_val["latest"] = data.get("latest")
if data.keys().__contains__("active"):
ret_val["active"] = data.get("active")
return ret_val
def get_latest_config_recieved(self, idx=0, print_log=True, attach_allure=True):
r_val = self.ubus_call_ucentral_status(idx=idx)
latest_json = {}
if r_val["latest"] is None:
r_val = self.ubus_call_ucentral_status(idx=idx)
if r_val["latest"] is None:
logging.error("ubus call ucentral status has unexpected data")
return False
latest_uuid = r_val["latest"]
output = self.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest_uuid), idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
try:
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
logging.info("Latest config is : " + str(data))
allure.attach(name="cat /etc/ucentral/ucentral.cfg." + str(latest_uuid),
body=str(json.dumps(data, indent=2)),
attachment_type=allure.attachment_type.JSON)
except Exception as e:
data = output
logging.error("error in converting the output to json" + output)
try_again = True
allure.attach(name="cat /etc/ucentral/ucentral.cfg." + str(latest_uuid),
body=str(data),
attachment_type=allure.attachment_type.JSON)
return latest_json
def get_active_config(self, idx=0, print_log=True, attach_allure=False):
r_val = self.ubus_call_ucentral_status(idx=idx)
active_json = {}
if r_val["active"] is None:
r_val = self.ubus_call_ucentral_status(idx=idx)
if r_val["active"] is None:
logging.error("ubus call ucentral status has unexpected data")
return False
active_uuid = r_val["active"]
timestamp = datetime.datetime.utcnow()
logging.info("cat /etc/ucentral/ucentral.cfg Timestamp: " + str(timestamp))
output = self.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(active_uuid), idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
try:
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
logging.info("Active config is : " + str(data))
allure.attach(name="cat /etc/ucentral/ucentral.cfg." + str(active_uuid),
body="TimeStamp: " + str(timestamp) + "\n" + str(json.dumps(data, indent=2)),
attachment_type=allure.attachment_type.JSON)
except Exception as e:
data = output
logging.error("error in converting the output to json" + output)
try_again = True
allure.attach(name="cat /etc/ucentral/ucentral.cfg." + str(active_uuid),
body=str(data),
attachment_type=allure.attachment_type.JSON)
print(data)
active_json = data
return active_json
def get_iwinfo(self, idx=0, print_log=True, attach_allure=True):
# [['ssid_wpa2_2g', 'wpa', 'something', '2G'], ['ssid_wpa2_2g', 'wpa', 'something', '5G']] {'wlan0': [
# '"ssid_wpa3_p_5g"', '12:34:56:78:90:12', '5G'], 'wlan1': ['"ssid_wpa3_p_2g"','00:03:7F:12:34:56', '5G']}
iwinfo_output = self.run_generic_command(cmd="iwinfo", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.TEXT)
return iwinfo_output
def get_bssid_band_mapping(self, idx=0):
data = self.get_iwinfo(idx=idx)
data = str(data).replace(" ", "").split("\n")
band_info = []
for i in data:
tmp = []
if i.__contains__("AccessPoint"):
bssid = i.replace("AccessPoint:", "")
tmp.append(bssid.casefold())
elif i.__contains__("MasterChannel"):
if i.split(":")[2].__contains__("2.4"):
tmp.append("2G")
else:
tmp.append("5G")
else:
tmp = []
if tmp != []:
band_info.append(tmp)
bssi_band_mapping = {}
for i in range(len(band_info)):
if (i % 2) == 0:
bssi_band_mapping[band_info[i][0]] = band_info[i + 1][0]
return bssi_band_mapping
def get_ifconfig(self, idx=0):
pass
def verify_certificates(self, idx=0, print_log=False, attach_allure=False):
cert_files_name = ["cas.pem", "dev-id", "key.pem", "cert.pem"]
for cert in cert_files_name:
output = self.run_generic_command(cmd='[ -f /etc/ucentral/' + cert + ' ] && echo "True" || echo "False"',
idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
if output == "False":
logging.error("Certificate " + cert + "is missing from /etc/ucentral/ directory. "
"Please add valid certificates on AP")
pytest.exit("Certificate " + cert + "is missing from /etc/ucentral/ directory. "
"Please add valid certificates on AP")
def run_generic_command(self, cmd="", idx=0, print_log=True, attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT, restrictions=False, attach_name=None):
input_command = cmd
logging.info("Executing Command on AP: " + cmd)
try:
self.setup_library_objects[idx].kill_all_minicom_process(
tty=self.device_under_tests_data[idx]["serial_tty"])
client = self.ssh_cli_connect(ip=self.device_under_tests_data[idx]["host_ip"],
port=self.device_under_tests_data[idx]["host_ssh_port"],
username=self.device_under_tests_data[idx]["host_username"],
password=self.device_under_tests_data[idx]["host_password"],
timeout=10,
allow_agent=False,
banner_timeout=200)
if self.device_under_tests_data[idx]["method"] == "serial":
owrt_args = "--prompt root@" + self.device_under_tests_data[idx][
"identifier"] + " -s serial --log stdout --user root --passwd openwifi"
if not restrictions:
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {owrt_args} -t {self.device_under_tests_data[idx]['serial_tty']} --action " \
f"cmd --value \"{cmd}\" "
if restrictions:
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {owrt_args} -t {self.device_under_tests_data[idx]['serial_tty']} --action " \
f"cmd --value \'{cmd}\' "
if print_log:
logging.info(cmd)
timestamp = datetime.datetime.utcnow()
logging.info(cmd + " Timestamp: " + str(timestamp))
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read()
# data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
final_output = str(output)
if not output.__contains__(b"BOOTLOADER-CONSOLE-IPQ6018#"):
status = output.decode('utf-8').splitlines()
status.pop(0)
final_output = '\n'.join(status)
if print_log:
logging.info(cmd)
logging.info("Output for command: " + input_command + "\n" + final_output)
if attach_allure:
if attach_name is None:
allure.attach(name=input_command, body=str("TimeStamp: " + str(timestamp) + "\n").encode() +
output, attachment_type=expected_attachment_type)
else:
allure.attach(name=attach_name,
body=str("TimeStamp: " + str(timestamp) + "\n").encode() + output,
attachment_type=expected_attachment_type)
client.close()
except Exception as e:
logging.error(e)
final_output = "Error: " + str(e)
return final_output
def get_status(self, idx=0):
pass
def exit_from_uboot(self, idx=0):
if self.is_console_uboot():
self.run_generic_command(cmd="reset-from-console", idx=idx,
print_log=True,
attach_allure=True,
expected_attachment_type=allure.attachment_type.JSON)
def is_console_uboot(self, idx=0):
output = self.run_generic_command(cmd="ubus call ucentral status", idx=idx,
print_log=True,
attach_allure=True,
expected_attachment_type=allure.attachment_type.JSON)
if output.__contains__("BOOTLOADER-CONSOLE-IPQ6018#"):
return True
else:
return False
def is_autoreboot_running(self):
# TODO : Jitendra/Shivam
pass
def reboot(self, idx=0):
output = self.run_generic_command(cmd="reboot", idx=idx,
print_log=False,
attach_allure=False,
expected_attachment_type=allure.attachment_type.JSON)
return output
def get_active_firmware(self, idx=0):
pass
def reset_to_default(self, idx=0):
self.run_generic_command(cmd="cd", idx=idx,
print_log=False,
attach_allure=False,
expected_attachment_type=allure.attachment_type.JSON)
def get_wifi_status(self, idx=0, print_log=True, attach_allure=True):
output = self.run_generic_command(cmd="wifi status", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
try_again = False
try:
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
except Exception as e:
data = output
logging.error("error in converting the ubus call ucentral status output to json" + output)
try_again = True
if try_again or len(data.keys()) != 3:
output = self.run_generic_command(cmd="wifi status", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
try:
data = dict(json.loads(output.replace("\n\t", "").replace("\n", "")))
except Exception as e:
data = output
logging.error("error in converting the ubus call ucentral status output to json" + output)
ret_val = data
return ret_val
def get_ap_version(self, idx=0, print_log=True, attach_allure=False):
output = self.run_generic_command(cmd="cat /tmp/ucentral.version", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
return output
def get_logread(self, start_ref="", stop_ref="", idx=0, print_log=False, attach_allure=False):
output = self.run_generic_command(cmd="logread", idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
log_data = []
data = output.split("\n")
flag = 0
for logs in data:
if logs.__contains__(start_ref):
flag = 1
if flag == 1:
log_data.append(logs)
if logs.__contains__(stop_ref):
flag = 0
ap_logs = "\n".join(log_data)
return ap_logs
def dfs(self, idx=0, print_log=True, attach_allure=False):
type_ = self.device_under_tests_data[idx]["mode"]
cmd = None
if type_.lower() == "wifi5":
cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count = int(ret.count("True"))
logging.info("Status count: " + str(status_count))
if status_count == 1:
cmd = "cd && cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && echo 1 > dfs_simulate_radar"
else:
cmd = "cd && cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && echo 1 > dfs_simulate_radar"
elif type_.lower() == "wifi6" or type_.lower() == "wifi6e":
# cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && ls && echo 1 > dfs_simulate_radar'
cmd1 = '[ -f /sys/kernel/debug/ath11k/qcn6122_1/mac0/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count = int(ret.count("True"))
logging.info("Status count: " + str(status_count))
if status_count == 1:
cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd qcn6122_1 && cd mac0 && ls && echo 1 > dfs_simulate_radar'
else:
cmd1 = '[ -f /sys/kernel/debug/ath11k/qcn6122_2/mac0/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count_1 = int(ret.count("True"))
logging.info("Status count: " + str(status_count_1))
if status_count_1 == 1:
cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd qcn6122_2 && cd mac0 && ls && echo 1 > dfs_simulate_radar'
else:
#cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && ls && echo 1 > dfs_simulate_radar'
cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/mt76/radar_trigger ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count_1 = int(ret.count("True"))
logging.info("Status count: " + str(status_count_1))
if status_count_1 == 1:
logging.info("In Mediatek Chipsets")
cmd = f'cd && cd //sys/kernel/debug/ieee80211/phy1/mt76/ && ls && echo 1 > radar_trigger'
else:
cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy2/mt76/radar_trigger ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count_1 = int(ret.count("True"))
logging.info("Status count: " + str(status_count_1))
if status_count_1 == 1:
logging.info("In Mediatek EAP112 Chipset (phy2)")
cmd = f'cd && cd /sys/kernel/debug/ieee80211/phy2/mt76/ && ls && echo 1 > radar_trigger'
else:
cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && ls && echo 1 > dfs_simulate_radar'
elif type_.lower() == "wifi7":
cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy0/ath12k_hw1/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count = int(ret.count("True"))
logging.info("Status count: " + str(status_count))
if status_count == 1:
cmd = "cd && cd /sys/kernel/debug/ieee80211/phy0/ && cd ath12k_hw1 && echo 1 > dfs_simulate_radar"
else:
cmd = "cd && cd /sys/kernel/debug/ieee80211/phy00/ && cd ath12k_hw1 && echo 1 > dfs_simulate_radar"
output = self.run_generic_command(cmd=cmd, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
return output
def dfs_logread(self, idx=0, print_log=True, attach_allure=False):
"""get simulate radar command logs"""
type_ = self.device_under_tests_data[idx]["mode"]
if type_.lower() == "wifi5":
cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
logging.info("DFS logread output: " + str(output))
if output.__contains__("False"):
cmd = "cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && logread | grep DFS"
else:
cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS"
# cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS"
# print("cmd: ", cmd)
elif type_.lower() == "wifi6" or type_.lower() == "wifi6e":
cmd1 = '[ -f /sys/kernel/debug/ath11k/qcn6122_1/mac0/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count = int(ret.count("True"))
logging.info("Status count: " + str(status_count))
if status_count == 1:
cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd qcn6122_1 && cd mac0 && logread | grep DFS'
else:
cmd1 = '[ -f /sys/kernel/debug/ath11k/qcn6122_2/mac0/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count_1 = int(ret.count("True"))
logging.info("Status count: " + str(status_count_1))
if status_count_1 == 1:
cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd qcn6122_2 && cd mac0 && logread | grep DFS'
else:
#cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && logread | grep DFS'
cmd1 = '[ -f //sys/kernel/debug/ieee80211/phy1/mt76/radar_trigger ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count_1 = int(ret.count("True"))
logging.info("Status count: " + str(status_count_1))
if status_count_1 == 1:
cmd = f'cd && cd //sys/kernel/debug/ieee80211/phy1/mt76/ && logread | grep DFS'
else:
# cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && logread | grep DFS'
cmd1 = '[ -f //sys/kernel/debug/ieee80211/phy2/mt76/radar_trigger ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
status_count_1 = int(ret.count("True"))
logging.info("Status count: " + str(status_count_1))
if status_count_1 == 1:
cmd = f'cd && cd //sys/kernel/debug/ieee80211/phy2/mt76/ && logread | grep DFS'
else:
cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && logread | grep DFS'
elif type_.lower() == "wifi7":
cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy0/ath12k_hw1/dfs_simulate_radar ] && echo "True" || echo "False"'
output = self.run_generic_command(cmd=cmd1, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
logging.info("DFS logread output: " + str(output))
if output.__contains__("False"):
cmd = f'cd && cd /sys/kernel/debug/ieee80211/phy00/ath12k_hw1/ && logread | grep DFS'
else:
cmd = f'cd && cd /sys/kernel/debug/ieee80211/phy0/ath12k_hw1/ && logread | grep DFS'
try:
output = self.run_generic_command(cmd=cmd, idx=idx,
print_log=print_log,
attach_allure=attach_allure,
expected_attachment_type=allure.attachment_type.JSON)
ret = output.split("\n")
logread = ret[-6:]
logs = ""
for i in logread:
logs = logs + i + "\n"
except Exception as e:
print(e)
logs = ""
logging.info("Simulate radar logs: " + str(logs))
return logs
def factory_reset(self, idx=0, print_log=True, attach_allure=False):
logging.info("started factory reset")
output = self.run_generic_command(cmd="jffs2reset -y -r",
idx=idx,
print_log=print_log,
attach_allure=attach_allure)
logging.info("Done Factory reset")
# Please wait 1min after doing factory reset
return output
def remove_restrictions(self, ap_model):
if "sonicfi_rap" in ap_model:
logging.info(f"Yes it is cybertan AP:")
output = self.run_generic_command(cmd="cd /certificates && fw_setenv cert_part 0",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of fw_setenv cert_part 0: {output}")
output = self.run_generic_command(cmd="rm -f /certificates/restrictions.json",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of removing restrictions: {output}")
output = self.run_generic_command(cmd="fw_setenv store_certs_disabled 0",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of fw_setenv store_certs_disabled 0: {output}")
output = self.run_generic_command(cmd="cd /certificates && store_certs",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of store_certs: {output}")
output = self.run_generic_command(cmd="cd /certificates && fw_setenv cert_part 1",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of fw_setenv cert_part 0: {output}")
output = self.run_generic_command(cmd="rm -f /certificates/restrictions.json",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of removing restrictions: {output}")
output = self.run_generic_command(cmd="fw_setenv store_certs_disabled 0",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of fw_setenv store_certs_disabled 0: {output}")
output = self.run_generic_command(cmd="cd /certificates && store_certs",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of store_certs: {output}")
# Verify the restriction file is removed from both partitions
output = self.run_generic_command(cmd="cd /certificates && fw_setenv cert_part 0",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of fw_setenv cert_part 0: {output}")
output = self.run_generic_command(
cmd='[ -f /certificates/restrictions.json ] && echo "True" || echo "False"',
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"Check for restrictions file in partition 0: {output}")
result = output.strip().splitlines()[-1]
if result == "True":
logging.info("Unable to remove restrictions file from partition 0")
#pytest.fail("Unable to remove restrictions file from partition 0")
output = self.run_generic_command(cmd="cd /certificates && fw_setenv cert_part 1",
idx=0, print_log=True,
attach_allure=False)
logging.info(f"output of fw_setenv cert_part 0: {output}")
output = self.run_generic_command(
cmd='[ -f /certificates/restrictions.json ] && echo "True" || echo "False"',
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"Check for restrictions file in partition 1: {output}")
result = output.strip().splitlines()[-1]
if result == "True" :
logging.info("Unable to remove restrictions file from partition 1")
#pytest.fail("Unable to remove restrictions file from partition 1")
else:
self.run_generic_command(cmd="rm /certificates/restrictions.json",
idx=0, print_log=True,
attach_allure=False)
self.run_generic_command(cmd="rm /etc/ucentral/restrictions.json",
idx=0, print_log=True,
attach_allure=False)
self.factory_reset(print_log=False)
time.sleep(120)
output = self.run_generic_command(cmd='[ -f /etc/ucentral/restrictions.json ] && echo "True" || echo "False"',
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
return output
def add_restrictions(self, restrictions_file, developer_mode, ap_model):
self.factory_reset(print_log=False)
time.sleep(200)
if "sonicfi_rap" in ap_model:
logging.info(f"Yes it is cybertan AP:")
restrictions_file = 'echo \"{\\"country\\":[\\"US\\", \\"CA\\"],\\"dfs\\": true,\\"rtty\\": true,\\"tty\\": ' \
'true,\\"developer\\": true,\\"sysupgrade\\": true,\\"commands\\": true,\\"key_info\\": {' \
'\\"vendor\\": \\"dummy\\",\\"algo\\": \\"static\\"}}\" > /certificates/restrictions.json ' \
'&& echo "Partition 0 done"'
restrictions_file1 = 'echo \"{\\"country\\":[\\"US\\", \\"CA\\"],\\"dfs\\": true,\\"rtty\\": true,\\"tty\\": ' \
'true,\\"developer\\": true,\\"sysupgrade\\": true,\\"commands\\": true,\\"key_info\\": {' \
'\\"vendor\\": \\"dummy\\",\\"algo\\": \\"static\\"}}\" > /certificates/restrictions.json ' \
'&& echo "Partition 1 done"'
output = self.run_generic_command(cmd=developer_mode,
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"output of developer_mode::{output}")
output = self.run_generic_command(cmd="fw_setenv cert_part 0",
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"output of fw_setenv cert_part 0::{output}")
output = self.run_generic_command(cmd=restrictions_file,
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT,
restrictions=True)
logging.info(f"output of adding restrictions_file::{output}")
output = self.run_generic_command(cmd="fw_setenv store_certs_disabled 0",
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"output of fw_setenv store_certs_disabled 0::{output}")
cmd_output = self.run_generic_command(cmd="cd /certificates && store_certs",
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT,
restrictions=True)
logging.info(f"output of store_certs::{cmd_output}")
output = self.run_generic_command(cmd="fw_setenv cert_part 1",
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"output of fw_setenv cert_part 1::{output}")
output = self.run_generic_command(cmd=restrictions_file1,
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT,
restrictions=True)
logging.info(f"output of adding restrictions_file1::{output}")
cmd_output = self.run_generic_command(cmd="fw_setenv store_certs_disabled 0",
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
logging.info(f"output of fw_setenv store_certs_disabled 0::{cmd_output}")
cmd_output = self.run_generic_command(cmd="cd /certificates && store_certs",
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT,
restrictions=True)
logging.info(f"output of store_certs::{cmd_output}")
else:
output = self.run_generic_command(cmd=developer_mode,
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT)
output = self.run_generic_command(cmd=restrictions_file,
idx=0,
print_log=True,
attach_allure=False,
expected_attachment_type=allure.attachment_type.TEXT,
restrictions=True)
self.factory_reset(print_log=False)
time.sleep(300)
return output
if __name__ == '__main__':
basic = {
"target": "tip_2x",
"controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
"username": "tip@ucentral.com",
"password": "OpenWifi%123"
},
"device_under_tests": [{
"model": "edgecore_eap101",
"supported_bands": ["2G", "5G"],
"supported_modes": ["BRIDGE", "NAT", "VLAN"],
"wan_port": "1.1.eth3",
"lan_port": None,
"ssid": {
"2g-ssid": "OpenWifi",
"5g-ssid": "OpenWifi",
"6g-ssid": "OpenWifi",
"2g-password": "OpenWifi",
"5g-password": "OpenWifi",
"6g-password": "OpenWifi",
"2g-encryption": "WPA2",
"5g-encryption": "WPA2",
"6g-encryption": "WPA3",
"2g-bssid": "68:7d:b4:5f:5c:31",
"5g-bssid": "68:7d:b4:5f:5c:3c",
"6g-bssid": "68:7d:b4:5f:5c:38"
},
"mode": "wifi6",
"identifier": "903cb36c4301",
"method": "serial",
"host_ip": "192.168.52.89",
"host_username": "lanforge",
"host_password": "lanforge",
"host_ssh_port": 22,
"serial_tty": "/dev/ttyUSB0",
"firmware_version": "next-latest"
}],
"traffic_generator": {
"name": "lanforge",
"testbed": "basic",
"scenario": "dhcp-bridge",
"details": {
"manager_ip": "192.168.52.89",
"http_port": 8080,
"ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"},
"wan_ports": {
"1.1.eth3": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10,
"lease-count": 10000,
"lease-time": "6h"
}
}
},
"lan_ports": {
},
"uplink_nat_ports": {
"1.1.eth2": {
"addressing": "static",
"ip": "192.168.52.150",
"gateway_ip": "192.168.52.1/24",
"ip_mask": "255.255.255.0",
"dns_servers": "BLANK"
}
}
}
}
}
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.NOTSET)
obj = APLIBS(dut_data=basic["device_under_tests"])
obj.ubus_call_ucentral_status()
# obj.exit_from_uboot()
# obj.setup_serial_environment()
# obj.verify_certificates()
# obj.get_dut_logs()
# l = obj.get_latest_config_recieved()
# a = obj.get_active_config()
# if a == l:
# print("a = l")
# print(obj.get_ap_version())