Added l3 and attenuator functions

Signed-off-by: tarun-candela <tarunkumar.madabathula@candelatech.com>
This commit is contained in:
tarun-candela
2022-11-16 18:57:54 +05:30
parent dff436b206
commit c5f8859d01

View File

@@ -11,6 +11,7 @@ from scp import SCPClient
from tabulate import tabulate from tabulate import tabulate
from itertools import islice from itertools import islice
import csv import csv
import pandas as pd
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base") lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
@@ -36,6 +37,11 @@ stascan = importlib.import_module("py-scripts.sta_scan_test")
StaScan = stascan.StaScan StaScan = stascan.StaScan
cv_test_reports = importlib.import_module("py-json.cv_test_reports") cv_test_reports = importlib.import_module("py-json.cv_test_reports")
lf_report = cv_test_reports.lanforge_reports lf_report = cv_test_reports.lanforge_reports
attenuator_serial = importlib.import_module("py-scripts.attenuator_serial")
Attenuator = attenuator_serial.AttenuatorSerial
lf_atten_mod_test = importlib.import_module("py-scripts.lf_atten_mod_test")
Attenuator_modify = lf_atten_mod_test.CreateAttenuator
class lf_libs: class lf_libs:
@@ -1447,6 +1453,108 @@ class lf_libs:
return ap_lanforge_6g_channel_dict[channel] return ap_lanforge_6g_channel_dict[channel]
else: else:
return None return None
def attenuator_serial(self):
self.obj = Attenuator(
lfclient_host= self.manager_ip,
lfclient_port=self.manager_http_port
)
val = self.obj.show()
return val
def attenuator_modify(self, serno, idx, val):
atten_obj = Attenuator_modify(self.manager_ip, self.manager_http_port, serno, idx, val)
atten_obj.build()
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False,
file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv",
batch_size="0"):
try:
df = pd.read_csv("../reports/" + str(dir_name) + file_name,
sep=r'\t', engine='python')
print("csv file opened")
except FileNotFoundError:
print("csv file does not exist")
return False
if kpi_csv:
count = 0
dict_data = {"Down": {}, "Up": {}, "Both": {}}
csv_short_dis = df.loc[:,"short-description"]
csv_num_score = df.loc[:,"numeric-score"]
for i in range(len(batch_size.split(","))):
dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0]
dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1]
dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2]
count += 3
if individual_station_throughput:
dict_data = {}
if option == "download":
csv_sta_names = df.iloc[[0]].values.tolist()
csv_throughput_values = df.iloc[[1]].values.tolist()
elif option == "upload":
csv_sta_names = df.iloc[[0]].values.tolist()
csv_throughput_values = df.iloc[[2]].values.tolist()
else:
print("Provide proper option: download or upload")
return
sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",")
th_list = list(map(float, csv_throughput_values[0][0].split(",")))
for i in range(len(sta_list)):
dict_data[sta_list[i]] = th_list[i]
return dict_data
def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate,
traffic_type, sta_list, side_b=""):
# checked
if side_b == "":
side_b = self.wan_ports
print(sta_list)
print(type(sta_list))
print(side_b)
print(type(side_b))
local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port)
cx_profile = local_realm.new_l3_cx_profile()
cx_profile.host = self.manager_ip
cx_profile.port = self.manager_http_port
layer3_cols = ['name', 'tx bytes', 'rx bytes', 'tx rate', 'rx rate']
cx_profile.side_a_min_bps = side_a_min_rate
cx_profile.side_a_max_bps = side_a_max_rate
cx_profile.side_b_min_bps = side_b_min_rate
cx_profile.side_b_max_bps = side_b_max_rate
# create
cx_profile.create(endp_type=traffic_type, side_a=sta_list,
side_b=side_b,
sleep_time=0)
cx_profile.start_cx()
def l3_cleanup(self):
local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port)
local_realm.remove_all_cxs(remove_all_endpoints=True)
def get_cx_list(self):
local_realm = realm.Realm(lfclient_host=self.manager_ip, lfclient_port=self.manager_http_port)
layer3_result = local_realm.cx_list()
layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item]
print(layer3_names)
return layer3_names
def allure_report_table_format(self, dict_data=None, key=None, value=None, name=None):#, value_on_same_table=True):
report_obj = Report()
data_table, dict_table = "", {}
dict_table[key] = list(dict_data.keys())
dict_table[value] = list(dict_data.values())
try:
data_table = report_obj.table2(table=dict_table, headers='keys')
except Exception as e:
print(e)
if name != None:
allure.attach(name=name, body=str(data_table))