mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-02 19:58:03 +00:00
sta_connect: refactoring leading to a base class LFCliBase and class structure for sta_connect
This commit is contained in:
32
py-json/LANforge/LFCliBase.py
Normal file
32
py-json/LANforge/LFCliBase.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
#!env /usr/bin/python
|
||||||
|
|
||||||
|
# Extend this class to use common set of debug and request features for your script
|
||||||
|
|
||||||
|
from LANforge.LFUtils import *
|
||||||
|
|
||||||
|
|
||||||
|
class LFCliBase:
|
||||||
|
def __init__(self, lfjson_host="localhost", lfjson_port=8080, _debug=False):
|
||||||
|
self.debugOn = _debug
|
||||||
|
self.lfjson_host = lfjson_host
|
||||||
|
self.lfjson_port = lfjson_port
|
||||||
|
self.mgr_url = f"http://{self.lfjson_host}:{self.lfjson_port}/"
|
||||||
|
|
||||||
|
def jsonPost(self, _req_url, _data):
|
||||||
|
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
|
||||||
|
_data['suppress_preexec_cli'] = True
|
||||||
|
_data['suppress_preexec_method'] = True
|
||||||
|
lf_r.addPostData(_data)
|
||||||
|
json_response = lf_r.jsonPost(self.debugOn)
|
||||||
|
# Debugging
|
||||||
|
# if (json_response != None):
|
||||||
|
# print("jsonReq: response: ")
|
||||||
|
# LFUtils.debug_printer.pprint(vars(json_response))
|
||||||
|
return json_response
|
||||||
|
|
||||||
|
def jsonGet(self, _req_url):
|
||||||
|
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
|
||||||
|
json_response = lf_r.getAsJson(self.debugOn)
|
||||||
|
return json_response
|
||||||
|
|
||||||
|
# ~class
|
||||||
@@ -5,412 +5,418 @@
|
|||||||
# to the requested BSSID if bssid is specified as an argument.
|
# to the requested BSSID if bssid is specified as an argument.
|
||||||
# The script will clean up the station and connections at the end of the test.
|
# The script will clean up the station and connections at the end of the test.
|
||||||
|
|
||||||
import os
|
|
||||||
import time
|
|
||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
if 'py-json' not in sys.path:
|
if 'py-json' not in sys.path:
|
||||||
sys.path.append('../py-json')
|
sys.path.append('../py-json')
|
||||||
|
|
||||||
import subprocess
|
# from LANforge import LFRequest
|
||||||
import json
|
|
||||||
import pprint
|
|
||||||
from LANforge import LFRequest
|
|
||||||
from LANforge import LFUtils
|
from LANforge import LFUtils
|
||||||
|
# from LANforge import LFCliBase
|
||||||
|
from LANforge.LFCliBase import LFCliBase
|
||||||
from LANforge.LFUtils import *
|
from LANforge.LFUtils import *
|
||||||
|
|
||||||
import create_genlink as genl
|
|
||||||
|
|
||||||
debugOn = True
|
|
||||||
if sys.version_info[0] != 3:
|
if sys.version_info[0] != 3:
|
||||||
print("This script requires Python 3")
|
print("This script requires Python 3")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
dest = "localhost";
|
|
||||||
port = "8080";
|
|
||||||
dut_ssid = "MyAP"
|
|
||||||
dut_passwd = "NA"
|
|
||||||
dut_bssid = ""
|
|
||||||
user = ""
|
|
||||||
passwd = ""
|
|
||||||
sta_mode = "0" # See add_sta LANforge CLI users guide entry
|
|
||||||
radio = "wiphy0"
|
|
||||||
resource = "1"
|
|
||||||
upstream_resource = "1"
|
|
||||||
upstream_port = "eth2"
|
|
||||||
sta_name = "sta001"
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description="LANforge Unit Test: Connect Station to AP\nExample:\n./sta_connect.py --dest 192.168.100.209 --dut_ssid OpenWrt-2 --dut_bssid 24:F5:A2:08:21:6C")
|
class StaConnect(LFCliBase):
|
||||||
parser.add_argument("-d", "--dest", type=str, help="address of the LANforge GUI machine (localhost is default)")
|
def __init__(self, lfjson_host, lfjson_port, _dut_ssid="MyAP", _dut_passwd="NA", _dut_bssid="",
|
||||||
parser.add_argument("-o", "--port", type=int, help="IP Port the LANforge GUI is listening on (8080 is default)")
|
_user="", _passwd="", _sta_mode="0", _radio="wiphy0",
|
||||||
parser.add_argument("-u", "--user", type=str, help="TBD: credential login/username")
|
_resource=1, _upstream_resource=1, _upstream_port="eth2",
|
||||||
parser.add_argument("-p", "--passwd", type=str, help="TBD: credential password")
|
_sta_name="sta001", _debugOn=False):
|
||||||
parser.add_argument("--resource", type=str, help="LANforge Station resource ID to use, default is 1")
|
# self.dest = "localhost" # in super
|
||||||
parser.add_argument("--upstream_resource", type=str, help="LANforge Ethernet port resource ID to use, default is 1")
|
# self.port = "8080" # in super
|
||||||
parser.add_argument("--upstream_port", type=str, help="LANforge Ethernet port name, default is eth2")
|
|
||||||
parser.add_argument("--radio", type=str, help="LANforge radio to use, default is wiphy0")
|
|
||||||
parser.add_argument("--sta_mode", type=str, help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))")
|
|
||||||
parser.add_argument("--dut_ssid", type=str, help="DUT SSID")
|
|
||||||
parser.add_argument("--dut_passwd", type=str, help="DUT PSK password. Do not set for OPEN auth")
|
|
||||||
parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.")
|
|
||||||
|
|
||||||
args = None
|
super().__init__(self, lfjson_host, lfjson_port)
|
||||||
|
self.dut_ssid = _dut_ssid
|
||||||
|
self.dut_passwd = _dut_passwd
|
||||||
|
self.dut_bssid = _dut_bssid
|
||||||
|
self.user = _user
|
||||||
|
self.passwd = _passwd
|
||||||
|
self.sta_mode = _sta_mode # See add_sta LANforge CLI users guide entry
|
||||||
|
self.radio = _radio
|
||||||
|
self.resource = _resource
|
||||||
|
self.upstream_resource = _upstream_resource
|
||||||
|
self.upstream_port = _upstream_port
|
||||||
|
self.sta_name = _sta_name
|
||||||
|
self.sta_url = None # defer construction
|
||||||
|
self.upstream_url = None # defer construction
|
||||||
|
|
||||||
args = parser.parse_args()
|
def getStaUrl(self):
|
||||||
if (args.dest != None):
|
if self.sta_url is None:
|
||||||
dest = args.dest
|
self.sta_url = f"port/1/{self.resource}/{self.sta_name}"
|
||||||
if (args.port != None):
|
return self.sta_url
|
||||||
port = args.port
|
|
||||||
if (args.user != None):
|
|
||||||
user = args.user
|
|
||||||
if (args.passwd != None):
|
|
||||||
passwd = args.passwd
|
|
||||||
if (args.sta_mode != None):
|
|
||||||
sta_mode = args.sta_mode
|
|
||||||
if (args.upstream_resource != None):
|
|
||||||
upstream_resource = args.upstream_resource
|
|
||||||
if (args.upstream_port != None):
|
|
||||||
upstream_port = args.upstream_port
|
|
||||||
if (args.radio != None):
|
|
||||||
radio = args.radio
|
|
||||||
if (args.resource != None):
|
|
||||||
resource = args.resource
|
|
||||||
if (args.dut_passwd != None):
|
|
||||||
dut_passwd = args.dut_passwd
|
|
||||||
if (args.dut_bssid != None):
|
|
||||||
dut_bssid = args.dut_bssid
|
|
||||||
if (args.dut_ssid != None):
|
|
||||||
dut_ssid = args.dut_ssid
|
|
||||||
|
|
||||||
mgrURL = "http://%s:%s/"%(dest, port)
|
def getUpstreamUrl(self):
|
||||||
radio_url = "port/1/%s/%s"%(resource, radio)
|
if self.upstream_url is None:
|
||||||
sta_url = "port/1/%s/%s"%(resource, sta_name)
|
self.upstream_url = f"port/1/{self.upstream_resource}/{self.upstream_port}"
|
||||||
upstream_url = "port/1/%s/%s"%(upstream_resource, upstream_port)
|
return self.upstream_url
|
||||||
|
|
||||||
def jsonReq(mgrURL, reqURL, data, exitWhenCalled=False):
|
def checkConnect(self):
|
||||||
lf_r = LFRequest.LFRequest(mgrURL + reqURL)
|
print(f"Checking for LANforge GUI connection: {self.mgr_url}")
|
||||||
|
response = super().jsonGet("/")
|
||||||
|
duration = 0
|
||||||
|
while (response is None) and (duration < 300):
|
||||||
|
print(f"LANforge GUI connection not found sleeping 5 seconds, tried: {self.mgr_url}")
|
||||||
|
duration += 2
|
||||||
|
time.sleep(2)
|
||||||
|
response = super().jsonGet("/")
|
||||||
|
|
||||||
data['suppress_preexec_cli'] = True
|
if duration >= 300:
|
||||||
data['suppress_preexec_method'] = True
|
print("Could not connect to LANforge GUI")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
lf_r.addPostData(data)
|
# Compare pre-test values to post-test values
|
||||||
|
@staticmethod
|
||||||
|
def compareVals(name, postVal):
|
||||||
|
# print(f"Comparing {name}")
|
||||||
|
if postVal > 0:
|
||||||
|
print("PASSED: %s %s" % (name, postVal))
|
||||||
|
else:
|
||||||
|
print("FAILED: %s did not report traffic: %s" % (name, postVal))
|
||||||
|
|
||||||
json_response = lf_r.jsonPost(True)
|
def run(self):
|
||||||
# Debugging
|
self.checkConnect()
|
||||||
#if (json_response != None):
|
# Create stations and turn dhcp on
|
||||||
# print("jsonReq: response: ")
|
print("Creating station and turning on dhcp")
|
||||||
# LFUtils.debug_printer.pprint(vars(json_response))
|
|
||||||
if exitWhenCalled:
|
|
||||||
print("jsonReq: bye")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def getJsonInfo(mgrURL, reqURL, debug=False):
|
url = self.getStaUrl()
|
||||||
lf_r = LFRequest.LFRequest(mgrURL + reqURL)
|
response = super().jsonGet(url)
|
||||||
json_response = lf_r.getAsJson(debug)
|
if response is not None:
|
||||||
return json_response
|
if response["interface"] is not None:
|
||||||
#print(name)
|
print("removing old station")
|
||||||
#j_printer = pprint.PrettyPrinter(indent=2)
|
LFUtils.removePort(self.resource, self.sta_name, self.mgr_url)
|
||||||
#j_printer.pprint(json_response)
|
time.sleep(5)
|
||||||
#for record in json_response[key]:
|
|
||||||
# j_printer.pprint(record)
|
|
||||||
|
|
||||||
|
# See add_sta in LANforge CLI user guide
|
||||||
print("Checking for LANforge GUI connection: %s"%(mgrURL))
|
url = "cli-json/add_sta"
|
||||||
response = getJsonInfo(mgrURL, radio_url);
|
data = {
|
||||||
duration = 0
|
"shelf": 1,
|
||||||
while ((response == None) and (duration < 300)):
|
"resource": self.resource,
|
||||||
print("LANforge GUI connection not found sleeping 5 seconds, tried: %s"%(mgrURL))
|
"radio": self.radio,
|
||||||
duration += 2
|
"sta_name": self.sta_name,
|
||||||
time.sleep(2)
|
"ssid": self.dut_ssid,
|
||||||
response = getJsonInfo(mgrURL, radio_url)
|
"key": self.dut_passwd,
|
||||||
|
"mode": self.sta_mode,
|
||||||
if duration >= 300:
|
"mac": "xx:xx:xx:xx:*:xx",
|
||||||
print("Could not connect to LANforge GUI")
|
"flags": 0x10000 # verbose, open
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
#Create stations and turn dhcp on
|
|
||||||
print("Creating station and turning on dhcp")
|
|
||||||
|
|
||||||
url = sta_url
|
|
||||||
debugOn = True
|
|
||||||
response = getJsonInfo(mgrURL, url)
|
|
||||||
if (response is not None):
|
|
||||||
if (response["interface"] is not None):
|
|
||||||
print("removing old station")
|
|
||||||
LFUtils.removePort(resource, sta_name, mgrURL)
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
# See add_sta in LANforge CLI user guide
|
|
||||||
url = "cli-json/add_sta"
|
|
||||||
data = {
|
|
||||||
"shelf":1,
|
|
||||||
"resource":resource,
|
|
||||||
"radio":radio,
|
|
||||||
"sta_name":sta_name,
|
|
||||||
"ssid":dut_ssid,
|
|
||||||
"key":dut_passwd,
|
|
||||||
"mode":sta_mode,
|
|
||||||
"mac":"xx:xx:xx:xx:*:xx",
|
|
||||||
"flags":0x10000 # verbose, open
|
|
||||||
}
|
|
||||||
print("adding new station")
|
|
||||||
jsonReq(mgrURL, url, data)
|
|
||||||
|
|
||||||
reqURL = "cli-json/set_port"
|
|
||||||
data = {
|
|
||||||
"shelf":1,
|
|
||||||
"resource":resource,
|
|
||||||
"port":sta_name,
|
|
||||||
"current_flags": 0x80000000, # use DHCP, not down
|
|
||||||
"interest":0x4002 # set dhcp, current flags
|
|
||||||
}
|
|
||||||
print("configuring port")
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
eth1IP = getJsonInfo(mgrURL, upstream_url)
|
|
||||||
if eth1IP['interface']['ip'] == "0.0.0.0":
|
|
||||||
print("Warning: %s lacks ip address"%(upstream_url))
|
|
||||||
|
|
||||||
reqURL = "cli-json/nc_show_ports"
|
|
||||||
data = { "shelf":1,
|
|
||||||
"resource":resource,
|
|
||||||
"port":sta_name,
|
|
||||||
"probe_flags":1 }
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
station_info = getJsonInfo(mgrURL, "%s?fields=port,ip,ap"%(sta_url))
|
|
||||||
duration = 0
|
|
||||||
maxTime = 300
|
|
||||||
ip = "0.0.0.0"
|
|
||||||
ap = ""
|
|
||||||
while ((ip == "0.0.0.0") and (duration < maxTime)):
|
|
||||||
|
|
||||||
duration += 2
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
station_info = getJsonInfo(mgrURL, "%s?fields=port,ip,ap"%(sta_url))
|
|
||||||
|
|
||||||
#LFUtils.debug_printer.pprint(station_info)
|
|
||||||
if ((station_info is not None) and ("interface" in station_info)):
|
|
||||||
if ("ip" in station_info["interface"]):
|
|
||||||
ip = station_info["interface"]["ip"]
|
|
||||||
if ("ap" in station_info["interface"]):
|
|
||||||
ap = station_info["interface"]["ap"]
|
|
||||||
|
|
||||||
if ((ap == "Not-Associated") or (ap == "")):
|
|
||||||
print("Station waiting to associate...")
|
|
||||||
else:
|
|
||||||
if (ip == "0.0.0.0"):
|
|
||||||
print("Station waiting for IP ...")
|
|
||||||
|
|
||||||
if ((ap != "") and (ap != "Not-Associated")):
|
|
||||||
print("Connected to AP: %s"%(ap))
|
|
||||||
if (dut_bssid != ""):
|
|
||||||
if (dut_bssid.lower() == ap.lower()):
|
|
||||||
print("PASSED: Connected to BSSID: %s"%(ap))
|
|
||||||
else:
|
|
||||||
print("FAILED: Connected to wrong BSSID, requested: %s Actual: %s"%(dut_bssid, ap))
|
|
||||||
else:
|
|
||||||
print("FAILED: Did not connect to AP");
|
|
||||||
sys.exit(3)
|
|
||||||
|
|
||||||
if (ip is "0.0.0.0"):
|
|
||||||
print("FAILED: %s did not get an ip. Ending test"%(sta_name))
|
|
||||||
print("Cleaning up...")
|
|
||||||
removePort(resource, sta_name, mgrURL)
|
|
||||||
sys.exit(1)
|
|
||||||
else:
|
|
||||||
print("PASSED: Connected to AP: %s With IP: %s"%(ap, ip))
|
|
||||||
|
|
||||||
#create endpoints and cxs
|
|
||||||
# Create UDP endpoints
|
|
||||||
reqURL = "cli-json/add_endp"
|
|
||||||
data = {
|
|
||||||
"alias":"testUDP-A",
|
|
||||||
"shelf":1,
|
|
||||||
"resource":resource,
|
|
||||||
"port":sta_name,
|
|
||||||
"type":"lf_udp",
|
|
||||||
"ip_port":"-1",
|
|
||||||
"min_rate":1000000
|
|
||||||
}
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
reqURL = "cli-json/add_endp"
|
|
||||||
data = {
|
|
||||||
"alias":"testUDP-B",
|
|
||||||
"shelf":1,
|
|
||||||
"resource":upstream_resource,
|
|
||||||
"port":upstream_port,
|
|
||||||
"type":"lf_udp",
|
|
||||||
"ip_port":"-1",
|
|
||||||
"min_rate":1000000
|
|
||||||
}
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
# Create CX
|
|
||||||
reqURL = "cli-json/add_cx"
|
|
||||||
data = {
|
|
||||||
"alias":"testUDP",
|
|
||||||
"test_mgr":"default_tm",
|
|
||||||
"tx_endp":"testUDP-A",
|
|
||||||
"rx_endp":"testUDP-B",
|
|
||||||
}
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
# Create TCP endpoints
|
|
||||||
reqURL = "cli-json/add_endp"
|
|
||||||
data = {
|
|
||||||
"alias":"testTCP-A",
|
|
||||||
"shelf":1,
|
|
||||||
"resource":resource,
|
|
||||||
"port":sta_name,
|
|
||||||
"type":"lf_tcp",
|
|
||||||
"ip_port":"0",
|
|
||||||
"min_rate":1000000
|
|
||||||
}
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
reqURL = "cli-json/add_endp"
|
|
||||||
data = {
|
|
||||||
"alias":"testTCP-B",
|
|
||||||
"shelf":1,
|
|
||||||
"resource":upstream_resource,
|
|
||||||
"port":upstream_port,
|
|
||||||
"type":"lf_tcp",
|
|
||||||
"ip_port":"-1",
|
|
||||||
"min_rate":1000000
|
|
||||||
}
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
# Create CX
|
|
||||||
reqURL = "cli-json/add_cx"
|
|
||||||
data = {
|
|
||||||
"alias":"testTCP",
|
|
||||||
"test_mgr":"default_tm",
|
|
||||||
"tx_endp":"testTCP-A",
|
|
||||||
"rx_endp":"testTCP-B",
|
|
||||||
}
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
|
||||||
|
|
||||||
cxNames = ["testTCP","testUDP"]
|
|
||||||
endpNames = ["testTCP-A", "testTCP-B",
|
|
||||||
"testUDP-A", "testUDP-B"]
|
|
||||||
|
|
||||||
#start cx traffic
|
|
||||||
print("\nStarting CX Traffic")
|
|
||||||
for name in range(len(cxNames)):
|
|
||||||
reqURL = "cli-json/set_cx_state"
|
|
||||||
data = {
|
|
||||||
"test_mgr":"ALL",
|
|
||||||
"cx_name":cxNames[name],
|
|
||||||
"cx_state":"RUNNING"
|
|
||||||
}
|
}
|
||||||
jsonReq(mgrURL, reqURL, data)
|
print("adding new station")
|
||||||
|
super().jsonPost(url, data)
|
||||||
|
|
||||||
# Refresh stats
|
reqURL = "cli-json/set_port"
|
||||||
print("\nRefresh CX stats")
|
data = {
|
||||||
for name in range(len(cxNames)):
|
"shelf": 1,
|
||||||
reqURL = "cli-json/show_cxe"
|
"resource": self.resource,
|
||||||
data = {
|
"port": self.sta_name,
|
||||||
"test_mgr":"ALL",
|
"current_flags": 0x80000000, # use DHCP, not down
|
||||||
"cross_connect":cxNames[name]
|
"interest": 0x4002 # set dhcp, current flags
|
||||||
}
|
}
|
||||||
jsonReq(mgrURL, reqURL, data)
|
print("configuring port")
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
#print("Sleeping for 15 seconds")
|
time.sleep(5)
|
||||||
time.sleep(15)
|
|
||||||
|
|
||||||
#stop cx traffic
|
eth1IP = self.jsonGet(self.getUpstreamUrl())
|
||||||
print("\nStopping CX Traffic")
|
if eth1IP['interface']['ip'] == "0.0.0.0":
|
||||||
for name in range(len(cxNames)):
|
print(f"Warning: {self.getUpstreamUrl()} lacks ip address")
|
||||||
reqURL = "cli-json/set_cx_state"
|
|
||||||
data = {
|
reqURL = "cli-json/nc_show_ports"
|
||||||
"test_mgr":"ALL",
|
data = {"shelf": 1,
|
||||||
"cx_name":cxNames[name],
|
"resource": self.resource,
|
||||||
"cx_state":"STOPPED"
|
"port": self.sta_name,
|
||||||
|
"probe_flags": 1}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
|
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
|
||||||
|
duration = 0
|
||||||
|
maxTime = 300
|
||||||
|
ip = "0.0.0.0"
|
||||||
|
ap = ""
|
||||||
|
while (ip == "0.0.0.0") and (duration < maxTime):
|
||||||
|
|
||||||
|
duration += 2
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
station_info = super().jsonGet(f"{self.getStaUrl()}?fields=port,ip,ap")
|
||||||
|
|
||||||
|
# LFUtils.debug_printer.pprint(station_info)
|
||||||
|
if (station_info is not None) and ("interface" in station_info):
|
||||||
|
if "ip" in station_info["interface"]:
|
||||||
|
ip = station_info["interface"]["ip"]
|
||||||
|
if "ap" in station_info["interface"]:
|
||||||
|
ap = station_info["interface"]["ap"]
|
||||||
|
|
||||||
|
if (ap == "Not-Associated") or (ap == ""):
|
||||||
|
print("Station waiting to associate...")
|
||||||
|
else:
|
||||||
|
if ip == "0.0.0.0":
|
||||||
|
print("Station waiting for IP ...")
|
||||||
|
|
||||||
|
if (ap != "") and (ap != "Not-Associated"):
|
||||||
|
print(f"Connected to AP: {ap}")
|
||||||
|
if self.dut_bssid != "":
|
||||||
|
if self.dut_bssid.lower() == ap.lower():
|
||||||
|
print(f"PASSED: Connected to BSSID: {ap}")
|
||||||
|
else:
|
||||||
|
print("FAILED: Connected to wrong BSSID, requested: %s Actual: %s" % (self.dut_bssid, ap))
|
||||||
|
else:
|
||||||
|
print("FAILED: Did not connect to AP")
|
||||||
|
sys.exit(3)
|
||||||
|
|
||||||
|
if ip == "0.0.0.0":
|
||||||
|
print(f"FAILED: {self.sta_name} did not get an ip. Ending test")
|
||||||
|
print("Cleaning up...")
|
||||||
|
removePort(self.resource, self.sta_name, self.mgr_url)
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
print("PASSED: Connected to AP: %s With IP: %s" % (ap, ip))
|
||||||
|
|
||||||
|
# create endpoints and cxs
|
||||||
|
# Create UDP endpoints
|
||||||
|
reqURL = "cli-json/add_endp"
|
||||||
|
data = {
|
||||||
|
"alias": "testUDP-A",
|
||||||
|
"shelf": 1,
|
||||||
|
"resource": self.resource,
|
||||||
|
"port": self.sta_name,
|
||||||
|
"type": "lf_udp",
|
||||||
|
"ip_port": "-1",
|
||||||
|
"min_rate": 1000000
|
||||||
}
|
}
|
||||||
jsonReq(mgrURL, reqURL, data)
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
# Refresh stats
|
reqURL = "cli-json/add_endp"
|
||||||
print("\nRefresh CX stats")
|
data = {
|
||||||
for name in range(len(cxNames)):
|
"alias": "testUDP-B",
|
||||||
reqURL = "cli-json/show_cxe"
|
"shelf": 1,
|
||||||
data = {
|
"resource": self.upstream_resource,
|
||||||
"test_mgr":"ALL",
|
"port": self.upstream_port,
|
||||||
"cross_connect":cxNames[name]
|
"type": "lf_udp",
|
||||||
|
"ip_port": "-1",
|
||||||
|
"min_rate": 1000000
|
||||||
}
|
}
|
||||||
jsonReq(mgrURL, reqURL, data)
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
#print("Sleeping for 5 seconds")
|
# Create CX
|
||||||
time.sleep(5)
|
reqURL = "cli-json/add_cx"
|
||||||
|
data = {
|
||||||
|
"alias": "testUDP",
|
||||||
|
"test_mgr": "default_tm",
|
||||||
|
"tx_endp": "testUDP-A",
|
||||||
|
"rx_endp": "testUDP-B",
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
#get data for endpoints JSON
|
# Create TCP endpoints
|
||||||
print("Collecting Data")
|
reqURL = "cli-json/add_endp"
|
||||||
try:
|
data = {
|
||||||
ptestTCPA = getJsonInfo(mgrURL, "endp/testTCP-A?fields=tx+bytes,rx+bytes")
|
"alias": "testTCP-A",
|
||||||
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
|
"shelf": 1,
|
||||||
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
|
"resource": self.resource,
|
||||||
|
"port": self.sta_name,
|
||||||
|
"type": "lf_tcp",
|
||||||
|
"ip_port": "0",
|
||||||
|
"min_rate": 1000000
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
ptestTCPB = getJsonInfo(mgrURL, "endp/testTCP-B?fields=tx+bytes,rx+bytes")
|
reqURL = "cli-json/add_endp"
|
||||||
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
|
data = {
|
||||||
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
|
"alias": "testTCP-B",
|
||||||
|
"shelf": 1,
|
||||||
|
"resource": self.upstream_resource,
|
||||||
|
"port": self.upstream_port,
|
||||||
|
"type": "lf_tcp",
|
||||||
|
"ip_port": "-1",
|
||||||
|
"min_rate": 1000000
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
ptestUDPA = getJsonInfo(mgrURL, "endp/testUDP-A?fields=tx+bytes,rx+bytes")
|
# Create CX
|
||||||
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
|
reqURL = "cli-json/add_cx"
|
||||||
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
|
data = {
|
||||||
|
"alias": "testTCP",
|
||||||
|
"test_mgr": "default_tm",
|
||||||
|
"tx_endp": "testTCP-A",
|
||||||
|
"rx_endp": "testTCP-B",
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
ptestUDPB = getJsonInfo(mgrURL, "endp/testUDP-B?fields=tx+bytes,rx+bytes")
|
cxNames = ["testTCP", "testUDP"]
|
||||||
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
|
endpNames = ["testTCP-A", "testTCP-B",
|
||||||
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
|
"testUDP-A", "testUDP-B"]
|
||||||
except Exception as e:
|
|
||||||
print("Something went wrong")
|
|
||||||
print(e)
|
|
||||||
print("Cleaning up...")
|
|
||||||
reqURL = "cli-json/rm_vlan"
|
|
||||||
data = {
|
|
||||||
"shelf":1,
|
|
||||||
"resource":resource,
|
|
||||||
"port":sta_name
|
|
||||||
}
|
|
||||||
|
|
||||||
jsonReq(mgrURL, reqURL, data)
|
# start cx traffic
|
||||||
|
print("\nStarting CX Traffic")
|
||||||
|
for name in range(len(cxNames)):
|
||||||
|
reqURL = "cli-json/set_cx_state"
|
||||||
|
data = {
|
||||||
|
"test_mgr": "ALL",
|
||||||
|
"cx_name": cxNames[name],
|
||||||
|
"cx_state": "RUNNING"
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
removeCX(mgrURL, cxNames)
|
# Refresh stats
|
||||||
removeEndps(mgrURL, endpNames)
|
print("\nRefresh CX stats")
|
||||||
sys.exit(1)
|
for name in range(len(cxNames)):
|
||||||
|
reqURL = "cli-json/show_cxe"
|
||||||
|
data = {
|
||||||
|
"test_mgr": "ALL",
|
||||||
|
"cross_connect": cxNames[name]
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
#compare pre-test values to post-test values
|
# print("Sleeping for 15 seconds")
|
||||||
|
time.sleep(15)
|
||||||
|
|
||||||
def compareVals(name, postVal):
|
# stop cx traffic
|
||||||
#print(f"Comparing {name}")
|
print("\nStopping CX Traffic")
|
||||||
if postVal > 0:
|
for name in range(len(cxNames)):
|
||||||
print("PASSED: %s %s"%(name, postVal))
|
reqURL = "cli-json/set_cx_state"
|
||||||
else:
|
data = {
|
||||||
print("FAILED: %s did not report traffic: %s"%(name, postVal))
|
"test_mgr": "ALL",
|
||||||
|
"cx_name": cxNames[name],
|
||||||
|
"cx_state": "STOPPED"
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
print("\n")
|
# Refresh stats
|
||||||
compareVals("testTCP-A TX", ptestTCPATX)
|
print("\nRefresh CX stats")
|
||||||
compareVals("testTCP-A RX", ptestTCPARX)
|
for name in range(len(cxNames)):
|
||||||
|
reqURL = "cli-json/show_cxe"
|
||||||
|
data = {
|
||||||
|
"test_mgr": "ALL",
|
||||||
|
"cross_connect": cxNames[name]
|
||||||
|
}
|
||||||
|
super().jsonPost(reqURL, data)
|
||||||
|
|
||||||
compareVals("testTCP-B TX", ptestTCPBTX)
|
# print("Sleeping for 5 seconds")
|
||||||
compareVals("testTCP-B RX", ptestTCPBRX)
|
time.sleep(5)
|
||||||
|
|
||||||
compareVals("testUDP-A TX", ptestUDPATX)
|
# get data for endpoints JSON
|
||||||
compareVals("testUDP-A RX", ptestUDPARX)
|
print("Collecting Data")
|
||||||
|
try:
|
||||||
|
ptestTCPA = super().jsonGet("endp/testTCP-A?fields=tx+bytes,rx+bytes")
|
||||||
|
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
|
||||||
|
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
|
||||||
|
|
||||||
compareVals("testUDP-B TX", ptestUDPBTX)
|
ptestTCPB = super().jsonGet("endp/testTCP-B?fields=tx+bytes,rx+bytes")
|
||||||
compareVals("testUDP-B RX", ptestUDPBRX)
|
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
|
||||||
print("\n")
|
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
|
||||||
|
|
||||||
|
ptestUDPA = super().jsonGet("endp/testUDP-A?fields=tx+bytes,rx+bytes")
|
||||||
|
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
|
||||||
|
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
|
||||||
|
|
||||||
|
ptestUDPB = super().jsonGet("endp/testUDP-B?fields=tx+bytes,rx+bytes")
|
||||||
|
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
|
||||||
|
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
|
||||||
|
except Exception as e:
|
||||||
|
print("Something went wrong")
|
||||||
|
print(e)
|
||||||
|
print("Cleaning up...")
|
||||||
|
reqURL = "cli-json/rm_vlan"
|
||||||
|
data = {
|
||||||
|
"shelf": 1,
|
||||||
|
"resource": self.resource,
|
||||||
|
"port": self.sta_name
|
||||||
|
}
|
||||||
|
|
||||||
|
self.jsonPost(reqURL, data)
|
||||||
|
|
||||||
|
removeCX(self.mgr_url, cxNames)
|
||||||
|
removeEndps(self.mgr_url, endpNames)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print("\n")
|
||||||
|
self.compareVals("testTCP-A TX", ptestTCPATX)
|
||||||
|
self.compareVals("testTCP-A RX", ptestTCPARX)
|
||||||
|
|
||||||
|
self.compareVals("testTCP-B TX", ptestTCPBTX)
|
||||||
|
self.compareVals("testTCP-B RX", ptestTCPBRX)
|
||||||
|
|
||||||
|
self.compareVals("testUDP-A TX", ptestUDPATX)
|
||||||
|
self.compareVals("testUDP-A RX", ptestUDPARX)
|
||||||
|
|
||||||
|
self.compareVals("testUDP-B TX", ptestUDPBTX)
|
||||||
|
self.compareVals("testUDP-B RX", ptestUDPBRX)
|
||||||
|
print("\n")
|
||||||
|
|
||||||
|
# remove all endpoints and cxs
|
||||||
|
LFUtils.removePort(self.resource, self.sta_name, self.mgr_url)
|
||||||
|
|
||||||
|
removeCX(self.mgr_url, cxNames)
|
||||||
|
removeEndps(self.mgr_url, endpNames)
|
||||||
|
|
||||||
|
# ~class
|
||||||
|
|
||||||
|
|
||||||
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
lfjson_host = "http://localhost"
|
||||||
|
lfjson_port = 8080
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="""LANforge Unit Test: Connect Station to AP
|
||||||
|
Example:
|
||||||
|
./sta_connect.py --dest 192.168.100.209 --dut_ssid OpenWrt-2 --dut_bssid 24:F5:A2:08:21:6C
|
||||||
|
""")
|
||||||
|
parser.add_argument("-d", "--dest", type=str, help="address of the LANforge GUI machine (localhost is default)")
|
||||||
|
parser.add_argument("-o", "--port", type=int, help="IP Port the LANforge GUI is listening on (8080 is default)")
|
||||||
|
parser.add_argument("-u", "--user", type=str, help="TBD: credential login/username")
|
||||||
|
parser.add_argument("-p", "--passwd", type=str, help="TBD: credential password")
|
||||||
|
parser.add_argument("--resource", type=str, help="LANforge Station resource ID to use, default is 1")
|
||||||
|
parser.add_argument("--upstream_resource", type=str, help="LANforge Ethernet port resource ID to use, default is 1")
|
||||||
|
parser.add_argument("--upstream_port", type=str, help="LANforge Ethernet port name, default is eth2")
|
||||||
|
parser.add_argument("--radio", type=str, help="LANforge radio to use, default is wiphy0")
|
||||||
|
parser.add_argument("--sta_mode", type=str,
|
||||||
|
help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))")
|
||||||
|
parser.add_argument("--dut_ssid", type=str, help="DUT SSID")
|
||||||
|
parser.add_argument("--dut_passwd", type=str, help="DUT PSK password. Do not set for OPEN auth")
|
||||||
|
parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.dest is not None:
|
||||||
|
lfjson_host = args.dest
|
||||||
|
if args.port is not None:
|
||||||
|
lfjson_port = args.port
|
||||||
|
|
||||||
|
staConnect = StaConnect(lfjson_host, lfjson_port)
|
||||||
|
|
||||||
|
if args.user is not None:
|
||||||
|
staConnect.user = args.user
|
||||||
|
if args.passwd is not None:
|
||||||
|
staConnect.passwd = args.passwd
|
||||||
|
if args.sta_mode is not None:
|
||||||
|
staConnect.sta_mode = args.sta_mode
|
||||||
|
if args.upstream_resource is not None:
|
||||||
|
staConnect.upstream_resource = args.upstream_resource
|
||||||
|
if args.upstream_port is not None:
|
||||||
|
staConnect.upstream_port = args.upstream_port
|
||||||
|
if args.radio is not None:
|
||||||
|
staConnect.radio = args.radio
|
||||||
|
if args.resource is not None:
|
||||||
|
staConnect.resource = args.resource
|
||||||
|
if args.dut_passwd is not None:
|
||||||
|
staConnect.dut_passwd = args.dut_passwd
|
||||||
|
if args.dut_bssid is not None:
|
||||||
|
staConnect.dut_bssid = args.dut_bssid
|
||||||
|
if args.dut_ssid is not None:
|
||||||
|
staConnect.dut_ssid = args.dut_ssid
|
||||||
|
|
||||||
|
|
||||||
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#remove all endpoints and cxs
|
if __name__ == "__main__":
|
||||||
LFUtils.removePort(resource, sta_name, mgrURL)
|
main()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
removeCX(mgrURL, cxNames)
|
|
||||||
removeEndps(mgrURL, endpNames)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user