mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 11:18:03 +00:00
Antenna arg added in set_radio_channel
Signed-off-by: karthikaeyetea <karthika.subramani@candelatech.com>
This commit is contained in:
@@ -938,7 +938,7 @@ class lf_libs:
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None):
|
||||
def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None, antenna=None):
|
||||
# country_code = US(840)
|
||||
try:
|
||||
radio = radio.split(".")
|
||||
@@ -956,6 +956,8 @@ class lf_libs:
|
||||
try:
|
||||
if country: # update the dictionary
|
||||
data["country"] = country
|
||||
if antenna:
|
||||
data["antenna"] = antenna
|
||||
except Exception as e:
|
||||
logging.error(f"{e}\nunable to change lanforge radio country code")
|
||||
local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data)
|
||||
@@ -1281,6 +1283,46 @@ class lf_libs:
|
||||
name=i,
|
||||
attachment_type="image/png", extension=None)
|
||||
|
||||
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False,
|
||||
file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv",
|
||||
batch_size="0"):
|
||||
try:
|
||||
df = pd.read_csv("../reports/" + str(dir_name) + file_name,
|
||||
sep=r'\t', engine='python')
|
||||
logging.info("csv file opened")
|
||||
except FileNotFoundError:
|
||||
logging.info("csv file does not exist")
|
||||
return False
|
||||
|
||||
if kpi_csv:
|
||||
count = 0
|
||||
dict_data = {"Down": {}, "Up": {}, "Both": {}}
|
||||
csv_short_dis = df.loc[:,"short-description"]
|
||||
csv_num_score = df.loc[:,"numeric-score"]
|
||||
for i in range(len(batch_size.split(","))):
|
||||
dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0]
|
||||
dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1]
|
||||
dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2]
|
||||
count += 3
|
||||
|
||||
if individual_station_throughput:
|
||||
dict_data = {}
|
||||
if option == "download":
|
||||
csv_sta_names = df.iloc[[0]].values.tolist()
|
||||
csv_throughput_values = df.iloc[[1]].values.tolist()
|
||||
elif option == "upload":
|
||||
csv_sta_names = df.iloc[[0]].values.tolist()
|
||||
csv_throughput_values = df.iloc[[2]].values.tolist()
|
||||
else:
|
||||
print("Provide proper option: download or upload")
|
||||
return
|
||||
|
||||
sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",")
|
||||
th_list = list(map(float, csv_throughput_values[0][0].split(",")))
|
||||
for i in range(len(sta_list)):
|
||||
dict_data[sta_list[i]] = th_list[i]
|
||||
return dict_data
|
||||
|
||||
def attach_report_kpi(self, report_name=None, file_name="kpi_file"):
|
||||
if report_name[-1] == "/":
|
||||
path = "../reports/" + str(report_name) + "kpi.csv"
|
||||
|
||||
@@ -959,6 +959,29 @@ class lf_tests(lf_libs):
|
||||
logging.error("5 Ghz channel didn't changed after radar detected")
|
||||
pytest.fail("5 Ghz channel didn't changed after radar detected")
|
||||
|
||||
def update_dut_ssid(self, dut_data={}):
|
||||
r_val = dict()
|
||||
for dut in self.dut_data:
|
||||
r_val[dut["identifier"]] = None
|
||||
# updating ssids on all APS
|
||||
for dut in self.dut_data:
|
||||
ssid_data = []
|
||||
identifier = dut["identifier"]
|
||||
if r_val.keys().__contains__(identifier):
|
||||
for idx_ in dut_data[identifier]["ssid_data"]:
|
||||
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() == "OPEN":
|
||||
ssid_data.append(
|
||||
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"]
|
||||
+
|
||||
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
|
||||
else:
|
||||
ssid_data.append(
|
||||
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] +
|
||||
' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() +
|
||||
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
|
||||
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
|
||||
self.update_duts(identifier=identifier, ssid_data=ssid_data)
|
||||
|
||||
def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
|
||||
dut_name = []
|
||||
# for index in range(0, len(self.dut_data)):
|
||||
@@ -967,10 +990,10 @@ class lf_tests(lf_libs):
|
||||
if num_stations == 0:
|
||||
logging.warning("0 Stations")
|
||||
return
|
||||
idx = None
|
||||
r_val = dict()
|
||||
for dut in self.dut_data:
|
||||
r_val[dut["identifier"]] = None
|
||||
idx = None
|
||||
# updating ssids on all APS
|
||||
for dut in self.dut_data:
|
||||
ssid_data = []
|
||||
@@ -1333,52 +1356,97 @@ class lf_tests(lf_libs):
|
||||
return dataplane_obj_list
|
||||
|
||||
def multi_asso_disasso(self, band="2G", num_stations=16, dut_data={}, idx=0, mode="BRIDGE", vlan=1,
|
||||
instance_name="wct_instance"):
|
||||
def thread_fun(station_list):
|
||||
print(station_list)
|
||||
time.sleep(60)
|
||||
self.local_realm.admin_down(sta_list=station_list)
|
||||
print("stations down")
|
||||
time.sleep(10)
|
||||
self.admin_up(sta_list=station_list)
|
||||
print("stations up")
|
||||
radio, traffic_rate = (self.wave2_5g_radios, '8Mbps') if band == "5G" else (self.wave2_2g_radios, '4Mbps')
|
||||
per_radio_sta = int(num_stations / len(radio))
|
||||
rem = num_stations % len(radio)
|
||||
logging.info("Total stations per radio: " + str(per_radio_sta))
|
||||
num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta
|
||||
identifier = list(dut_data.keys())[0]
|
||||
for i in radio:
|
||||
station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] +
|
||||
" STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" +
|
||||
str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]]
|
||||
rem = 0
|
||||
self.temp_raw_lines.append(station_data)
|
||||
logging.debug("Raw Line : " + str(station_data))
|
||||
instance_name="wct_instance", traffic_direction="upload", traffic_rate="0Mbps"):
|
||||
try:
|
||||
def thread_fun(station_list):
|
||||
time.sleep(60)
|
||||
for i in station_list:
|
||||
self.local_realm.admin_down(i)
|
||||
logging.info("stations down")
|
||||
time.sleep(10)
|
||||
for i in station_list:
|
||||
self.local_realm.admin_up(i)
|
||||
logging.info("stations up")
|
||||
|
||||
self.chamber_view(raw_lines="custom")
|
||||
sta_list = []
|
||||
for rad in radio:
|
||||
self.set_radio_channel(radio=rad, antenna=4)
|
||||
for u in self.json_get("/port/?fields=port+type,alias")['interfaces']:
|
||||
if list(u.values())[0]['port type'] in ['WIFI-STA']:
|
||||
sta_list.append(list(u.keys())[0])
|
||||
# clean l3 traffics which won't get cleaned by deleting old scenario in CV
|
||||
self.client_disconnect(clean_l3_traffic=True)
|
||||
radio = self.wave2_5g_radios if band == "5G" else self.wave2_2g_radios
|
||||
upld_rate, downld_rate = "0Gbps", "0Gbps"
|
||||
if traffic_direction == "upload":
|
||||
upld_rate = traffic_rate
|
||||
elif traffic_direction == "download":
|
||||
downld_rate = traffic_rate
|
||||
per_radio_sta = int(num_stations / len(radio))
|
||||
rem = num_stations % len(radio)
|
||||
logging.info("Total stations per radio: " + str(per_radio_sta))
|
||||
num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta
|
||||
identifier = list(dut_data.keys())[0]
|
||||
allure.attach(name="Definition",
|
||||
body="Multiple association/disassociation stability test intends to measure stability of Wi-Fi device " \
|
||||
"under a dynamic environment with frequent change of connection status.")
|
||||
allure.attach(name="Procedure",
|
||||
body=f"This test case definition states that we Create 16 stations on {band} radio and" \
|
||||
" Run Wifi-capacity test for first 8 stations. 8 stations are picked for sending/receiving packets "
|
||||
"while the other 8 STAs are picked to do a dis-association/re-association process during the test" \
|
||||
f" Enable {traffic_direction} {traffic_rate} Mbps UDP flow from DUT to each of the 8 traffic stations" \
|
||||
"Disassociate the other 8 stations. Wait for 30 seconds, after that Re-associate the 8 stations.")
|
||||
|
||||
self.local_realm.admin_up(sta_list=sta_list)
|
||||
sel_stations = ",".join(sta_list[0:8])
|
||||
val = [['ul_rate_sel: Per-Station Upload Rate:']]
|
||||
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
|
||||
thr1.start()
|
||||
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate="0Gbps",
|
||||
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=traffic_rate,
|
||||
protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data,
|
||||
sort = "interleave",)
|
||||
for i in radio:
|
||||
station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] +
|
||||
" STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" +
|
||||
str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]]
|
||||
rem = 0
|
||||
self.temp_raw_lines.append(station_data)
|
||||
logging.debug("Raw Line : " + str(station_data))
|
||||
# update the dut ssid in CV
|
||||
self.update_dut_ssid(dut_data=dut_data)
|
||||
self.chamber_view(raw_lines="custom")
|
||||
sta_list = []
|
||||
for rad in radio:
|
||||
self.set_radio_channel(radio=rad, antenna=4)
|
||||
for u in self.json_get("/port/?fields=port+type,alias")['interfaces']:
|
||||
if list(u.values())[0]['port type'] in ['WIFI-STA']:
|
||||
sta_list.append(list(u.keys())[0])
|
||||
|
||||
report_name = wct_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
self.attach_report_graphs(report_name=report_name)
|
||||
csv_val = get_test_library.read_csv_individual_station_throughput(dir_name=report_name, option="upload")
|
||||
print(type(csv_val))
|
||||
print(csv_val)
|
||||
for i in sta_list:
|
||||
self.local_realm.admin_up(i)
|
||||
sel_stations = ",".join(sta_list[0:8])
|
||||
val = [['ul_rate_sel: Per-Station Upload Rate:']]
|
||||
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
|
||||
thr1.start()
|
||||
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate=downld_rate,
|
||||
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate,
|
||||
protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data,
|
||||
sort = "interleave",)
|
||||
|
||||
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
|
||||
self.attach_report_graphs(report_name=report_name)
|
||||
csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=traffic_direction)
|
||||
logging.info(csv_val)
|
||||
pass_value = int(traffic_rate[0]) * 0.99
|
||||
logging.info(csv_val)
|
||||
allure.attach(name="Pass Fail Criteria",
|
||||
body=f"UDP traffic rate is at least 99% of the configured rate for each station. Here configured " \
|
||||
f"traffic rate is {traffic_rate[0]} Mbps so traffic for each station should be {pass_value} Mbps ")
|
||||
if not csv_val:
|
||||
return False, "csv file does not exist"
|
||||
else:
|
||||
pass_fail = [1 if i >= pass_value else 0 for i in csv_val.values()]
|
||||
allure.attach.file(
|
||||
source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv",
|
||||
name="Throughput CSV file", attachment_type=allure.attachment_type.CSV)
|
||||
if pass_fail.count(0) == 0:
|
||||
return True, "Test passed"
|
||||
else:
|
||||
return False, "Test failed due to lesser value"
|
||||
except Exception as e:
|
||||
logging.error(f"{e}")
|
||||
return False, f"{e}"
|
||||
finally:
|
||||
try:
|
||||
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
|
||||
except Exception as e:
|
||||
logging.error(f"{e}")
|
||||
|
||||
def country_code_channel_division(self, ssid="[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE",
|
||||
band='twog', num_sta=1, vlan_id=100, channel='1', channel_width=20,
|
||||
|
||||
Reference in New Issue
Block a user