Better output when user types in a wrong port in lf_check.py

Signed-off-by: Matthew Stidham <stidmatt@gmail.com>
This commit is contained in:
Matthew Stidham
2021-08-06 09:19:42 -07:00
parent aa3517897e
commit 6a95930a09
2 changed files with 257 additions and 176 deletions

View File

@@ -206,10 +206,10 @@ class GhostRequest:
authors, authors,
title='custom'): title='custom'):
self.upload_images(folder) self.upload_images(folder)
head = '''<p>This is a custom post created via a script</p>''' head = '''This is a custom post created via a script'''
for picture in self.images: for picture in self.images:
head = head + '<img src="%s"></img>' % picture head = head + '<img src="%s"></img>' % picture
head = head + '''<p>This is the end of the example</p>''' head = head + '''This is the end of the example'''
self.create_post(title=title, self.create_post(title=title,
text=head) text=head)
@@ -492,7 +492,11 @@ class GhostRequest:
influxdb.post_to_influx(short_description, numeric_score, tags, date) influxdb.post_to_influx(short_description, numeric_score, tags, date)
except Exception as err: except Exception as err:
influx_error = err influx_error = err
text += ('<p style="color:red;">InfluxDB Error: %s</p><br />' % influx_error) text += '''InfluxDB Error: %s<br />
Influx Host: %s<br />
Influx Port: %s<br />
Influx Organization: %s<br />
Influx Bucket: %s<br />''' % (influx_error, self.influx_host, self.influx_port, self.influx_org, self.influx_bucket)
raw_test_tags = list() raw_test_tags = list()
test_tag_table = '' test_tag_table = ''
@@ -578,7 +582,11 @@ class GhostRequest:
grafana_host, snapshot['key'], '%') grafana_host, snapshot['key'], '%')
except Exception as err: except Exception as err:
grafana_error = err grafana_error = err
text = text + '<p style="color:red;">Grafana Error: %s</p><br />' % grafana_error text = text + '''Grafana Error: %s<br />
Grafana credentials:<br />
Grafana Host: %s<br />
Grafana Bucket: %s<br />
Grafana Database: %s<br />''' % (grafana_error, grafana_host, grafana_bucket, grafana_datasource)
text = text + 'Low priority results: %s' % csvreader.to_html(low_priority) text = text + 'Low priority results: %s' % csvreader.to_html(low_priority)

View File

@@ -63,7 +63,8 @@ Starting LANforge:
import datetime import datetime
import pprint import pprint
import sys import sys
if sys.version_info[0] != 3:
if sys.version_info[0] != 3:
print("This script requires Python3") print("This script requires Python3")
exit() exit()
@@ -78,38 +79,39 @@ import configparser
import subprocess import subprocess
import csv import csv
import shutil import shutil
from os import path
import shlex import shlex
import paramiko import paramiko
import pandas as pd import pandas as pd
# lf_report is from the parent of the current file # lf_report is from the parent of the current file
dir_path = os.path.dirname(os.path.realpath(__file__)) dir_path = os.path.dirname(os.path.realpath(__file__))
parent_dir_path = os.path.abspath(os.path.join(dir_path,os.pardir)) parent_dir_path = os.path.abspath(os.path.join(dir_path, os.pardir))
sys.path.insert(0, parent_dir_path) sys.path.insert(0, parent_dir_path)
from lf_report import lf_report from lf_report import lf_report
sys.path.append('/') sys.path.append('/')
# setup logging FORMAT # setup logging FORMAT
FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s' FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'
# lf_check class contains verificaiton configuration and ocastrates the testing. # lf_check class contains verificaiton configuration and ocastrates the testing.
class lf_check(): class lf_check():
def __init__(self, def __init__(self,
_use_json, _use_json,
_config_ini, _config_ini,
_json_data, _json_data,
_test_suite, _test_suite,
_production, _production,
_csv_results, _csv_results,
_outfile, _outfile,
_report_path): _report_path):
self.use_json = _use_json self.use_json = _use_json
self.json_data = _json_data self.json_data = _json_data
self.config_ini = _config_ini self.config_ini = _config_ini
self.test_suite = _test_suite self.test_suite = _test_suite
self.production_run = _production self.production_run = _production
self.report_path = _report_path self.report_path = _report_path
self.radio_dict = {} self.radio_dict = {}
self.test_dict = {} self.test_dict = {}
@@ -119,7 +121,7 @@ class lf_check():
self.results = "" self.results = ""
self.outfile = _outfile self.outfile = _outfile
self.test_result = "Failure" self.test_result = "Failure"
self.results_col_titles = ["Test","Command","Result","STDOUT","STDERR"] self.results_col_titles = ["Test", "Command", "Result", "STDOUT", "STDERR"]
self.html_results = "" self.html_results = ""
self.background_green = "background-color:green" self.background_green = "background-color:green"
self.background_red = "background-color:red" self.background_red = "background-color:red"
@@ -160,18 +162,18 @@ class lf_check():
self.lf_mgr_ip = "192.168.0.102" self.lf_mgr_ip = "192.168.0.102"
self.lf_mgr_port = "" self.lf_mgr_port = ""
self.lf_mgr_user = "lanforge" self.lf_mgr_user = "lanforge"
self.lf_mgr_pass = "lanforge" self.lf_mgr_pass = "lanforge"
# dut configuration # dut configuration
self.dut_name = "DUT_NAME_NOT_SET" # "ASUSRT-AX88U" note this is not dut_set_name self.dut_name = "DUT_NAME_NOT_SET" # "ASUSRT-AX88U" note this is not dut_set_name
self.dut_hw = "DUT_HW_NOT_SET" self.dut_hw = "DUT_HW_NOT_SET"
self.dut_sw = "DUT_SW_NOT_SET" self.dut_sw = "DUT_SW_NOT_SET"
self.dut_model = "DUT_MODEL_NOT_SET" self.dut_model = "DUT_MODEL_NOT_SET"
self.dut_serial = "DUT_SERIAL_NOT_SET" self.dut_serial = "DUT_SERIAL_NOT_SET"
self.dut_bssid_2g = "BSSID_2G_NOT_SET" #"3c:7c:3f:55:4d:64" - this is the mac for the 2.4G radio this may be seen with a scan self.dut_bssid_2g = "BSSID_2G_NOT_SET" # "3c:7c:3f:55:4d:64" - this is the mac for the 2.4G radio this may be seen with a scan
self.dut_bssid_5g = "BSSID_5G_NOT_SET" #"3c:7c:3f:55:4d:64" - this is the mac for the 5G radio this may be seen with a scan self.dut_bssid_5g = "BSSID_5G_NOT_SET" # "3c:7c:3f:55:4d:64" - this is the mac for the 5G radio this may be seen with a scan
self.dut_bssid_6g = "BSSID_6G_NOT_SET" #"3c:7c:3f:55:4d:64" - this is the mac for the 6G radio this may be seen with a scan self.dut_bssid_6g = "BSSID_6G_NOT_SET" # "3c:7c:3f:55:4d:64" - this is the mac for the 6G radio this may be seen with a scan
#NOTE: My influx token is unlucky and starts with a '-', but using the syntax below # with '=' right after the argument keyword works as hoped. # NOTE: My influx token is unlucky and starts with a '-', but using the syntax below # with '=' right after the argument keyword works as hoped.
# --influx_token= # --influx_token=
# DUT , Test rig must match testbed # DUT , Test rig must match testbed
@@ -179,20 +181,19 @@ class lf_check():
# database configuration # database # database configuration # database
self.database_json = "" self.database_json = ""
self.database_config = "True" # default to False once testing done self.database_config = "True" # default to False once testing done
self.database_host = "192.168.100.201" #"c7-grafana.candelatech.com" # influx and grafana have the same host "192.168.100.201" self.database_host = "192.168.100.201" # "c7-grafana.candelatech.com" # influx and grafana have the same host "192.168.100.201"
self.database_port = "8086" self.database_port = "8086"
self.database_token = "-u_Wd-L8o992701QF0c5UmqEp7w7Z7YOMaWLxOMgmHfATJGnQbbmYyNxHBR9PgD6taM_tcxqJl6U8DjU1xINFQ==" self.database_token = "-u_Wd-L8o992701QF0c5UmqEp7w7Z7YOMaWLxOMgmHfATJGnQbbmYyNxHBR9PgD6taM_tcxqJl6U8DjU1xINFQ=="
self.database_org = "Candela" self.database_org = "Candela"
self.database_bucket = "lanforge_qa_testing" self.database_bucket = "lanforge_qa_testing"
self.database_tag = 'testbed CT-US-001' # the test_rig needs to match self.database_tag = 'testbed CT-US-001' # the test_rig needs to match
self.dut_set_name = 'DUT_NAME ASUSRT-AX88U' # note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not dut_name (see above) self.dut_set_name = 'DUT_NAME ASUSRT-AX88U' # note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not dut_name (see above)
# grafana configuration #dashboard # grafana configuration #dashboard
self.dashboard_json = "" self.dashboard_json = ""
self.dashboard_config = "True" # default to False once testing done self.dashboard_config = "True" # default to False once testing done
self.dashboard_host = "192.168.100.201" #"c7-grafana.candelatech.com" # 192.168.100.201 self.dashboard_host = "192.168.100.201" # "c7-grafana.candelatech.com" # 192.168.100.201
self.dashboard_token = "eyJrIjoiS1NGRU8xcTVBQW9lUmlTM2dNRFpqNjFqV05MZkM0dzciLCJuIjoibWF0dGhldyIsImlkIjoxfQ==" self.dashboard_token = "eyJrIjoiS1NGRU8xcTVBQW9lUmlTM2dNRFpqNjFqV05MZkM0dzciLCJuIjoibWF0dGhldyIsImlkIjoxfQ=="
# ghost configuration # ghost configuration
@@ -208,23 +209,42 @@ class lf_check():
self.test_run = "" self.test_run = ""
def ping(self):
queries = dict()
queries['Lanforge Manager'] = self.lf_mgr_ip
queries['Blog Host'] = self.blog_host
queries['Influx Host'] = self.database_host
queries['Grafana Host'] = self.dashboard_host
results = dict()
for key, value in queries.items():
ping = subprocess.Popen(
["ping", "-c", "4", value],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
out, error = ping.communicate()
results[key] = [str(out), value]
return results
def get_scripts_git_sha(self): def get_scripts_git_sha(self):
# get git sha # get git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE) process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
(commit_hash, err) = process.communicate() (commit_hash, err) = process.communicate()
exit_code = process.wait() exit_code = process.wait()
scripts_git_sha = commit_hash.decode('utf-8','ignore') scripts_git_sha = commit_hash.decode('utf-8', 'ignore')
return scripts_git_sha return scripts_git_sha
def get_lanforge_node_version(self): def get_lanforge_node_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
#ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600) # ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600)
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -n') stdin, stdout, stderr = ssh.exec_command('uname -n')
lanforge_node_version = stdout.readlines() lanforge_node_version = stdout.readlines()
# print('\n'.join(output)) # print('\n'.join(output))
lanforge_node_version =[line.replace('\n','') for line in lanforge_node_version] lanforge_node_version = [line.replace('\n', '') for line in lanforge_node_version]
ssh.close() ssh.close()
time.sleep(1) time.sleep(1)
return lanforge_node_version return lanforge_node_version
@@ -232,12 +252,13 @@ class lf_check():
def get_lanforge_kernel_version(self): def get_lanforge_kernel_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
#ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600) # ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600)
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -r') stdin, stdout, stderr = ssh.exec_command('uname -r')
lanforge_kernel_version = stdout.readlines() lanforge_kernel_version = stdout.readlines()
# print('\n'.join(output)) # print('\n'.join(output))
lanforge_kernel_version =[line.replace('\n','') for line in lanforge_kernel_version] lanforge_kernel_version = [line.replace('\n', '') for line in lanforge_kernel_version]
ssh.close() ssh.close()
time.sleep(1) time.sleep(1)
return lanforge_kernel_version return lanforge_kernel_version
@@ -246,49 +267,51 @@ class lf_check():
output = "" output = ""
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, allow_agent=False, look_for_keys=False, banner_timeout=600) ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('./btserver --version | grep Version') stdin, stdout, stderr = ssh.exec_command('./btserver --version | grep Version')
lanforge_gui_version = stdout.readlines() lanforge_gui_version = stdout.readlines()
# print('\n'.join(output)) # print('\n'.join(output))
lanforge_gui_version =[line.replace('\n','') for line in lanforge_gui_version] lanforge_gui_version = [line.replace('\n', '') for line in lanforge_gui_version]
ssh.close() ssh.close()
time.sleep(1) time.sleep(1)
return lanforge_gui_version return lanforge_gui_version
# NOT complete : will send the email results # NOT complete : will send the email results
def send_results_email(self, report_file=None): def send_results_email(self, report_file=None):
if (report_file is None): if (report_file is None):
print( "No report file, not sending email.") print("No report file, not sending email.")
return return
report_url=report_file.replace('/home/lanforge/', '') report_url = report_file.replace('/home/lanforge/', '')
if report_url.startswith('/'): if report_url.startswith('/'):
report_url = report_url[1:] report_url = report_url[1:]
# following recommendation # following recommendation
# NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python # NOTE: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-from-nic-in-python
# Mail # Mail
# command to check if mail running : systemctl status postfix # command to check if mail running : systemctl status postfix
#command = 'echo "$HOSTNAME mail system works!" | mail -s "Test: $HOSTNAME $(date)" chuck.rekiere@candelatech.com' # command = 'echo "$HOSTNAME mail system works!" | mail -s "Test: $HOSTNAME $(date)" chuck.rekiere@candelatech.com'
hostname = socket.gethostname() hostname = socket.gethostname()
ip = socket.gethostbyname(hostname) ip = socket.gethostbyname(hostname)
if(self.email_txt != ""): if (self.email_txt != ""):
message_txt = """{email_txt} lanforge target {lf_mgr_ip} message_txt = """{email_txt} lanforge target {lf_mgr_ip}
Results from {hostname}: Results from {hostname}:
http://{ip}/{report} http://{ip}/{report}
Blog: Blog:
http://{blog}:2368 http://{blog}:2368
NOTE: for now to see stdout and stderr remove /home/lanforge from path. NOTE: for now to see stdout and stderr remove /home/lanforge from path.
""".format(hostname=hostname, ip=ip, report=report_url, email_txt=self.email_txt, lf_mgr_ip=self.lf_mgr_ip,blog=self.blog_host) """.format(hostname=hostname, ip=ip, report=report_url, email_txt=self.email_txt, lf_mgr_ip=self.lf_mgr_ip,
blog=self.blog_host)
else: else:
message_txt = """Results from {hostname}: message_txt = """Results from {hostname}:
http://{ip}/{report} http://{ip}/{report}
Blog: Blog:
blog: http://{blog}:2368 blog: http://{blog}:2368
""".format(hostname=hostname, ip=ip, report=report_url,blog=self.blog_host) """.format(hostname=hostname, ip=ip, report=report_url, blog=self.blog_host)
if(self.email_title_txt != ""): if (self.email_title_txt != ""):
mail_subject = "{} [{hostname}] {date}".format(self.email_title_txt,hostname=hostname, date=datetime.datetime.now()) mail_subject = "{} [{hostname}] {date}".format(self.email_title_txt, hostname=hostname,
date=datetime.datetime.now())
else: else:
mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname, date=datetime.datetime.now()) mail_subject = "Regression Test [{hostname}] {date}".format(hostname=hostname, date=datetime.datetime.now())
try: try:
@@ -305,11 +328,12 @@ blog: http://{blog}:2368
command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format( command = "echo \"{message}\" | mail -s \"{subject}\" {address}".format(
message=msg, message=msg,
subject=mail_subject, subject=mail_subject,
ip=ip, #self.host_ip_test, ip=ip, # self.host_ip_test,
address=self.email_list_test) address=self.email_list_test)
print("running:[{}]".format(command)) print("running:[{}]".format(command))
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
# have email on separate timeout # have email on separate timeout
process.wait(timeout=int(self.test_timeout)) process.wait(timeout=int(self.test_timeout))
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
@@ -323,7 +347,7 @@ blog: http://{blog}:2368
print("self.csv_results") print("self.csv_results")
self.csv_results_file = open(self.csv_results, "w") self.csv_results_file = open(self.csv_results, "w")
self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",") self.csv_results_writer = csv.writer(self.csv_results_file, delimiter=",")
self.csv_results_column_headers = ['Test','Command','Result','STDOUT','STDERR'] self.csv_results_column_headers = ['Test', 'Command', 'Result', 'STDOUT', 'STDERR']
self.csv_results_writer.writerow(self.csv_results_column_headers) self.csv_results_writer.writerow(self.csv_results_column_headers)
self.csv_results_file.flush() self.csv_results_file.flush()
@@ -354,7 +378,6 @@ blog: http://{blog}:2368
<br> <br>
""" """
def read_config(self): def read_config(self):
if self.use_json: if self.use_json:
self.read_config_json() self.read_config_json()
@@ -364,10 +387,10 @@ blog: http://{blog}:2368
# there is probably a more efficient way to do this in python # there is probably a more efficient way to do this in python
# Keeping it obvious for now, may be refactored later # Keeping it obvious for now, may be refactored later
def read_config_json(self): def read_config_json(self):
#self.logger.info("read_config_json_contents {}".format(self.json_data)) # self.logger.info("read_config_json_contents {}".format(self.json_data))
if "test_parameters" in self.json_data: if "test_parameters" in self.json_data:
self.logger.info("json: read test_parameters") self.logger.info("json: read test_parameters")
#self.logger.info("test_parameters {}".format(self.json_data["test_parameters"])) # self.logger.info("test_parameters {}".format(self.json_data["test_parameters"]))
self.read_test_parameters() self.read_test_parameters()
else: else:
self.logger.info("EXITING test_parameters not in json {}".format(self.json_data)) self.logger.info("EXITING test_parameters not in json {}".format(self.json_data))
@@ -375,7 +398,7 @@ blog: http://{blog}:2368
if "test_network" in self.json_data: if "test_network" in self.json_data:
self.logger.info("json: read test_network") self.logger.info("json: read test_network")
#self.logger.info("test_network {}".format(self.json_data["test_network"])) # self.logger.info("test_network {}".format(self.json_data["test_network"]))
self.read_test_network() self.read_test_network()
else: else:
self.logger.info("EXITING test_network not in json {}".format(self.json_data)) self.logger.info("EXITING test_network not in json {}".format(self.json_data))
@@ -383,28 +406,28 @@ blog: http://{blog}:2368
if "test_database" in self.json_data: if "test_database" in self.json_data:
self.logger.info("json: read test_database") self.logger.info("json: read test_database")
#self.logger.info("test_database {}".format(self.json_data["test_database"])) # self.logger.info("test_database {}".format(self.json_data["test_database"]))
self.read_test_database() self.read_test_database()
else: else:
self.logger.info("NOTE: test_database not found in json") self.logger.info("NOTE: test_database not found in json")
if "test_dashboard" in self.json_data: if "test_dashboard" in self.json_data:
self.logger.info("json: read test_dashboard") self.logger.info("json: read test_dashboard")
#self.logger.info("test_dashboard {}".format(self.json_data["test_dashboard"])) # self.logger.info("test_dashboard {}".format(self.json_data["test_dashboard"]))
self.read_test_dashboard() self.read_test_dashboard()
else: else:
self.logger.info("NOTE: test_dashboard not found in json") self.logger.info("NOTE: test_dashboard not found in json")
if "test_blog" in self.json_data: if "test_blog" in self.json_data:
self.logger.info("json: read test_blog") self.logger.info("json: read test_blog")
#self.logger.info("test_blog {}".format(self.json_data["test_blog"])) # self.logger.info("test_blog {}".format(self.json_data["test_blog"]))
self.read_test_blog() self.read_test_blog()
else: else:
self.logger.info("NOTE: test_blog not found in json") self.logger.info("NOTE: test_blog not found in json")
if "test_generic" in self.json_data: if "test_generic" in self.json_data:
self.logger.info("json: read test_generic") self.logger.info("json: read test_generic")
#self.logger.info("test_generic {}".format(self.json_data["test_generic"])) # self.logger.info("test_generic {}".format(self.json_data["test_generic"]))
self.read_test_generic() self.read_test_generic()
else: else:
self.logger.info("EXITING test_generic not in json {}".format(self.json_data)) self.logger.info("EXITING test_generic not in json {}".format(self.json_data))
@@ -412,7 +435,7 @@ blog: http://{blog}:2368
if "radio_dict" in self.json_data: if "radio_dict" in self.json_data:
self.logger.info("json: read radio_dict") self.logger.info("json: read radio_dict")
#self.logger.info("radio_dict {}".format(self.json_data["radio_dict"])) # self.logger.info("radio_dict {}".format(self.json_data["radio_dict"]))
self.radio_dict = self.json_data["radio_dict"] self.radio_dict = self.json_data["radio_dict"]
self.logger.info("self.radio_dict {}".format(self.radio_dict)) self.logger.info("self.radio_dict {}".format(self.radio_dict))
else: else:
@@ -421,12 +444,14 @@ blog: http://{blog}:2368
if "test_suites" in self.json_data: if "test_suites" in self.json_data:
self.logger.info("json: read test_suites looking for: {}".format(self.test_suite)) self.logger.info("json: read test_suites looking for: {}".format(self.test_suite))
#self.logger.info("test_suites {}".format(self.json_data["test_suites"])) # self.logger.info("test_suites {}".format(self.json_data["test_suites"]))
if self.test_suite in self.json_data["test_suites"]: if self.test_suite in self.json_data["test_suites"]:
self.test_dict = self.json_data["test_suites"][self.test_suite] self.test_dict = self.json_data["test_suites"][self.test_suite]
#self.logger.info("self.test_dict {}".format(self.test_dict)) # self.logger.info("self.test_dict {}".format(self.test_dict))
else: else:
self.logger.info("EXITING test_suite {} Not Present in json test_suites: {}".format(self.test_suite, self.json_data["test_suites"])) self.logger.info("EXITING test_suite {} Not Present in json test_suites: {}".format(self.test_suite,
self.json_data[
"test_suites"]))
exit(1) exit(1)
else: else:
self.logger.info("EXITING test_suites not in json {}".format(self.json_data)) self.logger.info("EXITING test_suites not in json {}".format(self.json_data))
@@ -681,7 +706,7 @@ blog: http://{blog}:2368
# functions in this section are/can be overridden by descendants # functions in this section are/can be overridden by descendants
# this code reads the lf_check_config.ini file to populate the test variables # this code reads the lf_check_config.ini file to populate the test variables
def read_config_ini(self): def read_config_ini(self):
#self.logger.info("read_config_ini_contents {}".format(self.config_ini)) # self.logger.info("read_config_ini_contents {}".format(self.config_ini))
config_file = configparser.ConfigParser() config_file = configparser.ConfigParser()
success = True success = True
success = config_file.read(self.config_ini) success = config_file.read(self.config_ini)
@@ -743,25 +768,28 @@ blog: http://{blog}:2368
section = config_file[self.test_suite] section = config_file[self.test_suite]
# for json replace the \n and \r they are invalid json characters, allows for multiple line args # for json replace the \n and \r they are invalid json characters, allows for multiple line args
try: try:
self.test_dict = json.loads(section.get('TEST_DICT', self.test_dict).replace('\n',' ').replace('\r',' ')) self.test_dict = json.loads(
self.logger.info("{}: {}".format(self.test_suite,self.test_dict)) section.get('TEST_DICT', self.test_dict).replace('\n', ' ').replace('\r', ' '))
self.logger.info("{}: {}".format(self.test_suite, self.test_dict))
except: except:
self.logger.info("Excpetion loading {}, is there comma after the last entry? Check syntax".format(self.test_suite)) self.logger.info(
"Exception loading {}, is there comma after the last entry? Check syntax".format(self.test_suite))
else: else:
self.logger.info("EXITING... NOT FOUND Test Suite with name : {}".format(self.test_suite)) self.logger.info("EXITING... NOT FOUND Test Suite with name : {}".format(self.test_suite))
exit(1) exit(1)
def load_factory_default_db(self): def load_factory_default_db(self):
#self.logger.info("file_wd {}".format(self.scripts_wd)) # self.logger.info("file_wd {}".format(self.scripts_wd))
try: try:
os.chdir(self.scripts_wd) os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd())) # self.logger.info("Current Working Directory {}".format(os.getcwd()))
except: except:
self.logger.info("failed to change to {}".format(self.scripts_wd)) self.logger.info("failed to change to {}".format(self.scripts_wd))
# no spaces after FACTORY_DFLT # no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load FACTORY_DFLT") command = "./{} {}".format("scenario.py", "--load FACTORY_DFLT")
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
# wait for the process to terminate # wait for the process to terminate
out, err = process.communicate() out, err = process.communicate()
errcode = process.returncode errcode = process.returncode
@@ -775,9 +803,10 @@ blog: http://{blog}:2368
# no spaces after FACTORY_DFLT # no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load BLANK") command = "./{} {}".format("scenario.py", "--load BLANK")
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
def load_custom_db(self,custom_db): def load_custom_db(self, custom_db):
try: try:
os.chdir(self.scripts_wd) os.chdir(self.scripts_wd)
except: except:
@@ -785,7 +814,8 @@ blog: http://{blog}:2368
# no spaces after FACTORY_DFLT # no spaces after FACTORY_DFLT
command = "./{} {}".format("scenario.py", "--load {}".format(custom_db)) command = "./{} {}".format("scenario.py", "--load {}".format(custom_db))
process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) process = subprocess.Popen((command).split(' '), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
# wait for the process to terminate # wait for the process to terminate
out, err = process.communicate() out, err = process.communicate()
errcode = process.returncode errcode = process.returncode
@@ -803,7 +833,9 @@ blog: http://{blog}:2368
# list does not have replace only stings do to args_list will be joined and converted to a string and placed # list does not have replace only stings do to args_list will be joined and converted to a string and placed
# in args. Then the replace below will work. # in args. Then the replace below will work.
if self.test_dict[test]['args'] == "": if self.test_dict[test]['args'] == "":
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],''.join(self.test_dict[test]['args_list'])) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.test_dict[test]['args'],
''.join(self.test_dict[test][
'args_list']))
# Configure Tests # Configure Tests
# loop through radios # loop through radios
for radio in self.radio_dict: for radio in self.radio_dict:
@@ -811,102 +843,124 @@ blog: http://{blog}:2368
# not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues # not "KEY" is just a word to refer to the RADIO define (e.g. RADIO_0_CFG) to get the vlaues
# --num_stations needs to be int not string (no double quotes) # --num_stations needs to be int not string (no double quotes)
if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']: if self.radio_dict[radio]["KEY"] in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(self.radio_dict[radio]["KEY"],'--radio {} --ssid {} --passwd {} --security {} --num_stations {}' self.test_dict[test]['args'] = self.test_dict[test]['args'].replace(
.format(self.radio_dict[radio]['RADIO'],self.radio_dict[radio]['SSID'],self.radio_dict[radio]['PASSWD'],self.radio_dict[radio]['SECURITY'],self.radio_dict[radio]['STATIONS'])) self.radio_dict[radio]["KEY"],
'--radio {} --ssid {} --passwd {} --security {} --num_stations {}'
.format(self.radio_dict[radio]['RADIO'], self.radio_dict[radio]['SSID'],
self.radio_dict[radio]['PASSWD'], self.radio_dict[radio]['SECURITY'],
self.radio_dict[radio]['STATIONS']))
if 'HTTP_TEST_IP' in self.test_dict[test]['args']: if 'HTTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP',self.http_test_ip) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('HTTP_TEST_IP',
self.http_test_ip)
if 'FTP_TEST_IP' in self.test_dict[test]['args']: if 'FTP_TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP',self.ftp_test_ip) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('FTP_TEST_IP', self.ftp_test_ip)
if 'TEST_IP' in self.test_dict[test]['args']: if 'TEST_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP',self.test_ip) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_IP', self.test_ip)
if 'LF_MGR_IP' in self.test_dict[test]['args']: if 'LF_MGR_IP' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP',self.lf_mgr_ip) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_IP', self.lf_mgr_ip)
if 'LF_MGR_PORT' in self.test_dict[test]['args']: if 'LF_MGR_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT',self.lf_mgr_port) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('LF_MGR_PORT', self.lf_mgr_port)
if 'DUT_NAME' in self.test_dict[test]['args']: if 'DUT_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_NAME',self.dut_name) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_NAME', self.dut_name)
if 'DUT_HW' in self.test_dict[test]['args']: if 'DUT_HW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW',self.dut_hw) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_HW', self.dut_hw)
if 'DUT_SW' in self.test_dict[test]['args']: if 'DUT_SW' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW',self.dut_sw) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SW', self.dut_sw)
if 'DUT_MODEL' in self.test_dict[test]['args']: if 'DUT_MODEL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL',self.dut_model) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_MODEL', self.dut_model)
if 'DUT_SERIAL' in self.test_dict[test]['args']: if 'DUT_SERIAL' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL',self.dut_serial) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SERIAL', self.dut_serial)
if 'DUT_BSSID_2G' in self.test_dict[test]['args']: if 'DUT_BSSID_2G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_2G',self.dut_bssid_2g) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_2G',
self.dut_bssid_2g)
if 'DUT_BSSID_5G' in self.test_dict[test]['args']: if 'DUT_BSSID_5G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_5G',self.dut_bssid_5g) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_5G',
self.dut_bssid_5g)
if 'DUT_BSSID_6G' in self.test_dict[test]['args']: if 'DUT_BSSID_6G' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_6G',self.dut_bssid_6g) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_BSSID_6G',
self.dut_bssid_6g)
if 'RADIO_USED' in self.test_dict[test]['args']: if 'RADIO_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED',self.radio_lf) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('RADIO_USED', self.radio_lf)
if 'SSID_USED' in self.test_dict[test]['args']: if 'SSID_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED',self.ssid) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_USED', self.ssid)
if 'SSID_PW_USED' in self.test_dict[test]['args']: if 'SSID_PW_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED',self.ssid_pw) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SSID_PW_USED', self.ssid_pw)
if 'SECURITY_USED' in self.test_dict[test]['args']: if 'SECURITY_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED',self.security) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_USED', self.security)
if 'NUM_STA' in self.test_dict[test]['args']: if 'NUM_STA' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA',self.num_sta) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA', self.num_sta)
if 'COL_NAMES' in self.test_dict[test]['args']: if 'COL_NAMES' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES',self.col_names) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES', self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']: if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',self.upstream_port) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
self.upstream_port)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location # lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled. # of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[test]['args']: if 'REPORT_PATH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH',self.report_path) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path)
# The TEST_BED is the database tag # The TEST_BED is the database tag
if 'TEST_BED' in self.test_dict[test]['args']: if 'TEST_BED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED',self.database_tag) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED', self.database_tag)
# database configuration # database configuration
if 'DATABASE_HOST' in self.test_dict[test]['args']: if 'DATABASE_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_HOST',self.database_host) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_HOST',
self.database_host)
if 'DATABASE_PORT' in self.test_dict[test]['args']: if 'DATABASE_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_PORT',self.database_port) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_PORT',
self.database_port)
if 'DATABASE_TOKEN' in self.test_dict[test]['args']: if 'DATABASE_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TOKEN',self.database_token) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TOKEN',
self.database_token)
if 'DATABASE_ORG' in self.test_dict[test]['args']: if 'DATABASE_ORG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_ORG',self.database_org) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_ORG',
self.database_org)
if 'DATABASE_BUCKET' in self.test_dict[test]['args']: if 'DATABASE_BUCKET' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_BUCKET',self.database_bucket) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_BUCKET',
self.database_bucket)
if 'DATABASE_TAG' in self.test_dict[test]['args']: if 'DATABASE_TAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TAG',self.database_tag) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DATABASE_TAG',
self.database_tag)
if 'DUT_SET_NAME' in self.test_dict[test]['args']: if 'DUT_SET_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',self.dut_set_name) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
if 'TEST_RIG' in self.test_dict[test]['args']: if 'TEST_RIG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG',self.test_rig) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig)
# dashboard configuration # dashboard configuration
if 'DASHBOARD_HOST' in self.test_dict[test]['args']: if 'DASHBOARD_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_HOST',self.dashboard_host) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_HOST',
self.dashboard_host)
if 'DASHBOARD_TOKEN' in self.test_dict[test]['args']: if 'DASHBOARD_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_TOKEN',self.dashboard_token) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DASHBOARD_TOKEN',
self.dashboard_token)
# blog configuration # blog configuration
if 'BLOG_HOST' in self.test_dict[test]['args']: if 'BLOG_HOST' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_HOST',self.blog_host) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_HOST', self.blog_host)
if 'BLOG_TOKEN' in self.test_dict[test]['args']: if 'BLOG_TOKEN' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_TOKEN',self.blog_token) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_TOKEN', self.blog_token)
if 'BLOG_AUTHORS' in self.test_dict[test]['args']: if 'BLOG_AUTHORS' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_AUTHORS',self.blog_authors) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_AUTHORS',
self.blog_authors)
if 'BLOG_CUSTOMER' in self.test_dict[test]['args']: if 'BLOG_CUSTOMER' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_CUSTOMER',self.blog_customer) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_CUSTOMER',
self.blog_customer)
if 'BLOG_USER_PUSH' in self.test_dict[test]['args']: if 'BLOG_USER_PUSH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_USER_PUSH',self.blog_user_push) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_USER_PUSH',
self.blog_user_push)
if 'BLOG_PASSWORD_PUSH' in self.test_dict[test]['args']: if 'BLOG_PASSWORD_PUSH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_PASSWORD_PUSH',self.blog_password_push) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_PASSWORD_PUSH',
self.blog_password_push)
if 'BLOG_FLAG' in self.test_dict[test]['args']: if 'BLOG_FLAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG',self.blog_flag) self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG', self.blog_flag)
if 'timeout' in self.test_dict[test]: if 'timeout' in self.test_dict[test]:
self.logger.info("timeout : {}".format(self.test_dict[test]['timeout'])) self.logger.info("timeout : {}".format(self.test_dict[test]['timeout']))
@@ -916,11 +970,13 @@ blog: http://{blog}:2368
if 'load_db' in self.test_dict[test]: if 'load_db' in self.test_dict[test]:
self.logger.info("load_db : {}".format(self.test_dict[test]['load_db'])) self.logger.info("load_db : {}".format(self.test_dict[test]['load_db']))
if str(self.test_dict[test]['load_db']).lower() != "none" and str(self.test_dict[test]['load_db']).lower() != "skip": if str(self.test_dict[test]['load_db']).lower() != "none" and str(
self.test_dict[test]['load_db']).lower() != "skip":
try: try:
self.load_custom_db(self.test_dict[test]['load_db']) self.load_custom_db(self.test_dict[test]['load_db'])
except: except:
self.logger.info("custom database failed to load check existance and location: {}".format(self.test_dict[test]['load_db'])) self.logger.info("custom database failed to load check existance and location: {}".format(
self.test_dict[test]['load_db']))
else: else:
self.logger.info("no load_db present in dictionary, load db normally") self.logger.info("no load_db present in dictionary, load db normally")
if self.use_factory_default_db == "TRUE": if self.use_factory_default_db == "TRUE":
@@ -935,16 +991,18 @@ blog: http://{blog}:2368
try: try:
self.load_custom_db(self.custom_db) self.load_custom_db(self.custom_db)
sleep(1) sleep(1)
self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,self.custom_db)) self.logger.info("{} loaded between tests with scenario.py --load {}".format(self.custom_db,
self.custom_db))
except: except:
self.logger.info("custom database failed to load check existance and location: {}".format(self.custom_db)) self.logger.info("custom database failed to load check existance and location: {}".format(
self.custom_db))
else: else:
self.logger.info("no db loaded between tests: {}".format(self.use_custom_db)) self.logger.info("no db loaded between tests: {}".format(self.use_custom_db))
sleep(1) # DO NOT REMOVE the sleep is to allow for the database to stablize sleep(1) # DO NOT REMOVE the sleep is to allow for the database to stablize
try: try:
os.chdir(self.scripts_wd) os.chdir(self.scripts_wd)
#self.logger.info("Current Working Directory {}".format(os.getcwd())) # self.logger.info("Current Working Directory {}".format(os.getcwd()))
except: except:
self.logger.info("failed to change to {}".format(self.scripts_wd)) self.logger.info("failed to change to {}".format(self.scripts_wd))
cmd_args = "{}".format(self.test_dict[test]['args']) cmd_args = "{}".format(self.test_dict[test]['args'])
@@ -955,11 +1013,11 @@ blog: http://{blog}:2368
if self.outfile is not None: if self.outfile is not None:
stdout_log_txt = self.outfile stdout_log_txt = self.outfile
stdout_log_txt = stdout_log_txt + "-{}-stdout.txt".format(test) stdout_log_txt = stdout_log_txt + "-{}-stdout.txt".format(test)
#self.logger.info("stdout_log_txt: {}".format(stdout_log_txt)) # self.logger.info("stdout_log_txt: {}".format(stdout_log_txt))
stdout_log = open(stdout_log_txt, 'a') stdout_log = open(stdout_log_txt, 'a')
stderr_log_txt = self.outfile stderr_log_txt = self.outfile
stderr_log_txt = stderr_log_txt + "-{}-stderr.txt".format(test) stderr_log_txt = stderr_log_txt + "-{}-stderr.txt".format(test)
#self.logger.info("stderr_log_txt: {}".format(stderr_log_txt)) # self.logger.info("stderr_log_txt: {}".format(stderr_log_txt))
stderr_log = open(stderr_log_txt, 'a') stderr_log = open(stderr_log_txt, 'a')
# need to take into account --raw_line parameters thus need to use shlex.split # need to take into account --raw_line parameters thus need to use shlex.split
@@ -968,7 +1026,8 @@ blog: http://{blog}:2368
command_to_run = shlex.split(command_to_run) command_to_run = shlex.split(command_to_run)
print("running {command_to_run}".format(command_to_run=command_to_run)) print("running {command_to_run}".format(command_to_run=command_to_run))
try: try:
process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log, universal_newlines=True) process = subprocess.Popen(command_to_run, shell=False, stdout=stdout_log, stderr=stderr_log,
universal_newlines=True)
# if there is a better solution please propose, the TIMEOUT Result is different then FAIL # if there is a better solution please propose, the TIMEOUT Result is different then FAIL
try: try:
process.wait(timeout=int(self.test_timeout)) process.wait(timeout=int(self.test_timeout))
@@ -980,15 +1039,15 @@ blog: http://{blog}:2368
print("No such file or directory with command: {}".format(command)) print("No such file or directory with command: {}".format(command))
self.logger.info("No such file or directory with command: {}".format(command)) self.logger.info("No such file or directory with command: {}".format(command))
if(self.test_result != "TIMEOUT"): if self.test_result != "TIMEOUT":
stderr_log_size = os.path.getsize(stderr_log_txt) stderr_log_size = os.path.getsize(stderr_log_txt)
if stderr_log_size > 0 : if stderr_log_size > 0:
self.logger.info("File: {} is not empty: {}".format(stderr_log_txt,str(stderr_log_size))) self.logger.info("File: {} is not empty: {}".format(stderr_log_txt, str(stderr_log_size)))
self.test_result = "Failure" self.test_result = "Failure"
background = self.background_red background = self.background_red
else: else:
self.logger.info("File: {} is empty: {}".format(stderr_log_txt,str(stderr_log_size))) self.logger.info("File: {} is empty: {}".format(stderr_log_txt, str(stderr_log_size)))
self.test_result = "Success" self.test_result = "Success"
background = self.background_green background = self.background_green
else: else:
@@ -997,8 +1056,8 @@ blog: http://{blog}:2368
background = self.background_purple background = self.background_purple
# Ghost will put data in stderr # Ghost will put data in stderr
if('ghost' in command): if 'ghost' in command:
if(self.test_result != "TIMEOUT"): if self.test_result != "TIMEOUT":
text = open(stderr_log_txt).read() text = open(stderr_log_txt).read()
if 'Error' in text: if 'Error' in text:
self.test_result = "Failure" self.test_result = "Failure"
@@ -1007,32 +1066,35 @@ blog: http://{blog}:2368
self.test_result = "Success" self.test_result = "Success"
background = self.background_blue background = self.background_blue
# stdout_log_link is used for the email reporting to have the corrected path # stdout_log_link is used for the email reporting to have the corrected path
stdout_log_link = str(stdout_log_txt).replace('/home/lanforge','') stdout_log_link = str(stdout_log_txt).replace('/home/lanforge', '')
stderr_log_link = str(stderr_log_txt).replace('/home/lanforge','') stderr_log_link = str(stderr_log_txt).replace('/home/lanforge', '')
self.html_results += """ self.html_results += """
<tr><td>""" + str(test) + """</td><td class='scriptdetails'>""" + str(command) + """</td> <tr><td>""" + str(test) + """</td><td class='scriptdetails'>""" + str(command) + """</td>
<td style="""+ str(background) + """>""" + str(self.test_result) + """ <td style=""" + str(background) + """>""" + str(self.test_result) + """
<td><a href=""" + str(stdout_log_link) + """ target=\"_blank\">STDOUT</a></td>""" <td><a href=""" + str(stdout_log_link) + """ target=\"_blank\">STDOUT</a></td>"""
if self.test_result == "Failure": if self.test_result == "Failure":
self.html_results += """<td><a href=""" + str(stderr_log_link) + """ target=\"_blank\">STDERR</a></td>""" self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
elif self.test_result == "Time Out": elif self.test_result == "Time Out":
self.html_results += """<td><a href=""" + str(stderr_log_link) + """ target=\"_blank\">STDERR</a></td>""" self.html_results += """<td><a href=""" + str(
stderr_log_link) + """ target=\"_blank\">STDERR</a></td>"""
else: else:
self.html_results += """<td></td>""" self.html_results += """<td></td>"""
self.html_results += """</tr>""" self.html_results += """</tr>"""
row = [test,command,self.test_result,stdout_log_txt,stderr_log_txt] row = [test, command, self.test_result, stdout_log_txt, stderr_log_txt]
self.csv_results_writer.writerow(row) self.csv_results_writer.writerow(row)
self.csv_results_file.flush() self.csv_results_file.flush()
#self.logger.info("row: {}".format(row)) # self.logger.info("row: {}".format(row))
self.logger.info("test: {} executed".format(test)) self.logger.info("test: {} executed".format(test))
else: else:
self.logger.info("enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'],test)) self.logger.info(
"enable value {} invalid for test: {}, test skipped".format(self.test_dict[test]['enabled'], test))
self.finish_html_results() self.finish_html_results()
def main(): def main():
# arguments # arguments
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@@ -1055,13 +1117,17 @@ Example :
--------- ---------
''') ''')
parser.add_argument('--ini', help="--ini <config.ini file> default lf_check_config.ini", default="lf_check_config.ini") parser.add_argument('--ini', help="--ini <config.ini file> default lf_check_config.ini",
default="lf_check_config.ini")
parser.add_argument('--json', help="--json <lf_ckeck_config.json file> ", default="lf_check_config.json") parser.add_argument('--json', help="--json <lf_ckeck_config.json file> ", default="lf_check_config.json")
parser.add_argument('--use_json', help="--use_json ", action='store_true') parser.add_argument('--use_json', help="--use_json ", action='store_true')
parser.add_argument('--suite', help="--suite <suite name> default TEST_DICTIONARY", default="TEST_DICTIONARY") parser.add_argument('--suite', help="--suite <suite name> default TEST_DICTIONARY", default="TEST_DICTIONARY")
parser.add_argument('--production', help="--production stores true, sends email results to production email list", action='store_true') parser.add_argument('--production', help="--production stores true, sends email results to production email list",
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated", default="") action='store_true')
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_check.py script", default="lf_check.log") parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated",
default="")
parser.add_argument('--logfile', help="--logfile <logfile Name> logging for output of lf_check.py script",
default="lf_check.log")
args = parser.parse_args() args = parser.parse_args()
@@ -1107,32 +1173,32 @@ Example :
_output_pdf="lf-check.pdf") _output_pdf="lf-check.pdf")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
csv_results = "lf_check{}-{}.csv".format(args.outfile,current_time) csv_results = "lf_check{}-{}.csv".format(args.outfile, current_time)
csv_results = report.file_add_path(csv_results) csv_results = report.file_add_path(csv_results)
outfile = "lf_check-{}-{}".format(args.outfile,current_time) outfile = "lf_check-{}-{}".format(args.outfile, current_time)
outfile_path = report.file_add_path(outfile) outfile_path = report.file_add_path(outfile)
report_path = report.get_report_path() report_path = report.get_report_path()
# lf_check() class created # lf_check() class created
check = lf_check(_use_json = use_json, check = lf_check(_use_json=use_json,
_config_ini = config_ini, _config_ini=config_ini,
_json_data = json_data, _json_data=json_data,
_test_suite = test_suite, _test_suite=test_suite,
_production = production, _production=production,
_csv_results = csv_results, _csv_results=csv_results,
_outfile = outfile_path, _outfile=outfile_path,
_report_path = report_path) _report_path=report_path)
# get git sha # get git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE) process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
(commit_hash, err) = process.communicate() (commit_hash, err) = process.communicate()
exit_code = process.wait() exit_code = process.wait()
git_sha = commit_hash.decode('utf-8','ignore') git_sha = commit_hash.decode('utf-8', 'ignore')
# set up logging # set up logging
logfile = args.logfile[:-4] logfile = args.logfile[:-4]
print("logfile: {}".format(logfile)) print("logfile: {}".format(logfile))
logfile = "{}-{}.log".format(logfile,current_time) logfile = "{}-{}.log".format(logfile, current_time)
logfile = report.file_add_path(logfile) logfile = report.file_add_path(logfile)
print("logfile {}".format(logfile)) print("logfile {}".format(logfile))
formatter = logging.Formatter(FORMAT) formatter = logging.Formatter(FORMAT)
@@ -1141,17 +1207,25 @@ Example :
file_handler = logging.FileHandler(logfile, "w") file_handler = logging.FileHandler(logfile, "w")
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
logger.addHandler(file_handler) logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler(sys.stdout)) # allows to logging to file and stdout logger.addHandler(logging.StreamHandler(sys.stdout)) # allows to logging to file and stdout
# logger setup print out sha # logger setup print out sha
logger.info("commit_hash: {}".format(commit_hash)) logger.info("commit_hash: {}".format(commit_hash))
logger.info("commit_hash2: {}".format(commit_hash.decode('utf-8','ignore'))) logger.info("commit_hash2: {}".format(commit_hash.decode('utf-8', 'ignore')))
ping_result = check.ping()
for key, value in ping_result.items():
if 'Destination Host Unreachable' or '100% packet loss' in value:
print(UserWarning('Check your %s IP address, %s is unreachable' % (key, value[1])))
print(UserWarning(value[0]))
else:
print('IP address %s accessible' % value[1])
# read config and run tests # read config and run tests
check.read_config() check.read_config()
check.run_script_test() check.run_script_test()
# get sha and lanforge informaiton for results # get sha and lanforge information for results
# Need to do this after reading the configuration # Need to do this after reading the configuration
try: try:
scripts_git_sha = check.get_scripts_git_sha() scripts_git_sha = check.get_scripts_git_sha()
@@ -1188,7 +1262,7 @@ Example :
report.set_title("LF Check: lf_check.py") report.set_title("LF Check: lf_check.py")
report.build_banner_left() report.build_banner_left()
report.start_content_div2() report.start_content_div2()
report.set_obj_html("Objective","Run QA Tests") report.set_obj_html("Objective", "Run QA Tests")
report.build_objective() report.build_objective()
report.set_text("LANforge") report.set_text("LANforge")
report.build_text() report.build_text()
@@ -1217,7 +1291,7 @@ Example :
# duplicates html_report file up one directory # duplicates html_report file up one directory
lf_check_html_report = parent_report_dir + "/{}.html".format(outfile) lf_check_html_report = parent_report_dir + "/{}.html".format(outfile)
banner_src_png = report_path + "/banner.png" banner_src_png = report_path + "/banner.png"
banner_dest_png = parent_report_dir + "/banner.png" banner_dest_png = parent_report_dir + "/banner.png"
CandelaLogo_src_png = report_path + "/CandelaLogo2-90dpi-200x90-trans.png" CandelaLogo_src_png = report_path + "/CandelaLogo2-90dpi-200x90-trans.png"
CandelaLogo_dest_png = parent_report_dir + "/CandelaLogo2-90dpi-200x90-trans.png" CandelaLogo_dest_png = parent_report_dir + "/CandelaLogo2-90dpi-200x90-trans.png"
@@ -1230,37 +1304,36 @@ Example :
font_src_woff = report_path + "/CenturyGothic.woff" font_src_woff = report_path + "/CenturyGothic.woff"
font_dest_woff = parent_report_dir + "/CenturyGothic.woff" font_dest_woff = parent_report_dir + "/CenturyGothic.woff"
#pprint.pprint([ # pprint.pprint([
# ('banner_src', banner_src_png), # ('banner_src', banner_src_png),
# ('banner_dest', banner_dest_png), # ('banner_dest', banner_dest_png),
# ('CandelaLogo_src_png', CandelaLogo_src_png), # ('CandelaLogo_src_png', CandelaLogo_src_png),
# ('CandelaLogo_dest_png', CandelaLogo_dest_png), # ('CandelaLogo_dest_png', CandelaLogo_dest_png),
# ('report_src_css', report_src_css), # ('report_src_css', report_src_css),
# ('custom_src_css', custom_src_css) # ('custom_src_css', custom_src_css)
#]) # ])
# copy one directory above # copy one directory above
try: try:
shutil.copyfile(html_report, lf_check_latest_html) shutil.copyfile(html_report, lf_check_latest_html)
except: except:
print("check permissions on {lf_check_latest_html}".format(lf_check_latest_html=lf_check_latest_html)) print("check permissions on {lf_check_latest_html}".format(lf_check_latest_html=lf_check_latest_html))
shutil.copyfile(html_report, lf_check_html_report) shutil.copyfile(html_report, lf_check_html_report)
# copy banner and logo up one directory, # copy banner and logo up one directory,
shutil.copyfile(banner_src_png, banner_dest_png) shutil.copyfile(banner_src_png, banner_dest_png)
shutil.copyfile(CandelaLogo_src_png, CandelaLogo_dest_png) shutil.copyfile(CandelaLogo_src_png, CandelaLogo_dest_png)
shutil.copyfile(report_src_css, report_dest_css) shutil.copyfile(report_src_css, report_dest_css)
shutil.copyfile(custom_src_css, custom_dest_css) shutil.copyfile(custom_src_css, custom_dest_css)
shutil.copyfile(font_src_woff, font_dest_woff) shutil.copyfile(font_src_woff, font_dest_woff)
shutil.copyfile(CandelaLogo_small_src_png, CandelaLogo_small_dest_png) shutil.copyfile(CandelaLogo_small_src_png, CandelaLogo_small_dest_png)
# print out locations of results # print out locations of results
print("lf_check_latest.html: "+lf_check_latest_html) print("lf_check_latest.html: " + lf_check_latest_html)
print("lf_check_html_report: "+lf_check_html_report) print("lf_check_html_report: " + lf_check_html_report)
check.send_results_email(report_file=lf_check_html_report) check.send_results_email(report_file=lf_check_html_report)
if __name__ == '__main__': if __name__ == '__main__':
main() main()