lf_check.py : initial step in allowing test to input channel, nss, bw, tx power for tests

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2022-03-03 22:54:24 -07:00
committed by shivam
parent 7a46072338
commit f8d0949d94

View File

@@ -209,6 +209,16 @@ class lf_check():
self.ftp_test_ip = ""
self.test_ip = ""
# current test running
self.test = None
self.report_index = 0
self.iteration = 0
self.channel_list = []
self.nss_list = []
self.bandwidth_list = []
self.tx_power_list = []
# section DUT
# dut selection
# note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not
@@ -724,6 +734,444 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
# The underlying netsmith objects
sleep(15)
def run_script(self):
# The network arguments need to be changed when in a list
for index, args_list_element in enumerate(
self.test_dict[self.test]['args_list']):
if 'ssid_idx=' in args_list_element:
# print("args_list_element {}".format(args_list_element))
# get ssid_idx used in the test as an index for the
# dictionary
ssid_idx_number = args_list_element.split(
'ssid_idx=')[-1].split()[0]
print(
"ssid_idx_number: {}".format(ssid_idx_number))
# index into the DUT network index
idx = "ssid_idx={}".format(ssid_idx_number)
print("idx: {}".format(idx))
if 'SSID_USED' in args_list_element:
self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
'SSID_USED', self.wireless_network_dict[idx]['SSID_USED'])
if 'SECURITY_USED' in args_list_element:
self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
'SECURITY_USED', self.wireless_network_dict[idx]['SECURITY_USED'])
if 'SSID_PW_USED' in args_list_element:
self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
'SSID_PW_USED', self.wireless_network_dict[idx]['SSID_PW_USED'])
if 'BSSID_TO_USE' in args_list_element:
self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
'BSSID_TO_USE', self.wireless_network_dict[idx]['BSSID_TO_USE'])
# use_ssid_idx is ephemeral and used only for
# variable replacement , remove
tmp_idx = "use_ssid_idx={}".format(ssid_idx_number)
if tmp_idx in args_list_element:
self.test_dict[self.test]['args_list'][index] = self.test_dict[self.test]['args_list'][index].replace(
tmp_idx, '')
# leave in for checking the command line arguments
print(
"self.test_dict[self.test]['args_list']: {}".format(
self.test_dict[self.test]['args_list']))
# Walk all the args in the args list then construct the
# arguments
if self.test_dict[self.test]['args'] == "":
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(self.test_dict[self.test]['args'],
''.join(self.test_dict[self.test][
'args_list']))
if 'DATABASE_SQLITE' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'DATABASE_SQLITE', self.database_sqlite)
if 'HTTP_TEST_IP' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'HTTP_TEST_IP', self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'FTP_TEST_IP', self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'TEST_IP', self.test_ip)
if 'LF_MGR_USER' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'LF_MGR_USER', self.lf_mgr_user)
if 'LF_MGR_PASS' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'LF_MGR_PASS', self.lf_mgr_pass)
if 'LF_MGR_IP' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'LF_MGR_IP', self.lf_mgr_ip)
if 'LF_MGR_PORT' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'LF_MGR_PORT', self.lf_mgr_port)
# DUT Configuration
if 'USE_DUT_NAME' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'USE_DUT_NAME', self.use_dut_name)
if 'DUT_HW' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'DUT_HW', self.dut_hw)
if 'DUT_SW' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'DUT_SW', self.dut_sw)
if 'DUT_MODEL' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'DUT_MODEL', self.dut_model)
if 'DUT_SN' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'DUT_SN', self.dut_serial)
if 'UPSTREAM_PORT' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace('UPSTREAM_PORT',
self.upstream_port)
if 'UPSTREAM_ALIAS' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace('UPSTREAM_ALIAS',
self.upstream_alias)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'REPORT_PATH', self.report_path)
if 'TEST_SERVER' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'TEST_SERVER', self.test_server)
if 'DUT_SET_NAME' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
if 'TEST_RIG' in self.test_dict[self.test]['args']:
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(
'TEST_RIG', self.test_rig)
# END of command line arg processing
if self.test_dict[self.test]['args'] == "":
self.test_dict[self.test]['args'] = self.test_dict[self.test]['args'].replace(self.test_dict[self.test]['args'],
''.join(self.test_dict[self.test][
'args_list']))
if 'timeout' in self.test_dict[self.test]:
self.logger.info(
"timeout : {}".format(
self.test_dict[self.test]['timeout']))
self.test_timeout = int(
self.test_dict[self.test]['timeout'])
else:
self.test_timeout = self.test_timeout_default
if 'load_db' in self.test_dict[self.test]:
self.logger.info(
"load_db : {}".format(
self.test_dict[self.test]['load_db']))
if str(self.test_dict[self.test]['load_db']).lower() != "none" and str(
self.test_dict[self.test]['load_db']).lower() != "skip":
try:
self.load_custom_database(
self.test_dict[self.test]['load_db'])
except BaseException:
self.logger.info("custom database failed to load check existance and location: {}".format(
self.test_dict[self.test]['load_db']))
try:
os.chdir(self.scripts_wd)
# self.logger.info("Current Working Directory {}".format(os.getcwd()))
except BaseException:
self.logger.info(
"failed to change to {}".format(
self.scripts_wd))
cmd_args = "{}".format(self.test_dict[self.test]['args'])
command = "./{} {}".format(
self.test_dict[self.test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
# TODO this code is always run since there is a default
# TODO change name to file obj to make more unde
if self.outfile_name is not None:
stdout_log_txt = os.path.join(
self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, self.test))
self.logger.info(
"stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = os.path.join(
self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, self.test))
self.logger.info(
"stderr_log_txt: {}".format(stderr_log_txt))
# stderr_log = open(stderr_log_txt, 'a')
# need to take into account --raw_line parameters thus need to use shlex.split
# need to preserve command to have correct command syntax
# in command output
command_to_run = command
command_to_run = shlex.split(command_to_run)
self.logger.info(
"running {command_to_run}".format(
command_to_run=command_to_run))
self.test_start_time = str(datetime.datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S")).replace(':', '-')
self.logger.info(
"Test start: {time} Timeout: {timeout}".format(
time=self.test_start_time, timeout=self.test_timeout))
start_time = datetime.datetime.now()
# try:
# process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
# universal_newlines=True)
# # if there is a better solution please propose, the
# # TIMEOUT Result is different then FAIL
# try:
# if int(self.test_timeout != 0):
# process.wait(timeout=int(self.test_timeout))
# else:
# process.wait()
# except subprocess.TimeoutExpired:
# process.terminate()
# self.test_result = "TIMEOUT"
# except BaseException:
# print(
# "No such file or directory with command: {}".format(command))
# self.logger.info(
# "No such file or directory with command: {}".format(command))
summary_output = ''
# have stderr go to stdout
summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
for line in iter(summary.stdout.readline, ''):
self.logger.info(line)
summary_output += line
try:
if int(self.test_timeout != 0):
summary.wait(timeout=int(self.test_timeout))
else:
summary.wait()
except TimeoutExpired:
summary.terminate
self.test_result = "TIMEOUT"
self.logger.info(summary_output)
stdout_log.write(summary_output)
stdout_log.close()
end_time = datetime.datetime.now()
self.test_end_time = str(datetime.datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S")).replace(':', '-')
self.logger.info(
"Test end time {time}".format(
time=self.test_end_time))
time_delta = end_time - start_time
minutes, seconds = divmod(time_delta.seconds, 60)
hours, minutes = divmod(minutes, 60)
self.duration = "{day}d {hours}h {minutes}m {seconds}s {msec} ms".format(
day=time_delta.days, hours=hours, minutes=minutes, seconds=seconds, msec=time_delta.microseconds)
# If collect meta data is set
meta_data_path = ""
# Will gather data even on a TIMEOUT condition as there is
# some results on longer tests
# TODO use the summary
stdout_log_size = os.path.getsize(stdout_log_txt)
if stdout_log_size > 0:
stdout_log_fd = open(stdout_log_txt)
# "Report Location:::/home/lanforge/html-reports/wifi-capacity-2021-08-17-04-02-56"
#
for line in stdout_log_fd:
if "Report Location" in line:
report_index += 1
if iteration == report_index:
meta_data_path = line.replace('"', '')
meta_data_path = meta_data_path.replace(
'Report Location:::', '')
meta_data_path = meta_data_path.split(
'/')[-1]
meta_data_path = meta_data_path.strip()
meta_data_path = self.report_path + '/' + meta_data_path + '/meta.txt'
break
stdout_log_fd.close()
if meta_data_path != "":
try:
meta_data_fd = open(meta_data_path, 'w+')
meta_data_fd.write(
'$ Generated by Candela Technologies LANforge network testing tool\n')
meta_data_fd.write(
"test_run {test_run}\n".format(
test_run=self.report_path))
meta_data_fd.write(
"file_meta {path}\n".format(
path=meta_data_path))
meta_data_fd.write(
'lanforge_gui_version: {gui_version} \n'.format(
gui_version=self.lanforge_gui_version))
meta_data_fd.write(
'lanforge_server_version: {server_version} \n'.format(
server_version=self.lanforge_server_version))
meta_data_fd.write('$ LANforge command\n')
meta_data_fd.write(
"command {command}\n".format(
command=command))
# split command at test-tag , at rest of string once at
# the actual test-tag value
test_tag = command.split(
'test_tag', maxsplit=1)[-1].split(maxsplit=1)[0]
test_tag = test_tag.replace("'", "")
meta_data_fd.write('$ LANforge test tag\n')
meta_data_fd.write(
"test_tag {test_tag}\n".format(
test_tag=test_tag))
# LANforge information is a list thus [0]
meta_data_fd.write('$ LANforge Information\n')
meta_data_fd.write(
"lanforge_system_node {lanforge_system_node}\n".format(
lanforge_system_node=self.lanforge_system_node_version[0]))
meta_data_fd.write(
"lanforge_kernel_version {lanforge_kernel_version}\n".format(
lanforge_kernel_version=self.lanforge_kernel_version[0]))
meta_data_fd.write(
"lanforge_fedora_version {lanforge_fedora_version}\n".format(
lanforge_fedora_version=self.lanforge_fedora_version[0]))
meta_data_fd.write(
"lanforge_gui_version_full {lanforge_gui_version_full}\n".format(
lanforge_gui_version_full=self.lanforge_gui_version_full))
meta_data_fd.write(
"lanforge_server_version_full {lanforge_server_version_full}\n".format(
lanforge_server_version_full=self.lanforge_server_version_full[0]))
meta_data_fd.close()
except ValueError as err:
self.logger.critical("unable to write meta {meta_data_path} : {msg})".format(meta_data_path=meta_data_path, msg=err))
except BaseException as err:
self.logger.critical("BaseException unable to write meta {meta_data_path} : {msg}".format(meta_data_path=meta_data_path, msg=err))
# Timeout needs to be reported and not overwriten
if self.test_result == "TIMEOUT":
self.logger.info(
"TIMEOUT FAILURE, Check LANforge Radios")
self.test_result = "Time Out"
background = self.background_purple
else:
# stderr_log_size = os.path.getsize(stderr_log_txt)
# if stderr_log_size > 0:
# self.logger.info(
# "File: {} is not empty: {}".format(
# stderr_log_txt, str(stderr_log_size)))
# text = open(stderr_log_txt).read()
# if 'Error' in text:
# self.text_result = "Failure"
# background = self.background_red
# else:
# self.text_result = "Success"
# background = self.background_green
# else:
# self.logger.info(
# "File: {} is empty: {}".format(
# stderr_log_txt, str(stderr_log_size)))
# self.test_result = "Success"
# background = self.background_green
# Check to see if there is an error in stdout_log
# TODO use the test result returned
if stdout_log_size > 0:
text = open(stdout_log_txt).read()
# for 5.4.3 only TestTag was not present
if 'ERROR: Could not find component: TestTag' in text:
self.test_result = "Success"
background = self.background_green
# probe command for test_ip_variable_time.py has
# the word alloc error and erros in it
elif 'alloc error' in text:
self.test_result = "Success"
background = self.background_green
# leave the space in after error to not pick up tx
# errors or rx errors
elif 'ERROR: ' in text:
self.test_result = "Some Tests Failed"
background = self.background_orange
elif 'ERROR: FAILED ' in text:
self.test_result = "Some Tests Failed"
background = self.background_orange
elif 'error ' in text.lower():
if 'passes: zero test results' in text:
self.test_result = "Success"
background = self.background_green
else:
self.test_result = "Test Errors"
background = self.background_red
elif 'tests failed' in text.lower():
self.test_result = "Some Tests Failed"
background = self.background_orange
else:
self.test_result = "Success"
background = self.background_green
else:
# if stdout empty that is a failure also
self.test_result = "Failure"
background = self.background_red
# Total up test, tests success, tests failure, tests
# timeouts
self.tests_run += 1
if self.test_result == "Success":
self.tests_success += 1
elif self.test_result == "Failure":
self.tests_failure += 1
elif self.test_result == "Some Tests Failed":
self.tests_some_failure += 1
elif self.test_result == "Test Errors":
self.tests_failure += 1
elif self.test_result == "TIMEOUT":
self.tests_timeout += 1
if 'lf_qa' in command:
line_list = open(stdout_log_txt).readlines()
for line in line_list:
if 'html report:' in line:
self.qa_report_html = line
print(
"html_report: {report}".format(
report=self.qa_report_html))
break
self.qa_report_html = self.qa_report_html.replace(
'html report: ', '')
# stdout_log_link is used for the email reporting to have
# the corrected path
stdout_log_link = str(stdout_log_txt).replace(
'/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace(
'/home/lanforge', '')
if command.find(' ') > 1:
short_cmd = command[0:command.find(' ')]
else:
short_cmd = command
self.html_results += """
<tr><td>""" + str(self.test) + """</td>
<td>""" + str(short_cmd) + """</td>
<td class='TimeFont'>""" + str(self.duration) + """</td>
<td class='DateFont'>""" + str(self.test_start_time) + """</td>
<td class='DateFont'>""" + str(self.test_end_time) + """</td>
<td style=""" + str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_link) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
else:
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
# TODO - place copy button at end and selectable , so
# individual sections may be copied
if command != short_cmd:
# Hover and copy button snows up
self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<span class='copybtn'>Copy</span>
<tt onclick='copyTextToClipboard(this)'>{command}</tt>
</td></tr>
""".format(command=command)
# TODO - place a point button for not have the copy
# hover, no copy button
'''self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<tt onclick='copyTextToClipboard(this)'>{command}</tt>
</td></tr>
""".format(command=command)
'''
# nocopy - example
'''
self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<tt>{command}</tt>
</td></tr>
""".format(command=command)
'''
row = [
self.test,
command,
self.test_result,
stdout_log_txt,
stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
# self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(self.test))
def run_script_test(self):
self.start_html_results()
self.start_csv_results()
@@ -735,499 +1183,33 @@ QA Report Dashboard: lf_qa.py was not run as last script of test suite"""
suite_time=self.suite_start_time))
# Configure Tests
for test in self.test_dict:
if self.test_dict[test]['enabled'] == "FALSE":
self.logger.info("test: {} skipped".format(test))
for self.test in self.test_dict:
if self.test_dict[self.test]['enabled'] == "FALSE":
self.logger.info("test: {} skipped".format(self.test))
# load the default database
elif self.test_dict[test]['enabled'] == "TRUE":
elif self.test_dict[self.test]['enabled'] == "TRUE":
# TODO Place test interations here
if 'iterations' in self.test_dict[test]:
if 'iterations' in self.test_dict[self.test]:
self.logger.info(
"iterations : {}".format(
self.test_dict[test]['iterations']))
self.test_dict[self.test]['iterations']))
self.test_iterations = int(
self.test_dict[test]['iterations'])
self.test_dict[self.test]['iterations'])
else:
self.test_iterations = self.test_iterations_default
iteration = 0
# log may contain multiple runs - this helps put the meta.txt
# in right directory
report_index = 0
self.report_index = 0
for iteration in range(self.test_iterations):
iteration += 1
# The network arguments need to be changed when in a list
for index, args_list_element in enumerate(
self.test_dict[test]['args_list']):
if 'ssid_idx=' in args_list_element:
# print("args_list_element {}".format(args_list_element))
# get ssid_idx used in the test as an index for the
# dictionary
ssid_idx_number = args_list_element.split(
'ssid_idx=')[-1].split()[0]
print(
"ssid_idx_number: {}".format(ssid_idx_number))
# index into the DUT network index
idx = "ssid_idx={}".format(ssid_idx_number)
print("idx: {}".format(idx))
if 'SSID_USED' in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
'SSID_USED', self.wireless_network_dict[idx]['SSID_USED'])
if 'SECURITY_USED' in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
'SECURITY_USED', self.wireless_network_dict[idx]['SECURITY_USED'])
if 'SSID_PW_USED' in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
'SSID_PW_USED', self.wireless_network_dict[idx]['SSID_PW_USED'])
if 'BSSID_TO_USE' in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
'BSSID_TO_USE', self.wireless_network_dict[idx]['BSSID_TO_USE'])
# use_ssid_idx is ephemeral and used only for
# variable replacement , remove
tmp_idx = "use_ssid_idx={}".format(ssid_idx_number)
if tmp_idx in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
tmp_idx, '')
# leave in for checking the command line arguments
print(
"self.test_dict[test]['args_list']: {}".format(
self.test_dict[test]['args_list']))
# Walk all the args in the args list then construct the
# arguments
if self.test_dict[test]['args'] == "":
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
''.join(self.test_dict[test][
'args_list']))
if 'DATABASE_SQLITE' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DATABASE_SQLITE', self.database_sqlite)
if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'HTTP_TEST_IP', self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'FTP_TEST_IP', self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'TEST_IP', self.test_ip)
if 'LF_MGR_USER' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_USER', self.lf_mgr_user)
if 'LF_MGR_PASS' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_PASS', self.lf_mgr_pass)
if 'LF_MGR_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_IP', self.lf_mgr_ip)
if 'LF_MGR_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_PORT', self.lf_mgr_port)
# DUT Configuration
if 'USE_DUT_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'USE_DUT_NAME', self.use_dut_name)
if 'DUT_HW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_HW', self.dut_hw)
if 'DUT_SW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_SW', self.dut_sw)
if 'DUT_MODEL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_MODEL', self.dut_model)
if 'DUT_SN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_SN', self.dut_serial)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
self.upstream_port)
if 'UPSTREAM_ALIAS' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_ALIAS',
self.upstream_alias)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'REPORT_PATH', self.report_path)
if 'TEST_SERVER' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'TEST_SERVER', self.test_server)
if 'DUT_SET_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
if 'TEST_RIG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'TEST_RIG', self.test_rig)
# END of command line arg processing
if self.test_dict[test]['args'] == "":
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
''.join(self.test_dict[test][
'args_list']))
if 'timeout' in self.test_dict[test]:
self.logger.info(
"timeout : {}".format(
self.test_dict[test]['timeout']))
self.test_timeout = int(
self.test_dict[test]['timeout'])
else:
self.test_timeout = self.test_timeout_default
if 'load_db' in self.test_dict[test]:
self.logger.info(
"load_db : {}".format(
self.test_dict[test]['load_db']))
if str(self.test_dict[test]['load_db']).lower() != "none" and str(
self.test_dict[test]['load_db']).lower() != "skip":
try:
self.load_custom_database(
self.test_dict[test]['load_db'])
except BaseException:
self.logger.info("custom database failed to load check existance and location: {}".format(
self.test_dict[test]['load_db']))
try:
os.chdir(self.scripts_wd)
# self.logger.info("Current Working Directory {}".format(os.getcwd()))
except BaseException:
self.logger.info(
"failed to change to {}".format(
self.scripts_wd))
cmd_args = "{}".format(self.test_dict[test]['args'])
command = "./{} {}".format(
self.test_dict[test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
# TODO this code is always run since there is a default
# TODO change name to file obj to make more unde
if self.outfile_name is not None:
stdout_log_txt = os.path.join(
self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, test))
self.logger.info(
"stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = os.path.join(
self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, test))
self.logger.info(
"stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a')
# need to take into account --raw_line parameters thus need to use shlex.split
# need to preserve command to have correct command syntax
# in command output
command_to_run = command
command_to_run = shlex.split(command_to_run)
self.logger.info(
"running {command_to_run}".format(
command_to_run=command_to_run))
self.test_start_time = str(datetime.datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S")).replace(':', '-')
self.logger.info(
"Test start: {time} Timeout: {timeout}".format(
time=self.test_start_time, timeout=self.test_timeout))
start_time = datetime.datetime.now()
# try:
# process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
# universal_newlines=True)
# # if there is a better solution please propose, the
# # TIMEOUT Result is different then FAIL
# try:
# if int(self.test_timeout != 0):
# process.wait(timeout=int(self.test_timeout))
# else:
# process.wait()
# except subprocess.TimeoutExpired:
# process.terminate()
# self.test_result = "TIMEOUT"
#
# except BaseException:
# print(
# "No such file or directory with command: {}".format(command))
# self.logger.info(
# "No such file or directory with command: {}".format(command))
summary_output = ''
# have stderr go to stdout
summary = subprocess.Popen(command_to_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
for line in iter(summary.stdout.readline, ''):
self.logger.info(line)
summary_output += line
try:
if int(self.test_timeout != 0):
summary.wait(timeout=int(self.test_timeout))
else:
summary.wait()
except TimeoutExpired:
summary.terminate
self.test_result = "TIMEOUT"
self.logger.info(summary_output)
stdout_log.write(summary_output)
stdout_log.close()
end_time = datetime.datetime.now()
self.test_end_time = str(datetime.datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S")).replace(':', '-')
self.logger.info(
"Test end time {time}".format(
time=self.test_end_time))
time_delta = end_time - start_time
minutes, seconds = divmod(time_delta.seconds, 60)
hours, minutes = divmod(minutes, 60)
self.duration = "{day}d {hours}h {minutes}m {seconds}s {msec} ms".format(
day=time_delta.days, hours=hours, minutes=minutes, seconds=seconds, msec=time_delta.microseconds)
# If collect meta data is set
meta_data_path = ""
# Will gather data even on a TIMEOUT condition as there is
# some results on longer tests
# TODO use the summary
stdout_log_size = os.path.getsize(stdout_log_txt)
if stdout_log_size > 0:
stdout_log_fd = open(stdout_log_txt)
# "Report Location:::/home/lanforge/html-reports/wifi-capacity-2021-08-17-04-02-56"
#
for line in stdout_log_fd:
if "Report Location" in line:
report_index += 1
if iteration == report_index:
meta_data_path = line.replace('"', '')
meta_data_path = meta_data_path.replace(
'Report Location:::', '')
meta_data_path = meta_data_path.split(
'/')[-1]
meta_data_path = meta_data_path.strip()
meta_data_path = self.report_path + '/' + meta_data_path + '/meta.txt'
break
stdout_log_fd.close()
if meta_data_path != "":
try:
meta_data_fd = open(meta_data_path, 'w+')
meta_data_fd.write(
'$ Generated by Candela Technologies LANforge network testing tool\n')
meta_data_fd.write(
"test_run {test_run}\n".format(
test_run=self.report_path))
meta_data_fd.write(
"file_meta {path}\n".format(
path=meta_data_path))
meta_data_fd.write(
'lanforge_gui_version: {gui_version} \n'.format(
gui_version=self.lanforge_gui_version))
meta_data_fd.write(
'lanforge_server_version: {server_version} \n'.format(
server_version=self.lanforge_server_version))
meta_data_fd.write('$ LANforge command\n')
meta_data_fd.write(
"command {command}\n".format(
command=command))
# split command at test-tag , at rest of string once at
# the actual test-tag value
test_tag = command.split(
'test_tag', maxsplit=1)[-1].split(maxsplit=1)[0]
test_tag = test_tag.replace("'", "")
meta_data_fd.write('$ LANforge test tag\n')
meta_data_fd.write(
"test_tag {test_tag}\n".format(
test_tag=test_tag))
# LANforge information is a list thus [0]
meta_data_fd.write('$ LANforge Information\n')
meta_data_fd.write(
"lanforge_system_node {lanforge_system_node}\n".format(
lanforge_system_node=self.lanforge_system_node_version[0]))
meta_data_fd.write(
"lanforge_kernel_version {lanforge_kernel_version}\n".format(
lanforge_kernel_version=self.lanforge_kernel_version[0]))
meta_data_fd.write(
"lanforge_fedora_version {lanforge_fedora_version}\n".format(
lanforge_fedora_version=self.lanforge_fedora_version[0]))
meta_data_fd.write(
"lanforge_gui_version_full {lanforge_gui_version_full}\n".format(
lanforge_gui_version_full=self.lanforge_gui_version_full))
meta_data_fd.write(
"lanforge_server_version_full {lanforge_server_version_full}\n".format(
lanforge_server_version_full=self.lanforge_server_version_full[0]))
meta_data_fd.close()
except ValueError as err:
self.logger.critical("unable to write meta {meta_data_path} : {msg})".format(meta_data_path=meta_data_path, msg=err))
except BaseException as err:
self.logger.critical("BaseException unable to write meta {meta_data_path} : {msg}".format(meta_data_path=meta_data_path, msg=err))
# Timeout needs to be reported and not overwriten
if self.test_result == "TIMEOUT":
self.logger.info(
"TIMEOUT FAILURE, Check LANforge Radios")
self.test_result = "Time Out"
background = self.background_purple
else:
# stderr_log_size = os.path.getsize(stderr_log_txt)
# if stderr_log_size > 0:
# self.logger.info(
# "File: {} is not empty: {}".format(
# stderr_log_txt, str(stderr_log_size)))
# text = open(stderr_log_txt).read()
# if 'Error' in text:
# self.text_result = "Failure"
# background = self.background_red
# else:
# self.text_result = "Success"
# background = self.background_green
# else:
# self.logger.info(
# "File: {} is empty: {}".format(
# stderr_log_txt, str(stderr_log_size)))
# self.test_result = "Success"
# background = self.background_green
# Check to see if there is an error in stdout_log
# TODO use the test result returned
if stdout_log_size > 0:
text = open(stdout_log_txt).read()
# for 5.4.3 only TestTag was not present
if 'ERROR: Could not find component: TestTag' in text:
self.test_result = "Success"
background = self.background_green
# probe command for test_ip_variable_time.py has
# the word alloc error and erros in it
elif 'alloc error' in text:
self.test_result = "Success"
background = self.background_green
# leave the space in after error to not pick up tx
# errors or rx errors
elif 'ERROR: ' in text:
self.test_result = "Some Tests Failed"
background = self.background_orange
elif 'ERROR: FAILED ' in text:
self.test_result = "Some Tests Failed"
background = self.background_orange
elif 'error ' in text.lower():
if 'passes: zero test results' in text:
self.test_result = "Success"
background = self.background_green
else:
self.test_result = "Test Errors"
background = self.background_red
elif 'tests failed' in text.lower():
self.test_result = "Some Tests Failed"
background = self.background_orange
else:
self.test_result = "Success"
background = self.background_green
else:
# if stdout empty that is a failure also
self.test_result = "Failure"
background = self.background_red
# Total up test, tests success, tests failure, tests
# timeouts
self.tests_run += 1
if self.test_result == "Success":
self.tests_success += 1
elif self.test_result == "Failure":
self.tests_failure += 1
elif self.test_result == "Some Tests Failed":
self.tests_some_failure += 1
elif self.test_result == "Test Errors":
self.tests_failure += 1
elif self.test_result == "TIMEOUT":
self.tests_timeout += 1
if 'lf_qa' in command:
line_list = open(stdout_log_txt).readlines()
for line in line_list:
if 'html report:' in line:
self.qa_report_html = line
print(
"html_report: {report}".format(
report=self.qa_report_html))
break
self.qa_report_html = self.qa_report_html.replace(
'html report: ', '')
# stdout_log_link is used for the email reporting to have
# the corrected path
stdout_log_link = str(stdout_log_txt).replace(
'/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace(
'/home/lanforge', '')
if command.find(' ') > 1:
short_cmd = command[0:command.find(' ')]
else:
short_cmd = command
self.html_results += """
<tr><td>""" + str(test) + """</td>
<td>""" + str(short_cmd) + """</td>
<td class='TimeFont'>""" + str(self.duration) + """</td>
<td class='DateFont'>""" + str(self.test_start_time) + """</td>
<td class='DateFont'>""" + str(self.test_end_time) + """</td>
<td style=""" + str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_link) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
else:
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
# TODO - place copy button at end and selectable , so
# individual sections may be copied
if command != short_cmd:
# Hover and copy button snows up
self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<span class='copybtn'>Copy</span>
<tt onclick='copyTextToClipboard(this)'>{command}</tt>
</td></tr>
""".format(command=command)
# TODO - place a point button for not have the copy
# hover, no copy button
'''self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<tt onclick='copyTextToClipboard(this)'>{command}</tt>
</td></tr>
""".format(command=command)
'''
# nocopy - example
'''
self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<tt>{command}</tt>
</td></tr>
""".format(command=command)
'''
row = [
test,
command,
self.test_result,
stdout_log_txt,
stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
# self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(test))
self.run_script()
else:
self.logger.info(
"enable value {} for test: {} ".format(self.test_dict[test]['enabled'], test))
"enable value {} for test: {} ".format(self.test_dict[self.test]['enabled'], self.test))
suite_end_time = datetime.datetime.now()
self.suite_end_time = str(datetime.datetime.now().strftime(
@@ -1401,7 +1383,7 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(logfile, "w")
file_handler = logging.FileHandler(logfile, "w+")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# allows to logging to file and stdout