diff --git a/py-scripts/tools/lf_check.py b/py-scripts/tools/lf_check.py
index 5b49a187..4eb1efc5 100755
--- a/py-scripts/tools/lf_check.py
+++ b/py-scripts/tools/lf_check.py
@@ -209,6 +209,16 @@ class lf_check():
self.ftp_test_ip = ""
self.test_ip = ""
+ # current test running
+ self.test = None
+ self.report_index = 0
+ self.iteration = 0
+ self.channel_list = []
+ self.nss_list = []
+ self.bandwidth_list = []
+ self.tx_power_list = []
+
+
# section DUT
# dut selection
# note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not
@@ -724,6 +734,444 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
# The underlying netsmith objects
sleep(15)
+ def run_script(self):
+ # The network arguments need to be changed when in a list
+ for index, args_list_element in enumerate(
+ self.test_dict[self.test]['args_list']):
+ if 'ssid_idx=' in args_list_element:
+ # print("args_list_element {}".format(args_list_element))
+ # get ssid_idx used in the test as an index for the
+ # dictionary
+ ssid_idx_number = args_list_element.split(
+ 'ssid_idx=')[-1].split()[0]
+ print(
+ "ssid_idx_number: {}".format(ssid_idx_number))
+ # index into the DUT network index
+ idx = "ssid_idx={}".format(ssid_idx_number)
+ print("idx: {}".format(idx))
+ if 'SSID_USED' in args_list_element:
+ self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
+ 'SSID_USED', self.wireless_network_dict[idx]['SSID_USED'])
+ if 'SECURITY_USED' in args_list_element:
+ self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
+ 'SECURITY_USED', self.wireless_network_dict[idx]['SECURITY_USED'])
+ if 'SSID_PW_USED' in args_list_element:
+ self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
+ 'SSID_PW_USED', self.wireless_network_dict[idx]['SSID_PW_USED'])
+ if 'BSSID_TO_USE' in args_list_element:
+ self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
+ 'BSSID_TO_USE', self.wireless_network_dict[idx]['BSSID_TO_USE'])
+ # use_ssid_idx is ephemeral and used only for
+ # variable replacement , remove
+ tmp_idx = "use_ssid_idx={}".format(ssid_idx_number)
+ if tmp_idx in args_list_element:
+ self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
+ tmp_idx, '')
+ # leave in for checking the command line arguments
+ print(
+ "self.test_dict[self.test]['args_list']: {}".format(
+ self.test_dict[self.test]['args_list']))
+ # Walk all the args in the args list then construct the
+ # arguments
+ if self.test_dict[self.test]['args'] == "":
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(self.test_dict[self.test]['args'],
+ ''.join(self.test_dict[self.test][
+ 'args_list']))
+ if 'DATABASE_SQLITE' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'DATABASE_SQLITE', self.database_sqlite)
+ if 'HTTP_TEST_IP' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'HTTP_TEST_IP', self.http_test_ip)
+ if 'FTP_TEST_IP' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'FTP_TEST_IP', self.ftp_test_ip)
+ if 'TEST_IP' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'TEST_IP', self.test_ip)
+ if 'LF_MGR_USER' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'LF_MGR_USER', self.lf_mgr_user)
+ if 'LF_MGR_PASS' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'LF_MGR_PASS', self.lf_mgr_pass)
+ if 'LF_MGR_IP' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'LF_MGR_IP', self.lf_mgr_ip)
+ if 'LF_MGR_PORT' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'LF_MGR_PORT', self.lf_mgr_port)
+ # DUT Configuration
+ if 'USE_DUT_NAME' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'USE_DUT_NAME', self.use_dut_name)
+ if 'DUT_HW' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'DUT_HW', self.dut_hw)
+ if 'DUT_SW' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'DUT_SW', self.dut_sw)
+ if 'DUT_MODEL' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'DUT_MODEL', self.dut_model)
+ if 'DUT_SN' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'DUT_SN', self.dut_serial)
+ if 'UPSTREAM_PORT' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace('UPSTREAM_PORT',
+ self.upstream_port)
+ if 'UPSTREAM_ALIAS' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace('UPSTREAM_ALIAS',
+ self.upstream_alias)
+ # lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
+ # of the reports when the reports are pulled.
+ if 'REPORT_PATH' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'REPORT_PATH', self.report_path)
+ if 'TEST_SERVER' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'TEST_SERVER', self.test_server)
+ if 'DUT_SET_NAME' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace('DUT_SET_NAME',
+ self.dut_set_name)
+ if 'TEST_RIG' in self.test_dict[self.test]['args']:
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
+ 'TEST_RIG', self.test_rig)
+ # END of command line arg processing
+ if self.test_dict[self.test]['args'] == "":
+ self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(self.test_dict[self.test]['args'],
+ ''.join(self.test_dict[self.test][
+ 'args_list']))
+ if 'timeout' in self.test_dict[self.test]:
+ self.logger.info(
+ "timeout : {}".format(
+ self.test_dict[self.test]['timeout']))
+ self.test_timeout = int(
+ self.test_dict[self.test]['timeout'])
+ else:
+ self.test_timeout = self.test_timeout_default
+ if 'load_db' in self.test_dict[self.test]:
+ self.logger.info(
+ "load_db : {}".format(
+ self.test_dict[self.test]['load_db']))
+ if str(self.test_dict[self.test]['load_db']).lower() != "none" and str(
+ self.test_dict[self.test]['load_db']).lower() != "skip":
+ try:
+ self.load_custom_database(
+ self.test_dict[self.test]['load_db'])
+ except BaseException:
+ self.logger.info("custom database failed to load check existance and location: {}".format(
+ self.test_dict[self.test]['load_db']))
+ try:
+ os.chdir(self.scripts_wd)
+ # self.logger.info("Current Working Directory {}".format(os.getcwd()))
+ except BaseException:
+ self.logger.info(
+ "failed to change to {}".format(
+ self.scripts_wd))
+ cmd_args = "{}".format(self.test_dict[self.test]['args'])
+ command = "./{} {}".format(
+ self.test_dict[self.test]['command'], cmd_args)
+ self.logger.info("command: {}".format(command))
+ self.logger.info("cmd_args {}".format(cmd_args))
+ # TODO this code is always run since there is a default
+ # TODO change name to file obj to make more unde
+ if self.outfile_name is not None:
+ stdout_log_txt = os.path.join(
+ self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, self.test))
+ self.logger.info(
+ "stdout_log_txt: {}".format(stdout_log_txt))
+ stdout_log = open(stdout_log_txt, 'a')
+ stderr_log_txt = os.path.join(
+ self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, self.test))
+ self.logger.info(
+ "stderr_log_txt: {}".format(stderr_log_txt))
+ # stderr_log = open(stderr_log_txt, 'a')
+ # need to take into account --raw_line parameters thus need to use shlex.split
+ # need to preserve command to have correct command syntax
+ # in command output
+ command_to_run = command
+ command_to_run = shlex.split(command_to_run)
+ self.logger.info(
+ "running {command_to_run}".format(
+ command_to_run=command_to_run))
+ self.test_start_time = str(datetime.datetime.now().strftime(
+ "%Y-%m-%d-%H-%M-%S")).replace(':', '-')
+ self.logger.info(
+ "Test start: {time} Timeout: {timeout}".format(
+ time=self.test_start_time, timeout=self.test_timeout))
+ start_time = datetime.datetime.now()
+ # try:
+ # process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
+ # universal_newlines=True)
+ # # if there is a better solution please propose, the
+ # # TIMEOUT Result is different then FAIL
+ # try:
+ # if int(self.test_timeout != 0):
+ # process.wait(timeout=int(self.test_timeout))
+ # else:
+ # process.wait()
+ # except subprocess.TimeoutExpired:
+ # process.terminate()
+ # self.test_result = "TIMEOUT"
+ # except BaseException:
+ # print(
+ # "No such file or directory with command: {}".format(command))
+ # self.logger.info(
+ # "No such file or directory with command: {}".format(command))
+ summary_output = ''
+ # have stderr go to stdout
+ summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ universal_newlines=True)
+ for line in iter(summary.stdout.readline, ''):
+ self.logger.info(line)
+ summary_output += line
+ try:
+ if int(self.test_timeout != 0):
+ summary.wait(timeout=int(self.test_timeout))
+ else:
+ summary.wait()
+ except TimeoutExpired:
+ summary.terminate
+ self.test_result = "TIMEOUT"
+ self.logger.info(summary_output)
+ stdout_log.write(summary_output)
+ stdout_log.close()
+ end_time = datetime.datetime.now()
+ self.test_end_time = str(datetime.datetime.now().strftime(
+ "%Y-%m-%d-%H-%M-%S")).replace(':', '-')
+ self.logger.info(
+ "Test end time {time}".format(
+ time=self.test_end_time))
+ time_delta = end_time - start_time
+ minutes, seconds = divmod(time_delta.seconds, 60)
+ hours, minutes = divmod(minutes, 60)
+ self.duration = "{day}d {hours}h {minutes}m {seconds}s {msec} ms".format(
+ day=time_delta.days, hours=hours, minutes=minutes, seconds=seconds, msec=time_delta.microseconds)
+ # If collect meta data is set
+ meta_data_path = ""
+ # Will gather data even on a TIMEOUT condition as there is
+ # some results on longer tests
+ # TODO use the summary
+ stdout_log_size = os.path.getsize(stdout_log_txt)
+ if stdout_log_size > 0:
+ stdout_log_fd = open(stdout_log_txt)
+ # "Report Location:::/home/lanforge/html-reports/wifi-capacity-2021-08-17-04-02-56"
+ #
+ for line in stdout_log_fd:
+ if "Report Location" in line:
+ report_index += 1
+ if iteration == report_index:
+ meta_data_path = line.replace('"', '')
+ meta_data_path = meta_data_path.replace(
+ 'Report Location:::', '')
+ meta_data_path = meta_data_path.split(
+ '/')[-1]
+ meta_data_path = meta_data_path.strip()
+ meta_data_path = self.report_path + '/' + meta_data_path + '/meta.txt'
+ break
+ stdout_log_fd.close()
+ if meta_data_path != "":
+ try:
+ meta_data_fd = open(meta_data_path, 'w+')
+ meta_data_fd.write(
+ '$ Generated by Candela Technologies LANforge network testing tool\n')
+ meta_data_fd.write(
+ "test_run {test_run}\n".format(
+ test_run=self.report_path))
+ meta_data_fd.write(
+ "file_meta {path}\n".format(
+ path=meta_data_path))
+ meta_data_fd.write(
+ 'lanforge_gui_version: {gui_version} \n'.format(
+ gui_version=self.lanforge_gui_version))
+ meta_data_fd.write(
+ 'lanforge_server_version: {server_version} \n'.format(
+ server_version=self.lanforge_server_version))
+ meta_data_fd.write('$ LANforge command\n')
+ meta_data_fd.write(
+ "command {command}\n".format(
+ command=command))
+ # split command at test-tag , at rest of string once at
+ # the actual test-tag value
+ test_tag = command.split(
+ 'test_tag', maxsplit=1)[-1].split(maxsplit=1)[0]
+ test_tag = test_tag.replace("'", "")
+ meta_data_fd.write('$ LANforge test tag\n')
+ meta_data_fd.write(
+ "test_tag {test_tag}\n".format(
+ test_tag=test_tag))
+ # LANforge information is a list thus [0]
+ meta_data_fd.write('$ LANforge Information\n')
+ meta_data_fd.write(
+ "lanforge_system_node {lanforge_system_node}\n".format(
+ lanforge_system_node=self.lanforge_system_node_version[0]))
+ meta_data_fd.write(
+ "lanforge_kernel_version {lanforge_kernel_version}\n".format(
+ lanforge_kernel_version=self.lanforge_kernel_version[0]))
+ meta_data_fd.write(
+ "lanforge_fedora_version {lanforge_fedora_version}\n".format(
+ lanforge_fedora_version=self.lanforge_fedora_version[0]))
+ meta_data_fd.write(
+ "lanforge_gui_version_full {lanforge_gui_version_full}\n".format(
+ lanforge_gui_version_full=self.lanforge_gui_version_full))
+ meta_data_fd.write(
+ "lanforge_server_version_full {lanforge_server_version_full}\n".format(
+ lanforge_server_version_full=self.lanforge_server_version_full[0]))
+ meta_data_fd.close()
+ except ValueError as err:
+ self.logger.critical("unable to write meta {meta_data_path} : {msg})".format(meta_data_path=meta_data_path, msg=err))
+ except BaseException as err:
+ self.logger.critical("BaseException unable to write meta {meta_data_path} : {msg}".format(meta_data_path=meta_data_path, msg=err))
+ # Timeout needs to be reported and not overwriten
+ if self.test_result == "TIMEOUT":
+ self.logger.info(
+ "TIMEOUT FAILURE, Check LANforge Radios")
+ self.test_result = "Time Out"
+ background = self.background_purple
+ else:
+ # stderr_log_size = os.path.getsize(stderr_log_txt)
+ # if stderr_log_size > 0:
+ # self.logger.info(
+ # "File: {} is not empty: {}".format(
+ # stderr_log_txt, str(stderr_log_size)))
+ # text = open(stderr_log_txt).read()
+ # if 'Error' in text:
+ # self.text_result = "Failure"
+ # background = self.background_red
+ # else:
+ # self.text_result = "Success"
+ # background = self.background_green
+ # else:
+ # self.logger.info(
+ # "File: {} is empty: {}".format(
+ # stderr_log_txt, str(stderr_log_size)))
+ # self.test_result = "Success"
+ # background = self.background_green
+ # Check to see if there is an error in stdout_log
+ # TODO use the test result returned
+ if stdout_log_size > 0:
+ text = open(stdout_log_txt).read()
+ # for 5.4.3 only TestTag was not present
+ if 'ERROR: Could not find component: TestTag' in text:
+ self.test_result = "Success"
+ background = self.background_green
+ # probe command for test_ip_variable_time.py has
+ # the word alloc error and erros in it
+ elif 'alloc error' in text:
+ self.test_result = "Success"
+ background = self.background_green
+ # leave the space in after error to not pick up tx
+ # errors or rx errors
+ elif 'ERROR: ' in text:
+ self.test_result = "Some Tests Failed"
+ background = self.background_orange
+ elif 'ERROR: FAILED ' in text:
+ self.test_result = "Some Tests Failed"
+ background = self.background_orange
+ elif 'error ' in text.lower():
+ if 'passes: zero test results' in text:
+ self.test_result = "Success"
+ background = self.background_green
+ else:
+ self.test_result = "Test Errors"
+ background = self.background_red
+ elif 'tests failed' in text.lower():
+ self.test_result = "Some Tests Failed"
+ background = self.background_orange
+ else:
+ self.test_result = "Success"
+ background = self.background_green
+ else:
+ # if stdout empty that is a failure also
+ self.test_result = "Failure"
+ background = self.background_red
+ # Total up test, tests success, tests failure, tests
+ # timeouts
+ self.tests_run += 1
+ if self.test_result == "Success":
+ self.tests_success += 1
+ elif self.test_result == "Failure":
+ self.tests_failure += 1
+ elif self.test_result == "Some Tests Failed":
+ self.tests_some_failure += 1
+ elif self.test_result == "Test Errors":
+ self.tests_failure += 1
+ elif self.test_result == "TIMEOUT":
+ self.tests_timeout += 1
+ if 'lf_qa' in command:
+ line_list = open(stdout_log_txt).readlines()
+ for line in line_list:
+ if 'html report:' in line:
+ self.qa_report_html = line
+ print(
+ "html_report: {report}".format(
+ report=self.qa_report_html))
+ break
+ self.qa_report_html = self.qa_report_html.replace(
+ 'html report: ', '')
+ # stdout_log_link is used for the email reporting to have
+ # the corrected path
+ stdout_log_link = str(stdout_log_txt).replace(
+ '/home/lanforge', '')
+ stderr_log_link = str(stderr_log_txt).replace(
+ '/home/lanforge', '')
+ if command.find(' ') > 1:
+ short_cmd = command[0:command.find(' ')]
+ else:
+ short_cmd = command
+ self.html_results += """
+
| """ + str(self.test) + """ |
+ """ + str(short_cmd) + """ |
+ """ + str(self.duration) + """ |
+ """ + str(self.test_start_time) + """ |
+ """ + str(self.test_end_time) + """ |
+ """ + str(self.test_result) + """
+ | STDOUT | """
+ if self.test_result == "Failure":
+ self.html_results += """STDERR | """
+ elif self.test_result == "Time Out":
+ self.html_results += """STDERR | """
+ else:
+ self.html_results += """ | """
+ self.html_results += """
"""
+ # TODO - place copy button at end and selectable , so
+ # individual sections may be copied
+ if command != short_cmd:
+ # Hover and copy button snows up
+ self.html_results += f"""|
+ Copy
+ {command}
+ |
+ """.format(command=command)
+ # TODO - place a point button for not have the copy
+ # hover, no copy button
+ '''self.html_results += f"""|
+ {command}
+ |
+ """.format(command=command)
+ '''
+ # nocopy - example
+ '''
+ self.html_results += f"""|
+ {command}
+ |
+ """.format(command=command)
+ '''
+ row = [
+ self.test,
+ command,
+ self.test_result,
+ stdout_log_txt,
+ stderr_log_txt]
+ self.csv_results_writer.writerow(row)
+ self.csv_results_file.flush()
+ # self.logger.info("row: {}".format(row))
+ self.logger.info("test: {} executed".format(self.test))
+
+
def run_script_test(self):
self.start_html_results()
self.start_csv_results()
@@ -735,499 +1183,33 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
suite_time=self.suite_start_time))
# Configure Tests
- for test in self.test_dict:
- if self.test_dict[test]['enabled'] == "FALSE":
- self.logger.info("test: {} skipped".format(test))
+ for self.test in self.test_dict:
+ if self.test_dict[self.test]['enabled'] == "FALSE":
+ self.logger.info("test: {} skipped".format(self.test))
# load the default database
- elif self.test_dict[test]['enabled'] == "TRUE":
+ elif self.test_dict[self.test]['enabled'] == "TRUE":
# TODO Place test interations here
- if 'iterations' in self.test_dict[test]:
+ if 'iterations' in self.test_dict[self.test]:
self.logger.info(
"iterations : {}".format(
- self.test_dict[test]['iterations']))
+ self.test_dict[self.test]['iterations']))
self.test_iterations = int(
- self.test_dict[test]['iterations'])
+ self.test_dict[self.test]['iterations'])
else:
self.test_iterations = self.test_iterations_default
iteration = 0
# log may contain multiple runs - this helps put the meta.txt
# in right directory
- report_index = 0
+ self.report_index = 0
for iteration in range(self.test_iterations):
iteration += 1
- # The network arguments need to be changed when in a list
- for index, args_list_element in enumerate(
- self.test_dict[test]['args_list']):
- if 'ssid_idx=' in args_list_element:
- # print("args_list_element {}".format(args_list_element))
- # get ssid_idx used in the test as an index for the
- # dictionary
- ssid_idx_number = args_list_element.split(
- 'ssid_idx=')[-1].split()[0]
- print(
- "ssid_idx_number: {}".format(ssid_idx_number))
- # index into the DUT network index
- idx = "ssid_idx={}".format(ssid_idx_number)
- print("idx: {}".format(idx))
- if 'SSID_USED' in args_list_element:
- self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
- 'SSID_USED', self.wireless_network_dict[idx]['SSID_USED'])
- if 'SECURITY_USED' in args_list_element:
- self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
- 'SECURITY_USED', self.wireless_network_dict[idx]['SECURITY_USED'])
- if 'SSID_PW_USED' in args_list_element:
- self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
- 'SSID_PW_USED', self.wireless_network_dict[idx]['SSID_PW_USED'])
- if 'BSSID_TO_USE' in args_list_element:
- self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
- 'BSSID_TO_USE', self.wireless_network_dict[idx]['BSSID_TO_USE'])
-
- # use_ssid_idx is ephemeral and used only for
- # variable replacement , remove
- tmp_idx = "use_ssid_idx={}".format(ssid_idx_number)
- if tmp_idx in args_list_element:
- self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
- tmp_idx, '')
-
- # leave in for checking the command line arguments
- print(
- "self.test_dict[test]['args_list']: {}".format(
- self.test_dict[test]['args_list']))
-
- # Walk all the args in the args list then construct the
- # arguments
- if self.test_dict[test]['args'] == "":
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
- ''.join(self.test_dict[test][
- 'args_list']))
- if 'DATABASE_SQLITE' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'DATABASE_SQLITE', self.database_sqlite)
- if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'HTTP_TEST_IP', self.http_test_ip)
- if 'FTP_TEST_IP' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'FTP_TEST_IP', self.ftp_test_ip)
- if 'TEST_IP' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'TEST_IP', self.test_ip)
-
- if 'LF_MGR_USER' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'LF_MGR_USER', self.lf_mgr_user)
- if 'LF_MGR_PASS' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'LF_MGR_PASS', self.lf_mgr_pass)
-
- if 'LF_MGR_IP' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'LF_MGR_IP', self.lf_mgr_ip)
- if 'LF_MGR_PORT' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'LF_MGR_PORT', self.lf_mgr_port)
-
- # DUT Configuration
- if 'USE_DUT_NAME' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'USE_DUT_NAME', self.use_dut_name)
- if 'DUT_HW' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'DUT_HW', self.dut_hw)
- if 'DUT_SW' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'DUT_SW', self.dut_sw)
- if 'DUT_MODEL' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'DUT_MODEL', self.dut_model)
- if 'DUT_SN' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'DUT_SN', self.dut_serial)
-
- if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
- self.upstream_port)
- if 'UPSTREAM_ALIAS' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_ALIAS',
- self.upstream_alias)
-
- # lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
- # of the reports when the reports are pulled.
- if 'REPORT_PATH' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'REPORT_PATH', self.report_path)
-
- if 'TEST_SERVER' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'TEST_SERVER', self.test_server)
-
- if 'DUT_SET_NAME' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
- self.dut_set_name)
- if 'TEST_RIG' in self.test_dict[test]['args']:
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
- 'TEST_RIG', self.test_rig)
-
- # END of command line arg processing
- if self.test_dict[test]['args'] == "":
- self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
- ''.join(self.test_dict[test][
- 'args_list']))
-
- if 'timeout' in self.test_dict[test]:
- self.logger.info(
- "timeout : {}".format(
- self.test_dict[test]['timeout']))
- self.test_timeout = int(
- self.test_dict[test]['timeout'])
- else:
- self.test_timeout = self.test_timeout_default
-
- if 'load_db' in self.test_dict[test]:
- self.logger.info(
- "load_db : {}".format(
- self.test_dict[test]['load_db']))
- if str(self.test_dict[test]['load_db']).lower() != "none" and str(
- self.test_dict[test]['load_db']).lower() != "skip":
- try:
- self.load_custom_database(
- self.test_dict[test]['load_db'])
- except BaseException:
- self.logger.info("custom database failed to load check existance and location: {}".format(
- self.test_dict[test]['load_db']))
-
- try:
- os.chdir(self.scripts_wd)
- # self.logger.info("Current Working Directory {}".format(os.getcwd()))
- except BaseException:
- self.logger.info(
- "failed to change to {}".format(
- self.scripts_wd))
- cmd_args = "{}".format(self.test_dict[test]['args'])
- command = "./{} {}".format(
- self.test_dict[test]['command'], cmd_args)
- self.logger.info("command: {}".format(command))
- self.logger.info("cmd_args {}".format(cmd_args))
-
- # TODO this code is always run since there is a default
- # TODO change name to file obj to make more unde
- if self.outfile_name is not None:
- stdout_log_txt = os.path.join(
- self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, test))
- self.logger.info(
- "stdout_log_txt: {}".format(stdout_log_txt))
- stdout_log = open(stdout_log_txt, 'a')
- stderr_log_txt = os.path.join(
- self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, test))
- self.logger.info(
- "stderr_log_txt: {}".format(stderr_log_txt))
- stderr_log = open(stderr_log_txt, 'a')
-
- # need to take into account --raw_line parameters thus need to use shlex.split
- # need to preserve command to have correct command syntax
- # in command output
- command_to_run = command
- command_to_run = shlex.split(command_to_run)
- self.logger.info(
- "running {command_to_run}".format(
- command_to_run=command_to_run))
- self.test_start_time = str(datetime.datetime.now().strftime(
- "%Y-%m-%d-%H-%M-%S")).replace(':', '-')
- self.logger.info(
- "Test start: {time} Timeout: {timeout}".format(
- time=self.test_start_time, timeout=self.test_timeout))
- start_time = datetime.datetime.now()
- # try:
- # process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
- # universal_newlines=True)
- # # if there is a better solution please propose, the
- # # TIMEOUT Result is different then FAIL
- # try:
- # if int(self.test_timeout != 0):
- # process.wait(timeout=int(self.test_timeout))
- # else:
- # process.wait()
- # except subprocess.TimeoutExpired:
- # process.terminate()
- # self.test_result = "TIMEOUT"
-#
- # except BaseException:
- # print(
- # "No such file or directory with command: {}".format(command))
- # self.logger.info(
- # "No such file or directory with command: {}".format(command))
- summary_output = ''
- # have stderr go to stdout
- summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
- universal_newlines=True)
- for line in iter(summary.stdout.readline, ''):
- self.logger.info(line)
- summary_output += line
- try:
- if int(self.test_timeout != 0):
- summary.wait(timeout=int(self.test_timeout))
- else:
- summary.wait()
- except TimeoutExpired:
- summary.terminate
- self.test_result = "TIMEOUT"
-
- self.logger.info(summary_output)
-
- stdout_log.write(summary_output)
- stdout_log.close()
- end_time = datetime.datetime.now()
- self.test_end_time = str(datetime.datetime.now().strftime(
- "%Y-%m-%d-%H-%M-%S")).replace(':', '-')
- self.logger.info(
- "Test end time {time}".format(
- time=self.test_end_time))
-
- time_delta = end_time - start_time
- minutes, seconds = divmod(time_delta.seconds, 60)
- hours, minutes = divmod(minutes, 60)
-
- self.duration = "{day}d {hours}h {minutes}m {seconds}s {msec} ms".format(
- day=time_delta.days, hours=hours, minutes=minutes, seconds=seconds, msec=time_delta.microseconds)
-
- # If collect meta data is set
- meta_data_path = ""
- # Will gather data even on a TIMEOUT condition as there is
- # some results on longer tests
- # TODO use the summary
- stdout_log_size = os.path.getsize(stdout_log_txt)
- if stdout_log_size > 0:
- stdout_log_fd = open(stdout_log_txt)
- # "Report Location:::/home/lanforge/html-reports/wifi-capacity-2021-08-17-04-02-56"
- #
-
- for line in stdout_log_fd:
- if "Report Location" in line:
- report_index += 1
- if iteration == report_index:
- meta_data_path = line.replace('"', '')
- meta_data_path = meta_data_path.replace(
- 'Report Location:::', '')
- meta_data_path = meta_data_path.split(
- '/')[-1]
- meta_data_path = meta_data_path.strip()
- meta_data_path = self.report_path + '/' + meta_data_path + '/meta.txt'
- break
- stdout_log_fd.close()
- if meta_data_path != "":
- try:
- meta_data_fd = open(meta_data_path, 'w+')
- meta_data_fd.write(
- '$ Generated by Candela Technologies LANforge network testing tool\n')
- meta_data_fd.write(
- "test_run {test_run}\n".format(
- test_run=self.report_path))
- meta_data_fd.write(
- "file_meta {path}\n".format(
- path=meta_data_path))
- meta_data_fd.write(
- 'lanforge_gui_version: {gui_version} \n'.format(
- gui_version=self.lanforge_gui_version))
- meta_data_fd.write(
- 'lanforge_server_version: {server_version} \n'.format(
- server_version=self.lanforge_server_version))
- meta_data_fd.write('$ LANforge command\n')
- meta_data_fd.write(
- "command {command}\n".format(
- command=command))
- # split command at test-tag , at rest of string once at
- # the actual test-tag value
- test_tag = command.split(
- 'test_tag', maxsplit=1)[-1].split(maxsplit=1)[0]
- test_tag = test_tag.replace("'", "")
- meta_data_fd.write('$ LANforge test tag\n')
- meta_data_fd.write(
- "test_tag {test_tag}\n".format(
- test_tag=test_tag))
- # LANforge information is a list thus [0]
- meta_data_fd.write('$ LANforge Information\n')
- meta_data_fd.write(
- "lanforge_system_node {lanforge_system_node}\n".format(
- lanforge_system_node=self.lanforge_system_node_version[0]))
- meta_data_fd.write(
- "lanforge_kernel_version {lanforge_kernel_version}\n".format(
- lanforge_kernel_version=self.lanforge_kernel_version[0]))
- meta_data_fd.write(
- "lanforge_fedora_version {lanforge_fedora_version}\n".format(
- lanforge_fedora_version=self.lanforge_fedora_version[0]))
- meta_data_fd.write(
- "lanforge_gui_version_full {lanforge_gui_version_full}\n".format(
- lanforge_gui_version_full=self.lanforge_gui_version_full))
- meta_data_fd.write(
- "lanforge_server_version_full {lanforge_server_version_full}\n".format(
- lanforge_server_version_full=self.lanforge_server_version_full[0]))
- meta_data_fd.close()
- except ValueError as err:
- self.logger.critical("unable to write meta {meta_data_path} : {msg})".format(meta_data_path=meta_data_path, msg=err))
- except BaseException as err:
- self.logger.critical("BaseException unable to write meta {meta_data_path} : {msg}".format(meta_data_path=meta_data_path, msg=err))
-
- # Timeout needs to be reported and not overwriten
- if self.test_result == "TIMEOUT":
- self.logger.info(
- "TIMEOUT FAILURE, Check LANforge Radios")
- self.test_result = "Time Out"
- background = self.background_purple
- else:
- # stderr_log_size = os.path.getsize(stderr_log_txt)
- # if stderr_log_size > 0:
- # self.logger.info(
- # "File: {} is not empty: {}".format(
- # stderr_log_txt, str(stderr_log_size)))
- # text = open(stderr_log_txt).read()
- # if 'Error' in text:
- # self.text_result = "Failure"
- # background = self.background_red
- # else:
- # self.text_result = "Success"
- # background = self.background_green
- # else:
- # self.logger.info(
- # "File: {} is empty: {}".format(
- # stderr_log_txt, str(stderr_log_size)))
- # self.test_result = "Success"
- # background = self.background_green
-
- # Check to see if there is an error in stdout_log
- # TODO use the test result returned
- if stdout_log_size > 0:
- text = open(stdout_log_txt).read()
- # for 5.4.3 only TestTag was not present
- if 'ERROR: Could not find component: TestTag' in text:
- self.test_result = "Success"
- background = self.background_green
- # probe command for test_ip_variable_time.py has
- # the word alloc error and erros in it
- elif 'alloc error' in text:
- self.test_result = "Success"
- background = self.background_green
- # leave the space in after error to not pick up tx
- # errors or rx errors
- elif 'ERROR: ' in text:
- self.test_result = "Some Tests Failed"
- background = self.background_orange
- elif 'ERROR: FAILED ' in text:
- self.test_result = "Some Tests Failed"
- background = self.background_orange
- elif 'error ' in text.lower():
- if 'passes: zero test results' in text:
- self.test_result = "Success"
- background = self.background_green
- else:
- self.test_result = "Test Errors"
- background = self.background_red
- elif 'tests failed' in text.lower():
- self.test_result = "Some Tests Failed"
- background = self.background_orange
- else:
- self.test_result = "Success"
- background = self.background_green
- else:
- # if stdout empty that is a failure also
- self.test_result = "Failure"
- background = self.background_red
-
- # Total up test, tests success, tests failure, tests
- # timeouts
- self.tests_run += 1
- if self.test_result == "Success":
- self.tests_success += 1
- elif self.test_result == "Failure":
- self.tests_failure += 1
- elif self.test_result == "Some Tests Failed":
- self.tests_some_failure += 1
- elif self.test_result == "Test Errors":
- self.tests_failure += 1
- elif self.test_result == "TIMEOUT":
- self.tests_timeout += 1
-
- if 'lf_qa' in command:
- line_list = open(stdout_log_txt).readlines()
- for line in line_list:
- if 'html report:' in line:
- self.qa_report_html = line
- print(
- "html_report: {report}".format(
- report=self.qa_report_html))
- break
-
- self.qa_report_html = self.qa_report_html.replace(
- 'html report: ', '')
-
- # stdout_log_link is used for the email reporting to have
- # the corrected path
- stdout_log_link = str(stdout_log_txt).replace(
- '/home/lanforge', '')
- stderr_log_link = str(stderr_log_txt).replace(
- '/home/lanforge', '')
- if command.find(' ') > 1:
- short_cmd = command[0:command.find(' ')]
- else:
- short_cmd = command
-
- self.html_results += """
- | """ + str(test) + """ |
- """ + str(short_cmd) + """ |
- """ + str(self.duration) + """ |
- """ + str(self.test_start_time) + """ |
- """ + str(self.test_end_time) + """ |
- """ + str(self.test_result) + """
- | STDOUT | """
- if self.test_result == "Failure":
- self.html_results += """STDERR | """
- elif self.test_result == "Time Out":
- self.html_results += """STDERR | """
- else:
- self.html_results += """ | """
-
- self.html_results += """
"""
- # TODO - place copy button at end and selectable , so
- # individual sections may be copied
- if command != short_cmd:
- # Hover and copy button snows up
- self.html_results += f"""|
- Copy
- {command}
- |
- """.format(command=command)
- # TODO - place a point button for not have the copy
- # hover, no copy button
- '''self.html_results += f"""|
- {command}
- |
- """.format(command=command)
- '''
- # nocopy - example
- '''
- self.html_results += f"""|
- {command}
- |
- """.format(command=command)
- '''
-
- row = [
- test,
- command,
- self.test_result,
- stdout_log_txt,
- stderr_log_txt]
- self.csv_results_writer.writerow(row)
- self.csv_results_file.flush()
- # self.logger.info("row: {}".format(row))
- self.logger.info("test: {} executed".format(test))
+ self.run_script()
else:
self.logger.info(
- "enable value {} for test: {} ".format(self.test_dict[test]['enabled'], test))
+ "enable value {} for test: {} ".format(self.test_dict[self.test]['enabled'], self.test))
suite_end_time = datetime.datetime.now()
self.suite_end_time = str(datetime.datetime.now().strftime(
@@ -1401,7 +1383,7 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
- file_handler = logging.FileHandler(logfile, "w")
+ file_handler = logging.FileHandler(logfile, "w+")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# allows to logging to file and stdout