lf_check.py : added iterations to the individual tests

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2021-08-15 09:27:56 -06:00
parent d84e82cac1
commit d0c5566f41

View File

@@ -146,6 +146,7 @@ class lf_check():
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.test_timeout = 120 self.test_timeout = 120
self.test_timeout_default = 120 self.test_timeout_default = 120
self.test_iterations_default = 1
self.use_blank_db = "FALSE" self.use_blank_db = "FALSE"
self.use_factory_default_db = "FALSE" self.use_factory_default_db = "FALSE"
self.use_custom_db = "FALSE" self.use_custom_db = "FALSE"
@@ -471,7 +472,7 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
else: else:
self.logger.info("NOTE: test_blog not found in json") self.logger.info("NOTE: test_blog not found in json")
#TODO change code so if parameter is not present then implied to be false
def read_test_parameters(self): def read_test_parameters(self):
if "test_timeout" in self.json_rig["test_parameters"]: if "test_timeout" in self.json_rig["test_parameters"]:
self.test_timeout = self.json_rig["test_parameters"]["test_timeout"] self.test_timeout = self.json_rig["test_parameters"]["test_timeout"]
@@ -771,281 +772,293 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
# load the default database # load the default database
elif self.test_dict[test]['enabled'] == "TRUE": elif self.test_dict[test]['enabled'] == "TRUE":
#TODO Place test interations here #TODO Place test interations here
# if args key has a value of an empty string then need to manipulate the args_list to args if 'iterations' in self.test_dict[test]:
# list does not have replace only stings do to args_list will be joined and converted to a string and placed self.logger.info("iterations : {}".format(self.test_dict[test]['iterations']))
# in args. Then the replace below will work. self.test_iterations = int(self.test_dict[test]['iterations'])
if self.test_dict[test]['args'] == "":
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
''.join(self.test_dict[test][
'args_list']))
# Configure Tests
# loop through radios
for radio in self.radio_dict:
# replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
# not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
# --num_stations needs to be int not string (no double quotes)
if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
self.radio_dict[radio]["KEY"],
'--radio {} --ssid {} --passwd {} --security {} --num_stations {}'
.format(self.radio_dict[radio]['RADIO'], self.radio_dict[radio]['SSID'],
self.radio_dict[radio]['PASSWD'], self.radio_dict[radio]['SECURITY'],
self.radio_dict[radio]['STATIONS']))
if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP',
self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip)
if 'LF_MGR_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip)
if 'LF_MGR_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port)
if 'DUT_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_NAME', self.dut_name)
if 'DUT_HW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw)
if 'DUT_SW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw)
if 'DUT_MODEL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model)
if 'DUT_SERIAL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial)
if 'DUT_BSSID_2G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_2G',
self.dut_bssid_2g)
if 'DUT_BSSID_5G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_5G',
self.dut_bssid_5g)
if 'DUT_BSSID_6G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_6G',
self.dut_bssid_6g)
if 'RADIO_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED', self.radio_lf)
if 'SSID_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED', self.ssid)
if 'SSID_PW_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED', self.ssid_pw)
if 'SECURITY_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED', self.security)
if 'NUM_STA' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA', self.num_sta)
if 'COL_NAMES' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES', self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
self.upstream_port)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path)
# The TEST_BED is the database tag
if 'TEST_BED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED', self.database_tag)
# database configuration
if 'DATABASE_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_HOST',
self.database_host)
if 'DATABASE_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_PORT',
self.database_port)
if 'DATABASE_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TOKEN',
self.database_token)
if 'DATABASE_ORG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_ORG',
self.database_org)
if 'DATABASE_BUCKET' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_BUCKET',
self.database_bucket)
if 'DATABASE_TAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TAG',
self.database_tag)
if 'DUT_SET_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
if 'TEST_RIG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig)
# dashboard configuration
if 'DASHBOARD_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_HOST',
self.dashboard_host)
if 'DASHBOARD_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_TOKEN',
self.dashboard_token)
# blog configuration
if 'BLOG_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_HOST', self.blog_host)
if 'BLOG_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_TOKEN', self.blog_token)
if 'BLOG_AUTHORS' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_AUTHORS',
self.blog_authors)
if 'BLOG_CUSTOMER' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_CUSTOMER',
self.blog_customer)
if 'BLOG_USER_PUSH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_USER_PUSH',
self.blog_user_push)
if 'BLOG_PASSWORD_PUSH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_PASSWORD_PUSH',
self.blog_password_push)
if 'BLOG_FLAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG', self.blog_flag)
if 'timeout' in self.test_dict[test]:
self.logger.info("timeout : {}".format(self.test_dict[test]['timeout']))
self.test_timeout = int(self.test_dict[test]['timeout'])
else: else:
self.test_timeout = self.test_timeout_default self.test_iterations = self.test_iterations_default
if 'load_db' in self.test_dict[test]: iteration = 0
self.logger.info("load_db : {}".format(self.test_dict[test]['load_db'])) for iteration in range(self.test_iterations):
if str(self.test_dict[test]['load_db']).lower() != "none" and str( iteration += 1
self.test_dict[test]['load_db']).lower() != "skip":
try: # if args key has a value of an empty string then need to manipulate the args_list to args
self.load_custom_db(self.test_dict[test]['load_db']) # list does not have replace only stings do to args_list will be joined and converted to a string and placed
except: # in args. Then the replace below will work.
self.logger.info("custom database failed to load check existance and location: {}".format( if self.test_dict[test]['args'] == "":
self.test_dict[test]['load_db'])) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
else: ''.join(self.test_dict[test][
self.logger.info("no load_db present in dictionary, load db normally") 'args_list']))
if self.use_factory_default_db == "TRUE": # Configure Tests
self.load_factory_default_db() # loop through radios
sleep(3) for radio in self.radio_dict:
self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT") # replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
if self.use_blank_db == "TRUE": # not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
self.load_blank_db() # --num_stations needs to be int not string (no double quotes)
sleep(1) if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']:
self.logger.info("BLANK loaded between tests with scenario.py --load BLANK") self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
if self.use_custom_db == "TRUE": self.radio_dict[radio]["KEY"],
try: '--radio {} --ssid {} --passwd {} --security {} --num_stations {}'
self.load_custom_db(self.custom_db) .format(self.radio_dict[radio]['RADIO'], self.radio_dict[radio]['SSID'],
sleep(1) self.radio_dict[radio]['PASSWD'], self.radio_dict[radio]['SECURITY'],
self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db, self.radio_dict[radio]['STATIONS']))
self.custom_db))
except: if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.logger.info("custom database failed to load check existance and location: {}".format( self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP',
self.custom_db)) self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip)
if 'LF_MGR_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip)
if 'LF_MGR_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port)
if 'DUT_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_NAME', self.dut_name)
if 'DUT_HW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw)
if 'DUT_SW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw)
if 'DUT_MODEL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model)
if 'DUT_SERIAL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial)
if 'DUT_BSSID_2G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_2G',
self.dut_bssid_2g)
if 'DUT_BSSID_5G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_5G',
self.dut_bssid_5g)
if 'DUT_BSSID_6G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_6G',
self.dut_bssid_6g)
if 'RADIO_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED', self.radio_lf)
if 'SSID_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED', self.ssid)
if 'SSID_PW_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED', self.ssid_pw)
if 'SECURITY_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED', self.security)
if 'NUM_STA' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA', self.num_sta)
if 'COL_NAMES' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES', self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
self.upstream_port)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path)
# The TEST_BED is the database tag
if 'TEST_BED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED', self.database_tag)
# database configuration
if 'DATABASE_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_HOST',
self.database_host)
if 'DATABASE_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_PORT',
self.database_port)
if 'DATABASE_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TOKEN',
self.database_token)
if 'DATABASE_ORG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_ORG',
self.database_org)
if 'DATABASE_BUCKET' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_BUCKET',
self.database_bucket)
if 'DATABASE_TAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TAG',
self.database_tag)
if 'DUT_SET_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
if 'TEST_RIG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig)
# end of database configuration
# dashboard configuration
if 'DASHBOARD_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_HOST',
self.dashboard_host)
if 'DASHBOARD_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_TOKEN',
self.dashboard_token)
# end of dashboard configuraiton
# blog configuration
if 'BLOG_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_HOST', self.blog_host)
if 'BLOG_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_TOKEN', self.blog_token)
if 'BLOG_AUTHORS' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_AUTHORS',
self.blog_authors)
if 'BLOG_CUSTOMER' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_CUSTOMER',
self.blog_customer)
if 'BLOG_USER_PUSH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_USER_PUSH',
self.blog_user_push)
if 'BLOG_PASSWORD_PUSH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_PASSWORD_PUSH',
self.blog_password_push)
if 'BLOG_FLAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG', self.blog_flag)
# end of blog configruation
if 'timeout' in self.test_dict[test]:
self.logger.info("timeout : {}".format(self.test_dict[test]['timeout']))
self.test_timeout = int(self.test_dict[test]['timeout'])
else: else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db)) self.test_timeout = self.test_timeout_default
sleep(1) # DO NOT REMOVE the sleep is to allow for the database to stablize if 'load_db' in self.test_dict[test]:
try: self.logger.info("load_db : {}".format(self.test_dict[test]['load_db']))
os.chdir(self.scripts_wd) if str(self.test_dict[test]['load_db']).lower() != "none" and str(
# self.logger.info("Current Working Directory {}".format(os.getcwd())) self.test_dict[test]['load_db']).lower() != "skip":
except: try:
self.logger.info("failed to change to {}".format(self.scripts_wd)) self.load_custom_db(self.test_dict[test]['load_db'])
cmd_args = "{}".format(self.test_dict[test]['args']) except:
command = "./{} {}".format(self.test_dict[test]['command'], cmd_args) self.logger.info("custom database failed to load check existance and location: {}".format(
self.logger.info("command: {}".format(command)) self.test_dict[test]['load_db']))
self.logger.info("cmd_args {}".format(cmd_args)) else:
self.logger.info("no load_db present in dictionary, load db normally")
if self.outfile_name is not None: if self.use_factory_default_db == "TRUE":
stdout_log_txt = os.path.join(self.log_path, "{}-{}-stdout.txt".format(self.outfile_name,test)) self.load_factory_default_db()
self.logger.info("stdout_log_txt: {}".format(stdout_log_txt)) sleep(3)
stdout_log = open(stdout_log_txt, 'a') self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT")
stderr_log_txt = os.path.join(self.log_path, "{}-{}-stderr.txt".format(self.outfile_name,test)) if self.use_blank_db == "TRUE":
self.logger.info("stderr_log_txt: {}".format(stderr_log_txt)) self.load_blank_db()
stderr_log = open(stderr_log_txt, 'a') sleep(1)
self.logger.info("BLANK loaded between tests with scenario.py --load BLANK")
# need to take into account --raw_line parameters thus need to use shlex.split if self.use_custom_db == "TRUE":
# need to preserve command to have correct command syntax in command output try:
command_to_run = command self.load_custom_db(self.custom_db)
command_to_run = shlex.split(command_to_run) sleep(1)
print("running {command_to_run}".format(command_to_run=command_to_run)) self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,
try: self.custom_db))
process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log, except:
universal_newlines=True) self.logger.info("custom database failed to load check existance and location: {}".format(
# if there is a better solution please propose, the TIMEOUT Result is different then FAIL self.custom_db))
try:
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
process.terminate()
self.test_result = "TIMEOUT"
except:
print("No such file or directory with command: {}".format(command))
self.logger.info("No such file or directory with command: {}".format(command))
if self.test_result != "TIMEOUT":
stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0:
self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size)))
text = open(stderr_log_txt).read()
if 'Error' in text:
self.text_result = "Failure"
background = self.background_red
else: else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))
sleep(1) # DO NOT REMOVE the sleep is to allow for the database to stablize
try:
os.chdir(self.scripts_wd)
# self.logger.info("Current Working Directory {}".format(os.getcwd()))
except:
self.logger.info("failed to change to {}".format(self.scripts_wd))
cmd_args = "{}".format(self.test_dict[test]['args'])
command = "./{} {}".format(self.test_dict[test]['command'], cmd_args)
self.logger.info("command: {}".format(command))
self.logger.info("cmd_args {}".format(cmd_args))
if self.outfile_name is not None:
stdout_log_txt = os.path.join(self.log_path, "{}-{}-stdout.txt".format(self.outfile_name,test))
self.logger.info("stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = os.path.join(self.log_path, "{}-{}-stderr.txt".format(self.outfile_name,test))
self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a')
# need to take into account --raw_line parameters thus need to use shlex.split
# need to preserve command to have correct command syntax in command output
command_to_run = command
command_to_run = shlex.split(command_to_run)
print("running {command_to_run}".format(command_to_run=command_to_run))
try:
process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
universal_newlines=True)
# if there is a better solution please propose, the TIMEOUT Result is different then FAIL
try:
process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired:
process.terminate()
self.test_result = "TIMEOUT"
except:
print("No such file or directory with command: {}".format(command))
self.logger.info("No such file or directory with command: {}".format(command))
if self.test_result != "TIMEOUT":
stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0:
self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size)))
text = open(stderr_log_txt).read()
if 'Error' in text:
self.text_result = "Failure"
background = self.background_red
else:
self.test_result = "Success"
background = self.background_green
else:
self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size)))
self.test_result = "Success" self.test_result = "Success"
background = self.background_green background = self.background_green
else: else:
self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size))) self.logger.info("TIMEOUT FAILURE, Check LANforge Radios")
self.test_result = "Success" self.test_result = "Time Out"
background = self.background_green background = self.background_purple
# Ghost will put data in stderr
if 'ghost' in command or 'lf_qa' in command:
if self.test_result != "TIMEOUT":
text = open(stderr_log_txt).read()
if 'Error' in text:
self.test_result = "Failure"
background = self.background_red
else:
self.test_result = "Success"
background = self.background_blue
if 'lf_qa' in command:
line_list = open(stdout_log_txt).readlines()
for line in line_list:
if 'html report:' in line:
self.qa_report_html = line
print("html_report: {report}".format(report=self.qa_report_html))
break
self.qa_report_html = self.qa_report_html.replace('html report: ','')
# stdout_log_link is used for the email reporting to have the corrected path
stdout_log_link = str(stdout_log_txt).replace('/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace('/home/lanforge', '')
self.html_results += """
<tr><td>""" + str(test) + """</td><td class='scriptdetails'>""" + str(command) + """</td>
<td style=""" + str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_link) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
else:
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
row = [test, command, self.test_result, stdout_log_txt, stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
# self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(test))
else: else:
self.logger.info("TIMEOUT FAILURE, Check LANforge Radios") self.logger.info(
self.test_result = "Time Out" "enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'], test))
background = self.background_purple
# Ghost will put data in stderr
if 'ghost' in command or 'lf_qa' in command:
if self.test_result != "TIMEOUT":
text = open(stderr_log_txt).read()
if 'Error' in text:
self.test_result = "Failure"
background = self.background_red
else:
self.test_result = "Success"
background = self.background_blue
if 'lf_qa' in command:
line_list = open(stdout_log_txt).readlines()
for line in line_list:
if 'html report:' in line:
self.qa_report_html = line
print("html_report: {report}".format(report=self.qa_report_html))
break
self.qa_report_html = self.qa_report_html.replace('html report: ','')
# stdout_log_link is used for the email reporting to have the corrected path
stdout_log_link = str(stdout_log_txt).replace('/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace('/home/lanforge', '')
self.html_results += """
<tr><td>""" + str(test) + """</td><td class='scriptdetails'>""" + str(command) + """</td>
<td style=""" + str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_link) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
else:
self.html_results += """<td></td>"""
self.html_results += """</tr>"""
row = [test, command, self.test_result, stdout_log_txt, stderr_log_txt]
self.csv_results_writer.writerow(row)
self.csv_results_file.flush()
# self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(test))
else:
self.logger.info(
"enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'], test))
self.finish_html_results() self.finish_html_results()