Resolved merge conflicts

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2022-08-19 17:41:22 +05:30
5 changed files with 543 additions and 415 deletions

View File

@@ -37,6 +37,7 @@ StaScan = stascan.StaScan
cv_test_reports = importlib.import_module("py-json.cv_test_reports") cv_test_reports = importlib.import_module("py-json.cv_test_reports")
lf_report = cv_test_reports.lanforge_reports lf_report = cv_test_reports.lanforge_reports
class lf_libs: class lf_libs:
""" """
"traffic_generator": { "traffic_generator": {
@@ -55,6 +56,7 @@ class lf_libs:
lanforge_data = dict() lanforge_data = dict()
manager_ip = None manager_ip = None
testbed = None testbed = None
run_lf = False
manager_http_port = None manager_http_port = None
manager_ssh_port = None manager_ssh_port = None
manager_default_db = None manager_default_db = None
@@ -147,10 +149,11 @@ class lf_libs:
""" """
local_realm = None local_realm = None
def __init__(self, lf_data={}, dut_data=[], log_level=logging.DEBUG): def __init__(self, lf_data={}, dut_data=[], run_lf=False, log_level=logging.DEBUG):
logging.basicConfig(format='%(asctime)s - %(message)s', level=log_level) logging.basicConfig(format='%(asctime)s - %(message)s', level=log_level)
lf_data = dict(lf_data) lf_data = dict(lf_data)
self.dut_data = dut_data self.dut_data = dut_data
self.run_lf = run_lf
# try: # try:
self.lanforge_data = lf_data.get("details") self.lanforge_data = lf_data.get("details")
self.testbed = lf_data.get("testbed") self.testbed = lf_data.get("testbed")
@@ -200,16 +203,18 @@ class lf_libs:
""" """
def setup_dut(self): def setup_dut(self):
self.dut_objects = []
for index in range(0, len(self.dut_data)): for index in range(0, len(self.dut_data)):
dut_obj = DUT(lfmgr=self.manager_ip, dut_obj = DUT(lfmgr=self.manager_ip,
port=self.manager_http_port, port=self.manager_http_port,
dut_name=self.testbed + "-" + str(index), dut_name=self.dut_data[index]["identifier"],
sw_version=self.dut_data[index]["firmware_version"], sw_version=self.dut_data[index]["firmware_version"],
hw_version=self.dut_data[index]["mode"], hw_version=self.dut_data[index]["mode"],
model_num=self.dut_data[index]["model"], model_num=self.dut_data[index]["model"],
serial_num=self.dut_data[index]["identifier"]) serial_num=self.dut_data[index]["identifier"])
dut_obj.setup() dut_obj.setup()
dut_obj.add_ssids() dut_obj.add_ssids()
self.dut_objects.append(dut_obj)
logging.info("Creating DUT") logging.info("Creating DUT")
def setup_metadata(self): def setup_metadata(self):
@@ -404,40 +409,103 @@ class lf_libs:
temp_raw_lines.append([d]) temp_raw_lines.append([d])
logging.info("Saved default CV Scenario details: " + str(temp_raw_lines)) logging.info("Saved default CV Scenario details: " + str(temp_raw_lines))
def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None): def setup_interfaces(self, ssid="", bssid="", passkey="", encryption="", band=None, vlan_id=None, mode=None,
if band is None: num_sta=None):
logging.error("Band value is not available.") if band not in ["twog", "fiveg", "sixg"]:
pytest.exit("Band value is not available.") pytest.skip("Unsupported Band Selected: " + str(band))
if mode is None: if mode not in ["BRIDGE", "NAT-WAN", "NAT-LAN", "VLAN"]:
logging.error("mode value is not available") pytest.skip("Unsupported Mode Selected: " + str(mode))
pytest.exit("mode value is not available") r_val = dict()
if num_sta is None: for dut in self.dut_data:
logging.error("Number of stations are not available") r_val[dut["identifier"]] = None
pytest.exit("Number of stations are not available") for i in r_val:
r_val[i] = dict.fromkeys(["ssid", "bssid", "passkey", "encryption", "upstream_port",
"upstream_resource", "upstream", "station_data", "sniff_radio_2g",
"sniff_radio_5g", "sniff_radio_6g"])
if mode == "BRIDGE": if mode == "BRIDGE":
upstream_port = self.wan_upstream_port() ret = self.get_wan_upstream_ports()
elif mode == "NAT-WAN": for dut in r_val:
upstream_port = self.wan_upstream_port() if ret.keys().__contains__(dut) and ret[dut] is not None:
elif mode == "NAT-LAN": upstream_data = (ret[dut]).split(".")
upstream_port = self.lan_upstream_port() r_val[dut]["upstream_port"] = ret[dut]
elif mode == "VLAN": upstream_resource = upstream_data[1]
# for vlan mode vlan id should be available r_val[dut]["upstream_resource"] = upstream_resource
if vlan_id is not None: upstream_data.pop(0)
upstream_port = self.wan_upstream_port() + "." + str(vlan_id) upstream_data.pop(0)
upstream = ".".join(upstream_data)
r_val[dut]["upstream"] = upstream
else:
r_val.pop(dut)
if mode == "NAT-WAN":
ret = self.get_wan_upstream_ports()
for dut in r_val:
if ret.keys().__contains__(dut) and ret[dut] is not None:
upstream_data = (ret[dut]).split(".")
r_val[dut]["upstream_port"] = ret[dut]
upstream_resource = upstream_data[1]
r_val[dut]["upstream_resource"] = upstream_resource
upstream_data.pop(0)
upstream_data.pop(0)
upstream = ".".join(upstream_data)
r_val[dut]["upstream"] = upstream
else:
r_val.pop(dut)
if mode == "NAT-LAN":
ret = self.get_lan_upstream_ports()
for dut in r_val:
if ret.keys().__contains__(dut) and ret[dut] is not None:
upstream_data = (ret[dut]).split(".")
r_val[dut]["upstream_port"] = ret[dut]
upstream_resource = upstream_data[1]
r_val[dut]["upstream_resource"] = upstream_resource
upstream_data.pop(0)
upstream_data.pop(0)
upstream = ".".join(upstream_data)
r_val[dut]["upstream"] = upstream
else:
r_val.pop(dut)
if mode == "VLAN":
if vlan_id is None:
logging.error("VLAN ID is Unspecified in the VLAN Case")
pytest.skip("VLAN ID is Unspecified in the VLAN Case")
else: else:
logging.error("Vlan id is not available for vlan") ret = self.get_wan_upstream_ports()
pytest.exit("Vlan id is not available for vlan") for dut in r_val:
else: if ret.keys().__contains__(dut) and ret[dut] is not None:
logging.error("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN") upstream_data = (ret[dut] + "." + str(vlan_id)).split(".")
pytest.exit("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN") r_val[dut]["upstream_port"] = ret[dut] + "." + str(vlan_id)
radio_data = {} upstream_resource = upstream_data[1]
sta_prefix = "" r_val[dut]["upstream_resource"] = upstream_resource
sniff_radio = "" upstream_data.pop(0)
data_dict = {} upstream_data.pop(0)
# deleting existing stations and layer 3 upstream = ".".join(upstream_data)
self.pre_cleanup() r_val[dut]["upstream"] = upstream
else:
r_val.pop(dut)
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_6g = {"ax210_radios": self.ax210_radios}
max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19,
"ax200_radios": 1, "ax210_radios": 1} "ax200_radios": 1, "ax210_radios": 1}
radio_data = {}
sniff_radio = ""
sta_prefix = ""
data_dict = {}
# deleting existing stations and layer 3
# self.pre_cleanup()
data_dict["sniff_radio_2g"] = None
data_dict["sniff_radio_5g"] = None
data_dict["sniff_radio_6g"] = None
if band == "twog": if band == "twog":
if self.run_lf: if self.run_lf:
for i in self.dut_data: for i in self.dut_data:
@@ -445,6 +513,7 @@ class lf_libs:
passkey = i["ssid"]["2g-password"] passkey = i["ssid"]["2g-password"]
security = i["ssid"]["2g-encryption"].lower() security = i["ssid"]["2g-encryption"].lower()
sta_prefix = self.twog_prefix sta_prefix = self.twog_prefix
data_dict["sta_prefix"] = sta_prefix
# checking station compitality of lanforge # checking station compitality of lanforge
if int(num_sta) > int(self.max_2g_stations): if int(num_sta) > int(self.max_2g_stations):
logging.error("Can't create %s stations on lanforge" % num_sta) logging.error("Can't create %s stations on lanforge" % num_sta)
@@ -454,12 +523,6 @@ class lf_libs:
self.ax200_radios) == 0 and len(self.mtk_radios) == 0: self.ax200_radios) == 0 and len(self.mtk_radios) == 0:
logging.error("Twog radio is not available") logging.error("Twog radio is not available")
pytest.skip("Twog radio is not available") pytest.skip("Twog radio is not available")
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
# radio and station selection # radio and station selection
stations = num_sta stations = num_sta
for j in dict_all_radios_2g: for j in dict_all_radios_2g:
@@ -476,7 +539,6 @@ class lf_libs:
radio_data[i] = max_station radio_data[i] = max_station
stations = stations - max_station stations = stations - max_station
diff = max_station - stations diff = max_station - stations
# setup sniffer
sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data)
data_dict["sniff_radio_2g"] = sniff_radio data_dict["sniff_radio_2g"] = sniff_radio
if band == "fiveg": if band == "fiveg":
@@ -487,6 +549,7 @@ class lf_libs:
security = i["ssid"]["2g-encryption"].lower() security = i["ssid"]["2g-encryption"].lower()
sta_prefix = self.fiveg_prefix sta_prefix = self.fiveg_prefix
data_dict["sta_prefix"] = sta_prefix
# checking station compitality of lanforge # checking station compitality of lanforge
if int(num_sta) > int(self.max_5g_stations): if int(num_sta) > int(self.max_5g_stations):
logging.error("Can't create %s stations on lanforge" % num_sta) logging.error("Can't create %s stations on lanforge" % num_sta)
@@ -497,11 +560,6 @@ class lf_libs:
logging.error("fiveg radio is not available") logging.error("fiveg radio is not available")
pytest.skip("fiveg radio is not available") pytest.skip("fiveg radio is not available")
dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
# radio and station selection # radio and station selection
stations = num_sta stations = num_sta
for j in dict_all_radios_5g: for j in dict_all_radios_5g:
@@ -529,6 +587,7 @@ class lf_libs:
security = i["ssid"]["6g-encryption"].lower() security = i["ssid"]["6g-encryption"].lower()
sta_prefix = self.sixg_prefix sta_prefix = self.sixg_prefix
data_dict["sta_prefix"] = sta_prefix
# checking station compitality of lanforge # checking station compitality of lanforge
if int(num_sta) > int(self.max_6g_stations): if int(num_sta) > int(self.max_6g_stations):
logging.error("Can't create %s stations on lanforge" % num_sta) logging.error("Can't create %s stations on lanforge" % num_sta)
@@ -538,8 +597,6 @@ class lf_libs:
logging.error("sixg radio is not available") logging.error("sixg radio is not available")
pytest.skip("sixg radio is not available") pytest.skip("sixg radio is not available")
dict_all_radios_6g = {"ax210_radios": self.ax210_radios}
# radio and station selection # radio and station selection
stations = num_sta stations = num_sta
for j in dict_all_radios_6g: for j in dict_all_radios_6g:
@@ -559,7 +616,7 @@ class lf_libs:
sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data)
data_dict["sniff_radio_6g"] = sniff_radio data_dict["sniff_radio_6g"] = sniff_radio
# creating dict of radio and station_list
dict_radio_sta_list = {} dict_radio_sta_list = {}
# list of per radio station # list of per radio station
length_to_split = list(radio_data.values()) length_to_split = list(radio_data.values())
@@ -577,22 +634,97 @@ class lf_libs:
for j in dict_radio_sta_list[i]: for j in dict_radio_sta_list[i]:
temp_list.append(shelf_resource + j) temp_list.append(shelf_resource + j)
dict_radio_sta_list[i] = temp_list dict_radio_sta_list[i] = temp_list
data_dict["radios"] = dict_radio_sta_list
for i in r_val:
r_val[i]["station_data"] = data_dict["radios"]
r_val[i]["sta_prefix"] = data_dict["sta_prefix"]
r_val[i]["sniff_radio_2g"] = data_dict["sniff_radio_2g"]
r_val[i]["sniff_radio_5g"] = data_dict["sniff_radio_5g"]
r_val[i]["sniff_radio_6g"] = data_dict["sniff_radio_6g"]
if self.run_lf: if self.run_lf:
data_dict["radios"] = dict_radio_sta_list for dut in self.dut_data:
data_dict["upstream_port"] = upstream_port ssid_data = []
data_dict["ssid"] = ssid if r_val.keys().__contains__(dut["identifier"]):
data_dict["passkey"] = passkey if dut.keys().__contains__("ssid"):
data_dict["security"] = security if band == "twog":
data_dict["sta_prefix"] = sta_prefix r_val[dut["identifier"]]["ssid"] = dut["ssid"]["2g-ssid"]
# data_dict["sniff_radio"] = sniff_radio r_val[dut["identifier"]]["passkey"] = dut["ssid"]["2g-password"]
return data_dict r_val[dut["identifier"]]["encryption"] = dut["ssid"]["2g-encryption"]
r_val[dut["identifier"]]["bssid"] = dut["ssid"]["2g-bssid"]
if dut["ssid"]["2g-encryption"] == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] +
' password=' + dut["ssid"]["2g-password"] +
' bssid=' + dut["ssid"]["2g-bssid"]])
else:
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] +
' security=' + str(dut["ssid"]["2g-encryption"]).upper() +
' password=' + dut["ssid"]["2g-password"] +
' bssid=' + dut["ssid"]["2g-bssid"]])
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
if band == "fiveg":
r_val[dut["identifier"]]["ssid"] = dut["ssid"]["5g-ssid"]
r_val[dut["identifier"]]["passkey"] = dut["ssid"]["5g-password"]
r_val[dut["identifier"]]["encryption"] = dut["ssid"]["5g-encryption"]
r_val[dut["identifier"]]["bssid"] = dut["ssid"]["5g-bssid"]
if dut["ssid"]["5g-encryption"] == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["5g-ssid"] +
' password=' + dut["ssid"]["5g-password"] +
' bssid=' + dut["ssid"]["5g-bssid"]])
else:
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["5g-ssid"] +
' security=' + str(dut["ssid"]["5g-encryption"]).upper() +
' password=' + dut["ssid"]["5g-password"] +
' bssid=' + dut["ssid"]["5g-bssid"]])
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
if band == "sixg":
r_val[dut["identifier"]]["ssid"] = dut["ssid"]["6g-ssid"]
r_val[dut["identifier"]]["passkey"] = dut["ssid"]["6g-password"]
r_val[dut["identifier"]]["encryption"] = dut["ssid"]["6g-encryption"]
r_val[dut["identifier"]]["bssid"] = dut["ssid"]["6g-bssid"]
if dut["ssid"]["6g-encryption"] == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["6g-ssid"] +
' password=' + dut["ssid"]["6g-password"] +
' bssid=' + dut["ssid"]["6g-bssid"]])
else:
ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["6g-ssid"] +
' security=' + str(dut["ssid"]["6g-encryption"]).upper() +
' password=' + dut["ssid"]["6g-password"] +
' bssid=' + dut["ssid"]["6g-bssid"]])
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
else: else:
data_dict["radios"] = dict_radio_sta_list for dut in self.dut_data:
data_dict["upstream_port"] = upstream_port ssid_data = []
data_dict["sta_prefix"] = sta_prefix if r_val.keys().__contains__(dut["identifier"]):
# data_dict["sniff_radio"] = sniff_radio r_val[dut["identifier"]]["ssid"] = ssid
return data_dict r_val[dut["identifier"]]["passkey"] = passkey
r_val[dut["identifier"]]["encryption"] = encryption
r_val[dut["identifier"]]["bssid"] = bssid
if encryption == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + ssid +
' password=' + passkey +
' bssid=' + str(bssid).upper()])
else:
ssid_data.append(['ssid_idx=0 ssid=' + ssid +
' security=' + str(encryption).upper() +
' password=' + passkey +
' bssid=' + str(bssid).upper()])
if str(encryption).upper() in ["OPEN", "WPA", "WPA2", "WPA3", "WEP"]:
self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data)
print(r_val)
return r_val
def update_duts(self, identifier=0, ssid_data=[]):
for dut_obj in self.dut_objects:
if identifier == dut_obj.dut_name:
dut_obj.ssid = ssid_data
dut_obj.add_ssids()
# SSID data should be in this format
# [
# ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'],
# ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59']
# ]
def setup_relevent_profiles(self): def setup_relevent_profiles(self):
""" TODO """ TODO
@@ -695,21 +827,25 @@ class lf_libs:
else: else:
logging.error("Name is not provided") logging.error("Name is not provided")
def wan_upstream_port(self): def get_wan_upstream_ports(self):
"""finding upstream port""" """finding upstream wan ports"""
upstream_port = "" r_val = dict()
for i in self.dut_data: for dut in self.dut_data:
if dict(i).keys().__contains__("wan_port"): r_val[dut["identifier"]] = None
upstream_port = i["wan_port"] if dut["wan_port"] is not None:
return upstream_port if dut["wan_port"] in self.lanforge_data["wan_ports"].keys():
r_val[dut["identifier"]] = dut["wan_port"]
return r_val
def lan_upstream_port(self): def get_lan_upstream_ports(self):
"""finding upstream port""" """finding upstream wan ports"""
upstream_port = "" r_val = dict()
for i in self.dut_data: for dut in self.dut_data:
if dict(i).keys().__contains__("lan_port"): r_val[dut["identifier"]] = None
upstream_port = i["lan_port"] if dut["lan_port"] is not None:
return upstream_port if dut["lan_port"] in self.lanforge_data["lan_ports"].keys():
r_val[dut["identifier"]] = dut["lan_port"]
return r_val
def setup_sniffer(self, band=None, station_radio_data=None): def setup_sniffer(self, band=None, station_radio_data=None):
"""Setup sniff radio""" """Setup sniff radio"""

View File

@@ -47,154 +47,136 @@ class lf_tests(lf_libs):
""" """
def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None): def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None):
super().__init__(lf_data, dut_data, log_level) super().__init__(lf_data, dut_data, run_lf, log_level)
self.run_lf = run_lf
self.dut_idx_mapping = {}
# self.upstream_port = list(self.uplink_nat_ports.keys())[0]
# self.skip_pcap = skip_pcap
# self.wan_upstream = list(self.wan_ports.keys())
# self.lan_upstream =
#self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", _cleanup_on_exit=False)
def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", dut_data={}, def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", dut_data={},
security="open", extra_securities=[], security="open", extra_securities=[], sta_mode=0,
num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog", ssid_channel=None, num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog",
allure_attach=True, runtime_secs=40): allure_attach=True, runtime_secs=40):
# self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug) data = self.setup_interfaces(ssid=ssid, bssid=passkey, passkey=passkey, encryption=security,
# setup_interfaces() interface selection return radio name along no of station on each radio, upstream port band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta)
#
data = self.setup_interfaces(band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta)
self.add_vlan(vlan_ids=vlan_id) self.add_vlan(vlan_ids=vlan_id)
logging.info("Setup interface data" + str(data))
if self.run_lf:
ssid = data["ssid"]
passkey = data["passkey"]
security = data["security"]
sta_connect_obj = []
for radio in data["radios"]:
obj_sta_connect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam",
_cleanup_on_exit=False)
obj_sta_connect.sta_mode = 0
upstream_data = list(data["upstream_port"].split("."))
obj_sta_connect.upstream_resource = upstream_data[1]
upstream_data.pop(0)
upstream_data.pop(0)
upstream_port = ".".join(upstream_data)
obj_sta_connect.upstream_port = upstream_port
self.enable_verbose_debug(radio=radio, enable=False)
obj_sta_connect.radio = radio
obj_sta_connect.admin_down(obj_sta_connect.radio)
obj_sta_connect.admin_up(obj_sta_connect.radio)
obj_sta_connect.sta_prefix = data["sta_prefix"]
# changed to auto channel
self.set_radio_channel(radio=radio, channel="AUTO")
logging.info("scan ssid radio: " + str(radio.split(".")[2]))
result = self.scan_ssid(radio=radio, ssid=ssid, ssid_channel=ssid_channel)
logging.info("ssid scan data : " + str(result))
if not result and ssid_channel:
# Sniffer required
# print("sniff radio", data["sniff_radio"].split(".")[2])
for dut in self.dut_data:
identifier = dut["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)[-1])["2G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["2G"][0]
if data["sniff_radio_2g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data["sniff_radio_2g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)[-1])["5G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["5G"][0]
if data["sniff_radio_5g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data["sniff_radio_5g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)[-1])["6G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["6G"][0]
if data["sniff_radio_6g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data["sniff_radio_6g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
# print("ssid not available in scan result") logging.info("Setup interface data" + str(data))
# return "FAIL", "ssid not available in scan result" sta_connect_obj = []
pass for dut in data:
obj_sta_connect.resource = radio.split(".")[1] for radio in data[dut]["station_data"]:
obj_sta_connect.dut_ssid = ssid obj_sta_connect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam",
obj_sta_connect.dut_passwd = passkey _cleanup_on_exit=False)
obj_sta_connect.dut_security = security
obj_sta_connect.station_names = data["radios"][radio] obj_sta_connect.sta_mode = sta_mode
obj_sta_connect.runtime_secs = runtime_secs obj_sta_connect.upstream_resource = data[dut]["upstream_resource"]
obj_sta_connect.bringup_time_sec = 80 obj_sta_connect.upstream_port = data[dut]["upstream"]
obj_sta_connect.cleanup_on_exit = True self.enable_verbose_debug(radio=radio, enable=True)
obj_sta_connect.download_bps = 128000 obj_sta_connect.radio = radio
obj_sta_connect.upload_bps = 128000 obj_sta_connect.admin_down(obj_sta_connect.radio)
obj_sta_connect.side_a_pdu = 1200 obj_sta_connect.admin_up(obj_sta_connect.radio)
obj_sta_connect.side_b_pdu = 1500 obj_sta_connect.sta_prefix = data[dut]["sta_prefix"]
obj_sta_connect.setup(extra_securities=extra_securities) obj_sta_connect.resource = radio.split(".")[1]
if ssid_channel: obj_sta_connect.dut_ssid = ssid
pass obj_sta_connect.dut_ssid = ssid
# Need to start sniffer obj_sta_connect.dut_passwd = passkey
# print("sniff radio", data["sniff_radio"].split(".")[2]) obj_sta_connect.dut_security = security
# self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) obj_sta_connect.station_names = data[dut]["station_data"][radio]
obj_sta_connect.runtime_secs = runtime_secs
obj_sta_connect.bringup_time_sec = 80
obj_sta_connect.cleanup_on_exit = True
obj_sta_connect.download_bps = 128000
obj_sta_connect.upload_bps = 128000
obj_sta_connect.side_a_pdu = 1200
obj_sta_connect.side_b_pdu = 1500
# changed to auto channel
self.set_radio_channel(radio=radio, channel="AUTO")
logging.info("scan ssid radio: " + str(radio.split(".")[2]))
result = self.scan_ssid(radio=radio, ssid=ssid)
logging.info("ssid scan data : " + str(result))
if not result:
# Sniffer required
# print("sniff radio", data["sniff_radio"].split(".")[2])
for duts in self.dut_data:
identifier = duts["identifier"]
if dut_data.keys().__contains__(identifier):
if band == "twog":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)[-1])["2G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["2G"][0]
if data[dut]["sniff_radio_2g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "fiveg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)[-1])["5G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["5G"][0]
if data[dut]["sniff_radio_5g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
elif band == "sixg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)[-1])["6G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["6G"][0]
if data[dut]["sniff_radio_6g"] is not None:
self.start_sniffer(radio_channel=channel,
radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=10)
time.sleep(10)
self.stop_sniffer()
if not result:
pytest.fail("SSID is not Available in Scan Result")
obj_sta_connect.setup(extra_securities=extra_securities)
sta_connect_obj.append(obj_sta_connect) sta_connect_obj.append(obj_sta_connect)
for dut in self.dut_data: for dut_ in self.dut_data:
identifier = dut["identifier"] identifier = dut_["identifier"]
if dut_data.keys().__contains__(identifier): if dut_data.keys().__contains__(identifier):
if band == "twog": if band == "twog":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \
dict(dut_data.get(identifier)[-1])["2G"] is not None: dict(dut_data.get(identifier)[-1])["2G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["2G"][0] channel = dict(dut_data.get(identifier)[-1])["2G"][0]
self.start_sniffer(radio_channel=channel, radio=data["sniff_radio_2g"].split(".")[2], self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2],
duration=runtime_secs) duration=runtime_secs)
logging.info("started-sniffer") logging.info("started-sniffer")
for obj in sta_connect_obj: for obj in sta_connect_obj:
obj.start() obj.start()
logging.info("napping %f sec" % runtime_secs) logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs) time.sleep(runtime_secs)
logging.info("stopping-sniffer") logging.info("stopping-sniffer")
self.stop_sniffer() self.stop_sniffer()
elif band == "fiveg": elif band == "fiveg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \
dict(dut_data.get(identifier)[-1])["5G"] is not None: dict(dut_data.get(identifier)[-1])["5G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["5G"][0] channel = dict(dut_data.get(identifier)[-1])["5G"][0]
self.start_sniffer(radio_channel=channel, radio=data["sniff_radio_5g"].split(".")[2], self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2],
duration=runtime_secs) duration=runtime_secs)
for obj in sta_connect_obj: for obj in sta_connect_obj:
obj.start() obj.start()
logging.info("napping %f sec" % runtime_secs) logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs) time.sleep(runtime_secs)
self.stop_sniffer() self.stop_sniffer()
elif band == "sixg": elif band == "sixg":
if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \
dict(dut_data.get(identifier)[-1])["6G"] is not None: dict(dut_data.get(identifier)[-1])["6G"] is not None:
channel = dict(dut_data.get(identifier)[-1])["6G"][0] channel = dict(dut_data.get(identifier)[-1])["6G"][0]
self.start_sniffer(radio_channel=channel, radio=data["sniff_radio_6g"].split(".")[2], self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2],
duration=runtime_secs) duration=runtime_secs)
for obj in sta_connect_obj: for obj in sta_connect_obj:
obj.start() obj.start()
logging.info("napping %f sec" % runtime_secs) logging.info("napping %f sec" % runtime_secs)
time.sleep(runtime_secs) time.sleep(runtime_secs)
self.stop_sniffer() self.stop_sniffer()
else: else:
for obj in sta_connect_obj: for obj in sta_connect_obj:
print(obj) print(obj)
obj.start() obj.start()
print("napping %f sec" % runtime_secs) print("napping %f sec" % runtime_secs)
time.sleep(runtime_secs) time.sleep(runtime_secs)
pass_fail_result = [] pass_fail_result = []
for obj in sta_connect_obj: for obj in sta_connect_obj:
sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"] sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"]
@@ -225,7 +207,7 @@ class lf_tests(lf_libs):
cx_table_dict = {} cx_table_dict = {}
upstream = [] upstream = []
for i in range(len(obj.station_names)): for i in range(len(obj.station_names)):
upstream.append(data["upstream_port"]) upstream.append(data[dut]["upstream_port"])
cx_table_dict["Upstream"] = upstream cx_table_dict["Upstream"] = upstream
cx_table_dict["Downstream"] = obj.station_names cx_table_dict["Downstream"] = obj.station_names
cx_tcp_ul = [] cx_tcp_ul = []
@@ -280,9 +262,6 @@ class lf_tests(lf_libs):
logging.info("client connection to" + str(obj.dut_ssid) + "unsuccessful. Test Failed") logging.info("client connection to" + str(obj.dut_ssid) + "unsuccessful. Test Failed")
result = "FAIL" result = "FAIL"
if ssid_channel:
# need to stop sniffer
pass
result = "PASS" result = "PASS"
description = "" description = ""
for i in pass_fail_result: for i in pass_fail_result:
@@ -371,185 +350,191 @@ class lf_tests(lf_libs):
logging.info("ALL Stations got IP's") logging.info("ALL Stations got IP's")
return station_data_all return station_data_all
def add_stations(self, band="2G", num_stations="max", dut="NA", ssid_name=[], idx=0): def add_stations(self, band="2G", num_stations="max", ssid_name=[], idx=0):
logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) +
" DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) dut_name = []
for index in range(0, len(self.dut_data)):
dut_name.append(self.dut_data[index]["identifier"])
if num_stations == 0: if num_stations == 0:
logging.warning("0 Stations") logging.warning("0 Stations")
return return
idx = idx for dut in dut_name:
if self.run_lf or self.cc_1: logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) +
" DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx))
idx = idx
if self.run_lf or self.cc_1:
if band == "2G":
idx = 0
if band == "5G":
idx = 1
for i in self.dut_idx_mapping:
if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band:
idx = i
if band == "2G": if band == "2G":
idx = 0 all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \
self.ax210_radios
print("all_2g_rdio", all_radio_2g)
if num_stations != "max":
logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g)))
total_sta = num_stations
max_possible = 0
for radio in all_radio_2g:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(all_radio_2g))
rem = total_sta % len(all_radio_2g)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(all_radio_2g))
rem = total_sta % len(all_radio_2g)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(all_radio_2g)
logging.info("Total stations per radio: " + str(per_radio_sta))
for radio in all_radio_2g:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.temp_raw_lines.append(station_data)
logging.debug("Raw Line : " + str(station_data))
if num_stations == "max":
logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g)))
for radio in all_radio_2g:
num_stations = self.get_max_sta(radio)
logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.temp_raw_lines.append(station_data)
logging.debug("Raw Line : " + str(station_data))
if band == "5G": if band == "5G":
idx = 1 all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
if num_stations != "max":
logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g)))
total_sta = num_stations
max_possible = 0
for radio in all_radio_5g:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(all_radio_5g))
rem = total_sta % len(all_radio_5g)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(all_radio_5g))
rem = total_sta % len(all_radio_5g)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(all_radio_5g)
logging.info("Total stations per radio: " + str(per_radio_sta))
for radio in all_radio_5g:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.temp_raw_lines.append(station_data)
logging.debug("Raw Line : " + str(station_data))
for i in self.dut_idx_mapping: if num_stations == "max":
if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g)))
idx = i for radio in all_radio_5g:
if band == "2G": num_stations = self.get_max_sta(radio)
all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \ logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
self.ax210_radios station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
if num_stations != "max": " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
total_sta = num_stations self.temp_raw_lines.append(station_data)
max_possible = 0 logging.debug("Raw Line : " + str(station_data))
for radio in all_radio_2g: if band == "6g":
max_possible = max_possible + int(self.get_max_sta(radio)) all_radio_6g = self.ax210_radios
if total_sta <= max_possible: if num_stations != "max":
per_radio_sta = int(total_sta / len(all_radio_2g)) logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g)))
rem = total_sta % len(all_radio_2g) total_sta = num_stations
else: max_possible = 0
total_sta = max_possible for radio in all_radio_6g:
per_radio_sta = int(total_sta / len(all_radio_2g)) max_possible = max_possible + int(self.get_max_sta(radio))
rem = total_sta % len(all_radio_2g) if total_sta <= max_possible:
if rem != 0 and per_radio_sta == 0: per_radio_sta = int(total_sta / len(all_radio_6g))
per_radio_sta = rem / len(all_radio_2g) rem = total_sta % len(all_radio_6g)
logging.info("Total stations per radio: " + str(per_radio_sta)) else:
for radio in all_radio_2g: total_sta = max_possible
max_possible = int(self.get_max_sta(radio)) per_radio_sta = int(total_sta / len(all_radio_6g))
if total_sta == 0: rem = total_sta % len(all_radio_6g)
return if rem != 0 and per_radio_sta == 0:
num_stations = per_radio_sta per_radio_sta = rem / len(all_radio_6g)
if rem == 0 and num_stations == 0: logging.info("Total stations per radio: " + str(per_radio_sta))
return for radio in all_radio_6g:
if max_possible - num_stations >= rem: max_possible = int(self.get_max_sta(radio))
num_stations = num_stations + rem if total_sta == 0:
rem = 0 return
elif max_possible - rem >= num_stations: num_stations = per_radio_sta
num_stations = num_stations + rem if rem == 0 and num_stations == 0:
rem = 0 return
elif total_sta <= max_possible: if max_possible - num_stations >= rem:
num_stations = total_sta num_stations = num_stations + rem
if per_radio_sta < 1: rem = 0
num_stations = 1 elif max_possible - rem >= num_stations:
total_sta = total_sta - num_stations num_stations = num_stations + rem
logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) rem = 0
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + elif total_sta <= max_possible:
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + num_stations = total_sta
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] if per_radio_sta < 1:
self.raw_line.append(station_data) num_stations = 1
logging.debug("Raw Line : " + str(station_data)) total_sta = total_sta - num_stations
logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
if num_stations == "max": station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
for radio in all_radio_2g: str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
num_stations = self.get_max_sta(radio) self.temp_raw_lines.append(station_data)
logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) logging.debug("Raw Line : " + str(station_data))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + if num_stations == "max":
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g)))
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] for radio in all_radio_6g:
self.raw_line.append(station_data) num_stations = self.get_max_sta(radio)
logging.debug("Raw Line : " + str(station_data)) logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
if band == "5G": " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
if num_stations != "max": self.temp_raw_lines.append(station_data)
logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) logging.debug("Raw Line : " + str(station_data))
total_sta = num_stations
max_possible = 0
for radio in all_radio_5g:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(all_radio_5g))
rem = total_sta % len(all_radio_5g)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(all_radio_5g))
rem = total_sta % len(all_radio_5g)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(all_radio_5g)
logging.info("Total stations per radio: " + str(per_radio_sta))
for radio in all_radio_5g:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
logging.debug("Raw Line : " + str(station_data))
if num_stations == "max":
logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g)))
for radio in all_radio_5g:
num_stations = self.get_max_sta(radio)
logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
logging.debug("Raw Line : " + str(station_data))
if band == "6g":
all_radio_6g = self.ax210_radios
if num_stations != "max":
logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g)))
total_sta = num_stations
max_possible = 0
for radio in all_radio_6g:
max_possible = max_possible + int(self.get_max_sta(radio))
if total_sta <= max_possible:
per_radio_sta = int(total_sta / len(all_radio_6g))
rem = total_sta % len(all_radio_6g)
else:
total_sta = max_possible
per_radio_sta = int(total_sta / len(all_radio_6g))
rem = total_sta % len(all_radio_6g)
if rem != 0 and per_radio_sta == 0:
per_radio_sta = rem / len(all_radio_6g)
logging.info("Total stations per radio: " + str(per_radio_sta))
for radio in all_radio_6g:
max_possible = int(self.get_max_sta(radio))
if total_sta == 0:
return
num_stations = per_radio_sta
if rem == 0 and num_stations == 0:
return
if max_possible - num_stations >= rem:
num_stations = num_stations + rem
rem = 0
elif max_possible - rem >= num_stations:
num_stations = num_stations + rem
rem = 0
elif total_sta <= max_possible:
num_stations = total_sta
if per_radio_sta < 1:
num_stations = 1
total_sta = total_sta - num_stations
logging.info("Adding " + str(num_stations) + " Stations on " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
logging.debug("Raw Line : " + str(station_data))
if num_stations == "max":
logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g)))
for radio in all_radio_6g:
num_stations = self.get_max_sta(radio)
logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio))
station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] +
" STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]]
self.raw_line.append(station_data)
logging.debug("Raw Line : " + str(station_data))
if __name__ == '__main__': if __name__ == '__main__':
advance_03= { basic_04 = {
"target": "tip_2x", "target": "tip_2x",
"controller": { "controller": {
"url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001",
@@ -557,10 +542,11 @@ if __name__ == '__main__':
"password": "OpenWifi%123" "password": "OpenWifi%123"
}, },
"device_under_tests": [{ "device_under_tests": [{
"model": "cig_wf196", "model": "edgecore_ecw5211",
"supported_bands": ["2G", "5G", "6G"], "supported_bands": ["2G", "5G"],
"supported_modes": ["BRIDGE", "NAT", "VLAN"], "supported_modes": ["BRIDGE", "NAT", "VLAN"],
"wan_port": "1.3.eth2", "wan_port": "1.1.eth2",
"lan_port": "1.1.eth1",
"ssid": { "ssid": {
"2g-ssid": "OpenWifi", "2g-ssid": "OpenWifi",
"5g-ssid": "OpenWifi", "5g-ssid": "OpenWifi",
@@ -568,21 +554,21 @@ if __name__ == '__main__':
"2g-password": "OpenWifi", "2g-password": "OpenWifi",
"5g-password": "OpenWifi", "5g-password": "OpenWifi",
"6g-password": "OpenWifi", "6g-password": "OpenWifi",
"2g-encryption": "WPA2", "2g-encryption": "OPEN",
"5g-encryption": "WPA2", "5g-encryption": "OPEN",
"6g-encryption": "WPA3", "6g-encryption": "OPEN",
"2g-bssid": "68:7d:b4:5f:5c:31", "2g-bssid": "68:7d:b4:5f:5c:31",
"5g-bssid": "68:7d:b4:5f:5c:3c", "5g-bssid": "68:7d:b4:5f:5c:3c",
"6g-bssid": "68:7d:b4:5f:5c:38" "6g-bssid": "68:7d:b4:5f:5c:38"
}, },
"mode": "wifi6e", "mode": "wifi5",
"identifier": "824f816011e4", "identifier": "68215fda456d",
"method": "serial", "method": "serial",
"host_ip": "10.28.3.115", "host_ip": "localhost",
"host_username": "root", "host_username": "lanforge",
"host_password": "pumpkin77", "host_password": "pumpkin77",
"host_ssh_port": 22, "host_ssh_port": 22,
"serial_tty": "/dev/ttyAP0", "serial_tty": "/dev/ttyAP5",
"firmware_version": "next-latest" "firmware_version": "next-latest"
}], }],
"traffic_generator": { "traffic_generator": {
@@ -590,12 +576,12 @@ if __name__ == '__main__':
"testbed": "basic", "testbed": "basic",
"scenario": "dhcp-bridge", "scenario": "dhcp-bridge",
"details": { "details": {
"manager_ip": "10.28.3.117", "manager_ip": "10.28.3.6",
"http_port": 8080, "http_port": 8080,
"ssh_port": 22, "ssh_port": 22,
"setup": {"method": "build", "DB": "Test_Scenario_Automation"}, "setup": {"method": "build", "DB": "Test_Scenario_Automation"},
"wan_ports": { "wan_ports": {
"1.3.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": {
"lease-first": 10, "lease-first": 10,
"lease-count": 10000, "lease-count": 10000,
"lease-time": "6h" "lease-time": "6h"
@@ -603,12 +589,14 @@ if __name__ == '__main__':
} }
}, },
"lan_ports": { "lan_ports": {
"1.1.eth1": {
"addressing": "dynamic"
}
}, },
"uplink_nat_ports": { "uplink_nat_ports": {
"1.3.eth3": { "1.1.eth3": {
"addressing": "static", "addressing": "dhcp-server",
"ip": "10.28.2.39", "ip": "10.28.2.9",
"gateway_ip": "10.28.2.1/24", "gateway_ip": "10.28.2.1/24",
"ip_mask": "255.255.255.0", "ip_mask": "255.255.255.0",
"dns_servers": "BLANK" "dns_servers": "BLANK"
@@ -618,8 +606,13 @@ if __name__ == '__main__':
} }
} }
obj = lf_tests(lf_data=dict(advance_03["traffic_generator"]), dut_data=list(advance_03["device_under_tests"]), obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]),
log_level=logging.DEBUG, run_lf=True) log_level=logging.DEBUG, run_lf=True)
obj.setup_dut()
#obj.add_stations(band="2G", num_stations=6, ssid_name=["OpenWifi"])
#obj.chamber_view(raw_lines="custom")
# A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1)
# print(A)
# obj.setup_relevent_profiles() # obj.setup_relevent_profiles()
# obj.Client_Connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog", # obj.Client_Connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog",
# vlan_id=100, num_sta=5, scan_ssid=True, # vlan_id=100, num_sta=5, scan_ssid=True,
@@ -629,9 +622,12 @@ if __name__ == '__main__':
# obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600])
# obj.get_cx_data() # obj.get_cx_data()
# obj.chamber_view() # obj.chamber_view()
# dut = {'0000c1018812': [['OpenWifi', 'wpa2', 'OpenWifi', '2G', '6A:21:5F:DA:45:6F'],
# {'2G': [6, 40, 2437], '5G': None, '6G': None}]}
# c = obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], # c = obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[],
# num_sta=1, mode="BRIDGE", vlan_id=[100], # num_sta=1, mode="BRIDGE", dut_data=dut,
# band="twog", ssid_channel=11) # band="twog")
# obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30) # obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30)
# print("started") # print("started")
# time.sleep(30) # time.sleep(30)

View File

@@ -1,4 +0,0 @@
,age,auth,beacon,bss,channel,entity id,frequency,info,signal,ssid
0,4698,Open,100,00:00:c1:01:88:15,1,1.1.4,2412,2x2 MCS 0-11 AX,-17.0,Maverick-018812
1,354731974,WPA2 WPA3,100,02:00:c1:01:88:15,1,1.1.4,2412,2x2 MCS 0-11 AX,-22.0,ssid_wpa3_p_m_2g_br
2,354731972,WPA WPA2,100,06:00:c1:01:88:15,1,1.1.4,2412,2x2 MCS 0-11 AX,-21.0,ssid_wpa_wpa2_p_m_2g_br
1 age auth beacon bss channel entity id frequency info signal ssid
2 0 4698 Open 100 00:00:c1:01:88:15 1 1.1.4 2412 2x2 MCS 0-11 AX -17.0 Maverick-018812
3 1 354731974 WPA2 WPA3 100 02:00:c1:01:88:15 1 1.1.4 2412 2x2 MCS 0-11 AX -22.0 ssid_wpa3_p_m_2g_br
4 2 354731972 WPA WPA2 100 06:00:c1:01:88:15 1 1.1.4 2412 2x2 MCS 0-11 AX -21.0 ssid_wpa_wpa2_p_m_2g_br

View File