#!/usr/bin/python3
'''
NAME:
lf_check.py

PURPOSE:
lf_check.py will run a series of tests based on TEST_DICTIONARY or test suite passed in listed in lf_check_config.ini.
The config file may be copied from lf_check_config_template.ini, or can be generated.
The config file name can be passed in as a configuraiton parameter.

EXAMPLE:
lf_check.py  # this will use the defaults
lf_check.py --ini --test_suite

NOTES:
Before using lf_check.py
1. copy lf_check_config_template.ini to the lf_check_config.ini
2. update lf_check_config.ini to enable (TRUE) tests to be run in the test suite, the default suite is the TEST_DICTIONARY

TO DO NOTES:
6/13/2021 :
1. add server (telnet localhost 4001) build info, GUI build sha, and Kernel version to the output.
2. allow for json or config.ini file
'''
import datetime
import pprint
import sys

if sys.version_info[0] != 3:
    print("This script requires Python3")
    exit()

import argparse
import configparser
import csv
import html
import json
import logging
import os
import os.path
import shutil
import socket
import subprocess
import time
from os import path
from time import sleep

# lf_report is from the parent of the current file
dir_path = os.path.dirname(os.path.realpath(__file__))
parent_dir_path = os.path.abspath(os.path.join(dir_path,os.pardir))
sys.path.insert(0, parent_dir_path)

#sys.path.append('../')
from lf_report import lf_report

sys.path.append('/')

# setup logging FORMAT
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'

# lf_check class contains verificaiton configuration and ocastrates the testing.
class lf_check():
    """Run the enabled scripts of one test suite and record their results.

    The suite, the radio definitions and the substitution values are read
    from an ini file by ``read_config_contents``.  ``run_script_test`` then
    runs every enabled test as ``./<command> <args>`` from ``scripts_wd``,
    redirecting stdout/stderr to per-test log files, and accumulates one
    CSV row and one HTML table row per executed test.
    """

    # Column titles shared by the CSV header and the HTML table header.
    _RESULT_COLUMNS = ("Test", "Command", "Result", "STDOUT", "STDERR")

    def __init__(self, _config_ini, _test_suite, _csv_results, _outfile):
        """Store the run parameters and set defaults for all config values.

        Args:
            _config_ini: path of the ini file read by read_config_contents.
            _test_suite: name of the ini section holding TEST_DICT.
            _csv_results: path of the CSV results file to create.
            _outfile: base path used to name the per-test stdout/stderr logs.

        Note: this changes the process working directory to the parent of the
        current directory and remembers it as ``scripts_wd``.
        """
        self.config_ini = _config_ini
        self.test_suite = _test_suite
        self.lf_mgr_ip = ""
        self.lf_mgr_port = ""
        self.radio_dict = {}
        self.test_dict = {}
        path_parent = os.path.dirname(os.getcwd())
        os.chdir(path_parent)
        self.scripts_wd = os.getcwd()
        self.results = ""
        self.outfile = _outfile
        self.test_result = "Failure"
        self.results_col_titles = list(self._RESULT_COLUMNS)
        self.html_results = ""
        self.background_green = "background-color:green"
        self.background_red = "background-color:red"
        self.background_purple = "background-color:purple"

        self.http_test_ip = ""
        self.ftp_test_ip = ""
        self.test_ip = ""

        # section TEST_GENERIC
        self.radio_lf = ""
        # The attribute read everywhere else is ``ssid``; the original only
        # initialised the misspelled ``ssdi``, which is kept for compatibility.
        self.ssid = ""
        self.ssdi = ""
        self.ssid_pw = ""
        self.security = ""
        self.num_sta = ""
        self.col_names = ""
        self.upstream_port = ""

        self.csv_results = _csv_results
        self.csv_results_file = ""
        self.csv_results_writer = ""
        self.csv_results_column_headers = ""
        self.logger = logging.getLogger(__name__)
        self.test_timeout = 120
        self.use_blank_db = "FALSE"
        self.use_factory_default_db = "FALSE"
        self.use_custom_db = "FALSE"
        # Initialised so run_script_test never hits an AttributeError when
        # the TEST_PARAMETERS section is absent.
        self.custom_db = ""
        self.production_run = "FALSE"
        self.email_list_production = ""
        self.host_ip_production = None
        self.email_list_test = ""
        self.host_ip_test = None

    def _email_settings(self):
        """Return ``(host_ip, recipients)`` for the configured run type.

        ``host_ip`` is the configured production host for a production run
        and ``None`` otherwise (the caller then resolves the local address).
        ``recipients`` is the address list split on commas and whitespace.
        """
        if self.production_run == "TRUE":
            host_ip = self.host_ip_production
            address_list = self.email_list_production
        else:
            host_ip = None
            address_list = self.email_list_test
        recipients = (address_list or "").replace(",", " ").split()
        return host_ip, recipients

    # NOT complete : will send the email results
    def send_results_email(self, report_file=None):
        """Mail a link to ``report_file`` using the local ``mail`` command.

        Args:
            report_file: path of the html report; ``/home/lanforge/`` and a
                leading ``/`` are stripped to form the URL path.  When None,
                nothing is sent.
        """
        if report_file is None:
            print("No report file, not sending email.")
            return
        report_url = report_file.replace('/home/lanforge/', '')
        if report_url.startswith('/'):
            report_url = report_url[1:]

        # NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python
        hostname = socket.gethostname()
        host_ip, recipients = self._email_settings()
        if not host_ip:
            try:
                host_ip = socket.gethostbyname(hostname)
            except OSError:
                # Name does not resolve: fall back to the host name itself.
                host_ip = hostname
        if not recipients:
            print("No email recipients configured, not sending email.")
            return

        # The original formatted the message with the local address before
        # choosing the run type, so the production host was never used.
        message = (
            "Results from {hostname}:\n"
            "http://{ip}/{report}\n"
            "NOTE: for now to see stdout and stderr remove /home/lanforge from path.\n"
        ).format(hostname=hostname, ip=host_ip, report=report_url)
        mail_subject = "Regression Test [{hostname}] {date}".format(
            hostname=hostname, date=datetime.datetime.now())

        # Argument list plus stdin instead of a shell pipeline, so nothing in
        # the subject, message or addresses is interpreted by a shell.
        mail_cmd = ["mail", "-s", mail_subject] + recipients
        print("running:[{}]".format(" ".join(mail_cmd)))
        try:
            # have email on separate timeout
            subprocess.run(mail_cmd, input=message,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           universal_newlines=True,
                           timeout=int(self.test_timeout))
        except subprocess.TimeoutExpired:
            print("send email timed out")
        except OSError as err:
            print("send email failed: {}".format(err))

    def get_csv_results(self):
        """Return the file name of the CSV results file opened by start_csv_results."""
        return self.csv_results_file.name

    def start_csv_results(self):
        """Create the CSV results file and write its header row."""
        print("csv results: {}".format(self.csv_results))
        # newline="" is what the csv module documents for writer files.
        self.csv_results_file = open(self.csv_results, "w", newline="")
        self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",")
        self.csv_results_column_headers = list(self._RESULT_COLUMNS)
        self.csv_results_writer.writerow(self.csv_results_column_headers)
        self.csv_results_file.flush()

    def get_html_results(self):
        """Return the HTML fragment accumulated so far."""
        return self.html_results

    def start_html_results(self):
        """Append the opening of the results table and its header row."""
        header_cells = "".join(
            "<th>{}</th>\n".format(title) for title in self._RESULT_COLUMNS)
        self.html_results += (
            '<table border="1" class="dataframe">\n'
            '<thead>\n'
            '<tr style="text-align: left;">\n'
            + header_cells +
            '</tr>\n'
            '</thead>\n'
            '<tbody>\n'
        )

    def finish_html_results(self):
        """Append the closing of the results table."""
        self.html_results += (
            '</tbody>\n'
            '</table>\n'
            '<br>\n'
            '<br>\n'
            '<br>\n'
        )

    def _html_result_row(self, test, command, stdout_log_txt, stderr_log_txt,
                         background):
        """Return one table row for a finished test.

        The STDERR cell links to the log only for "Failure" and "Time Out"
        results; it is empty for a successful test.
        """
        cells = [
            "<td>{}</td>".format(html.escape(str(test))),
            "<td>{}</td>".format(html.escape(str(command))),
            '<td style="{}">{}</td>'.format(
                background, html.escape(str(self.test_result))),
            '<td><a href="{}" target="_blank">STDOUT</a></td>'.format(
                html.escape(str(stdout_log_txt), quote=True)),
        ]
        if self.test_result in ("Failure", "Time Out"):
            cells.append('<td><a href="{}" target="_blank">STDERR</a></td>'.format(
                html.escape(str(stderr_log_txt), quote=True)))
        else:
            cells.append("<td></td>")
        return "<tr>" + "".join(cells) + "</tr>\n"

    # Functions in this section are/can be overridden by descendants
    # This code reads the lf_check_config.ini file to populate the test variables
    def read_config_contents(self):
        """Populate the instance from the sections present in ``config_ini``.

        Sections that are absent leave the corresponding defaults untouched.
        A key missing from a present section raises KeyError, except
        RADIO_DICT and TEST_DICT, which are optional within their sections.
        """
        self.logger.info("read_config_contents {}".format(self.config_ini))
        config_file = configparser.ConfigParser()
        if not config_file.read(self.config_ini):
            self.logger.info("could not read config file {}".format(self.config_ini))
        self.logger.info("logger worked")

        if 'LF_MGR' in config_file.sections():
            section = config_file['LF_MGR']
            self.lf_mgr_ip = section['LF_MGR_IP']
            self.lf_mgr_port = section['LF_MGR_PORT']
            self.logger.info("lf_mgr_ip {}".format(self.lf_mgr_ip))
            self.logger.info("lf_mgr_port {}".format(self.lf_mgr_port))

        if 'TEST_NETWORK' in config_file.sections():
            section = config_file['TEST_NETWORK']
            self.http_test_ip = section['HTTP_TEST_IP']
            self.logger.info("http_test_ip {}".format(self.http_test_ip))
            self.ftp_test_ip = section['FTP_TEST_IP']
            self.logger.info("ftp_test_ip {}".format(self.ftp_test_ip))
            self.test_ip = section['TEST_IP']
            self.logger.info("test_ip {}".format(self.test_ip))

        if 'TEST_GENERIC' in config_file.sections():
            section = config_file['TEST_GENERIC']
            self.radio_lf = section['RADIO_USED']
            self.logger.info("radio_lf {}".format(self.radio_lf))
            self.ssid = section['SSID_USED']
            self.logger.info("ssid {}".format(self.ssid))
            self.ssid_pw = section['SSID_PW_USED']
            self.logger.info("ssid_pw {}".format(self.ssid_pw))
            self.security = section['SECURITY_USED']
            self.logger.info("secruity {}".format(self.security))
            self.num_sta = section['NUM_STA']
            self.logger.info("num_sta {}".format(self.num_sta))
            self.col_names = section['COL_NAMES']
            self.logger.info("col_names {}".format(self.col_names))
            self.upstream_port = section['UPSTREAM_PORT']
            self.logger.info("upstream_port {}".format(self.upstream_port))

        if 'TEST_PARAMETERS' in config_file.sections():
            section = config_file['TEST_PARAMETERS']
            self.test_timeout = section['TEST_TIMEOUT']
            self.use_blank_db = section['LOAD_BLANK_DB']
            self.use_factory_default_db = section['LOAD_FACTORY_DEFAULT_DB']
            self.use_custom_db = section['LOAD_CUSTOM_DB']
            self.custom_db = section['CUSTOM_DB']
            self.production_run = section['PRODUCTION_RUN']
            self.email_list_production = section['EMAIL_LIST_PRODUCTION']
            self.host_ip_production = section['HOST_IP_PRODUCTION']
            self.email_list_test = section['EMAIL_LIST_TEST']
            self.host_ip_test = section['HOST_IP_TEST']

        if 'RADIO_DICTIONARY' in config_file.sections():
            section = config_file['RADIO_DICTIONARY']
            # Parse only when the key exists: the original passed the current
            # dict as the fallback, which json.loads rejects with TypeError.
            radio_json = section.get('RADIO_DICT')
            if radio_json is not None:
                self.radio_dict = json.loads(radio_json)
            self.logger.info("self.radio_dict {}".format(self.radio_dict))

        if self.test_suite in config_file.sections():
            section = config_file[self.test_suite]
            test_json = section.get('TEST_DICT')
            if test_json is None:
                self.logger.info("no TEST_DICT in section {}".format(self.test_suite))
            else:
                # for json replace the \n and \r they are invalid json characters, allows for multiple line args
                try:
                    self.test_dict = json.loads(
                        test_json.replace('\n', ' ').replace('\r', ' '))
                    self.logger.info("{}: {}".format(self.test_suite, self.test_dict))
                except ValueError:
                    # json.JSONDecodeError is a ValueError subclass.
                    self.logger.info("Excpetion loading {}, is there comma after the last entry? Check syntax".format(self.test_suite))

    def _load_db(self, db_name):
        """Run ``./scenario.py --load <db_name>`` from scripts_wd and wait for it."""
        try:
            os.chdir(self.scripts_wd)
        except OSError:
            self.logger.info("failed to change to {}".format(self.scripts_wd))
        process = subprocess.Popen(["./scenario.py", "--load", db_name],
                                   shell=False,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        # communicate() drains both pipes and waits for the process to terminate
        process.communicate()
        if process.returncode != 0:
            self.logger.info("scenario.py --load {} returned {}".format(
                db_name, process.returncode))

    def load_factory_default_db(self):
        """Load the FACTORY_DFLT database with scenario.py."""
        self._load_db("FACTORY_DFLT")

    def load_blank_db(self):
        """Load the BLANK database with scenario.py (now waits for completion)."""
        self._load_db("BLANK")

    def load_custom_db(self, custom_db):
        """Load the database named ``custom_db`` with scenario.py."""
        self._load_db(custom_db)

    def _expand_args(self, args):
        """Return ``args`` with radio keys and config placeholders replaced.

        Radio keys come from ``radio_dict``; the remaining placeholders are
        replaced in a fixed order so that HTTP_TEST_IP and FTP_TEST_IP are
        handled before the shorter TEST_IP they contain.
        """
        # not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
        # --num_stations needs to be int not string (no double quotes)
        for radio in self.radio_dict:
            radio_cfg = self.radio_dict[radio]
            if radio_cfg["KEY"] in args:
                args = args.replace(
                    radio_cfg["KEY"],
                    '--radio {} --ssid {} --passwd {} --security {} --num_stations {}'.format(
                        radio_cfg['RADIO'], radio_cfg['SSID'], radio_cfg['PASSWD'],
                        radio_cfg['SECURITY'], radio_cfg['STATIONS']))

        substitutions = (
            ('HTTP_TEST_IP', self.http_test_ip),
            ('FTP_TEST_IP', self.ftp_test_ip),
            ('TEST_IP', self.test_ip),
            ('RADIO_USED', self.radio_lf),
            ('SSID_USED', self.ssid),
            ('SSID_PW_USED', self.ssid_pw),
            ('SECURITY_USED', self.security),
            ('NUM_STA', self.num_sta),
            ('COL_NAMES', self.col_names),
            # The original substituted col_names here by mistake.
            ('UPSTREAM_PORT', self.upstream_port),
        )
        for placeholder, value in substitutions:
            if placeholder in args:
                args = args.replace(placeholder, value)
        return args

    def _reload_db(self):
        """Load whichever databases the config enables before a test."""
        if self.use_factory_default_db == "TRUE":
            self.load_factory_default_db()
            sleep(3)
            self.logger.info("FACTORY_DFLT loaded between tests with scenario.py --load FACTORY_DFLT")
        if self.use_blank_db == "TRUE":
            self.load_blank_db()
            sleep(1)
            self.logger.info("BLANK loaded between tests with scenario.py --load BLANK")
        if self.use_custom_db == "TRUE":
            try:
                self.load_custom_db(self.custom_db)
                sleep(1)
                self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db, self.custom_db))
            except OSError:
                self.logger.info("custom database failed to load check existance and location")
        else:
            self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))

    def _execute(self, command, stdout_log_txt, stderr_log_txt):
        """Run ``command`` with output appended to the two logs; return the result.

        Returns "Time Out" when the command exceeds ``test_timeout`` seconds,
        "Failure" when it cannot be started or its stderr log is non-empty,
        and "Success" otherwise.  The exit status is not considered.
        """
        with open(stdout_log_txt, 'a') as stdout_log, \
                open(stderr_log_txt, 'a') as stderr_log:
            try:
                # split() without an argument drops the empty strings that
                # repeated spaces would otherwise put into argv.
                process = subprocess.Popen(command.split(), shell=False,
                                           stdout=stdout_log, stderr=stderr_log,
                                           universal_newlines=True)
            except OSError as err:
                print("No such file or directory with command: {}".format(command))
                self.logger.info("No such file or directory with command: {}".format(command))
                # Record the reason so the STDERR link is not empty; the
                # original went on to report this case as a Success.
                stderr_log.write("failed to start {}: {}\n".format(command, err))
                return "Failure"
            # If there is a better solution please propose, the TIMEOUT Result is different then FAIL
            try:
                process.wait(timeout=int(self.test_timeout))
            except subprocess.TimeoutExpired:
                process.terminate()
                try:
                    # Reap the child so it does not linger as a zombie.
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()
                self.logger.info("TIMEOUT FAILURE, Check LANforge Radios")
                return "Time Out"

        stderr_log_size = os.path.getsize(stderr_log_txt)
        if stderr_log_size > 0:
            self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size)))
            return "Failure"
        self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size)))
        return "Success"

    def _run_one_test(self, test):
        """Expand, run and record the single enabled test named ``test``."""
        test_cfg = self.test_dict[test]
        # Replace RADIO, SSID, PASSWD, SECURITY with actual config values (e.g. RADIO_0_CFG to values)
        test_cfg['args'] = self._expand_args(test_cfg['args'])

        self._reload_db()
        sleep(1)  # the sleep is to allow for the database to stablize

        try:
            os.chdir(self.scripts_wd)
        except OSError:
            self.logger.info("failed to change to {}".format(self.scripts_wd))
        cmd_args = "{}".format(test_cfg['args'])
        command = "./{} {}".format(test_cfg['command'], cmd_args)
        self.logger.info("command: {}".format(command))
        self.logger.info("cmd_args {}".format(cmd_args))

        # The original left the log names undefined when outfile was None.
        log_base = self.outfile if self.outfile is not None else "lf_check"
        stdout_log_txt = log_base + "-{}-stdout.txt".format(test)
        stderr_log_txt = log_base + "-{}-stderr.txt".format(test)

        # HERE is thwere the test is run
        print("running {}".format(command))
        self.test_result = self._execute(command, stdout_log_txt, stderr_log_txt)
        background = {
            "Success": self.background_green,
            "Failure": self.background_red,
            "Time Out": self.background_purple,
        }[self.test_result]

        self.html_results += self._html_result_row(
            test, command, stdout_log_txt, stderr_log_txt, background)

        row = [test, command, self.test_result, stdout_log_txt, stderr_log_txt]
        self.csv_results_writer.writerow(row)
        self.csv_results_file.flush()
        self.logger.info("test: {} executed".format(test))

    def run_script_test(self):
        """Run every enabled test in ``test_dict`` and build CSV/HTML results.

        A test whose 'enabled' value is "FALSE" is skipped; any value other
        than "TRUE"/"FALSE" is logged as invalid and skipped as well.
        """
        self.start_html_results()
        self.start_csv_results()
        try:
            for test in self.test_dict:
                enabled = self.test_dict[test]['enabled']
                if enabled == "FALSE":
                    self.logger.info("test: {} skipped".format(test))
                elif enabled == "TRUE":
                    self._run_one_test(test)
                else:
                    self.logger.info("enable value {} invalid for test: {}, test skipped".format(enabled, test))
        finally:
            # The results file was never closed by the original.
            self.csv_results_file.close()
        self.finish_html_results()


def main():
    """Parse the command line, run the selected suite and publish the report."""
    # arguments
    parser = argparse.ArgumentParser(
        prog='lf_check.py',
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="lf_check.py : for running scripts listed in lf_check_config.ini file\n",
        description=(
            "lf_check.py\n"
            "-----------\n"
            "\n"
            "Summary :\n"
            "---------\n"
            "for running scripts listed in lf_check_config.ini\n"
        ))

    parser.add_argument('--ini', help="--ini <config.ini file>  default lf_check_config.ini", default="lf_check_config.ini")
    parser.add_argument('--suite', help="--suite <suite name>  default TEST_DICTIONARY", default="TEST_DICTIONARY")
    parser.add_argument('--outfile', help="--outfile <name> used as base name for all files generated", default="")
    parser.add_argument('--logfile', help="--logfile <logfile Name>  logging for output of lf_check.py script", default="lf_check.log")

    args = parser.parse_args()

    # load the test config information; os.path.join also accepts an
    # absolute --ini path, which plain concatenation broke
    config_ini = os.path.join(os.getcwd(), args.ini)
    if os.path.exists(config_ini):
        print("TEST CONFIG : {}".format(config_ini))
    else:
        print("EXITING: NOTFOUND TEST CONFIG : {} ".format(config_ini))
        sys.exit(1)

    test_suite = args.suite

    # output report.
    report = lf_report(_results_dir_name="lf_check",
                       _output_html="lf_check.html",
                       _output_pdf="lf-check.pdf")

    current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
    csv_results = "lf_check{}-{}.csv".format(args.outfile, current_time)
    csv_results = report.file_add_path(csv_results)
    outfile = "lf_check-{}-{}".format(args.outfile, current_time)
    outfile_path = report.file_add_path(outfile)

    # lf_check() class created
    check = lf_check(_config_ini=config_ini,
                     _test_suite=test_suite,
                     _csv_results=csv_results,
                     _outfile=outfile_path)

    # get the git sha; a missing git executable must not abort the run
    try:
        commit_hash = subprocess.run(["git", "rev-parse", "HEAD"],
                                     stdout=subprocess.PIPE).stdout
    except OSError:
        commit_hash = b""
    git_sha = commit_hash.decode('utf-8', 'ignore').strip()

    # set up logging; splitext copes with names that do not end in ".log"
    logfile = os.path.splitext(args.logfile)[0]
    print("logfile: {}".format(logfile))
    logfile = "{}-{}.log".format(logfile, current_time)
    logfile = report.file_add_path(logfile)
    print("logfile {}".format(logfile))
    formatter = logging.Formatter(FORMAT)
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    file_handler = logging.FileHandler(logfile, "w")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(logging.StreamHandler(sys.stdout))  # allows to logging to file and stdout

    logger.info("commit_hash: {}".format(commit_hash))
    logger.info("commit_hash2: {}".format(commit_hash.decode('utf-8', 'ignore')))

    # READ Config and RUN Tests
    check.read_config_contents()
    check.run_script_test()

    # Generate Ouptput reports
    report.set_title("LF Check: lf_check.py")
    report.build_banner()
    report.start_content_div()
    report.set_table_title("LF Check Test Results")
    report.build_table_title()
    report.set_text("git sha: {}".format(git_sha))
    report.build_text()
    html_results = check.get_html_results()
    report.set_custom_html(html_results)
    report.build_custom()
    html_report = report.write_html_with_timestamp()
    print("html report: {}".format(html_report))
    report.write_pdf_with_timestamp()

    report_path = os.path.dirname(html_report)
    parent_report_dir = os.path.dirname(report_path)

    # copy results to lastest so someone may see the latest.
    lf_check_latest_html = os.path.join(parent_report_dir, "lf_check_latest.html")
    # duplicates html_report file up one directory
    lf_check_html_report = os.path.join(parent_report_dir, "{}.html".format(outfile))

    # copy one directory above
    shutil.copyfile(html_report, lf_check_latest_html)
    shutil.copyfile(html_report, lf_check_html_report)

    # copy banner, logo, style sheets and font next to the copied report
    for asset in ("banner.png",
                  "CandelaLogo2-90dpi-200x90-trans.png",
                  "report.css",
                  "custom.css",
                  "CenturyGothic.woff"):
        shutil.copyfile(os.path.join(report_path, asset),
                        os.path.join(parent_report_dir, asset))

    print("lf_check_latest.html: " + lf_check_latest_html)
    print("lf_check_html_report: " + lf_check_html_report)

    check.send_results_email(report_file=lf_check_html_report)


if __name__ == '__main__':
    main()