diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index 536a3ccc..f0bda45a 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -456,7 +456,8 @@ class lf_libs: r_val.pop(dut) if mode == "NAT-LAN": ret = self.get_lan_upstream_ports() - for dut in r_val: + temp = r_val.copy() + for dut in temp: if ret.keys().__contains__(dut) and ret[dut] is not None: upstream_data = (ret[dut]).split(".") r_val[dut]["upstream_port"] = ret[dut] @@ -473,6 +474,7 @@ class lf_libs: logging.error("VLAN ID is Unspecified in the VLAN Case") pytest.skip("VLAN ID is Unspecified in the VLAN Case") else: + self.add_vlan(vlan_ids=[vlan_id]) ret = self.get_wan_upstream_ports() for dut in r_val: if ret.keys().__contains__(dut) and ret[dut] is not None: @@ -656,9 +658,8 @@ class lf_libs: r_val[dut["identifier"]]["passkey"] = dut["ssid"]["2g-password"] r_val[dut["identifier"]]["encryption"] = dut["ssid"]["2g-encryption"] r_val[dut["identifier"]]["bssid"] = dut["ssid"]["2g-bssid"] - if dut["ssid"]["2g-encryption"] == "OPEN": + if str(dut["ssid"]["2g-encryption"]).upper() == "OPEN": ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] + - ' password=' + dut["ssid"]["2g-password"] + ' bssid=' + dut["ssid"]["2g-bssid"]]) else: ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] + @@ -671,9 +672,8 @@ class lf_libs: r_val[dut["identifier"]]["passkey"] = dut["ssid"]["5g-password"] r_val[dut["identifier"]]["encryption"] = dut["ssid"]["5g-encryption"] r_val[dut["identifier"]]["bssid"] = dut["ssid"]["5g-bssid"] - if dut["ssid"]["5g-encryption"] == "OPEN": + if str(dut["ssid"]["5g-encryption"]).upper() == "OPEN": ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["5g-ssid"] + - ' password=' + dut["ssid"]["5g-password"] + ' bssid=' + dut["ssid"]["5g-bssid"]]) else: ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["5g-ssid"] + @@ -686,9 +686,8 @@ class lf_libs: r_val[dut["identifier"]]["passkey"] = dut["ssid"]["6g-password"] r_val[dut["identifier"]]["encryption"] = dut["ssid"]["6g-encryption"] r_val[dut["identifier"]]["bssid"] = dut["ssid"]["6g-bssid"] - if dut["ssid"]["6g-encryption"] == "OPEN": + if str(dut["ssid"]["6g-encryption"]).upper() == "OPEN": ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["6g-ssid"] + - ' password=' + dut["ssid"]["6g-password"] + ' bssid=' + dut["ssid"]["6g-bssid"]]) else: ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["6g-ssid"] + @@ -704,9 +703,8 @@ class lf_libs: r_val[dut["identifier"]]["passkey"] = passkey r_val[dut["identifier"]]["encryption"] = encryption r_val[dut["identifier"]]["bssid"] = bssid - if encryption == "OPEN": + if str(encryption).upper() == "OPEN": ssid_data.append(['ssid_idx=0 ssid=' + ssid + - ' password=' + passkey + ' bssid=' + str(bssid).upper()]) else: ssid_data.append(['ssid_idx=0 ssid=' + ssid + @@ -716,7 +714,6 @@ class lf_libs: if str(encryption).upper() in ["OPEN", "WPA", "WPA2", "WPA3", "WEP"]: self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) - print(r_val) return r_val def update_duts(self, identifier=0, ssid_data=[]): diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index dc4f4ae8..a888fa21 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -47,18 +47,19 @@ class lf_tests(lf_libs): """ def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None): - super().__init__(lf_data, dut_data, run_lf, log_level) + super().__init__(lf_data, dut_data, run_lf, log_level) - def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", dut_data={}, + def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", bssid="[BLANK]", dut_data={}, security="open", extra_securities=[], sta_mode=0, num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog", allure_attach=True, runtime_secs=40): - data = self.setup_interfaces(ssid=ssid, bssid=passkey, passkey=passkey, encryption=security, + data = self.setup_interfaces(ssid=ssid, bssid=bssid, passkey=passkey, encryption=security, band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta) - self.add_vlan(vlan_ids=vlan_id) logging.info("Setup interface data" + str(data)) + if data == {}: + pytest.skip("Skipping This Test") sta_connect_obj = [] for dut in data: for radio in data[dut]["station_data"]: @@ -87,7 +88,6 @@ class lf_tests(lf_libs): obj_sta_connect.side_a_pdu = 1200 obj_sta_connect.side_b_pdu = 1500 - # changed to auto channel self.set_radio_channel(radio=radio, channel="AUTO") logging.info("scan ssid radio: " + str(radio.split(".")[2])) @@ -350,7 +350,7 @@ class lf_tests(lf_libs): logging.info("ALL Stations got IP's") return station_data_all - def add_stations(self, band="2G", num_stations="max", ssid_name=[], idx=0): + def add_stations(self, band="2G", num_stations=9, ssid_name=[], idx=0): dut_name = [] for index in range(0, len(self.dut_data)): @@ -358,180 +358,377 @@ class lf_tests(lf_libs): if num_stations == 0: logging.warning("0 Stations") return + r_val = dict() + for dut in self.dut_data: + r_val[dut["identifier"]] = None + idx = None + if self.run_lf: + # idx = idx + # if self.run_lf or self.cc_1: + # if band == "2G": + # idx = 0 + # if band == "5G": + # idx = 1 + # if band == "6g": + # idx = 2 + for dut in self.dut_data: + ssid_data = [] + if r_val.keys().__contains__(dut["identifier"]): + print("hi", dut["identifier"]) + if dut.keys().__contains__("ssid"): + print("Hii") + if band == "2G": + print("hiiiiii") + if str(dut["ssid"]["2g-encryption"]).upper() == "OPEN": + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] + + ' bssid=' + dut["ssid"]["2g-bssid"]]) + print(ssid_data) + else: + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] + + ' security=' + str(dut["ssid"]["2g-encryption"]).upper() + + ' password=' + dut["ssid"]["2g-password"] + + ' bssid=' + dut["ssid"]["2g-bssid"]]) + print("Bye") + print(ssid_data) + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + if band == "5G": + if str(dut["ssid"]["5g-encryption"]).upper() == "OPEN": + ssid_data.append(['ssid_idx=1 ssid=' + dut["ssid"]["5g-ssid"] + + ' bssid=' + dut["ssid"]["5g-bssid"]]) + else: + ssid_data.append(['ssid_idx=1 ssid=' + dut["ssid"]["5g-ssid"] + + ' security=' + str(dut["ssid"]["5g-encryption"]).upper() + + ' password=' + dut["ssid"]["5g-password"] + + ' bssid=' + dut["ssid"]["5g-bssid"]]) + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + if band == "6G": + if str(dut["ssid"]["6g-encryption"]).upper() == "OPEN": + ssid_data.append(['ssid_idx=2 ssid=' + dut["ssid"]["6g-ssid"] + + ' bssid=' + dut["ssid"]["6g-bssid"]]) + else: + ssid_data.append(['ssid_idx=2 ssid=' + dut["ssid"]["6g-ssid"] + + ' security=' + str(dut["ssid"]["6g-encryption"]).upper() + + ' password=' + dut["ssid"]["6g-password"] + + ' bssid=' + dut["ssid"]["6g-bssid"]]) + print(ssid_data) + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + else: + for dut in self.dut_data: + ssid_data = [] + if r_val.keys().__contains__(dut["identifier"]): + for idx_ in self.dut_data[dut]["ssid_data"]: + + if str(self.dut_data[dut]["ssid_data"][idx_]["encryption"]).upper() == "OPEN": + ssid_data.append(['ssid_idx=' + str(idx_) + ' ssid=' + self.dut_data[dut]["ssid_data"][idx_]["ssid"] + + + ' bssid=' + str(self.dut_data[dut]["ssid_data"][idx_]["bssid"]).upper()]) + else: + ssid_data.append(['ssid_idx=' + str(idx_) + ' ssid=' + self.dut_data[dut]["ssid_data"][idx_]["ssid"] + + ' security=' + str(self.dut_data[dut]["ssid_data"][idx_]["encryption"]).upper() + + ' password=' + self.dut_data[dut]["ssid_data"][idx_]["password"] + + ' bssid=' + str(self.dut_data[dut]["ssid_data"][idx_]["bssid"]).upper()]) + + if str(self.dut_data[dut]["ssid_data"][idx_]["encryption"]).upper() in ["OPEN", "WPA", "WPA2", "WPA3", "WEP"]: + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + + dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, + "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, + "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + + dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, + "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, + "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + + dict_all_radios_6g = {"ax210_radios": self.ax210_radios} + + max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, + "ax200_radios": 1, "ax210_radios": 1} + radio_data = {} + sniff_radio = "" for dut in dut_name: - logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) + - " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) - idx = idx if self.run_lf or self.cc_1: if band == "2G": idx = 0 if band == "5G": idx = 1 - - for i in self.dut_idx_mapping: - if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: - idx = i + if band == "6g": + idx = 2 + else: + for idx_ in self.dut_data[dut]["ssid_data"]: + if band == self.dut_data[dut]["ssid_data"][idx_]["band"] and ssid_data == \ + self.dut_data[dut]["ssid_data"][idx_]["ssid"]: + idx = idx_ + print("*********idx***********", idx) if band == "2G": - all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \ - self.ax210_radios - print("all_2g_rdio", all_radio_2g) - if num_stations != "max": - logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) - total_sta = num_stations - max_possible = 0 - for radio in all_radio_2g: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(all_radio_2g)) - rem = total_sta % len(all_radio_2g) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(all_radio_2g)) - rem = total_sta % len(all_radio_2g) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(all_radio_2g) - logging.info("Total stations per radio: " + str(per_radio_sta)) - for radio in all_radio_2g: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - - if num_stations == "max": - logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) - for radio in all_radio_2g: - num_stations = self.get_max_sta(radio) - logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - + if int(num_stations) > int(self.max_2g_stations): + logging.error("Can't create %s stations on lanforge" % num_stations) + pytest.skip("Can't create %s stations on lanforge" % num_stations) + # checking atleast one 2g radio is available or not + elif len(self.wave2_2g_radios) == 0 and len(self.wave1_radios) and len(self.ax210_radios) == 0 and len( + self.ax200_radios) == 0 and len(self.mtk_radios) == 0: + logging.error("Twog radio is not available") + pytest.skip("Twog radio is not available") + stations = num_stations + for j in dict_all_radios_2g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_2g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_2g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) if band == "5G": - all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios - if num_stations != "max": - logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) - total_sta = num_stations - max_possible = 0 - for radio in all_radio_5g: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(all_radio_5g)) - rem = total_sta % len(all_radio_5g) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(all_radio_5g)) - rem = total_sta % len(all_radio_5g) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(all_radio_5g) - logging.info("Total stations per radio: " + str(per_radio_sta)) - for radio in all_radio_5g: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) + # checking station compitality of lanforge + if int(num_stations) > int(self.max_5g_stations): + logging.error("Can't create %s stations on lanforge" % num_stations) + pytest.skip("Can't create %s stations on lanforge" % num_stations) + # checking atleast one 5g radio is available or not + elif len(self.wave2_5g_radios) == 0 and len(self.wave1_radios) and len(self.ax210_radios) == 0 and len( + self.ax200_radios) == 0 and len(self.mtk_radios) == 0: + logging.error("fiveg radio is not available") + pytest.skip("fiveg radio is not available") - if num_stations == "max": - logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) - for radio in all_radio_5g: - num_stations = self.get_max_sta(radio) - logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - if band == "6g": - all_radio_6g = self.ax210_radios - if num_stations != "max": - logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g))) - total_sta = num_stations - max_possible = 0 - for radio in all_radio_6g: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(all_radio_6g)) - rem = total_sta % len(all_radio_6g) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(all_radio_6g)) - rem = total_sta % len(all_radio_6g) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(all_radio_6g) - logging.info("Total stations per radio: " + str(per_radio_sta)) - for radio in all_radio_6g: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - if num_stations == "max": - logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g))) - for radio in all_radio_6g: - num_stations = self.get_max_sta(radio) - logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.temp_raw_lines.append(station_data) - logging.debug("Raw Line : " + str(station_data)) + # radio and station selection + stations = num_stations + for j in dict_all_radios_5g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_5g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_5g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + # setup sniffer + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + if band == "6G": + # checking station compitality of lanforge + if int(num_stations) > int(self.max_6g_stations): + logging.error("Can't create %s stations on lanforge" % num_stations) + pytest.skip("Can't create %s stations on lanforge" % num_stations) + # checking atleast one 6g radio is available or not + elif len(self.ax210_radios) == 0: + logging.error("sixg radio is not available") + pytest.skip("sixg radio is not available") + + # radio and station selection + stations = num_stations + for j in dict_all_radios_6g: + max_station = max_station_per_radio[j] + if stations > 0: + if len(dict_all_radios_6g[j]) > 0: + diff = max_station - stations + for i in dict_all_radios_6g[j]: + if diff >= 0: + radio_data[i] = stations + stations = 0 + break + elif diff < 0: + radio_data[i] = max_station + stations = stations - max_station + diff = max_station - stations + + sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) + print(radio_data) + for radio in radio_data: + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(radio_data[radio]) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + print(self.temp_raw_lines) + + + + # for dut in dut_name: + # logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) + + # " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) + # idx = idx + # if self.run_lf or self.cc_1: + # if band == "2G": + # idx = 0 + # if band == "5G": + # idx = 1 + # if band == "6g": + # idx = 2 + # + # for i in self.dut_idx_mapping: + # if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: + # idx = i + # if band == "2G": + # all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \ + # self.ax210_radios + # print("all_2g_rdio", all_radio_2g) + # if num_stations != "max": + # logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) + # total_sta = num_stations + # max_possible = 0 + # for radio in all_radio_2g: + # max_possible = max_possible + int(self.get_max_sta(radio)) + # if total_sta <= max_possible: + # per_radio_sta = int(total_sta / len(all_radio_2g)) + # rem = total_sta % len(all_radio_2g) + # else: + # total_sta = max_possible + # per_radio_sta = int(total_sta / len(all_radio_2g)) + # rem = total_sta % len(all_radio_2g) + # if rem != 0 and per_radio_sta == 0: + # per_radio_sta = rem / len(all_radio_2g) + # logging.info("Total stations per radio: " + str(per_radio_sta)) + # for radio in all_radio_2g: + # max_possible = int(self.get_max_sta(radio)) + # if total_sta == 0: + # return + # num_stations = per_radio_sta + # if rem == 0 and num_stations == 0: + # return + # if max_possible - num_stations >= rem: + # num_stations = num_stations + rem + # rem = 0 + # elif max_possible - rem >= num_stations: + # num_stations = num_stations + rem + # rem = 0 + # elif total_sta <= max_possible: + # num_stations = total_sta + # if per_radio_sta < 1: + # num_stations = 1 + # total_sta = total_sta - num_stations + # logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + # station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + # " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + # str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + # self.temp_raw_lines.append(station_data) + # logging.debug("Raw Line : " + str(station_data)) + # + # if num_stations == "max": + # logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) + # for radio in all_radio_2g: + # num_stations = self.get_max_sta(radio) + # logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + # station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + # " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + # str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + # self.temp_raw_lines.append(station_data) + # logging.debug("Raw Line : " + str(station_data)) + # + # if band == "5G": + # all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios + # if num_stations != "max": + # logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) + # total_sta = num_stations + # max_possible = 0 + # for radio in all_radio_5g: + # max_possible = max_possible + int(self.get_max_sta(radio)) + # if total_sta <= max_possible: + # per_radio_sta = int(total_sta / len(all_radio_5g)) + # rem = total_sta % len(all_radio_5g) + # else: + # total_sta = max_possible + # per_radio_sta = int(total_sta / len(all_radio_5g)) + # rem = total_sta % len(all_radio_5g) + # if rem != 0 and per_radio_sta == 0: + # per_radio_sta = rem / len(all_radio_5g) + # logging.info("Total stations per radio: " + str(per_radio_sta)) + # for radio in all_radio_5g: + # max_possible = int(self.get_max_sta(radio)) + # if total_sta == 0: + # return + # num_stations = per_radio_sta + # if rem == 0 and num_stations == 0: + # return + # if max_possible - num_stations >= rem: + # num_stations = num_stations + rem + # rem = 0 + # elif max_possible - rem >= num_stations: + # num_stations = num_stations + rem + # rem = 0 + # elif total_sta <= max_possible: + # num_stations = total_sta + # if per_radio_sta < 1: + # num_stations = 1 + # total_sta = total_sta - num_stations + # logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + # station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + # " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + # str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + # self.temp_raw_lines.append(station_data) + # logging.debug("Raw Line : " + str(station_data)) + # + # if num_stations == "max": + # logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) + # for radio in all_radio_5g: + # num_stations = self.get_max_sta(radio) + # logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + # station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + # " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + # str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + # self.temp_raw_lines.append(station_data) + # logging.debug("Raw Line : " + str(station_data)) + # if band == "6g": + # all_radio_6g = self.ax210_radios + # if num_stations != "max": + # logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g))) + # total_sta = num_stations + # max_possible = 0 + # for radio in all_radio_6g: + # max_possible = max_possible + int(self.get_max_sta(radio)) + # if total_sta <= max_possible: + # per_radio_sta = int(total_sta / len(all_radio_6g)) + # rem = total_sta % len(all_radio_6g) + # else: + # total_sta = max_possible + # per_radio_sta = int(total_sta / len(all_radio_6g)) + # rem = total_sta % len(all_radio_6g) + # if rem != 0 and per_radio_sta == 0: + # per_radio_sta = rem / len(all_radio_6g) + # logging.info("Total stations per radio: " + str(per_radio_sta)) + # for radio in all_radio_6g: + # max_possible = int(self.get_max_sta(radio)) + # if total_sta == 0: + # return + # num_stations = per_radio_sta + # if rem == 0 and num_stations == 0: + # return + # if max_possible - num_stations >= rem: + # num_stations = num_stations + rem + # rem = 0 + # elif max_possible - rem >= num_stations: + # num_stations = num_stations + rem + # rem = 0 + # elif total_sta <= max_possible: + # num_stations = total_sta + # if per_radio_sta < 1: + # num_stations = 1 + # total_sta = total_sta - num_stations + # logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + # station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + # " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + # str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + # self.temp_raw_lines.append(station_data) + # logging.debug("Raw Line : " + str(station_data)) + # if num_stations == "max": + # logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g))) + # for radio in all_radio_6g: + # num_stations = self.get_max_sta(radio) + # logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + # station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + # " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + # str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + # self.temp_raw_lines.append(station_data) + # logging.debug("Raw Line : " + str(station_data)) if __name__ == '__main__': basic_04 = { @@ -608,9 +805,9 @@ if __name__ == '__main__': obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]), log_level=logging.DEBUG, run_lf=True) - obj.setup_dut() - #obj.add_stations(band="2G", num_stations=6, ssid_name=["OpenWifi"]) - #obj.chamber_view(raw_lines="custom") + #obj.add_stations() + obj.add_stations(band="5G") + obj.chamber_view(raw_lines="custom") # A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1) # print(A) # obj.setup_relevent_profiles() @@ -622,13 +819,15 @@ if __name__ == '__main__': # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.get_cx_data() # obj.chamber_view() - # dut = {'0000c1018812': [['OpenWifi', 'wpa2', 'OpenWifi', '2G', '6A:21:5F:DA:45:6F'], - # {'2G': [6, 40, 2437], '5G': None, '6G': None}]} + # dut = {'0000c1018812': [['ssid_open_2g_nat', 'open', 'something', '2G', '6A:21:5F:DA:45:6F'], + # {'2G': [6, 40, 2437], '5G': None, '6G': None}]} - # c = obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], - # num_sta=1, mode="BRIDGE", dut_data=dut, - # band="twog") - # obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30) + # passes, result = obj.client_connectivity_test(ssid="ssid_open_2g_nat", passkey="something", security="open", + # extra_securities=[], + # num_sta=1, mode="NAT-WAN", dut_data=dut, + # band="twog") + # print(passes == "PASS", result) + # # obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30) # print("started") # time.sleep(30) # obj.stop_sniffer()