diff --git a/py-scripts/tools/lf_check.py b/py-scripts/tools/lf_check.py index 54c7c167..32f5673d 100755 --- a/py-scripts/tools/lf_check.py +++ b/py-scripts/tools/lf_check.py @@ -123,7 +123,8 @@ sys.path.insert(0, parent_dir_path) FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s' -# lf_check class contains verificaiton configuration and ocastrates the testing. +# lf_check class contains verificaiton configuration and ocastrates the +# testing. class lf_check(): def __init__(self, _json_rig, @@ -153,7 +154,8 @@ class lf_check(): self.outfile = _outfile self.outfile_name = _outfile_name self.test_result = "Failure" - self.results_col_titles = ["Test", "Command", "Result", "STDOUT", "STDERR"] + self.results_col_titles = [ + "Test", "Command", "Result", "STDOUT", "STDERR"] self.html_results = "" self.background_green = "background-color:green" self.background_red = "background-color:red" @@ -193,7 +195,9 @@ class lf_check(): # section DUT # dut selection - self.dut_set_name = 'DUT_NAME ASUSRT-AX88U' # note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not dut_name (see above) + # note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not + # dut_name (see above) + self.dut_set_name = 'DUT_NAME ASUSRT-AX88U' self.use_dut_name = "DUT_NAME_NA" # "ASUSRT-AX88U" note this is not dut_set_name self.dut_hw = "DUT_HW_NA" self.dut_sw = "DUT_SW_NA" @@ -234,10 +238,13 @@ class lf_check(): def get_scripts_git_sha(self): # get git sha - process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE) + process = subprocess.Popen( + ["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE) (commit_hash, err) = process.communicate() exit_code = process.wait() - print("get_scripts_get_sha exit_code: {exit_code}".format(exit_code=exit_code)) + print( + "get_scripts_get_sha exit_code: {exit_code}".format( + exit_code=exit_code)) scripts_git_sha = commit_hash.decode('utf-8', 'ignore') return scripts_git_sha @@ -254,10 +261,17 @@ class lf_check(): # curl --user "lanforge:lanforge" -H 'Accept: application/json' # http://192.168.100.116:8080/radiostatus/all | json_pp , where --user # "USERNAME:PASSWORD" - request_command = 'http://{lfmgr}:{port}/radiostatus/all'.format(lfmgr=self.lf_mgr_ip, port=self.lf_mgr_port) - request = requests.get(request_command, auth=(self.lf_mgr_user, self.lf_mgr_pass)) - print("radio request command: {request_command}".format(request_command=request_command)) - print("radio request status_code {status}".format(status=request.status_code)) + request_command = 'http://{lfmgr}:{port}/radiostatus/all'.format( + lfmgr=self.lf_mgr_ip, port=self.lf_mgr_port) + request = requests.get( + request_command, auth=( + self.lf_mgr_user, self.lf_mgr_pass)) + print( + "radio request command: {request_command}".format( + request_command=request_command)) + print( + "radio request status_code {status}".format( + status=request.status_code)) lanforge_radio_json = request.json() print("radio request.json: {json}".format(json=lanforge_radio_json)) lanforge_radio_text = request.text @@ -265,69 +279,88 @@ class lf_check(): return lanforge_radio_json, lanforge_radio_text def get_lanforge_system_node_version(self): - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + # creating shh client object we use this object to connect to router + ssh = paramiko.SSHClient() + # automatically adds the missing host key + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command('uname -n') self.lanforge_system_node_version = stdout.readlines() - self.lanforge_system_node_version = [line.replace('\n', '') for line in self.lanforge_system_node_version] + self.lanforge_system_node_version = [line.replace( + '\n', '') for line in self.lanforge_system_node_version] ssh.close() time.sleep(1) return self.lanforge_system_node_version def get_lanforge_kernel_version(self): - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + # creating shh client object we use this object to connect to router + ssh = paramiko.SSHClient() + # automatically adds the missing host key + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command('uname -r') self.lanforge_kernel_version = stdout.readlines() - self.lanforge_kernel_version = [line.replace('\n', '') for line in self.lanforge_kernel_version] + self.lanforge_kernel_version = [line.replace( + '\n', '') for line in self.lanforge_kernel_version] ssh.close() time.sleep(1) return self.lanforge_kernel_version def get_lanforge_server_version(self): - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + # creating shh client object we use this object to connect to router + ssh = paramiko.SSHClient() + # automatically adds the missing host key + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) - stdin, stdout, stderr = ssh.exec_command('./btserver --version | grep Version') + stdin, stdout, stderr = ssh.exec_command( + './btserver --version | grep Version') self.lanforge_server_version_full = stdout.readlines() - self.lanforge_server_version_full = [line.replace('\n', '') for line in self.lanforge_server_version_full] - print("lanforge_server_version_full: {lanforge_server_version_full}".format(lanforge_server_version_full=self.lanforge_server_version_full)) - self.lanforge_server_version = self.lanforge_server_version_full[0].split('Version:', maxsplit=1)[-1].split(maxsplit=1)[0] + self.lanforge_server_version_full = [line.replace( + '\n', '') for line in self.lanforge_server_version_full] + print("lanforge_server_version_full: {lanforge_server_version_full}".format( + lanforge_server_version_full=self.lanforge_server_version_full)) + self.lanforge_server_version = self.lanforge_server_version_full[0].split( + 'Version:', maxsplit=1)[-1].split(maxsplit=1)[0] self.lanforge_server_version = self.lanforge_server_version.strip() - print("lanforge_server_version: {lanforge_server_version}".format(lanforge_server_version=self.lanforge_server_version)) + print("lanforge_server_version: {lanforge_server_version}".format( + lanforge_server_version=self.lanforge_server_version)) ssh.close() time.sleep(1) return self.lanforge_server_version_full def get_lanforge_gui_version(self): - ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key + # creating shh client object we use this object to connect to router + ssh = paramiko.SSHClient() + # automatically adds the missing host key + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) stdin, stdout, stderr = ssh.exec_command( 'curl -H "Accept: application/json" http://{lanforge_ip}:8080 | json_pp | grep -A 7 "VersionInfo"'.format(lanforge_ip=self.lf_mgr_ip)) self.lanforge_gui_version_full = stdout.readlines() # print("lanforge_gui_version_full pre: {lanforge_gui_version_full}".format(lanforge_gui_version_full=self.lanforge_gui_version_full)) - self.lanforge_gui_version_full = [line.replace('\n', '') for line in self.lanforge_gui_version_full] + self.lanforge_gui_version_full = [line.replace( + '\n', '') for line in self.lanforge_gui_version_full] # print("lanforge_gui_version_full: {lanforge_gui_version_full}".format(lanforge_gui_version_full=self.lanforge_gui_version_full)) for element in self.lanforge_gui_version_full: if "BuildVersion" in element: ver_str = str(element) - self.lanforge_gui_version = ver_str.split(':', maxsplit=1)[-1].replace(',', '') + self.lanforge_gui_version = ver_str.split( + ':', maxsplit=1)[-1].replace(',', '') self.lanforge_gui_version = self.lanforge_gui_version.strip().replace('"', '') print("BuildVersion {}".format(self.lanforge_gui_version)) if "BuildDate" in element: gui_str = str(element) - self.lanforge_gui_build_date = gui_str.split(':', maxsplit=1)[-1].replace(',', '') + self.lanforge_gui_build_date = gui_str.split( + ':', maxsplit=1)[-1].replace(',', '') print("BuildDate {}".format(self.lanforge_gui_build_date)) if "GitVersion" in element: git_sha_str = str(element) - self.lanforge_gui_git_sha = git_sha_str.split(':', maxsplit=1)[-1].replace(',', '') + self.lanforge_gui_git_sha = git_sha_str.split( + ':', maxsplit=1)[-1].replace(',', '') print("GitVersion {}".format(self.lanforge_gui_git_sha)) ssh.close() @@ -353,20 +386,21 @@ class lf_check(): ip = socket.gethostbyname(hostname) # a hostname lacking dots by definition lacks a domain name - # this is not useful for hyperlinks outside the known domain, so an IP address should be preferred + # this is not useful for hyperlinks outside the known domain, so an IP + # address should be preferred if hostname.find('.') < 1: hostname = ip message_txt = "" if (self.email_txt != ""): message_txt = """{email_txt} lanforge target {lf_mgr_ip} -Results from {hostname}: +Results from {hostname}: Suite: {suite} Database: {db} http://{hostname}/{report} """.format(email_txt=self.email_txt, lf_mgr_ip=self.lf_mgr_ip, suite=self.test_suite, db=self.database_sqlite, hostname=hostname, report=report_url) else: - message_txt = """Results from {hostname}: + message_txt = """Results from {hostname}: Suite: {suite} Database: {db} http://{hostname}/{report}""".format(hostname=hostname, suite=self.test_suite, db=self.database_sqlite, report=report_url) @@ -379,14 +413,15 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) if (self.email_title_txt != ""): mail_subject = "{email} [{hostname}] {suite} {date}".format(email=self.email_title_txt, hostname=hostname, - suite=self.test_suite, db=self.database_sqlite, date=datetime.datetime.now()) + suite=self.test_suite, db=self.database_sqlite, date=datetime.datetime.now()) else: - mail_subject = "Regression Test [{hostname}] {suite} {date}".format(hostname=hostname, - suite = self.test_suite, db = self.database_sqlite, date=datetime.datetime.now()) + mail_subject = "Regression Test [{hostname}] {suite} {date}".format(hostname=hostname, + suite=self.test_suite, db=self.database_sqlite, date=datetime.datetime.now()) try: if self.production_run: msg = message_txt.format(ip=ip) - # for postfix from command line echo "My message" | mail -s subject user@candelatech.com + # for postfix from command line echo "My message" | mail -s + # subject user@candelatech.com command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format( message=msg, subject=mail_subject, @@ -410,8 +445,10 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) def start_csv_results(self): print("self.csv_results") self.csv_results_file = open(self.csv_results, "w") - self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",") - self.csv_results_column_headers = ['Test', 'Command', 'Result', 'STDOUT', 'STDERR'] + self.csv_results_writer = csv.writer( + self.csv_results_file, delimiter=",") + self.csv_results_column_headers = [ + 'Test', 'Command', 'Result', 'STDOUT', 'STDERR'] self.csv_results_writer.writerow(self.csv_results_column_headers) self.csv_results_file.flush() @@ -458,8 +495,11 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) # self.logger.info("test_rig_parameters {}".format(self.json_rig["test_rig_parameters"])) self.read_test_rig_parameters() else: - self.logger.info("EXITING test_rig_parameters not in json {}".format(self.json_rig)) - self.logger.info("EXITING ERROR test_rig_parameters not in rig json") + self.logger.info( + "EXITING test_rig_parameters not in json {}".format( + self.json_rig)) + self.logger.info( + "EXITING ERROR test_rig_parameters not in rig json") exit(1) # read dut configuration @@ -468,24 +508,34 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) self.logger.info("json: read test_dut") self.read_dut_parameters() else: - self.logger.info("EXITING test_dut not in json {}".format(self.json_dut)) + self.logger.info( + "EXITING test_dut not in json {}".format( + self.json_dut)) self.logger.info("EXITING ERROR test_dut not in dut json {}") exit(1) # Top Level for reading the tests to run def read_json_test(self): if "test_suites" in self.json_test: - self.logger.info("json: read test_suites looking for: {}".format(self.test_suite)) + self.logger.info( + "json: read test_suites looking for: {}".format( + self.test_suite)) # self.logger.info("test_suites {}".format(self.json_test["test_suites"])) if self.test_suite in self.json_test["test_suites"]: self.test_dict = self.json_test["test_suites"][self.test_suite] # self.logger.info("self.test_dict {}".format(self.test_dict)) else: - self.logger.info("EXITING test_suite {} Not Present in json test_suites: {}".format(self.test_suite, self.json_test["test_suites"])) - self.logger.info("EXITING ERROR test_suite {} Not Present in json test_suites".format(self.test_suite)) + self.logger.info( + "EXITING test_suite {} Not Present in json test_suites: {}".format( + self.test_suite, self.json_test["test_suites"])) + self.logger.info( + "EXITING ERROR test_suite {} Not Present in json test_suites".format( + self.test_suite)) exit(1) else: - self.logger.info("EXITING test_suites not in json {}".format(self.json_test)) + self.logger.info( + "EXITING test_suites not in json {}".format( + self.json_test)) self.logger.info("EXITING ERROR test_suites not in json test") exit(1) @@ -499,7 +549,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) if "DATABASE_SQLITE" in self.json_rig["test_rig_parameters"]: self.database_sqlite = self.json_rig["test_rig_parameters"]["DATABASE_SQLITE"] else: - self.logger.info("DATABASE_SQLITE not in test_rig_parameters json") + self.logger.info( + "DATABASE_SQLITE not in test_rig_parameters json") else: self.database_sqlite = self.db_override if "LF_MGR_IP" in self.json_rig["test_rig_parameters"]: @@ -531,7 +582,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) if "EMAIL_LIST_PRODUCTION" in self.json_rig["test_rig_parameters"]: self.email_list_production = self.json_rig["test_rig_parameters"]["EMAIL_LIST_PRODUCTION"] else: - self.logger.info("EMAIL_LIST_PRODUCTION not in test_rig_parameters json") + self.logger.info( + "EMAIL_LIST_PRODUCTION not in test_rig_parameters json") exit(1) if "EMAIL_LIST_TEST" in self.json_rig["test_rig_parameters"]: self.email_list_test = self.json_rig["test_rig_parameters"]["EMAIL_LIST_TEST"] @@ -555,7 +607,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) self.dut_set_name = self.json_dut["test_dut"]["DUT_SET_NAME"] else: self.logger.info("DUT_SET_NAME not in test_dut json") - # dut name will set a chamberview scenerio for a DUT which can be selected with dut_set_name + # dut name will set a chamberview scenerio for a DUT which can be + # selected with dut_set_name if "USE_DUT_NAME" in self.json_dut["test_dut"]: self.use_dut_name = self.json_dut["test_dut"]["USE_DUT_NAME"] else: @@ -583,12 +636,14 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) if "wireless_network_dict" in self.json_dut["test_dut"]: self.wireless_network_dict = self.json_dut["test_dut"]["wireless_network_dict"] - self.logger.info("self.wireless_network_dict {}".format(self.wireless_network_dict)) + self.logger.info( + "self.wireless_network_dict {}".format( + self.wireless_network_dict)) else: self.logger.info("wireless_network_dict not in test_dut json") exit(1) - # custom will accept --load FACTORY_DFLT and --load BLANK + # custom will accept --load FACTORY_DFLT and --load BLANK # TODO make a list def load_custom_database(self, custom_db): try: @@ -596,18 +651,23 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) except BaseException: self.logger.info("failed to change to {}".format(self.scripts_wd)) - # WARNING do not simplify the following constructed command - #command = "./{} {} {} {}".format("scenario.py", "--mgr {mgr}"\ + # WARNING do not simplify the following constructed command + # command = "./{} {} {} {}".format("scenario.py", "--mgr {mgr}"\ # .format(mgr=self.lf_mgr_ip),"--load {db}".format(db=custom_db),"--action {action}".format(action="overwrite")) - command = "./{cmd} --mgr {mgr} --load {db} --action {action}".format(cmd="scenario.py",mgr=self.lf_mgr_ip,db=custom_db,action="overwrite") + command = "./{cmd} --mgr {mgr} --load {db} --action {action}".format( + cmd="scenario.py", mgr=self.lf_mgr_ip, db=custom_db, action="overwrite") print("command: {command}".format(command=command)) - + process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) # wait for the process to terminate out, err = process.communicate() errcode = process.returncode - print("load_custom_database out: {out} errcode: {errcode} err: {err}".format(out=out,errcode=errcode,err=err)) + print( + "load_custom_database out: {out} errcode: {errcode} err: {err}".format( + out=out, + errcode=errcode, + err=err)) # DO NOT REMOVE 15 second sleep. # After every DB load, the load changes are applied, and part of the apply is to re-build # The underlying netsmith objects @@ -624,24 +684,34 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) elif self.test_dict[test]['enabled'] == "TRUE": # TODO Place test interations here if 'iterations' in self.test_dict[test]: - self.logger.info("iterations : {}".format(self.test_dict[test]['iterations'])) - self.test_iterations = int(self.test_dict[test]['iterations']) + self.logger.info( + "iterations : {}".format( + self.test_dict[test]['iterations'])) + self.test_iterations = int( + self.test_dict[test]['iterations']) else: self.test_iterations = self.test_iterations_default iteration = 0 - report_index = 0 # log may contain multiple runs - this helps put the meta.txt in right directory + # log may contain multiple runs - this helps put the meta.txt + # in right directory + report_index = 0 for iteration in range(self.test_iterations): iteration += 1 # The network arguments need to be changed when in a list - for index, args_list_element in enumerate(self.test_dict[test]['args_list']): + for index, args_list_element in enumerate( + self.test_dict[test]['args_list']): if 'ssid_idx=' in args_list_element: # print("args_list_element {}".format(args_list_element)) - # get ssid_idx used in the test as an index for the dictionary - ssid_idx_number = args_list_element.split('ssid_idx=')[-1].split()[0] - print("ssid_idx_number: {}".format(ssid_idx_number)) - idx = "ssid_idx={}".format(ssid_idx_number) # index into the DUT network index + # get ssid_idx used in the test as an index for the + # dictionary + ssid_idx_number = args_list_element.split( + 'ssid_idx=')[-1].split()[0] + print( + "ssid_idx_number: {}".format(ssid_idx_number)) + # index into the DUT network index + idx = "ssid_idx={}".format(ssid_idx_number) print("idx: {}".format(idx)) if 'SSID_USED' in args_list_element: self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace( @@ -656,49 +726,67 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace( 'BSSID', self.wireless_network_dict[idx]['BSSID']) - # use_ssid_idx is ephemeral and used only for variable replacement , remove + # use_ssid_idx is ephemeral and used only for + # variable replacement , remove tmp_idx = "use_ssid_idx={}".format(ssid_idx_number) if tmp_idx in args_list_element: - self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace(tmp_idx, '') + self.test_dict[test]['args_list'][index] = self.test_dict[test]['args_list'][index].replace( + tmp_idx, '') # leave in for checking the command line arguments - print("self.test_dict[test]['args_list']: {}".format(self.test_dict[test]['args_list'])) + print( + "self.test_dict[test]['args_list']: {}".format( + self.test_dict[test]['args_list'])) - # Walk all the args in the args list then construct the arguments + # Walk all the args in the args list then construct the + # arguments if self.test_dict[test]['args'] == "": self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'], ''.join(self.test_dict[test][ 'args_list'])) if 'DATABASE_SQLITE' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_SQLITE', self.database_sqlite) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'DATABASE_SQLITE', self.database_sqlite) if 'HTTP_TEST_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP', self.http_test_ip) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'HTTP_TEST_IP', self.http_test_ip) if 'FTP_TEST_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'FTP_TEST_IP', self.ftp_test_ip) if 'TEST_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'TEST_IP', self.test_ip) if 'LF_MGR_USER' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_USER', self.lf_mgr_user) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'LF_MGR_USER', self.lf_mgr_user) if 'LF_MGR_PASS' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PASS', self.lf_mgr_pass) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'LF_MGR_PASS', self.lf_mgr_pass) if 'LF_MGR_IP' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'LF_MGR_IP', self.lf_mgr_ip) if 'LF_MGR_PORT' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'LF_MGR_PORT', self.lf_mgr_port) # DUT Configuration if 'USE_DUT_NAME' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('USE_DUT_NAME', self.use_dut_name) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'USE_DUT_NAME', self.use_dut_name) if 'DUT_HW' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'DUT_HW', self.dut_hw) if 'DUT_SW' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'DUT_SW', self.dut_sw) if 'DUT_MODEL' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'DUT_MODEL', self.dut_model) if 'DUT_SERIAL' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'DUT_SERIAL', self.dut_serial) if 'UPSTREAM_PORT' in self.test_dict[test]['args']: self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT', @@ -706,13 +794,15 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) # lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location # of the reports when the reports are pulled. if 'REPORT_PATH' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'REPORT_PATH', self.report_path) if 'DUT_SET_NAME' in self.test_dict[test]['args']: self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME', self.dut_set_name) if 'TEST_RIG' in self.test_dict[test]['args']: - self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig) + self.test_dict[test]['args'] = self.test_dict[test]['args'].replace( + 'TEST_RIG', self.test_rig) # END of command line arg processing if self.test_dict[test]['args'] == "": @@ -721,17 +811,23 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) 'args_list'])) if 'timeout' in self.test_dict[test]: - self.logger.info("timeout : {}".format(self.test_dict[test]['timeout'])) - self.test_timeout = int(self.test_dict[test]['timeout']) + self.logger.info( + "timeout : {}".format( + self.test_dict[test]['timeout'])) + self.test_timeout = int( + self.test_dict[test]['timeout']) else: self.test_timeout = self.test_timeout_default if 'load_db' in self.test_dict[test]: - self.logger.info("load_db : {}".format(self.test_dict[test]['load_db'])) + self.logger.info( + "load_db : {}".format( + self.test_dict[test]['load_db'])) if str(self.test_dict[test]['load_db']).lower() != "none" and str( self.test_dict[test]['load_db']).lower() != "skip": try: - self.load_custom_database(self.test_dict[test]['load_db']) + self.load_custom_database( + self.test_dict[test]['load_db']) except BaseException: self.logger.info("custom database failed to load check existance and location: {}".format( self.test_dict[test]['load_db'])) @@ -740,32 +836,46 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) os.chdir(self.scripts_wd) # self.logger.info("Current Working Directory {}".format(os.getcwd())) except BaseException: - self.logger.info("failed to change to {}".format(self.scripts_wd)) + self.logger.info( + "failed to change to {}".format( + self.scripts_wd)) cmd_args = "{}".format(self.test_dict[test]['args']) - command = "./{} {}".format(self.test_dict[test]['command'], cmd_args) + command = "./{} {}".format( + self.test_dict[test]['command'], cmd_args) self.logger.info("command: {}".format(command)) self.logger.info("cmd_args {}".format(cmd_args)) if self.outfile_name is not None: - stdout_log_txt = os.path.join(self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, test)) - self.logger.info("stdout_log_txt: {}".format(stdout_log_txt)) + stdout_log_txt = os.path.join( + self.log_path, "{}-{}-stdout.txt".format(self.outfile_name, test)) + self.logger.info( + "stdout_log_txt: {}".format(stdout_log_txt)) stdout_log = open(stdout_log_txt, 'a') - stderr_log_txt = os.path.join(self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, test)) - self.logger.info("stderr_log_txt: {}".format(stderr_log_txt)) + stderr_log_txt = os.path.join( + self.log_path, "{}-{}-stderr.txt".format(self.outfile_name, test)) + self.logger.info( + "stderr_log_txt: {}".format(stderr_log_txt)) stderr_log = open(stderr_log_txt, 'a') # need to take into account --raw_line parameters thus need to use shlex.split - # need to preserve command to have correct command syntax in command output + # need to preserve command to have correct command syntax + # in command output command_to_run = command command_to_run = shlex.split(command_to_run) - print("running {command_to_run}".format(command_to_run=command_to_run)) - self.test_start_time = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")).replace(':', '-') - print("Test start: {time}".format(time=self.test_start_time)) + print( + "running {command_to_run}".format( + command_to_run=command_to_run)) + self.test_start_time = str(datetime.datetime.now().strftime( + "%Y-%m-%d-%H-%M-%S")).replace(':', '-') + print( + "Test start: {time}".format( + time=self.test_start_time)) start_time = datetime.datetime.now() try: process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log, universal_newlines=True) - # if there is a better solution please propose, the TIMEOUT Result is different then FAIL + # if there is a better solution please propose, the + # TIMEOUT Result is different then FAIL try: if int(self.test_timeout != 0): process.wait(timeout=int(self.test_timeout)) @@ -776,12 +886,17 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) self.test_result = "TIMEOUT" except BaseException: - print("No such file or directory with command: {}".format(command)) - self.logger.info("No such file or directory with command: {}".format(command)) + print( + "No such file or directory with command: {}".format(command)) + self.logger.info( + "No such file or directory with command: {}".format(command)) end_time = datetime.datetime.now() - self.test_end_time = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")).replace(':', '-') - print("Test end time {time}".format(time=self.test_end_time)) + self.test_end_time = str(datetime.datetime.now().strftime( + "%Y-%m-%d-%H-%M-%S")).replace(':', '-') + print( + "Test end time {time}".format( + time=self.test_end_time)) time_delta = end_time - start_time self.duration = "{day}d {seconds}s {msec} ms".format( @@ -789,38 +904,56 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) # If collect meta data is set meta_data_path = "" - # Will gather data even on a TIMEOUT condition as there is some results on longer tests + # Will gather data even on a TIMEOUT condition as there is + # some results on longer tests stdout_log_size = os.path.getsize(stdout_log_txt) if stdout_log_size > 0: stdout_log_fd = open(stdout_log_txt) # "Report Location:::/home/lanforge/html-reports/wifi-capacity-2021-08-17-04-02-56" # - + for line in stdout_log_fd: if "Report Location" in line: report_index += 1 if iteration == report_index: meta_data_path = line.replace('"', '') - meta_data_path = meta_data_path.replace('Report Location:::', '') - meta_data_path = meta_data_path.split('/')[-1] + meta_data_path = meta_data_path.replace( + 'Report Location:::', '') + meta_data_path = meta_data_path.split( + '/')[-1] meta_data_path = meta_data_path.strip() meta_data_path = self.report_path + '/' + meta_data_path + '/meta.txt' break stdout_log_fd.close() if meta_data_path != "": meta_data_fd = open(meta_data_path, 'w+') - meta_data_fd.write('$ Generated by Candela Technologies LANforge network testing tool\n') - meta_data_fd.write("test_run {test_run}\n".format(test_run=self.report_path)) - meta_data_fd.write("file_meta {path}\n".format(path=meta_data_path)) - meta_data_fd.write('lanforge_gui_version: {gui_version} \n'.format(gui_version=self.lanforge_gui_version)) - meta_data_fd.write('lanforge_server_version: {server_version} \n'.format(server_version=self.lanforge_server_version)) + meta_data_fd.write( + '$ Generated by Candela Technologies LANforge network testing tool\n') + meta_data_fd.write( + "test_run {test_run}\n".format( + test_run=self.report_path)) + meta_data_fd.write( + "file_meta {path}\n".format( + path=meta_data_path)) + meta_data_fd.write( + 'lanforge_gui_version: {gui_version} \n'.format( + gui_version=self.lanforge_gui_version)) + meta_data_fd.write( + 'lanforge_server_version: {server_version} \n'.format( + server_version=self.lanforge_server_version)) meta_data_fd.write('$ LANforge command\n') - meta_data_fd.write("command {command}\n".format(command=command)) - # split command at test-tag , at rest of string once at the actual test-tag value - test_tag = command.split('test_tag', maxsplit=1)[-1].split(maxsplit=1)[0] + meta_data_fd.write( + "command {command}\n".format( + command=command)) + # split command at test-tag , at rest of string once at + # the actual test-tag value + test_tag = command.split( + 'test_tag', maxsplit=1)[-1].split(maxsplit=1)[0] test_tag = test_tag.replace("'", "") meta_data_fd.write('$ LANforge test tag\n') - meta_data_fd.write("test_tag {test_tag}\n".format(test_tag=test_tag)) + meta_data_fd.write( + "test_tag {test_tag}\n".format( + test_tag=test_tag)) # LANforge information is a list thus [0] meta_data_fd.write('$ LANforge Information\n') meta_data_fd.write( @@ -839,7 +972,9 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) stderr_log_size = os.path.getsize(stderr_log_txt) if stderr_log_size > 0: - self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size))) + self.logger.info( + "File: {} is not empty: {}".format( + stderr_log_txt, str(stderr_log_size))) text = open(stderr_log_txt).read() if 'Error' in text: self.text_result = "Failure" @@ -848,7 +983,9 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) self.text_result = "Success" background = self.background_green else: - self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size))) + self.logger.info( + "File: {} is empty: {}".format( + stderr_log_txt, str(stderr_log_size))) self.test_result = "Success" background = self.background_green @@ -859,7 +996,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) if 'ERROR: Could not find component: TestTag' in text: self.test_result = "Success" background = self.background_green - # leave the space in after error to not pick up tx errors or rx errors + # leave the space in after error to not pick up tx + # errors or rx errors elif 'error ' in text.lower(): self.test_result = "Test Fail" background = self.background_orange @@ -873,7 +1011,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) # if there was a if self.test_result == "TIMEOUT": - self.logger.info("TIMEOUT FAILURE, Check LANforge Radios") + self.logger.info( + "TIMEOUT FAILURE, Check LANforge Radios") self.test_result = "Time Out" background = self.background_purple @@ -882,14 +1021,20 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) for line in line_list: if 'html report:' in line: self.qa_report_html = line - print("html_report: {report}".format(report=self.qa_report_html)) + print( + "html_report: {report}".format( + report=self.qa_report_html)) break - self.qa_report_html = self.qa_report_html.replace('html report: ', '') + self.qa_report_html = self.qa_report_html.replace( + 'html report: ', '') - # stdout_log_link is used for the email reporting to have the corrected path - stdout_log_link = str(stdout_log_txt).replace('/home/lanforge', '') - stderr_log_link = str(stderr_log_txt).replace('/home/lanforge', '') + # stdout_log_link is used for the email reporting to have + # the corrected path + stdout_log_link = str(stdout_log_txt).replace( + '/home/lanforge', '') + stderr_log_link = str(stderr_log_txt).replace( + '/home/lanforge', '') if command.find(' ') > 1: short_cmd = command[0:command.find(' ')] else: @@ -913,7 +1058,8 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip, qa_url=qa_url) self.html_results += """