lf_check.py : initial checkin to have other LANforge information stored

in meta.txt

Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
Chuck SmileyRekiere
2021-08-22 08:21:19 -06:00
parent a8e855c2cb
commit 75c3a6bb14

View File

@@ -127,9 +127,20 @@ class lf_check():
self.background_blue = "background-color:blue"
# LANforge information
self.lanforge_system_node_version = ""
self.lanforge_kernel_version = ""
self.lanforge_gui_version_full = ""
self.lanforge_gui_version = ""
# meta.txt
self.meta_data_path = ""
# lanforge configuration
self.lf_mgr_ip = "192.168.0.102"
self.lf_mgr_port = "8080"
self.lf_mgr_user = "lanforge"
self.lf_mgr_pass = "lanforge"
# results
self.test_start_time = ""
self.test_end_time = ""
@@ -165,13 +176,6 @@ class lf_check():
self.host_ip_test = None
self.email_title_txt = ""
self.email_txt = ""
self.meta_data_path = ""
# lanforge configuration
self.lf_mgr_ip = "192.168.0.102"
self.lf_mgr_port = "8080"
self.lf_mgr_user = "lanforge"
self.lf_mgr_pass = "lanforge"
# dut selection
self.dut_set_name = 'DUT_NAME ASUSRT-AX88U' # note the name will be set as --set DUT_NAME ASUSRT-AX88U, this is not dut_name (see above)
@@ -258,30 +262,26 @@ class lf_check():
def get_lanforge_system_node_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
# ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600)
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -n')
lanforge_system_node_version = stdout.readlines()
# print('\n'.join(output))
self.lanforge_system_node_version = stdout.readlines()
lanforge_system_node_version = [line.replace('\n', '') for line in lanforge_system_node_version]
ssh.close()
time.sleep(1)
return lanforge_system_node_version
return self.lanforge_system_node_version
def get_lanforge_kernel_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
# ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600)
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('uname -r')
lanforge_kernel_version = stdout.readlines()
# print('\n'.join(output))
self.lanforge_kernel_version = stdout.readlines()
lanforge_kernel_version = [line.replace('\n', '') for line in lanforge_kernel_version]
ssh.close()
time.sleep(1)
return lanforge_kernel_version
return self.lanforge_kernel_version
def get_lanforge_gui_version(self):
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
@@ -290,7 +290,6 @@ class lf_check():
allow_agent=False, look_for_keys=False, banner_timeout=600)
stdin, stdout, stderr = ssh.exec_command('./btserver --version | grep Version')
self.lanforge_gui_version_full = stdout.readlines()
# print('\n'.join(output))
self.lanforge_gui_version_full = [line.replace('\n', '') for line in self.lanforge_gui_version_full]
print("lanforge_gui_version_full: {lanforge_gui_version_full}".format(lanforge_gui_version_full=self.lanforge_gui_version_full))
self.lanforge_gui_version = self.lanforge_gui_version_full[0].split('Version:',maxsplit=1)[-1].split(maxsplit=1)[0]
@@ -616,6 +615,45 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
self.logger.info("test_ip not in test_network json")
exit(1)
def read_test_generic(self):
if "radio_used" in self.json_rig["test_generic"]:
self.radio_lf = self.json_rig["test_generic"]["radio_used"]
else:
self.logger.info("radio_used not in test_generic json")
exit(1)
if "ssid_used" in self.json_rig["test_generic"]:
self.ssid = self.json_rig["test_generic"]["ssid_used"]
else:
self.logger.info("ssid_used not in test_generic json")
exit(1)
if "ssid_pw_used" in self.json_rig["test_generic"]:
self.ssid_pw = self.json_rig["test_generic"]["ssid_pw_used"]
else:
self.logger.info("ssid_pw_used not in test_generic json")
exit(1)
if "security_used" in self.json_rig["test_generic"]:
self.security = self.json_rig["test_generic"]["security_used"]
else:
self.logger.info("security_used not in test_generic json")
exit(1)
if "num_sta" in self.json_rig["test_generic"]:
self.num_sta = self.json_rig["test_generic"]["num_sta"]
else:
self.logger.info("num_sta not in test_generic json")
exit(1)
if "col_names" in self.json_rig["test_generic"]:
self.num_sta = self.json_rig["test_generic"]["col_names"]
else:
self.logger.info("col_names not in test_generic json")
exit(1)
if "upstream_port" in self.json_rig["test_generic"]:
self.upstream_port = self.json_rig["test_generic"]["upstream_port"]
else:
self.logger.info("upstream_port not in test_generic json")
exit(1)
# Optional from --json_igg switch
# kpi.csv and meta.txt can be read after test run performed holds same data
def read_test_database(self):
if "database_config" in self.json_igg["test_database"]:
self.database_config = self.json_igg["test_database"]["database_config"]
@@ -646,6 +684,7 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
else:
self.logger.info("database_tag not in test_database json")
# Optional only if --json_igg switch
def read_test_dashboard(self):
if "dashboard_config" in self.json_igg["test_dashboard"]:
self.dashboard_config = self.json_igg["test_dashboard"]["dashboard_config"]
@@ -662,6 +701,7 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
else:
self.logger.info("dashboard_token not in test_dashboard json")
# Optional on if --json_igg switch
def read_test_blog(self):
if "blog_config" in self.json_igg["test_blog"]:
self.blog_config = self.json_igg["test_blog"]["blog_config"]
@@ -703,42 +743,6 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
else:
self.logger.info("blog_flag not in test_blog json")
def read_test_generic(self):
if "radio_used" in self.json_rig["test_generic"]:
self.radio_lf = self.json_rig["test_generic"]["radio_used"]
else:
self.logger.info("radio_used not in test_generic json")
exit(1)
if "ssid_used" in self.json_rig["test_generic"]:
self.ssid = self.json_rig["test_generic"]["ssid_used"]
else:
self.logger.info("ssid_used not in test_generic json")
exit(1)
if "ssid_pw_used" in self.json_rig["test_generic"]:
self.ssid_pw = self.json_rig["test_generic"]["ssid_pw_used"]
else:
self.logger.info("ssid_pw_used not in test_generic json")
exit(1)
if "security_used" in self.json_rig["test_generic"]:
self.security = self.json_rig["test_generic"]["security_used"]
else:
self.logger.info("security_used not in test_generic json")
exit(1)
if "num_sta" in self.json_rig["test_generic"]:
self.num_sta = self.json_rig["test_generic"]["num_sta"]
else:
self.logger.info("num_sta not in test_generic json")
exit(1)
if "col_names" in self.json_rig["test_generic"]:
self.num_sta = self.json_rig["test_generic"]["col_names"]
else:
self.logger.info("col_names not in test_generic json")
exit(1)
if "upstream_port" in self.json_rig["test_generic"]:
self.upstream_port = self.json_rig["test_generic"]["upstream_port"]
else:
self.logger.info("upstream_port not in test_generic json")
exit(1)
def load_factory_default_db(self):
# self.logger.info("file_wd {}".format(self.scripts_wd))
@@ -757,7 +761,6 @@ NOTE: Diagrams are links in dashboard""".format(ip_qa=ip,qa_url=qa_url)
errcode = process.returncode
print("load_factory_default_db errcode: {errcode}".format(errcode=errcode))
# not currently used
def load_blank_db(self):
try: