mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-01 11:28:09 +00:00
206 lines
8.7 KiB
Python
206 lines
8.7 KiB
Python
import re
|
|
|
|
import allure
|
|
from create_chamberview import CreateChamberview
|
|
from create_chamberview_dut import DUT
|
|
import time
|
|
from LANforge.lfcli_base import LFCliBase
|
|
import json
|
|
import os
|
|
import pandas as pd
|
|
|
|
|
|
class ChamberView:
|
|
|
|
def __init__(self, lanforge_data=None, access_point_data=None, debug=True, testbed=None):
|
|
self.lanforge_ip = lanforge_data["ip"]
|
|
self.lanforge_port = lanforge_data["port"]
|
|
self.twog_radios = lanforge_data["2.4G-Radio"]
|
|
self.fiveg_radios = lanforge_data["5G-Radio"]
|
|
self.ax_radios = lanforge_data["AX-Radio"]
|
|
self.upstream_port = lanforge_data["upstream"]
|
|
self.twog_prefix = lanforge_data["2.4G-Station-Name"]
|
|
self.fiveg_prefix = lanforge_data["5G-Station-Name"]
|
|
self.ax_prefix = lanforge_data["AX-Station-Name"]
|
|
self.uplink_port = lanforge_data["uplink"] # eth2
|
|
self.upstream_subnet = lanforge_data["upstream_subnet"]
|
|
self.testbed = testbed
|
|
self.upstream_resources = self.upstream_port.split(".")[0] + "." + self.upstream_port.split(".")[1]
|
|
self.uplink_resources = self.uplink_port.split(".")[0] + "." + self.uplink_port.split(".")[1]
|
|
self.delete_old_scenario = True
|
|
# For chamber view
|
|
self.scenario_name = "TIP-" + self.testbed
|
|
self.debug = debug
|
|
self.exit_on_error = False
|
|
self.dut_idx_mapping = {}
|
|
self.ssid_list = []
|
|
self.raw_line = [
|
|
["profile_link " + self.upstream_resources + " upstream-dhcp 1 NA NA " + self.upstream_port.split(".")
|
|
[2] + ",AUTO -1 NA"],
|
|
["profile_link " + self.uplink_resources + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet
|
|
+ "' NA " + self.uplink_port.split(".")[2] + "," + self.upstream_port.split(".")[2] + " -1 NA"]
|
|
]
|
|
|
|
# This is for rawline input | see create_chamberview_dut.py for more details
|
|
|
|
self.CreateChamberview = CreateChamberview(self.lanforge_ip, self.lanforge_port)
|
|
|
|
if access_point_data:
|
|
# for DUT
|
|
self.dut_name = testbed
|
|
self.ap_model = access_point_data[0]["model"]
|
|
self.version = access_point_data[0]["version"].split("/")[-1]
|
|
self.serial = access_point_data[0]["serial"]
|
|
|
|
self.CreateDut = DUT(lfmgr=self.lanforge_ip,
|
|
port=self.lanforge_port,
|
|
dut_name=self.testbed,
|
|
sw_version=self.version,
|
|
hw_version=self.ap_model,
|
|
model_num=self.ap_model,
|
|
serial_num=self.serial
|
|
)
|
|
self.CreateDut.ssid = []
|
|
|
|
def reset_scenario(self):
|
|
self.raw_line = [
|
|
["profile_link " + self.upstream_resources + " upstream-dhcp 1 NA NA " + self.upstream_port.split(".")
|
|
[2] + ",AUTO -1 NA"],
|
|
["profile_link " + self.uplink_resources + " uplink-nat 1 'DUT: upstream LAN " + self.upstream_subnet
|
|
+ "' NA " + self.uplink_port.split(".")[2] + "," + self.upstream_port.split(".")[2] + " -1 NA"]
|
|
]
|
|
self.Chamber_View()
|
|
|
|
def Chamber_View(self):
|
|
if self.delete_old_scenario:
|
|
self.CreateChamberview.clean_cv_scenario(type="Network-Connectivity", scenario_name=self.scenario_name)
|
|
self.CreateChamberview.setup(create_scenario=self.scenario_name,
|
|
raw_line=self.raw_line
|
|
)
|
|
self.CreateChamberview.build(self.scenario_name)
|
|
self.CreateChamberview.sync_cv()
|
|
time.sleep(2)
|
|
self.CreateChamberview.show_text_blob(None, None, True) # Show changes on GUI
|
|
self.CreateChamberview.sync_cv()
|
|
return self.CreateChamberview, self.scenario_name
|
|
|
|
def add_stations(self, band="2G", num_stations="max", dut="NA", ssid_name=[]):
|
|
idx = 0
|
|
print(self.dut_idx_mapping)
|
|
for i in self.dut_idx_mapping:
|
|
if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band:
|
|
idx = i
|
|
max_stations = 0
|
|
print(idx)
|
|
if band == "2G":
|
|
if num_stations != "max":
|
|
num_stations = int(num_stations / len(self.twog_radios))
|
|
for radio in self.twog_radios:
|
|
max_stations = 64
|
|
if num_stations == "max":
|
|
num_stations = max_stations
|
|
station_data = ["profile_link 1.1 STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
|
|
str(int(idx) + 1) + "'" + " NA " + radio]
|
|
self.raw_line.append(station_data)
|
|
|
|
if band == "5G":
|
|
if num_stations != "max":
|
|
num_stations = int(num_stations / len(self.fiveg_radios))
|
|
for radio in self.fiveg_radios:
|
|
max_stations = 64
|
|
if num_stations == "max":
|
|
num_stations = max_stations
|
|
station_data = ["profile_link 1.1 STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
|
|
str(int(idx) + 1) + "'" + " NA " + radio]
|
|
self.raw_line.append(station_data)
|
|
if band == "ax":
|
|
if num_stations != "max":
|
|
num_stations = int(num_stations / len(self.fiveg_radios))
|
|
for radio in self.ax_radios:
|
|
max_stations = 1
|
|
if num_stations == "max":
|
|
num_stations = max_stations
|
|
station_data = ["profile_link 1.1 STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
|
|
str(int(idx) + 1) + "'" + " NA " + radio]
|
|
self.raw_line.append(station_data)
|
|
|
|
|
|
|
|
def Create_Dut(self):
|
|
self.CreateDut.setup()
|
|
self.CreateDut.add_ssids()
|
|
self.CreateDut.cv_test.show_text_blob(None, None, True) # Show changes on GUI
|
|
self.CreateDut.cv_test.sync_cv()
|
|
time.sleep(2)
|
|
self.CreateDut.cv_test.show_text_blob(None, None, True) # Show changes on GUI
|
|
self.CreateDut.cv_test.sync_cv()
|
|
return self.CreateDut, self.dut_name
|
|
|
|
def update_ssid(self, ssid_data=[]):
|
|
self.CreateDut.ssid = ssid_data
|
|
self.CreateDut.add_ssids()
|
|
# SSID data should be in this format
|
|
# [
|
|
# ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'],
|
|
# ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59']
|
|
# ]
|
|
pass
|
|
|
|
def json_get(self, _req_url="/"):
|
|
cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, )
|
|
json_response = cli_base.json_get(_req_url=_req_url)
|
|
return json_response
|
|
|
|
def json_post(self, req_url, shelf, resources, port, current, intrest):
|
|
data = {
|
|
"shelf": shelf,
|
|
"resource": resources,
|
|
"port": port,
|
|
"current_flags": current,
|
|
"interest": intrest
|
|
}
|
|
cli_base = LFCliBase(_lfjson_host=self.lanforge_ip, _lfjson_port=self.lanforge_port, )
|
|
return cli_base.json_post(req_url, data)
|
|
|
|
def read_kpi_file(self, column_name, dir_name):
|
|
if column_name == None:
|
|
df = pd.read_csv("../reports/" + str(dir_name) + "/kpi.csv", sep=r'\t', engine='python')
|
|
if df.empty == True:
|
|
return "empty"
|
|
else:
|
|
return df
|
|
else:
|
|
df = pd.read_csv("../reports/" + str(dir_name) + "/kpi.csv", sep=r'\t', usecols=column_name,
|
|
engine='python')
|
|
if df.empty == True:
|
|
return "empty"
|
|
else:
|
|
return df
|
|
|
|
def attach_report_graphs(self, report_name=None, pdf_name="WIFI Capacity Test PDF Report"):
|
|
relevant_path = "../reports/" + report_name + "/"
|
|
entries = os.listdir("../reports/" + report_name + '/')
|
|
pdf = False
|
|
for i in entries:
|
|
if ".pdf" in i:
|
|
pdf = i
|
|
if pdf:
|
|
allure.attach.file(source=relevant_path + pdf,
|
|
name=pdf_name)
|
|
|
|
included_extensions = ['png']
|
|
file_names = [fn for fn in os.listdir(relevant_path)
|
|
if any(fn.endswith(ext) for ext in included_extensions)]
|
|
|
|
a = [item for item in file_names if 'print' not in item]
|
|
a = [item for item in a if 'logo' not in item]
|
|
a = [item for item in a if 'Logo' not in item]
|
|
a = [item for item in a if 'candela' not in item]
|
|
|
|
a.sort()
|
|
for i in a:
|
|
allure.attach.file(source=relevant_path + i,
|
|
name=i,
|
|
attachment_type="image/png", extension=None)
|
|
|