lf_tx_power.py : for AP interface powerreg, and powercfg

ap_ctl.py : for show controllers dot11Radio {slot} , slot configurable

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2022-04-26 18:54:36 -04:00
committed by shivam
parent 79bc8ea0ee
commit 2672e08bcf
2 changed files with 120 additions and 22 deletions

View File

@@ -226,7 +226,11 @@ def main():
loop_count = 0
while (loop_count <= 8 and logged_in == False):
loop_count += 1
i = egg.expect_exact([AP_ESCAPE,AP_PROMPT,AP_HASH,AP_USERNAME,AP_PASSWORD,AP_MORE,LF_PROMPT,MUX_PROMPT,pexpect.TIMEOUT],timeout=5)
try:
i = egg.expect_exact([AP_ESCAPE,AP_PROMPT,AP_HASH,AP_USERNAME,AP_PASSWORD,AP_MORE,LF_PROMPT,MUX_PROMPT,pexpect.TIMEOUT],timeout=5)
except BaseException:
logg.info("pexcept exception the connection may not be present, exiting")
exit(1)
# AP_ESCAPE
if i == 0:
logg.info("Expect: {} i: {} before: {} after: {}".format(AP_ESCAPE,i,egg.before,egg.after))
@@ -296,12 +300,14 @@ def main():
# show controllers dot11Radio 2 powerreg
# include or i will include all lines that match the criteria
# section or s will include entire sections that match
# show controllers dot11Radio 2 powercfg | i T1
if (args.action == "powercfg"):
logg.info("execute: show controllers dot11Radio 1 powercfg | g T1")
egg.sendline('show controllers dot11Radio 1 powercfg | g T1')
egg.expect([pexpect.TIMEOUT], timeout=3) # do not delete this for it allows for subprocess to see output
command = 'show controllers dot11Radio {slot} powercfg | i T1'.format(slot=args.radio_slot)
logg.info("execute: {command}".format(command=command))
egg.sendline(command)
egg.expect([pexpect.TIMEOUT], timeout=7) # do not delete this for it allows for subprocess to see output
print(egg.before.decode('utf-8', 'ignore')) # do not delete this for it allows for subprocess to see output
i = egg.expect_exact([AP_MORE,pexpect.TIMEOUT],timeout=5)
if i == 0:

View File

@@ -347,6 +347,7 @@ def close_workbook(workbook):
sleep(0.5)
def main():
global lfmgr
global lfstation
@@ -431,6 +432,7 @@ def main():
parser.add_argument("--nss_4x4_override", help="[test configuration] --nss_4x4_override controller nss is 4 client nss is 2, set expected power to 1/4", action='store_true')
parser.add_argument("--nss_4x4_ap_adjust", help="[test configuration] --nss_4x4_ap_adjust read ap to know number of spatial stream to take into account", action='store_true')
parser.add_argument("--set_nss", help="[test configuration] --set_nss configure controller to spatial streams to test", action='store_true')
parser.add_argument("-T", "--txpower", type=str, help="[test configuration] List of txpowers to test. NA means no change")
parser.add_argument('-D', '--duration', type=str, help='[test configuration] --traffic <how long to run in seconds> example -D 30 (seconds) default: 30 ', default='20')
@@ -831,10 +833,19 @@ def main():
worksheet.write(row, col, 'Tx\nPower\nSetting', dtan_bold)
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'Controller Reported\n Tx Power dBm\nFrom AP Summary', dtan_bold)
worksheet.write(row, col, 'Controller Reported\nTotal\nTx Power dBm\nFrom AP Summary', dtan_bold)
if (bool(ap_dict)):
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'AP Reported\nTotal \nTx Power dBm', dtan_bold)
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'Allowed dBm\nPer Spatial Steam\n cc_dbm', dtan_bold)
if (bool(ap_dict)):
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'AP reported\nAllowed dBm\nPer Spatial Steam\n ap_dbm', dtan_bold)
col += 1
worksheet.write(row, col, 'Cabling\nPathloss', dtan_bold)
col += 1
@@ -884,16 +895,16 @@ def main():
worksheet.write(row, col, 'Calculated Antenna 4 =\n Antenna Sig dBm\n + pathloss\n + rssi_adj\n + ant gain', dpink_bold)
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'Offset 1 = \nCalculated Antenna\n - cc_dbm(per SS)', dyel_bold)
worksheet.write(row, col, 'Offset 1 = \nCalculated Antenna 1\n - cc_dbm(per SS)', dyel_bold)
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'Offset 2 = \nCalculated Antenna\n - cc_dbm(per SS)', dyel_bold)
worksheet.write(row, col, 'Offset 2 = \nCalculated Antenna 2\n - cc_dbm(per SS)', dyel_bold)
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'Offset 3 = \nCalculated Antenna\n - cc_dbm(per SS)', dyel_bold)
worksheet.write(row, col, 'Offset 3 = \nCalculated Antenna 3\n - cc_dbm(per SS)', dyel_bold)
col += 1
worksheet.set_column(col, col, 20) # Set width
worksheet.write(row, col, 'Offset 4 = \nCalculated Antenna\n - cc_dbm(per SS)', dyel_bold)
worksheet.write(row, col, 'Offset 4 = \nCalculated Antenna 4\n - cc_dbm(per SS)', dyel_bold)
col += 1
worksheet.set_column(col, col, 15) # Set width
worksheet.write(row, col, 'Controller\nReported\ndBm', dblue_bold)
@@ -1477,7 +1488,7 @@ def main():
pss = cs.show_ap_dot11_6gz_summary()
logg.info(pss)
pss = cs.show_ap_bssid_6ghz()
logg.info(pss)
logg.info(pss)
elif args.band == '5g':
pss = cs.show_ap_dot11_5gz_summary()
logg.info(pss)
@@ -1919,19 +1930,20 @@ def main():
logg.info("####################################################################################################")
logg.info("# READ AP POWERREG")
logg.info("####################################################################################################")
summary_output = ''
try:
logg.info("ap_ctl.py: read AP power information")
# TODO use ap module
summary_output = ''
command = ["./ap_ctl.py", "--scheme", ap_dict['ap_scheme'], "--prompt", ap_dict['ap_prompt'], "--dest", ap_dict['ap_ip'], "--port", ap_dict["ap_port"],
"--user", ap_dict['ap_user'], "--passwd", ap_dict['ap_pw'], "--action", "powerreg"]
summary = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(summary.stdout.readline, ''):
logger.debug(line)
summary_output += line
# sys.stdout.flush() # please see comments regarding the necessity of this line
logger.info(line)
if len(line) > 4:
summary_output += line
summary.wait()
logger.info(summary_output) # .decode('utf-8', 'ignore'))
except subprocess.CalledProcessError as process_error:
logg.info("####################################################################################################")
logg.info("# CHECK IF AP HAS TELNET CONNECTION ALREADY ACTIVE")
@@ -1942,6 +1954,77 @@ def main():
process_error.returncode, process_error.output))
logg.info("####################################################################################################")
summary = "empty_process_error"
summary_output = summary
ap_ant_gain = ''
ap_legal_ant_gain = ''
ap_total_power = ''
ap_total_power_found = False
ap_per_path_power = ''
ap_per_path_power_found = False
summary_output_split = summary_output.splitlines()
print(summary_output_split)
for line in summary_output.splitlines():
if 'Configured Antenna Gain(dBi):' in line:
pat = "Configured Antenna Gain\\(dBi\\)\\:\\s+(\\d+)"
match = re.search(pat, line)
if match is not None :
ap_ant_gain = match.group(1)
logger.info("AP antenna gain: {gain}".format(gain=ap_ant_gain))
if 'Legal Antenna Gain in use(dBi):' in line:
pat = "Legal Antenna Gain in use\\(dBi\\):\\s+(\\S+)"
match = re.search(pat, line)
if match is not None :
ap_legal_ant_gain = match.group(1)
logger.info("AP Legal antenna gain: {gain}".format(gain=ap_legal_ant_gain))
# /Allowed total powers\S+\Sn(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)
if "INFO" not in line and "Allowed total powers(dBm):" in line:
ap_total_power_found = True
continue
if ap_total_power_found is True:
ap_total_power_found = False
if args.band == '6g' or args.band == 'dual_band_6g':
if bw == '20':
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
elif bw == '40':
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
else:
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
else:
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
match = re.search(pat, line)
if match is not None:
ap_total_power = match.group(int(tx))
# /Allowed per-path powers\S+\Sn(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)/gm
if "INFO" not in line and "Allowed per-path powers(dBm):" in line:
ap_per_path_power_found = True
continue
if ap_per_path_power_found is True:
ap_per_path_power_found = False
if args.band == '6g' or args.band == 'dual_band_6g':
if bw == '20':
# pat = "Allowed per-path powers\\S+\\Sn(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
elif bw == '40':
# pat = "Allowed per-path powers\\S+\\Sn(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
else:
# pat = "Allowed per-path powers\\S+\\Sn(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
else:
pat = "(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"
match = re.search(pat, line)
if match is not None:
ap_per_path_power = match.group(int(tx))
logg.info("ap_ant_gain: {ap_ant_gain}".format(ap_ant_gain=ap_ant_gain))
logg.info("ap_legal_ant_gain: {ap_legal_ant_gain}".format(ap_legal_ant_gain=ap_legal_ant_gain))
logg.info("ap_total_power: {ap_total_power}".format(ap_total_power=ap_total_power))
logg.info("ap_per_path_power: {ap_per_path_power}".format(ap_per_path_power=ap_per_path_power))
# Gather probe results and record data, verify NSS, BW, Channel
# note the probe will get the information from this command
@@ -2282,7 +2365,7 @@ def main():
DAA_Pwr = None
DAA_N_TX = None
DAA_Total_pwr = None
if(bool(ap_dict)):
if(bool(ap_dict) and args.nss_4x4_ap_adjust):
logg.info("ap_dict {}".format(ap_dict))
logg.info("Read AP ap_scheme: {} ap_ip: {} ap_port: {} ap_user: {} ap_pw: {}".format(ap_dict['ap_scheme'], ap_dict['ap_ip'], ap_dict["ap_port"],
ap_dict['ap_user'], ap_dict['ap_pw']))
@@ -2299,15 +2382,17 @@ def main():
summary = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(summary.stdout.readline, ''):
logger.debug(line)
summary_output += line
if line:
summary_output += line
# sys.stdout.flush() # please see comments regarding the necessity of this line
summary.wait()
logger.info(summary_output) # .decode('utf-8', 'ignore'))
#
# ap_info = subprocess.run(["./ap_ctl.py", "--scheme", ap_dict['ap_scheme'], "--prompt", ap_dict['ap_prompt'], "--dest", ap_dict['ap_ip'], "--port", ap_dict["ap_port"],
# "--user", ap_dict['ap_user'], "--passwd", ap_dict['ap_pw'], "--action", "powerreg"], stdout=subprocess.PIPE)
# "--user", ap_dict['ap_user'], "--passwd", ap_dict['ap_pw'], "--action", "powercfg"], stdout=subprocess.PIPE)
# try:
# pss = ap_info.stdout.decode('utf-8', 'ignore')
# logg.info(pss)
# except BaseException:
# logg.info("ap_info was of type NoneType will set pss empty")
# pss = "empty"
@@ -2325,8 +2410,8 @@ def main():
logg.info("####################################################################################################")
summary = "empty_process_error"
logg.info(summary)
for line in summary.splitlines():
logg.info(summary_output)
for line in summary_output.splitlines():
logg.info("ap {}".format(line))
pat = '^\\s+1\\s+6\\s+\\S+\\s+\\S+\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)'
m = re.search(pat, line)
@@ -2568,8 +2653,15 @@ def main():
worksheet.write(row, col, cc_power, center_tan)
col += 1
worksheet.write(row, col, cc_dbm, center_tan)
if(bool(ap_dict)):
col += 1
worksheet.write(row, col, ap_total_power, center_tan)
col += 1
worksheet.write(row, col, allowed_per_path, center_tan)
if(bool(ap_dict)):
col += 1
worksheet.write(row, col, ap_per_path_power, center_tan)
col += 1
worksheet.write(row, col, pathloss, center_tan)
col += 1