diff --git a/lf_libs/__init__.py b/lf_libs/__init__.py index 4435324c..dffc2f72 100644 --- a/lf_libs/__init__.py +++ b/lf_libs/__init__.py @@ -1,16 +1,12 @@ import importlib -import json import logging import os import sys import time import allure -import click import paramiko import pytest -import requests -import urllib3 from scp import SCPClient from tabulate import tabulate @@ -137,10 +133,19 @@ class lf_libs: self.lanforge_data = lf_data.get("details") self.testbed = lf_data.get("testbed") self.setup_lf_data() + self.load_scenario() + self.setup_metadata() + self.setup_dut() except Exception as e: logging.error("lf_data has bad values: " + str(lf_data)) logging.error(e) + """ + setup_lf_data : used to set object variables that are passed from lab_info.json + It also creates object for realm and CreateChamberview class object + which can be used further + """ + def setup_lf_data(self): try: self.manager_ip = self.lanforge_data.get("manager_ip") @@ -155,6 +160,10 @@ class lf_libs: logging.error("lf_data has bad values: " + str(self.lanforge_data)) logging.error(e) + """ + setup_dut : It read the dut data and creates the dut with relevent data + """ + def setup_dut(self): for index in range(0, len(self.dut_data)): dut_obj = DUT(lfmgr=self.manager_ip, @@ -175,17 +184,17 @@ class lf_libs: for info in data["interfaces"]: if (info[list(info.keys())[0]]["port type"]) == "Ethernet": all_eth_ports.append(list(dict(info).keys())[0]) - logging.debug("Available Ports: " + str(all_eth_ports)) + logging.info("Available Ports: " + str(all_eth_ports)) for port in self.wan_ports: if port not in all_eth_ports: logging.error("LANforge system doesn't contains the expected WAN Port: " + str(port)) continue - logging.debug("WAN Port is Available on LANforge Port Manager: " + str(port)) + logging.info("WAN Port is Available on LANforge Port Manager: " + str(port)) for port in self.lan_ports: if port not in all_eth_ports: logging.error("LANforge system doesn't contains the expected LAN Port: " + str(port)) continue - logging.debug("LAN Port is Available on LANforge Port Manager: " + str(port)) + logging.info("LAN Port is Available on LANforge Port Manager: " + str(port)) data = self.json_get("/radiostatus/all") all_radios = [] all_radio_eid = [] @@ -241,14 +250,14 @@ class lf_libs: max_5g_stations += 1 * int(str(data[info]["max_vifs"])) max_ax_stations += 1 * int(str(data[info]["max_vifs"])) self.mtk_radios.append(info) - logging.debug("Radio Information is Extracted") - logging.debug("Available Radios: " + str(all_radio_eid) + " - Phantom Radios: " + str(phantom_radios)) - logging.debug("max_possible_stations: " + str(max_possible_stations)) - logging.debug("max_2g_stations: " + str(max_2g_stations)) - logging.debug("max_5g_stations: " + str(max_5g_stations)) - logging.debug("max_6g_stations: " + str(max_6g_stations)) - logging.debug("max_ax_stations: " + str(max_ax_stations)) - logging.debug("max_ac_stations: " + str(max_ac_stations)) + logging.info("Radio Information is Extracted") + logging.info("Available Radios: " + str(all_radio_eid) + " - Phantom Radios: " + str(phantom_radios)) + logging.info("max_possible_stations: " + str(max_possible_stations)) + logging.info("max_2g_stations: " + str(max_2g_stations)) + logging.info("max_5g_stations: " + str(max_5g_stations)) + logging.info("max_6g_stations: " + str(max_6g_stations)) + logging.info("max_ax_stations: " + str(max_ax_stations)) + logging.info("max_ac_stations: " + str(max_ac_stations)) def load_scenario(self): self.local_realm.load(self.manager_default_db) @@ -292,8 +301,91 @@ class lf_libs: """ pass + def create_stations(self): + pass + + def delete_stations(self): + pass + + def modify_station(self): + pass + + def read_stations(self): + pass + + def start_sniffer(self): + pass + + def pull_reports(self): + pass + + def get_wifi_radios(self): + pass + + def modify_wifi_radio(self): + pass + + def load_scenario_db(self): + pass + + def delete_dut(self): + pass + + def read_dut(self): + pass + + def update_dut(self): + pass + + def get_ethernet_ports(self): + pass + + def set_ethernet_port(self): + pass + + def clean_port_manager(self): + pass + + def clean_layer3cx(self): + pass + + def add_vlan(self, vlan_ids=[]): + data = self.json_get("/port/all") + flag = 0 + temp_raw_lines = self.default_scenario_raw_lines + for port in self.wan_ports: + for vlans in vlan_ids: + for i in data["interfaces"]: + if list(i.keys())[0] != port + "." + str(vlans): + flag = 1 + if flag == 1: + for vlans in vlan_ids: + temp_raw_lines.append(["profile_link " + port + " vlan-100 1 " + port + + " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)]) + print(temp_raw_lines) + exit() + self.chamber_view(raw_lines=temp_raw_lines) + + def chamber_view(self, delete_old_scenario=True, raw_lines=[]): + print(self.chamberview_object) + if delete_old_scenario: + self.chamberview_object.clean_cv_scenario(scenario_name=self.default_scenario_name) + self.chamberview_object.setup(create_scenario=self.default_scenario_name, + raw_line=self.default_scenario_raw_lines + ) + self.chamberview_object.build(self.default_scenario_name) + self.chamberview_object.sync_cv() + time.sleep(2) + self.chamberview_object.show_text_blob(None, None, True) # Show changes on GUI + self.chamberview_object.sync_cv() + return self.chamberview_object, self.default_scenario_name + class lf_tests(lf_libs): + """ + lf_tools is needed in lf_tests to do various operations needed by various tests + """ + lf_tools_obj = None def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG): super().__init__(lf_data, dut_data, log_level) @@ -504,263 +596,6 @@ class SCP_File: scp.close() -class lf_tools(lf_libs): - - def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG): - super().__init__(lf_data, dut_data, log_level) - pass - - def create_stations(self, band="2G", num_stations="max", dut="NA", ssid_name=[], idx=0): - LOGGER.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) + - " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) - if num_stations == 0: - LOGGER.warning("0 Stations") - return - idx = idx - if self.run_lf or self.cc_1: - if band == "2G": - idx = 0 - if band == "5G": - idx = 1 - - for i in self.dut_idx_mapping: - if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: - idx = i - if band == "2G": - if num_stations != "max": - LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.twog_radios))) - total_sta = num_stations - max_possible = 0 - for radio in self.twog_radios: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(self.twog_radios)) - rem = total_sta % len(self.twog_radios) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(self.twog_radios)) - rem = total_sta % len(self.twog_radios) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(self.twog_radios) - LOGGER.info("Total stations per radio: " + str(per_radio_sta)) - for radio in self.twog_radios: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - LOGGER.debug("Raw Line : " + str(station_data)) - - if num_stations == "max": - LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.twog_radios))) - for radio in self.twog_radios: - num_stations = self.get_max_sta(radio) - LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - LOGGER.debug("Raw Line : " + str(station_data)) - - if band == "5G": - if num_stations != "max": - LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.fiveg_radios))) - total_sta = num_stations - max_possible = 0 - for radio in self.fiveg_radios: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(self.fiveg_radios)) - rem = total_sta % len(self.fiveg_radios) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(self.fiveg_radios)) - rem = total_sta % len(self.fiveg_radios) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(self.fiveg_radios) - LOGGER.info("Total stations per radio: " + str(per_radio_sta)) - for radio in self.fiveg_radios: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - LOGGER.debug("Raw Line : " + str(station_data)) - - if num_stations == "max": - LOGGER.info("Total 5G Radios Available in Testbed: " + str(len(self.fiveg_radios))) - for radio in self.fiveg_radios: - num_stations = self.get_max_sta(radio) - LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - LOGGER.debug("Raw Line : " + str(station_data)) - if band == "ax": - if num_stations != "max": - LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.ax_radios))) - total_sta = num_stations - max_possible = 0 - for radio in self.ax_radios: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(self.ax_radios)) - rem = total_sta % len(self.ax_radios) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(self.ax_radios)) - rem = total_sta % len(self.ax_radios) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(self.ax_radios) - LOGGER.info("Total stations per radio: " + str(per_radio_sta)) - for radio in self.ax_radios: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - LOGGER.debug("Raw Line : " + str(station_data)) - if num_stations == "max": - LOGGER.info("Total AX Radios Available in Testbed: " + str(len(self.ax_radios))) - for radio in self.ax_radios: - num_stations = self.get_max_sta(radio) - LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - LOGGER.debug("Raw Line : " + str(station_data)) - - - def delete_stations(self): - pass - - def modify_station(self): - pass - - def read_stations(self): - pass - - def start_sniffer(self): - pass - - def pull_reports(self): - pass - - def get_wifi_radios(self): - pass - - def modify_wifi_radio(self): - pass - - def load_scenario_db(self): - pass - - def delete_dut(self): - pass - - def read_dut(self): - pass - - def update_dut(self): - pass - - def get_ethernet_ports(self): - pass - - def set_ethernet_port(self): - pass - - def clean_port_manager(self): - pass - - def clean_layer3cx(self): - pass - - def add_vlan(self, vlan_ids=[]): - data = self.json_get("/port/all") - flag = 0 - temp_raw_lines = self.default_scenario_raw_lines - for port in self.wan_ports: - for vlans in vlan_ids: - for i in data["interfaces"]: - if list(i.keys())[0] != port + "." + str(vlans): - flag = 1 - if flag == 1: - for vlans in vlan_ids: - temp_raw_lines.append(["profile_link " + port + " vlan-100 1 " + port - + " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)]) - print(temp_raw_lines) - exit() - self.chamber_view(raw_lines=temp_raw_lines) - - def chamber_view(self, delete_old_scenario=True, raw_lines=[]): - print(self.chamberview_object) - if delete_old_scenario: - self.chamberview_object.clean_cv_scenario(scenario_name=self.default_scenario_name) - self.chamberview_object.setup(create_scenario=self.default_scenario_name, - raw_line=self.default_scenario_raw_lines - ) - self.chamberview_object.build(self.default_scenario_name) - self.chamberview_object.sync_cv() - time.sleep(2) - self.chamberview_object.show_text_blob(None, None, True) # Show changes on GUI - self.chamberview_object.sync_cv() - return self.chamberview_object, self.default_scenario_name - - if __name__ == '__main__': basic_02 = { "controller": { @@ -798,8 +633,7 @@ if __name__ == '__main__': } } - obj = lf_tools(lf_data=dict(basic_02["traffic_generator"]), dut_data=list(basic_02["access_point"])) - obj.setup_metadata() - obj.load_scenario() - obj.read_cv_scenario() - obj.setup_dut() + obj = lf_tests(lf_data=dict(basic_02["traffic_generator"]), dut_data=list(basic_02["access_point"]), + log_level=logging.DEBUG) + # obj.read_cv_scenario() + # obj.setup_dut()