lf_cisco_power.py: LCS-45 Read AP for Power values, updates

This commit is contained in:
Chuck SmileyRekiere
2021-01-21 11:20:38 -07:00
parent 585c31abf1
commit 6916a7ce71
2 changed files with 150 additions and 216 deletions

View File

@@ -31,9 +31,9 @@ import pexpect
default_host = "localhost"
default_ports = {
"serial": None,
"ssh": 22,
"telnet": 23
"serial": None,
"ssh": 22,
"telnet": 23
}
NL = "\n"
CR = "\r\n"
@@ -48,17 +48,18 @@ logfile = "stdout"
# ^\s+1\s+6\s+\S+\s+\S+\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)
def usage():
print("$0 used connect to Cisco AP:")
print("-a|--ap: AP to act upon")
print("-d|--dest: destination host")
print("-o|--port: destination port")
print("-u|--user: AP login name")
print("-p|--pass: AP password")
print("-s|--scheme (serial|telnet|ssh): connect to controller via serial, ssh or telnet")
print("--tty Serial port for accessing AP")
print("-l|--log file: log messages here")
print("-b|--band: a (5Ghz) or b (2.4Ghz) or abgn for dual-band 2.4Ghz AP")
print("-h|--help")
print("$0 used connect to Cisco AP:")
print("-a|--ap: AP to act upon")
print("-d|--dest: destination host")
print("-o|--port: destination port")
print("-u|--user: AP login name")
print("-p|--pass: AP password")
print("-s|--scheme (serial|telnet|ssh): connect to controller via serial, ssh or telnet")
print("--tty Serial port for accessing AP")
print("-l|--log file: log messages here")
print("-b|--band: a (5Ghz) or b (2.4Ghz) or abgn for dual-band 2.4Ghz AP")
print("-z|--action: action")
print("-h|--help")
# see https://stackoverflow.com/a/13306095/11014343
class FileAdapter(object):
@@ -74,204 +75,111 @@ class FileAdapter(object):
def main():
global logfile
global logfile
parser = argparse.ArgumentParser(description="Cisco AP Control Script")
parser.add_argument("-a", "--ap", type=str, help="select AP")
parser.add_argument("-d", "--dest", type=str, help="address of the AP 172.19.27.55")
parser.add_argument("-o", "--port", type=int, help="control port on the AP, 2008")
parser.add_argument("-u", "--user", type=str, help="credential login/username, admin")
parser.add_argument("-p", "--passwd", type=str, help="credential password Wnbulab@123")
parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"], help="Connect via serial, ssh or telnet")
parser.add_argument("-t", "--tty", type=str, help="tty serial device for connecting to AP")
parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console")
parser = argparse.ArgumentParser(description="Cisco AP Control Script")
parser.add_argument("-a", "--prompt", type=str, help="ap prompt")
parser.add_argument("-d", "--dest", type=str, help="address of the AP 172.19.27.55")
parser.add_argument("-o", "--port", type=int, help="control port on the AP, 2008")
parser.add_argument("-u", "--user", type=str, help="credential login/username, admin")
parser.add_argument("-p", "--passwd", type=str, help="credential password Wnbulab@123")
parser.add_argument("-s", "--scheme", type=str, choices=["serial", "ssh", "telnet"], help="Connect via serial, ssh or telnet")
parser.add_argument("-t", "--tty", type=str, help="tty serial device for connecting to AP")
parser.add_argument("-l", "--log", type=str, help="logfile for messages, stdout means output to console")
parser.add_argument("-z", "--action", type=str, help="action, current action is powercfg")
args = None
try:
args = parser.parse_args()
host = args.dest
scheme = args.scheme
port = (default_ports[scheme], args.port)[args.port != None]
user = args.user
passwd = args.passwd
if (args.log != None):
logfile = args.log
filehandler = None
except Exception as e:
logging.exception(e)
usage()
exit(2)
args = None
try:
args = parser.parse_args()
host = args.dest
scheme = args.scheme
port = (default_ports[scheme], args.port)[args.port != None]
user = args.user
passwd = args.passwd
if (args.log != None):
logfile = args.log
filehandler = None
except Exception as e:
logging.exception(e)
usage()
exit(2)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(FORMAT)
logg = logging.getLogger(__name__)
logg.setLevel(logging.DEBUG)
file_handler = None
if (logfile is not None):
if (logfile != "stdout"):
file_handler = logging.FileHandler(logfile, "w")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logg.addHandler(file_handler)
logging.basicConfig(format=FORMAT, handlers=[file_handler])
else:
# stdout logging
logging.basicConfig(format=FORMAT, handlers=[console_handler])
egg = None # think "eggpect"
ser = None
try:
if (scheme == "serial"):
#eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer)
import serial
from pexpect_serial import SerialSpawn
ser = serial.Serial(args.tty, 9600, timeout=5)
print("Created serial connection on %s, open: %s"%(args.tty, ser.is_open))
egg = SerialSpawn(ser)
egg.logfile = FileAdapter(logg)
time.sleep(1)
elif (scheme == "ssh"):
if (port is None):
port = 22
cmd = "ssh -p%d %s@%s"%(port, user, host)
logg.info("Spawn: "+cmd+NL)
egg = pexpect.spawn(cmd)
#egg.logfile_read = sys.stdout.buffer
egg.logfile = FileAdapter(logg)
i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3)
time.sleep(0.1)
if i == 1:
egg.sendline('yes')
egg.expect('password:')
sleep(0.1)
egg.sendline(args.passwd)
elif (scheme == "telnet"):
if (port is None):
port = 23
cmd = "telnet {} {}".format(host, port)
logg.info("Spawn: "+cmd+NL)
egg = pexpect.spawn(cmd)
egg.logfile = FileAdapter(logg)
# Will login below as needed.
else:
usage()
exit(1)
except Exception as e:
logging.exception(e)
ap_prompt = "{}>".format(args.prompt)
ap_hash = "{}#".format(args.prompt)
egg.sendline()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(FORMAT)
logg = logging.getLogger(__name__)
logg.setLevel(logging.DEBUG)
file_handler = None
if (logfile is not None):
if (logfile != "stdout"):
file_handler = logging.FileHandler(logfile, "w")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logg.addHandler(file_handler)
logging.basicConfig(format=FORMAT, handlers=[file_handler])
else:
# stdout logging
logging.basicConfig(format=FORMAT, handlers=[console_handler])
egg = None # think "eggpect"
ser = None
try:
if (scheme == "serial"):
#eggspect = pexpect.fdpexpect.fdspan(telcon, logfile=sys.stdout.buffer)
import serial
from pexpect_serial import SerialSpawn
ser = serial.Serial(args.tty, 9600, timeout=5)
print("Created serial connection on %s, open: %s"%(args.tty, ser.is_open))
egg = SerialSpawn(ser);
egg.logfile = FileAdapter(logg)
time.sleep(1)
elif (scheme == "ssh"):
if (port is None):
port = 22
cmd = "ssh -p%d %s@%s"%(port, user, host)
logg.info("Spawn: "+cmd+NL)
egg = pexpect.spawn(cmd)
#egg.logfile_read = sys.stdout.buffer
egg.logfile = FileAdapter(logg)
i = egg.expect(["password:", "continue connecting (yes/no)?"], timeout=3)
time.sleep(0.1)
if i == 1:
egg.sendline('yes')
egg.expect('password:')
sleep(0.1)
egg.sendline(args.passwd)
elif (scheme == "telnet"):
if (port is None):
port = 23
cmd = "telnet %s %d"%(host, port)
logg.info("Spawn: "+cmd+NL)
egg = pexpect.spawn(cmd)
egg.logfile = FileAdapter(logg)
# Will login below as needed.
else:
usage()
exit(1)
except Exception as e:
logging.exception(e);
ap_prompt = "%s>"%(args.ap)
ap_hash = "%s#"%(args.ap)
egg.sendline()
i = egg.expect(['Password:', 'Username:', ap_prompt, 'u-boot>', ap_hash], timeout=3)
j = 0
if i == 0:
egg.sendline()
i = egg.expect(['Password:', 'Username:', ap_prompt, 'u-boot>', ap_hash], timeout=10)
i -= 1
if i == 0:
time.sleep(1)
egg.sendline(args.user)
time.sleep(1)
egg.expect('Password:')
egg.sendline(args.passwd)
if (i <= 1):
egg.expect(ap_prompt)
egg.sendline("en")
egg.expect("Password:")
egg.sendline(args.passwd)
egg.expect("#")
# Seems this cannot work on factory images, so comment it out. It is one-time
# thing, user must do it manually.
#logg.info('Login to Devshell and issue the updt command')
#egg.sendline('devshell')
#egg.expect("#", timeout=5)
#egg.sendline("updt_util -i /lib/firmware/%s uboot"%(args.binfile))
# if updt_util is not available, we must already be in the netbooted cookie shell
#egg.expect("#")
#egg.sendline('exit')
#egg.expect("#", timeout=5)
logg.info("Reload the AP and bring it to u-boot")
egg.sendline('reload')
egg.expect(r"confirm")
egg.sendline('yes')
time.sleep(10)
egg.expect(r'Hit ESC key to stop autoboot:', timeout=100)
egg.sendline('\x1b')
egg.expect(r"u-boot>")
egg.sendline('printenv')
time.sleep(5)
egg.expect(r"u-boot>")
if (i <= 2):
logg.info("Set the env variable and download bcm.bin from TFTP server")
time.sleep(1)
egg.sendline("setenv ipaddr %s"%(args.ip))
time.sleep(1)
egg.expect(r"u-boot>")
egg.sendline("setenv netmask %s"%(args.mask))
time.sleep(1)
egg.expect(r"u-boot>")
egg.sendline("setenv serverip %s"%(args.tftp_server))
time.sleep(2)
egg.expect(r"u-boot>")
egg.sendline("setenv gatewayip %s"%(args.gw))
time.sleep(2)
egg.expect(r"u-boot>")
egg.sendline('setenv tftpdir /')
time.sleep(2)
egg.expect(r"u-boot>")
egg.sendline('saveenv')
time.sleep(10)
egg.expect("u-boot>", timeout=5)
egg.sendline('printenv')
time.sleep(10)
egg.expect("u-boot>", timeout=15)
egg.sendline("netboot %s"%(args.netboot_binfile))
while True:
time.sleep(10)
k = egg.expect(["Username:", "Unable to download image"], timeout=240)
if (k == 0):
break
if (k == 1):
egg.sendline("netboot %s"%(args.netboot_binfile))
continue
break
logg.info("Setting up the cookie from devshell and rebooting the AP")
# Log in again
egg.sendline()
time.sleep(0.1)
egg.expect('Username:', timeout=3)
time.sleep(0.1)
egg.sendline(args.user)
time.sleep(0.1)
egg.expect('Password:')
egg.sendline(args.passwd)
egg.expect(ap_prompt)
egg.sendline("en")
egg.expect("Password:")
egg.sendline(args.passwd)
egg.expect("#")
egg.sendline('show controllers dot11Radio 1 powercfg | g T1')
egg.expect("#", timeout=5)
# ctlr.execute(cn_cmd)
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
egg.sendline()
time.sleep(0.1)
egg.expect('Username:', timeout=3)
time.sleep(0.1)
egg.sendline(args.user)
time.sleep(0.1)
egg.expect('Password:')
egg.sendline(args.passwd)
egg.expect(ap_prompt)
egg.sendline("en")
egg.expect("Password:")
egg.sendline(args.passwd)
egg.expect("#")
egg.sendline('show controllers dot11Radio 1 powercfg | g T1')
egg.expect("#", timeout=5)
# ctlr.execute(cn_cmd)
----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
if __name__ == '__main__':
main()

View File

@@ -305,7 +305,7 @@ def main():
parser.add_argument('-ccp','--prompt', type=str,help="controller prompt",required=True)
parser.add_argument('--beacon_dbm_diff', type=str,help="--beacon_dbm_diff <value> is the delta that is allowed between the controller tx and the beacon measured",default="7")
parser.add_argument('--show_lf_portmod', action='store_true',help="--show_lf_portmod, show the output of lf_portmod after traffic to verify RSSI values measured by lanforge")
parser.add_argument('-ap','--ap', action='append', nargs=1, type=str, help="--ap ap_scheme==<telnet,ssh or serial> ap_ip==<ap ip> ap_port==<ap port number> ap_user==<ap user> ap_pw==<ap password>")
parser.add_argument('-ap','--ap', action='append', nargs=1, type=str, help="--ap ap_scheme==<telnet,ssh or serial> ap_prompt==<ap_prompt> ap_ip==<ap ip> ap_port==<ap port number> ap_user==<ap user> ap_pw==<ap password>")
#current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "{:.3f}".format(time.time() - (math.floor(time.time())))[1:]
@@ -371,7 +371,7 @@ def main():
ap_info = args.ap_info
for _ap_info in ap_info:
print("ap_info {}".format(_ap_info))
ap_keys = ['ap_scheme','ap_ip','ap_port','ap_user','ap_pw']
ap_keys = ['ap_scheme','ap_prompt','ap_ip','ap_port','ap_user','ap_pw']
ap_dict = dict(map(lambda x: x.split('=='), str(_ap_info).replace('[','').replace(']','').replace("'","").split()))
for key in ap_keys:
if key not in ap_dict:
@@ -1572,13 +1572,25 @@ def main():
# fewer spatial streams
#
#
P1 = None
T1 = None
P2 = None
T2 = None
P3 = None
T3 = None
P4 = None
T4 = None
N_ANT = None
DAA_Pwr = None
DAA_N_TX = None
DAA_Total_pwr = None
if(bool(ap_dict)):
logg.info("Read AP ap_scheme: {} ap_ip: {} ap_port: {} ap_user: {} ap_pw: {}".format(ap_dict['ap_scheme'],ap_dict['ap_ip'],ap_dict["ap_port"],
ap_dict['ap_user'],ap_dict['ap_pw']))
try:
logg.info("cisco_ap_ctl.py: no_logging_console")
ap_info= subprocess.run(["./cisco_ap_ctl.py", "--scheme", ap_dict['ap_scheme'], "-d", ap_dict['ap_ip'], "--port", ap_dict["ap_port"],
"-u", ap_dict['ap_user'], "-p", ap_dict['ap_pw'],"--action", "powercfg"],capture_output=True, check=True)
ap_info= subprocess.run(["./cisco_ap_ctl.py", "--scheme", ap_dict['ap_scheme'], "--prompt", ap_dict['ap_prompt'],"--dest", ap_dict['ap_ip'], "--port", ap_dict["ap_port"],
"-user", ap_dict['ap_user'], "-passwd", ap_dict['ap_pw'],"--action", "powercfg"],capture_output=True, check=True)
pss = ap_info.stdout.decode('utf-8', 'ignore');
logg.info(pss)
except subprocess.CalledProcessError as process_error:
@@ -1589,14 +1601,28 @@ def main():
logg.info("####################################################################################################")
logg.info("# Unable to commicate to AP or unable to communicate to controller error code: {} output {}".format(process_error.returncode, process_error.output))
logg.info("####################################################################################################")
exit_test(workbook)
#exit_test(workbook)
exit(1)
for line in pss.splitlines():
logg.info("ap {}".format(line))
m = re.search('^\s+1\s+6\s+\S+\s+\S+\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)')
if (m != None):
pass
P1 = m.group(1)
T1 = m.group(2)
P2 = m.group(3)
T2 = m.group(4)
P3 = m.group(5)
T3 = m.group(6)
P4 = m.group(7)
T4 = m.group(8)
N_ANT = m.group(9)
DAA_Pwr = m.group(10)
DAA_N_TX = m.group(11)
DAA_Total_pwr = m.group(12)
print("P1: {} T1: {} P2: {} T2: {} P3: {} T3: {} P4: {} T4: {} N_ANT: {} DAA_Pwr: {} DAA_N_TX: {} DAA_Total_pwr: {}"
.format(P1,T1,P2,T2,P3,T3,P4,T4,N_ANT,DAA_Pwr,DAA_N_TX,DAA_Total_pwr))
else:
logg.info("AP Check regular expression!!!")
exit(1)