diff --git a/libs/apnos/apnos.py b/libs/apnos/apnos.py index 7f7b9a84f..4794ecc6d 100644 --- a/libs/apnos/apnos.py +++ b/libs/apnos/apnos.py @@ -9,694 +9,694 @@ # 5. Get current Firmware # # """ -# import json -# import os -# -# import paramiko -# import pytest -# from scp import SCPClient -# -# -# class APNOS: -# -# def __init__(self, credentials=None, pwd=os.getcwd(), sdk="2.x"): -# self.serial = credentials['serial'] -# self.owrt_args = "--prompt root@OpenAp -s serial --log stdout --user root --passwd openwifi" -# self.sdk = sdk -# if sdk == "2.x": -# self.owrt_args = "--prompt root@" + self.serial + " -s serial --log stdout --user root --passwd openwifi" -# if credentials is None: -# print("No credentials Given") -# exit() -# self.ip = credentials['ip'] # if mode=1, enter jumphost ip else ap ip address -# self.username = credentials['username'] # if mode=1, enter jumphost username else ap username -# self.password = credentials['password'] # if mode=1, enter jumphost password else ap password -# self.port = credentials['port'] # if mode=1, enter jumphost ssh port else ap ssh port -# self.mode = credentials['jumphost'] # 1 for jumphost, 0 for direct ssh -# -# if 'mode' in credentials: -# self.type = credentials['mode'] -# -# if self.mode: -# self.tty = credentials['jumphost_tty'] # /dev/ttyAP1 -# # kill minicom instance -# client = self.ssh_cli_connect() -# cmd = "pgrep 'minicom' -a" -# stdin, stdout, stderr = client.exec_command(cmd) -# a = str(stdout.read()).split("\\n") -# for i in a: -# if i.__contains__("minicom ap" + self.tty[-1]): -# temp = i.split("minicom") -# a = temp[0].replace(" ", "") -# cmd = "kill " + str(a).replace("b'", "") -# print(cmd) -# stdin, stdout, stderr = client.exec_command(cmd) -# cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"' -# stdin, stdout, stderr = client.exec_command(cmd) -# output = str(stdout.read()) -# -# if output.__contains__("False"): -# cmd = 'mkdir ~/cicd-git/' -# stdin, stdout, stderr = client.exec_command(cmd) -# cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' -# stdin, stdout, stderr = client.exec_command(cmd) -# output = str(stdout.read()) -# if output.__contains__("False"): -# print("Copying openwrt_ctl serial control Script...") -# with SCPClient(client.get_transport()) as scp: -# scp.put(pwd + '/openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server -# cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' -# stdin, stdout, stderr = client.exec_command(cmd) -# var = str(stdout.read()) -# client.close() -# if var.__contains__("True"): -# print("APNOS Serial Setup OK") -# else: -# print("APNOS Serial Setup Fail") -# -# # Method to connect AP-CLI/ JUMPHOST-CLI -# def ssh_cli_connect(self): -# client = paramiko.SSHClient() -# client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) -# print("Connecting to jumphost: %s@%s:%s" % ( -# self.username, self.ip, self.port)) -# client.connect(self.ip, username=self.username, password=self.password, -# port=self.port, timeout=10, allow_agent=False, banner_timeout=200) -# return client -# -# def reboot(self): -# client = self.ssh_cli_connect() -# -# cmd = "reboot" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# client.close() -# return output -# -# # Method to get the iwinfo status of AP using AP-CLI/ JUMPHOST-CLI -# -# def get_bssid_band_mapping(self): -# client = self.ssh_cli_connect() -# cmd = 'iwinfo' -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# data = stdout.read() -# client.close() -# data = str(data).replace(" ", "").split("\\r\\n") -# band_info = [] -# client.close() -# for i in data: -# tmp = [] -# if i.__contains__("AccessPoint"): -# bssid = i.replace("AccessPoint:", "") -# tmp.append(bssid.casefold()) -# elif i.__contains__("MasterChannel"): -# if i.split(":")[2].__contains__("2.4"): -# tmp.append("2G") -# else: -# tmp.append("5G") -# else: -# tmp = [] -# if tmp != []: -# band_info.append(tmp) -# bssi_band_mapping = {} -# for i in range(len(band_info)): -# if (i % 2) == 0: -# bssi_band_mapping[band_info[i][0]] = band_info[i + 1][0] -# return bssi_band_mapping -# -# # Method to get the vif_config of AP using AP-CLI/ JUMPHOST-CLI -# def get_vif_config(self): -# client = self.ssh_cli_connect() -# cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_Config -c" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# client.close() -# -# return output -# -# # Method to get the vif_state of AP using AP-CLI/ JUMPHOST-CLI -# def get_vif_state(self): -# client = self.ssh_cli_connect() -# cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_State -c" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# client.close() -# return output -# -# # Method to get the vif_config ssid's of AP using AP-CLI/ JUMPHOST-CLI -# def get_vif_config_ssids(self): -# stdout = self.get_vif_config() -# ssid_list = [] -# for i in stdout.splitlines(): -# ssid = str(i).replace(" ", "").split(".") -# if ssid[0].split(":")[0] == "b'ssid": -# ssid_list.append(ssid[0].split(":")[1].replace("'", "")) -# return ssid_list -# -# # Method to get the vif_state ssid's of AP using AP-CLI/ JUMPHOST-CLI -# def get_ssid_info(self): -# stdout = self.get_vif_state() -# ssid_info_list = [] -# info = [] -# for i in stdout.splitlines(): -# ssid = str(i).replace(" ", "").split(".") -# # print(ssid) -# if ssid[0].split(":")[0] == "b'mac": -# mac_info_list = ssid[0].split(":") -# mac_info_list.pop(0) -# info.append(":".join(mac_info_list).replace("'", "")) -# if ssid[0].split(":")[0] == "b'security": -# security = ssid[0].split(":")[1].split(",")[2].replace("]", "").replace('"', "").replace("'", "") -# print(ssid[0].split(":")[1]) -# if security != "OPEN": -# if security == "WPA-PSK": -# if ssid[0].split(":")[1].split(",")[6].__contains__("1"): -# info.append("WPA") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# if ssid[0].split(":")[1].split(",")[6].__contains__("2"): -# info.append("WPA2") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"): -# info.append("WPA | WPA2") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# if security == "WPA-SAE": -# if ssid[0].split(":")[1].split(",")[6].__contains__("3"): -# info.append("WPA3_PERSONAL") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"): -# info.append("WPA3_PERSONAL") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# if security == "WPA-EAP": -# info.append("EAP-TTLS") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# if security == "WPA3-EAP": -# info.append("EAP-TTLS") -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# else: -# security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") -# info.append(security_key) -# else: -# info.append("OPEN") -# if ssid[0].split(":")[0] == "b'ssid": -# info.append(ssid[0].split(":")[1].replace("'", "")) -# ssid_info_list.append(info) -# info = [] -# print(ssid_info_list) -# return ssid_info_list -# -# # Get VIF State parameters -# def get_vif_state_ssids(self): -# stdout = self.get_vif_state() -# ssid_list = [] -# for i in stdout.splitlines(): -# ssid = str(i).replace(" ", "").split(".") -# if ssid[0].split(":")[0] == "b'ssid": -# ssid_list.append(ssid[0].split(":")[1].replace("'", "")) -# return ssid_list -# -# # Method to get the active firmware of AP using AP-CLI/ JUMPHOST-CLI -# def get_active_firmware(self): -# try: -# client = self.ssh_cli_connect() -# cmd = '/usr/opensync/bin/ovsh s AWLAN_Node -c | grep FW_IMAGE_ACTIVE' -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \ -# f" --action cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# # print(output) -# version_matrix = str(output.decode('utf-8').splitlines()) -# version_matrix_split = version_matrix.partition('FW_IMAGE_ACTIVE","')[2] -# cli_active_fw = version_matrix_split.partition('"],[')[0] -# client.close() -# except Exception as e: -# print(e) -# cli_active_fw = "Error" -# return cli_active_fw -# -# # Method to get the manager state of AP using AP-CLI/ JUMPHOST-CLI -# def get_manager_state(self): -# try: -# client = self.ssh_cli_connect() -# cmd = '/usr/opensync/bin/ovsh s Manager -c | grep status' -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \ -# f" --action cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# status = str(output.decode('utf-8').splitlines()) -# # print(output, stderr.read()) -# client.close() -# except Exception as e: -# print(e) -# status = "Error" -# return status -# -# def get_serial_number(self): -# try: -# client = self.ssh_cli_connect() -# cmd = "node | grep serial_number" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# output = output.decode('utf-8').splitlines() -# serial = output[1].replace(" ", "").split("|")[1] -# client.close() -# except Exception as e: -# print(e) -# serial = "Error" -# return serial -# -# def get_redirector(self): -# try: -# client = self.ssh_cli_connect() -# cmd = "node | grep redirector_addr" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# print(output, stderr.read()) -# status = output.decode('utf-8').splitlines() -# redirector = status[1].replace(" ", "").split("|")[1] -# client.close() -# except Exception as e: -# print(e) -# redirector = "Error" -# return redirector -# -# def run_generic_command(self, cmd=""): -# try: -# client = self.ssh_cli_connect() -# cmd = cmd -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# status = output.decode('utf-8').splitlines() -# client.close() -# except Exception as e: -# print(e) -# status = "Error" -# return status -# -# def get_ucentral_status(self): -# try: -# client = self.ssh_cli_connect() -# cmd = "ubus call ucentral status" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# print(output) -# if 'latest' not in str(output): -# print("ubus call ucentral status: command has invalid output", str(output)) -# connected, latest, active = "Error", "Error1", "Error2" -# return connected, latest, active -# else: -# connected = False -# if "\"connected" in output.decode('utf-8').splitlines()[2]: -# connected = True -# # connected = output.decode('utf-8').splitlines()[2] -# latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "") -# active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "") -# client.close() -# except Exception as e: -# if output.__contains__(b'"connected":'): -# pass -# else: -# pytest.exit("ubus call ucentral status: error" + str(output)) -# print(e) -# connected, latest, active = "Error", "Error", "Error" -# return connected, latest, active -# -# def get_uc_latest_config(self): -# try: -# connected, latest, active = self.get_ucentral_status() -# print() -# client = self.ssh_cli_connect() -# cmd = "cat /etc/ucentral/ucentral.cfg." + latest -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().decode('utf-8').splitlines()[1] -# print(output) -# json_output = json.loads(output) # , sort_keys=True) -# print(type(json_output)) -# client.close() -# except Exception as e: -# json_output = {} -# print(e) -# return json_output -# -# def get_uc_active_config(self): -# try: -# connected, latest, active = self.get_ucentral_status() -# client = self.ssh_cli_connect() -# cmd = "cat /etc/ucentral/ucentral.cfg." + active -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().decode('utf-8').splitlines()[1] -# json_output = json.loads(output) # , sort_keys=True) -# print(json_output) -# client.close() -# except Exception as e: -# json_output = {} -# print(e) -# return json_output -# -# def get_interface_details(self): -# r = self.get_wifi_status() -# print(r) -# wifi_info = {} -# if self.sdk == "1.x": -# for i in r: -# for j in r[i]["interfaces"]: -# encryption = j["config"]["encryption"] -# if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \ -# encryption == "sae" or encryption == "sae-mixed": -# wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]] -# else: -# wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""] -# print(wifi_info) -# data = self.get_iwinfo() -# for i in wifi_info.keys(): -# wifi_info[i].append(data[i]) -# -# return wifi_info -# if self.sdk == "2.x": -# for i in r: -# for j in r[i]["interfaces"]: -# encryption = j["config"]["encryption"] -# if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \ -# encryption == "sae" or encryption == "sae-mixed": -# wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]] -# else: -# wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""] -# data = self.get_iwinfo() -# print(wifi_info) -# print(data) -# for i in wifi_info.keys(): -# wifi_info[i].append(data[i]) -# return wifi_info -# -# def get_wifi_status(self): -# try: -# -# client = self.ssh_cli_connect() -# cmd = "wifi status" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# -# output = stdout.read().decode('utf-8') -# data = output.split() -# data.pop(0) -# data.pop(0) -# data.pop(0) -# OUT = "".join(data) -# json_output = json.loads(OUT) -# client.close() -# except Exception as e: -# json_output = False -# print(e) -# return json_output -# -# def get_iwinfo(self): -# try: -# # [['ssid_wpa2_2g', 'wpa', 'something', '2G'], ['ssid_wpa2_2g', 'wpa', 'something', '5G']] -# # {'wlan0': ['"ssid_wpa3_p_5g"', '12:34:56:78:90:12', '5G'], 'wlan1': ['"ssid_wpa3_p_2g"','00:03:7F:12:34:56', '5G']} -# client = self.ssh_cli_connect() -# cmd = "iwinfo" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') -# o = output.split() -# iwinfo_bssid_data = {} -# for i in range(len(o)): -# if o[i].__contains__("ESSID"): -# if o[i + 9].__contains__("2.4"): -# band = "2G" -# else: -# band = "5G" -# iwinfo_bssid_data[o[i - 1]] = [o[i + 1].replace('"', ''), o[i + 4], band] -# client.close() -# except Exception as e: -# iwinfo_bssid_data = False -# print(e) -# return iwinfo_bssid_data -# -# def iwinfo(self): -# client = self.ssh_cli_connect() -# cmd = "iwinfo" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') -# o = output -# client.close() -# return o -# -# def get_memory_profile(self): -# client = self.ssh_cli_connect() -# cmd = "ucode /usr/share/ucentral/sysinfo.uc" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') -# o = output -# client.close() -# return o -# -# def gettxpower(self): -# client = self.ssh_cli_connect() -# cmd = "iw dev | grep txpower" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().replace(b":~# iw dev | grep txpower", b"").decode('utf-8') -# tx_power = output.replace("\t\t", "").split("\r\n") -# tx_power.remove('') -# tx_power.remove('\n') -# cmd = "iw dev | grep Interface" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().replace(b":~# iw dev | grep Interface", b"").decode('utf-8') -# name = output.replace("\t", "").splitlines() -# name.remove('') -# name.pop(-1) -# client.close() -# return tx_power, name -# - # def get_logread(self, start_ref="", stop_ref=""): - # data = self.logread() - # log_data = [] - # data = data.split("\n") - # flag = 0 - # for logs in data: - # if logs.__contains__(start_ref): - # flag = 1 - # if flag == 1: - # log_data.append(logs) - # if logs.__contains__(stop_ref): - # flag = 0 - # ap_logs = "\n".join(log_data) - # return ap_logs -# -# def logread(self): -# try: -# client = self.ssh_cli_connect() -# cmd = "logread" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# status = output.decode('utf-8').splitlines() -# logread = status -# logs = "" -# for i in logread: -# logs = logs + i + "\n" -# client.close() -# except Exception as e: -# print(e) -# logs = "" -# return logs -# -# def get_ap_version_ucentral(self): -# client = self.ssh_cli_connect() -# cmd = "cat /tmp/ucentral.version" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read().replace(b":~# cat /tmp/ucentral.version", b"").decode('utf-8') -# client.close() -# return output -# -# def get_vifc(self): -# client = self.ssh_cli_connect() -# cmd = "vifC" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# client.close() -# return output -# -# def get_vifs(self): -# client = self.ssh_cli_connect() -# cmd = "vifS" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# client.close() -# return output -# -# def get_vlan(self): -# stdout = self.get_vifs() -# vlan_list = [] -# for i in stdout.splitlines(): -# vlan = str(i.strip()).replace("|", ".").split(".") -# try: -# if not vlan[0].find("b'vlan_id"): -# vlan_list.append(vlan[1].strip()) -# except: -# pass -# return vlan_list -# -# def get_ap_uci_show_ucentral(self): -# try: -# client = self.ssh_cli_connect() -# cmd = "uci show ucentral" -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# status = output.decode('utf-8').splitlines() -# for i in status: -# if i.startswith("ucentral.config.server="): -# status = i.split("=")[1] -# client.close() -# except Exception as e: -# print(e) -# status = "Error" -# return status -# -# def dfs(self): -# if self.type.lower() == "wifi5": -# client = self.ssh_cli_connect() -# cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"' -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd1}\" " -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# status = output.decode('utf-8') -# status_count = int(status.count("True")) -# print("status", status) -# print("count", status_count) -# client.close() -# if status_count == 1: -# cmd = "cd && cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && echo 1 > dfs_simulate_radar" -# else: -# cmd = "cd && cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && echo 1 > dfs_simulate_radar" -# command = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# elif self.type.lower() == "wifi6" or self.type.lower() == "wifi6e": -# cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && ls && echo 1 > dfs_simulate_radar' -# print("cmd: ", cmd) -# if self.mode: -# command = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# client = self.ssh_cli_connect() -# stdin, stdout, stderr = client.exec_command(command) -# output = stdout.read() -# print("Output", output) -# client.close() -# -# def dfs_logread(self): -# if self.type.lower() == "wifi5": -# client = self.ssh_cli_connect() -# cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"' -# stdin, stdout, stderr = client.exec_command(cmd1) -# output = str(stdout.read()) -# print("Output", output) -# if output.__contains__("False"): -# cmd = "cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && logread | grep DFS" -# else: -# cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS" -# client.close() -# #cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS" -# #print("cmd: ", cmd) -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# elif self.type.lower() == "wifi6" or self.type.lower() == "wifi6e": -# cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && logread | grep DFS' -# print("cmd: ", cmd) -# if self.mode: -# cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ -# f"cmd --value \"{cmd}\" " -# try: -# client = self.ssh_cli_connect() -# stdin, stdout, stderr = client.exec_command(cmd) -# output = stdout.read() -# status = output.decode('utf-8').splitlines() -# logread = status[-6:] -# logs = "" -# for i in logread: -# logs = logs + i + "\n" -# client.close() -# except Exception as e: -# print(e) -# logs = "" -# return logs -# -# -# if __name__ == '__main__': -# obj = { -# "model": "cig_wf188n", -# "mode": "wifi6", -# "serial": "0000c1018812", -# "jumphost": True, -# "ip": "10.28.3.103", -# "username": "lanforge", -# "password": "pumpkin77", -# "port": 22, -# "jumphost_tty": "/dev/ttyAP1", -# "version": "next-latest" -# } -# var = APNOS(credentials=obj, sdk="2.x") -# print(var.get_memory_profile()) +import json +import os + +import paramiko +import pytest +from scp import SCPClient + + +class APNOS: + + def __init__(self, credentials=None, pwd=os.getcwd(), sdk="2.x"): + self.serial = credentials['serial'] + self.owrt_args = "--prompt root@OpenAp -s serial --log stdout --user root --passwd openwifi" + self.sdk = sdk + if sdk == "2.x": + self.owrt_args = "--prompt root@" + self.serial + " -s serial --log stdout --user root --passwd openwifi" + if credentials is None: + print("No credentials Given") + exit() + self.ip = credentials['ip'] # if mode=1, enter jumphost ip else ap ip address + self.username = credentials['username'] # if mode=1, enter jumphost username else ap username + self.password = credentials['password'] # if mode=1, enter jumphost password else ap password + self.port = credentials['port'] # if mode=1, enter jumphost ssh port else ap ssh port + self.mode = credentials['jumphost'] # 1 for jumphost, 0 for direct ssh + + if 'mode' in credentials: + self.type = credentials['mode'] + + if self.mode: + self.tty = credentials['jumphost_tty'] # /dev/ttyAP1 + # kill minicom instance + client = self.ssh_cli_connect() + cmd = "pgrep 'minicom' -a" + stdin, stdout, stderr = client.exec_command(cmd) + a = str(stdout.read()).split("\\n") + for i in a: + if i.__contains__("minicom ap" + self.tty[-1]): + temp = i.split("minicom") + a = temp[0].replace(" ", "") + cmd = "kill " + str(a).replace("b'", "") + print(cmd) + stdin, stdout, stderr = client.exec_command(cmd) + cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read()) + + if output.__contains__("False"): + cmd = 'mkdir ~/cicd-git/' + stdin, stdout, stderr = client.exec_command(cmd) + cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + output = str(stdout.read()) + if output.__contains__("False"): + print("Copying openwrt_ctl serial control Script...") + with SCPClient(client.get_transport()) as scp: + scp.put(pwd + '/openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server + cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd) + var = str(stdout.read()) + client.close() + if var.__contains__("True"): + print("APNOS Serial Setup OK") + else: + print("APNOS Serial Setup Fail") + + # Method to connect AP-CLI/ JUMPHOST-CLI + def ssh_cli_connect(self): + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + print("Connecting to jumphost: %s@%s:%s" % ( + self.username, self.ip, self.port)) + client.connect(self.ip, username=self.username, password=self.password, + port=self.port, timeout=10, allow_agent=False, banner_timeout=200) + return client + + def reboot(self): + client = self.ssh_cli_connect() + + cmd = "reboot" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + client.close() + return output + + # Method to get the iwinfo status of AP using AP-CLI/ JUMPHOST-CLI + + def get_bssid_band_mapping(self): + client = self.ssh_cli_connect() + cmd = 'iwinfo' + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + data = stdout.read() + client.close() + data = str(data).replace(" ", "").split("\\r\\n") + band_info = [] + client.close() + for i in data: + tmp = [] + if i.__contains__("AccessPoint"): + bssid = i.replace("AccessPoint:", "") + tmp.append(bssid.casefold()) + elif i.__contains__("MasterChannel"): + if i.split(":")[2].__contains__("2.4"): + tmp.append("2G") + else: + tmp.append("5G") + else: + tmp = [] + if tmp != []: + band_info.append(tmp) + bssi_band_mapping = {} + for i in range(len(band_info)): + if (i % 2) == 0: + bssi_band_mapping[band_info[i][0]] = band_info[i + 1][0] + return bssi_band_mapping + + # Method to get the vif_config of AP using AP-CLI/ JUMPHOST-CLI + def get_vif_config(self): + client = self.ssh_cli_connect() + cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_Config -c" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + client.close() + + return output + + # Method to get the vif_state of AP using AP-CLI/ JUMPHOST-CLI + def get_vif_state(self): + client = self.ssh_cli_connect() + cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_State -c" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + client.close() + return output + + # Method to get the vif_config ssid's of AP using AP-CLI/ JUMPHOST-CLI + def get_vif_config_ssids(self): + stdout = self.get_vif_config() + ssid_list = [] + for i in stdout.splitlines(): + ssid = str(i).replace(" ", "").split(".") + if ssid[0].split(":")[0] == "b'ssid": + ssid_list.append(ssid[0].split(":")[1].replace("'", "")) + return ssid_list + + # Method to get the vif_state ssid's of AP using AP-CLI/ JUMPHOST-CLI + def get_ssid_info(self): + stdout = self.get_vif_state() + ssid_info_list = [] + info = [] + for i in stdout.splitlines(): + ssid = str(i).replace(" ", "").split(".") + # print(ssid) + if ssid[0].split(":")[0] == "b'mac": + mac_info_list = ssid[0].split(":") + mac_info_list.pop(0) + info.append(":".join(mac_info_list).replace("'", "")) + if ssid[0].split(":")[0] == "b'security": + security = ssid[0].split(":")[1].split(",")[2].replace("]", "").replace('"', "").replace("'", "") + print(ssid[0].split(":")[1]) + if security != "OPEN": + if security == "WPA-PSK": + if ssid[0].split(":")[1].split(",")[6].__contains__("1"): + info.append("WPA") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + if ssid[0].split(":")[1].split(",")[6].__contains__("2"): + info.append("WPA2") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"): + info.append("WPA | WPA2") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + if security == "WPA-SAE": + if ssid[0].split(":")[1].split(",")[6].__contains__("3"): + info.append("WPA3_PERSONAL") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"): + info.append("WPA3_PERSONAL") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + if security == "WPA-EAP": + info.append("EAP-TTLS") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + if security == "WPA3-EAP": + info.append("EAP-TTLS") + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + else: + security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "") + info.append(security_key) + else: + info.append("OPEN") + if ssid[0].split(":")[0] == "b'ssid": + info.append(ssid[0].split(":")[1].replace("'", "")) + ssid_info_list.append(info) + info = [] + print(ssid_info_list) + return ssid_info_list + + # Get VIF State parameters + def get_vif_state_ssids(self): + stdout = self.get_vif_state() + ssid_list = [] + for i in stdout.splitlines(): + ssid = str(i).replace(" ", "").split(".") + if ssid[0].split(":")[0] == "b'ssid": + ssid_list.append(ssid[0].split(":")[1].replace("'", "")) + return ssid_list + + # Method to get the active firmware of AP using AP-CLI/ JUMPHOST-CLI + def get_active_firmware(self): + try: + client = self.ssh_cli_connect() + cmd = '/usr/opensync/bin/ovsh s AWLAN_Node -c | grep FW_IMAGE_ACTIVE' + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \ + f" --action cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + # print(output) + version_matrix = str(output.decode('utf-8').splitlines()) + version_matrix_split = version_matrix.partition('FW_IMAGE_ACTIVE","')[2] + cli_active_fw = version_matrix_split.partition('"],[')[0] + client.close() + except Exception as e: + print(e) + cli_active_fw = "Error" + return cli_active_fw + + # Method to get the manager state of AP using AP-CLI/ JUMPHOST-CLI + def get_manager_state(self): + try: + client = self.ssh_cli_connect() + cmd = '/usr/opensync/bin/ovsh s Manager -c | grep status' + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \ + f" --action cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + status = str(output.decode('utf-8').splitlines()) + # print(output, stderr.read()) + client.close() + except Exception as e: + print(e) + status = "Error" + return status + + def get_serial_number(self): + try: + client = self.ssh_cli_connect() + cmd = "node | grep serial_number" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + output = output.decode('utf-8').splitlines() + serial = output[1].replace(" ", "").split("|")[1] + client.close() + except Exception as e: + print(e) + serial = "Error" + return serial + + def get_redirector(self): + try: + client = self.ssh_cli_connect() + cmd = "node | grep redirector_addr" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + print(output, stderr.read()) + status = output.decode('utf-8').splitlines() + redirector = status[1].replace(" ", "").split("|")[1] + client.close() + except Exception as e: + print(e) + redirector = "Error" + return redirector + + def run_generic_command(self, cmd=""): + try: + client = self.ssh_cli_connect() + cmd = cmd + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + status = output.decode('utf-8').splitlines() + client.close() + except Exception as e: + print(e) + status = "Error" + return status + + def get_ucentral_status(self): + try: + client = self.ssh_cli_connect() + cmd = "ubus call ucentral status" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + print(output) + if 'latest' not in str(output): + print("ubus call ucentral status: command has invalid output", str(output)) + connected, latest, active = "Error", "Error1", "Error2" + return connected, latest, active + else: + connected = False + if "\"connected" in output.decode('utf-8').splitlines()[2]: + connected = True + # connected = output.decode('utf-8').splitlines()[2] + latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "") + active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "") + client.close() + except Exception as e: + if output.__contains__(b'"connected":'): + pass + else: + pytest.exit("ubus call ucentral status: error" + str(output)) + print(e) + connected, latest, active = "Error", "Error", "Error" + return connected, latest, active + + def get_uc_latest_config(self): + try: + connected, latest, active = self.get_ucentral_status() + print() + client = self.ssh_cli_connect() + cmd = "cat /etc/ucentral/ucentral.cfg." + latest + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().decode('utf-8').splitlines()[1] + print(output) + json_output = json.loads(output) # , sort_keys=True) + print(type(json_output)) + client.close() + except Exception as e: + json_output = {} + print(e) + return json_output + + def get_uc_active_config(self): + try: + connected, latest, active = self.get_ucentral_status() + client = self.ssh_cli_connect() + cmd = "cat /etc/ucentral/ucentral.cfg." + active + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().decode('utf-8').splitlines()[1] + json_output = json.loads(output) # , sort_keys=True) + print(json_output) + client.close() + except Exception as e: + json_output = {} + print(e) + return json_output + + def get_interface_details(self): + r = self.get_wifi_status() + print(r) + wifi_info = {} + if self.sdk == "1.x": + for i in r: + for j in r[i]["interfaces"]: + encryption = j["config"]["encryption"] + if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \ + encryption == "sae" or encryption == "sae-mixed": + wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]] + else: + wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""] + print(wifi_info) + data = self.get_iwinfo() + for i in wifi_info.keys(): + wifi_info[i].append(data[i]) + + return wifi_info + if self.sdk == "2.x": + for i in r: + for j in r[i]["interfaces"]: + encryption = j["config"]["encryption"] + if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \ + encryption == "sae" or encryption == "sae-mixed": + wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]] + else: + wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""] + data = self.get_iwinfo() + print(wifi_info) + print(data) + for i in wifi_info.keys(): + wifi_info[i].append(data[i]) + return wifi_info + + def get_wifi_status(self): + try: + + client = self.ssh_cli_connect() + cmd = "wifi status" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + + output = stdout.read().decode('utf-8') + data = output.split() + data.pop(0) + data.pop(0) + data.pop(0) + OUT = "".join(data) + json_output = json.loads(OUT) + client.close() + except Exception as e: + json_output = False + print(e) + return json_output + + def get_iwinfo(self): + try: + # [['ssid_wpa2_2g', 'wpa', 'something', '2G'], ['ssid_wpa2_2g', 'wpa', 'something', '5G']] + # {'wlan0': ['"ssid_wpa3_p_5g"', '12:34:56:78:90:12', '5G'], 'wlan1': ['"ssid_wpa3_p_2g"','00:03:7F:12:34:56', '5G']} + client = self.ssh_cli_connect() + cmd = "iwinfo" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') + o = output.split() + iwinfo_bssid_data = {} + for i in range(len(o)): + if o[i].__contains__("ESSID"): + if o[i + 9].__contains__("2.4"): + band = "2G" + else: + band = "5G" + iwinfo_bssid_data[o[i - 1]] = [o[i + 1].replace('"', ''), o[i + 4], band] + client.close() + except Exception as e: + iwinfo_bssid_data = False + print(e) + return iwinfo_bssid_data + + def iwinfo(self): + client = self.ssh_cli_connect() + cmd = "iwinfo" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') + o = output + client.close() + return o + + def get_memory_profile(self): + client = self.ssh_cli_connect() + cmd = "ucode /usr/share/ucentral/sysinfo.uc" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8') + o = output + client.close() + return o + + def gettxpower(self): + client = self.ssh_cli_connect() + cmd = "iw dev | grep txpower" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().replace(b":~# iw dev | grep txpower", b"").decode('utf-8') + tx_power = output.replace("\t\t", "").split("\r\n") + tx_power.remove('') + tx_power.remove('\n') + cmd = "iw dev | grep Interface" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().replace(b":~# iw dev | grep Interface", b"").decode('utf-8') + name = output.replace("\t", "").splitlines() + name.remove('') + name.pop(-1) + client.close() + return tx_power, name + + def get_logread(self, start_ref="", stop_ref=""): + data = self.logread() + log_data = [] + data = data.split("\n") + flag = 0 + for logs in data: + if logs.__contains__(start_ref): + flag = 1 + if flag == 1: + log_data.append(logs) + if logs.__contains__(stop_ref): + flag = 0 + ap_logs = "\n".join(log_data) + return ap_logs + + def logread(self): + try: + client = self.ssh_cli_connect() + cmd = "logread" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + status = output.decode('utf-8').splitlines() + logread = status + logs = "" + for i in logread: + logs = logs + i + "\n" + client.close() + except Exception as e: + print(e) + logs = "" + return logs + + def get_ap_version_ucentral(self): + client = self.ssh_cli_connect() + cmd = "cat /tmp/ucentral.version" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read().replace(b":~# cat /tmp/ucentral.version", b"").decode('utf-8') + client.close() + return output + + def get_vifc(self): + client = self.ssh_cli_connect() + cmd = "vifC" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + client.close() + return output + + def get_vifs(self): + client = self.ssh_cli_connect() + cmd = "vifS" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + client.close() + return output + + def get_vlan(self): + stdout = self.get_vifs() + vlan_list = [] + for i in stdout.splitlines(): + vlan = str(i.strip()).replace("|", ".").split(".") + try: + if not vlan[0].find("b'vlan_id"): + vlan_list.append(vlan[1].strip()) + except: + pass + return vlan_list + + def get_ap_uci_show_ucentral(self): + try: + client = self.ssh_cli_connect() + cmd = "uci show ucentral" + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + status = output.decode('utf-8').splitlines() + for i in status: + if i.startswith("ucentral.config.server="): + status = i.split("=")[1] + client.close() + except Exception as e: + print(e) + status = "Error" + return status + + def dfs(self): + if self.type.lower() == "wifi5": + client = self.ssh_cli_connect() + cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"' + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd1}\" " + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + status = output.decode('utf-8') + status_count = int(status.count("True")) + print("status", status) + print("count", status_count) + client.close() + if status_count == 1: + cmd = "cd && cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && echo 1 > dfs_simulate_radar" + else: + cmd = "cd && cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && echo 1 > dfs_simulate_radar" + command = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + elif self.type.lower() == "wifi6" or self.type.lower() == "wifi6e": + cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && ls && echo 1 > dfs_simulate_radar' + print("cmd: ", cmd) + if self.mode: + command = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + client = self.ssh_cli_connect() + stdin, stdout, stderr = client.exec_command(command) + output = stdout.read() + print("Output", output) + client.close() + + def dfs_logread(self): + if self.type.lower() == "wifi5": + client = self.ssh_cli_connect() + cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"' + stdin, stdout, stderr = client.exec_command(cmd1) + output = str(stdout.read()) + print("Output", output) + if output.__contains__("False"): + cmd = "cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && logread | grep DFS" + else: + cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS" + client.close() + #cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS" + #print("cmd: ", cmd) + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + elif self.type.lower() == "wifi6" or self.type.lower() == "wifi6e": + cmd = f'cd && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && logread | grep DFS' + print("cmd: ", cmd) + if self.mode: + cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \ + f"cmd --value \"{cmd}\" " + try: + client = self.ssh_cli_connect() + stdin, stdout, stderr = client.exec_command(cmd) + output = stdout.read() + status = output.decode('utf-8').splitlines() + logread = status[-6:] + logs = "" + for i in logread: + logs = logs + i + "\n" + client.close() + except Exception as e: + print(e) + logs = "" + return logs + + +if __name__ == '__main__': + obj = { + "model": "cig_wf188n", + "mode": "wifi6", + "serial": "0000c1018812", + "jumphost": True, + "ip": "10.28.3.103", + "username": "lanforge", + "password": "pumpkin77", + "port": 22, + "jumphost_tty": "/dev/ttyAP1", + "version": "next-latest" + } + var = APNOS(credentials=obj, sdk="2.x") + print(var.get_memory_profile()) diff --git a/libs/apnos/openwrt_ctl.py b/libs/apnos/openwrt_ctl.py index 829852cf2..0d05799f1 100755 --- a/libs/apnos/openwrt_ctl.py +++ b/libs/apnos/openwrt_ctl.py @@ -16,360 +16,360 @@ # --value2 password-for-10.28.3.100 --log stdout --scheme serial --prompt root@Open # ''' # -# import sys -# -# if sys.version_info[0] != 3: -# print("This script requires Python 3") -# exit() -# -# import re -# import logging -# import time -# from time import sleep -# import pprint -# import telnetlib -# import argparse -# import pexpect -# -# default_host = "localhost" -# default_ports = { -# "serial": None, -# "ssh": 22, -# "telnet": 23 -# } -# NL = "\n" -# CR = "\r\n" -# Q = '"' -# A = "'" -# FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s' -# prompt = "root@OpenWrt:" -# -# -# def usage(): -# print("$0 used connect to OpenWrt AP or similar Linux machine:") -# print("-d|--dest IP address of the OpenWrt AP, for ssh/telnet scheme") -# print("-o|--port IP port of the OpenWrt AP, for ssh/telnet scheme") -# print("-t|--tty Serial port, if using serial scheme") -# print("-u|--user login name") -# print("-p|--pass password") -# print("--prompt Prompt to look for when commands are done (default: root@OpenWrt)") -# print("-s|--scheme (serial|telnet|ssh): connect via serial, ssh or telnet") -# print("-l|--log file log messages here") -# print("--action (logread | journalctl | lurk | sysupgrade | download | upload | reboot | cmd | ssh-tunnel") -# print("--value (option to help complete the action") -# print("--value2 (option to help complete the action, dest filename for download, passwd for ssh-tunnel") -# print("-h|--help") -# -# -# # see https://stackoverflow.com/a/13306095/11014343 -# class FileAdapter(object): -# def __init__(self, logger): -# self.logger = logger -# -# def write(self, data): -# # NOTE: data can be a partial line, multiple lines -# data = data.strip() # ignore leading/trailing whitespace -# if data: # non-blank -# self.logger.info(data) -# -# def flush(self): -# pass # leave it to logging to flush properly -# -# -# def main(): -# global prompt -# -# parser = argparse.ArgumentParser(description="OpenWrt AP Control Script") -# parser.add_argument("-d", "--dest", type=str, help="address of the cisco controller_tests") -# parser.add_argument("-o", "--port", type=int, help="control port on the controller_tests") -# parser.add_argument("-u", "--user", type=str, help="credential login/username") -# parser.add_argument("-p", "--passwd", type=str, help="credential password") -# parser.add_argument("-P", "--prompt", type=str, help="Prompt to look for") -# parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"], -# help="Connect via serial, ssh or telnet") -# parser.add_argument("-t", "--tty", type=str, help="tty serial device") -# parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console") -# parser.add_argument("--action", type=str, help="perform action", -# choices=["logread", "journalctl", "lurk", "sysupgrade", "sysupgrade-n", "download", "upload", -# "reboot", "cmd", "ssh-tunnel"]) -# parser.add_argument("--value", type=str, help="set value") -# parser.add_argument("--value2", type=str, help="set value2") -# tty = None -# -# args = None -# try: -# args = parser.parse_args() -# host = args.dest -# scheme = args.scheme -# port = args.port -# # port = (default_ports[scheme], args.port)[args.port != None] -# user = args.user -# passwd = args.passwd -# logfile = args.log -# tty = args.tty; -# if (args.prompt != None): -# prompt = args.prompt -# filehandler = None -# except Exception as e: -# logging.exception(e); -# usage() -# exit(2); -# -# console_handler = logging.StreamHandler() -# formatter = logging.Formatter(FORMAT) -# logg = logging.getLogger(__name__) -# logg.setLevel(logging.DEBUG) -# file_handler = None -# if (logfile is not None): -# if (logfile != "stdout"): -# file_handler = logging.FileHandler(logfile, "w") -# file_handler.setLevel(logging.DEBUG) -# file_handler.setFormatter(formatter) -# logg.addHandler(file_handler) -# logging.basicConfig(format=FORMAT, handlers=[file_handler]) -# else: -# # stdout logging -# logging.basicConfig(format=FORMAT, handlers=[console_handler]) -# -# CCPROMPT = prompt -# -# ser = None -# egg = None # think "eggpect" -# try: -# if (scheme == "serial"): -# # eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer) -# import serial -# from pexpect_serial import SerialSpawn -# ser = serial.Serial(tty, 115200, timeout=5) -# -# egg = SerialSpawn(ser); -# egg.logfile = FileAdapter(logg) -# egg.sendline(NL) -# has_reset = False -# try: -# logg.info("prompt: %s user: %s passwd: %s" % (prompt, user, passwd)) -# while True: -# i = egg.expect([prompt, "Please press Enter to activate", "login:", "Password:", "IPQ6018#"], -# timeout=3) -# logg.info("expect-0: %i" % (i)) -# if (i == 0): -# logg.info("Found prompt, login complete.") -# break -# if (i == 1): -# logg.info("Sending newline") -# egg.setdline(NL) -# if (i == 2): -# logg.info("Sending username: %s" % (user)) -# egg.sendline(user) -# if (i == 3): -# logg.info("Sending password: %s" % (passwd)) -# egg.sendline(passwd) -# if (i == 4): # in bootloader -# if has_reset: -# logg.info("ERROR: Have reset once already, back in bootloader?") -# sys.exit(1) -# has_reset = True -# logg.info("In boot loader, will reset and sleep 30 seconds") -# egg.sendline("reset") -# time.sleep(30) -# egg.sendline(NL) -# -# except Exception as e: -# # maybe something like 'logread -f' is running? -# # send ctrl-c -# egg.send(chr(3)) -# -# elif (scheme == "ssh"): -# # Not implemented/tested currently. --Ben -# if (port is None): -# port = 22 -# cmd = "ssh -p%d %s@%s" % (port, user, host) -# logg.info("Spawn: " + cmd + NL) -# egg = pexpect.spawn(cmd) -# # egg.logfile_read = sys.stdout.buffer -# egg.logfile = FileAdapter(logg) -# i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3) -# time.sleep(0.1) -# if i == 1: -# egg.sendline('yes') -# egg.expect('password:') -# sleep(0.1) -# egg.sendline(passwd) -# -# elif (scheme == "telnet"): -# # Not implemented/tested currently. --Ben -# if (port is None): -# port = 23 -# cmd = "telnet %s %d" % (host, port) -# logg.info("Spawn: " + cmd + NL) -# egg = pexpect.spawn(cmd) -# egg.logfile = FileAdapter(logg) -# time.sleep(0.1) -# egg.sendline(' ') -# egg.expect('User\:') -# egg.sendline(user) -# egg.expect('Password\:') -# egg.sendline(passwd) -# egg.sendline('config paging disable') -# else: -# usage() -# exit(1) -# except Exception as e: -# logging.exception(e); -# -# command = None -# -# CLOSEDBYREMOTE = "closed by remote host." -# CLOSEDCX = "Connection to .* closed." -# -# try: -# egg.expect(CCPROMPT) -# except Exception as e: -# egg.sendline(NL) -# -# TO = 10 -# wait_forever = False -# -# # Clean pending output -# egg.sendline("echo __hello__") -# egg.expect("__hello__") -# egg.expect(CCPROMPT) -# -# logg.info("Action[%s] Value[%s] Value2[%s]" % (args.action, args.value, args.value2)) -# -# if (args.action == "reboot"): -# command = "reboot" -# TO = 60 -# -# if (args.action == "cmd"): -# if (args.value is None): -# raise Exception("cmd requires value to be set.") -# command = "%s" % (args.value) -# -# if (args.action == "logread"): -# command = "logread -f" -# TO = 1 -# wait_forever = True -# -# if (args.action == "journalctl"): -# command = "journalctl -f" -# TO = 1 -# wait_forever = True -# -# if (args.action == "lurk"): -# command = "date" -# TO = 1 -# wait_forever = True -# -# if (args.action == "ssh-tunnel"): -# command = "%s" % (args.value) -# passwd = "%s" % (args.value2) -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# -# i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) -# if i == 1: -# egg.sendline("y") -# egg.expect("password:", timeout=5) -# egg.sendline(passwd) -# egg.expect(CCPROMPT, timeout=20) -# return -# -# if ((args.action == "sysupgrade") or (args.action == "sysupgrade-n")): -# command = "scp %s /tmp/new_img.bin" % (args.value) -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# -# i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) -# if i == 1: -# egg.sendline("y") -# egg.expect("password:", timeout=5) -# egg.sendline("lanforge") -# egg.expect(CCPROMPT, timeout=20) -# if (args.action == "sysupgrade-n"): -# egg.sendline("sysupgrade -n /tmp/new_img.bin") -# else: -# egg.sendline("sysupgrade /tmp/new_img.bin") -# egg.expect("link becomes ready", timeout=100) -# return -# -# if (args.action == "download"): -# command = "scp %s /tmp/%s" % (args.value, args.value2) -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# -# i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) -# if i == 2: -# print("Network unreachable, wait 15 seconds and try again.") -# time.sleep(15) -# command = "scp %s /tmp/%s" % (args.value, args.value2) -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# -# i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) -# if i == 2: -# print("ERROR: Could not connect to LANforge to get download file") -# exit(2) -# if i == 1: -# egg.sendline("y") -# egg.expect("password:", timeout=5) -# egg.sendline("lanforge") -# egg.expect(CCPROMPT, timeout=20) -# return -# -# if (args.action == "upload"): -# command = "scp %s %s" % (args.value, args.value2) -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# -# i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) -# if i == 2: -# print("Network unreachable, wait 15 seconds and try again.") -# time.sleep(15) -# command = "scp /tmp/%s %s" % (args.value, args.value2) -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# -# i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) -# if i == 2: -# print("ERROR: Could not connect to LANforge to put upload file") -# exit(2) -# if i == 1: -# egg.sendline("y") -# egg.expect("password:", timeout=5) -# egg.sendline("lanforge") -# egg.expect(CCPROMPT, timeout=20) -# return -# -# if (command is None): -# logg.info("No command specified, going to log out.") -# else: -# logg.info("Command[%s]" % command) -# egg.sendline(command); -# while True: -# try: -# i = egg.expect([CCPROMPT, "kmodloader: done loading kernel", "\n"], timeout=TO) -# print(egg.before.decode('utf-8', 'ignore')) -# if i == 1: -# egg.sendline(' ') -# egg.expect(CCPROMPT, timeout=20) -# print(egg.before.decode('utf-8', 'ignore')) -# if i == 2: # new line of text, just print and continue -# continue -# -# if not wait_forever: -# break -# -# except Exception as e: -# # Some commands take a long time (logread -f) -# if not wait_forever: -# logging.exception(e) -# break +import sys + +if sys.version_info[0] != 3: + print("This script requires Python 3") + exit() + +import re +import logging +import time +from time import sleep +import pprint +import telnetlib +import argparse +import pexpect + +default_host = "localhost" +default_ports = { + "serial": None, + "ssh": 22, + "telnet": 23 +} +NL = "\n" +CR = "\r\n" +Q = '"' +A = "'" +FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s' +prompt = "root@OpenWrt:" + + +def usage(): + print("$0 used connect to OpenWrt AP or similar Linux machine:") + print("-d|--dest IP address of the OpenWrt AP, for ssh/telnet scheme") + print("-o|--port IP port of the OpenWrt AP, for ssh/telnet scheme") + print("-t|--tty Serial port, if using serial scheme") + print("-u|--user login name") + print("-p|--pass password") + print("--prompt Prompt to look for when commands are done (default: root@OpenWrt)") + print("-s|--scheme (serial|telnet|ssh): connect via serial, ssh or telnet") + print("-l|--log file log messages here") + print("--action (logread | journalctl | lurk | sysupgrade | download | upload | reboot | cmd | ssh-tunnel") + print("--value (option to help complete the action") + print("--value2 (option to help complete the action, dest filename for download, passwd for ssh-tunnel") + print("-h|--help") + + +# see https://stackoverflow.com/a/13306095/11014343 +class FileAdapter(object): + def __init__(self, logger): + self.logger = logger + + def write(self, data): + # NOTE: data can be a partial line, multiple lines + data = data.strip() # ignore leading/trailing whitespace + if data: # non-blank + self.logger.info(data) + + def flush(self): + pass # leave it to logging to flush properly + + +def main(): + global prompt + + parser = argparse.ArgumentParser(description="OpenWrt AP Control Script") + parser.add_argument("-d", "--dest", type=str, help="address of the cisco controller_tests") + parser.add_argument("-o", "--port", type=int, help="control port on the controller_tests") + parser.add_argument("-u", "--user", type=str, help="credential login/username") + parser.add_argument("-p", "--passwd", type=str, help="credential password") + parser.add_argument("-P", "--prompt", type=str, help="Prompt to look for") + parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"], + help="Connect via serial, ssh or telnet") + parser.add_argument("-t", "--tty", type=str, help="tty serial device") + parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console") + parser.add_argument("--action", type=str, help="perform action", + choices=["logread", "journalctl", "lurk", "sysupgrade", "sysupgrade-n", "download", "upload", + "reboot", "cmd", "ssh-tunnel"]) + parser.add_argument("--value", type=str, help="set value") + parser.add_argument("--value2", type=str, help="set value2") + tty = None + + args = None + try: + args = parser.parse_args() + host = args.dest + scheme = args.scheme + port = args.port + # port = (default_ports[scheme], args.port)[args.port != None] + user = args.user + passwd = args.passwd + logfile = args.log + tty = args.tty; + if (args.prompt != None): + prompt = args.prompt + filehandler = None + except Exception as e: + logging.exception(e); + usage() + exit(2); + + console_handler = logging.StreamHandler() + formatter = logging.Formatter(FORMAT) + logg = logging.getLogger(__name__) + logg.setLevel(logging.DEBUG) + file_handler = None + if (logfile is not None): + if (logfile != "stdout"): + file_handler = logging.FileHandler(logfile, "w") + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(formatter) + logg.addHandler(file_handler) + logging.basicConfig(format=FORMAT, handlers=[file_handler]) + else: + # stdout logging + logging.basicConfig(format=FORMAT, handlers=[console_handler]) + + CCPROMPT = prompt + + ser = None + egg = None # think "eggpect" + try: + if (scheme == "serial"): + # eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer) + import serial + from pexpect_serial import SerialSpawn + ser = serial.Serial(tty, 115200, timeout=5) + + egg = SerialSpawn(ser); + egg.logfile = FileAdapter(logg) + egg.sendline(NL) + has_reset = False + try: + logg.info("prompt: %s user: %s passwd: %s" % (prompt, user, passwd)) + while True: + i = egg.expect([prompt, "Please press Enter to activate", "login:", "Password:", "IPQ6018#"], + timeout=3) + logg.info("expect-0: %i" % (i)) + if (i == 0): + logg.info("Found prompt, login complete.") + break + if (i == 1): + logg.info("Sending newline") + egg.setdline(NL) + if (i == 2): + logg.info("Sending username: %s" % (user)) + egg.sendline(user) + if (i == 3): + logg.info("Sending password: %s" % (passwd)) + egg.sendline(passwd) + if (i == 4): # in bootloader + if has_reset: + logg.info("ERROR: Have reset once already, back in bootloader?") + sys.exit(1) + has_reset = True + logg.info("In boot loader, will reset and sleep 30 seconds") + egg.sendline("reset") + time.sleep(30) + egg.sendline(NL) + + except Exception as e: + # maybe something like 'logread -f' is running? + # send ctrl-c + egg.send(chr(3)) + + elif (scheme == "ssh"): + # Not implemented/tested currently. --Ben + if (port is None): + port = 22 + cmd = "ssh -p%d %s@%s" % (port, user, host) + logg.info("Spawn: " + cmd + NL) + egg = pexpect.spawn(cmd) + # egg.logfile_read = sys.stdout.buffer + egg.logfile = FileAdapter(logg) + i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3) + time.sleep(0.1) + if i == 1: + egg.sendline('yes') + egg.expect('password:') + sleep(0.1) + egg.sendline(passwd) + + elif (scheme == "telnet"): + # Not implemented/tested currently. --Ben + if (port is None): + port = 23 + cmd = "telnet %s %d" % (host, port) + logg.info("Spawn: " + cmd + NL) + egg = pexpect.spawn(cmd) + egg.logfile = FileAdapter(logg) + time.sleep(0.1) + egg.sendline(' ') + egg.expect('User\:') + egg.sendline(user) + egg.expect('Password\:') + egg.sendline(passwd) + egg.sendline('config paging disable') + else: + usage() + exit(1) + except Exception as e: + logging.exception(e); + + command = None + + CLOSEDBYREMOTE = "closed by remote host." + CLOSEDCX = "Connection to .* closed." + + try: + egg.expect(CCPROMPT) + except Exception as e: + egg.sendline(NL) + + TO = 10 + wait_forever = False + + # Clean pending output + egg.sendline("echo __hello__") + egg.expect("__hello__") + egg.expect(CCPROMPT) + + logg.info("Action[%s] Value[%s] Value2[%s]" % (args.action, args.value, args.value2)) + + if (args.action == "reboot"): + command = "reboot" + TO = 60 + + if (args.action == "cmd"): + if (args.value is None): + raise Exception("cmd requires value to be set.") + command = "%s" % (args.value) + + if (args.action == "logread"): + command = "logread -f" + TO = 1 + wait_forever = True + + if (args.action == "journalctl"): + command = "journalctl -f" + TO = 1 + wait_forever = True + + if (args.action == "lurk"): + command = "date" + TO = 1 + wait_forever = True + + if (args.action == "ssh-tunnel"): + command = "%s" % (args.value) + passwd = "%s" % (args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline(passwd) + egg.expect(CCPROMPT, timeout=20) + return + + if ((args.action == "sysupgrade") or (args.action == "sysupgrade-n")): + command = "scp %s /tmp/new_img.bin" % (args.value) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + if (args.action == "sysupgrade-n"): + egg.sendline("sysupgrade -n /tmp/new_img.bin") + else: + egg.sendline("sysupgrade /tmp/new_img.bin") + egg.expect("link becomes ready", timeout=100) + return + + if (args.action == "download"): + command = "scp %s /tmp/%s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("Network unreachable, wait 15 seconds and try again.") + time.sleep(15) + command = "scp %s /tmp/%s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("ERROR: Could not connect to LANforge to get download file") + exit(2) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + return + + if (args.action == "upload"): + command = "scp %s %s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("Network unreachable, wait 15 seconds and try again.") + time.sleep(15) + command = "scp /tmp/%s %s" % (args.value, args.value2) + logg.info("Command[%s]" % command) + egg.sendline(command); + + i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5) + if i == 2: + print("ERROR: Could not connect to LANforge to put upload file") + exit(2) + if i == 1: + egg.sendline("y") + egg.expect("password:", timeout=5) + egg.sendline("lanforge") + egg.expect(CCPROMPT, timeout=20) + return + + if (command is None): + logg.info("No command specified, going to log out.") + else: + logg.info("Command[%s]" % command) + egg.sendline(command); + while True: + try: + i = egg.expect([CCPROMPT, "kmodloader: done loading kernel", "\n"], timeout=TO) + print(egg.before.decode('utf-8', 'ignore')) + if i == 1: + egg.sendline(' ') + egg.expect(CCPROMPT, timeout=20) + print(egg.before.decode('utf-8', 'ignore')) + if i == 2: # new line of text, just print and continue + continue + + if not wait_forever: + break + + except Exception as e: + # Some commands take a long time (logread -f) + if not wait_forever: + logging.exception(e) + break # # # # ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -# if __name__ == '__main__': -# main() +if __name__ == '__main__': + main() # # #### # ####