mirror of
				https://github.com/Telecominfraproject/wlan-testing.git
				synced 2025-11-04 04:48:01 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			703 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			703 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# """
 | 
						|
# APNOS Library: used to execute SSH commands on the AP via direct AP SSH or a jumphost serial console
 | 
						|
#
 | 
						|
# Currently Having Methods:
 | 
						|
#     1. Get iwinfo
 | 
						|
#     2. AP Manager Status
 | 
						|
#     3. Vif Config SSIDs
 | 
						|
#     4. Vif State SSIDs
 | 
						|
#     5. Get current Firmware
 | 
						|
#
 | 
						|
# """
 | 
						|
# import json
 | 
						|
# import os
 | 
						|
#
 | 
						|
# import paramiko
 | 
						|
# import pytest
 | 
						|
# from scp import SCPClient
 | 
						|
#
 | 
						|
#
 | 
						|
# class APNOS:
 | 
						|
#
 | 
						|
#     def __init__(self, credentials=None, pwd=os.getcwd(), sdk="2.x"):
 | 
						|
#         self.serial = credentials['serial']
 | 
						|
#         self.owrt_args = "--prompt root@OpenAp -s serial --log stdout --user root --passwd openwifi"
 | 
						|
#         self.sdk = sdk
 | 
						|
#         if sdk == "2.x":
 | 
						|
#             self.owrt_args = "--prompt root@" + self.serial + " -s serial --log stdout --user root --passwd openwifi"
 | 
						|
#         if credentials is None:
 | 
						|
#             print("No credentials Given")
 | 
						|
#             exit()
 | 
						|
#         self.ip = credentials['ip']  # if mode=1, enter jumphost ip else ap ip address
 | 
						|
#         self.username = credentials['username']  # if mode=1, enter jumphost username else ap username
 | 
						|
#         self.password = credentials['password']  # if mode=1, enter jumphost password else ap password
 | 
						|
#         self.port = credentials['port']  # if mode=1, enter jumphost ssh port else ap ssh port
 | 
						|
#         self.mode = credentials['jumphost']  # 1 for jumphost, 0 for direct ssh
 | 
						|
#
 | 
						|
#         if 'mode' in credentials:
 | 
						|
#             self.type = credentials['mode']
 | 
						|
#
 | 
						|
#         if self.mode:
 | 
						|
#             self.tty = credentials['jumphost_tty']  # /dev/ttyAP1
 | 
						|
#             # kill minicom instance
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "pgrep 'minicom' -a"
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             a = str(stdout.read()).split("\\n")
 | 
						|
#             for i in a:
 | 
						|
#                 if i.__contains__("minicom ap" + self.tty[-1]):
 | 
						|
#                     temp = i.split("minicom")
 | 
						|
#                     a = temp[0].replace(" ", "")
 | 
						|
#                     cmd = "kill " + str(a).replace("b'", "")
 | 
						|
#                     print(cmd)
 | 
						|
#                     stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"'
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = str(stdout.read())
 | 
						|
#
 | 
						|
#             if output.__contains__("False"):
 | 
						|
#                 cmd = 'mkdir ~/cicd-git/'
 | 
						|
#                 stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = str(stdout.read())
 | 
						|
#             if output.__contains__("False"):
 | 
						|
#                 print("Copying openwrt_ctl serial control Script...")
 | 
						|
#                 with SCPClient(client.get_transport()) as scp:
 | 
						|
#                     scp.put(pwd + '/openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py')  # Copy openwrt_ctl.py to the server
 | 
						|
#             cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             var = str(stdout.read())
 | 
						|
#             client.close()
 | 
						|
#             if var.__contains__("True"):
 | 
						|
#                 print("APNOS Serial Setup OK")
 | 
						|
#             else:
 | 
						|
#                 print("APNOS Serial Setup Fail")
 | 
						|
#
 | 
						|
#     # Method to connect AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def ssh_cli_connect(self):
 | 
						|
#         client = paramiko.SSHClient()
 | 
						|
#         client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 | 
						|
#         print("Connecting to jumphost: %s@%s:%s" % (
 | 
						|
#             self.username, self.ip, self.port))
 | 
						|
#         client.connect(self.ip, username=self.username, password=self.password,
 | 
						|
#                        port=self.port, timeout=10, allow_agent=False, banner_timeout=200)
 | 
						|
#         return client
 | 
						|
#
 | 
						|
#     def reboot(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#
 | 
						|
#         cmd = "reboot"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read()
 | 
						|
#         client.close()
 | 
						|
#         return output
 | 
						|
#
 | 
						|
#     # Method to get the iwinfo status of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#
 | 
						|
#     def get_bssid_band_mapping(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#         cmd = 'iwinfo'
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         data = stdout.read()
 | 
						|
#         client.close()
 | 
						|
#         data = str(data).replace(" ", "").split("\\r\\n")
 | 
						|
#         band_info = []
 | 
						|
#         client.close()
 | 
						|
#         for i in data:
 | 
						|
#             tmp = []
 | 
						|
#             if i.__contains__("AccessPoint"):
 | 
						|
#                 bssid = i.replace("AccessPoint:", "")
 | 
						|
#                 tmp.append(bssid.casefold())
 | 
						|
#             elif i.__contains__("MasterChannel"):
 | 
						|
#                 if i.split(":")[2].__contains__("2.4"):
 | 
						|
#                     tmp.append("2G")
 | 
						|
#                 else:
 | 
						|
#                     tmp.append("5G")
 | 
						|
#             else:
 | 
						|
#                 tmp = []
 | 
						|
#             if tmp != []:
 | 
						|
#                 band_info.append(tmp)
 | 
						|
#         bssi_band_mapping = {}
 | 
						|
#         for i in range(len(band_info)):
 | 
						|
#             if (i % 2) == 0:
 | 
						|
#                 bssi_band_mapping[band_info[i][0]] = band_info[i + 1][0]
 | 
						|
#         return bssi_band_mapping
 | 
						|
#
 | 
						|
#     # Method to get the vif_config of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def get_vif_config(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#         cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_Config -c"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read()
 | 
						|
#         client.close()
 | 
						|
#
 | 
						|
#         return output
 | 
						|
#
 | 
						|
#     # Method to get the vif_state of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def get_vif_state(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#         cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_State -c"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read()
 | 
						|
#         client.close()
 | 
						|
#         return output
 | 
						|
#
 | 
						|
#     # Method to get the vif_config ssid's of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def get_vif_config_ssids(self):
 | 
						|
#         stdout = self.get_vif_config()
 | 
						|
#         ssid_list = []
 | 
						|
#         for i in stdout.splitlines():
 | 
						|
#             ssid = str(i).replace(" ", "").split(".")
 | 
						|
#             if ssid[0].split(":")[0] == "b'ssid":
 | 
						|
#                 ssid_list.append(ssid[0].split(":")[1].replace("'", ""))
 | 
						|
#         return ssid_list
 | 
						|
#
 | 
						|
#     # Method to get the vif_state ssid's of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def get_ssid_info(self):
 | 
						|
#         stdout = self.get_vif_state()
 | 
						|
#         ssid_info_list = []
 | 
						|
#         info = []
 | 
						|
#         for i in stdout.splitlines():
 | 
						|
#             ssid = str(i).replace(" ", "").split(".")
 | 
						|
#             # print(ssid)
 | 
						|
#             if ssid[0].split(":")[0] == "b'mac":
 | 
						|
#                 mac_info_list = ssid[0].split(":")
 | 
						|
#                 mac_info_list.pop(0)
 | 
						|
#                 info.append(":".join(mac_info_list).replace("'", ""))
 | 
						|
#             if ssid[0].split(":")[0] == "b'security":
 | 
						|
#                 security = ssid[0].split(":")[1].split(",")[2].replace("]", "").replace('"', "").replace("'", "")
 | 
						|
#                 print(ssid[0].split(":")[1])
 | 
						|
#                 if security != "OPEN":
 | 
						|
#                     if security == "WPA-PSK":
 | 
						|
#                         if ssid[0].split(":")[1].split(",")[6].__contains__("1"):
 | 
						|
#                             info.append("WPA")
 | 
						|
#                             security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                         if ssid[0].split(":")[1].split(",")[6].__contains__("2"):
 | 
						|
#                             info.append("WPA2")
 | 
						|
#                             security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                         if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"):
 | 
						|
#                             info.append("WPA | WPA2")
 | 
						|
#                             security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                     if security == "WPA-SAE":
 | 
						|
#                         if ssid[0].split(":")[1].split(",")[6].__contains__("3"):
 | 
						|
#                             info.append("WPA3_PERSONAL")
 | 
						|
#                             security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                         if ssid[0].split(":")[1].split(",")[6].__contains__("mixed"):
 | 
						|
#                             info.append("WPA3_PERSONAL")
 | 
						|
#                             security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                     if security == "WPA-EAP":
 | 
						|
#                         info.append("EAP-TTLS")
 | 
						|
#                         security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                     if security == "WPA3-EAP":
 | 
						|
#                         info.append("EAP-TTLS")
 | 
						|
#                         security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                     else:
 | 
						|
#                         security_key = ssid[0].split(":")[1].split(",")[4].replace('"', "").replace("]", "")
 | 
						|
#                     info.append(security_key)
 | 
						|
#                 else:
 | 
						|
#                     info.append("OPEN")
 | 
						|
#             if ssid[0].split(":")[0] == "b'ssid":
 | 
						|
#                 info.append(ssid[0].split(":")[1].replace("'", ""))
 | 
						|
#                 ssid_info_list.append(info)
 | 
						|
#                 info = []
 | 
						|
#         print(ssid_info_list)
 | 
						|
#         return ssid_info_list
 | 
						|
#
 | 
						|
#     # Get VIF State parameters
 | 
						|
#     def get_vif_state_ssids(self):
 | 
						|
#         stdout = self.get_vif_state()
 | 
						|
#         ssid_list = []
 | 
						|
#         for i in stdout.splitlines():
 | 
						|
#             ssid = str(i).replace(" ", "").split(".")
 | 
						|
#             if ssid[0].split(":")[0] == "b'ssid":
 | 
						|
#                 ssid_list.append(ssid[0].split(":")[1].replace("'", ""))
 | 
						|
#         return ssid_list
 | 
						|
#
 | 
						|
#     # Method to get the active firmware of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def get_active_firmware(self):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = '/usr/opensync/bin/ovsh s AWLAN_Node -c | grep FW_IMAGE_ACTIVE'
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \
 | 
						|
#                       f" --action cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             # print(output)
 | 
						|
#             version_matrix = str(output.decode('utf-8').splitlines())
 | 
						|
#             version_matrix_split = version_matrix.partition('FW_IMAGE_ACTIVE","')[2]
 | 
						|
#             cli_active_fw = version_matrix_split.partition('"],[')[0]
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             print(e)
 | 
						|
#             cli_active_fw = "Error"
 | 
						|
#         return cli_active_fw
 | 
						|
#
 | 
						|
#     # Method to get the manager state of AP using AP-CLI/ JUMPHOST-CLI
 | 
						|
#     def get_manager_state(self):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = '/usr/opensync/bin/ovsh s Manager -c | grep status'
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \
 | 
						|
#                       f" --action cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             status = str(output.decode('utf-8').splitlines())
 | 
						|
#             # print(output, stderr.read())
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             print(e)
 | 
						|
#             status = "Error"
 | 
						|
#         return status
 | 
						|
#
 | 
						|
#     def get_serial_number(self):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "node | grep serial_number"
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             output = output.decode('utf-8').splitlines()
 | 
						|
#             serial = output[1].replace(" ", "").split("|")[1]
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             print(e)
 | 
						|
#             serial = "Error"
 | 
						|
#         return serial
 | 
						|
#
 | 
						|
#     def get_redirector(self):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "node | grep redirector_addr"
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             print(output, stderr.read())
 | 
						|
#             status = output.decode('utf-8').splitlines()
 | 
						|
#             redirector = status[1].replace(" ", "").split("|")[1]
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             print(e)
 | 
						|
#             redirector = "Error"
 | 
						|
#         return redirector
 | 
						|
#
 | 
						|
#     def run_generic_command(self, cmd=""):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = cmd
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             status = output.decode('utf-8').splitlines()
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             print(e)
 | 
						|
#             status = "Error"
 | 
						|
#         return status
 | 
						|
#
 | 
						|
#     def get_ucentral_status(self):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "ubus call ucentral status"
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             print(output)
 | 
						|
#             if 'latest' not in str(output):
 | 
						|
#                 print("ubus call ucentral status: command has invalid output", str(output))
 | 
						|
#                 connected, latest, active = "Error", "Error1", "Error2"
 | 
						|
#                 return connected, latest, active
 | 
						|
#             else:
 | 
						|
#                 connected = False
 | 
						|
#                 if "\"connected" in output.decode('utf-8').splitlines()[2]:
 | 
						|
#                     connected = True
 | 
						|
#                 # connected = output.decode('utf-8').splitlines()[2]
 | 
						|
#                 latest = output.decode('utf-8').splitlines()[3].split(":")[1].replace(" ", "").replace(",", "")
 | 
						|
#                 active = output.decode('utf-8').splitlines()[4].split(":")[1].replace(" ", "").replace(",", "")
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             if output.__contains__(b'"connected":'):
 | 
						|
#                 pass
 | 
						|
#             else:
 | 
						|
#                 pytest.exit("ubus call ucentral status: error" + str(output))
 | 
						|
#                 print(e)
 | 
						|
#             connected, latest, active = "Error", "Error", "Error"
 | 
						|
#         return connected, latest, active
 | 
						|
#
 | 
						|
#     def get_uc_latest_config(self):
 | 
						|
#         try:
 | 
						|
#             connected, latest, active = self.get_ucentral_status()
 | 
						|
#             print()
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "cat /etc/ucentral/ucentral.cfg." + latest
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read().decode('utf-8').splitlines()[1]
 | 
						|
#             print(output)
 | 
						|
#             json_output = json.loads(output)  # , sort_keys=True)
 | 
						|
#             print(type(json_output))
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             json_output = {}
 | 
						|
#             print(e)
 | 
						|
#         return json_output
 | 
						|
#
 | 
						|
#     def get_uc_active_config(self):
 | 
						|
#         try:
 | 
						|
#             connected, latest, active = self.get_ucentral_status()
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "cat /etc/ucentral/ucentral.cfg." + active
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read().decode('utf-8').splitlines()[1]
 | 
						|
#             json_output = json.loads(output)  # , sort_keys=True)
 | 
						|
#             print(json_output)
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             json_output = {}
 | 
						|
#             print(e)
 | 
						|
#         return json_output
 | 
						|
#
 | 
						|
#     def get_interface_details(self):
 | 
						|
#         r = self.get_wifi_status()
 | 
						|
#         print(r)
 | 
						|
#         wifi_info = {}
 | 
						|
#         if self.sdk == "1.x":
 | 
						|
#             for i in r:
 | 
						|
#                 for j in r[i]["interfaces"]:
 | 
						|
#                     encryption = j["config"]["encryption"]
 | 
						|
#                     if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \
 | 
						|
#                             encryption == "sae" or encryption == "sae-mixed":
 | 
						|
#                         wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]]
 | 
						|
#                     else:
 | 
						|
#                         wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""]
 | 
						|
#             print(wifi_info)
 | 
						|
#             data = self.get_iwinfo()
 | 
						|
#             for i in wifi_info.keys():
 | 
						|
#                 wifi_info[i].append(data[i])
 | 
						|
#
 | 
						|
#             return wifi_info
 | 
						|
#         if self.sdk == "2.x":
 | 
						|
#             for i in r:
 | 
						|
#                 for j in r[i]["interfaces"]:
 | 
						|
#                     encryption = j["config"]["encryption"]
 | 
						|
#                     if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \
 | 
						|
#                             encryption == "sae" or encryption == "sae-mixed":
 | 
						|
#                         wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]]
 | 
						|
#                     else:
 | 
						|
#                         wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""]
 | 
						|
#             data = self.get_iwinfo()
 | 
						|
#             print(wifi_info)
 | 
						|
#             print(data)
 | 
						|
#             for i in wifi_info.keys():
 | 
						|
#                 wifi_info[i].append(data[i])
 | 
						|
#             return wifi_info
 | 
						|
#
 | 
						|
#     def get_wifi_status(self):
 | 
						|
#         try:
 | 
						|
#
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "wifi status"
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#
 | 
						|
#             output = stdout.read().decode('utf-8')
 | 
						|
#             data = output.split()
 | 
						|
#             data.pop(0)
 | 
						|
#             data.pop(0)
 | 
						|
#             data.pop(0)
 | 
						|
#             OUT = "".join(data)
 | 
						|
#             json_output = json.loads(OUT)
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             json_output = False
 | 
						|
#             print(e)
 | 
						|
#         return json_output
 | 
						|
#
 | 
						|
#     def get_iwinfo(self):
 | 
						|
#         try:
 | 
						|
#             # [['ssid_wpa2_2g', 'wpa', 'something', '2G'], ['ssid_wpa2_2g', 'wpa', 'something', '5G']]
 | 
						|
#             # {'wlan0': ['"ssid_wpa3_p_5g"', '12:34:56:78:90:12', '5G'], 'wlan1': ['"ssid_wpa3_p_2g"','00:03:7F:12:34:56', '5G']}
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "iwinfo"
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8')
 | 
						|
#             o = output.split()
 | 
						|
#             iwinfo_bssid_data = {}
 | 
						|
#             for i in range(len(o)):
 | 
						|
#                 if o[i].__contains__("ESSID"):
 | 
						|
#                     if o[i + 9].__contains__("2.4"):
 | 
						|
#                         band = "2G"
 | 
						|
#                     else:
 | 
						|
#                         band = "5G"
 | 
						|
#                     iwinfo_bssid_data[o[i - 1]] = [o[i + 1].replace('"', ''), o[i + 4], band]
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             iwinfo_bssid_data = False
 | 
						|
#             print(e)
 | 
						|
#         return iwinfo_bssid_data
 | 
						|
#
 | 
						|
#     def iwinfo(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#         cmd = "iwinfo"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8')
 | 
						|
#         o = output
 | 
						|
#         client.close()
 | 
						|
#         return o
 | 
						|
#
 | 
						|
#     def get_memory_profile(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#         cmd = "ucode /usr/share/ucentral/sysinfo.uc"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8')
 | 
						|
#         o = output
 | 
						|
#         client.close()
 | 
						|
#         return o
 | 
						|
#
 | 
						|
#     def gettxpower(self):
 | 
						|
#         client = self.ssh_cli_connect()
 | 
						|
#         cmd = "iw dev | grep txpower"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read().replace(b":~# iw dev | grep txpower", b"").decode('utf-8')
 | 
						|
#         tx_power = output.replace("\t\t", "").split("\r\n")
 | 
						|
#         tx_power.remove('')
 | 
						|
#         tx_power.remove('\n')
 | 
						|
#         cmd = "iw dev | grep Interface"
 | 
						|
#         if self.mode:
 | 
						|
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                   f"cmd --value \"{cmd}\" "
 | 
						|
#         stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#         output = stdout.read().replace(b":~# iw dev | grep Interface", b"").decode('utf-8')
 | 
						|
#         name = output.replace("\t", "").splitlines()
 | 
						|
#         name.remove('')
 | 
						|
#         name.pop(-1)
 | 
						|
#         client.close()
 | 
						|
#         return tx_power, name
 | 
						|
#
 | 
						|
    # def get_logread(self, start_ref="", stop_ref=""):
 | 
						|
    #     data = self.logread()
 | 
						|
    #     log_data = []
 | 
						|
    #     data = data.split("\n")
 | 
						|
    #     flag = 0
 | 
						|
    #     for logs in data:
 | 
						|
    #         if logs.__contains__(start_ref):
 | 
						|
    #             flag = 1
 | 
						|
    #         if flag == 1:
 | 
						|
    #             log_data.append(logs)
 | 
						|
    #         if logs.__contains__(stop_ref):
 | 
						|
    #             flag = 0
 | 
						|
    #     ap_logs = "\n".join(log_data)
 | 
						|
    #     return ap_logs
 | 
						|
#
 | 
						|
#     def logread(self):
 | 
						|
#         try:
 | 
						|
#             client = self.ssh_cli_connect()
 | 
						|
#             cmd = "logread"
 | 
						|
#             if self.mode:
 | 
						|
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
 | 
						|
#                       f"cmd --value \"{cmd}\" "
 | 
						|
#             stdin, stdout, stderr = client.exec_command(cmd)
 | 
						|
#             output = stdout.read()
 | 
						|
#             status = output.decode('utf-8').splitlines()
 | 
						|
#             logread = status
 | 
						|
#             logs = ""
 | 
						|
#             for i in logread:
 | 
						|
#                 logs = logs + i + "\n"
 | 
						|
#             client.close()
 | 
						|
#         except Exception as e:
 | 
						|
#             print(e)
 | 
						|
#             logs = ""
 | 
						|
#         return logs
 | 
						|
#
 | 
						|
#     def get_ap_version_ucentral(self):
#         client = self.ssh_cli_connect()
#         cmd = "cat /tmp/ucentral.version"
#         if self.mode:
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                   f"cmd --value \"{cmd}\" "
#         stdin, stdout, stderr = client.exec_command(cmd)
#         output = stdout.read().replace(b":~# cat /tmp/ucentral.version", b"").decode('utf-8')
#         client.close()
#         return output
#
#     def get_vifc(self):
#         client = self.ssh_cli_connect()
#         cmd = "vifC"
#         if self.mode:
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                   f"cmd --value \"{cmd}\" "
#         stdin, stdout, stderr = client.exec_command(cmd)
#         output = stdout.read()
#         client.close()
#         return output
#
#     def get_vifs(self):
#         client = self.ssh_cli_connect()
#         cmd = "vifS"
#         if self.mode:
#             cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                   f"cmd --value \"{cmd}\" "
#         stdin, stdout, stderr = client.exec_command(cmd)
#         output = stdout.read()
#         client.close()
#         return output
#
#     def get_vlan(self):
#         stdout = self.get_vifs()
#         vlan_list = []
#         for i in stdout.splitlines():
#             vlan = str(i.strip()).replace("|", ".").split(".")
#             try:
#                 if not vlan[0].find("b'vlan_id"):
#                     vlan_list.append(vlan[1].strip())
#             except:
#                 pass
#         return vlan_list
#
#     def get_ap_uci_show_ucentral(self):
#         try:
#             client = self.ssh_cli_connect()
#             cmd = "uci show ucentral"
#             if self.mode:
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                       f"cmd --value \"{cmd}\" "
#             stdin, stdout, stderr = client.exec_command(cmd)
#             output = stdout.read()
#             status = output.decode('utf-8').splitlines()
#             for i in status:
#                 if i.startswith("ucentral.config.server="):
#                     status = i.split("=")[1]
#             client.close()
#         except Exception as e:
#             print(e)
#             status = "Error"
#         return status
#
#     def dfs(self):
#         if self.type.lower() == "wifi5":
#             client = self.ssh_cli_connect()
#             cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"'
#             if self.mode:
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                       f"cmd --value \"{cmd1}\" "
#                 stdin, stdout, stderr = client.exec_command(cmd)
#                 output = stdout.read()
#                 status = output.decode('utf-8')
#                 status_count = int(status.count("True"))
#                 print("status", status)
#                 print("count", status_count)
#                 client.close()
#                 if status_count == 1:
#                     cmd = "cd && cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && echo 1 > dfs_simulate_radar"
#                 else:
#                     cmd = "cd && cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && echo 1 > dfs_simulate_radar"
#                 command = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                           f"cmd --value \"{cmd}\" "
#         elif self.type.lower() == "wifi6" or self.type.lower() == "wifi6e":
#             cmd = f'cd  && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && ls && echo 1 > dfs_simulate_radar'
#             print("cmd: ", cmd)
#             if self.mode:
#                 command = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                           f"cmd --value \"{cmd}\" "
#         client = self.ssh_cli_connect()
#         stdin, stdout, stderr = client.exec_command(command)
#         output = stdout.read()
#         print("Output", output)
#         client.close()
#
#     def dfs_logread(self):
#         if self.type.lower() == "wifi5":
#             client = self.ssh_cli_connect()
#             cmd1 = '[ -f /sys/kernel/debug/ieee80211/phy1/ath10k/dfs_simulate_radar ] && echo "True" || echo "False"'
#             stdin, stdout, stderr = client.exec_command(cmd1)
#             output = str(stdout.read())
#             print("Output", output)
#             if output.__contains__("False"):
#                 cmd = "cd /sys/kernel/debug/ieee80211/phy0/ath10k/ && logread | grep DFS"
#             else:
#                 cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS"
#             client.close()
#             #cmd = "cd /sys/kernel/debug/ieee80211/phy1/ath10k/ && logread | grep DFS"
#             #print("cmd: ", cmd)
#             if self.mode:
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                       f"cmd --value \"{cmd}\" "
#         elif self.type.lower() == "wifi6" or self.type.lower() == "wifi6e":
#             cmd = f'cd  && cd /sys/kernel/debug/ath11k/ && cd ipq* && cd mac0 && logread | grep DFS'
#             print("cmd: ", cmd)
#             if self.mode:
#                 cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
#                       f"cmd --value \"{cmd}\" "
#         try:
#             client = self.ssh_cli_connect()
#             stdin, stdout, stderr = client.exec_command(cmd)
#             output = stdout.read()
#             status = output.decode('utf-8').splitlines()
#             logread = status[-6:]
#             logs = ""
#             for i in logread:
#                 logs = logs + i + "\n"
#             client.close()
#         except Exception as e:
#             print(e)
#             logs = ""
#         return logs
#
#
# if __name__ == '__main__':
#     obj = {
#                 "model": "cig_wf188n",
#                 "mode": "wifi6",
#                 "serial": "0000c1018812",
#                 "jumphost": True,
#                 "ip": "10.28.3.103",
#                 "username": "lanforge",
#                 "password": "pumpkin77",
#                 "port": 22,
#                 "jumphost_tty": "/dev/ttyAP1",
#                 "version": "next-latest"
#             }
#     var = APNOS(credentials=obj, sdk="2.x")
#     print(var.get_memory_profile())