added methods to load db and read the scenario details from the saved db

Signed-off-by: shivam <shivam.thakur@candelatech.com>
This commit is contained in:
shivam
2022-07-14 11:55:50 +05:30
parent ae2d6e8510
commit a6673ee2de
3 changed files with 84 additions and 25 deletions

View File

@@ -1,12 +1,23 @@
import importlib
import json
import logging
import os
import sys
import time
import click
import requests
import urllib3
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
LFCliBase = lfcli_base.LFCliBase
realm = importlib.import_module("py-json.realm")
cv_test_manager = importlib.import_module("py-json.cv_test_manager")
cv_test = cv_test_manager.cv_test
lf_cv_base = importlib.import_module("py-json.lf_cv_base")
ChamberViewBase = lf_cv_base.ChamberViewBase
create_chamberview_dut = importlib.import_module("py-scripts.create_chamberview_dut")
DUT = create_chamberview_dut.DUT
class lf_libs:
@@ -26,12 +37,18 @@ class lf_libs:
"""
lanforge_data = dict()
manager_ip = None
testbed = None
manager_http_port = None
manager_ssh_port = None
manager_default_db = None
wan_ports = None
lan_ports = None
uplink_nat_ports = None
dut_data = None
dut_objects = []
default_scenario_name = None
default_scenario_test = None
default_scenario_raw_lines = []
"""
Scenario : dhcp-bridge / dhcp-external
dhcp-bridge - wan_ports will act as dhcp server for AP's and it will use uplink_nat_ports for uplink NAT
@@ -44,6 +61,11 @@ class lf_libs:
"""
scenario = None
"""
Scenario in chamberview which will be read by read_cv_scenario() and stored here
This will be used to add additional stuff on scenario along with this
"""
cv_scenario = None
"""
Number of Resources available
"""
resources = None
@@ -96,11 +118,13 @@ class lf_libs:
"""
local_realm = None
def __init__(self, lf_data, log_level=logging.DEBUG):
def __init__(self, lf_data={}, dut_data=[], log_level=logging.DEBUG):
logging.basicConfig(format='%(asctime)s - %(message)s', level=log_level)
lf_data = dict(lf_data)
self.dut_data = dut_data
try:
self.lanforge_data = lf_data.get("details")
self.testbed = lf_data.get("testbed")
self.setup_lf_data()
except Exception as e:
logging.error("lf_data has bad values: " + str(lf_data))
@@ -119,6 +143,20 @@ class lf_libs:
logging.error("lf_data has bad values: " + str(self.lanforge_data))
logging.error(e)
def setup_dut(self):
for index in range(0, len(self.dut_data)):
dut_obj = DUT(lfmgr=self.manager_ip,
port=self.manager_http_port,
dut_name=self.testbed + "-" + str(index),
sw_version=self.dut_data[index]["version"],
hw_version=self.dut_data[index]["mode"],
model_num=self.dut_data[index]["model"],
serial_num=self.dut_data[index]["serial"])
dut_obj.setup()
dut_obj.add_ssids()
time.sleep(5)
self.dut_objects.append(dut_obj)
def setup_metadata(self):
data = self.json_get("/port/all")
all_eth_ports = []
@@ -208,11 +246,34 @@ class lf_libs:
json_response = cli_base.json_get(_req_url=_req_url)
return json_response
def json_post(self, _req_url="/"):
cli_base = LFCliBase(_lfjson_host=self.manager_ip, _lfjson_port=self.manager_http_port)
json_response = cli_base.json_post(_req_url=_req_url)
return json_response
def read_cv_scenario(self):
cv_obj = cv_test(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port)
cv_obj.show_text_blob(type="Last-Built-Scenario")
data = self.json_get("/text/Last-Built-Scenario.last_built")
data = data['record']['text'].split("\n")
for d in data:
if "scenario-name" in d:
self.default_scenario_name = d.split(":")[1][1:]
cv_obj.apply_cv_scenario(self.default_scenario_name)
time.sleep(2)
cv_obj.show_text_blob(type="Network-Connectivity")
data = self.json_get("/text/Network-Connectivity." + str(self.default_scenario_name))
data = data["record"]["text"].split("\n")
for d in data:
if "profile_link" in d:
self.default_scenario_raw_lines.append([d])
logging.info("Saved default CV Scenario details: " + str(self.default_scenario_raw_lines))
class lf_tests(lf_libs):
def __init__(self, lf_data, log_level=logging.DEBUG):
super().__init__(lf_data, log_level)
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG):
super().__init__(lf_data, dut_data, log_level)
pass
def client_connectivity_test(self):
@@ -239,8 +300,8 @@ class lf_tests(lf_libs):
class lf_tools(lf_libs):
def __init__(self, lf_data, log_level=logging.DEBUG):
super().__init__(lf_data, log_level)
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG):
super().__init__(lf_data, dut_data, log_level)
pass
def create_stations(self):
@@ -270,12 +331,6 @@ class lf_tools(lf_libs):
def load_scenario_db(self):
pass
def read_cv_scenario(self):
pass
def add_dut(self):
pass
def delete_dut(self):
pass
@@ -321,6 +376,7 @@ if __name__ == '__main__':
],
"traffic_generator": {
"name": "lanforge",
"testbed": "basic",
"scenario": "dhcp-bridge", # dhcp-bridge / dhcp-external
"details": {
"manager_ip": "192.168.52.89",
@@ -334,8 +390,9 @@ if __name__ == '__main__':
}
}
obj = lf_tools(lf_data=basic_02["traffic_generator"])
obj = lf_tools(lf_data=dict(basic_02["traffic_generator"]), dut_data=list(basic_02["access_point"]))
obj.setup_metadata()
# obj.load_scenario()
# obj = lf_tests(lf_data="")
# obj.json_get(_req_url="/port/all")
obj.load_scenario()
obj.read_cv_scenario()
obj.setup_dut()

View File

@@ -42,7 +42,8 @@ LOGGER.addHandler(stdout_handler)
class ChamberView:
def __init__(self, lanforge_data=None, access_point_data=None, run_lf=False, debug=True, testbed=None, cc_1=False, ap_version=None):
def __init__(self, lanforge_data=None, access_point_data=None, run_lf=False, debug=True, testbed=None, cc_1=False,
ap_version=None):
print("lanforge data", lanforge_data)
print("access point data", access_point_data)
self.access_point_data = access_point_data
@@ -337,7 +338,6 @@ class ChamberView:
if band == "5G":
idx = 1
for i in self.dut_idx_mapping:
if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band:
idx = i
@@ -563,7 +563,8 @@ class ChamberView:
result = df[column_name].values.tolist()
return result
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False,
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True,
kpi_csv=False,
file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv",
batch_size="0"):
try:
@@ -578,8 +579,8 @@ class ChamberView:
if kpi_csv:
count = 0
dict_data = {"Down": {}, "Up": {}, "Both": {}}
csv_short_dis = df.loc[:,"short-description"]
csv_num_score = df.loc[:,"numeric-score"]
csv_short_dis = df.loc[:, "short-description"]
csv_num_score = df.loc[:, "numeric-score"]
for i in range(len(batch_size.split(","))):
dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0]
dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1]
@@ -727,7 +728,6 @@ class ChamberView:
# [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']]
self.update_ssid(ssid_data=ssid_data[ssid])
def create_non_meh_dut(self, ssid_data=[]):
# print("hi")
for ap, ssid in zip(self.access_point_data, range(len(ssid_data))):
@@ -750,8 +750,6 @@ class ChamberView:
# [['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58']]
self.update_ssid(ssid_data=ssid_data[ssid])
def set_radio_antenna(self, req_url, shelf, resources, radio, antenna):
data = {
"shelf": shelf,

View File

@@ -214,16 +214,17 @@ class cv_test(Realm):
response = self.json_get(url)
return response
def show_text_blob(self, config_name, blob_test_name, brief):
def show_text_blob(self, config_name=None, blob_test_name=None, brief=False, type="Plugin-Settings"):
req_url = "/cli-json/show_text_blob"
response_json = []
data = {"type": "Plugin-Settings"}
data = {"type": type}
if config_name and blob_test_name:
data["name"] = "%s%s" % (blob_test_name, config_name) # config name
else:
data["name"] = "ALL"
if brief:
data["brief"] = "brief"
print(req_url, data)
self.json_post(req_url, data, response_json_list_=response_json)
return response_json
@@ -366,6 +367,9 @@ class cv_test(Realm):
logger.critical("SCP failed, user %s, password %s, dest %s" % (lf_user, lf_password, lf_host))
raise e # Exception("Could not find Reports")
break
else:
logger.info('Not reporting to kpi file')
# Of if test stopped for some reason and could not generate report.
if not self.get_is_running(instance_name):