mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
Added multi-station performance, spacial consistency, rate vs range functionalities
Signed-off-by: tarun-candela <tarunkumar.madabathula@candelatech.com>
This commit is contained in:
@@ -1865,6 +1865,289 @@ class lf_tests(lf_libs):
|
||||
pass
|
||||
return self.dualbandptest_obj
|
||||
|
||||
def multi_station_performance(self, ssid_name=None, security_key=None, mode="BRIDGE", vlan=1, band="twog", antenna=1,
|
||||
instance_name="", set_att_db="10db",download_rate="0Gbps",upload_rate="1Gbps",
|
||||
batch_size="",protocol="UDP-IPv4",duration="120000",expected_throughput=35,
|
||||
traffic_type="udp_upload",create_vlan=True):
|
||||
global station_name,radio_prefix, set_value, set_value1, type
|
||||
self.chamber_view()
|
||||
self.client_disconnect(clean_l3_traffic=True)
|
||||
batch_size = batch_size
|
||||
if band == "twog":
|
||||
station_name = self.twog_prefix
|
||||
radio_prefix = self.wave2_2g_radios
|
||||
elif band == "fiveg":
|
||||
station_name = self.fiveg_prefix
|
||||
radio_prefix = self.wave2_5g_radios
|
||||
print("station_name:", station_name)
|
||||
print("radio:", radio_prefix)
|
||||
|
||||
# setting attenuator serial
|
||||
atten_sr = self.attenuator_serial()
|
||||
print(atten_sr)
|
||||
atten_sr1 = atten_sr[1].split(".")
|
||||
atten_sr2 = atten_sr[0].split(".")
|
||||
print(atten_sr1, atten_sr2)
|
||||
|
||||
# creating stations
|
||||
if batch_size == "3":
|
||||
radio_name = radio_prefix[0]
|
||||
print("radio:", radio_name)
|
||||
values = radio_name.split(".")
|
||||
shelf = int(values[0])
|
||||
resource = int(values[1])
|
||||
print(shelf, resource)
|
||||
sta = []
|
||||
for i in range(3):
|
||||
sta.append(station_name + str(i))
|
||||
print(sta)
|
||||
data = {"shelf": shelf, "resource": resource, "radio": values[2], "antenna": antenna}
|
||||
self.json_post(_req_url="cli-json/set_wifi_radio", data=data)
|
||||
sta_ip = self.client_connect_using_radio(ssid=ssid_name, passkey=security_key,radio=radio_name, station_name=sta)
|
||||
if not sta_ip:
|
||||
print("test failed due to no station ip")
|
||||
return False, "Test Failed, due to no station ip"
|
||||
|
||||
elif batch_size == "3,6" or batch_size == "3,6,9":
|
||||
sta = []
|
||||
list_three_sta = []
|
||||
count = batch_size.split(',')
|
||||
n, j = 0, 0
|
||||
if len(count) == 2:
|
||||
n, j = 6, 2
|
||||
elif len(count) == 3:
|
||||
n, j = 9, 3
|
||||
print("number_of_stations:%s & iterations : %s" % (n, j))
|
||||
for i in range(n):
|
||||
list_three_sta.append(station_name + str(i))
|
||||
if (i != 0) and (((i + 1) % 3) == 0):
|
||||
sta.append(list_three_sta)
|
||||
list_three_sta = []
|
||||
print(sta)
|
||||
for i in range(j):
|
||||
radio_name = radio_prefix[i]
|
||||
print(radio_name)
|
||||
print(station_name)
|
||||
values = radio_name.split(".")
|
||||
shelf = int(values[0])
|
||||
resource = int(values[1])
|
||||
print(shelf, resource)
|
||||
data = {"shelf": shelf, "resource": resource, "radio": values[2], "antenna": antenna}
|
||||
self.json_post(_req_url="cli-json/set_wifi_radio", data=data)
|
||||
time.sleep(0.5)
|
||||
sta_ip = self.client_connect_using_radio(ssid=ssid_name,passkey=security_key,radio=radio_name,
|
||||
station_name=sta[i])
|
||||
if not sta_ip:
|
||||
print("test failed due to no station ip")
|
||||
return False, "Test Failed, due to no station ip"
|
||||
time.sleep(0.5)
|
||||
# attenuator setup for different db
|
||||
if set_att_db == "10db":
|
||||
for i in range(4):
|
||||
self.attenuator_modify(int(atten_sr1[2]), i, 100)
|
||||
time.sleep(0.5)
|
||||
elif set_att_db == "10db,38db" or "10db,25db":
|
||||
for i in range(4):
|
||||
self.attenuator_modify(int(atten_sr1[2]), i, 100)
|
||||
time.sleep(0.5)
|
||||
if "25db" in set_att_db:
|
||||
set_value = 250
|
||||
elif "38db" in set_att_db:
|
||||
set_value =380
|
||||
print(set_value)
|
||||
for i in range(2):
|
||||
self.attenuator_modify(int(atten_sr2[2]), i, set_value)
|
||||
time.sleep(0.5)
|
||||
elif set_att_db == "10db,38db,48db" or "10db,25db,35db":
|
||||
for i in range(4):
|
||||
self.attenuator_modify(int(atten_sr1[2]), i, 100)
|
||||
time.sleep(0.5)
|
||||
if "25db" and "35db" in set_att_db:
|
||||
set_value = 250
|
||||
set_value1 = 350
|
||||
elif "38db" and "48db" in set_att_db:
|
||||
set_value = 380
|
||||
set_value1 = 480
|
||||
print(set_value, set_value1)
|
||||
for i in range(4):
|
||||
self.attenuator_modify(int(atten_sr2[2]), i, set_value)
|
||||
time.sleep(0.5)
|
||||
if i >= 2:
|
||||
self.attenuator_modify(int(atten_sr2[2]), i, set_value1)
|
||||
time.sleep(0.5)
|
||||
# wifi_capacity test
|
||||
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan,
|
||||
download_rate=download_rate, batch_size=batch_size,
|
||||
upload_rate=upload_rate, protocol=protocol, duration=duration,
|
||||
sort="linear",create_vlan=create_vlan)
|
||||
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
|
||||
self.attach_report_graphs(report_name=report_name)
|
||||
csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=None,
|
||||
individual_station_throughput=False,
|
||||
kpi_csv=True,
|
||||
file_name="/kpi.csv", batch_size=batch_size)
|
||||
print(csv_val)
|
||||
# considering the 70% from the expected throughput
|
||||
pass_value = (expected_throughput * 0.7)
|
||||
print("pass value ", pass_value)
|
||||
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
|
||||
if not csv_val:
|
||||
print("csv file does not exist, Test failed")
|
||||
allure.attach(name="Csv Data", body="csv file does not exist, Test failed")
|
||||
return False, "CSV file does not exist, Test failed"
|
||||
else:
|
||||
if traffic_type == "udp_upload":
|
||||
type="Up"
|
||||
elif traffic_type == "udp_download":
|
||||
type="Down"
|
||||
print("Traffic type", type)
|
||||
if list(csv_val[type].values())[-1] >= pass_value:
|
||||
allure.attach(name="Csv Data", body="Throughput value : " + str(list(csv_val[type].values())[-1]))
|
||||
logging.info("Test passed successfully")
|
||||
return True, "TEST PASSED"
|
||||
else:
|
||||
allure.attach(name="Csv Data", body="Throughput value : " + str(list(csv_val[type].values())[-1]))
|
||||
logging.info("TEST FAILED, Actual throughput is lesser than Expected.")
|
||||
return False, "TEST FAILED, Actual throughput (%sMbps) is lesser than Expected (%sMbps)" % (str(list(csv_val[type].values())[-1]),str(pass_value))
|
||||
|
||||
def spacial_consistency(self, ssid_name=None, security_key=None, security="wpa2", mode="BRIDGE", band="twog",
|
||||
vlan=1, dut_data=None, num_sta=1, spatial_streams=1, instance_name="", pass_value=None,
|
||||
attenuations=None):
|
||||
logging.info("Cleanup existing clients and traffic")
|
||||
chamber_view_obj, dut_name = self.chamber_view()
|
||||
self.client_disconnect(clean_l3_traffic=True)
|
||||
# client connect
|
||||
station = self.client_connect(ssid=ssid_name, security=security, passkey=security_key, mode=mode,
|
||||
band=band, num_sta=num_sta, vlan_id=vlan, dut_data=dut_data)
|
||||
sta_name = list(station.keys())
|
||||
ser_no = self.attenuator_serial()
|
||||
print(ser_no)
|
||||
val = [['modes: Auto'], ['pkts: MTU'], ['directions: DUT Transmit'], ['traffic_types:UDP'],
|
||||
['bandw_options: AUTO'], ['spatial_streams: ' + str(spatial_streams)],
|
||||
['attenuator: ' + str(ser_no[0])],
|
||||
['attenuator2: ' + str(ser_no[1])],
|
||||
['attenuations: 100 380 480'], ['attenuations2: 100 380 480'], ['chamber: DUT-Chamber'],
|
||||
['tt_deg: 0..+60..300']]
|
||||
if station:
|
||||
# rvr test
|
||||
rvr_o, report_name = self.rate_vs_range_test(station_name=sta_name[0], mode=mode, download_rate="100%",
|
||||
instance_name=instance_name, duration="60000",
|
||||
vlan_id=vlan,
|
||||
dut_name=dut_name, raw_lines=val)
|
||||
entries = os.listdir("../reports/" + report_name + '/')
|
||||
print("entries", entries)
|
||||
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
|
||||
logging.info("Test Completed... Cleaning up Stations")
|
||||
kpi = "kpi.csv"
|
||||
pass_value = pass_value
|
||||
atn, deg = attenuations, [0, 60, 120, 180, 240, 300]
|
||||
if kpi in entries:
|
||||
kpi_val = self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)
|
||||
print("kpi_calue :", kpi_val)
|
||||
if str(kpi_val) == "empty":
|
||||
logging.info("TEST FAILED, Throughput value from kpi.csv is empty.")
|
||||
allure.attach(name="CSV Data", body="TEST FAILED, Throughput value from kpi.csv is empty.")
|
||||
return False, "TEST FAILED, Throughput value from kpi.csv is empty."
|
||||
else:
|
||||
allure.attach(name="CSV Data", body="Throughput value : " + str(kpi_val))
|
||||
start, thrpt_val, pass_fail = 0, {}, []
|
||||
for i in pass_value:
|
||||
count = 0
|
||||
for j in range(start, len(kpi_val), len(atn)):
|
||||
thrpt_val[f"{atn[start]}atn-{deg[count]}deg"] = kpi_val[j][0]
|
||||
if kpi_val[j][0] >= pass_value[i]:
|
||||
pass_fail.append("PASS")
|
||||
else:
|
||||
pass_fail.append("FAIL")
|
||||
count += 1
|
||||
# start += 6
|
||||
print(thrpt_val, "\n", pass_fail)
|
||||
if "FAIL" in pass_fail:
|
||||
logging.info("TEST FAILED, Actual throughput is lesser than Expected.")
|
||||
return False, "TEST FAILED, Actual throughput is lesser than Expected."
|
||||
else:
|
||||
logging.info("Test passed successfully")
|
||||
return True, "TEST PASSED"
|
||||
else:
|
||||
logging.info("csv file does not exist, TEST FAILED.")
|
||||
allure.attach(name="CSV Data", body="csv file does not exist")
|
||||
return False, "TEST FAILED, , CSV file does not exist"
|
||||
else:
|
||||
logging.info("TEST FAILED due to no station ip")
|
||||
return False, "TEST FAILED due to no station ip"
|
||||
|
||||
def rate_vs_range(self, ssid_name=None, security_key=None, security="wpa2", mode="BRIDGE", band="twog", vlan=1,
|
||||
dut_data=None, num_sta=1, spatial_streams=2, direction="DUT Transmit", instance_name="",
|
||||
pass_value=None, attenuations=None):
|
||||
logging.info("Cleanup existing clients and traffic")
|
||||
chamber_view_obj, dut_name = self.chamber_view()
|
||||
self.client_disconnect(clean_l3_traffic=True)
|
||||
# client connect
|
||||
station = self.client_connect(ssid=ssid_name, security=security, passkey=security_key, mode=mode, band=band,
|
||||
num_sta=num_sta, vlan_id=vlan, dut_data=dut_data)
|
||||
sta_name = list(station.keys())
|
||||
ser_no = self.attenuator_serial()
|
||||
print(ser_no)
|
||||
atn2 = ser_no[1].split(".")[2]
|
||||
print(f"antenuation-2 : {atn2}")
|
||||
val = [['modes: Auto'], ['pkts: MTU'], ['directions: ' + str(direction)], ['traffic_types:TCP'],
|
||||
['bandw_options: AUTO'], ['spatial_streams: 2'], ['attenuator: ' + str(ser_no[0])],
|
||||
['attenuator2: 0'], ['attenuations: 0 100 210..+30..630'], ['attenuations2: 0 100 210..+30..630'],
|
||||
['chamber: 0'], ['tt_deg: 0']]
|
||||
if station:
|
||||
# rvr test
|
||||
rvr_o, report_name = self.rate_vs_range_test(station_name=sta_name[0], mode=mode,
|
||||
download_rate="100%",
|
||||
duration='30000',
|
||||
instance_name=instance_name, vlan_id=vlan,
|
||||
dut_name=dut_name, raw_lines=val)
|
||||
entries = os.listdir("../reports/" + report_name + '/')
|
||||
print("entries", entries)
|
||||
print("Test Completed... Cleaning up Stations")
|
||||
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
|
||||
kpi = "kpi.csv"
|
||||
pass_value = pass_value
|
||||
atn = attenuations
|
||||
if kpi in entries:
|
||||
kpi_val = self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)
|
||||
print(kpi_val)
|
||||
if str(kpi_val) == "empty":
|
||||
logging.info("Throughput value from kpi.csv is empty, TEST FAILED, ")
|
||||
allure.attach(name="CSV Data", body="Throughput value from kpi.csv is empty, TEST FAILED, ")
|
||||
return False, "Throughput value from kpi.csv is empty, TEST FAILED, "
|
||||
else:
|
||||
allure.attach(name="CSV Data", body="Throughput value : " + str(kpi_val))
|
||||
start, thrpt_val, pass_fail = 0, {}, []
|
||||
for i in pass_value:
|
||||
# count = 0
|
||||
direction = "DUT-TX"
|
||||
for j in range(start, len(kpi_val), len(atn)):
|
||||
thrpt_val[f"{atn[start]}--{direction}"] = kpi_val[j][0]
|
||||
if kpi_val[j][0] >= pass_value[i]:
|
||||
pass_fail.append("PASS")
|
||||
else:
|
||||
pass_fail.append("FAIL")
|
||||
# count += 1
|
||||
direction = "DUT-RX"
|
||||
if len(atn) == 14:
|
||||
start += 6
|
||||
elif len(atn) == 17:
|
||||
start += 7
|
||||
print(pass_fail, "\nThroughput value-->", thrpt_val)
|
||||
if "FAIL" in pass_fail:
|
||||
logging.info("TEST FAILED, Actual throughput is lesser than Expected")
|
||||
return False, "TEST FAILED, Actual throughput is lesser than Expected"
|
||||
else:
|
||||
logging.info("TEST PASSED successfully")
|
||||
return True, "TEST PASSED"
|
||||
else:
|
||||
logging.info("csv file does not exist, TEST FAILED.")
|
||||
allure.attach(name="CSV Data", body="csv file does not exist")
|
||||
return False, "TEST FAILED, CSV file does not exist"
|
||||
else:
|
||||
logging.info("TEST FAILED, due to no station ip")
|
||||
return False, "TEST FAILED, due to no station ip"
|
||||
|
||||
if __name__ == '__main__':
|
||||
basic = {
|
||||
"target": "tip_2x",
|
||||
|
||||
Reference in New Issue
Block a user