diff --git a/py-scripts/sta_connect.py b/py-scripts/sta_connect.py index 05ca57b4..ddf54d77 100755 --- a/py-scripts/sta_connect.py +++ b/py-scripts/sta_connect.py @@ -21,16 +21,22 @@ import LANforge.lfcli_base from LANforge.lfcli_base import LFCliBase from LANforge.LFUtils import * +OPEN="open" +WEP="wep" +WPA="wpa" +WPA2="wpa" +MODE_AUTO=0 class StaConnect(LFCliBase): def __init__(self, host, port, _dut_ssid="MyAP", _dut_passwd="NA", _dut_bssid="", _user="", _passwd="", _sta_mode="0", _radio="wiphy0", _resource=1, _upstream_resource=1, _upstream_port="eth2", - _sta_name="sta001", _debugOn=False): + _sta_name=None, _debugOn=False, _dut_security=OPEN): # do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn) # that is py2 era syntax and will force self into the host variable, making you # very confused. super().__init__(host, port, _debug=_debugOn, _halt_on_error=False) + self.dut_security = "" self.dut_ssid = _dut_ssid self.dut_passwd = _dut_passwd self.dut_bssid = _dut_bssid @@ -41,14 +47,20 @@ class StaConnect(LFCliBase): self.resource = _resource self.upstream_resource = _upstream_resource self.upstream_port = _upstream_port - self.sta_name = _sta_name - self.sta_url = None # defer construction - self.upstream_url = None # defer construction + # self.sta_name = _sta_name - def getStaUrl(self): - if self.sta_url is None: - self.sta_url = "port/1/%s/%s" % (self.resource, self.sta_name) - return self.sta_url + self.sta_url_map = None # defer construction + self.upstream_url = None # defer construction + if _sta_name is not None: + self.station_names = [ _sta_name ] + + def get_station_url(self, sta_name_=None): + if sta_name_ is None: + raise ValueError("get_station_url wants a station name") + if self.sta_url_map is None: + for sta_name in self.station_names: + self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, self.sta_name) + return self.sta_url_map[sta_name_] def getUpstreamUrl(self): if self.upstream_url is None: @@ -63,6 +75,10 @@ class StaConnect(LFCliBase): else: self._fail("%s did not report traffic: %s" % (name, postVal), print_fail) + def remove_stations(self): + for name in self.station_names: + LFUtils.removePort(self.resource, name, self.mgr_url) + def run(self): self.clear_test_results() self.check_connect() @@ -85,35 +101,40 @@ class StaConnect(LFCliBase): # Create stations and turn dhcp on print("Creating station %s and turning on dhcp..." % self.sta_name) flags = 0x10000 - if "" != self.dut_passwd: + if self.dut_security == WPA2: flags += 0x400 - data = { + elif self.dut_security == OPEN: + pass + + add_sta_data = { "shelf": 1, "resource": self.resource, "radio": self.radio, - "sta_name": self.sta_name, "ssid": self.dut_ssid, "key": self.dut_passwd, "mode": self.sta_mode, "mac": "xx:xx:xx:xx:*:xx", "flags": flags # verbose, wpa2 } - print("Adding new station %s " % self.sta_name) - self.json_post("/cli-json/add_sta", data) + for sta_name in self.station_names: + add_sta_data["sta_name"] = sta_name; + print("Adding new station %s " % self.sta_name) + self.json_post("/cli-json/add_sta", add_sta_data) - data = { + set_port_data = { "shelf": 1, "resource": self.resource, - "port": self.sta_name, "current_flags": 0x80000000, # use DHCP, not down "interest": 0x4002 # set dhcp, current flags } - print("Configuring %s..." % self.sta_name) - self.json_post("/cli-json/set_port", data) + for sta_name in self.station_names: + set_port_data["port"] = sta_name + print("Configuring %s..." % sta_name) + self.json_post("/cli-json/set_port", set_port_data) data = {"shelf": 1, "resource": self.resource, - "port": self.sta_name, + "port": "ALL", "probe_flags": 1} self.json_post("/cli-json/nc_show_ports", data) LFUtils.waitUntilPortsAdminUp(self.resource, self.mgr_url, [self.sta_name]) @@ -124,151 +145,173 @@ class StaConnect(LFCliBase): ip = "0.0.0.0" ap = "" print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap)) - while (ip == "0.0.0.0") and (duration < maxTime): + + connected_stations = [] + while (len(connected_stations) < len(self.station_names)) and (duration < maxTime): duration += 2 time.sleep(2) - station_info = self.json_get(self.getStaUrl() + "?fields=port,ip,ap") + for sta_name in self.station_names: + sta_url = self.get_station_urls(sta_name) + station_info = self.json_get(sta_url + "?fields=port,ip,ap") - # LFUtils.debug_printer.pprint(station_info) - if (station_info is not None) and ("interface" in station_info): - if "ip" in station_info["interface"]: - ip = station_info["interface"]["ip"] - if "ap" in station_info["interface"]: - ap = station_info["interface"]["ap"] + # LFUtils.debug_printer.pprint(station_info) + if (station_info is not None) and ("interface" in station_info): + if "ip" in station_info["interface"]: + ip = station_info["interface"]["ip"] + if "ap" in station_info["interface"]: + ap = station_info["interface"]["ap"] - if (ap == "Not-Associated") or (ap == ""): - print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap)) - else: - if ip == "0.0.0.0": - print("Waiting for %s to gain IP ..." % self.sta_name) - - if (ap != "") and (ap != "Not-Associated"): - print("Connected to AP: "+ap) - if self.dut_bssid != "": - if self.dut_bssid.lower() == ap.lower(): - self._pass("Connected to BSSID: " + ap) - # self.test_results.append("PASSED: ) - # print("PASSED: Connected to BSSID: "+ap) + if (ap == "Not-Associated") or (ap == ""): + if self.debugOn: + print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap)) else: - self._fail("Connected to wrong BSSID, requested: %s Actual: %s" % (self.dut_bssid, ap)) - else: - self._fail("Did not connect to AP") - return False + if ip == "0.0.0.0": + if self.debugOn: + print("Waiting for %s to gain IP ..." % self.sta_name) + else: + connected_stations.append(sta_url) - if ip == "0.0.0.0": - self._fail("%s did not get an ip. Ending test" % self.sta_name) + for sta_name in self.station_names: + sta_url = self.get_station_urls(sta_name) + station_info = self.json_get(sta_url + "?fields=port,ip,ap") + ap = station_info["interface"]["ap"] + ip = station_info["interface"]["ip"] + if (ap != "") and (ap != "Not-Associated"): + print("Connected to AP: "+ap) + if self.dut_bssid != "": + if self.dut_bssid.lower() == ap.lower(): + self._pass(sta_name+" connected to BSSID: " + ap) + # self.test_results.append("PASSED: ) + # print("PASSED: Connected to BSSID: "+ap) + else: + self._fail(sta_name+" connected to wrong BSSID, requested: %s Actual: %s" % (self.dut_bssid, ap)) + else: + self._fail(sta_name+" did not connect to AP") + return False + + if ip == "0.0.0.0": + self._fail("%s did not get an ip. Ending test" % sta_name) + else: + self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip)) + + if self.passes() == False: print("Cleaning up...") - removePort(self.resource, self.sta_name, self.mgr_url) + self.remove_stations() return False - else: - self._pass("Connected to AP: %s With IP: %s" % (ap, ip)) # create endpoints and cxs # Create UDP endpoints - data = { - "alias": "testUDP-A", - "shelf": 1, - "resource": self.resource, - "port": self.sta_name, - "type": "lf_udp", - "ip_port": "-1", - "min_rate": 1000000 - } - self.json_post("/cli-json/add_endp", data) + cx_names = {} - data = { - "alias": "testUDP-B", - "shelf": 1, - "resource": self.upstream_resource, - "port": self.upstream_port, - "type": "lf_udp", - "ip_port": "-1", - "min_rate": 1000000 - } - self.json_post("/cli-json/add_endp", data) + for sta_name in self.station_names: + cx_names["testUDP-"+sta_name] = { "a": "testUDP-%s-A" % sta_name, + "b": "testUDP-%s-B" % sta_name} + data = { + "alias": "testUDP-%s-A" % sta_name, + "shelf": 1, + "resource": self.resource, + "port": sta_name, + "type": "lf_udp", + "ip_port": "-1", + "min_rate": 1000000 + } + self.json_post("/cli-json/add_endp", data) - # Create CX - data = { - "alias": "testUDP", - "test_mgr": "default_tm", - "tx_endp": "testUDP-A", - "rx_endp": "testUDP-B", - } - self.json_post("/cli-json/add_cx", data) + data = { + "alias": "testUDP-%s-B" % sta_name, + "shelf": 1, + "resource": self.upstream_resource, + "port": self.upstream_port, + "type": "lf_udp", + "ip_port": "-1", + "min_rate": 1000000 + } + self.json_post("/cli-json/add_endp", data) - # Create TCP endpoints - data = { - "alias": "testTCP-A", - "shelf": 1, - "resource": self.resource, - "port": self.sta_name, - "type": "lf_tcp", - "ip_port": "0", - "min_rate": 1000000 - } - self.json_post("/cli-json/add_endp", data) + # Create CX + data = { + "alias": "testUDP-%" % sta_name, + "test_mgr": "default_tm", + "tx_endp": "testUDP-%s-A" % sta_name, + "rx_endp": "testUDP-%s-B" % sta_name, + } + self.json_post("/cli-json/add_cx", data) - data = { - "alias": "testTCP-B", - "shelf": 1, - "resource": self.upstream_resource, - "port": self.upstream_port, - "type": "lf_tcp", - "ip_port": "-1", - "min_rate": 1000000 - } - self.json_post("/cli-json/add_endp", data) + # Create TCP endpoints + cx_names["testTCP-"+sta_name] = { "a": "testUDP-%s-A" % sta_name, + "b": "testUDP-%s-B" % sta_name} + data = { + "alias": "testTCP-%s-A" % sta_name, + "shelf": 1, + "resource": self.resource, + "port": sta_name, + "type": "lf_tcp", + "ip_port": "0", + "min_rate": 1000000 + } + self.json_post("/cli-json/add_endp", data) - # Create CX - data = { - "alias": "testTCP", - "test_mgr": "default_tm", - "tx_endp": "testTCP-A", - "rx_endp": "testTCP-B", - } - self.json_post("/cli-json/add_cx", data) + data = { + "alias": "testTCP-%s-B" % sta_name, + "shelf": 1, + "resource": self.upstream_resource, + "port": self.upstream_port, + "type": "lf_tcp", + "ip_port": "-1", + "min_rate": 1000000 + } + self.json_post("/cli-json/add_endp", data) - cxNames = ["testTCP", "testUDP"] - endpNames = ["testTCP-A", "testTCP-B", - "testUDP-A", "testUDP-B"] + # Create CX + data = { + "alias": "testTCP-%s" % sta_name, + "test_mgr": "default_tm", + "tx_endp": "testTCP-A" % sta_name, + "rx_endp": "testTCP-B" % sta_name, + } + self.json_post("/cli-json/add_cx", data) + + #cxNames = ["testTCP", "testUDP"] + #endpNames = ["testTCP-A", "testTCP-B", "testUDP-A", "testUDP-B"] # start cx traffic print("\nStarting CX Traffic") - for name in range(len(cxNames)): + for cx_name in cx_names.keys(): data = { "test_mgr": "ALL", - "cx_name": cxNames[name], + "cx_name": cx_name, "cx_state": "RUNNING" } self.json_post("/cli-json/set_cx_state", data) # Refresh stats - print("\nRefresh CX stats") - for name in range(len(cxNames)): + + print("Refresh CX stats") + for cx_name in cx_names.keys(): data = { "test_mgr": "ALL", - "cross_connect": cxNames[name] + "cross_connect": cx_name } self.json_post("/cli-json/show_cxe", data) time.sleep(15) # stop cx traffic - print("\nStopping CX Traffic") - for name in range(len(cxNames)): + print("Stopping CX Traffic") + for cx_name in cx_names.keys(): data = { "test_mgr": "ALL", - "cx_name": cxNames[name], + "cx_name": cx_name, "cx_state": "STOPPED" } self.json_post("/cli-json/set_cx_state", data) # Refresh stats print("\nRefresh CX stats") - for name in range(len(cxNames)): + for cx_name in cx_names.keys(): data = { "test_mgr": "ALL", - "cross_connect": cxNames[name] + "cross_connect": cx_name } self.json_post("/cli-json/show_cxe", data) @@ -277,57 +320,40 @@ class StaConnect(LFCliBase): # get data for endpoints JSON print("Collecting Data") - try: - ptestTCPA = self.json_get("/endp/testTCP-A?fields=tx+bytes,rx+bytes") - ptestTCPATX = ptestTCPA['endpoint']['tx bytes'] - ptestTCPARX = ptestTCPA['endpoint']['rx bytes'] + for cx_name in cx_names.keys(): - ptestTCPB = self.json_get("/endp/testTCP-B?fields=tx+bytes,rx+bytes") - ptestTCPBTX = ptestTCPB['endpoint']['tx bytes'] - ptestTCPBRX = ptestTCPB['endpoint']['rx bytes'] + try: + ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["a"]) + ptest_a_tx = ptest['endpoint']['tx bytes'] + ptest_a_rx = ptest['endpoint']['rx bytes'] - ptestUDPA = self.json_get("/endp/testUDP-A?fields=tx+bytes,rx+bytes") - ptestUDPATX = ptestUDPA['endpoint']['tx bytes'] - ptestUDPARX = ptestUDPA['endpoint']['rx bytes'] + ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"]) + ptest_b_tx = ptest['endpoint']['tx bytes'] + ptest_b_rx = ptest['endpoint']['rx bytes'] - ptestUDPB = self.json_get("/endp/testUDP-B?fields=tx+bytes,rx+bytes") - ptestUDPBTX = ptestUDPB['endpoint']['tx bytes'] - ptestUDPBRX = ptestUDPB['endpoint']['rx bytes'] - except Exception as e: - self.error(e) - print("Cleaning up...") - data = { - "shelf": 1, - "resource": self.resource, - "port": self.sta_name - } - self.json_post("/cli-json/rm_vlan", data) - removeCX(self.mgr_url, cxNames) - removeEndps(self.mgr_url, endpNames) - return False + self.compareVals("testTCP-A TX", ptest_a_tx) + self.compareVals("testTCP-A RX", ptest_a_rx) + + self.compareVals("testTCP-B TX", ptest_b_tx) + self.compareVals("testTCP-B RX", ptest_b_rx) + + except Exception as e: + self.error(e) # print("\n") # self.test_results.append("Neutral message will fail") # self.test_results.append("FAILED message will fail") - self.compareVals("testTCP-A TX", ptestTCPATX) - self.compareVals("testTCP-A RX", ptestTCPARX) - - self.compareVals("testTCP-B TX", ptestTCPBTX) - self.compareVals("testTCP-B RX", ptestTCPBRX) - - self.compareVals("testUDP-A TX", ptestUDPATX) - self.compareVals("testUDP-A RX", ptestUDPARX) - - self.compareVals("testUDP-B TX", ptestUDPBTX) - self.compareVals("testUDP-B RX", ptestUDPBRX) # print("\n") # remove all endpoints and cxs LFUtils.removePort(self.resource, self.sta_name, self.mgr_url) - - removeCX(self.mgr_url, cxNames) - removeEndps(self.mgr_url, endpNames) + endp_names = [] + removeCX(self.mgr_url, cx_names.keys()) + for cx_name in cx_names: + endp_names.append(cx_names[cx_name]["a"]) + endp_names.append(cx_names[cx_name]["b"]) + removeEndps(self.mgr_url, endp_names) # ~class