diff --git a/lf_tx_power.py b/lf_tx_power.py index ede7302e..5dd59113 100755 --- a/lf_tx_power.py +++ b/lf_tx_power.py @@ -377,7 +377,7 @@ def main(): parser.add_argument('--beacon_dbm_diff', type=str, help="[tx power configuration] --beacon_dbm_diff is the delta that is allowed between the controller tx and the beacon measured", default="7") # traffic generation configuration (LANforge) - parser.add_argument("--lfmgr", type=str, help="[traffic generation configuration (LANforge)] LANforge Manager IP address --lfmgr 192.168.100.139", required=True) + parser.add_argument("--lfmgr", type=str, help="[traffic generation configuration (LANforge)] LANforge Manager IP address --lfmgr 192.168.100.178", required=True) parser.add_argument("--upstream_port", type=str, help="[traffic generation configuration (LANforge)] LANforge upsteram-port to use (eth1, etc) --upstream_port eth2", required=True) parser.add_argument("--lfresource", type=str, help="[traffic generation configuration (LANforge)] LANforge resource ID for the station --lfresource 1") parser.add_argument("--lfresource2", type=str, help="[traffic generation configuration (LANforge)] LANforge resource ID for the upstream port system ") @@ -747,7 +747,7 @@ def main(): if (args.band == '6g'): if (args.vht160): logg.info("creating station with VHT160 set: {} on radio {}".format(args.station, args.radio)) - print() + logg.info("cwd lf_associate_ap.pl: {dir}".format(dir=os.getcwd())) subprocess.run(["./lf_associate_ap.pl", "--mgr", lfmgr, "--radio", args.radio, "--ssid", args.ssid, "--passphrase", args.ssidpw, "--security", args.security, "--upstream", args.upstream_port, "--first_ip", "DHCP", "--first_sta", args.station, "--ieee80211w", args.ieee80211w, "--wifi_mode", args.wifi_mode, "--action", "add", "--xsec", "ht160_enable"], timeout=20, capture_output=True) diff --git a/py-scripts/tools/lf_check.py b/py-scripts/tools/lf_check.py index 692d8181..67936475 100755 --- a/py-scripts/tools/lf_check.py +++ b/py-scripts/tools/lf_check.py @@ -154,7 +154,10 @@ class lf_check(): self.test_dict = {} path_parent = os.path.dirname(os.getcwd()) os.chdir(path_parent) + # TODO have method to pass in other + # script directories , currently only top lanforge scripts directory and py-scripts self.scripts_wd = os.getcwd() + self.lanforge_wd = os.path.dirname(os.getcwd()) self.results = "" self.outfile = _outfile self.outfile_name = _outfile_name @@ -190,6 +193,8 @@ class lf_check(): # lanforge configuration self.lf_mgr_ip = "192.168.0.102" self.lf_mgr_port = "8080" + #TODO allow for json configuration + self.lf_mgr_ssh_port = "22" self.lf_mgr_user = "lanforge" self.lf_mgr_pass = "lanforge" self.upstream_port = "" @@ -310,7 +315,7 @@ class lf_check(): ssh = paramiko.SSHClient() # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, + ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command('uname -n') self.lanforge_system_node_version = stdout.readlines() @@ -325,7 +330,7 @@ class lf_check(): ssh = paramiko.SSHClient() # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, + ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command('cat /etc/fedora-release') self.lanforge_fedora_version = stdout.readlines() @@ -340,7 +345,7 @@ class lf_check(): ssh = paramiko.SSHClient() # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, + ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command('uname -r') self.lanforge_kernel_version = stdout.readlines() @@ -355,7 +360,7 @@ class lf_check(): ssh = paramiko.SSHClient() # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, + ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command( './btserver --version | grep Version') @@ -378,7 +383,7 @@ class lf_check(): ssh = paramiko.SSHClient() # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, + ssh.connect(hostname=self.lf_mgr_ip, port=self.lf_mgr_ssh_port, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command( 'curl -H "Accept: application/json" http://{lanforge_ip}:8080 | json_pp | grep -A 7 "VersionInfo"'.format(lanforge_ip=self.lf_mgr_ip)) @@ -865,7 +870,8 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" self.test_dict[self.test]['load_db'])) try: os.chdir(self.scripts_wd) - # self.logger.info("Current Working Directory {}".format(os.getcwd())) + self.logger.info("Current Working Directory {}".format(os.getcwd())) + except BaseException: self.logger.info( "failed to change to {}".format( @@ -875,8 +881,9 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" self.test_dict[self.test]['command'], cmd_args) self.logger.info("command: {}".format(command)) self.logger.info("cmd_args {}".format(cmd_args)) + # TODO this code is always run since there is a default - # TODO change name to file obj to make more unde + # TODO change name to file obj to make more understandable if self.outfile_name is not None: stdout_log_txt = os.path.join( self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, self.test)) @@ -893,6 +900,7 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" # in command output command_to_run = command command_to_run = shlex.split(command_to_run) + self.logger.info( "running {command_to_run}".format( command_to_run=command_to_run)) @@ -902,28 +910,26 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" "Test start: {time} Timeout: {timeout}".format( time=self.test_start_time, timeout=self.test_timeout)) start_time = datetime.datetime.now() - # try: - # process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log, - # universal_newlines=True) - # # if there is a better solution please propose, the - # # TIMEOUT Result is different then FAIL - # try: - # if int(self.test_timeout != 0): - # process.wait(timeout=int(self.test_timeout)) - # else: - # process.wait() - # except subprocess.TimeoutExpired: - # process.terminate() - # self.test_result = "TIMEOUT" - # except BaseException: - # print( - # "No such file or directory with command: {}".format(command)) - # self.logger.info( - # "No such file or directory with command: {}".format(command)) summary_output = '' # have stderr go to stdout - summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + try: + summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) + + except FileNotFoundError: + # TODO tx_power is one directory up from py-scripts + self.logger.info("FileNotFoundError will try to execute from lanforge Top directory") + os.chdir(self.lanforge_wd) + self.logger.info("Changed Current Working Directory to {}".format(os.getcwd())) + summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True) + + except PermissionError: + self.logger.info("PermissionError on execution of {command}".format(command=command_to_run)) + + except IsADirectoryError: + self.logger.info("IsADirectoryError on execution of {command}".format(command=command_to_run)) + for line in iter(summary.stdout.readline, ''): self.logger.info(line) summary_output += line @@ -935,6 +941,11 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" except TimeoutExpired: summary.terminate self.test_result = "TIMEOUT" + # TODO will change back to the scripts_wd since a script like tx_power.py will run in the parent directoy to + # py-scripts + os.chdir(self.scripts_wd) + self.logger.info("Current Working Directory {}".format(os.getcwd())) + self.logger.info(summary_output) stdout_log.write(summary_output) stdout_log.close() @@ -1031,26 +1042,7 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" self.test_result = "Time Out" background = self.background_purple else: - # stderr_log_size = os.path.getsize(stderr_log_txt) - # if stderr_log_size > 0: - # self.logger.info( - # "File: {} is not empty: {}".format( - # stderr_log_txt, str(stderr_log_size))) - # text = open(stderr_log_txt).read() - # if 'Error' in text: - # self.text_result = "Failure" - # background = self.background_red - # else: - # self.text_result = "Success" - # background = self.background_green - # else: - # self.logger.info( - # "File: {} is empty: {}".format( - # stderr_log_txt, str(stderr_log_size))) - # self.test_result = "Success" - # background = self.background_green - # Check to see if there is an error in stdout_log - # TODO use the test result returned + # TODO use summary returned from subprocess if stdout_log_size > 0: text = open(stdout_log_txt).read() # for 5.4.3 only TestTag was not present @@ -1065,8 +1057,13 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite""" # leave the space in after error to not pick up tx # errors or rx errors elif 'ERROR: ' in text: - self.test_result = "Some Tests Failed" - background = self.background_orange + # TODO check for return code from script + if 'New and Old channel width are same' in text: + self.test_result = "Success" + background = self.background_green + else: + self.test_result = "Some Tests Failed" + background = self.background_orange elif 'ERROR: FAILED ' in text: self.test_result = "Some Tests Failed" background = self.background_orange @@ -1286,6 +1283,8 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a default=None) parser.add_argument('--production', help="--production stores true, sends email results to production email list", action='store_true') + parser.add_argument('--no_send_email', help="--no_send_email stores true, to not send emails results to engineer or production email list", action='store_true') + parser.add_argument('--outfile', help="--outfile used as base name for all files generated", default="") parser.add_argument('--logfile', help="--logfile logging for output of lf_check.py script", @@ -1343,6 +1342,7 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a server_override = args.server_override db_override = args.db_override + # TODO create config for --no_email if args.production: production = True print("Email to production list") @@ -1583,7 +1583,10 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a print("exception write_pdf_with_timestamp()") print("lf_check_html_report: " + html_report) - check.send_results_email(report_file=html_report) + if args.no_send_email: + print("send email not set") + else: + check.send_results_email(report_file=html_report) # if args.update_latest: report_path = os.path.dirname(html_report)