lf_check.py : code cleanup for deprecated functionality

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2021-09-21 08:24:06 -06:00
parent 1eabb0f919
commit d509e02e10

View File

@@ -213,9 +213,6 @@ class lf_check():
self.email_title_txt = ""
self.email_txt = ""
# NOTE: My influx token is unlucky and starts with a '-', but using the syntax below # with '=' right after the argument keyword works as hoped.
# --influx_token=
# DUT , Test rig must match testbed
self.test_rig = "CT-US-NA"
self.test_rig_json = ""
@@ -230,24 +227,6 @@ class lf_check():
def get_test_rig(self):
    # Accessor for the test-rig identifier (defaults to "CT-US-NA" in
    # __init__; may be overridden by the rig json config).
    return self.test_rig
def check_if_port_exists(self, json_igg):
    """Probe the configured service endpoints with an HTTP GET.

    The LANforge manager is always checked; when *json_igg* (the
    influx/grafana/ghost json config) is non-empty, the Ghost blog,
    Influx and Grafana hosts are probed as well.

    Returns a dict keyed by service name.  On success the value is
    [response_text, url]; on failure it is [url, None] — callers detect
    an unreachable host via ``value[1] is None``.
    """
    queries = dict()
    queries['LANforge Manager'] = 'http://%s:%s' % (self.lf_mgr_ip, self.lf_mgr_port)
    # Framework is not required to use specific databases or presentation
    # hosts, so those endpoints are only probed when configured.
    if json_igg != "":
        queries['Blog Host'] = 'http://%s:%s' % (self.blog_host_ghost, self.blog_port_ghost)
        queries['Influx Host'] = 'http://%s:%s' % (self.influx_database_host, self.influx_database_port)
        queries['Grafana Host'] = 'http://%s:%s' % (self.dashboard_host_grafana, self.dashboard_port_grafana)
    results = dict()
    for key, value in queries.items():
        try:
            ping = requests.get(value).text
            results[key] = [str(ping), value]
        except requests.exceptions.RequestException:
            # Narrowed from a bare ``except:``: keep the best-effort
            # report-and-continue behavior, but stop swallowing unrelated
            # errors such as KeyboardInterrupt or AttributeError.
            print('%s not found' % value)
            results[key] = [value, None]
    return results
def get_scripts_git_sha(self):
# get git sha
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
@@ -362,12 +341,6 @@ QA Report Dashboard:
http://{ip_qa}/{qa_url}
NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
if(self.json_igg != "" ):
message_txt += """
Ghost Blog:
http://{blog}:2368""".format(blog=self.blog_host_ghost)
if (self.email_title_txt != ""):
mail_subject = "{} [{hostname}] {date}".format(self.email_title_txt, hostname=hostname,
date=datetime.datetime.now())
@@ -397,9 +370,6 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
print("send email timed out")
process.terminate()
def get_csv_results(self):
    # Accessor for the name of the CSV file that accumulates test results.
    # NOTE(review): reads ``self.csv_file`` — presumably opened elsewhere
    # (e.g. start_csv_results); will raise AttributeError if called first.
    return self.csv_file.name
def start_csv_results(self):
print("self.csv_results")
self.csv_results_file = open(self.csv_results, "w")
@@ -693,7 +663,7 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
self.logger.info("test_ip not in test_network json")
exit(1)
def load_factory_default_db(self):
def load_FACTORY_DFLT_database(self):
# self.logger.info("file_wd {}".format(self.scripts_wd))
try:
os.chdir(self.scripts_wd)
@@ -708,10 +678,10 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
print("load_factory_default_db errcode: {errcode}".format(errcode=errcode))
print("load_FACTORY_DFLT_database errcode: {errcode}".format(errcode=errcode))
# not currently used
def load_blank_db(self):
def load_BLANK_database(self):
try:
os.chdir(self.scripts_wd)
except:
@@ -724,10 +694,10 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
print("load_blank_db errcode: {errcode}".format(errcode=errcode))
print("load_BLANK_database errcode: {errcode}".format(errcode=errcode))
def load_custom_db(self, custom_db):
def load_custom_database(self, custom_db):
try:
os.chdir(self.scripts_wd)
except:
@@ -740,7 +710,7 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
# wait for the process to terminate
out, err = process.communicate()
errcode = process.returncode
print("load_custome_db errcode: {errcode}".format(errcode=errcode))
print("load_custome_database errcode: {errcode}".format(errcode=errcode))
def run_script_test(self):
self.start_html_results()
@@ -837,43 +807,15 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
if 'SECURITY_6G_USED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('SECURITY_6G_USED', self.security_6g)
if 'NUM_STA' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('NUM_STA', self.num_sta)
if 'COL_NAMES' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('COL_NAMES', self.col_names)
if 'UPSTREAM_PORT' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('UPSTREAM_PORT',
self.upstream_port)
# lf_dataplane_test.py and lf_wifi_capacity_test.py use a parameter --local_path for the location
# of the reports when the reports are pulled.
if 'REPORT_PATH' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('REPORT_PATH', self.report_path)
# The TEST_BED is the database tag
if 'TEST_BED' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_BED', self.influx_database_tag)
# Influx database configuration
if 'influx_database_host' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('influx_database_host',
self.influx_database_host)
if 'influx_database_port' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('influx_database_port',
self.influx_database_port)
if 'influx_database_token' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('influx_database_token',
self.influx_database_token)
if 'influx_database_org' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('influx_database_org',
self.influx_database_org)
if 'influx_database_bucket' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('influx_database_bucket',
self.influx_database_bucket)
if 'influx_database_tag' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('influx_database_tag',
self.influx_database_tag)
if 'DUT_SET_NAME' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('DUT_SET_NAME',
self.dut_set_name)
@@ -881,36 +823,6 @@ http://{blog}:2368""".format(blog=self.blog_host_ghost)
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('TEST_RIG', self.test_rig)
# end of database configuration
# dashboard configuration
if 'dashboard_host_grafana' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('dashboard_host_grafana',
self.dashboard_host_grafana)
if 'dashboard_token_grafana' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('dashboard_token_grafana',
self.dashboard_token_grafana)
# end of dashboard configuration
# blog configuration
if 'blog_host_ghost' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('blog_host_ghost', self.blog_host_ghost)
if 'blog_token_ghost' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('blog_token_ghost', self.blog_token_ghost)
if 'blog_authors_ghost' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('blog_authors_ghost',
self.blog_authors_ghost)
if 'blog_customer_ghost' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('blog_customer_ghost',
self.blog_customer_ghost)
if 'blog_user_push_ghost' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('blog_user_push_ghost',
self.blog_user_push_ghost)
if 'blog_password_push_ghost' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('blog_password_push_ghost',
self.blog_password_push_ghost)
if 'BLOG_FLAG' in self.test_dict[test]['args']:
self.test_dict[test]['args'] = self.test_dict[test]['args'].replace('BLOG_FLAG', self.blog_flag)
# end of blog configuration
if 'timeout' in self.test_dict[test]:
self.logger.info("timeout : {}".format(self.test_dict[test]['timeout']))
self.test_timeout = int(self.test_dict[test]['timeout'])
@@ -1164,7 +1076,6 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
parser.add_argument('--json_rig', help="--json_rig <rig json config> ", default="", required=True)
parser.add_argument('--json_dut', help="--json_dut <dut json config> ", default="", required=True)
parser.add_argument('--json_test', help="--json_test <test json config> ", default="", required=True)
parser.add_argument('--json_igg', help="--json_igg <influx grafana ghost json config> ", default="")
parser.add_argument('--suite', help="--suite <suite name> default TEST_DICTIONARY", default="TEST_DICTIONARY")
parser.add_argument('--production', help="--production stores true, sends email results to production email list",
action='store_true')
@@ -1274,18 +1185,6 @@ note if all json data (rig,dut,tests) in same json file pass same json in for a
check.read_json_dut()
check.read_json_test()
if args.json_igg != "":
print("Tests need to have influx parameters passed in")
check.read_json_igg()
ping_result = check.check_if_port_exists(json_igg)
for key, value in ping_result.items():
if value[1] is None:
print(UserWarning('Check your %s IP address, %s is unreachable' % (key, value[0])))
else:
print('%s IP address %s accessible' % (key, value[1]))
# get sha and lanforge information for results
# Need to do this after reading the configuration
try: