Added air_time_fairness function

Signed-off-by: karthikaeyetea <karthika.subramani@candelatech.com>
This commit is contained in:
karthikaeyetea
2022-12-12 22:15:42 +05:30
parent d9178a330f
commit 83db419926

View File

@@ -1353,14 +1353,14 @@ class lf_tests(lf_libs):
thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],))
thr1.start()
wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan,
download_rate=downld_rate,
download_rate=downld_rate,add_stations=False,
stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate,
protocol="UDP-IPv4", duration="120000", create_stations=False,
dut_data=dut_data,
sort="interleave", )
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/"
self.attach_report_graphs(report_name=report_name)
# self.attach_report_graphs(report_name=report_name)
csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=traffic_direction)
logging.info(csv_val)
pass_value = int(traffic_rate[0]) * 0.99
@@ -1372,9 +1372,13 @@ class lf_tests(lf_libs):
return False, "csv file does not exist"
else:
pass_fail = [1 if i >= pass_value else 0 for i in csv_val.values()]
allure.attach.file(
source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv",
try:
allure.attach.file(source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv",
name="Throughput CSV file", attachment_type=allure.attachment_type.CSV)
except FileNotFoundError as e:
allure.attach.file(
source="../reports/" + report_name + "/csv-data/data-Combined_Mbps__60_second_running_average-1.csv",
name="Throughput CSV file", attachment_type=allure.attachment_type.CSV)
if pass_fail.count(0) == 0:
return True, "Test passed"
else:
@@ -1553,6 +1557,116 @@ class lf_tests(lf_libs):
logging.error(f"{e}")
return False, f"{e}"
def air_time_fairness(self, ssid="[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE", band='twog',
vlan_id=100, atn=100, pass_value=None, dut_data={}):
try:
allure.attach(name="Definition",
body="Airtime Fairness test intends to verify the capacity of Wi-Fi device to ensure the fairness of " \
"airtime usage.")
allure.attach(name="Procedure",
body="This test case definition states that Create 2 stations of greenfeild mode and 1 station of legacy mode"
" on 2.4/5Ghz radio. Run TCP download for station_1 as throughpt_1, station_2 as throughpt_2, "
"station_2 with attenuation as throughpt_3, station_3 as throughpt_4, UDP download for station_1 + station_2"
"of data_rates 40% of throughput_1 and 40% of throughput_2 as throughput_5, station_1 + station_2 with attenuation"
"of data_rates 40% of throughput_1 and 40% of throughput_3 as throughput_6, station_1 + station_3"
"of data_rates 40% of throughput_1 and 40% of throughput_4 as throughput_7")
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
sta = list(map(lambda i: f"sta000{i}",range(3)))
radios, sta_mode = (self.wave2_5g_radios, [1,9]) if band == "fiveg" else (self.wave2_2g_radios, [2,11])
thrpt = {"sta0_tcp_dl": None, "sta1_tcp_dl": None, "sta1_tcp_dl_atn": None, "sta2_tcp_dl": None,
"sta0+1_udp": None, "sta0+1_udp_atn": None, "sta0+2": None}
no_of_iter = list(thrpt.keys())
atten_serial = self.attenuator_serial_radio(ssid=ssid, passkey=passkey, security=security, radio=radios[1],
station_name=[sta[0]])
atten_serial_split = atten_serial[0].split(".")
self.attenuator_modify("all", 'all', 100)
for i in range(len(radios)):
if i == 2:
# mode = 2/1 will create legacy client
create_sta = self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security,
radio=radios[i], station_name=[sta[i]], sta_mode=sta_mode[0])
else:
# mode = 11/9 will create bgn-AC/an-AC client
create_sta = self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security,
radio=radios[i], station_name=[sta[i]], sta_mode=sta_mode[1])
if create_sta == False:
logging.info(f"Test failed due to no IP for {sta[i]}")
assert False, f"Test failed due to no IP for {sta[i]}"
else:
lf_sta = list(create_sta.station_map().keys())
def wifi_cap(sta=None, down=None, up=0, proto=None, thrpt_key=None, wifi_cap=False, atn=None, l3_trf=False):
if atn:
for i in range(2):
self.attenuator_modify(int(atten_serial_split[2]), i, int(atn))
time.sleep(0.5)
if wifi_cap:
wct_obj = self.wifi_capacity(mode=mode, add_stations=False, vlan_id=vlan_id, download_rate=down,
batch_size="1", stations=f"{sta}", create_stations=False,
upload_rate=up, protocol=proto, duration="60000", sort="linear",
dut_data=dut_data)
report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1]
# self.attach_report_graphs(report_name=report_name)
entries = os.listdir("../reports/" + report_name + '/')
if "kpi.csv" in entries:
thrpt[thrpt_key] = self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)[0][0]
if l3_trf:
self.client_disconnect(clean_l3_traffic=True)
self.create_layer3(sta_list=sta[0:1], traffic_type=proto, side_a_min_rate=0,
side_b_min_rate=int(down[0]), start_cx=False)
self.create_layer3(sta_list=sta[1:2], traffic_type=proto, side_a_min_rate=0,
side_b_min_rate=int(down[1]), start_cx=False)
created_cx = {}
cx_list = [created_cx.setdefault(i, "Endpoints") for i in self.get_cx_list() if i not in created_cx]
self.start_cx_list(created_cx=created_cx, check_run_status=True)
thrpt[thrpt_key] = self.monitor(duration_sec=int(60) + 10, monitor_interval=1, created_cx=created_cx.keys(),
col_names=['bps rx a', 'bps rx b'], iterations=0, side_a_min_rate=0,
side_b_min_rate=down)[0]
# station_0 TCP down throughtput
wifi_cap(down="1Gbps", sta=f"{lf_sta[0]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[0]}",
wifi_cap=True)
# station_1 TCP down throughtput
wifi_cap(down="1Gbps", sta=f"{lf_sta[1]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[1]}",
wifi_cap=True)
# station_1 with medium distance TCP down throughtput
wifi_cap(down="1Gbps", sta=f"{lf_sta[1]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[2]}",
wifi_cap=True, atn=atn)
# station_2 TCP down throughtput
wifi_cap(down="1Gbps", sta=f"{lf_sta[2]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[3]}",
wifi_cap=True, atn=100)
# UDP traffic for station_0 of data-rate 40% of sta0_data_rate and station_1 of data-rate 40% of sta1_data_rate
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl"] * 0.01) * 4E7], sta=sta[0:2],
up="0Gbps", thrpt_key=f"{no_of_iter[4]}", l3_trf=True, proto="lf_udp")
# UDP traffic for station_0 of data-rate 40% of sta0_data_rate and medium distance station_1 of data-rate 40% of sta1_data_rate
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl_atn"] * 0.01) * 4E7], sta=sta[0:2],
up="0Gbps", thrpt_key=f"{no_of_iter[5]}", l3_trf=True, atn=atn, proto="lf_udp")
# UDP traffic for station_0 of data-rate 40% of sta0_data_rate and station_2 of data-rate 40% of sta2_data_rate
wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta2_tcp_dl"] * 0.01) * 4E7], sta=sta[0:3:2],
up="0Gbps", thrpt_key=f"{no_of_iter[6]}", l3_trf=True, atn=100, proto="lf_udp")
logging.info("Throughput values: \n", thrpt)
self.allure_report_table_format(dict_data=thrpt, key="Station combination", value="Throughput values",
name="Test_results")
if pass_value:
if sum(thrpt["sta0+1_udp"]) >= pass_value[0] and sum(thrpt["sta0+1_udp_atn"]) >= pass_value[1] and \
sum(thrpt["sta0+2"]) >= pass_value[2]:
return True, "Test Passed"
else:
return False, "Failed due to Lesser value"
else:
return True, "Test Passed without pass-fail verification"
except Exception as e:
logging.error(f"{e}")
return False, f"{e}"
finally:
try:
self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True)
except Exception as e:
logging.error(f"{e}")
return False, f"{e}"
if __name__ == '__main__':
basic = {
"target": "tip_2x",