fixed merge issue

This commit is contained in:
shivam
2022-07-20 12:48:22 +05:30
parent 710f78889e
commit c872de4a69

View File

@@ -1,16 +1,12 @@
import importlib
import json
import logging
import os
import sys
import time
import allure
import click
import paramiko
import pytest
import requests
import urllib3
from scp import SCPClient
from tabulate import tabulate
@@ -137,10 +133,19 @@ class lf_libs:
self.lanforge_data = lf_data.get("details")
self.testbed = lf_data.get("testbed")
self.setup_lf_data()
self.load_scenario()
self.setup_metadata()
self.setup_dut()
except Exception as e:
logging.error("lf_data has bad values: " + str(lf_data))
logging.error(e)
"""
setup_lf_data : used to set object variables that are passed from lab_info.json
It also creates object for realm and CreateChamberview class object
which can be used further
"""
def setup_lf_data(self):
try:
self.manager_ip = self.lanforge_data.get("manager_ip")
@@ -155,6 +160,10 @@ class lf_libs:
logging.error("lf_data has bad values: " + str(self.lanforge_data))
logging.error(e)
"""
setup_dut : It read the dut data and creates the dut with relevent data
"""
def setup_dut(self):
for index in range(0, len(self.dut_data)):
dut_obj = DUT(lfmgr=self.manager_ip,
@@ -175,17 +184,17 @@ class lf_libs:
for info in data["interfaces"]:
if (info[list(info.keys())[0]]["port type"]) == "Ethernet":
all_eth_ports.append(list(dict(info).keys())[0])
logging.debug("Available Ports: " + str(all_eth_ports))
logging.info("Available Ports: " + str(all_eth_ports))
for port in self.wan_ports:
if port not in all_eth_ports:
logging.error("LANforge system doesn't contains the expected WAN Port: " + str(port))
continue
logging.debug("WAN Port is Available on LANforge Port Manager: " + str(port))
logging.info("WAN Port is Available on LANforge Port Manager: " + str(port))
for port in self.lan_ports:
if port not in all_eth_ports:
logging.error("LANforge system doesn't contains the expected LAN Port: " + str(port))
continue
logging.debug("LAN Port is Available on LANforge Port Manager: " + str(port))
logging.info("LAN Port is Available on LANforge Port Manager: " + str(port))
data = self.json_get("/radiostatus/all")
all_radios = []
all_radio_eid = []
@@ -241,14 +250,14 @@ class lf_libs:
max_5g_stations += 1 * int(str(data[info]["max_vifs"]))
max_ax_stations += 1 * int(str(data[info]["max_vifs"]))
self.mtk_radios.append(info)
logging.debug("Radio Information is Extracted")
logging.debug("Available Radios: " + str(all_radio_eid) + " - Phantom Radios: " + str(phantom_radios))
logging.debug("max_possible_stations: " + str(max_possible_stations))
logging.debug("max_2g_stations: " + str(max_2g_stations))
logging.debug("max_5g_stations: " + str(max_5g_stations))
logging.debug("max_6g_stations: " + str(max_6g_stations))
logging.debug("max_ax_stations: " + str(max_ax_stations))
logging.debug("max_ac_stations: " + str(max_ac_stations))
logging.info("Radio Information is Extracted")
logging.info("Available Radios: " + str(all_radio_eid) + " - Phantom Radios: " + str(phantom_radios))
logging.info("max_possible_stations: " + str(max_possible_stations))
logging.info("max_2g_stations: " + str(max_2g_stations))
logging.info("max_5g_stations: " + str(max_5g_stations))
logging.info("max_6g_stations: " + str(max_6g_stations))
logging.info("max_ax_stations: " + str(max_ax_stations))
logging.info("max_ac_stations: " + str(max_ac_stations))
def load_scenario(self):
self.local_realm.load(self.manager_default_db)
@@ -292,8 +301,91 @@ class lf_libs:
"""
pass
def create_stations(self):
pass
def delete_stations(self):
pass
def modify_station(self):
pass
def read_stations(self):
pass
def start_sniffer(self):
pass
def pull_reports(self):
pass
def get_wifi_radios(self):
pass
def modify_wifi_radio(self):
pass
def load_scenario_db(self):
pass
def delete_dut(self):
pass
def read_dut(self):
pass
def update_dut(self):
pass
def get_ethernet_ports(self):
pass
def set_ethernet_port(self):
pass
def clean_port_manager(self):
pass
def clean_layer3cx(self):
pass
def add_vlan(self, vlan_ids=[]):
data = self.json_get("/port/all")
flag = 0
temp_raw_lines = self.default_scenario_raw_lines
for port in self.wan_ports:
for vlans in vlan_ids:
for i in data["interfaces"]:
if list(i.keys())[0] != port + "." + str(vlans):
flag = 1
if flag == 1:
for vlans in vlan_ids:
temp_raw_lines.append(["profile_link " + port + " vlan-100 1 " + port
+ " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)])
print(temp_raw_lines)
exit()
self.chamber_view(raw_lines=temp_raw_lines)
def chamber_view(self, delete_old_scenario=True, raw_lines=[]):
print(self.chamberview_object)
if delete_old_scenario:
self.chamberview_object.clean_cv_scenario(scenario_name=self.default_scenario_name)
self.chamberview_object.setup(create_scenario=self.default_scenario_name,
raw_line=self.default_scenario_raw_lines
)
self.chamberview_object.build(self.default_scenario_name)
self.chamberview_object.sync_cv()
time.sleep(2)
self.chamberview_object.show_text_blob(None, None, True) # Show changes on GUI
self.chamberview_object.sync_cv()
return self.chamberview_object, self.default_scenario_name
class lf_tests(lf_libs):
"""
lf_tools is needed in lf_tests to do various operations needed by various tests
"""
lf_tools_obj = None
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG):
super().__init__(lf_data, dut_data, log_level)
@@ -504,263 +596,6 @@ class SCP_File:
scp.close()
class lf_tools(lf_libs):
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG):
super().__init__(lf_data, dut_data, log_level)
pass
def create_stations(self, band="2G", num_stations="max", dut="NA", ssid_name=[], idx=0):
LOGGER.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) +
" DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx))
if num_stations == 0:
LOGGER.warning("0 Stations")
return
idx = idx
if self.run_lf or self.cc_1:
if band == "2G":
idx = 0
if band == "5G":
idx = 1
for i in self.dut_idx_mapping:
if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band:
idx = i
if band == "2G":
if num_stations != "max":
LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.twog_radios)))
total_sta = num_stations
max_possible = 0
for radio in self.twog_radios:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(self.twog_radios))
rem = total_sta % len(self.twog_radios)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(self.twog_radios))
rem = total_sta % len(self.twog_radios)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(self.twog_radios)
LOGGER.info("Total stations per radio: " + str(per_radio_sta))
for radio in self.twog_radios:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
LOGGER.debug("Raw Line : " + str(station_data))
if num_stations == "max":
LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.twog_radios)))
for radio in self.twog_radios:
num_stations = self.get_max_sta(radio)
LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
LOGGER.debug("Raw Line : " + str(station_data))
if band == "5G":
if num_stations != "max":
LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.fiveg_radios)))
total_sta = num_stations
max_possible = 0
for radio in self.fiveg_radios:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(self.fiveg_radios))
rem = total_sta % len(self.fiveg_radios)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(self.fiveg_radios))
rem = total_sta % len(self.fiveg_radios)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(self.fiveg_radios)
LOGGER.info("Total stations per radio: " + str(per_radio_sta))
for radio in self.fiveg_radios:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
LOGGER.debug("Raw Line : " + str(station_data))
if num_stations == "max":
LOGGER.info("Total 5G Radios Available in Testbed: " + str(len(self.fiveg_radios)))
for radio in self.fiveg_radios:
num_stations = self.get_max_sta(radio)
LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
LOGGER.debug("Raw Line : " + str(station_data))
if band == "ax":
if num_stations != "max":
LOGGER.info("Total 2G Radios Available in Testbed: " + str(len(self.ax_radios)))
total_sta = num_stations
max_possible = 0
for radio in self.ax_radios:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(self.ax_radios))
rem = total_sta % len(self.ax_radios)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(self.ax_radios))
rem = total_sta % len(self.ax_radios)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(self.ax_radios)
LOGGER.info("Total stations per radio: " + str(per_radio_sta))
for radio in self.ax_radios:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
LOGGER.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
LOGGER.debug("Raw Line : " + str(station_data))
if num_stations == "max":
LOGGER.info("Total AX Radios Available in Testbed: " + str(len(self.ax_radios)))
for radio in self.ax_radios:
num_stations = self.get_max_sta(radio)
LOGGER.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
LOGGER.debug("Raw Line : " + str(station_data))
def delete_stations(self):
pass
def modify_station(self):
pass
def read_stations(self):
pass
def start_sniffer(self):
pass
def pull_reports(self):
pass
def get_wifi_radios(self):
pass
def modify_wifi_radio(self):
pass
def load_scenario_db(self):
pass
def delete_dut(self):
pass
def read_dut(self):
pass
def update_dut(self):
pass
def get_ethernet_ports(self):
pass
def set_ethernet_port(self):
pass
def clean_port_manager(self):
pass
def clean_layer3cx(self):
pass
def add_vlan(self, vlan_ids=[]):
data = self.json_get("/port/all")
flag = 0
temp_raw_lines = self.default_scenario_raw_lines
for port in self.wan_ports:
for vlans in vlan_ids:
for i in data["interfaces"]:
if list(i.keys())[0] != port + "." + str(vlans):
flag = 1
if flag == 1:
for vlans in vlan_ids:
temp_raw_lines.append(["profile_link " + port + " vlan-100 1 " + port
+ " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)])
print(temp_raw_lines)
exit()
self.chamber_view(raw_lines=temp_raw_lines)
def chamber_view(self, delete_old_scenario=True, raw_lines=[]):
print(self.chamberview_object)
if delete_old_scenario:
self.chamberview_object.clean_cv_scenario(scenario_name=self.default_scenario_name)
self.chamberview_object.setup(create_scenario=self.default_scenario_name,
raw_line=self.default_scenario_raw_lines
)
self.chamberview_object.build(self.default_scenario_name)
self.chamberview_object.sync_cv()
time.sleep(2)
self.chamberview_object.show_text_blob(None, None, True) # Show changes on GUI
self.chamberview_object.sync_cv()
return self.chamberview_object, self.default_scenario_name
if __name__ == '__main__':
basic_02 = {
"controller": {
@@ -798,8 +633,7 @@ if __name__ == '__main__':
}
}
obj = lf_tools(lf_data=dict(basic_02["traffic_generator"]), dut_data=list(basic_02["access_point"]))
obj.setup_metadata()
obj.load_scenario()
obj.read_cv_scenario()
obj.setup_dut()
obj = lf_tests(lf_data=dict(basic_02["traffic_generator"]), dut_data=list(basic_02["access_point"]),
log_level=logging.DEBUG)
# obj.read_cv_scenario()
# obj.setup_dut()