Added be200 radio for WiFi 7

Signed-off-by: jitendracandela <jitendra.kushavah@candelatech.com>
This commit is contained in:
jitendracandela
2024-07-18 14:32:14 +05:30
parent b76cf52e72
commit 2153afcd5b
2 changed files with 85 additions and 44 deletions

View File

@@ -43,7 +43,6 @@ lf_atten_mod_test = importlib.import_module("py-scripts.lf_atten_mod_test")
Attenuator_modify = lf_atten_mod_test.CreateAttenuator
class lf_libs:
"""
"traffic_generator": {
@@ -84,7 +83,7 @@ class lf_libs:
max_ac_stations = None
twog_prefix = "ath10k_2g0"
fiveg_prefix = "ath10k_5g0"
sixg_prefix = "AX210_0"
sixg_prefix = "sixg_0"
ax_prefix = "AX200_0"
pcap_obj = None
"""
@@ -119,6 +118,12 @@ class lf_libs:
6e radio - supports (2.4GHz, 5gHz and 6gHz Band)
Maximum 1 Station per radio
"""
be200_radios = []
"""
6e radio - supports (2.4GHz, 5gHz and 6gHz Band)
Maximum 1 Station per radio
"""
ax210_radios = []
"""
@@ -264,6 +269,13 @@ class lf_libs:
phantom_radios.append(str(data[info]["entity id"]))
logging.error("Radio is in phantom state: " + str(data[info]["entity id"]) +
" ,Please Contact: support@candelatech.com")
if str(data[info]["driver"]).__contains__("BE200"):
self.max_possible_stations += 1
self.max_2g_stations += 1 * int(str(data[info]["max_vifs"]))
self.max_5g_stations += 1 * int(str(data[info]["max_vifs"]))
self.max_6g_stations += 1 * int(str(data[info]["max_vifs"]))
self.max_ax_stations += 1 * int(str(data[info]["max_vifs"]))
self.be200_radios.append(info)
if str(data[info]["driver"]).__contains__("AX210"):
self.max_possible_stations += 1
self.max_2g_stations += 1 * int(str(data[info]["max_vifs"]))
@@ -501,17 +513,22 @@ class lf_libs:
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"ax210_radios": self.ax210_radios}
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios
}
dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios
}
dict_all_radios_6g = {"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_6g = {"ax210_radios": self.ax210_radios}
max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19,
"ax200_radios": 1, "ax210_radios": 1}
"ax200_radios": 1, "ax210_radios": 1, "be200_radios": 1}
radio_data = {}
sniff_radio = ""
@@ -531,7 +548,7 @@ class lf_libs:
pytest.skip("Can't create %s stations on lanforge" % num_sta)
# checking atleast one 2g radio is available or not
elif len(self.wave2_2g_radios) == 0 and len(self.wave1_radios) == 0 and len(self.ax210_radios) == 0 and len(
self.ax200_radios) == 0 and len(self.mtk_radios) == 0:
self.ax200_radios) == 0 and len(self.mtk_radios) == 0 and len(self.be200_radios) == 0:
logging.error("Twog radio is not available")
pytest.skip("Twog radio is not available")
# radio and station selection
@@ -561,7 +578,7 @@ class lf_libs:
pytest.skip("Can't create %s stations on lanforge" % num_sta)
# checking atleast one 5g radio is available or not
elif len(self.wave2_5g_radios) == 0 and len(self.wave1_radios) == 0 and len(self.ax210_radios) == 0 and len(
self.ax200_radios) == 0 and len(self.mtk_radios) == 0:
self.ax200_radios) == 0 and len(self.mtk_radios) == 0 and len(self.be200_radios) == 0:
logging.error("fiveg radio is not available")
pytest.skip("fiveg radio is not available")
@@ -592,7 +609,7 @@ class lf_libs:
logging.error("Can't create %s stations on lanforge" % num_sta)
pytest.skip("Can't create %s stations on lanforge" % num_sta)
# checking atleast one 6g radio is available or not
elif len(self.ax210_radios) == 0:
elif len(self.be200_radios) == 0 and len(self.ax210_radios) == 0:
logging.error("sixg radio is not available")
pytest.skip("sixg radio is not available")
@@ -679,7 +696,8 @@ class lf_libs:
r_val[dut["identifier"]]["passkey"] = passkey
r_val[dut["identifier"]]["encryption"] = encryption
r_val[dut["identifier"]]["bssid"] = bssid
r_val[dut["identifier"]]["channel"] = dut_data_[dut["identifier"]]["radio_data"][temp_band]["channel"]
r_val[dut["identifier"]]["channel"] = dut_data_[dut["identifier"]]["radio_data"][temp_band][
"channel"]
if str(encryption).upper() == "OPEN":
ssid_data.append(['ssid_idx=0 ssid=' + ssid +
' bssid=' + str(bssid).upper()])
@@ -829,7 +847,8 @@ class lf_libs:
"""Setup sniff radio"""
sniff_radio = None
if band == "twog":
all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_2g = (self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 2g radios" + str(all_radio_2g))
left_radio = list(set(all_radio_2g) - set(list(station_radio_data.keys())))
if len(left_radio) == 0:
@@ -838,7 +857,8 @@ class lf_libs:
else:
sniff_radio = left_radio[0]
elif band == "fiveg":
all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_5g = (self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios
+ self.be200_radios + self.ax210_radios)
logging.info("All 5g radios" + str(all_radio_5g))
left_radio = list(set(all_radio_5g) - set(list(station_radio_data.keys())))
if len(left_radio) == 0:
@@ -847,7 +867,7 @@ class lf_libs:
else:
sniff_radio = left_radio[0]
elif band == "sixg":
all_radio_6g = self.ax210_radios
all_radio_6g = self.be200_radios + self.ax210_radios
logging.info("All 6g radios" + str(all_radio_6g))
left_radio = list(set(all_radio_6g) - set(list(station_radio_data.keys())))
if len(left_radio) == 0:
@@ -964,7 +984,7 @@ class lf_libs:
"channel": channel
}
try:
if country: # update the dictionary
if country: # update the dictionary
data["country"] = country
if antenna:
data["antenna"] = antenna
@@ -1155,7 +1175,7 @@ class lf_libs:
profile_name = "vlan_profile"
port_list.append(str(port) + "." + str(vlans))
vlan_raws.append(["profile_link " + port + " " + profile_name + " 1 " + port
+ " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)])
+ " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)])
self.temp_raw_lines.append(["profile_link " + port + " " + profile_name + " 1 " + port
+ " NA " + port.split(".")[2] + ",AUTO -1 " + str(vlans)])
@@ -1209,7 +1229,6 @@ class lf_libs:
else:
return vlan_raws
def chamber_view(self, delete_old_scenario=True, raw_lines="default"):
"""create chamber view. raw_lines values are default | custom"""
if delete_old_scenario:
@@ -1304,17 +1323,18 @@ class lf_libs:
name=i,
attachment_type="image/png", extension=None)
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False,
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True,
kpi_csv=False,
file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv",
batch_size="0"):
try:
df = pd.read_csv("../reports/" + str(dir_name) + file_name,
sep=r'\t', engine='python')
sep=r'\t', engine='python')
logging.info("csv file opened")
except FileNotFoundError:
logging.info(f"csv file {file_name} does not exist\nTrying {file_name.replace('_bps__','_Mbps__')}")
logging.info(f"csv file {file_name} does not exist\nTrying {file_name.replace('_bps__', '_Mbps__')}")
try:
df = pd.read_csv("../reports/" + str(dir_name) + file_name.replace('_bps__','_Mbps__'),
df = pd.read_csv("../reports/" + str(dir_name) + file_name.replace('_bps__', '_Mbps__'),
sep=r'\t', engine='python')
logging.info("csv file opened")
except FileNotFoundError:
@@ -1324,8 +1344,8 @@ class lf_libs:
if kpi_csv:
count = 0
dict_data = {"Down": {}, "Up": {}, "Both": {}}
csv_short_dis = df.loc[:,"short-description"]
csv_num_score = df.loc[:,"numeric-score"]
csv_short_dis = df.loc[:, "short-description"]
csv_num_score = df.loc[:, "numeric-score"]
for i in range(len(batch_size.split(","))):
dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0]
dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1]
@@ -1360,7 +1380,7 @@ class lf_libs:
name=file_name, attachment_type="CSV")
return os.path.exists(path)
def get_supplicant_logs(self, radio="1.1.wiphy0", sta_list=[], attach_allure=True):
def get_supplicant_logs(self, radio="1.1.wiphy0", sta_list=[], attach_allure=True):
try:
resource = radio.split(".")[1]
radio = radio.split(".")[2]
@@ -1522,9 +1542,10 @@ class lf_libs:
return ap_lanforge_6g_channel_dict[channel]
else:
return None
def attenuator_serial(self):
self.obj = Attenuator(
lfclient_host= self.manager_ip,
lfclient_host=self.manager_ip,
lfclient_port=self.manager_http_port
)
val = self.obj.show()
@@ -1548,17 +1569,18 @@ class lf_libs:
return signal
def attenuator_serial_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", atn_val=400,
vlan_id=100, client_type=0, station_name=[], radio='1.1.wiphy0', timeout=20):
vlan_id=100, client_type=0, station_name=[], radio='1.1.wiphy0', timeout=20):
# index 0 of atten_serial_radio will ser no of 1st 2g/5g radio and index 1 will ser no of 2nd and 3rd 2g/5g radio
atten_serial_radio = []
atten_serial = self.attenuator_serial()
self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security, mode=mode,
vlan_id=vlan_id, radio=radio, client_type=client_type, station_name=station_name)
signal1 = self.get_station_signal(station_name[0],timeout)
vlan_id=vlan_id, radio=radio, client_type=client_type,
station_name=station_name)
signal1 = self.get_station_signal(station_name[0], timeout)
atten_sr = atten_serial[0].split(".")
self.attenuator_modify(int(atten_sr[2]), "all", atn_val)
time.sleep(0.5)
signal2 = self.get_station_signal(station_name[0],timeout)
signal2 = self.get_station_signal(station_name[0], timeout)
try:
if abs(int(signal2.split(" ")[0])) - abs(int(signal1.split(" ")[0])) >= 5:
atten_serial_radio = atten_serial
@@ -1587,7 +1609,8 @@ class lf_libs:
result = df[column_name].values.tolist()
return result
def monitor(self, duration_sec, monitor_interval, created_cx, col_names, iterations, side_a_min_rate=0, side_b_min_rate=0):
def monitor(self, duration_sec, monitor_interval, created_cx, col_names, iterations, side_a_min_rate=0,
side_b_min_rate=0):
try:
duration_sec = self.local_realm.parse_time(duration_sec).seconds
except:
@@ -1610,7 +1633,8 @@ class lf_libs:
for test in range(1 + iterations):
while datetime.now() < end_time:
index += 1
response = list(self.json_get('/cx/%s?fields=%s' % (','.join(created_cx), ",".join(col_names))).values())[2:]
response = list(
self.json_get('/cx/%s?fields=%s' % (','.join(created_cx), ",".join(col_names))).values())[2:]
self.bps_rx[index] = list(map(lambda i: [float(f"{x / (1E6):.2f}") for x in i.values()], response))
time.sleep(monitor_interval)
# bps_rx list is calculated
@@ -1680,13 +1704,14 @@ class lf_libs:
while timeout:
timeout -= 1
check_run_state = list(self.json_get('/cx/%s?fields=%s' % (','.join(
cx_profile.created_cx.keys()), ",".join(['bps rx a', 'bps rx b']))).values())[2:]
cx_profile.created_cx.keys()), ",".join(['bps rx a', 'bps rx b']))).values())[2:]
for i in check_run_state:
if list(i.values()).count(0) != len(i):
timeout = 0
break
def allure_report_table_format(self, dict_data=None, key=None, value=None, name=None):#, value_on_same_table=True):
def allure_report_table_format(self, dict_data=None, key=None, value=None,
name=None): #, value_on_same_table=True):
report_obj = Report()
data_table, dict_table = "", {}
dict_table[key] = list(dict_data.keys())
@@ -1718,6 +1743,7 @@ class lf_libs:
"wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios
}
dict_all_radios_5g = {
@@ -1725,6 +1751,7 @@ class lf_libs:
"wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios
}
max_station_per_radio = {
@@ -1733,6 +1760,7 @@ class lf_libs:
"wave1_radios": 64,
"mtk_radios": 19,
"ax200_radios": 1,
"be200_radios": 1,
"ax210_radios": 1
}
@@ -1839,7 +1867,7 @@ class lf_libs:
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["5G"]["channel"]
elif band == "sixg":
sniffer_channel = dict(dut_data.get(identifier)["radio_data"])["6G"]["channel"]
sniff_radio = self.setup_sniffer(band=band, station_radio_data={radio:1}) # to setup sniffer radio
sniff_radio = self.setup_sniffer(band=band, station_radio_data={radio: 1}) # to setup sniffer radio
print("sniffer_radio", sniff_radio)
if radio is not None and sniffer_channel is not None:
self.start_sniffer(radio_channel=sniffer_channel, radio=sniff_radio, duration=60)

View File

@@ -969,17 +969,19 @@ class lf_tests(lf_libs):
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios,
"wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_6g = {"ax210_radios": self.ax210_radios}
dict_all_radios_6g = {"be200_radios": self.be200_radios, "ax210_radios": self.ax210_radios}
max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19,
"ax200_radios": 1, "ax210_radios": 1}
"ax200_radios": 1, "ax210_radios": 1, "be200_radios": 1}
radio_data = {}
sniff_radio = ""
@@ -1493,9 +1495,11 @@ class lf_tests(lf_libs):
# clean l3 traffics which won't get cleaned by deleting old scenario in CV
self.client_disconnect(clean_l3_traffic=True)
all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_5g = (self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 5g radios" + str(all_radio_5g))
all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_2g = (self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 2g radios" + str(all_radio_2g))
radio = all_radio_5g[:2] if band == "5G" else all_radio_2g[:2]
logging.info("Radios: " + str(radio))
@@ -2118,9 +2122,11 @@ class lf_tests(lf_libs):
"of data_rates 40% of throughput_1 and 40% of throughput_4 as throughput_7")
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
sta = list(map(lambda i: f"sta000{i}", range(3)))
all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_5g = (self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 5g radios" + str(all_radio_5g))
all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_2g = (self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 2g radios" + str(all_radio_2g))
if len(all_radio_5g) < 3:
pytest.fail("3 Radios are not available")
@@ -2393,10 +2399,12 @@ class lf_tests(lf_libs):
batch_size = batch_size
if band == "twog":
station_name = self.twog_prefix
radio_prefix = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
radio_prefix = (self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
elif band == "fiveg":
station_name = self.fiveg_prefix
radio_prefix = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
radio_prefix = (self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
print("station_name:", station_name)
print("radio:", radio_prefix)
@@ -2684,12 +2692,14 @@ class lf_tests(lf_libs):
# selecting radio(s) based on the requested bands of the client(s)
dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, "wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios}
dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, "wave1_radios": self.wave1_radios,
"mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios}
max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19,
"ax200_radios": 1, "ax210_radios": 1}
"ax200_radios": 1, "ax210_radios": 1, "be200_radios": 1}
radio_name_2g = []
radio_name_5g = []
if band_2g is True and band_5g is True: # a 2G and a 5G station
@@ -2857,6 +2867,7 @@ class lf_tests(lf_libs):
dict_all_radios_ax = {"mtk_radios": self.mtk_radios,
"ax200_radios": self.ax200_radios,
"be200_radios": self.be200_radios,
"ax210_radios": self.ax210_radios}
selected_ax_radio = None
for radio in dict_all_radios_ax:
@@ -3414,9 +3425,11 @@ class lf_tests(lf_libs):
dut_data=dut_data)
sta_list = sta_list + list(station_result.keys())
else:
all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_5g = (self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 5g radios" + str(all_radio_5g))
all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios
all_radio_2g = (self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios +
self.be200_radios + self.ax210_radios)
logging.info("All 2g radios" + str(all_radio_2g))
if band == "twog":
radio_prefix = all_radio_2g