diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index d1541943..3ac6e97d 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -913,7 +913,7 @@ class lf_libs: time.sleep(5) self.pcap_obj.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration) - def stop_sniffer(self): + def stop_sniffer(self, sta_list=[]): self.pcap_obj.monitor.admin_down() time.sleep(2) self.pcap_obj.cleanup() @@ -923,7 +923,7 @@ class lf_libs: report_location="/home/lanforge/" + self.pcap_name, report_dir=".") allure.attach.file(source=self.pcap_name, - name="pcap_file", attachment_type=allure.attachment_type.PCAP) + name="pcap_file " + ", ".join(sta_list), attachment_type=allure.attachment_type.PCAP) logging.info("pcap file name: " + str(self.pcap_name)) except Exception as e: logging.error(e) @@ -1695,7 +1695,8 @@ class lf_libs: def client_connect_using_radio(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band=None, vlan_id=[None], radio=None, client_type=0, station_name=[], dut_data=None, - sniff_radio=False, create_vlan=True): + sniff_radio=False, create_vlan=True, attach_port_info=True, + attach_station_data=True): # pre cleanup # if pre_cleanup: # self.pre_cleanup() @@ -1723,10 +1724,11 @@ class lf_libs: client_connect.upstream_port = upstream_port client_connect.upstream_resource = 1 client_connect.radio = radio - # allure attach for port info - port_data = self.json_get(_req_url="port?fields=ip") - port_info = {key: value for d in port_data["interfaces"] for key, value in d.items()} - self.allure_report_table_format(dict_data=port_info, key="Port Names", value="ip", name="Port info") + if attach_port_info is True: + # allure attach for port info + port_data = self.json_get(_req_url="port?fields=ip") + port_info = {key: value for d in port_data["interfaces"] for key, value in d.items()} + self.allure_report_table_format(dict_data=port_info, key="Port Names", value="ip", name="Port info") if sniff_radio: for dut_ in self.dut_data: identifier = dut_["identifier"] @@ -1745,15 +1747,16 @@ class lf_libs: client_connect.build() logging.info("napping 10 sec") time.sleep(10) - # station data - sta_length = len(station_name) - for i in range(sta_length): - sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i])) - self.allure_report_table_format(dict_data=sta_data1["interface"], key="STATION DETAILS", - value="VALUE", name="%s info" % (station_name[i])) + if attach_station_data is True: + # station data + sta_length = len(station_name) + for i in range(sta_length): + sta_data1 = self.json_get(_req_url="port/1/1/%s" % (station_name[i])) + self.allure_report_table_format(dict_data=sta_data1["interface"], key="STATION DETAILS", + value="VALUE", name="%s info" % (station_name[i])) # to stop sniffer if radio is not None and sniffer_channel is not None: - self.stop_sniffer() + self.stop_sniffer(station_name) else: logging.info("missing identifier.") else: diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index 06f710c1..c91fccbf 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -2527,126 +2527,173 @@ class lf_tests(lf_libs): logging.info("Test Failed, due to station has no ip") return False, "TEST FAILED, due to station has no ip" - def client_isolation(self, ssid1=None, ssid2=None, passkey=None, security=None, mode="BRIDGE", band_2g=None, - band_5g=None, dut_data=None, num_sta=None, side_a_min_rate=None, side_a_max_rate=None, - side_b_min_rate=None, - side_b_max_rate=None, sniff_radio=True): - band = radio_name = station_name = radio_names = station_name1 = station_name2 = station_result = layer3_result = None - if band_5g is None and band_2g is not None: + def client_isolation(self, ssid1=None, ssid2=None, passkey=None, security=None, mode="BRIDGE", band_2g=False, + band_5g=False, dut_data=None, num_sta=None, side_a_min_rate=None, side_a_max_rate=None, + side_b_min_rate=None, side_b_max_rate=None, sniff_radio=True): + copy_num_sta = num_sta + + # selecting radio(s) based on the requested bands of the client(s) + dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, "wave1_radios": self.wave1_radios, + "mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, "wave1_radios": self.wave1_radios, + "mtk_radios": self.mtk_radios, "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, + "ax200_radios": 1, "ax210_radios": 1} + radio_name_2g = [] + radio_name_5g = [] + if band_2g is True and band_5g is True: # a 2G and a 5G station + for type_of_radio in dict_all_radios_2g: + if len(dict_all_radios_2g[type_of_radio]) > 0: + radio_name_2g.append(dict_all_radios_2g[type_of_radio][0]) + max_station_per_radio[type_of_radio] -= 1 + break + for type_of_radio in dict_all_radios_5g: + if len(dict_all_radios_5g[type_of_radio]) > 0 and max_station_per_radio[type_of_radio] > 0: + radio_name_5g.append(dict_all_radios_5g[type_of_radio][0]) + break + + if len(radio_name_2g) == 0 or len(radio_name_5g) == 0: + logging.info("Looks like the langforge radios can't support creating a 2G and a 5G station, " + "simultaneously.") + pytest.skip("Looks like the langforge radios can't support creating a 2G and a 5G station, " + "simultaneously.") + + station_name_2g = "sta_2g" + station_name_5g = "sta_5g" + + band = ["twog", "fiveg"] + elif band_2g is True: # only 2g bands but num_sta can be 1 or 2 + if self.max_2g_stations < num_sta: + logging.info(f"Looks like the langforge radios can't support creating {num_sta} 2G stations.") + raise ValueError(f"Looks like the langforge radios can't support creating {num_sta} 2G stations.") band = "twog" - radio_name = self.wave2_2g_radios[0] - station_name = self.twog_prefix - elif band_2g is None and band_5g is not None: + enough_radios = False + for type_of_radio in dict_all_radios_2g: + if len(dict_all_radios_2g[type_of_radio]) > 0: + for i in range(len(dict_all_radios_2g[type_of_radio])): + radio_name_2g.append(dict_all_radios_2g[type_of_radio][i]) + if num_sta <= max_station_per_radio[type_of_radio]: + num_sta = 0 + enough_radios = True + break + else: + num_sta -= max_station_per_radio[type_of_radio] + if enough_radios: + break + station_name = "sta_2g" + elif band_5g is True: # only 5g bands but num_sta can be 1 or 2 + if self.max_5g_stations < num_sta: + logging.info(f"Looks like the langforge radios can't support creating {num_sta} 5G stations.") + raise ValueError(f"Looks like the langforge radios can't support creating {num_sta} 5G stations.") band = "fiveg" - radio_name = self.wave2_5g_radios[0] - station_name = self.fiveg_prefix - elif band_2g and band_5g is not None: - radio_name1 = self.wave2_2g_radios[0] - radio_name2 = self.wave2_5g_radios[0] - station_name1 = self.twog_prefix - station_name2 = self.fiveg_prefix - band = [band_2g, band_5g] - radio_names = [radio_name1, radio_name2] - else: - logging.info("band is not selected.") - allure.attach(name="Min Tx rate -A", body=f"{side_a_min_rate} bytes") - allure.attach(name="Min Tx rate -B", body=f"{side_b_min_rate} bytes") - # allure.attach(name="Max Tx rate -A", body=f"{side_a_max_rate} bytes") - # allure.attach(name="Max Tx rate -B", body=f"{side_b_max_rate} bytes") + enough_radios = False + for type_of_radio in dict_all_radios_5g: + if len(dict_all_radios_5g[type_of_radio]) > 0: + for i in range(len(dict_all_radios_5g[type_of_radio])): + radio_name_5g.append(dict_all_radios_5g[type_of_radio][i]) + if num_sta <= max_station_per_radio[type_of_radio]: + num_sta = 0 + enough_radios = True + break + else: + num_sta -= max_station_per_radio[type_of_radio] + if enough_radios: + break + station_name = "sta_5g" + + logging.info("Clearing any existing stations and Layer-3 traffics before starting the test...") + self.pre_cleanup() # clear any existing stations and traffic sta = [] - ssids = [ssid1, ssid2] - if num_sta > 1: - if band_2g and band_5g is not None: - for i in range(1): - sta.append(station_name1 + str(i)) - sta.append(station_name2 + str(i)) - for i in range(num_sta): - station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, - security=security, mode=mode, - band=band[i], radio=radio_names[i], - station_name=[sta[i]], - dut_data=dut_data, - sniff_radio=True) - layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate, - side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate, - traffic_type="lf_udp", sta_list=[sta[0]], side_b=sta[1]) - else: + num_sta = copy_num_sta + sta_got_ip = [] + if num_sta > 1: # between 2 stations + if band_2g is True and band_5g is True: # a 2G and a 5G station + sta_got_ip.append(self.client_connect_using_radio(ssid=ssid1, passkey=passkey, security=security, + mode=mode, band="twog", radio=radio_name_2g[0], + station_name=[station_name_2g], dut_data=dut_data, + sniff_radio=sniff_radio, attach_port_info=False, + attach_station_data=False)) + sta_got_ip.append(self.client_connect_using_radio(ssid=ssid2, passkey=passkey, security=security, + mode=mode, band="fiveg", radio=radio_name_5g[0], + station_name=[station_name_5g], dut_data=dut_data, + sniff_radio=sniff_radio, attach_port_info=False, + attach_station_data=False)) + + self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate, + side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate, + traffic_type="lf_udp", sta_list=[station_name_2g], side_b=station_name_5g) + else: # else both are either 2G or 5G stations + ssids = [ssid1, ssid2] + radio_name = radio_name_2g + radio_name_5g + if len(radio_name) == 1: + radio_name.append(radio_name[0]) for i in range(2): - sta.append(station_name + str(i)) - station_result = self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, band=band, - security=security, - mode=mode, radio=radio_name, station_name=[sta[i]], - dut_data=dut_data, sniff_radio=sniff_radio) - layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate, - side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate, - traffic_type="lf_udp", sta_list=sta, side_b=sta[1]) - elif num_sta == 1: - station_result = self.client_connect_using_radio(ssid=ssid1, passkey=passkey, band=band, security=security, - mode=mode, radio=radio_name, station_name=[station_name], - dut_data=dut_data, sniff_radio=sniff_radio) - layer3_result = self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate, - side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate, - traffic_type="lf_udp", sta_list=[station_name], side_b="") - logging.info("waiting for 20 seconds") - time.sleep(20) + sta.append(station_name + "_" + str(i + 1)) + sta_got_ip.append(self.client_connect_using_radio(ssid=ssids[i], passkey=passkey, band=band, + security=security, mode=mode, radio=radio_name[i], + station_name=[sta[i]], dut_data=dut_data, + sniff_radio=sniff_radio, attach_port_info=False, + attach_station_data=False)) + self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate, + side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate, + traffic_type="lf_udp", sta_list=[sta[0]], side_b=sta[1]) + elif num_sta == 1: # else between a 2G/5G station and uplink port + radio_name = radio_name_2g if band_2g is True else radio_name_5g + sta_got_ip.append(self.client_connect_using_radio(ssid=ssid1, passkey=passkey, band=band, security=security, + mode=mode, radio=radio_name[0], station_name=[station_name], + dut_data=dut_data, sniff_radio=sniff_radio, + attach_port_info=False, attach_station_data=False)) + self.create_layer3(side_a_min_rate=side_a_min_rate, side_a_max_rate=side_a_max_rate, + side_b_min_rate=side_b_min_rate, side_b_max_rate=side_b_max_rate, + traffic_type="lf_udp", sta_list=[station_name], side_b="") + + if False in sta_got_ip: + self.pre_cleanup() + logging.info("TEST FAILED, due to station has no ip") + return False, "TEST FAILED, due to station has no ip" + + logging.info("Running Traffic for 60 seconds...") + time.sleep(60) + + logging.info("Getting Layer-3 and Endpoints Data...") cx_list = self.get_cx_list() rx_data = self.json_get(_req_url=f"cx/{cx_list[0]}") - rx_drop_a = rx_data[f"{cx_list[0]}"]["rx drop % a"] - rx_drop_b = rx_data[f"{cx_list[0]}"]["rx drop % b"] - bps_rx_a = rx_data[f"{cx_list[0]}"]["bps rx a"] - bps_rx_b = rx_data[f"{cx_list[0]}"]["bps rx b"] - self.allure_report_table_format(dict_data=rx_data[f"{cx_list[0]}"], key="layer3 column names", value="Values", - name="Layer-3 Data") - self.client_disconnect(clear_all_sta=True, clean_l3_traffic=True) - if len(sta) == 0: - up = self.get_wan_upstream_ports() - upstream = list(up.values()) - upstream_port = upstream[0] - table_columns = [station_name, upstream_port] - else: - table_columns = [sta[0], sta[1]] - table_data = {"Station Name": table_columns, "bps rx a": [bps_rx_a, bps_rx_b], - "rx drop %": [rx_drop_a, rx_drop_b]} - table = tabulate(table_data, headers='keys', tablefmt='fancy_grid', showindex=True) - print(table) - if not station_result: - allure.attach(name="Test Result", body="TEST FAILED, due to station has no ip") - return False, "TEST FAILED, due to station has no ip" - else: - logging.info("Station creation passed. Successful.") - if layer3_result is None: - logging.info("Layer3 traffic ran.") - if ssid1 == ssid2: - if ((bps_rx_a == 0 and bps_rx_b == 0) and (rx_drop_a == 100 and rx_drop_b == 100)) or ( - (bps_rx_a != 0 and bps_rx_b != 0) and (rx_drop_a != 100 and rx_drop_b != 100)): - allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table)) - return True, "TEST PASS" - else: - allure.attach(name="Test Result", - body="TEST FAILED, Stations should not ping each other, when isolation enabled or" - "rx-drop should not be 100% when isolation disabled in same ssid" + "\n\n" + str( - table)) - return False, "TEST FAILED, Stations should not ping each other, when isolation enabled or " \ - "rx-drop should not be 100% when isolation disabled in same ssid" - elif band_2g and band_5g is not None: - if (bps_rx_a == 0 and bps_rx_b != 0) or (bps_rx_a != 0 and bps_rx_b == 0): - allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table)) - return True, "TEST PASS" - else: - allure.attach(name="Test Result", body="TEST FAILED " + "\n\n" + str(table)) - return False, "TEST FAILED, Traffic not ran properly" - else: - if rx_drop_a and rx_drop_b == 100 or 0: - allure.attach(name="Test Result", - body="TEST FAILED, Rx drop should not be 100% or 0%" + "\n\n" + str(table)) - return False, "TEST FAILED, Rx drop should not be 100% or 0%" - else: - allure.attach(name="Test Result", body="TEST PASSED" + "\n\n" + str(table)) - return True, "TEST PASS" - else: - logging.info("Layer3 not ran properly.") + rx_drop_a = rx_data[cx_list[0]]["rx drop % a"] + rx_drop_b = rx_data[cx_list[0]]["rx drop % b"] + + sta = [] + for u in self.json_get("/port/?fields=port+type,alias,ssid")['interfaces']: + if (list(u.values())[0]['port type'] in ['WIFI-STA'] + and list(u.values())[0]['ssid'] in [ssid1, ssid2]): + sta.append(list(u.keys())[0]) + + if len(sta) == 1: + sta.append(list(self.get_wan_upstream_ports().values())[0]) + + sta_rows = ["ssid", "ip", "mode", "channel", "signal", "mac", "parent dev"] + sta_dict = self.get_station_data(sta_name=sta, rows=sta_rows, allure_attach=False) + station_table_dict = {"station name": list(sta_dict.keys()), + "Min/Max Tx rate": [f"{side_a_min_rate} bytes", f"{side_b_min_rate} bytes"], + "rx drop %": [rx_drop_a, rx_drop_b]} + for col in sta_rows: + temp_list = [] + for port in sta: + temp_list.append(sta_dict[port][col]) + station_table_dict[col] = temp_list + + logging.info("Attaching to the allure report...") + self.attach_table_allure(data=station_table_dict, allure_name="Endpoints Data") + self.allure_report_table_format(dict_data=rx_data[cx_list[0]], key="Layer-3 Column", value="Value", + name="Layer-3 Data") + + logging.info("Traffic ran, Clearing stations and Layer-3 traffic...") + self.pre_cleanup() + + return True, {"drop_a": rx_drop_a, "drop_b": rx_drop_b} def ax_capacity_test(self, instance_name="", dut_data=None, mode="BRIDGE", download_rate="10Gbps", upload_rate="0Gbps", dut_mode="", protocol="UDP-IPv4", num_stations={}, vlan_id=None):