diff --git a/lf_libs/__pycache__/lf_libs.cpython-39.pyc b/lf_libs/__pycache__/lf_libs.cpython-39.pyc deleted file mode 100644 index a16ce234..00000000 Binary files a/lf_libs/__pycache__/lf_libs.cpython-39.pyc and /dev/null differ diff --git a/lf_libs/lf_libs.py b/lf_libs/lf_libs.py index 25f58aee..1f67d8c8 100644 --- a/lf_libs/lf_libs.py +++ b/lf_libs/lf_libs.py @@ -37,6 +37,7 @@ StaScan = stascan.StaScan cv_test_reports = importlib.import_module("py-json.cv_test_reports") lf_report = cv_test_reports.lanforge_reports + class lf_libs: """ "traffic_generator": { @@ -55,6 +56,7 @@ class lf_libs: lanforge_data = dict() manager_ip = None testbed = None + run_lf = False manager_http_port = None manager_ssh_port = None manager_default_db = None @@ -147,10 +149,11 @@ class lf_libs: """ local_realm = None - def __init__(self, lf_data={}, dut_data=[], log_level=logging.DEBUG): + def __init__(self, lf_data={}, dut_data=[], run_lf=False, log_level=logging.DEBUG): logging.basicConfig(format='%(asctime)s - %(message)s', level=log_level) lf_data = dict(lf_data) self.dut_data = dut_data + self.run_lf = run_lf # try: self.lanforge_data = lf_data.get("details") self.testbed = lf_data.get("testbed") @@ -200,16 +203,18 @@ class lf_libs: """ def setup_dut(self): + self.dut_objects = [] for index in range(0, len(self.dut_data)): dut_obj = DUT(lfmgr=self.manager_ip, port=self.manager_http_port, - dut_name=self.testbed + "-" + str(index), + dut_name=self.dut_data[index]["identifier"], sw_version=self.dut_data[index]["firmware_version"], hw_version=self.dut_data[index]["mode"], model_num=self.dut_data[index]["model"], serial_num=self.dut_data[index]["identifier"]) dut_obj.setup() dut_obj.add_ssids() + self.dut_objects.append(dut_obj) logging.info("Creating DUT") def setup_metadata(self): @@ -404,40 +409,103 @@ class lf_libs: temp_raw_lines.append([d]) logging.info("Saved default CV Scenario details: " + str(temp_raw_lines)) - def setup_interfaces(self, band=None, vlan_id=None, mode=None, num_sta=None): - if band is None: - logging.error("Band value is not available.") - pytest.exit("Band value is not available.") - if mode is None: - logging.error("mode value is not available") - pytest.exit("mode value is not available") - if num_sta is None: - logging.error("Number of stations are not available") - pytest.exit("Number of stations are not available") + def setup_interfaces(self, ssid="", bssid="", passkey="", encryption="", band=None, vlan_id=None, mode=None, + num_sta=None): + if band not in ["twog", "fiveg", "sixg"]: + pytest.skip("Unsupported Band Selected: " + str(band)) + if mode not in ["BRIDGE", "NAT-WAN", "NAT-LAN", "VLAN"]: + pytest.skip("Unsupported Mode Selected: " + str(mode)) + r_val = dict() + for dut in self.dut_data: + r_val[dut["identifier"]] = None + for i in r_val: + r_val[i] = dict.fromkeys(["ssid", "bssid", "passkey", "encryption", "upstream_port", + "upstream_resource", "upstream", "station_data", "sniff_radio_2g", + "sniff_radio_5g", "sniff_radio_6g"]) if mode == "BRIDGE": - upstream_port = self.wan_upstream_port() - elif mode == "NAT-WAN": - upstream_port = self.wan_upstream_port() - elif mode == "NAT-LAN": - upstream_port = self.lan_upstream_port() - elif mode == "VLAN": - # for vlan mode vlan id should be available - if vlan_id is not None: - upstream_port = self.wan_upstream_port() + "." + str(vlan_id) + ret = self.get_wan_upstream_ports() + for dut in r_val: + if ret.keys().__contains__(dut) and ret[dut] is not None: + upstream_data = (ret[dut]).split(".") + r_val[dut]["upstream_port"] = ret[dut] + upstream_resource = upstream_data[1] + r_val[dut]["upstream_resource"] = upstream_resource + upstream_data.pop(0) + upstream_data.pop(0) + upstream = ".".join(upstream_data) + r_val[dut]["upstream"] = upstream + else: + r_val.pop(dut) + if mode == "NAT-WAN": + ret = self.get_wan_upstream_ports() + for dut in r_val: + if ret.keys().__contains__(dut) and ret[dut] is not None: + upstream_data = (ret[dut]).split(".") + r_val[dut]["upstream_port"] = ret[dut] + upstream_resource = upstream_data[1] + r_val[dut]["upstream_resource"] = upstream_resource + upstream_data.pop(0) + upstream_data.pop(0) + upstream = ".".join(upstream_data) + r_val[dut]["upstream"] = upstream + else: + r_val.pop(dut) + if mode == "NAT-LAN": + ret = self.get_lan_upstream_ports() + for dut in r_val: + if ret.keys().__contains__(dut) and ret[dut] is not None: + upstream_data = (ret[dut]).split(".") + r_val[dut]["upstream_port"] = ret[dut] + upstream_resource = upstream_data[1] + r_val[dut]["upstream_resource"] = upstream_resource + upstream_data.pop(0) + upstream_data.pop(0) + upstream = ".".join(upstream_data) + r_val[dut]["upstream"] = upstream + else: + r_val.pop(dut) + if mode == "VLAN": + if vlan_id is None: + logging.error("VLAN ID is Unspecified in the VLAN Case") + pytest.skip("VLAN ID is Unspecified in the VLAN Case") else: - logging.error("Vlan id is not available for vlan") - pytest.exit("Vlan id is not available for vlan") - else: - logging.error("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN") - pytest.exit("Mode value is wrong.Value e.g. BRIDGE or NAT or VLAN") - radio_data = {} - sta_prefix = "" - sniff_radio = "" - data_dict = {} - # deleting existing stations and layer 3 - self.pre_cleanup() + ret = self.get_wan_upstream_ports() + for dut in r_val: + if ret.keys().__contains__(dut) and ret[dut] is not None: + upstream_data = (ret[dut] + "." + str(vlan_id)).split(".") + r_val[dut]["upstream_port"] = ret[dut] + "." + str(vlan_id) + upstream_resource = upstream_data[1] + r_val[dut]["upstream_resource"] = upstream_resource + upstream_data.pop(0) + upstream_data.pop(0) + upstream = ".".join(upstream_data) + r_val[dut]["upstream"] = upstream + else: + r_val.pop(dut) + dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, + "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, + "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + + dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, + "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, + "ax200_radios": self.ax200_radios, + "ax210_radios": self.ax210_radios} + + dict_all_radios_6g = {"ax210_radios": self.ax210_radios} + max_station_per_radio = {"wave2_2g_radios": 64, "wave2_5g_radios": 64, "wave1_radios": 64, "mtk_radios": 19, "ax200_radios": 1, "ax210_radios": 1} + radio_data = {} + sniff_radio = "" + + sta_prefix = "" + data_dict = {} + # deleting existing stations and layer 3 + # self.pre_cleanup() + data_dict["sniff_radio_2g"] = None + data_dict["sniff_radio_5g"] = None + data_dict["sniff_radio_6g"] = None if band == "twog": if self.run_lf: for i in self.dut_data: @@ -445,6 +513,7 @@ class lf_libs: passkey = i["ssid"]["2g-password"] security = i["ssid"]["2g-encryption"].lower() sta_prefix = self.twog_prefix + data_dict["sta_prefix"] = sta_prefix # checking station compitality of lanforge if int(num_sta) > int(self.max_2g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) @@ -454,12 +523,6 @@ class lf_libs: self.ax200_radios) == 0 and len(self.mtk_radios) == 0: logging.error("Twog radio is not available") pytest.skip("Twog radio is not available") - - dict_all_radios_2g = {"wave2_2g_radios": self.wave2_2g_radios, - "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, - "ax200_radios": self.ax200_radios, - "ax210_radios": self.ax210_radios} - # radio and station selection stations = num_sta for j in dict_all_radios_2g: @@ -476,7 +539,6 @@ class lf_libs: radio_data[i] = max_station stations = stations - max_station diff = max_station - stations - # setup sniffer sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict["sniff_radio_2g"] = sniff_radio if band == "fiveg": @@ -487,6 +549,7 @@ class lf_libs: security = i["ssid"]["2g-encryption"].lower() sta_prefix = self.fiveg_prefix + data_dict["sta_prefix"] = sta_prefix # checking station compitality of lanforge if int(num_sta) > int(self.max_5g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) @@ -497,11 +560,6 @@ class lf_libs: logging.error("fiveg radio is not available") pytest.skip("fiveg radio is not available") - dict_all_radios_5g = {"wave2_5g_radios": self.wave2_5g_radios, - "wave1_radios": self.wave1_radios, "mtk_radios": self.mtk_radios, - "ax200_radios": self.ax200_radios, - "ax210_radios": self.ax210_radios} - # radio and station selection stations = num_sta for j in dict_all_radios_5g: @@ -529,6 +587,7 @@ class lf_libs: security = i["ssid"]["6g-encryption"].lower() sta_prefix = self.sixg_prefix + data_dict["sta_prefix"] = sta_prefix # checking station compitality of lanforge if int(num_sta) > int(self.max_6g_stations): logging.error("Can't create %s stations on lanforge" % num_sta) @@ -538,8 +597,6 @@ class lf_libs: logging.error("sixg radio is not available") pytest.skip("sixg radio is not available") - dict_all_radios_6g = {"ax210_radios": self.ax210_radios} - # radio and station selection stations = num_sta for j in dict_all_radios_6g: @@ -559,7 +616,7 @@ class lf_libs: sniff_radio = self.setup_sniffer(band=band, station_radio_data=radio_data) data_dict["sniff_radio_6g"] = sniff_radio - # creating dict of radio and station_list + dict_radio_sta_list = {} # list of per radio station length_to_split = list(radio_data.values()) @@ -577,22 +634,97 @@ class lf_libs: for j in dict_radio_sta_list[i]: temp_list.append(shelf_resource + j) dict_radio_sta_list[i] = temp_list + data_dict["radios"] = dict_radio_sta_list + for i in r_val: + r_val[i]["station_data"] = data_dict["radios"] + r_val[i]["sta_prefix"] = data_dict["sta_prefix"] + r_val[i]["sniff_radio_2g"] = data_dict["sniff_radio_2g"] + r_val[i]["sniff_radio_5g"] = data_dict["sniff_radio_5g"] + r_val[i]["sniff_radio_6g"] = data_dict["sniff_radio_6g"] if self.run_lf: - data_dict["radios"] = dict_radio_sta_list - data_dict["upstream_port"] = upstream_port - data_dict["ssid"] = ssid - data_dict["passkey"] = passkey - data_dict["security"] = security - data_dict["sta_prefix"] = sta_prefix - # data_dict["sniff_radio"] = sniff_radio - return data_dict + for dut in self.dut_data: + ssid_data = [] + if r_val.keys().__contains__(dut["identifier"]): + if dut.keys().__contains__("ssid"): + if band == "twog": + r_val[dut["identifier"]]["ssid"] = dut["ssid"]["2g-ssid"] + r_val[dut["identifier"]]["passkey"] = dut["ssid"]["2g-password"] + r_val[dut["identifier"]]["encryption"] = dut["ssid"]["2g-encryption"] + r_val[dut["identifier"]]["bssid"] = dut["ssid"]["2g-bssid"] + if dut["ssid"]["2g-encryption"] == "OPEN": + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] + + ' password=' + dut["ssid"]["2g-password"] + + ' bssid=' + dut["ssid"]["2g-bssid"]]) + else: + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["2g-ssid"] + + ' security=' + str(dut["ssid"]["2g-encryption"]).upper() + + ' password=' + dut["ssid"]["2g-password"] + + ' bssid=' + dut["ssid"]["2g-bssid"]]) + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + if band == "fiveg": + r_val[dut["identifier"]]["ssid"] = dut["ssid"]["5g-ssid"] + r_val[dut["identifier"]]["passkey"] = dut["ssid"]["5g-password"] + r_val[dut["identifier"]]["encryption"] = dut["ssid"]["5g-encryption"] + r_val[dut["identifier"]]["bssid"] = dut["ssid"]["5g-bssid"] + if dut["ssid"]["5g-encryption"] == "OPEN": + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["5g-ssid"] + + ' password=' + dut["ssid"]["5g-password"] + + ' bssid=' + dut["ssid"]["5g-bssid"]]) + else: + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["5g-ssid"] + + ' security=' + str(dut["ssid"]["5g-encryption"]).upper() + + ' password=' + dut["ssid"]["5g-password"] + + ' bssid=' + dut["ssid"]["5g-bssid"]]) + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + if band == "sixg": + r_val[dut["identifier"]]["ssid"] = dut["ssid"]["6g-ssid"] + r_val[dut["identifier"]]["passkey"] = dut["ssid"]["6g-password"] + r_val[dut["identifier"]]["encryption"] = dut["ssid"]["6g-encryption"] + r_val[dut["identifier"]]["bssid"] = dut["ssid"]["6g-bssid"] + if dut["ssid"]["6g-encryption"] == "OPEN": + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["6g-ssid"] + + ' password=' + dut["ssid"]["6g-password"] + + ' bssid=' + dut["ssid"]["6g-bssid"]]) + else: + ssid_data.append(['ssid_idx=0 ssid=' + dut["ssid"]["6g-ssid"] + + ' security=' + str(dut["ssid"]["6g-encryption"]).upper() + + ' password=' + dut["ssid"]["6g-password"] + + ' bssid=' + dut["ssid"]["6g-bssid"]]) + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) else: - data_dict["radios"] = dict_radio_sta_list - data_dict["upstream_port"] = upstream_port - data_dict["sta_prefix"] = sta_prefix - # data_dict["sniff_radio"] = sniff_radio - return data_dict + for dut in self.dut_data: + ssid_data = [] + if r_val.keys().__contains__(dut["identifier"]): + r_val[dut["identifier"]]["ssid"] = ssid + r_val[dut["identifier"]]["passkey"] = passkey + r_val[dut["identifier"]]["encryption"] = encryption + r_val[dut["identifier"]]["bssid"] = bssid + if encryption == "OPEN": + ssid_data.append(['ssid_idx=0 ssid=' + ssid + + ' password=' + passkey + + ' bssid=' + str(bssid).upper()]) + else: + ssid_data.append(['ssid_idx=0 ssid=' + ssid + + ' security=' + str(encryption).upper() + + ' password=' + passkey + + ' bssid=' + str(bssid).upper()]) + + if str(encryption).upper() in ["OPEN", "WPA", "WPA2", "WPA3", "WEP"]: + self.update_duts(identifier=dut["identifier"], ssid_data=ssid_data) + print(r_val) + return r_val + + def update_duts(self, identifier=0, ssid_data=[]): + for dut_obj in self.dut_objects: + if identifier == dut_obj.dut_name: + dut_obj.ssid = ssid_data + dut_obj.add_ssids() + # SSID data should be in this format + # [ + # ['ssid_idx=0 ssid=Default-SSID-2g security=WPA|WEP| password=12345678 bssid=90:3c:b3:94:48:58'], + # ['ssid_idx=1 ssid=Default-SSID-5gl password=12345678 bssid=90:3c:b3:94:48:59'] + # ] def setup_relevent_profiles(self): """ TODO @@ -695,21 +827,25 @@ class lf_libs: else: logging.error("Name is not provided") - def wan_upstream_port(self): - """finding upstream port""" - upstream_port = "" - for i in self.dut_data: - if dict(i).keys().__contains__("wan_port"): - upstream_port = i["wan_port"] - return upstream_port + def get_wan_upstream_ports(self): + """finding upstream wan ports""" + r_val = dict() + for dut in self.dut_data: + r_val[dut["identifier"]] = None + if dut["wan_port"] is not None: + if dut["wan_port"] in self.lanforge_data["wan_ports"].keys(): + r_val[dut["identifier"]] = dut["wan_port"] + return r_val - def lan_upstream_port(self): - """finding upstream port""" - upstream_port = "" - for i in self.dut_data: - if dict(i).keys().__contains__("lan_port"): - upstream_port = i["lan_port"] - return upstream_port + def get_lan_upstream_ports(self): + """finding upstream wan ports""" + r_val = dict() + for dut in self.dut_data: + r_val[dut["identifier"]] = None + if dut["lan_port"] is not None: + if dut["lan_port"] in self.lanforge_data["lan_ports"].keys(): + r_val[dut["identifier"]] = dut["lan_port"] + return r_val def setup_sniffer(self, band=None, station_radio_data=None): """Setup sniff radio""" diff --git a/lf_libs/lf_tests.py b/lf_libs/lf_tests.py index bf3c72d3..dc4f4ae8 100644 --- a/lf_libs/lf_tests.py +++ b/lf_libs/lf_tests.py @@ -47,154 +47,136 @@ class lf_tests(lf_libs): """ def __init__(self, lf_data={}, dut_data={}, log_level=logging.DEBUG, run_lf=False, influx_params=None): - super().__init__(lf_data, dut_data, log_level) - self.run_lf = run_lf - self.dut_idx_mapping = {} - # self.upstream_port = list(self.uplink_nat_ports.keys())[0] - # self.skip_pcap = skip_pcap - # self.wan_upstream = list(self.wan_ports.keys()) - # self.lan_upstream = - #self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", _cleanup_on_exit=False) + super().__init__(lf_data, dut_data, run_lf, log_level) + def client_connectivity_test(self, ssid="[BLANK]", passkey="[BLANK]", dut_data={}, - security="open", extra_securities=[], - num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog", ssid_channel=None, + security="open", extra_securities=[], sta_mode=0, + num_sta=1, mode="BRIDGE", vlan_id=[None], band="twog", allure_attach=True, runtime_secs=40): - # self.staConnect = StaConnect2(self.manager_ip, self.manager_http_port, debug_=self.debug) - # setup_interfaces() interface selection return radio name along no of station on each radio, upstream port - # - - data = self.setup_interfaces(band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta) + data = self.setup_interfaces(ssid=ssid, bssid=passkey, passkey=passkey, encryption=security, + band=band, vlan_id=vlan_id[0], mode=mode, num_sta=num_sta) self.add_vlan(vlan_ids=vlan_id) - logging.info("Setup interface data" + str(data)) - if self.run_lf: - ssid = data["ssid"] - passkey = data["passkey"] - security = data["security"] - sta_connect_obj = [] - for radio in data["radios"]: - obj_sta_connect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", - _cleanup_on_exit=False) - obj_sta_connect.sta_mode = 0 - upstream_data = list(data["upstream_port"].split(".")) - obj_sta_connect.upstream_resource = upstream_data[1] - upstream_data.pop(0) - upstream_data.pop(0) - upstream_port = ".".join(upstream_data) - obj_sta_connect.upstream_port = upstream_port - self.enable_verbose_debug(radio=radio, enable=False) - obj_sta_connect.radio = radio - obj_sta_connect.admin_down(obj_sta_connect.radio) - obj_sta_connect.admin_up(obj_sta_connect.radio) - obj_sta_connect.sta_prefix = data["sta_prefix"] - # changed to auto channel - self.set_radio_channel(radio=radio, channel="AUTO") - logging.info("scan ssid radio: " + str(radio.split(".")[2])) - result = self.scan_ssid(radio=radio, ssid=ssid, ssid_channel=ssid_channel) - logging.info("ssid scan data : " + str(result)) - if not result and ssid_channel: - # Sniffer required - # print("sniff radio", data["sniff_radio"].split(".")[2]) - for dut in self.dut_data: - identifier = dut["identifier"] - if dut_data.keys().__contains__(identifier): - if band == "twog": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ - dict(dut_data.get(identifier)[-1])["2G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["2G"][0] - if data["sniff_radio_2g"] is not None: - self.start_sniffer(radio_channel=channel, - radio=data["sniff_radio_2g"].split(".")[2], - duration=10) - time.sleep(10) - self.stop_sniffer() - elif band == "fiveg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ - dict(dut_data.get(identifier)[-1])["5G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["5G"][0] - if data["sniff_radio_5g"] is not None: - self.start_sniffer(radio_channel=channel, - radio=data["sniff_radio_5g"].split(".")[2], - duration=10) - time.sleep(10) - self.stop_sniffer() - elif band == "sixg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ - dict(dut_data.get(identifier)[-1])["6G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["6G"][0] - if data["sniff_radio_6g"] is not None: - self.start_sniffer(radio_channel=channel, - radio=data["sniff_radio_6g"].split(".")[2], - duration=10) - time.sleep(10) - self.stop_sniffer() - # print("ssid not available in scan result") - # return "FAIL", "ssid not available in scan result" - pass - obj_sta_connect.resource = radio.split(".")[1] - obj_sta_connect.dut_ssid = ssid - obj_sta_connect.dut_passwd = passkey - obj_sta_connect.dut_security = security - obj_sta_connect.station_names = data["radios"][radio] - obj_sta_connect.runtime_secs = runtime_secs - obj_sta_connect.bringup_time_sec = 80 - obj_sta_connect.cleanup_on_exit = True - obj_sta_connect.download_bps = 128000 - obj_sta_connect.upload_bps = 128000 - obj_sta_connect.side_a_pdu = 1200 - obj_sta_connect.side_b_pdu = 1500 - obj_sta_connect.setup(extra_securities=extra_securities) - if ssid_channel: - pass - # Need to start sniffer - # print("sniff radio", data["sniff_radio"].split(".")[2]) - # self.start_sniffer(radio_channel=ssid_channel, radio=data["sniff_radio"].split(".")[2], duration=30) + logging.info("Setup interface data" + str(data)) + sta_connect_obj = [] + for dut in data: + for radio in data[dut]["station_data"]: + obj_sta_connect = StaConnect2(self.manager_ip, self.manager_http_port, outfile="shivam", + _cleanup_on_exit=False) + + obj_sta_connect.sta_mode = sta_mode + obj_sta_connect.upstream_resource = data[dut]["upstream_resource"] + obj_sta_connect.upstream_port = data[dut]["upstream"] + self.enable_verbose_debug(radio=radio, enable=True) + obj_sta_connect.radio = radio + obj_sta_connect.admin_down(obj_sta_connect.radio) + obj_sta_connect.admin_up(obj_sta_connect.radio) + obj_sta_connect.sta_prefix = data[dut]["sta_prefix"] + obj_sta_connect.resource = radio.split(".")[1] + obj_sta_connect.dut_ssid = ssid + obj_sta_connect.dut_ssid = ssid + obj_sta_connect.dut_passwd = passkey + obj_sta_connect.dut_security = security + obj_sta_connect.station_names = data[dut]["station_data"][radio] + obj_sta_connect.runtime_secs = runtime_secs + obj_sta_connect.bringup_time_sec = 80 + obj_sta_connect.cleanup_on_exit = True + obj_sta_connect.download_bps = 128000 + obj_sta_connect.upload_bps = 128000 + obj_sta_connect.side_a_pdu = 1200 + obj_sta_connect.side_b_pdu = 1500 + + + # changed to auto channel + self.set_radio_channel(radio=radio, channel="AUTO") + logging.info("scan ssid radio: " + str(radio.split(".")[2])) + result = self.scan_ssid(radio=radio, ssid=ssid) + logging.info("ssid scan data : " + str(result)) + if not result: + # Sniffer required + # print("sniff radio", data["sniff_radio"].split(".")[2]) + for duts in self.dut_data: + identifier = duts["identifier"] + if dut_data.keys().__contains__(identifier): + if band == "twog": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)[-1])["2G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["2G"][0] + if data[dut]["sniff_radio_2g"] is not None: + self.start_sniffer(radio_channel=channel, + radio=data[dut]["sniff_radio_2g"].split(".")[2], + duration=10) + time.sleep(10) + self.stop_sniffer() + elif band == "fiveg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)[-1])["5G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["5G"][0] + if data[dut]["sniff_radio_5g"] is not None: + self.start_sniffer(radio_channel=channel, + radio=data[dut]["sniff_radio_5g"].split(".")[2], + duration=10) + time.sleep(10) + self.stop_sniffer() + elif band == "sixg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)[-1])["6G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["6G"][0] + if data[dut]["sniff_radio_6g"] is not None: + self.start_sniffer(radio_channel=channel, + radio=data[dut]["sniff_radio_6g"].split(".")[2], + duration=10) + time.sleep(10) + self.stop_sniffer() + if not result: + pytest.fail("SSID is not Available in Scan Result") + obj_sta_connect.setup(extra_securities=extra_securities) sta_connect_obj.append(obj_sta_connect) - for dut in self.dut_data: - identifier = dut["identifier"] - if dut_data.keys().__contains__(identifier): - if band == "twog": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ - dict(dut_data.get(identifier)[-1])["2G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["2G"][0] - self.start_sniffer(radio_channel=channel, radio=data["sniff_radio_2g"].split(".")[2], - duration=runtime_secs) - logging.info("started-sniffer") - for obj in sta_connect_obj: - obj.start() - logging.info("napping %f sec" % runtime_secs) - time.sleep(runtime_secs) - logging.info("stopping-sniffer") - self.stop_sniffer() - elif band == "fiveg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ - dict(dut_data.get(identifier)[-1])["5G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["5G"][0] - self.start_sniffer(radio_channel=channel, radio=data["sniff_radio_5g"].split(".")[2], - duration=runtime_secs) - for obj in sta_connect_obj: - obj.start() - logging.info("napping %f sec" % runtime_secs) - time.sleep(runtime_secs) - self.stop_sniffer() - elif band == "sixg": - if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ - dict(dut_data.get(identifier)[-1])["6G"] is not None: - channel = dict(dut_data.get(identifier)[-1])["6G"][0] - self.start_sniffer(radio_channel=channel, radio=data["sniff_radio_6g"].split(".")[2], - duration=runtime_secs) - for obj in sta_connect_obj: - obj.start() - logging.info("napping %f sec" % runtime_secs) - time.sleep(runtime_secs) - self.stop_sniffer() - else: - for obj in sta_connect_obj: - print(obj) - obj.start() - print("napping %f sec" % runtime_secs) - time.sleep(runtime_secs) + for dut_ in self.dut_data: + identifier = dut_["identifier"] + if dut_data.keys().__contains__(identifier): + if band == "twog": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("2G") and \ + dict(dut_data.get(identifier)[-1])["2G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["2G"][0] + self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_2g"].split(".")[2], + duration=runtime_secs) + logging.info("started-sniffer") + for obj in sta_connect_obj: + obj.start() + logging.info("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) + logging.info("stopping-sniffer") + self.stop_sniffer() + elif band == "fiveg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("5G") and \ + dict(dut_data.get(identifier)[-1])["5G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["5G"][0] + self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_5g"].split(".")[2], + duration=runtime_secs) + for obj in sta_connect_obj: + obj.start() + logging.info("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) + self.stop_sniffer() + elif band == "sixg": + if dict(dut_data.get(identifier)[-1]).keys().__contains__("6G") and \ + dict(dut_data.get(identifier)[-1])["6G"] is not None: + channel = dict(dut_data.get(identifier)[-1])["6G"][0] + self.start_sniffer(radio_channel=channel, radio=data[dut]["sniff_radio_6g"].split(".")[2], + duration=runtime_secs) + for obj in sta_connect_obj: + obj.start() + logging.info("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) + self.stop_sniffer() + else: + for obj in sta_connect_obj: + print(obj) + obj.start() + print("napping %f sec" % runtime_secs) + time.sleep(runtime_secs) pass_fail_result = [] for obj in sta_connect_obj: sta_rows = ["4way time (us)", "channel", "cx time (us)", "dhcp (ms)", "ip", "signal"] @@ -225,7 +207,7 @@ class lf_tests(lf_libs): cx_table_dict = {} upstream = [] for i in range(len(obj.station_names)): - upstream.append(data["upstream_port"]) + upstream.append(data[dut]["upstream_port"]) cx_table_dict["Upstream"] = upstream cx_table_dict["Downstream"] = obj.station_names cx_tcp_ul = [] @@ -280,9 +262,6 @@ class lf_tests(lf_libs): logging.info("client connection to" + str(obj.dut_ssid) + "unsuccessful. Test Failed") result = "FAIL" - if ssid_channel: - # need to stop sniffer - pass result = "PASS" description = "" for i in pass_fail_result: @@ -371,185 +350,191 @@ class lf_tests(lf_libs): logging.info("ALL Stations got IP's") return station_data_all - def add_stations(self, band="2G", num_stations="max", dut="NA", ssid_name=[], idx=0): - logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) + - " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) + def add_stations(self, band="2G", num_stations="max", ssid_name=[], idx=0): + + dut_name = [] + for index in range(0, len(self.dut_data)): + dut_name.append(self.dut_data[index]["identifier"]) if num_stations == 0: logging.warning("0 Stations") return - idx = idx - if self.run_lf or self.cc_1: + for dut in dut_name: + logging.info("Adding Stations:" + band + " band, Number of Stations: " + str(num_stations) + + " DUT: " + str(dut) + " SSID: " + str(ssid_name) + " idx: " + str(idx)) + idx = idx + if self.run_lf or self.cc_1: + if band == "2G": + idx = 0 + if band == "5G": + idx = 1 + + for i in self.dut_idx_mapping: + if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: + idx = i if band == "2G": - idx = 0 + all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \ + self.ax210_radios + print("all_2g_rdio", all_radio_2g) + if num_stations != "max": + logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) + total_sta = num_stations + max_possible = 0 + for radio in all_radio_2g: + max_possible = max_possible + int(self.get_max_sta(radio)) + if total_sta <= max_possible: + per_radio_sta = int(total_sta / len(all_radio_2g)) + rem = total_sta % len(all_radio_2g) + else: + total_sta = max_possible + per_radio_sta = int(total_sta / len(all_radio_2g)) + rem = total_sta % len(all_radio_2g) + if rem != 0 and per_radio_sta == 0: + per_radio_sta = rem / len(all_radio_2g) + logging.info("Total stations per radio: " + str(per_radio_sta)) + for radio in all_radio_2g: + max_possible = int(self.get_max_sta(radio)) + if total_sta == 0: + return + num_stations = per_radio_sta + if rem == 0 and num_stations == 0: + return + if max_possible - num_stations >= rem: + num_stations = num_stations + rem + rem = 0 + elif max_possible - rem >= num_stations: + num_stations = num_stations + rem + rem = 0 + elif total_sta <= max_possible: + num_stations = total_sta + if per_radio_sta < 1: + num_stations = 1 + total_sta = total_sta - num_stations + logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) + + if num_stations == "max": + logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) + for radio in all_radio_2g: + num_stations = self.get_max_sta(radio) + logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) + if band == "5G": - idx = 1 + all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios + if num_stations != "max": + logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) + total_sta = num_stations + max_possible = 0 + for radio in all_radio_5g: + max_possible = max_possible + int(self.get_max_sta(radio)) + if total_sta <= max_possible: + per_radio_sta = int(total_sta / len(all_radio_5g)) + rem = total_sta % len(all_radio_5g) + else: + total_sta = max_possible + per_radio_sta = int(total_sta / len(all_radio_5g)) + rem = total_sta % len(all_radio_5g) + if rem != 0 and per_radio_sta == 0: + per_radio_sta = rem / len(all_radio_5g) + logging.info("Total stations per radio: " + str(per_radio_sta)) + for radio in all_radio_5g: + max_possible = int(self.get_max_sta(radio)) + if total_sta == 0: + return + num_stations = per_radio_sta + if rem == 0 and num_stations == 0: + return + if max_possible - num_stations >= rem: + num_stations = num_stations + rem + rem = 0 + elif max_possible - rem >= num_stations: + num_stations = num_stations + rem + rem = 0 + elif total_sta <= max_possible: + num_stations = total_sta + if per_radio_sta < 1: + num_stations = 1 + total_sta = total_sta - num_stations + logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) - for i in self.dut_idx_mapping: - if self.dut_idx_mapping[i][0] == ssid_name and self.dut_idx_mapping[i][3] == band: - idx = i - if band == "2G": - all_radio_2g = self.wave2_2g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + \ - self.ax210_radios - if num_stations != "max": - logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) - total_sta = num_stations - max_possible = 0 - for radio in all_radio_2g: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(all_radio_2g)) - rem = total_sta % len(all_radio_2g) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(all_radio_2g)) - rem = total_sta % len(all_radio_2g) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(all_radio_2g) - logging.info("Total stations per radio: " + str(per_radio_sta)) - for radio in all_radio_2g: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - - if num_stations == "max": - logging.info("Total 2G Radios Available in Testbed: " + str(len(all_radio_2g))) - for radio in all_radio_2g: - num_stations = self.get_max_sta(radio) - logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - - if band == "5G": - all_radio_5g = self.wave2_5g_radios + self.wave1_radios + self.mtk_radios + self.ax200_radios + self.ax210_radios - if num_stations != "max": - logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) - total_sta = num_stations - max_possible = 0 - for radio in all_radio_5g: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(all_radio_5g)) - rem = total_sta % len(all_radio_5g) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(all_radio_5g)) - rem = total_sta % len(all_radio_5g) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(all_radio_5g) - logging.info("Total stations per radio: " + str(per_radio_sta)) - for radio in all_radio_5g: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - - if num_stations == "max": - logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) - for radio in all_radio_5g: - num_stations = self.get_max_sta(radio) - logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - if band == "6g": - all_radio_6g = self.ax210_radios - if num_stations != "max": - logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g))) - total_sta = num_stations - max_possible = 0 - for radio in all_radio_6g: - max_possible = max_possible + int(self.get_max_sta(radio)) - if total_sta <= max_possible: - per_radio_sta = int(total_sta / len(all_radio_6g)) - rem = total_sta % len(all_radio_6g) - else: - total_sta = max_possible - per_radio_sta = int(total_sta / len(all_radio_6g)) - rem = total_sta % len(all_radio_6g) - if rem != 0 and per_radio_sta == 0: - per_radio_sta = rem / len(all_radio_6g) - logging.info("Total stations per radio: " + str(per_radio_sta)) - for radio in all_radio_6g: - max_possible = int(self.get_max_sta(radio)) - if total_sta == 0: - return - num_stations = per_radio_sta - if rem == 0 and num_stations == 0: - return - if max_possible - num_stations >= rem: - num_stations = num_stations + rem - rem = 0 - elif max_possible - rem >= num_stations: - num_stations = num_stations + rem - rem = 0 - elif total_sta <= max_possible: - num_stations = total_sta - if per_radio_sta < 1: - num_stations = 1 - total_sta = total_sta - num_stations - logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - logging.debug("Raw Line : " + str(station_data)) - if num_stations == "max": - logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g))) - for radio in all_radio_6g: - num_stations = self.get_max_sta(radio) - logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) - station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + - " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + - str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] - self.raw_line.append(station_data) - logging.debug("Raw Line : " + str(station_data)) + if num_stations == "max": + logging.info("Total 5G Radios Available in Testbed: " + str(len(all_radio_5g))) + for radio in all_radio_5g: + num_stations = self.get_max_sta(radio) + logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) + if band == "6g": + all_radio_6g = self.ax210_radios + if num_stations != "max": + logging.info("Total 6G Radios Available in Testbed: " + str(len(all_radio_6g))) + total_sta = num_stations + max_possible = 0 + for radio in all_radio_6g: + max_possible = max_possible + int(self.get_max_sta(radio)) + if total_sta <= max_possible: + per_radio_sta = int(total_sta / len(all_radio_6g)) + rem = total_sta % len(all_radio_6g) + else: + total_sta = max_possible + per_radio_sta = int(total_sta / len(all_radio_6g)) + rem = total_sta % len(all_radio_6g) + if rem != 0 and per_radio_sta == 0: + per_radio_sta = rem / len(all_radio_6g) + logging.info("Total stations per radio: " + str(per_radio_sta)) + for radio in all_radio_6g: + max_possible = int(self.get_max_sta(radio)) + if total_sta == 0: + return + num_stations = per_radio_sta + if rem == 0 and num_stations == 0: + return + if max_possible - num_stations >= rem: + num_stations = num_stations + rem + rem = 0 + elif max_possible - rem >= num_stations: + num_stations = num_stations + rem + rem = 0 + elif total_sta <= max_possible: + num_stations = total_sta + if per_radio_sta < 1: + num_stations = 1 + total_sta = total_sta - num_stations + logging.info("Adding " + str(num_stations) + " Stations on " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) + if num_stations == "max": + logging.info("Total AX Radios Available in Testbed: " + str(len(all_radio_6g))) + for radio in all_radio_6g: + num_stations = self.get_max_sta(radio) + logging.info("Total stations: " + str(num_stations) + " On Radio: " + str(radio)) + station_data = ["profile_link " + radio.split(".")[0] + "." + radio.split(".")[1] + + " STA-AUTO " + str(num_stations) + " 'DUT: " + dut + " Radio-" + + str(int(idx) + 1) + "'" + " NA " + radio.split(".")[2]] + self.temp_raw_lines.append(station_data) + logging.debug("Raw Line : " + str(station_data)) if __name__ == '__main__': - advance_03= { + basic_04 = { "target": "tip_2x", "controller": { "url": "https://sec-qa01.cicd.lab.wlan.tip.build:16001", @@ -557,10 +542,11 @@ if __name__ == '__main__': "password": "OpenWifi%123" }, "device_under_tests": [{ - "model": "cig_wf196", - "supported_bands": ["2G", "5G", "6G"], + "model": "edgecore_ecw5211", + "supported_bands": ["2G", "5G"], "supported_modes": ["BRIDGE", "NAT", "VLAN"], - "wan_port": "1.3.eth2", + "wan_port": "1.1.eth2", + "lan_port": "1.1.eth1", "ssid": { "2g-ssid": "OpenWifi", "5g-ssid": "OpenWifi", @@ -568,21 +554,21 @@ if __name__ == '__main__': "2g-password": "OpenWifi", "5g-password": "OpenWifi", "6g-password": "OpenWifi", - "2g-encryption": "WPA2", - "5g-encryption": "WPA2", - "6g-encryption": "WPA3", + "2g-encryption": "OPEN", + "5g-encryption": "OPEN", + "6g-encryption": "OPEN", "2g-bssid": "68:7d:b4:5f:5c:31", "5g-bssid": "68:7d:b4:5f:5c:3c", "6g-bssid": "68:7d:b4:5f:5c:38" }, - "mode": "wifi6e", - "identifier": "824f816011e4", + "mode": "wifi5", + "identifier": "68215fda456d", "method": "serial", - "host_ip": "10.28.3.115", - "host_username": "root", + "host_ip": "localhost", + "host_username": "lanforge", "host_password": "pumpkin77", "host_ssh_port": 22, - "serial_tty": "/dev/ttyAP0", + "serial_tty": "/dev/ttyAP5", "firmware_version": "next-latest" }], "traffic_generator": { @@ -590,12 +576,12 @@ if __name__ == '__main__': "testbed": "basic", "scenario": "dhcp-bridge", "details": { - "manager_ip": "10.28.3.117", + "manager_ip": "10.28.3.6", "http_port": 8080, "ssh_port": 22, "setup": {"method": "build", "DB": "Test_Scenario_Automation"}, "wan_ports": { - "1.3.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { + "1.1.eth2": {"addressing": "dhcp-server", "subnet": "172.16.0.1/16", "dhcp": { "lease-first": 10, "lease-count": 10000, "lease-time": "6h" @@ -603,12 +589,14 @@ if __name__ == '__main__': } }, "lan_ports": { - + "1.1.eth1": { + "addressing": "dynamic" + } }, "uplink_nat_ports": { - "1.3.eth3": { - "addressing": "static", - "ip": "10.28.2.39", + "1.1.eth3": { + "addressing": "dhcp-server", + "ip": "10.28.2.9", "gateway_ip": "10.28.2.1/24", "ip_mask": "255.255.255.0", "dns_servers": "BLANK" @@ -618,8 +606,13 @@ if __name__ == '__main__': } } - obj = lf_tests(lf_data=dict(advance_03["traffic_generator"]), dut_data=list(advance_03["device_under_tests"]), + obj = lf_tests(lf_data=dict(basic_04["traffic_generator"]), dut_data=list(basic_04["device_under_tests"]), log_level=logging.DEBUG, run_lf=True) + obj.setup_dut() + #obj.add_stations(band="2G", num_stations=6, ssid_name=["OpenWifi"]) + #obj.chamber_view(raw_lines="custom") + # A =obj.setup_interfaces(band="fiveg", vlan_id=100, mode="NAT-WAN", num_sta=1) + # print(A) # obj.setup_relevent_profiles() # obj.Client_Connect(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", mode="BRIDGE", band="twog", # vlan_id=100, num_sta=5, scan_ssid=True, @@ -629,9 +622,12 @@ if __name__ == '__main__': # obj.create_dhcp_external()obj.add_vlan(vlan_ids=[100, 200, 300, 400, 500, 600]) # obj.get_cx_data() # obj.chamber_view() + # dut = {'0000c1018812': [['OpenWifi', 'wpa2', 'OpenWifi', '2G', '6A:21:5F:DA:45:6F'], + # {'2G': [6, 40, 2437], '5G': None, '6G': None}]} + # c = obj.client_connectivity_test(ssid="OpenWifi", passkey="OpenWifi", security="wpa2", extra_securities=[], - # num_sta=1, mode="BRIDGE", vlan_id=[100], - # band="twog", ssid_channel=11) + # num_sta=1, mode="BRIDGE", dut_data=dut, + # band="twog") # obj.start_sniffer(radio_channel=1, radio="wiphy7", test_name="sniff_radio", duration=30) # print("started") # time.sleep(30) diff --git a/lf_libs/scan_ssid.csv b/lf_libs/scan_ssid.csv deleted file mode 100644 index 9de151a5..00000000 --- a/lf_libs/scan_ssid.csv +++ /dev/null @@ -1,4 +0,0 @@ -,age,auth,beacon,bss,channel,entity id,frequency,info,signal,ssid -0,4698,Open,100,00:00:c1:01:88:15,1,1.1.4,2412,2x2 MCS 0-11 AX,-17.0,Maverick-018812 -1,354731974,WPA2 WPA3,100,02:00:c1:01:88:15,1,1.1.4,2412,2x2 MCS 0-11 AX,-22.0,ssid_wpa3_p_m_2g_br -2,354731972,WPA WPA2,100,06:00:c1:01:88:15,1,1.1.4,2412,2x2 MCS 0-11 AX,-21.0,ssid_wpa_wpa2_p_m_2g_br diff --git a/lf_libs/sh-results.csv b/lf_libs/sh-results.csv deleted file mode 100644 index e69de29b..00000000