mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-10-29 18:02:35 +00:00
sta_connect.py: WIP: converting all things into loops to handle multiple stations
This commit is contained in:
@@ -21,16 +21,22 @@ import LANforge.lfcli_base
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
|
||||
OPEN="open"
|
||||
WEP="wep"
|
||||
WPA="wpa"
|
||||
WPA2="wpa"
|
||||
MODE_AUTO=0
|
||||
|
||||
class StaConnect(LFCliBase):
|
||||
def __init__(self, host, port, _dut_ssid="MyAP", _dut_passwd="NA", _dut_bssid="",
|
||||
_user="", _passwd="", _sta_mode="0", _radio="wiphy0",
|
||||
_resource=1, _upstream_resource=1, _upstream_port="eth2",
|
||||
_sta_name="sta001", _debugOn=False):
|
||||
_sta_name=None, _debugOn=False, _dut_security=OPEN):
|
||||
# do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn)
|
||||
# that is py2 era syntax and will force self into the host variable, making you
|
||||
# very confused.
|
||||
super().__init__(host, port, _debug=_debugOn, _halt_on_error=False)
|
||||
self.dut_security = ""
|
||||
self.dut_ssid = _dut_ssid
|
||||
self.dut_passwd = _dut_passwd
|
||||
self.dut_bssid = _dut_bssid
|
||||
@@ -41,14 +47,20 @@ class StaConnect(LFCliBase):
|
||||
self.resource = _resource
|
||||
self.upstream_resource = _upstream_resource
|
||||
self.upstream_port = _upstream_port
|
||||
self.sta_name = _sta_name
|
||||
self.sta_url = None # defer construction
|
||||
self.upstream_url = None # defer construction
|
||||
# self.sta_name = _sta_name
|
||||
|
||||
def getStaUrl(self):
|
||||
if self.sta_url is None:
|
||||
self.sta_url = "port/1/%s/%s" % (self.resource, self.sta_name)
|
||||
return self.sta_url
|
||||
self.sta_url_map = None # defer construction
|
||||
self.upstream_url = None # defer construction
|
||||
if _sta_name is not None:
|
||||
self.station_names = [ _sta_name ]
|
||||
|
||||
def get_station_url(self, sta_name_=None):
|
||||
if sta_name_ is None:
|
||||
raise ValueError("get_station_url wants a station name")
|
||||
if self.sta_url_map is None:
|
||||
for sta_name in self.station_names:
|
||||
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, self.sta_name)
|
||||
return self.sta_url_map[sta_name_]
|
||||
|
||||
def getUpstreamUrl(self):
|
||||
if self.upstream_url is None:
|
||||
@@ -63,6 +75,10 @@ class StaConnect(LFCliBase):
|
||||
else:
|
||||
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
|
||||
|
||||
def remove_stations(self):
|
||||
for name in self.station_names:
|
||||
LFUtils.removePort(self.resource, name, self.mgr_url)
|
||||
|
||||
def run(self):
|
||||
self.clear_test_results()
|
||||
self.check_connect()
|
||||
@@ -85,35 +101,40 @@ class StaConnect(LFCliBase):
|
||||
# Create stations and turn dhcp on
|
||||
print("Creating station %s and turning on dhcp..." % self.sta_name)
|
||||
flags = 0x10000
|
||||
if "" != self.dut_passwd:
|
||||
if self.dut_security == WPA2:
|
||||
flags += 0x400
|
||||
data = {
|
||||
elif self.dut_security == OPEN:
|
||||
pass
|
||||
|
||||
add_sta_data = {
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"radio": self.radio,
|
||||
"sta_name": self.sta_name,
|
||||
"ssid": self.dut_ssid,
|
||||
"key": self.dut_passwd,
|
||||
"mode": self.sta_mode,
|
||||
"mac": "xx:xx:xx:xx:*:xx",
|
||||
"flags": flags # verbose, wpa2
|
||||
}
|
||||
print("Adding new station %s " % self.sta_name)
|
||||
self.json_post("/cli-json/add_sta", data)
|
||||
for sta_name in self.station_names:
|
||||
add_sta_data["sta_name"] = sta_name;
|
||||
print("Adding new station %s " % self.sta_name)
|
||||
self.json_post("/cli-json/add_sta", add_sta_data)
|
||||
|
||||
data = {
|
||||
set_port_data = {
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": self.sta_name,
|
||||
"current_flags": 0x80000000, # use DHCP, not down
|
||||
"interest": 0x4002 # set dhcp, current flags
|
||||
}
|
||||
print("Configuring %s..." % self.sta_name)
|
||||
self.json_post("/cli-json/set_port", data)
|
||||
for sta_name in self.station_names:
|
||||
set_port_data["port"] = sta_name
|
||||
print("Configuring %s..." % sta_name)
|
||||
self.json_post("/cli-json/set_port", set_port_data)
|
||||
|
||||
data = {"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": self.sta_name,
|
||||
"port": "ALL",
|
||||
"probe_flags": 1}
|
||||
self.json_post("/cli-json/nc_show_ports", data)
|
||||
LFUtils.waitUntilPortsAdminUp(self.resource, self.mgr_url, [self.sta_name])
|
||||
@@ -124,151 +145,173 @@ class StaConnect(LFCliBase):
|
||||
ip = "0.0.0.0"
|
||||
ap = ""
|
||||
print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap))
|
||||
while (ip == "0.0.0.0") and (duration < maxTime):
|
||||
|
||||
connected_stations = []
|
||||
while (len(connected_stations) < len(self.station_names)) and (duration < maxTime):
|
||||
duration += 2
|
||||
time.sleep(2)
|
||||
station_info = self.json_get(self.getStaUrl() + "?fields=port,ip,ap")
|
||||
for sta_name in self.station_names:
|
||||
sta_url = self.get_station_urls(sta_name)
|
||||
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
|
||||
|
||||
# LFUtils.debug_printer.pprint(station_info)
|
||||
if (station_info is not None) and ("interface" in station_info):
|
||||
if "ip" in station_info["interface"]:
|
||||
ip = station_info["interface"]["ip"]
|
||||
if "ap" in station_info["interface"]:
|
||||
ap = station_info["interface"]["ap"]
|
||||
# LFUtils.debug_printer.pprint(station_info)
|
||||
if (station_info is not None) and ("interface" in station_info):
|
||||
if "ip" in station_info["interface"]:
|
||||
ip = station_info["interface"]["ip"]
|
||||
if "ap" in station_info["interface"]:
|
||||
ap = station_info["interface"]["ap"]
|
||||
|
||||
if (ap == "Not-Associated") or (ap == ""):
|
||||
print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap))
|
||||
else:
|
||||
if ip == "0.0.0.0":
|
||||
print("Waiting for %s to gain IP ..." % self.sta_name)
|
||||
|
||||
if (ap != "") and (ap != "Not-Associated"):
|
||||
print("Connected to AP: "+ap)
|
||||
if self.dut_bssid != "":
|
||||
if self.dut_bssid.lower() == ap.lower():
|
||||
self._pass("Connected to BSSID: " + ap)
|
||||
# self.test_results.append("PASSED: )
|
||||
# print("PASSED: Connected to BSSID: "+ap)
|
||||
if (ap == "Not-Associated") or (ap == ""):
|
||||
if self.debugOn:
|
||||
print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap))
|
||||
else:
|
||||
self._fail("Connected to wrong BSSID, requested: %s Actual: %s" % (self.dut_bssid, ap))
|
||||
else:
|
||||
self._fail("Did not connect to AP")
|
||||
return False
|
||||
if ip == "0.0.0.0":
|
||||
if self.debugOn:
|
||||
print("Waiting for %s to gain IP ..." % self.sta_name)
|
||||
else:
|
||||
connected_stations.append(sta_url)
|
||||
|
||||
if ip == "0.0.0.0":
|
||||
self._fail("%s did not get an ip. Ending test" % self.sta_name)
|
||||
for sta_name in self.station_names:
|
||||
sta_url = self.get_station_urls(sta_name)
|
||||
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
|
||||
ap = station_info["interface"]["ap"]
|
||||
ip = station_info["interface"]["ip"]
|
||||
if (ap != "") and (ap != "Not-Associated"):
|
||||
print("Connected to AP: "+ap)
|
||||
if self.dut_bssid != "":
|
||||
if self.dut_bssid.lower() == ap.lower():
|
||||
self._pass(sta_name+" connected to BSSID: " + ap)
|
||||
# self.test_results.append("PASSED: )
|
||||
# print("PASSED: Connected to BSSID: "+ap)
|
||||
else:
|
||||
self._fail(sta_name+" connected to wrong BSSID, requested: %s Actual: %s" % (self.dut_bssid, ap))
|
||||
else:
|
||||
self._fail(sta_name+" did not connect to AP")
|
||||
return False
|
||||
|
||||
if ip == "0.0.0.0":
|
||||
self._fail("%s did not get an ip. Ending test" % sta_name)
|
||||
else:
|
||||
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
|
||||
|
||||
if self.passes() == False:
|
||||
print("Cleaning up...")
|
||||
removePort(self.resource, self.sta_name, self.mgr_url)
|
||||
self.remove_stations()
|
||||
return False
|
||||
else:
|
||||
self._pass("Connected to AP: %s With IP: %s" % (ap, ip))
|
||||
|
||||
# create endpoints and cxs
|
||||
# Create UDP endpoints
|
||||
data = {
|
||||
"alias": "testUDP-A",
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": self.sta_name,
|
||||
"type": "lf_udp",
|
||||
"ip_port": "-1",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
cx_names = {}
|
||||
|
||||
data = {
|
||||
"alias": "testUDP-B",
|
||||
"shelf": 1,
|
||||
"resource": self.upstream_resource,
|
||||
"port": self.upstream_port,
|
||||
"type": "lf_udp",
|
||||
"ip_port": "-1",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
for sta_name in self.station_names:
|
||||
cx_names["testUDP-"+sta_name] = { "a": "testUDP-%s-A" % sta_name,
|
||||
"b": "testUDP-%s-B" % sta_name}
|
||||
data = {
|
||||
"alias": "testUDP-%s-A" % sta_name,
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": sta_name,
|
||||
"type": "lf_udp",
|
||||
"ip_port": "-1",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
|
||||
# Create CX
|
||||
data = {
|
||||
"alias": "testUDP",
|
||||
"test_mgr": "default_tm",
|
||||
"tx_endp": "testUDP-A",
|
||||
"rx_endp": "testUDP-B",
|
||||
}
|
||||
self.json_post("/cli-json/add_cx", data)
|
||||
data = {
|
||||
"alias": "testUDP-%s-B" % sta_name,
|
||||
"shelf": 1,
|
||||
"resource": self.upstream_resource,
|
||||
"port": self.upstream_port,
|
||||
"type": "lf_udp",
|
||||
"ip_port": "-1",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
|
||||
# Create TCP endpoints
|
||||
data = {
|
||||
"alias": "testTCP-A",
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": self.sta_name,
|
||||
"type": "lf_tcp",
|
||||
"ip_port": "0",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
# Create CX
|
||||
data = {
|
||||
"alias": "testUDP-%" % sta_name,
|
||||
"test_mgr": "default_tm",
|
||||
"tx_endp": "testUDP-%s-A" % sta_name,
|
||||
"rx_endp": "testUDP-%s-B" % sta_name,
|
||||
}
|
||||
self.json_post("/cli-json/add_cx", data)
|
||||
|
||||
data = {
|
||||
"alias": "testTCP-B",
|
||||
"shelf": 1,
|
||||
"resource": self.upstream_resource,
|
||||
"port": self.upstream_port,
|
||||
"type": "lf_tcp",
|
||||
"ip_port": "-1",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
# Create TCP endpoints
|
||||
cx_names["testTCP-"+sta_name] = { "a": "testUDP-%s-A" % sta_name,
|
||||
"b": "testUDP-%s-B" % sta_name}
|
||||
data = {
|
||||
"alias": "testTCP-%s-A" % sta_name,
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": sta_name,
|
||||
"type": "lf_tcp",
|
||||
"ip_port": "0",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
|
||||
# Create CX
|
||||
data = {
|
||||
"alias": "testTCP",
|
||||
"test_mgr": "default_tm",
|
||||
"tx_endp": "testTCP-A",
|
||||
"rx_endp": "testTCP-B",
|
||||
}
|
||||
self.json_post("/cli-json/add_cx", data)
|
||||
data = {
|
||||
"alias": "testTCP-%s-B" % sta_name,
|
||||
"shelf": 1,
|
||||
"resource": self.upstream_resource,
|
||||
"port": self.upstream_port,
|
||||
"type": "lf_tcp",
|
||||
"ip_port": "-1",
|
||||
"min_rate": 1000000
|
||||
}
|
||||
self.json_post("/cli-json/add_endp", data)
|
||||
|
||||
cxNames = ["testTCP", "testUDP"]
|
||||
endpNames = ["testTCP-A", "testTCP-B",
|
||||
"testUDP-A", "testUDP-B"]
|
||||
# Create CX
|
||||
data = {
|
||||
"alias": "testTCP-%s" % sta_name,
|
||||
"test_mgr": "default_tm",
|
||||
"tx_endp": "testTCP-A" % sta_name,
|
||||
"rx_endp": "testTCP-B" % sta_name,
|
||||
}
|
||||
self.json_post("/cli-json/add_cx", data)
|
||||
|
||||
#cxNames = ["testTCP", "testUDP"]
|
||||
#endpNames = ["testTCP-A", "testTCP-B", "testUDP-A", "testUDP-B"]
|
||||
|
||||
# start cx traffic
|
||||
print("\nStarting CX Traffic")
|
||||
for name in range(len(cxNames)):
|
||||
for cx_name in cx_names.keys():
|
||||
data = {
|
||||
"test_mgr": "ALL",
|
||||
"cx_name": cxNames[name],
|
||||
"cx_name": cx_name,
|
||||
"cx_state": "RUNNING"
|
||||
}
|
||||
self.json_post("/cli-json/set_cx_state", data)
|
||||
|
||||
# Refresh stats
|
||||
print("\nRefresh CX stats")
|
||||
for name in range(len(cxNames)):
|
||||
|
||||
print("Refresh CX stats")
|
||||
for cx_name in cx_names.keys():
|
||||
data = {
|
||||
"test_mgr": "ALL",
|
||||
"cross_connect": cxNames[name]
|
||||
"cross_connect": cx_name
|
||||
}
|
||||
self.json_post("/cli-json/show_cxe", data)
|
||||
|
||||
time.sleep(15)
|
||||
|
||||
# stop cx traffic
|
||||
print("\nStopping CX Traffic")
|
||||
for name in range(len(cxNames)):
|
||||
print("Stopping CX Traffic")
|
||||
for cx_name in cx_names.keys():
|
||||
data = {
|
||||
"test_mgr": "ALL",
|
||||
"cx_name": cxNames[name],
|
||||
"cx_name": cx_name,
|
||||
"cx_state": "STOPPED"
|
||||
}
|
||||
self.json_post("/cli-json/set_cx_state", data)
|
||||
|
||||
# Refresh stats
|
||||
print("\nRefresh CX stats")
|
||||
for name in range(len(cxNames)):
|
||||
for cx_name in cx_names.keys():
|
||||
data = {
|
||||
"test_mgr": "ALL",
|
||||
"cross_connect": cxNames[name]
|
||||
"cross_connect": cx_name
|
||||
}
|
||||
self.json_post("/cli-json/show_cxe", data)
|
||||
|
||||
@@ -277,57 +320,40 @@ class StaConnect(LFCliBase):
|
||||
|
||||
# get data for endpoints JSON
|
||||
print("Collecting Data")
|
||||
try:
|
||||
ptestTCPA = self.json_get("/endp/testTCP-A?fields=tx+bytes,rx+bytes")
|
||||
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
|
||||
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
|
||||
for cx_name in cx_names.keys():
|
||||
|
||||
ptestTCPB = self.json_get("/endp/testTCP-B?fields=tx+bytes,rx+bytes")
|
||||
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
|
||||
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
|
||||
try:
|
||||
ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["a"])
|
||||
ptest_a_tx = ptest['endpoint']['tx bytes']
|
||||
ptest_a_rx = ptest['endpoint']['rx bytes']
|
||||
|
||||
ptestUDPA = self.json_get("/endp/testUDP-A?fields=tx+bytes,rx+bytes")
|
||||
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
|
||||
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
|
||||
ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
|
||||
ptest_b_tx = ptest['endpoint']['tx bytes']
|
||||
ptest_b_rx = ptest['endpoint']['rx bytes']
|
||||
|
||||
ptestUDPB = self.json_get("/endp/testUDP-B?fields=tx+bytes,rx+bytes")
|
||||
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
|
||||
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
|
||||
except Exception as e:
|
||||
self.error(e)
|
||||
print("Cleaning up...")
|
||||
data = {
|
||||
"shelf": 1,
|
||||
"resource": self.resource,
|
||||
"port": self.sta_name
|
||||
}
|
||||
self.json_post("/cli-json/rm_vlan", data)
|
||||
removeCX(self.mgr_url, cxNames)
|
||||
removeEndps(self.mgr_url, endpNames)
|
||||
return False
|
||||
self.compareVals("testTCP-A TX", ptest_a_tx)
|
||||
self.compareVals("testTCP-A RX", ptest_a_rx)
|
||||
|
||||
self.compareVals("testTCP-B TX", ptest_b_tx)
|
||||
self.compareVals("testTCP-B RX", ptest_b_rx)
|
||||
|
||||
except Exception as e:
|
||||
self.error(e)
|
||||
|
||||
# print("\n")
|
||||
# self.test_results.append("Neutral message will fail")
|
||||
# self.test_results.append("FAILED message will fail")
|
||||
|
||||
self.compareVals("testTCP-A TX", ptestTCPATX)
|
||||
self.compareVals("testTCP-A RX", ptestTCPARX)
|
||||
|
||||
self.compareVals("testTCP-B TX", ptestTCPBTX)
|
||||
self.compareVals("testTCP-B RX", ptestTCPBRX)
|
||||
|
||||
self.compareVals("testUDP-A TX", ptestUDPATX)
|
||||
self.compareVals("testUDP-A RX", ptestUDPARX)
|
||||
|
||||
self.compareVals("testUDP-B TX", ptestUDPBTX)
|
||||
self.compareVals("testUDP-B RX", ptestUDPBRX)
|
||||
# print("\n")
|
||||
|
||||
# remove all endpoints and cxs
|
||||
LFUtils.removePort(self.resource, self.sta_name, self.mgr_url)
|
||||
|
||||
removeCX(self.mgr_url, cxNames)
|
||||
removeEndps(self.mgr_url, endpNames)
|
||||
endp_names = []
|
||||
removeCX(self.mgr_url, cx_names.keys())
|
||||
for cx_name in cx_names:
|
||||
endp_names.append(cx_names[cx_name]["a"])
|
||||
endp_names.append(cx_names[cx_name]["b"])
|
||||
removeEndps(self.mgr_url, endp_names)
|
||||
|
||||
# ~class
|
||||
|
||||
|
||||
Reference in New Issue
Block a user