Merge branch 'WIFI-1321-create-a-lan-forge-pip-module' of https://github.com/Telecominfraproject/wlan-lanforge-scripts into WIFI-1321-create-a-lan-forge-pip-module

This commit is contained in:
jitendracandela
2022-11-26 00:46:00 +05:30
2 changed files with 169 additions and 49 deletions

View File

@@ -938,7 +938,7 @@ class lf_libs:
except Exception as e: except Exception as e:
logging.error(e) logging.error(e)
def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None): def set_radio_channel(self, radio="1.1.wiphy0", channel="AUTO", country=None, antenna=None):
# country_code = US(840) # country_code = US(840)
try: try:
radio = radio.split(".") radio = radio.split(".")
@@ -956,6 +956,8 @@ class lf_libs:
try: try:
if country: # update the dictionary if country: # update the dictionary
data["country"] = country data["country"] = country
if antenna:
data["antenna"] = antenna
except Exception as e: except Exception as e:
logging.error(f"{e}\nunable to change lanforge radio country code") logging.error(f"{e}\nunable to change lanforge radio country code")
local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data) local_realm_obj.json_post("/cli-json/set_wifi_radio", _data=data)
@@ -1282,6 +1284,46 @@ class lf_libs:
name=i, name=i,
attachment_type="image/png", extension=None) attachment_type="image/png", extension=None)
def read_csv_individual_station_throughput(self, dir_name, option, individual_station_throughput=True, kpi_csv=False,
file_name="/csv-data/data-Combined_bps__60_second_running_average-1.csv",
batch_size="0"):
try:
df = pd.read_csv("../reports/" + str(dir_name) + file_name,
sep=r'\t', engine='python')
logging.info("csv file opened")
except FileNotFoundError:
logging.info("csv file does not exist")
return False
if kpi_csv:
count = 0
dict_data = {"Down": {}, "Up": {}, "Both": {}}
csv_short_dis = df.loc[:,"short-description"]
csv_num_score = df.loc[:,"numeric-score"]
for i in range(len(batch_size.split(","))):
dict_data["Down"][csv_short_dis[count + 0]] = csv_num_score[count + 0]
dict_data["Up"][csv_short_dis[count + 1]] = csv_num_score[count + 1]
dict_data["Both"][csv_short_dis[count + 2]] = csv_num_score[count + 2]
count += 3
if individual_station_throughput:
dict_data = {}
if option == "download":
csv_sta_names = df.iloc[[0]].values.tolist()
csv_throughput_values = df.iloc[[1]].values.tolist()
elif option == "upload":
csv_sta_names = df.iloc[[0]].values.tolist()
csv_throughput_values = df.iloc[[2]].values.tolist()
else:
print("Provide proper option: download or upload")
return
sta_list = csv_sta_names[0][0][:-1].replace('"', '').split(",")
th_list = list(map(float, csv_throughput_values[0][0].split(",")))
for i in range(len(sta_list)):
dict_data[sta_list[i]] = th_list[i]
return dict_data
def attach_report_kpi(self, report_name=None, file_name="kpi_file"): def attach_report_kpi(self, report_name=None, file_name="kpi_file"):
if report_name[-1] == "/": if report_name[-1] == "/":
path = "../reports/" + str(report_name) + "kpi.csv" path = "../reports/" + str(report_name) + "kpi.csv"

View File

@@ -574,6 +574,7 @@ class lf_tests(lf_libs):
vlan_ids = list(mpsk_data.keys()) vlan_ids = list(mpsk_data.keys())
if "default" in vlan_ids: if "default" in vlan_ids:
vlan_ids.remove("default") vlan_ids.remove("default")
data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=encryption, data = self.setup_interfaces(ssid=ssid, passkey=passkey, encryption=encryption,
band=band, vlan_id=vlan_ids, mode="VLAN", num_sta=num_sta, dut_data_=dut_data) band=band, vlan_id=vlan_ids, mode="VLAN", num_sta=num_sta, dut_data_=dut_data)
if data == {}: if data == {}:
@@ -581,8 +582,6 @@ class lf_tests(lf_libs):
logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2)) logging.info("Setup interface data:\n" + json.dumps(str(data), indent=2))
allure.attach(name="Interface Info: \n", body=json.dumps(str(data), indent=2),
attachment_type=allure.attachment_type.JSON)
# query and fetch vlan Ip Address # query and fetch vlan Ip Address
port_data = self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces'] port_data = self.json_get(_req_url="/port?fields=alias,port+type,ip,mac")['interfaces']
@@ -766,7 +765,7 @@ class lf_tests(lf_libs):
[sta_data[i][item]['alias'], str(i), f'{exp1[0]}.{exp1[1]}.X.X', sta_data[i][item]['ip'], [sta_data[i][item]['alias'], str(i), f'{exp1[0]}.{exp1[1]}.X.X', sta_data[i][item]['ip'],
sta_data[i][item]['mac'], sta_data[i][item]['mac'],
f'{pf}']) f'{pf}'])
elif str(i) == "WAN Upstream": elif str(i) == "WAN Upstream" and mode == "BRIDGE":
for item in sta_data[i]: for item in sta_data[i]:
exp2 = sta_data[i][item]['ip'].split('.') exp2 = sta_data[i][item]['ip'].split('.')
ip2 = vlan_data[str(i)]['subnet'].split('.') ip2 = vlan_data[str(i)]['subnet'].split('.')
@@ -780,10 +779,21 @@ class lf_tests(lf_libs):
[sta_data[i][item]['alias'], str(i), vlan_data[str(i)]['subnet'], [sta_data[i][item]['alias'], str(i), vlan_data[str(i)]['subnet'],
sta_data[i][item]['ip'], sta_data[i][item]['mac'], sta_data[i][item]['ip'], sta_data[i][item]['mac'],
f'{pf}']) f'{pf}'])
elif str(i) == "WAN Upstream" and mode == "NAT-WAN":
for item in sta_data[i]:
exp3 = sta_data[i][item]['ip'].split('.')
if exp3[0] == '192' and exp3[1] == '168':
pf = 'PASS'
logging.info(f"PASS: Station got IP from WAN Upstream")
else:
pf = 'FAIL'
logging.info(f"FAIL: Station did not got IP from WAN Upstream")
table_data.append(
[sta_data[i][item]['alias'], 'WAN upstream', f'192.168.X.X', sta_data[i][item]['ip'],
sta_data[i][item]['mac'], f'{pf}'])
elif str(i) == "LAN Upstream": elif str(i) == "LAN Upstream":
for item in sta_data[i]: for item in sta_data[i]:
exp3 = sta_data[i][item]['ip'].split('.') exp3 = sta_data[i][item]['ip'].split('.')
ip3 = vlan_data[str(i)]['ip'].split('.')
if exp3[0] == '192' and exp3[1] == '168': if exp3[0] == '192' and exp3[1] == '168':
pf = 'PASS' pf = 'PASS'
logging.info(f"PASS: Station got IP from LAN Upstream") logging.info(f"PASS: Station got IP from LAN Upstream")
@@ -949,6 +959,29 @@ class lf_tests(lf_libs):
logging.error("5 Ghz channel didn't changed after radar detected") logging.error("5 Ghz channel didn't changed after radar detected")
pytest.fail("5 Ghz channel didn't changed after radar detected") pytest.fail("5 Ghz channel didn't changed after radar detected")
def update_dut_ssid(self, dut_data={}):
r_val = dict()
for dut in self.dut_data:
r_val[dut["identifier"]] = None
# updating ssids on all APS
for dut in self.dut_data:
ssid_data = []
identifier = dut["identifier"]
if r_val.keys().__contains__(identifier):
for idx_ in dut_data[identifier]["ssid_data"]:
if str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() == "OPEN":
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"]
+
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
else:
ssid_data.append(
['ssid_idx=' + str(idx_) + ' ssid=' + dut_data[identifier]["ssid_data"][idx_]["ssid"] +
' security=' + str(dut_data[identifier]["ssid_data"][idx_]["encryption"]).upper() +
' password=' + dut_data[identifier]["ssid_data"][idx_]["password"] +
' bssid=' + str(dut_data[identifier]["ssid_data"][idx_]["bssid"]).upper()])
self.update_duts(identifier=identifier, ssid_data=ssid_data)
def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None): def add_stations(self, band="2G", num_stations=9, ssid_name="", dut_data={}, identifier=None):
dut_name = [] dut_name = []
# for index in range(0, len(self.dut_data)): # for index in range(0, len(self.dut_data)):
@@ -957,10 +990,10 @@ class lf_tests(lf_libs):
if num_stations == 0: if num_stations == 0:
logging.warning("0 Stations") logging.warning("0 Stations")
return return
idx = None
r_val = dict() r_val = dict()
for dut in self.dut_data: for dut in self.dut_data:
r_val[dut["identifier"]] = None r_val[dut["identifier"]] = None
idx = None
# updating ssids on all APS # updating ssids on all APS
for dut in self.dut_data: for dut in self.dut_data:
ssid_data = [] ssid_data = []
@@ -1323,52 +1356,97 @@ class lf_tests(lf_libs):
return dataplane_obj_list return dataplane_obj_list
def multi_asso_disasso(self, band="2G", num_stations=16, dut_data={}, idx=0, mode="BRIDGE", vlan=1, def multi_asso_disasso(self, band="2G", num_stations=16, dut_data={}, idx=0, mode="BRIDGE", vlan=1,
instance_name="wct_instance"): instance_name="wct_instance", traffic_direction="upload", traffic_rate="0Mbps"):
def thread_fun(station_list): try:
print(station_list) def thread_fun(station_list):
time.sleep(60) time.sleep(60)
self.local_realm.admin_down(sta_list=station_list) for i in station_list:
print("stations down") self.local_realm.admin_down(i)
time.sleep(10) logging.info("stations down")
self.admin_up(sta_list=station_list) time.sleep(10)
print("stations up") for i in station_list:
radio, traffic_rate = (self.wave2_5g_radios, '8Mbps') if band == "5G" else (self.wave2_2g_radios, '4Mbps') self.local_realm.admin_up(i)
per_radio_sta = int(num_stations / len(radio)) logging.info("stations up")
rem = num_stations % len(radio)
logging.info("Total stations per radio: " + str(per_radio_sta))
num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta
identifier = list(dut_data.keys())[0]
for i in radio:
station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] +
" STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" +
str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]]
rem = 0
self.temp_raw_lines.append(station_data)
logging.debug("Raw Line : " + str(station_data))
self.chamber_view(raw_lines="custom") # clean l3 traffics which won't get cleaned by deleting old scenario in CV
sta_list = [] self.client_disconnect(clean_l3_traffic=True)
for rad in radio: radio = self.wave2_5g_radios if band == "5G" else self.wave2_2g_radios
self.set_radio_channel(radio=rad, antenna=4) upld_rate, downld_rate = "0Gbps", "0Gbps"
for u in self.json_get("/port/?fields=port+type,alias")['interfaces']: if traffic_direction == "upload":
if list(u.values())[0]['port type'] in ['WIFI-STA']: upld_rate = traffic_rate
sta_list.append(list(u.keys())[0]) elif traffic_direction == "download":
downld_rate = traffic_rate
per_radio_sta = int(num_stations / len(radio))
rem = num_stations % len(radio)
logging.info("Total stations per radio: " + str(per_radio_sta))
num_stations = lambda rem: per_radio_sta+1 if rem else per_radio_sta
identifier = list(dut_data.keys())[0]
allure.attach(name="Definition",
body="Multiple association/disassociation stability test intends to measure stability of Wi-Fi device " \
"under a dynamic environment with frequent change of connection status.")
allure.attach(name="Procedure",
body=f"This test case definition states that we Create 16 stations on {band} radio and" \
" Run Wifi-capacity test for first 8 stations. 8 stations are picked for sending/receiving packets "
"while the other 8 STAs are picked to do a dis-association/re-association process during the test" \
f" Enable {traffic_direction} {traffic_rate} Mbps UDP flow from DUT to each of the 8 traffic stations" \
"Disassociate the other 8 stations. Wait for 30 seconds, after that Re-associate the 8 stations.")
self.local_realm.admin_up(sta_list=sta_list) for i in radio:
sel_stations = ",".join(sta_list[0:8]) station_data = ["profile_link " + i.split(".")[0] + "." + i.split(".")[1] +
val = [['ul_rate_sel: Per-Station Upload Rate:']] " STA-AUTO " + str(num_stations(rem)) + " 'DUT: " + identifier + " Radio-" +
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],)) str(int(idx) + 1) + "'" + " NA " + i.split(".")[2]]
thr1.start() rem = 0
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate="0Gbps", self.temp_raw_lines.append(station_data)
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=traffic_rate, logging.debug("Raw Line : " + str(station_data))
protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data, # update the dut ssid in CV
sort = "interleave",) self.update_dut_ssid(dut_data=dut_data)
self.chamber_view(raw_lines="custom")
sta_list = []
for rad in radio:
self.set_radio_channel(radio=rad, antenna=4)
for u in self.json_get("/port/?fields=port+type,alias")['interfaces']:
if list(u.values())[0]['port type'] in ['WIFI-STA']:
sta_list.append(list(u.keys())[0])
report_name = wct_obj.report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] for i in sta_list:
self.attach_report_graphs(report_name=report_name) self.local_realm.admin_up(i)
csv_val = get_test_library.read_csv_individual_station_throughput(dir_name=report_name, option="upload") sel_stations = ",".join(sta_list[0:8])
print(type(csv_val)) val = [['ul_rate_sel: Per-Station Upload Rate:']]
print(csv_val) thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
thr1.start()
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, download_rate=downld_rate,
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate,
protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data,
sort = "interleave",)
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
self.attach_report_graphs(report_name=report_name)
csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=traffic_direction)
logging.info(csv_val)
pass_value = int(traffic_rate[0]) * 0.99
logging.info(csv_val)
allure.attach(name="Pass Fail Criteria",
body=f"UDP traffic rate is at least 99% of the configured rate for each station. Here configured " \
f"traffic rate is {traffic_rate[0]} Mbps so traffic for each station should be {pass_value} Mbps ")
if not csv_val:
return False, "csv file does not exist"
else:
pass_fail = [1 if i >= pass_value else 0 for i in csv_val.values()]
allure.attach.file(
source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv",
name="Throughput CSV file", attachment_type=allure.attachment_type.CSV)
if pass_fail.count(0) == 0:
return True, "Test passed"
else:
return False, "Test failed due to lesser value"
except Exception as e:
logging.error(f"{e}")
return False, f"{e}"
finally:
try:
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
except Exception as e:
logging.error(f"{e}")
def country_code_channel_division(self, ssid="[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE", def country_code_channel_division(self, ssid="[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE",
band='twog', num_sta=1, vlan_id=100, channel='1', channel_width=20, band='twog', num_sta=1, vlan_id=100, channel='1', channel_width=20,