diff --git a/py-scripts/sta_connect2.py b/py-scripts/sta_connect2.py index 43c2869b..b9786d36 100755 --- a/py-scripts/sta_connect2.py +++ b/py-scripts/sta_connect2.py @@ -40,7 +40,7 @@ class StaConnect2(LFCliBase): # very confused. super().__init__(host, port, _debug=_debugOn, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail) self.debugOn = _debugOn - self.dut_security = "" + self.dut_security = _dut_security self.dut_ssid = _dut_ssid self.dut_passwd = _dut_passwd self.dut_bssid = _dut_bssid @@ -62,6 +62,9 @@ class StaConnect2(LFCliBase): self.localrealm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6 self.resulting_stations = {} self.resulting_endpoints = {} + self.sta_profile = None + self.l3_udp_profile = None + self.l3_tcp_profile = None # def get_realm(self) -> Realm: # py > 3.6 def get_realm(self): @@ -134,55 +137,58 @@ class StaConnect2(LFCliBase): for sta_name in self.station_names: sta_url = self.get_station_url(sta_name) response = self.json_get(sta_url) - if response is not None: - if response["interface"] is not None: - for sta_name in self.station_names: - LFUtils.removePort(self.resource, sta_name, self.lfclient_url) - LFUtils.wait_until_ports_disappear(self.resource, self.lfclient_url, self.station_names) + if (response is not None) and (response["interface"] is not None): + for sta_name in self.station_names: + LFUtils.removePort(self.resource, sta_name, self.lfclient_url) + LFUtils.wait_until_ports_disappear(self.resource, self.lfclient_url, self.station_names) # Create stations and turn dhcp on - sta_profile = self.localrealm.new_station_profile() + self.station_profile = self.localrealm.new_station_profile() + pprint.pprint(self.dut_security) if self.dut_security == WPA2: - sta_profile.use_wpa2(on=True, ssid=self.dut_ssid, passwd=self.dut_passwd) + self.station_profile.use_wpa2(on=True, ssid=self.dut_ssid, passwd=self.dut_passwd) elif self.dut_security == OPEN: - sta_profile.use_wpa2(on=False, ssid=self.dut_ssid) - sta_profile.set_command_flag("add_sta", "create_admin_down", 1) + self.station_profile.use_wpa2(on=False, ssid=self.dut_ssid) + self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) print("Adding new stations ", end="") - sta_profile.create(resource=self.resource, radio=self.radio, sta_names_=self.station_names) + self.station_profile.create(resource=self.resource, radio=self.radio, sta_names_=self.station_names, up_=False, debug=True) - # Create endpoints and cxs # Create UDP endpoints - - l3_udp_profile = self.localrealm.new_l3_cx_profile() - l3_udp_profile.side_a_min_bps = 128000 - l3_udp_profile.side_b_min_bps = 128000 - l3_udp_profile.side_a_min_pdu = 1200 - l3_udp_profile.side_b_min_pdu = 1500 - l3_udp_profile.report_timer = 1000 - l3_udp_profile.prefix = "udp_" - l3_udp_profile.create(endp_type="lf_udp", - side_a=list(self.localrealm.find_ports_like("sta+")), - side_b="%d.%s" % (self.resource, self.upstream_port)) + self.l3_udp_profile = self.localrealm.new_l3_cx_profile() + self.l3_udp_profile.side_a_min_bps = 128000 + self.l3_udp_profile.side_b_min_bps = 128000 + self.l3_udp_profile.side_a_min_pdu = 1200 + self.l3_udp_profile.side_b_min_pdu = 1500 + self.l3_udp_profile.report_timer = 1000 + self.l3_udp_profile.name_prefix = "udp" + self.l3_udp_profile.create(endp_type="lf_udp", + side_a=list(self.localrealm.find_ports_like("sta+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) # Create TCP endpoints - - l3_tcp_profile = self.localrealm.new_l3_cx_profile() - l3_tcp_profile.side_a_min_bps = 128000 - l3_tcp_profile.side_b_min_bps = 56000 - l3_tcp_profile.prefix = "tcp_" - l3_tcp_profile.report_timer = 1000 - + self.l3_tcp_profile = self.localrealm.new_l3_cx_profile() + self.l3_tcp_profile.side_a_min_bps = 128000 + self.l3_tcp_profile.side_b_min_bps = 56000 + self.l3_tcp_profile.name_prefix = "tcp" + self.l3_tcp_profile.report_timer = 1000 + self.l3_tcp_profile.create(endp_type="lf_tcp", + side_a=list(self.localrealm.find_ports_like("sta+")), + side_b="%d.%s" % (self.resource, self.upstream_port), + suppress_related_commands=True) def start(self): - # print("\nBringing ports up...") - # data = {"shelf": 1, - # "resource": self.resource, - # "port": "ALL", - # "probe_flags": 1} - # self.json_post("/cli-json/nc_show_ports", data) - # LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names) + if not self.station_profile.up: + print("\nBringing ports up...") + data = {"shelf": 1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1} + self.json_post("/cli-json/nc_show_ports", data) + self.station_profile.admin_up(self.resource) + LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names) # station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl())) duration = 0 @@ -191,37 +197,37 @@ class StaConnect2(LFCliBase): ap = "" print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="") connected_stations = {} - # while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime): - # duration += 3 - # time.sleep(3) - # print(".", end="") - # for sta_name in self.station_names: - # sta_url = self.get_station_url(sta_name) - # station_info = self.json_get(sta_url + "?fields=port,ip,ap") - # - # # LFUtils.debug_printer.pprint(station_info) - # if (station_info is not None) and ("interface" in station_info): - # if "ip" in station_info["interface"]: - # ip = station_info["interface"]["ip"] - # if "ap" in station_info["interface"]: - # ap = station_info["interface"]["ap"] - # - # if (ap == "Not-Associated") or (ap == ""): - # if self.debugOn: - # print(" -%s," % sta_name, end="") - # else: - # if ip == "0.0.0.0": - # if self.debugOn: - # print(" %s (0.0.0.0)" % sta_name, end="") - # else: - # connected_stations[sta_name] = sta_url - # data = { - # "shelf":1, - # "resource": self.resource, - # "port": "ALL", - # "probe_flags": 1 - # } - # self.json_post("/cli-json/nc_show_ports", data) + while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime): + duration += 3 + time.sleep(3) + print(".", end="") + for sta_name in self.station_names: + sta_url = self.get_station_url(sta_name) + station_info = self.json_get(sta_url + "?fields=port,ip,ap") + + # LFUtils.debug_printer.pprint(station_info) + if (station_info is not None) and ("interface" in station_info): + if "ip" in station_info["interface"]: + ip = station_info["interface"]["ip"] + if "ap" in station_info["interface"]: + ap = station_info["interface"]["ap"] + + if (ap == "Not-Associated") or (ap == ""): + if self.debugOn: + print(" -%s," % sta_name, end="") + else: + if ip == "0.0.0.0": + if self.debugOn: + print(" %s (0.0.0.0)" % sta_name, end="") + else: + connected_stations[sta_name] = sta_url + data = { + "shelf":1, + "resource": self.resource, + "port": "ALL", + "probe_flags": 1 + } + self.json_post("/cli-json/nc_show_ports", data) # make a copy of the connected stations for test records @@ -279,61 +285,52 @@ class StaConnect2(LFCliBase): # } # self.json_post("/cli-json/show_cxe", data) + def collect_endp_stats(self, endp_map): + print("Collecting Data") + fields="?fields=name,tx+bytes,rx+bytes" + for (cx_name, endps) in endp_map.items(): + try: + endp_url = "/endp/%s%s" % (endps[0], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + ptest_a_tx = endp_json['endpoint']['tx bytes'] + ptest_a_rx = endp_json['endpoint']['rx bytes'] + + #ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) + endp_url = "/endp/%s%s" % (endps[1], fields) + endp_json = self.json_get(endp_url) + self.resulting_endpoints[endp_url] = endp_json + + ptest_b_tx = endp_json['endpoint']['tx bytes'] + ptest_b_rx = endp_json['endpoint']['rx bytes'] + + self.compare_vals("testTCP-A TX", ptest_a_tx) + self.compare_vals("testTCP-A RX", ptest_a_rx) + + self.compare_vals("testTCP-B TX", ptest_b_tx) + self.compare_vals("testTCP-B RX", ptest_b_rx) + + except Exception as e: + self.error(e) + + def stop(self): # stop cx traffic print("Stopping CX Traffic") - # for cx_name in cx_names.keys(): - # data = { - # "test_mgr": "ALL", - # "cx_name": cx_name, - # "cx_state": "STOPPED" - # } - # self.json_post("/cli-json/set_cx_state", data) + self.l3_udp_profile.stop_cx() + self.l3_tcp_profile.stop_cx() # Refresh stats print("\nRefresh CX stats") - # for cx_name in cx_names.keys(): - # data = { - # "test_mgr": "ALL", - # "cross_connect": cx_name - # } - # self.json_post("/cli-json/show_cxe", data) + self.l3_udp_profile.refresh_cx() + self.l3_tcp_profile.refresh_cx() - # print("Sleeping for 5 seconds") + print("Sleeping for 5 seconds") time.sleep(5) # get data for endpoints JSON - print("Collecting Data") - # for cx_name in cx_names.keys(): - # try: - # # ?fields=tx+bytes,rx+bytes - # endp_url = "/endp/%s" % cx_names[cx_name]["a"] - # ptest = self.json_get(endp_url) - # self.resulting_endpoints[endp_url] = ptest - # - # ptest_a_tx = ptest['endpoint']['tx bytes'] - # ptest_a_rx = ptest['endpoint']['rx bytes'] - # - # #ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) - # endp_url = "/endp/%s" % cx_names[cx_name]["b"] - # ptest = self.json_get(endp_url) - # self.resulting_endpoints[endp_url] = ptest - # - # ptest_b_tx = ptest['endpoint']['tx bytes'] - # ptest_b_rx = ptest['endpoint']['rx bytes'] - # - # self.compare_vals("testTCP-A TX", ptest_a_tx) - # self.compare_vals("testTCP-A RX", ptest_a_rx) - # - # self.compare_vals("testTCP-B TX", ptest_b_tx) - # self.compare_vals("testTCP-B RX", ptest_b_rx) - # - # except Exception as e: - # self.error(e) - - # Examples of what happens when you add test results that do not begin with PASS - # self.test_results.append("Neutral message will fail") - # self.test_results.append("FAILED message will fail") + self.collect_endp_stats(self.l3_udp_profile.created_cx) + self.collect_endp_stats(self.l3_tcp_profile.created_cx) # print("\n") def cleanup(self): @@ -342,10 +339,12 @@ class StaConnect2(LFCliBase): for sta_name in self.station_names: LFUtils.removePort(self.resource, sta_name, self.lfclient_url) endp_names = [] - removeCX(self.lfclient_url, cx_names.keys()) - for cx_name in cx_names: - endp_names.append(cx_names[cx_name]["a"]) - endp_names.append(cx_names[cx_name]["b"]) + + removeCX(self.lfclient_url, self.l3_udp_profile.get_cx_names()) + removeCX(self.lfclient_url, self.l3_tcp_profile.get_cx_names()) + for (cx_name, endp_names) in self.l3_udp_profile.created_cx.items(): + endp_names.append(endp_names[0]) + endp_names.append(endp_names[1]) removeEndps(self.lfclient_url, endp_names) # ~class @@ -373,6 +372,7 @@ Example: parser.add_argument("--sta_mode", type=str, help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))") parser.add_argument("--dut_ssid", type=str, help="DUT SSID") + parser.add_argument("--dut_security", type=str, help="DUT security: open, wpa, wpa2, wpa3") parser.add_argument("--dut_passwd", type=str, help="DUT PSK password. Do not set for OPEN auth") parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.") @@ -398,16 +398,18 @@ Example: staConnect.radio = args.radio if args.resource is not None: staConnect.resource = args.resource + if args.dut_ssid is not None: + staConnect.dut_ssid = args.dut_ssid if args.dut_passwd is not None: staConnect.dut_passwd = args.dut_passwd if args.dut_bssid is not None: staConnect.dut_bssid = args.dut_bssid - if args.dut_ssid is not None: - staConnect.dut_ssid = args.dut_ssid + if args.dut_security is not None: + staConnect.dut_security = args.dut_security staConnect.setup() staConnect.start() - + print("napping %f sec" % staConnect.runtime_secs) time.sleep(staConnect.runtime_secs) staConnect.stop() run_results = staConnect.get_result_list()