lf_tx_power.py : added info for lf_associate_ap.pl working directory

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2022-03-05 14:55:53 -07:00
committed by shivam
parent 4da87bead8
commit 2f00b694ad
2 changed files with 54 additions and 51 deletions

View File

@@ -377,7 +377,7 @@ def main():
parser.add_argument('--beacon_dbm_diff', type=str, help="[tx power configuration] --beacon_dbm_diff <value> is the delta that is allowed between the controller tx and the beacon measured", default="7")
# traffic generation configuration (LANforge)
parser.add_argument("--lfmgr", type=str, help="[traffic generation configuration (LANforge)] LANforge Manager IP address --lfmgr 192.168.100.139", required=True)
parser.add_argument("--lfmgr", type=str, help="[traffic generation configuration (LANforge)] LANforge Manager IP address --lfmgr 192.168.100.178", required=True)
parser.add_argument("--upstream_port", type=str, help="[traffic generation configuration (LANforge)] LANforge upsteram-port to use (eth1, etc) --upstream_port eth2", required=True)
parser.add_argument("--lfresource", type=str, help="[traffic generation configuration (LANforge)] LANforge resource ID for the station --lfresource 1")
parser.add_argument("--lfresource2", type=str, help="[traffic generation configuration (LANforge)] LANforge resource ID for the upstream port system ")
@@ -747,7 +747,7 @@ def main():
if (args.band == '6g'):
if (args.vht160):
logg.info("creating station with VHT160 set: {} on radio {}".format(args.station, args.radio))
print()
logg.info("cwd lf_associate_ap.pl: {dir}".format(dir=os.getcwd()))
subprocess.run(["./lf_associate_ap.pl", "--mgr", lfmgr, "--radio", args.radio, "--ssid", args.ssid, "--passphrase", args.ssidpw,
"--security", args.security, "--upstream", args.upstream_port, "--first_ip", "DHCP",
"--first_sta", args.station, "--ieee80211w", args.ieee80211w, "--wifi_mode", args.wifi_mode, "--action", "add", "--xsec", "ht160_enable"], timeout=20, capture_output=True)

View File

@@ -154,7 +154,10 @@ class lf_check():
self.test_dict = {}
path_parent = os.path.dirname(os.getcwd())
os.chdir(path_parent)
# TODO have method to pass in other
# script directories , currently only top lanforge scripts directory and py-scripts
self.scripts_wd = os.getcwd()
self.lanforge_wd = os.path.dirname(os.getcwd())
self.results = ""
self.outfile = _outfile
self.outfile_name = _outfile_name
@@ -190,6 +193,8 @@ class lf_check():
# lanforge configuration
self.lf_mgr_ip = "192.168.0.102"
self.lf_mgr_port = "8080"
#TODO allow for json configuration
self.lf_mgr_ssh_port = "22"
self.lf_mgr_user = "lanforge"
self.lf_mgr_pass = "lanforge"
self.upstream_port = ""
@@ -310,7 +315,7 @@ class lf_check():
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -n')
self.lanforge_system_node_version = stdout.readlines()
@@ -325,7 +330,7 @@ class lf_check():
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('cat /etc/fedora-release')
self.lanforge_fedora_version = stdout.readlines()
@@ -340,7 +345,7 @@ class lf_check():
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -r')
self.lanforge_kernel_version = stdout.readlines()
@@ -355,7 +360,7 @@ class lf_check():
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command(
'./btserver --version | grep Version')
@@ -378,7 +383,7 @@ class lf_check():
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command(
'curl -H "Accept: application/json" http://{lanforge_ip}:8080 | json_pp | grep -A 7 "VersionInfo"'.format(lanforge_ip=self.lf_mgr_ip))
@@ -865,7 +870,8 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
self.test_dict[self.test]['load_db']))
try:
os.chdir(self.scripts_wd)
# self.logger.info("Current Working Directory {}".format(os.getcwd()))
self.logger.info("Current Working Directory {}".format(os.getcwd()))
except BaseException:
self.logger.info(
"failed to change to {}".format(
@@ -875,8 +881,9 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
self.test_dict[self.test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
# TODO this code is always run since there is a default
# TODO change name to file obj to make more unde
# TODO change name to file obj to make more understandable
if self.outfile_name is not None:
stdout_log_txt = os.path.join(
self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, self.test))
@@ -893,6 +900,7 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
# in command output
command_to_run = command
command_to_run = shlex.split(command_to_run)
self.logger.info(
"running {command_to_run}".format(
command_to_run=command_to_run))
@@ -902,28 +910,26 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
"Test start: {time} Timeout: {timeout}".format(
time=self.test_start_time, timeout=self.test_timeout))
start_time = datetime.datetime.now()
# try:
# process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
# universal_newlines=True)
# # if there is a better solution please propose, the
# # TIMEOUT Result is different then FAIL
# try:
# if int(self.test_timeout != 0):
# process.wait(timeout=int(self.test_timeout))
# else:
# process.wait()
# except subprocess.TimeoutExpired:
# process.terminate()
# self.test_result = "TIMEOUT"
# except BaseException:
# print(
# "No such file or directory with command: {}".format(command))
# self.logger.info(
# "No such file or directory with command: {}".format(command))
summary_output = ''
# have stderr go to stdout
summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
try:
summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
except FileNotFoundError:
# TODO tx_power is one directory up from py-scripts
self.logger.info("FileNotFoundError will try to execute from lanforge Top directory")
os.chdir(self.lanforge_wd)
self.logger.info("Changed Current Working Directory to {}".format(os.getcwd()))
summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
except PermissionError:
self.logger.info("PermissionError on execution of {command}".format(command=command_to_run))
except IsADirectoryError:
self.logger.info("IsADirectoryError on execution of {command}".format(command=command_to_run))
for line in iter(summary.stdout.readline, ''):
self.logger.info(line)
summary_output += line
@@ -935,6 +941,11 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
except TimeoutExpired:
summary.terminate
self.test_result = "TIMEOUT"
# TODO will change back to the scripts_wd since a script like tx_power.py will run in the parent directoy to
# py-scripts
os.chdir(self.scripts_wd)
self.logger.info("Current Working Directory {}".format(os.getcwd()))
self.logger.info(summary_output)
stdout_log.write(summary_output)
stdout_log.close()
@@ -1031,26 +1042,7 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
self.test_result = "Time Out"
background = self.background_purple
else:
# stderr_log_size = os.path.getsize(stderr_log_txt)
# if stderr_log_size > 0:
# self.logger.info(
# "File: {} is not empty: {}".format(
# stderr_log_txt, str(stderr_log_size)))
# text = open(stderr_log_txt).read()
# if 'Error' in text:
# self.text_result = "Failure"
# background = self.background_red
# else:
# self.text_result = "Success"
# background = self.background_green
# else:
# self.logger.info(
# "File: {} is empty: {}".format(
# stderr_log_txt, str(stderr_log_size)))
# self.test_result = "Success"
# background = self.background_green
# Check to see if there is an error in stdout_log
# TODO use the test result returned
# TODO use summary returned from subprocess
if stdout_log_size > 0:
text = open(stdout_log_txt).read()
# for 5.4.3 only TestTag was not present
@@ -1065,8 +1057,13 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
# leave the space in after error to not pick up tx
# errors or rx errors
elif 'ERROR: ' in text:
self.test_result = "Some Tests Failed"
background = self.background_orange
# TODO check for return code from script
if 'New and Old channel width are same' in text:
self.test_result = "Success"
background = self.background_green
else:
self.test_result = "Some Tests Failed"
background = self.background_orange
elif 'ERROR: FAILED ' in text:
self.test_result = "Some Tests Failed"
background = self.background_orange
@@ -1286,6 +1283,8 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
default=None)
parser.add_argument('--production', help="--production stores true, sends email results to production email list",
action='store_true')
parser.add_argument('--no_send_email', help="--no_send_email stores true, to not send emails results to engineer or production email list", action='store_true')
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated",
default="")
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_check.py script",
@@ -1343,6 +1342,7 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
server_override = args.server_override
db_override = args.db_override
# TODO create config for --no_email
if args.production:
production = True
print("Email to production list")
@@ -1583,7 +1583,10 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
print("exception write_pdf_with_timestamp()")
print("lf_check_html_report: " + html_report)
check.send_results_email(report_file=html_report)
if args.no_send_email:
print("send email not set")
else:
check.send_results_email(report_file=html_report)
#
if args.update_latest:
report_path = os.path.dirname(html_report)