From 83db4199264af71ac8960280b85ba9c59b40fa3f Mon Sep 17 00:00:00 2001 From: karthikaeyetea Date: Mon, 12 Dec 2022 22:15:42 +0530 Subject: [PATCH] Added air_time_fairness function Signed-off-by: karthikaeyetea --- lf_libs/lf_tests.py | 122 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index faffcdd0..01adf5c7 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -1353,14 +1353,14 @@ class lf_tests(lf_libs): thr1 = threading.Thread(target=thread_fun, args=(sta_list[8:16],)) thr1.start() wct_obj = self.wifi_capacity(instance_name=instance_name, mode=mode, vlan_id=vlan, - download_rate=downld_rate, + download_rate=downld_rate,add_stations=False, stations=sel_stations, raw_lines=val, batch_size="8", upload_rate=upld_rate, protocol="UDP-IPv4", duration="120000", create_stations=False, dut_data=dut_data, sort="interleave", ) report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + "/" - self.attach_report_graphs(report_name=report_name) + # self.attach_report_graphs(report_name=report_name) csv_val = self.read_csv_individual_station_throughput(dir_name=report_name, option=traffic_direction) logging.info(csv_val) pass_value = int(traffic_rate[0]) * 0.99 @@ -1372,9 +1372,13 @@ class lf_tests(lf_libs): return False, "csv file does not exist" else: pass_fail = [1 if i >= pass_value else 0 for i in csv_val.values()] - allure.attach.file( - source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv", + try: + allure.attach.file(source="../reports/" + report_name + "/csv-data/data-Combined_bps__60_second_running_average-1.csv", name="Throughput CSV file", attachment_type=allure.attachment_type.CSV) + except FileNotFoundError as e: + allure.attach.file( + source="../reports/" + report_name + "/csv-data/data-Combined_Mbps__60_second_running_average-1.csv", + name="Throughput CSV file", attachment_type=allure.attachment_type.CSV) if pass_fail.count(0) == 0: return True, "Test passed" else: @@ -1553,6 +1557,116 @@ class lf_tests(lf_libs): logging.error(f"{e}") return False, f"{e}" + def air_time_fairness(self, ssid="[BLANK]", passkey='[BLANK]', security="wpa2", mode="BRIDGE", band='twog', + vlan_id=100, atn=100, pass_value=None, dut_data={}): + try: + allure.attach(name="Definition", + body="Airtime Fairness test intends to verify the capacity of Wi-Fi device to ensure the fairness of " \ + "airtime usage.") + allure.attach(name="Procedure", + body="This test case definition states that Create 2 stations of greenfeild mode and 1 station of legacy mode" + " on 2.4/5Ghz radio. Run TCP download for station_1 as throughpt_1, station_2 as throughpt_2, " + "station_2 with attenuation as throughpt_3, station_3 as throughpt_4, UDP download for station_1 + station_2" + "of data_rates 40% of throughput_1 and 40% of throughput_2 as throughput_5, station_1 + station_2 with attenuation" + "of data_rates 40% of throughput_1 and 40% of throughput_3 as throughput_6, station_1 + station_3" + "of data_rates 40% of throughput_1 and 40% of throughput_4 as throughput_7") + self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True) + sta = list(map(lambda i: f"sta000{i}",range(3))) + radios, sta_mode = (self.wave2_5g_radios, [1,9]) if band == "fiveg" else (self.wave2_2g_radios, [2,11]) + thrpt = {"sta0_tcp_dl": None, "sta1_tcp_dl": None, "sta1_tcp_dl_atn": None, "sta2_tcp_dl": None, + "sta0+1_udp": None, "sta0+1_udp_atn": None, "sta0+2": None} + no_of_iter = list(thrpt.keys()) + + atten_serial = self.attenuator_serial_radio(ssid=ssid, passkey=passkey, security=security, radio=radios[1], + station_name=[sta[0]]) + atten_serial_split = atten_serial[0].split(".") + self.attenuator_modify("all", 'all', 100) + for i in range(len(radios)): + if i == 2: + # mode = 2/1 will create legacy client + create_sta = self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security, + radio=radios[i], station_name=[sta[i]], sta_mode=sta_mode[0]) + else: + # mode = 11/9 will create bgn-AC/an-AC client + create_sta = self.client_connect_using_radio(ssid=ssid, passkey=passkey, security=security, + radio=radios[i], station_name=[sta[i]], sta_mode=sta_mode[1]) + if create_sta == False: + logging.info(f"Test failed due to no IP for {sta[i]}") + assert False, f"Test failed due to no IP for {sta[i]}" + else: + lf_sta = list(create_sta.station_map().keys()) + + def wifi_cap(sta=None, down=None, up=0, proto=None, thrpt_key=None, wifi_cap=False, atn=None, l3_trf=False): + if atn: + for i in range(2): + self.attenuator_modify(int(atten_serial_split[2]), i, int(atn)) + time.sleep(0.5) + if wifi_cap: + wct_obj = self.wifi_capacity(mode=mode, add_stations=False, vlan_id=vlan_id, download_rate=down, + batch_size="1", stations=f"{sta}", create_stations=False, + upload_rate=up, protocol=proto, duration="60000", sort="linear", + dut_data=dut_data) + report_name = wct_obj[0].report_name[0]['LAST']["response"].split(":::")[1].split("/")[-1] + # self.attach_report_graphs(report_name=report_name) + entries = os.listdir("../reports/" + report_name + '/') + if "kpi.csv" in entries: + thrpt[thrpt_key] = self.read_kpi_file(column_name=["numeric-score"], dir_name=report_name)[0][0] + if l3_trf: + self.client_disconnect(clean_l3_traffic=True) + self.create_layer3(sta_list=sta[0:1], traffic_type=proto, side_a_min_rate=0, + side_b_min_rate=int(down[0]), start_cx=False) + self.create_layer3(sta_list=sta[1:2], traffic_type=proto, side_a_min_rate=0, + side_b_min_rate=int(down[1]), start_cx=False) + created_cx = {} + cx_list = [created_cx.setdefault(i, "Endpoints") for i in self.get_cx_list() if i not in created_cx] + self.start_cx_list(created_cx=created_cx, check_run_status=True) + thrpt[thrpt_key] = self.monitor(duration_sec=int(60) + 10, monitor_interval=1, created_cx=created_cx.keys(), + col_names=['bps rx a', 'bps rx b'], iterations=0, side_a_min_rate=0, + side_b_min_rate=down)[0] + + # station_0 TCP down throughtput + wifi_cap(down="1Gbps", sta=f"{lf_sta[0]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[0]}", + wifi_cap=True) + # station_1 TCP down throughtput + wifi_cap(down="1Gbps", sta=f"{lf_sta[1]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[1]}", + wifi_cap=True) + # station_1 with medium distance TCP down throughtput + wifi_cap(down="1Gbps", sta=f"{lf_sta[1]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[2]}", + wifi_cap=True, atn=atn) + # station_2 TCP down throughtput + wifi_cap(down="1Gbps", sta=f"{lf_sta[2]}", up="0Gbps", proto="TCP-IPv4", thrpt_key=f"{no_of_iter[3]}", + wifi_cap=True, atn=100) + # UDP traffic for station_0 of data-rate 40% of sta0_data_rate and station_1 of data-rate 40% of sta1_data_rate + wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl"] * 0.01) * 4E7], sta=sta[0:2], + up="0Gbps", thrpt_key=f"{no_of_iter[4]}", l3_trf=True, proto="lf_udp") + # UDP traffic for station_0 of data-rate 40% of sta0_data_rate and medium distance station_1 of data-rate 40% of sta1_data_rate + wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta1_tcp_dl_atn"] * 0.01) * 4E7], sta=sta[0:2], + up="0Gbps", thrpt_key=f"{no_of_iter[5]}", l3_trf=True, atn=atn, proto="lf_udp") + # UDP traffic for station_0 of data-rate 40% of sta0_data_rate and station_2 of data-rate 40% of sta2_data_rate + wifi_cap(down=[(thrpt["sta0_tcp_dl"] * 0.01) * 4E7, (thrpt["sta2_tcp_dl"] * 0.01) * 4E7], sta=sta[0:3:2], + up="0Gbps", thrpt_key=f"{no_of_iter[6]}", l3_trf=True, atn=100, proto="lf_udp") + logging.info("Throughput values: \n", thrpt) + self.allure_report_table_format(dict_data=thrpt, key="Station combination", value="Throughput values", + name="Test_results") + if pass_value: + if sum(thrpt["sta0+1_udp"]) >= pass_value[0] and sum(thrpt["sta0+1_udp_atn"]) >= pass_value[1] and \ + sum(thrpt["sta0+2"]) >= pass_value[2]: + return True, "Test Passed" + else: + return False, "Failed due to Lesser value" + else: + return True, "Test Passed without pass-fail verification" + except Exception as e: + logging.error(f"{e}") + return False, f"{e}" + finally: + try: + self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True) + except Exception as e: + logging.error(f"{e}") + return False, f"{e}" + + if __name__ == '__main__': basic = { "target": "tip_2x",