mirror of
				https://github.com/Telecominfraproject/wlan-testing.git
				synced 2025-10-31 19:08:01 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			386 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			386 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/python3
 | |
| '''
 | |
| 
 | |
| make sure pexpect is installed:
 | |
| $ sudo yum install python3-pexpect
 | |
| 
 | |
| You might need to install pexpect-serial using pip:
 | |
| $ pip3 install serial
 | |
| $ pip3 install pexpect-serial
 | |
| 
 | |
| ./openwrt_ctl.py -l stdout -u root -p TIP -s serial --tty ttyUSB0
 | |
| 
 | |
| # Set up reverse ssh tunnel
 | |
| ./openwrt_ctl.py --tty /dev/ttyAP1 --action ssh-tunnel \
 | |
|         --value "ssh -y -y -f -N -T -M -R 9999:localhost:22 lanforge@10.28.3.100" \
 | |
|         --value2 password-for-10.28.3.100 --log stdout --scheme serial --prompt root@Open
 | |
| '''
 | |
| 
 | |
| import sys
 | |
| 
 | |
| 
 | |
| 
 | |
| if sys.version_info[0] != 3:
 | |
|     print("This script requires Python 3")
 | |
|     exit()
 | |
| 
 | |
| try:
 | |
|     import importlib
 | |
|     re = importlib.import_module("re")
 | |
|     logging = importlib.import_module("logging")
 | |
|     time = importlib.import_module("time")
 | |
|     sleep = time.sleep
 | |
|     pexpect = importlib.import_module("pexpect")
 | |
|     serial = importlib.import_module("serial")
 | |
|     pexpect_serial = importlib.import_module("pexpect_serial")
 | |
|     SerialSpawn = pexpect_serial.SerialSpawn
 | |
|     pprint = importlib.import_module("pprint")
 | |
|     telnetlib = importlib.import_module("telnetlib")
 | |
|     argparse = importlib.import_module("argparse")
 | |
| except ImportError as e:
 | |
|     print(e)
 | |
|     sys.exit("Python Import Error: " + str(e))
 | |
| 
 | |
| default_host = "localhost"
 | |
| default_ports = {
 | |
|     "serial": None,
 | |
|     "ssh": 22,
 | |
|     "telnet": 23
 | |
| }
 | |
| NL = "\n"
 | |
| CR = "\r\n"
 | |
| Q = '"'
 | |
| A = "'"
 | |
| FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
 | |
| prompt = "root@OpenWrt:"
 | |
| 
 | |
| 
 | |
| def usage():
 | |
|     print("$0 used connect to OpenWrt AP or similar Linux machine:")
 | |
|     print("-d|--dest  IP address of the OpenWrt AP, for ssh/telnet scheme")
 | |
|     print("-o|--port  IP port of the OpenWrt AP, for ssh/telnet scheme")
 | |
|     print("-t|--tty   Serial port, if using serial scheme")
 | |
|     print("-u|--user  login name")
 | |
|     print("-p|--pass  password")
 | |
|     print("--prompt   Prompt to look for when commands are done (default: root@OpenWrt)")
 | |
|     print("-s|--scheme (serial|telnet|ssh): connect via serial, ssh or telnet")
 | |
|     print("-l|--log file log messages here")
 | |
|     print("--action (logread | journalctl | lurk | sysupgrade | download | upload | reboot | cmd | ssh-tunnel")
 | |
|     print("--value (option to help complete the action")
 | |
|     print("--value2 (option to help complete the action, dest filename for download, passwd for ssh-tunnel")
 | |
|     print("-h|--help")
 | |
| 
 | |
| 
 | |
| # see https://stackoverflow.com/a/13306095/11014343
 | |
| class FileAdapter(object):
 | |
|     def __init__(self, logger):
 | |
|         self.logger = logger
 | |
| 
 | |
|     def write(self, data):
 | |
|         # NOTE: data can be a partial line, multiple lines
 | |
|         data = data.strip()  # ignore leading/trailing whitespace
 | |
|         if data:  # non-blank
 | |
|             self.logger.info(data)
 | |
| 
 | |
|     def flush(self):
 | |
|         pass  # leave it to logging to flush properly
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     global prompt
 | |
| 
 | |
|     parser = argparse.ArgumentParser(description="OpenWrt AP Control Script")
 | |
|     parser.add_argument("-d", "--dest", type=str, help="address of the cisco controller_tests")
 | |
|     parser.add_argument("-o", "--port", type=int, help="control port on the controller_tests")
 | |
|     parser.add_argument("-u", "--user", type=str, help="credential login/username")
 | |
|     parser.add_argument("-p", "--passwd", type=str, help="credential password")
 | |
|     parser.add_argument("-P", "--prompt", type=str, help="Prompt to look for")
 | |
|     parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"],
 | |
|                         help="Connect via serial, ssh or telnet")
 | |
|     parser.add_argument("-t", "--tty", type=str, help="tty serial device")
 | |
|     parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console")
 | |
|     parser.add_argument("--action", type=str, help="perform action",
 | |
|                         choices=["logread", "journalctl", "lurk", "sysupgrade", "sysupgrade-n", "download", "upload",
 | |
|                                  "reboot", "cmd", "ssh-tunnel"])
 | |
|     parser.add_argument("--value", type=str, help="set value")
 | |
|     parser.add_argument("--value2", type=str, help="set value2")
 | |
|     tty = None
 | |
| 
 | |
|     args = None
 | |
|     try:
 | |
|         args = parser.parse_args()
 | |
|         host = args.dest
 | |
|         scheme = args.scheme
 | |
|         port = args.port
 | |
|         # port = (default_ports[scheme], args.port)[args.port != None]
 | |
|         user = args.user
 | |
|         passwd = args.passwd
 | |
|         logfile = args.log
 | |
|         tty = args.tty;
 | |
|         if (args.prompt != None):
 | |
|             prompt = args.prompt
 | |
|         filehandler = None
 | |
|     except Exception as e:
 | |
|         logging.exception(e);
 | |
|         usage()
 | |
|         exit(2);
 | |
| 
 | |
|     console_handler = logging.StreamHandler()
 | |
|     formatter = logging.Formatter(FORMAT)
 | |
|     logg = logging.getLogger(__name__)
 | |
|     logg.setLevel(logging.DEBUG)
 | |
|     file_handler = None
 | |
|     if (logfile is not None):
 | |
|         if (logfile != "stdout"):
 | |
|             file_handler = logging.FileHandler(logfile, "w")
 | |
|             file_handler.setLevel(logging.DEBUG)
 | |
|             file_handler.setFormatter(formatter)
 | |
|             logg.addHandler(file_handler)
 | |
|             logging.basicConfig(format=FORMAT, handlers=[file_handler])
 | |
|         else:
 | |
|             # stdout logging
 | |
|             logging.basicConfig(format=FORMAT, handlers=[console_handler])
 | |
| 
 | |
|     CCPROMPT = prompt
 | |
| 
 | |
|     ser = None
 | |
|     egg = None  # think "eggpect"
 | |
|     try:
 | |
|         if (scheme == "serial"):
 | |
|             # eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer)
 | |
| 
 | |
|             ser = serial.Serial(tty, 115200, timeout=5)
 | |
| 
 | |
|             egg = SerialSpawn(ser);
 | |
|             egg.logfile = FileAdapter(logg)
 | |
|             egg.sendline(NL)
 | |
|             has_reset = False
 | |
|             try:
 | |
|                 logg.info("prompt: %s user: %s  passwd: %s" % (prompt, user, passwd))
 | |
|                 while True:
 | |
|                     i = egg.expect([prompt, "Please press Enter to activate", "login:", "Password:", "IPQ6018#"],
 | |
|                                    timeout=3)
 | |
|                     logg.info("expect-0: %i" % (i))
 | |
|                     if (i == 0):
 | |
|                         logg.info("Found prompt, login complete.")
 | |
|                         break
 | |
|                     if (i == 1):
 | |
|                         logg.info("Sending newline")
 | |
|                         egg.setdline(NL)
 | |
|                     if (i == 2):
 | |
|                         logg.info("Sending username: %s" % (user))
 | |
|                         egg.sendline(user)
 | |
|                     if (i == 3):
 | |
|                         logg.info("Sending password: %s" % (passwd))
 | |
|                         egg.sendline(passwd)
 | |
|                     if (i == 4):  # in bootloader
 | |
|                         if has_reset:
 | |
|                             logg.info("ERROR:  Have reset once already, back in bootloader?")
 | |
|                             sys.exit(1)
 | |
|                         has_reset = True
 | |
|                         logg.info("In boot loader, will reset and sleep 30 seconds")
 | |
|                         egg.sendline("reset")
 | |
|                         time.sleep(30)
 | |
|                         egg.sendline(NL)
 | |
| 
 | |
|             except Exception as e:
 | |
|                 # maybe something like 'logread -f' is running?
 | |
|                 # send ctrl-c
 | |
|                 egg.send(chr(3))
 | |
| 
 | |
|         elif (scheme == "ssh"):
 | |
|             # Not implemented/tested currently. --Ben
 | |
|             if (port is None):
 | |
|                 port = 22
 | |
|             cmd = "ssh -p%d %s@%s" % (port, user, host)
 | |
|             logg.info("Spawn: " + cmd + NL)
 | |
|             egg = pexpect.spawn(cmd)
 | |
|             # egg.logfile_read = sys.stdout.buffer
 | |
|             egg.logfile = FileAdapter(logg)
 | |
|             i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3)
 | |
|             time.sleep(0.1)
 | |
|             if i == 1:
 | |
|                 egg.sendline('yes')
 | |
|                 egg.expect('password:')
 | |
|             sleep(0.1)
 | |
|             egg.sendline(passwd)
 | |
| 
 | |
|         elif (scheme == "telnet"):
 | |
|             # Not implemented/tested currently. --Ben
 | |
|             if (port is None):
 | |
|                 port = 23
 | |
|             cmd = "telnet %s %d" % (host, port)
 | |
|             logg.info("Spawn: " + cmd + NL)
 | |
|             egg = pexpect.spawn(cmd)
 | |
|             egg.logfile = FileAdapter(logg)
 | |
|             time.sleep(0.1)
 | |
|             egg.sendline(' ')
 | |
|             egg.expect('User\:')
 | |
|             egg.sendline(user)
 | |
|             egg.expect('Password\:')
 | |
|             egg.sendline(passwd)
 | |
|             egg.sendline('config paging disable')
 | |
|         else:
 | |
|             usage()
 | |
|             exit(1)
 | |
|     except Exception as e:
 | |
|         logging.exception(e);
 | |
| 
 | |
|     command = None
 | |
| 
 | |
|     CLOSEDBYREMOTE = "closed by remote host."
 | |
|     CLOSEDCX = "Connection to .* closed."
 | |
| 
 | |
|     try:
 | |
|         egg.expect(CCPROMPT)
 | |
|     except Exception as e:
 | |
|         egg.sendline(NL)
 | |
| 
 | |
|     TO = 10
 | |
|     wait_forever = False
 | |
| 
 | |
|     # Clean pending output
 | |
|     egg.sendline("echo __hello__")
 | |
|     egg.expect("__hello__")
 | |
|     egg.expect(CCPROMPT)
 | |
| 
 | |
|     logg.info("Action[%s] Value[%s] Value2[%s]" % (args.action, args.value, args.value2))
 | |
| 
 | |
|     if (args.action == "reboot"):
 | |
|         command = "reboot"
 | |
|         TO = 60
 | |
| 
 | |
|     if (args.action == "cmd"):
 | |
|         if (args.value is None):
 | |
|             raise Exception("cmd requires value to be set.")
 | |
|         command = "%s" % (args.value)
 | |
| 
 | |
|     if (args.action == "logread"):
 | |
|         command = "logread -f"
 | |
|         TO = 1
 | |
|         wait_forever = True
 | |
| 
 | |
|     if (args.action == "journalctl"):
 | |
|         command = "journalctl -f"
 | |
|         TO = 1
 | |
|         wait_forever = True
 | |
| 
 | |
|     if (args.action == "lurk"):
 | |
|         command = "date"
 | |
|         TO = 1
 | |
|         wait_forever = True
 | |
| 
 | |
|     if (args.action == "ssh-tunnel"):
 | |
|         command = "%s" % (args.value)
 | |
|         passwd = "%s" % (args.value2)
 | |
|         logg.info("Command[%s]" % command)
 | |
|         egg.sendline(command);
 | |
| 
 | |
|         i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5)
 | |
|         if i == 1:
 | |
|             egg.sendline("y")
 | |
|             egg.expect("password:", timeout=5)
 | |
|         egg.sendline(passwd)
 | |
|         egg.expect(CCPROMPT, timeout=20)
 | |
|         return
 | |
| 
 | |
|     if ((args.action == "sysupgrade") or (args.action == "sysupgrade-n")):
 | |
|         command = "scp %s /tmp/new_img.bin" % (args.value)
 | |
|         logg.info("Command[%s]" % command)
 | |
|         egg.sendline(command);
 | |
| 
 | |
|         i = egg.expect(["password:", "Do you want to continue connecting"], timeout=5)
 | |
|         if i == 1:
 | |
|             egg.sendline("y")
 | |
|             egg.expect("password:", timeout=5)
 | |
|         egg.sendline("lanforge")
 | |
|         egg.expect(CCPROMPT, timeout=20)
 | |
|         if (args.action == "sysupgrade-n"):
 | |
|             egg.sendline("sysupgrade -n /tmp/new_img.bin")
 | |
|         else:
 | |
|             egg.sendline("sysupgrade /tmp/new_img.bin")
 | |
|         egg.expect("link becomes ready", timeout=100)
 | |
|         return
 | |
| 
 | |
|     if (args.action == "download"):
 | |
|         command = "scp %s /tmp/%s" % (args.value, args.value2)
 | |
|         logg.info("Command[%s]" % command)
 | |
|         egg.sendline(command);
 | |
| 
 | |
|         i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
 | |
|         if i == 2:
 | |
|             print("Network unreachable, wait 15 seconds and try again.")
 | |
|             time.sleep(15)
 | |
|             command = "scp %s /tmp/%s" % (args.value, args.value2)
 | |
|             logg.info("Command[%s]" % command)
 | |
|             egg.sendline(command);
 | |
| 
 | |
|             i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
 | |
|         if i == 2:
 | |
|             print("ERROR:  Could not connect to LANforge to get download file")
 | |
|             exit(2)
 | |
|         if i == 1:
 | |
|             egg.sendline("y")
 | |
|             egg.expect("password:", timeout=5)
 | |
|         egg.sendline("lanforge")
 | |
|         egg.expect(CCPROMPT, timeout=20)
 | |
|         return
 | |
| 
 | |
|     if (args.action == "upload"):
 | |
|         command = "scp %s %s" % (args.value, args.value2)
 | |
|         logg.info("Command[%s]" % command)
 | |
|         egg.sendline(command);
 | |
| 
 | |
|         i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
 | |
|         if i == 2:
 | |
|             print("Network unreachable, wait 15 seconds and try again.")
 | |
|             time.sleep(15)
 | |
|             command = "scp /tmp/%s %s" % (args.value, args.value2)
 | |
|             logg.info("Command[%s]" % command)
 | |
|             egg.sendline(command);
 | |
| 
 | |
|             i = egg.expect(["password:", "Do you want to continue connecting", "Network unreachable"], timeout=5)
 | |
|         if i == 2:
 | |
|             print("ERROR:  Could not connect to LANforge to put upload file")
 | |
|             exit(2)
 | |
|         if i == 1:
 | |
|             egg.sendline("y")
 | |
|             egg.expect("password:", timeout=5)
 | |
|         egg.sendline("lanforge")
 | |
|         egg.expect(CCPROMPT, timeout=20)
 | |
|         return
 | |
| 
 | |
|     if (command is None):
 | |
|         logg.info("No command specified, going to log out.")
 | |
|     else:
 | |
|         logg.info("Command[%s]" % command)
 | |
|         egg.sendline(command);
 | |
|         while True:
 | |
|             try:
 | |
|                 i = egg.expect([CCPROMPT, "kmodloader: done loading kernel", "\n"], timeout=TO)
 | |
|                 print(egg.before.decode('utf-8', 'ignore'))
 | |
|                 if i == 1:
 | |
|                     egg.sendline(' ')
 | |
|                     egg.expect(CCPROMPT, timeout=20)
 | |
|                     print(egg.before.decode('utf-8', 'ignore'))
 | |
|                 if i == 2:  # new line of text, just print and continue
 | |
|                     continue
 | |
| 
 | |
|                 if not wait_forever:
 | |
|                     break
 | |
| 
 | |
|             except Exception as e:
 | |
|                 # Some commands take a long time (logread -f)
 | |
|                 if not wait_forever:
 | |
|                     logging.exception(e)
 | |
|                     break
 | |
| 
 | |
| 
 | |
| # ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
 | |
| if __name__ == '__main__':
 | |
|     main()
 | |
| 
 | |
| ####
 | |
| ####
 | |
| ####
 | 
