diff --git a/py-scripts/tools/lf_check.py b/py-scripts/tools/lf_check.py index c09164cb..39f80116 100755 --- a/py-scripts/tools/lf_check.py +++ b/py-scripts/tools/lf_check.py @@ -146,6 +146,7 @@ class lf_check(): self.logger = logging.getLogger(__name__) self.test_timeout = 120 self.test_timeout_default = 120 + self.test_iterations_default = 1 self.use_blank_db = "FALSE" self.use_factory_default_db = "FALSE" self.use_custom_db = "FALSE" @@ -471,7 +472,7 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url) else: self.logger.info("NOTE: test_blog not found in json") - + #TODO change code so if parameter is not present then implied to be false def read_test_parameters(self): if "test_timeout" in self.json_rig["test_parameters"]: self.test_timeout = self.json_rig["test_parameters"]["test_timeout"] @@ -771,281 +772,293 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url) # load the default database elif self.test_dict[test]['enabled'] == "TRUE": #TODO Place test interations here - # if args key has a value of an empty string then need to manipulate the args_list to args - # list does not have replace only stings do to args_list will be joined and converted to a string and placed - # in args. Then the replace below will work. - if self.test_dict[test]['args'] == "": - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'], - ''.join(self.test_dict[test][ - 'args_list'])) - # Configure Tests - # loop through radios - for radio in self.radio_dict: - # replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values) - # not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues - # --num_stations needs to be int not string (no double quotes) - if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( - self.radio_dict[radio]["KEY"], - '--radio {} --ssid {} --passwd {} --security {} --num_stations {}' - .format(self.radio_dict[radio]['RADIO'], self.radio_dict[radio]['SSID'], - self.radio_dict[radio]['PASSWD'], self.radio_dict[radio]['SECURITY'], - self.radio_dict[radio]['STATIONS'])) - - if 'HTTP_TEST_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP', - self.http_test_ip) - if 'FTP_TEST_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip) - if 'TEST_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip) - - if 'LF_MGR_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip) - if 'LF_MGR_PORT' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port) - - if 'DUT_NAME' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_NAME', self.dut_name) - if 'DUT_HW' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw) - if 'DUT_SW' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw) - if 'DUT_MODEL' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model) - if 'DUT_SERIAL' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial) - if 'DUT_BSSID_2G' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_2G', - self.dut_bssid_2g) - if 'DUT_BSSID_5G' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_5G', - self.dut_bssid_5g) - if 'DUT_BSSID_6G' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_6G', - self.dut_bssid_6g) - - if 'RADIO_USED' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED', self.radio_lf) - if 'SSID_USED' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED', self.ssid) - if 'SSID_PW_USED' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED', self.ssid_pw) - if 'SECURITY_USED' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED', self.security) - if 'NUM_STA' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA', self.num_sta) - if 'COL_NAMES' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES', self.col_names) - if 'UPSTREAM_PORT' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT', - self.upstream_port) - - # lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location - # of the reports when the reports are pulled. - if 'REPORT_PATH' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path) - - # The TEST_BED is the database tag - if 'TEST_BED' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED', self.database_tag) - - # database configuration - if 'DATABASE_HOST' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_HOST', - self.database_host) - if 'DATABASE_PORT' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_PORT', - self.database_port) - if 'DATABASE_TOKEN' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TOKEN', - self.database_token) - if 'DATABASE_ORG' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_ORG', - self.database_org) - if 'DATABASE_BUCKET' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_BUCKET', - self.database_bucket) - if 'DATABASE_TAG' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TAG', - self.database_tag) - if 'DUT_SET_NAME' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME', - self.dut_set_name) - - if 'TEST_RIG' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig) - - # dashboard configuration - if 'DASHBOARD_HOST' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_HOST', - self.dashboard_host) - if 'DASHBOARD_TOKEN' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_TOKEN', - self.dashboard_token) - - # blog configuration - if 'BLOG_HOST' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_HOST', self.blog_host) - if 'BLOG_TOKEN' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_TOKEN', self.blog_token) - if 'BLOG_AUTHORS' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_AUTHORS', - self.blog_authors) - if 'BLOG_CUSTOMER' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_CUSTOMER', - self.blog_customer) - if 'BLOG_USER_PUSH' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_USER_PUSH', - self.blog_user_push) - if 'BLOG_PASSWORD_PUSH' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_PASSWORD_PUSH', - self.blog_password_push) - if 'BLOG_FLAG' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG', self.blog_flag) - - if 'timeout' in self.test_dict[test]: - self.logger.info("timeout : {}".format(self.test_dict[test]['timeout'])) - self.test_timeout = int(self.test_dict[test]['timeout']) + if 'iterations' in self.test_dict[test]: + self.logger.info("iterations : {}".format(self.test_dict[test]['iterations'])) + self.test_iterations = int(self.test_dict[test]['iterations']) else: - self.test_timeout = self.test_timeout_default + self.test_iterations = self.test_iterations_default - if 'load_db' in self.test_dict[test]: - self.logger.info("load_db : {}".format(self.test_dict[test]['load_db'])) - if str(self.test_dict[test]['load_db']).lower() != "none" and str( - self.test_dict[test]['load_db']).lower() != "skip": - try: - self.load_custom_db(self.test_dict[test]['load_db']) - except: - self.logger.info("custom database failed to load check existance and location: {}".format( - self.test_dict[test]['load_db'])) - else: - self.logger.info("no load_db present in dictionary, load db normally") - if self.use_factory_default_db == "TRUE": - self.load_factory_default_db() - sleep(3) - self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT") - if self.use_blank_db == "TRUE": - self.load_blank_db() - sleep(1) - self.logger.info("BLANK loaded between tests with scenario.py --load BLANK") - if self.use_custom_db == "TRUE": - try: - self.load_custom_db(self.custom_db) - sleep(1) - self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db, - self.custom_db)) - except: - self.logger.info("custom database failed to load check existance and location: {}".format( - self.custom_db)) + iteration = 0 + for iteration in range(self.test_iterations): + iteration += 1 + + # if args key has a value of an empty string then need to manipulate the args_list to args + # list does not have replace only stings do to args_list will be joined and converted to a string and placed + # in args. Then the replace below will work. + if self.test_dict[test]['args'] == "": + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'], + ''.join(self.test_dict[test][ + 'args_list'])) + # Configure Tests + # loop through radios + for radio in self.radio_dict: + # replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values) + # not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues + # --num_stations needs to be int not string (no double quotes) + if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + self.radio_dict[radio]["KEY"], + '--radio {} --ssid {} --passwd {} --security {} --num_stations {}' + .format(self.radio_dict[radio]['RADIO'], self.radio_dict[radio]['SSID'], + self.radio_dict[radio]['PASSWD'], self.radio_dict[radio]['SECURITY'], + self.radio_dict[radio]['STATIONS'])) + + if 'HTTP_TEST_IP' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP', + self.http_test_ip) + if 'FTP_TEST_IP' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip) + if 'TEST_IP' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip) + + if 'LF_MGR_IP' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip) + if 'LF_MGR_PORT' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port) + + if 'DUT_NAME' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_NAME', self.dut_name) + if 'DUT_HW' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw) + if 'DUT_SW' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw) + if 'DUT_MODEL' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model) + if 'DUT_SERIAL' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial) + if 'DUT_BSSID_2G' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_2G', + self.dut_bssid_2g) + if 'DUT_BSSID_5G' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_5G', + self.dut_bssid_5g) + if 'DUT_BSSID_6G' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_6G', + self.dut_bssid_6g) + + if 'RADIO_USED' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED', self.radio_lf) + if 'SSID_USED' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED', self.ssid) + if 'SSID_PW_USED' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED', self.ssid_pw) + if 'SECURITY_USED' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED', self.security) + if 'NUM_STA' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA', self.num_sta) + if 'COL_NAMES' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES', self.col_names) + if 'UPSTREAM_PORT' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT', + self.upstream_port) + + # lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location + # of the reports when the reports are pulled. + if 'REPORT_PATH' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path) + + # The TEST_BED is the database tag + if 'TEST_BED' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED', self.database_tag) + + # database configuration + if 'DATABASE_HOST' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_HOST', + self.database_host) + if 'DATABASE_PORT' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_PORT', + self.database_port) + if 'DATABASE_TOKEN' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TOKEN', + self.database_token) + if 'DATABASE_ORG' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_ORG', + self.database_org) + if 'DATABASE_BUCKET' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_BUCKET', + self.database_bucket) + if 'DATABASE_TAG' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TAG', + self.database_tag) + if 'DUT_SET_NAME' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME', + self.dut_set_name) + if 'TEST_RIG' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig) + # end of database configuration + + # dashboard configuration + if 'DASHBOARD_HOST' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_HOST', + self.dashboard_host) + if 'DASHBOARD_TOKEN' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_TOKEN', + self.dashboard_token) + # end of dashboard configuraiton + + # blog configuration + if 'BLOG_HOST' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_HOST', self.blog_host) + if 'BLOG_TOKEN' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_TOKEN', self.blog_token) + if 'BLOG_AUTHORS' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_AUTHORS', + self.blog_authors) + if 'BLOG_CUSTOMER' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_CUSTOMER', + self.blog_customer) + if 'BLOG_USER_PUSH' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_USER_PUSH', + self.blog_user_push) + if 'BLOG_PASSWORD_PUSH' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_PASSWORD_PUSH', + self.blog_password_push) + if 'BLOG_FLAG' in self.test_dict[test]['args']: + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG', self.blog_flag) + # end of blog configruation + + if 'timeout' in self.test_dict[test]: + self.logger.info("timeout : {}".format(self.test_dict[test]['timeout'])) + self.test_timeout = int(self.test_dict[test]['timeout']) else: - self.logger.info("no db loaded between tests: {}".format(self.use_custom_db)) + self.test_timeout = self.test_timeout_default - sleep(1) # DO NOT REMOVE the sleep is to allow for the database to stablize - try: - os.chdir(self.scripts_wd) - # self.logger.info("Current Working Directory {}".format(os.getcwd())) - except: - self.logger.info("failed to change to {}".format(self.scripts_wd)) - cmd_args = "{}".format(self.test_dict[test]['args']) - command = "./{} {}".format(self.test_dict[test]['command'], cmd_args) - self.logger.info("command: {}".format(command)) - self.logger.info("cmd_args {}".format(cmd_args)) - - if self.outfile_name is not None: - stdout_log_txt = os.path.join(self.log_path, "{}-{}-stdout.txt".format(self.outfile_name,test)) - self.logger.info("stdout_log_txt: {}".format(stdout_log_txt)) - stdout_log = open(stdout_log_txt, 'a') - stderr_log_txt = os.path.join(self.log_path, "{}-{}-stderr.txt".format(self.outfile_name,test)) - self.logger.info("stderr_log_txt: {}".format(stderr_log_txt)) - stderr_log = open(stderr_log_txt, 'a') - - # need to take into account --raw_line parameters thus need to use shlex.split - # need to preserve command to have correct command syntax in command output - command_to_run = command - command_to_run = shlex.split(command_to_run) - print("running {command_to_run}".format(command_to_run=command_to_run)) - try: - process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log, - universal_newlines=True) - # if there is a better solution please propose, the TIMEOUT Result is different then FAIL - try: - process.wait(timeout=int(self.test_timeout)) - except subprocess.TimeoutExpired: - process.terminate() - self.test_result = "TIMEOUT" - - except: - print("No such file or directory with command: {}".format(command)) - self.logger.info("No such file or directory with command: {}".format(command)) - - if self.test_result != "TIMEOUT": - stderr_log_size = os.path.getsize(stderr_log_txt) - if stderr_log_size > 0: - self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size))) - text = open(stderr_log_txt).read() - if 'Error' in text: - self.text_result = "Failure" - background = self.background_red + if 'load_db' in self.test_dict[test]: + self.logger.info("load_db : {}".format(self.test_dict[test]['load_db'])) + if str(self.test_dict[test]['load_db']).lower() != "none" and str( + self.test_dict[test]['load_db']).lower() != "skip": + try: + self.load_custom_db(self.test_dict[test]['load_db']) + except: + self.logger.info("custom database failed to load check existance and location: {}".format( + self.test_dict[test]['load_db'])) + else: + self.logger.info("no load_db present in dictionary, load db normally") + if self.use_factory_default_db == "TRUE": + self.load_factory_default_db() + sleep(3) + self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT") + if self.use_blank_db == "TRUE": + self.load_blank_db() + sleep(1) + self.logger.info("BLANK loaded between tests with scenario.py --load BLANK") + if self.use_custom_db == "TRUE": + try: + self.load_custom_db(self.custom_db) + sleep(1) + self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db, + self.custom_db)) + except: + self.logger.info("custom database failed to load check existance and location: {}".format( + self.custom_db)) else: + self.logger.info("no db loaded between tests: {}".format(self.use_custom_db)) + + sleep(1) # DO NOT REMOVE the sleep is to allow for the database to stablize + try: + os.chdir(self.scripts_wd) + # self.logger.info("Current Working Directory {}".format(os.getcwd())) + except: + self.logger.info("failed to change to {}".format(self.scripts_wd)) + cmd_args = "{}".format(self.test_dict[test]['args']) + command = "./{} {}".format(self.test_dict[test]['command'], cmd_args) + self.logger.info("command: {}".format(command)) + self.logger.info("cmd_args {}".format(cmd_args)) + + if self.outfile_name is not None: + stdout_log_txt = os.path.join(self.log_path, "{}-{}-stdout.txt".format(self.outfile_name,test)) + self.logger.info("stdout_log_txt: {}".format(stdout_log_txt)) + stdout_log = open(stdout_log_txt, 'a') + stderr_log_txt = os.path.join(self.log_path, "{}-{}-stderr.txt".format(self.outfile_name,test)) + self.logger.info("stderr_log_txt: {}".format(stderr_log_txt)) + stderr_log = open(stderr_log_txt, 'a') + + # need to take into account --raw_line parameters thus need to use shlex.split + # need to preserve command to have correct command syntax in command output + command_to_run = command + command_to_run = shlex.split(command_to_run) + print("running {command_to_run}".format(command_to_run=command_to_run)) + try: + process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log, + universal_newlines=True) + # if there is a better solution please propose, the TIMEOUT Result is different then FAIL + try: + process.wait(timeout=int(self.test_timeout)) + except subprocess.TimeoutExpired: + process.terminate() + self.test_result = "TIMEOUT" + + except: + print("No such file or directory with command: {}".format(command)) + self.logger.info("No such file or directory with command: {}".format(command)) + + if self.test_result != "TIMEOUT": + stderr_log_size = os.path.getsize(stderr_log_txt) + if stderr_log_size > 0: + self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size))) + text = open(stderr_log_txt).read() + if 'Error' in text: + self.text_result = "Failure" + background = self.background_red + else: + self.test_result = "Success" + background = self.background_green + else: + self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size))) self.test_result = "Success" background = self.background_green else: - self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size))) - self.test_result = "Success" - background = self.background_green + self.logger.info("TIMEOUT FAILURE, Check LANforge Radios") + self.test_result = "Time Out" + background = self.background_purple + + # Ghost will put data in stderr + if 'ghost' in command or 'lf_qa' in command: + if self.test_result != "TIMEOUT": + text = open(stderr_log_txt).read() + if 'Error' in text: + self.test_result = "Failure" + background = self.background_red + else: + self.test_result = "Success" + background = self.background_blue + if 'lf_qa' in command: + line_list = open(stdout_log_txt).readlines() + for line in line_list: + if 'html report:' in line: + self.qa_report_html = line + print("html_report: {report}".format(report=self.qa_report_html)) + break + + self.qa_report_html = self.qa_report_html.replace('html report: ','') + + + # stdout_log_link is used for the email reporting to have the corrected path + stdout_log_link = str(stdout_log_txt).replace('/home/lanforge', '') + stderr_log_link = str(stderr_log_txt).replace('/home/lanforge', '') + self.html_results += """ +