lf_check.py : pep8 and white space

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2021-11-06 12:18:06 -06:00
parent 2899eddb07
commit ae5b49d4d5

View File

@@ -123,7 +123,8 @@ sys.path.insert(0, parent_dir_path)
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
# lf_check class contains verificaiton configuration and ocastrates the testing.
# lf_check class contains verificaiton configuration and ocastrates the
# testing.
class lf_check():
def __init__(self,
_json_rig,
@@ -153,7 +154,8 @@ class lf_check():
self.outfile = _outfile
self.outfile_name = _outfile_name
self.test_result = "Failure"
self.results_col_titles = ["Test", "Command", "Result", "STDOUT", "STDERR"]
self.results_col_titles = [
"Test", "Command", "Result", "STDOUT", "STDERR"]
self.html_results = ""
self.background_green = "background-color:green"
self.background_red = "background-color:red"
@@ -193,7 +195,9 @@ class lf_check():
# section DUT
# dut selection
self.dut_set_name = 'DUT_NAME ASUSRT-AX88U' # note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not dut_name (see above)
# note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not
# dut_name (see above)
self.dut_set_name = 'DUT_NAME ASUSRT-AX88U'
self.use_dut_name = "DUT_NAME_NA" # "ASUSRT-AX88U" note this is not dut_set_name
self.dut_hw = "DUT_HW_NA"
self.dut_sw = "DUT_SW_NA"
@@ -234,10 +238,13 @@ class lf_check():
def get_scripts_git_sha(self):
# get git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
process = subprocess.Popen(
["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
(commit_hash, err) = process.communicate()
exit_code = process.wait()
print("get_scripts_get_sha exit_code: {exit_code}".format(exit_code=exit_code))
print(
"get_scripts_get_sha exit_code: {exit_code}".format(
exit_code=exit_code))
scripts_git_sha = commit_hash.decode('utf-8', 'ignore')
return scripts_git_sha
@@ -254,10 +261,17 @@ class lf_check():
# curl --user "lanforge:lanforge" -H 'Accept: application/json'
# http://192.168.100.116:8080/radiostatus/all | json_pp , where --user
# "USERNAME:PASSWORD"
request_command = 'http://{lfmgr}:{port}/radiostatus/all'.format(lfmgr=self.lf_mgr_ip, port=self.lf_mgr_port)
request = requests.get(request_command, auth=(self.lf_mgr_user, self.lf_mgr_pass))
print("radio request command: {request_command}".format(request_command=request_command))
print("radio request status_code {status}".format(status=request.status_code))
request_command = 'http://{lfmgr}:{port}/radiostatus/all'.format(
lfmgr=self.lf_mgr_ip, port=self.lf_mgr_port)
request = requests.get(
request_command, auth=(
self.lf_mgr_user, self.lf_mgr_pass))
print(
"radio request command: {request_command}".format(
request_command=request_command))
print(
"radio request status_code {status}".format(
status=request.status_code))
lanforge_radio_json = request.json()
print("radio request.json: {json}".format(json=lanforge_radio_json))
lanforge_radio_text = request.text
@@ -265,69 +279,88 @@ class lf_check():
return lanforge_radio_json, lanforge_radio_text
def get_lanforge_system_node_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
# creating shh client object we use this object to connect to router
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -n')
self.lanforge_system_node_version = stdout.readlines()
self.lanforge_system_node_version = [line.replace('\n', '') for line in self.lanforge_system_node_version]
self.lanforge_system_node_version = [line.replace(
'\n', '') for line in self.lanforge_system_node_version]
ssh.close()
time.sleep(1)
return self.lanforge_system_node_version
def get_lanforge_kernel_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
# creating shh client object we use this object to connect to router
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -r')
self.lanforge_kernel_version = stdout.readlines()
self.lanforge_kernel_version = [line.replace('\n', '') for line in self.lanforge_kernel_version]
self.lanforge_kernel_version = [line.replace(
'\n', '') for line in self.lanforge_kernel_version]
ssh.close()
time.sleep(1)
return self.lanforge_kernel_version
def get_lanforge_server_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
# creating shh client object we use this object to connect to router
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('./btserver --version | grep Version')
stdin, stdout, stderr = ssh.exec_command(
'./btserver --version | grep Version')
self.lanforge_server_version_full = stdout.readlines()
self.lanforge_server_version_full = [line.replace('\n', '') for line in self.lanforge_server_version_full]
print("lanforge_server_version_full: {lanforge_server_version_full}".format(lanforge_server_version_full=self.lanforge_server_version_full))
self.lanforge_server_version = self.lanforge_server_version_full[0].split('Version:', maxsplit=1)[-1].split(maxsplit=1)[0]
self.lanforge_server_version_full = [line.replace(
'\n', '') for line in self.lanforge_server_version_full]
print("lanforge_server_version_full: {lanforge_server_version_full}".format(
lanforge_server_version_full=self.lanforge_server_version_full))
self.lanforge_server_version = self.lanforge_server_version_full[0].split(
'Version:', maxsplit=1)[-1].split(maxsplit=1)[0]
self.lanforge_server_version = self.lanforge_server_version.strip()
print("lanforge_server_version: {lanforge_server_version}".format(lanforge_server_version=self.lanforge_server_version))
print("lanforge_server_version: {lanforge_server_version}".format(
lanforge_server_version=self.lanforge_server_version))
ssh.close()
time.sleep(1)
return self.lanforge_server_version_full
def get_lanforge_gui_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
# creating shh client object we use this object to connect to router
ssh = paramiko.SSHClient()
# automatically adds the missing host key
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command(
'curl -H "Accept: application/json" http://{lanforge_ip}:8080 | json_pp | grep -A 7 "VersionInfo"'.format(lanforge_ip=self.lf_mgr_ip))
self.lanforge_gui_version_full = stdout.readlines()
# print("lanforge_gui_version_full pre: {lanforge_gui_version_full}".format(lanforge_gui_version_full=self.lanforge_gui_version_full))
self.lanforge_gui_version_full = [line.replace('\n', '') for line in self.lanforge_gui_version_full]
self.lanforge_gui_version_full = [line.replace(
'\n', '') for line in self.lanforge_gui_version_full]
# print("lanforge_gui_version_full: {lanforge_gui_version_full}".format(lanforge_gui_version_full=self.lanforge_gui_version_full))
for element in self.lanforge_gui_version_full:
if "BuildVersion" in element:
ver_str = str(element)
self.lanforge_gui_version = ver_str.split(':', maxsplit=1)[-1].replace(',', '')
self.lanforge_gui_version = ver_str.split(
':', maxsplit=1)[-1].replace(',', '')
self.lanforge_gui_version = self.lanforge_gui_version.strip().replace('"', '')
print("BuildVersion {}".format(self.lanforge_gui_version))
if "BuildDate" in element:
gui_str = str(element)
self.lanforge_gui_build_date = gui_str.split(':', maxsplit=1)[-1].replace(',', '')
self.lanforge_gui_build_date = gui_str.split(
':', maxsplit=1)[-1].replace(',', '')
print("BuildDate {}".format(self.lanforge_gui_build_date))
if "GitVersion" in element:
git_sha_str = str(element)
self.lanforge_gui_git_sha = git_sha_str.split(':', maxsplit=1)[-1].replace(',', '')
self.lanforge_gui_git_sha = git_sha_str.split(
':', maxsplit=1)[-1].replace(',', '')
print("GitVersion {}".format(self.lanforge_gui_git_sha))
ssh.close()
@@ -353,20 +386,21 @@ class lf_check():
ip = socket.gethostbyname(hostname)
# a hostname lacking dots by definition lacks a domain name
# this is not useful for hyperlinks outside the known domain, so an IP address should be preferred
# this is not useful for hyperlinks outside the known domain, so an IP
# address should be preferred
if hostname.find('.') < 1:
hostname = ip
message_txt = ""
if (self.email_txt != ""):
message_txt = """{email_txt} lanforge target {lf_mgr_ip}
Results from {hostname}:
Results from {hostname}:
Suite: {suite}
Database: {db}
http://{hostname}/{report}
""".format(email_txt=self.email_txt, lf_mgr_ip=self.lf_mgr_ip, suite=self.test_suite, db=self.database_sqlite, hostname=hostname, report=report_url)
else:
message_txt = """Results from {hostname}:
message_txt = """Results from {hostname}:
Suite: {suite}
Database: {db}
http://{hostname}/{report}""".format(hostname=hostname, suite=self.test_suite, db=self.database_sqlite, report=report_url)
@@ -379,14 +413,15 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
if (self.email_title_txt != ""):
mail_subject = "{email} [{hostname}] {suite} {date}".format(email=self.email_title_txt, hostname=hostname,
suite=self.test_suite, db=self.database_sqlite, date=datetime.datetime.now())
suite=self.test_suite, db=self.database_sqlite, date=datetime.datetime.now())
else:
mail_subject = "Regression Test [{hostname}] {suite} {date}".format(hostname=hostname,
suite = self.test_suite, db = self.database_sqlite, date=datetime.datetime.now())
mail_subject = "Regression Test [{hostname}] {suite} {date}".format(hostname=hostname,
suite=self.test_suite, db=self.database_sqlite, date=datetime.datetime.now())
try:
if self.production_run:
msg = message_txt.format(ip=ip)
# for postfix from command line echo "My message" | mail -s subject user@candelatech.com
# for postfix from command line echo "My message" | mail -s
# subject user@candelatech.com
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg,
subject=mail_subject,
@@ -410,8 +445,10 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
def start_csv_results(self):
print("self.csv_results")
self.csv_results_file = open(self.csv_results, "w")
self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",")
self.csv_results_column_headers = ['Test', 'Command', 'Result', 'STDOUT', 'STDERR']
self.csv_results_writer = csv.writer(
self.csv_results_file, delimiter=",")
self.csv_results_column_headers = [
'Test', 'Command', 'Result', 'STDOUT', 'STDERR']
self.csv_results_writer.writerow(self.csv_results_column_headers)
self.csv_results_file.flush()
@@ -458,8 +495,11 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
# self.logger.info("test_rig_parameters {}".format(self.json_rig["test_rig_parameters"]))
self.read_test_rig_parameters()
else:
self.logger.info("EXITING test_rig_parameters not in json {}".format(self.json_rig))
self.logger.info("EXITING ERROR test_rig_parameters not in rig json")
self.logger.info(
"EXITING test_rig_parameters not in json {}".format(
self.json_rig))
self.logger.info(
"EXITING ERROR test_rig_parameters not in rig json")
exit(1)
# read dut configuration
@@ -468,24 +508,34 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
self.logger.info("json: read test_dut")
self.read_dut_parameters()
else:
self.logger.info("EXITING test_dut not in json {}".format(self.json_dut))
self.logger.info(
"EXITING test_dut not in json {}".format(
self.json_dut))
self.logger.info("EXITING ERROR test_dut not in dut json {}")
exit(1)
# Top Level for reading the tests to run
def read_json_test(self):
if "test_suites" in self.json_test:
self.logger.info("json: read test_suites looking for: {}".format(self.test_suite))
self.logger.info(
"json: read test_suites looking for: {}".format(
self.test_suite))
# self.logger.info("test_suites {}".format(self.json_test["test_suites"]))
if self.test_suite in self.json_test["test_suites"]:
self.test_dict = self.json_test["test_suites"][self.test_suite]
# self.logger.info("self.test_dict {}".format(self.test_dict))
else:
self.logger.info("EXITING test_suite {} Not Present in json test_suites: {}".format(self.test_suite, self.json_test["test_suites"]))
self.logger.info("EXITING ERROR test_suite {} Not Present in json test_suites".format(self.test_suite))
self.logger.info(
"EXITING test_suite {} Not Present in json test_suites: {}".format(
self.test_suite, self.json_test["test_suites"]))
self.logger.info(
"EXITING ERROR test_suite {} Not Present in json test_suites".format(
self.test_suite))
exit(1)
else:
self.logger.info("EXITING test_suites not in json {}".format(self.json_test))
self.logger.info(
"EXITING test_suites not in json {}".format(
self.json_test))
self.logger.info("EXITING ERROR test_suites not in json test")
exit(1)
@@ -499,7 +549,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
if "DATABASE_SQLITE" in self.json_rig["test_rig_parameters"]:
self.database_sqlite = self.json_rig["test_rig_parameters"]["DATABASE_SQLITE"]
else:
self.logger.info("DATABASE_SQLITE not in test_rig_parameters json")
self.logger.info(
"DATABASE_SQLITE not in test_rig_parameters json")
else:
self.database_sqlite = self.db_override
if "LF_MGR_IP" in self.json_rig["test_rig_parameters"]:
@@ -531,7 +582,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
if "EMAIL_LIST_PRODUCTION" in self.json_rig["test_rig_parameters"]:
self.email_list_production = self.json_rig["test_rig_parameters"]["EMAIL_LIST_PRODUCTION"]
else:
self.logger.info("EMAIL_LIST_PRODUCTION not in test_rig_parameters json")
self.logger.info(
"EMAIL_LIST_PRODUCTION not in test_rig_parameters json")
exit(1)
if "EMAIL_LIST_TEST" in self.json_rig["test_rig_parameters"]:
self.email_list_test = self.json_rig["test_rig_parameters"]["EMAIL_LIST_TEST"]
@@ -555,7 +607,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
self.dut_set_name = self.json_dut["test_dut"]["DUT_SET_NAME"]
else:
self.logger.info("DUT_SET_NAME not in test_dut json")
# dut name will set a chamberview scenerio for a DUT which can be selected with dut_set_name
# dut name will set a chamberview scenerio for a DUT which can be
# selected with dut_set_name
if "USE_DUT_NAME" in self.json_dut["test_dut"]:
self.use_dut_name = self.json_dut["test_dut"]["USE_DUT_NAME"]
else:
@@ -583,12 +636,14 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
if "wireless_network_dict" in self.json_dut["test_dut"]:
self.wireless_network_dict = self.json_dut["test_dut"]["wireless_network_dict"]
self.logger.info("self.wireless_network_dict {}".format(self.wireless_network_dict))
self.logger.info(
"self.wireless_network_dict {}".format(
self.wireless_network_dict))
else:
self.logger.info("wireless_network_dict not in test_dut json")
exit(1)
# custom will accept --load FACTORY_DFLT and --load BLANK
# custom will accept --load FACTORY_DFLT and --load BLANK
# TODO make a list
def load_custom_database(self, custom_db):
try:
@@ -596,18 +651,23 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
except BaseException:
self.logger.info("failed to change to {}".format(self.scripts_wd))
# WARNING do not simplify the following constructed command
#command = "./{} {} {} {}".format("scenario.py", "--mgr {mgr}"\
# WARNING do not simplify the following constructed command
# command = "./{} {} {} {}".format("scenario.py", "--mgr {mgr}"\
# .format(mgr=self.lf_mgr_ip),"--load {db}".format(db=custom_db),"--action {action}".format(action="overwrite"))
command = "./{cmd} --mgr {mgr} --load {db} --action {action}".format(cmd="scenario.py",mgr=self.lf_mgr_ip,db=custom_db,action="overwrite")
command = "./{cmd} --mgr {mgr} --load {db} --action {action}".format(
cmd="scenario.py", mgr=self.lf_mgr_ip, db=custom_db, action="overwrite")
print("command: {command}".format(command=command))
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
print("load_custom_database out: {out} errcode: {errcode} err: {err}".format(out=out,errcode=errcode,err=err))
print(
"load_custom_database out: {out} errcode: {errcode} err: {err}".format(
out=out,
errcode=errcode,
err=err))
# DO NOT REMOVE 15 second sleep.
# After every DB load, the load changes are applied, and part of the apply is to re-build
# The underlying netsmith objects
@@ -624,24 +684,34 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
elif self.test_dict[test]['enabled'] == "TRUE":
# TODO Place test interations here
if 'iterations' in self.test_dict[test]:
self.logger.info("iterations : {}".format(self.test_dict[test]['iterations']))
self.test_iterations = int(self.test_dict[test]['iterations'])
self.logger.info(
"iterations : {}".format(
self.test_dict[test]['iterations']))
self.test_iterations = int(
self.test_dict[test]['iterations'])
else:
self.test_iterations = self.test_iterations_default
iteration = 0
report_index = 0 # log may contain multiple runs - this helps put the meta.txt in right directory
# log may contain multiple runs - this helps put the meta.txt
# in right directory
report_index = 0
for iteration in range(self.test_iterations):
iteration += 1
# The network arguments need to be changed when in a list
for index, args_list_element in enumerate(self.test_dict[test]['args_list']):
for index, args_list_element in enumerate(
self.test_dict[test]['args_list']):
if 'ssid_idx=' in args_list_element:
# print("args_list_element {}".format(args_list_element))
# get ssid_idx used in the test as an index for the dictionary
ssid_idx_number = args_list_element.split('ssid_idx=')[-1].split()[0]
print("ssid_idx_number: {}".format(ssid_idx_number))
idx = "ssid_idx={}".format(ssid_idx_number) # index into the DUT network index
# get ssid_idx used in the test as an index for the
# dictionary
ssid_idx_number = args_list_element.split(
'ssid_idx=')[-1].split()[0]
print(
"ssid_idx_number: {}".format(ssid_idx_number))
# index into the DUT network index
idx = "ssid_idx={}".format(ssid_idx_number)
print("idx: {}".format(idx))
if 'SSID_USED' in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
@@ -656,49 +726,67 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
'BSSID', self.wireless_network_dict[idx]['BSSID'])
# use_ssid_idx is ephemeral and used only for variable replacement , remove
# use_ssid_idx is ephemeral and used only for
# variable replacement , remove
tmp_idx = "use_ssid_idx={}".format(ssid_idx_number)
if tmp_idx in args_list_element:
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(tmp_idx, '')
self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(
tmp_idx, '')
# leave in for checking the command line arguments
print("self.test_dict[test]['args_list']: {}".format(self.test_dict[test]['args_list']))
print(
"self.test_dict[test]['args_list']: {}".format(
self.test_dict[test]['args_list']))
# Walk all the args in the args list then construct the arguments
# Walk all the args in the args list then construct the
# arguments
if self.test_dict[test]['args'] == "":
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
''.join(self.test_dict[test][
'args_list']))
if 'DATABASE_SQLITE' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_SQLITE', self.database_sqlite)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DATABASE_SQLITE', self.database_sqlite)
if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP', self.http_test_ip)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'HTTP_TEST_IP', self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'FTP_TEST_IP', self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'TEST_IP', self.test_ip)
if 'LF_MGR_USER' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_USER', self.lf_mgr_user)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_USER', self.lf_mgr_user)
if 'LF_MGR_PASS' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PASS', self.lf_mgr_pass)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_PASS', self.lf_mgr_pass)
if 'LF_MGR_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_IP', self.lf_mgr_ip)
if 'LF_MGR_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'LF_MGR_PORT', self.lf_mgr_port)
# DUT Configuration
if 'USE_DUT_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('USE_DUT_NAME', self.use_dut_name)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'USE_DUT_NAME', self.use_dut_name)
if 'DUT_HW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_HW', self.dut_hw)
if 'DUT_SW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_SW', self.dut_sw)
if 'DUT_MODEL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_MODEL', self.dut_model)
if 'DUT_SERIAL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'DUT_SERIAL', self.dut_serial)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
@@ -706,13 +794,15 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'REPORT_PATH', self.report_path)
if 'DUT_SET_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
if 'TEST_RIG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
'TEST_RIG', self.test_rig)
# END of command line arg processing
if self.test_dict[test]['args'] == "":
@@ -721,17 +811,23 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
'args_list']))
if 'timeout' in self.test_dict[test]:
self.logger.info("timeout : {}".format(self.test_dict[test]['timeout']))
self.test_timeout = int(self.test_dict[test]['timeout'])
self.logger.info(
"timeout : {}".format(
self.test_dict[test]['timeout']))
self.test_timeout = int(
self.test_dict[test]['timeout'])
else:
self.test_timeout = self.test_timeout_default
if 'load_db' in self.test_dict[test]:
self.logger.info("load_db : {}".format(self.test_dict[test]['load_db']))
self.logger.info(
"load_db : {}".format(
self.test_dict[test]['load_db']))
if str(self.test_dict[test]['load_db']).lower() != "none" and str(
self.test_dict[test]['load_db']).lower() != "skip":
try:
self.load_custom_database(self.test_dict[test]['load_db'])
self.load_custom_database(
self.test_dict[test]['load_db'])
except BaseException:
self.logger.info("custom database failed to load check existance and location: {}".format(
self.test_dict[test]['load_db']))
@@ -740,32 +836,46 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
os.chdir(self.scripts_wd)
# self.logger.info("Current Working Directory {}".format(os.getcwd()))
except BaseException:
self.logger.info("failed to change to {}".format(self.scripts_wd))
self.logger.info(
"failed to change to {}".format(
self.scripts_wd))
cmd_args = "{}".format(self.test_dict[test]['args'])
command = "./{} {}".format(self.test_dict[test]['command'], cmd_args)
command = "./{} {}".format(
self.test_dict[test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
if self.outfile_name is not None:
stdout_log_txt = os.path.join(self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, test))
self.logger.info("stdout_log_txt: {}".format(stdout_log_txt))
stdout_log_txt = os.path.join(
self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, test))
self.logger.info(
"stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = os.path.join(self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, test))
self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
stderr_log_txt = os.path.join(
self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, test))
self.logger.info(
"stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a')
# need to take into account --raw_line parameters thus need to use shlex.split
# need to preserve command to have correct command syntax in command output
# need to preserve command to have correct command syntax
# in command output
command_to_run = command
command_to_run = shlex.split(command_to_run)
print("running {command_to_run}".format(command_to_run=command_to_run))
self.test_start_time = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")).replace(':', '-')
print("Test start: {time}".format(time=self.test_start_time))
print(
"running {command_to_run}".format(
command_to_run=command_to_run))
self.test_start_time = str(datetime.datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S")).replace(':', '-')
print(
"Test start: {time}".format(
time=self.test_start_time))
start_time = datetime.datetime.now()
try:
process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
universal_newlines=True)
# if there is a better solution please propose, the TIMEOUT Result is different then FAIL
# if there is a better solution please propose, the
# TIMEOUT Result is different then FAIL
try:
if int(self.test_timeout != 0):
process.wait(timeout=int(self.test_timeout))
@@ -776,12 +886,17 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
self.test_result = "TIMEOUT"
except BaseException:
print("No such file or directory with command: {}".format(command))
self.logger.info("No such file or directory with command: {}".format(command))
print(
"No such file or directory with command: {}".format(command))
self.logger.info(
"No such file or directory with command: {}".format(command))
end_time = datetime.datetime.now()
self.test_end_time = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")).replace(':', '-')
print("Test end time {time}".format(time=self.test_end_time))
self.test_end_time = str(datetime.datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S")).replace(':', '-')
print(
"Test end time {time}".format(
time=self.test_end_time))
time_delta = end_time - start_time
self.duration = "{day}d {seconds}s {msec} ms".format(
@@ -789,38 +904,56 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
# If collect meta data is set
meta_data_path = ""
# Will gather data even on a TIMEOUT condition as there is some results on longer tests
# Will gather data even on a TIMEOUT condition as there is
# some results on longer tests
stdout_log_size = os.path.getsize(stdout_log_txt)
if stdout_log_size > 0:
stdout_log_fd = open(stdout_log_txt)
# "Report Location:::/home/lanforge/html-reports/wifi-capacity-2021-08-17-04-02-56"
#
for line in stdout_log_fd:
if "Report Location" in line:
report_index += 1
if iteration == report_index:
meta_data_path = line.replace('"', '')
meta_data_path = meta_data_path.replace('Report Location:::', '')
meta_data_path = meta_data_path.split('/')[-1]
meta_data_path = meta_data_path.replace(
'Report Location:::', '')
meta_data_path = meta_data_path.split(
'/')[-1]
meta_data_path = meta_data_path.strip()
meta_data_path = self.report_path + '/' + meta_data_path + '/meta.txt'
break
stdout_log_fd.close()
if meta_data_path != "":
meta_data_fd = open(meta_data_path, 'w+')
meta_data_fd.write('$ Generated by Candela Technologies LANforge network testing tool\n')
meta_data_fd.write("test_run {test_run}\n".format(test_run=self.report_path))
meta_data_fd.write("file_meta {path}\n".format(path=meta_data_path))
meta_data_fd.write('lanforge_gui_version: {gui_version} \n'.format(gui_version=self.lanforge_gui_version))
meta_data_fd.write('lanforge_server_version: {server_version} \n'.format(server_version=self.lanforge_server_version))
meta_data_fd.write(
'$ Generated by Candela Technologies LANforge network testing tool\n')
meta_data_fd.write(
"test_run {test_run}\n".format(
test_run=self.report_path))
meta_data_fd.write(
"file_meta {path}\n".format(
path=meta_data_path))
meta_data_fd.write(
'lanforge_gui_version: {gui_version} \n'.format(
gui_version=self.lanforge_gui_version))
meta_data_fd.write(
'lanforge_server_version: {server_version} \n'.format(
server_version=self.lanforge_server_version))
meta_data_fd.write('$ LANforge command\n')
meta_data_fd.write("command {command}\n".format(command=command))
# split command at test-tag , at rest of string once at the actual test-tag value
test_tag = command.split('test_tag', maxsplit=1)[-1].split(maxsplit=1)[0]
meta_data_fd.write(
"command {command}\n".format(
command=command))
# split command at test-tag , at rest of string once at
# the actual test-tag value
test_tag = command.split(
'test_tag', maxsplit=1)[-1].split(maxsplit=1)[0]
test_tag = test_tag.replace("'", "")
meta_data_fd.write('$ LANforge test tag\n')
meta_data_fd.write("test_tag {test_tag}\n".format(test_tag=test_tag))
meta_data_fd.write(
"test_tag {test_tag}\n".format(
test_tag=test_tag))
# LANforge information is a list thus [0]
meta_data_fd.write('$ LANforge Information\n')
meta_data_fd.write(
@@ -839,7 +972,9 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0:
self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size)))
self.logger.info(
"File: {} is not empty: {}".format(
stderr_log_txt, str(stderr_log_size)))
text = open(stderr_log_txt).read()
if 'Error' in text:
self.text_result = "Failure"
@@ -848,7 +983,9 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
self.text_result = "Success"
background = self.background_green
else:
self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size)))
self.logger.info(
"File: {} is empty: {}".format(
stderr_log_txt, str(stderr_log_size)))
self.test_result = "Success"
background = self.background_green
@@ -859,7 +996,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
if 'ERROR: Could not find component: TestTag' in text:
self.test_result = "Success"
background = self.background_green
# leave the space in after error to not pick up tx errors or rx errors
# leave the space in after error to not pick up tx
# errors or rx errors
elif 'error ' in text.lower():
self.test_result = "Test Fail"
background = self.background_orange
@@ -873,7 +1011,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
# if there was a
if self.test_result == "TIMEOUT":
self.logger.info("TIMEOUT FAILURE, Check LANforge Radios")
self.logger.info(
"TIMEOUT FAILURE, Check LANforge Radios")
self.test_result = "Time Out"
background = self.background_purple
@@ -882,14 +1021,20 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
for line in line_list:
if 'html report:' in line:
self.qa_report_html = line
print("html_report: {report}".format(report=self.qa_report_html))
print(
"html_report: {report}".format(
report=self.qa_report_html))
break
self.qa_report_html = self.qa_report_html.replace('html report: ', '')
self.qa_report_html = self.qa_report_html.replace(
'html report: ', '')
# stdout_log_link is used for the email reporting to have the corrected path
stdout_log_link = str(stdout_log_txt).replace('/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace('/home/lanforge', '')
# stdout_log_link is used for the email reporting to have
# the corrected path
stdout_log_link = str(stdout_log_txt).replace(
'/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace(
'/home/lanforge', '')
if command.find(' ') > 1:
short_cmd = command[0:command.find(' ')]
else:
@@ -913,7 +1058,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
# TODO - plase copy button at end and selectable , so individual sections may be copied
# TODO - plase copy button at end and selectable , so
# individual sections may be copied
if command != short_cmd:
'''self.html_results += f"""<tr><td colspan='8' class='scriptdetails'>
<span class='copybtn'>Copy</span>
@@ -933,7 +1079,12 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url)
""".format(command=command)
'''
row = [test, command, self.test_result, stdout_log_txt, stderr_log_txt]
row = [
test,
command,
self.test_result,
stdout_log_txt,
stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
# self.logger.info("row: {}".format(row))
@@ -967,13 +1118,37 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
---------
''')
parser.add_argument('--dir', help="--dir <results directory>", default="lf_check")
parser.add_argument('--path', help="--path <results path>", default="/home/lanforge/html-results")
parser.add_argument('--json_rig', help="--json_rig <rig json config> ", default="", required=True)
parser.add_argument('--json_dut', help="--json_dut <dut json config> ", default="", required=True)
parser.add_argument('--json_test', help="--json_test <test json config> ", default="", required=True)
parser.add_argument('--suite', help="--suite <suite name> default TEST_DICTIONARY", default="TEST_DICTIONARY")
parser.add_argument('--db_override', help="--db_override <sqlite db> override for json DATABASE_SQLITE''", default=None)
parser.add_argument(
'--dir',
help="--dir <results directory>",
default="lf_check")
parser.add_argument(
'--path',
help="--path <results path>",
default="/home/lanforge/html-results")
parser.add_argument(
'--json_rig',
help="--json_rig <rig json config> ",
default="",
required=True)
parser.add_argument(
'--json_dut',
help="--json_dut <dut json config> ",
default="",
required=True)
parser.add_argument(
'--json_test',
help="--json_test <test json config> ",
default="",
required=True)
parser.add_argument(
'--suite',
help="--suite <suite name> default TEST_DICTIONARY",
default="TEST_DICTIONARY")
parser.add_argument(
'--db_override',
help="--db_override <sqlite db> override for json DATABASE_SQLITE''",
default=None)
parser.add_argument('--production', help="--production stores true, sends email results to production email list",
action='store_true')
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated",
@@ -1030,9 +1205,9 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
# create report class for reporting
report = lf_report.lf_report(_path=__path,
_results_dir_name=__dir,
_output_html="lf_check.html",
_output_pdf="lf_check.pdf")
_results_dir_name=__dir,
_output_html="lf_check.html",
_output_pdf="lf_check.pdf")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
csv_results = "lf_check{}-{}.csv".format(args.outfile, current_time)
@@ -1067,7 +1242,8 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
file_handler = logging.FileHandler(logfile, "w")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler(sys.stdout)) # allows to logging to file and stdout
# allows to logging to file and stdout
logger.addHandler(logging.StreamHandler(sys.stdout))
# read config and run tests
check.read_json_rig() # check.read_config
@@ -1084,44 +1260,59 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
try:
lanforge_system_node_version = check.get_lanforge_system_node_version()
print("lanforge_system_node_version {system_node_ver}".format(system_node_ver=lanforge_system_node_version))
print("lanforge_system_node_version {system_node_ver}".format(
system_node_ver=lanforge_system_node_version))
except BaseException:
print("lanforge_system_node_version exception")
try:
lanforge_kernel_version = check.get_lanforge_kernel_version()
print("lanforge_kernel_version {kernel_ver}".format(kernel_ver=lanforge_kernel_version))
print("lanforge_kernel_version {kernel_ver}".format(
kernel_ver=lanforge_kernel_version))
except BaseException:
print("lanforge_kernel_version exception, tests aborted check lanforge ip")
exit(1)
try:
lanforge_server_version_full = check.get_lanforge_server_version()
print("lanforge_server_version_full {lanforge_server_version_full}".format(lanforge_server_version_full=lanforge_server_version_full))
print("lanforge_server_version_full {lanforge_server_version_full}".format(
lanforge_server_version_full=lanforge_server_version_full))
except BaseException:
print("lanforge_server_version exception, tests aborted check lanforge ip")
exit(1)
try:
lanforge_gui_version_full, lanforge_gui_version, lanforge_gui_build_date, lanforge_gui_git_sha = check.get_lanforge_gui_version()
print("lanforge_gui_version_full {lanforge_gui_version_full}".format(lanforge_gui_version_full=lanforge_gui_version_full))
print("lanforge_gui_version_full {lanforge_gui_version_full}".format(
lanforge_gui_version_full=lanforge_gui_version_full))
except BaseException:
print("lanforge_gui_version exception, tests aborted check lanforge ip")
exit(1)
try:
lanforge_radio_json, lanforge_radio_text = check.get_lanforge_radio_information()
lanforge_radio_formatted_str = json.dumps(lanforge_radio_json, indent=2)
print("lanforge_radio_json: {lanforge_radio_json}".format(lanforge_radio_json=lanforge_radio_formatted_str))
lanforge_radio_formatted_str = json.dumps(
lanforge_radio_json, indent=2)
print("lanforge_radio_json: {lanforge_radio_json}".format(
lanforge_radio_json=lanforge_radio_formatted_str))
# note put into the meta data
lf_radio_df = pd.DataFrame(columns=['Radio', 'WIFI-Radio Driver', 'Radio Capabilities', 'Firmware Version', 'max_sta', 'max_vap', 'max_vifs'])
lf_radio_df = pd.DataFrame(
columns=[
'Radio',
'WIFI-Radio Driver',
'Radio Capabilities',
'Firmware Version',
'max_sta',
'max_vap',
'max_vifs'])
for key in lanforge_radio_json:
if 'wiphy' in key:
# print("key {}".format(key))
# print("lanforge_radio_json[{}]: {}".format(key,lanforge_radio_json[key]))
driver = lanforge_radio_json[key]['driver'].split('Driver:', maxsplit=1)[-1].split(maxsplit=1)[0]
driver = lanforge_radio_json[key]['driver'].split(
'Driver:', maxsplit=1)[-1].split(maxsplit=1)[0]
try:
firmware_version = lanforge_radio_json[key]['firmware version']
except BaseException:
@@ -1160,7 +1351,9 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
# generate output reports
test_rig = check.get_test_rig()
report.set_title("LF Check: {test_rig} lf_check.py".format(test_rig=test_rig))
report.set_title(
"LF Check: {test_rig} lf_check.py".format(
test_rig=test_rig))
report.build_banner_left()
report.start_content_div2()
report.set_obj_html("Objective", "Run QA Tests")