This commit is contained in:
Logan Lipke
2020-06-02 09:51:43 -07:00
3 changed files with 422 additions and 384 deletions

View File

@@ -7,16 +7,14 @@ if sys.version_info[0] != 3:
print("This script requires Python 3") print("This script requires Python 3")
exit() exit()
import email.message
import http.client
import urllib.request import urllib.request
import urllib.error import urllib.error
from urllib import error
import urllib.parse import urllib.parse
import json import json
import LANforge
from LANforge import LFUtils from LANforge import LFUtils
class LFRequest: class LFRequest:
Default_Base_URL = "http://localhost:8080" Default_Base_URL = "http://localhost:8080"
No_Data = {'No Data':0} No_Data = {'No Data':0}
@@ -45,14 +43,14 @@ class LFRequest:
request.headers['Content-type'] = 'application/x-www-form-urlencoded' request.headers['Content-type'] = 'application/x-www-form-urlencoded'
resp = '' resp = ''
try: try:
resp = urllib.request.urlopen(request); resp = urllib.request.urlopen(request)
responses.append(resp) responses.append(resp)
return responses[0] return responses[0]
except urllib.error.HTTPError as error: except urllib.error.HTTPError as error:
if (show_error): if (show_error):
print("----- formPost() HTTPError: --------------------------------------------") print("----- formPost() HTTPError: --------------------------------------------")
print("%s: %s; URL: %s"%(error.code, error.reason, request.get_full_url())) print("%s: %s; URL: %s"%(error.code, error.reason, request.get_full_url()))
LFUtils.debug_printer.pprint(error.headers()) LFUtils.debug_printer.pprint(error.headers)
#print("Error: ", sys.exc_info()[0]) #print("Error: ", sys.exc_info()[0])
#print("Request URL:", request.get_full_url()) #print("Request URL:", request.get_full_url())
print("Request Content-type:", request.get_header('Content-type')) print("Request Content-type:", request.get_header('Content-type'))
@@ -84,11 +82,11 @@ class LFRequest:
request.headers['Content-type'] = 'application/json' request.headers['Content-type'] = 'application/json'
try: try:
resp = urllib.request.urlopen(request); resp = urllib.request.urlopen(request)
responses.append(resp) responses.append(resp)
return responses[0] return responses[0]
except urllib.error.HTTPError as error: except urllib.error.HTTPError as error:
if (show_error): if show_error:
print("----- jsonPost() HTTPError: --------------------------------------------") print("----- jsonPost() HTTPError: --------------------------------------------")
print("<%s> HTTP %s: %s"%(request.get_full_url(), error.code, error.reason, )) print("<%s> HTTP %s: %s"%(request.get_full_url(), error.code, error.reason, ))
@@ -99,24 +97,23 @@ class LFRequest:
print("Request Data:") print("Request Data:")
LFUtils.debug_printer.pprint(request.data) LFUtils.debug_printer.pprint(request.data)
if (error.headers): if error.headers:
# the HTTPError is of type HTTPMessage a subclass of email.message # the HTTPError is of type HTTPMessage a subclass of email.message
#print(type(error.keys())) # print(type(error.keys()))
for headername in sorted(error.headers.keys()): for headername in sorted(error.headers.keys()):
print ("Response %s: %s "%(headername, error.headers.get(headername))) print ("Response %s: %s "%(headername, error.headers.get(headername)))
if (len(responses) > 0): if len(responses) > 0:
print("----- Response: --------------------------------------------------------") print("----- Response: --------------------------------------------------------")
LFUtils.debug_printer.pprint(responses[0].reason) LFUtils.debug_printer.pprint(responses[0].reason)
print("------------------------------------------------------------------------") print("------------------------------------------------------------------------")
except urllib.error.URLError as uerror: except urllib.error.URLError as uerror:
if (show_error): if show_error:
print("----- jsonPost() URLError: ---------------------------------------------") print("----- jsonPost() URLError: ---------------------------------------------")
print("Reason: %s; URL: %s"%(uerror.reason, request.get_full_url())) print("Reason: %s; URL: %s"%(uerror.reason, request.get_full_url()))
print("------------------------------------------------------------------------") print("------------------------------------------------------------------------")
return None return None
def get(self, show_error=True): def get(self, show_error=True):
myrequest = urllib.request.Request(url=self.requested_url, headers=self.default_headers) myrequest = urllib.request.Request(url=self.requested_url, headers=self.default_headers)
myresponses = [] myresponses = []
@@ -124,10 +121,10 @@ class LFRequest:
myresponses.append(urllib.request.urlopen(myrequest)) myresponses.append(urllib.request.urlopen(myrequest))
return myresponses[0] return myresponses[0]
except urllib.error.HTTPError as error: except urllib.error.HTTPError as error:
if (show_error): if show_error:
print("----- get() HTTPError: --------------------------------------------") print("----- get() HTTPError: --------------------------------------------")
print("<%s> HTTP %s: %s"%(myrequest.get_full_url(), error.code, error.reason, )) print("<%s> HTTP %s: %s"%(myrequest.get_full_url(), error.code, error.reason, ))
if (error.code != 404): if error.code != 404:
print("Error: ", sys.exc_info()[0]) print("Error: ", sys.exc_info()[0])
print("Request URL:", myrequest.get_full_url()) print("Request URL:", myrequest.get_full_url())
print("Request Content-type:", myrequest.get_header('Content-type')) print("Request Content-type:", myrequest.get_header('Content-type'))
@@ -135,24 +132,23 @@ class LFRequest:
print("Request Data:") print("Request Data:")
LFUtils.debug_printer.pprint(myrequest.data) LFUtils.debug_printer.pprint(myrequest.data)
if (error.headers): if error.headers:
# the HTTPError is of type HTTPMessage a subclass of email.message # the HTTPError is of type HTTPMessage a subclass of email.message
#print(type(error.keys())) # print(type(error.keys()))
for headername in sorted(error.headers.keys()): for headername in sorted(error.headers.keys()):
print ("Response %s: %s "%(headername, error.headers.get(headername))) print ("Response %s: %s "%(headername, error.headers.get(headername)))
if (len(myresponses) > 0): if len(myresponses) > 0:
print("----- Response: --------------------------------------------------------") print("----- Response: --------------------------------------------------------")
LFUtils.debug_printer.pprint(myresponses[0].reason) LFUtils.debug_printer.pprint(myresponses[0].reason)
print("------------------------------------------------------------------------") print("------------------------------------------------------------------------")
except urllib.error.URLError as uerror: except urllib.error.URLError as uerror:
if (show_error): if show_error:
print("----- get() URLError: ---------------------------------------------") print("----- get() URLError: ---------------------------------------------")
print("Reason: %s; URL: %s"%(uerror.reason, myrequest.get_full_url())) print("Reason: %s; URL: %s"%(uerror.reason, myrequest.get_full_url()))
print("------------------------------------------------------------------------") print("------------------------------------------------------------------------")
return None return None
def getAsJson(self, show_error=True): def getAsJson(self, show_error=True):
responses = [] responses = []
responses.append(self.get(show_error)) responses.append(self.get(show_error))

View File

@@ -0,0 +1,52 @@
#!env /usr/bin/python
# Extend this class to use common set of debug and request features for your script
from pprint import pprint
import LANforge.LFUtils
from LANforge.LFUtils import *
class LFCliBase:
# do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn)
# that is py2 era syntax and will force self into the host variable, making you
# very confused.
def __init__(self, _lfjson_host, _lfjson_port, _debug=False):
self.lfjson_host = _lfjson_host
self.lfjson_port = _lfjson_port
self.debugOn = _debug
self.mgr_url = "http://%s:%s/" % (self.lfjson_host, self.lfjson_port)
def jsonPost(self, _req_url, _data):
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
_data['suppress_preexec_cli'] = True
_data['suppress_preexec_method'] = True
lf_r.addPostData(_data)
if (self.debugOn):
LANforge.LFUtils.debug_printer.pprint(_data)
json_response = lf_r.jsonPost(self.debugOn)
# Debugging
# if (json_response != None):
# print("jsonReq: response: ")
# LFUtils.debug_printer.pprint(vars(json_response))
return json_response
def jsonGet(self, _req_url):
lf_r = LFRequest.LFRequest(self.mgr_url + _req_url)
json_response = lf_r.getAsJson(self.debugOn)
return json_response
def checkConnect(self):
print(f"Checking for LANforge GUI connection: {self.mgr_url}")
response = self.jsonGet("/")
duration = 0
while (response is None) and (duration < 300):
print(f"LANforge GUI connection not found sleeping 5 seconds, tried: {self.mgr_url}")
duration += 2
time.sleep(2)
response = self.jsonGet("")
if duration >= 300:
print("Could not connect to LANforge GUI")
sys.exit(1)
# ~class

View File

@@ -5,412 +5,402 @@
# to the requested BSSID if bssid is specified as an argument. # to the requested BSSID if bssid is specified as an argument.
# The script will clean up the station and connections at the end of the test. # The script will clean up the station and connections at the end of the test.
import os
import time
import sys import sys
import argparse import argparse
if 'py-json' not in sys.path: if 'py-json' not in sys.path:
sys.path.append('../py-json') sys.path.append('../py-json')
import subprocess # from LANforge import LFRequest
import json
import pprint
from LANforge import LFRequest
from LANforge import LFUtils from LANforge import LFUtils
# from LANforge import LFCliBase
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import * from LANforge.LFUtils import *
from pprint import pprint
import create_genlink as genl
debugOn = True
if sys.version_info[0] != 3: if sys.version_info[0] != 3:
print("This script requires Python 3") print("This script requires Python 3")
exit(1) exit(1)
dest = "localhost";
port = "8080";
dut_ssid = "MyAP"
dut_passwd = "NA"
dut_bssid = ""
user = ""
passwd = ""
sta_mode = "0" # See add_sta LANforge CLI users guide entry
radio = "wiphy0"
resource = "1"
upstream_resource = "1"
upstream_port = "eth2"
sta_name = "sta001"
parser = argparse.ArgumentParser(description="LANforge Unit Test: Connect Station to AP\nExample:\n./sta_connect.py --dest 192.168.100.209 --dut_ssid OpenWrt-2 --dut_bssid 24:F5:A2:08:21:6C") class StaConnect(LFCliBase):
parser.add_argument("-d", "--dest", type=str, help="address of the LANforge GUI machine (localhost is default)") def __init__(self, host, port, _dut_ssid="MyAP", _dut_passwd="NA", _dut_bssid="",
parser.add_argument("-o", "--port", type=int, help="IP Port the LANforge GUI is listening on (8080 is default)") _user="", _passwd="", _sta_mode="0", _radio="wiphy0",
parser.add_argument("-u", "--user", type=str, help="TBD: credential login/username") _resource=1, _upstream_resource=1, _upstream_port="eth2",
parser.add_argument("-p", "--passwd", type=str, help="TBD: credential password") _sta_name="sta001", _debugOn=False):
parser.add_argument("--resource", type=str, help="LANforge Station resource ID to use, default is 1") # do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn)
parser.add_argument("--upstream_resource", type=str, help="LANforge Ethernet port resource ID to use, default is 1") # that is py2 era syntax and will force self into the host variable, making you
parser.add_argument("--upstream_port", type=str, help="LANforge Ethernet port name, default is eth2") # very confused.
parser.add_argument("--radio", type=str, help="LANforge radio to use, default is wiphy0") super().__init__(host, port, _debugOn)
parser.add_argument("--sta_mode", type=str, help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))")
parser.add_argument("--dut_ssid", type=str, help="DUT SSID")
parser.add_argument("--dut_passwd", type=str, help="DUT PSK password. Do not set for OPEN auth")
parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.")
args = None self.dut_ssid = _dut_ssid
self.dut_passwd = _dut_passwd
self.dut_bssid = _dut_bssid
self.user = _user
self.passwd = _passwd
self.sta_mode = _sta_mode # See add_sta LANforge CLI users guide entry
self.radio = _radio
self.resource = _resource
self.upstream_resource = _upstream_resource
self.upstream_port = _upstream_port
self.sta_name = _sta_name
self.sta_url = None # defer construction
self.upstream_url = None # defer construction
args = parser.parse_args() def getStaUrl(self):
if (args.dest != None): if self.sta_url is None:
dest = args.dest self.sta_url = f"port/1/{self.resource}/{self.sta_name}"
if (args.port != None): return self.sta_url
port = args.port
if (args.user != None):
user = args.user
if (args.passwd != None):
passwd = args.passwd
if (args.sta_mode != None):
sta_mode = args.sta_mode
if (args.upstream_resource != None):
upstream_resource = args.upstream_resource
if (args.upstream_port != None):
upstream_port = args.upstream_port
if (args.radio != None):
radio = args.radio
if (args.resource != None):
resource = args.resource
if (args.dut_passwd != None):
dut_passwd = args.dut_passwd
if (args.dut_bssid != None):
dut_bssid = args.dut_bssid
if (args.dut_ssid != None):
dut_ssid = args.dut_ssid
mgrURL = "http://%s:%s/"%(dest, port) def getUpstreamUrl(self):
radio_url = "port/1/%s/%s"%(resource, radio) if self.upstream_url is None:
sta_url = "port/1/%s/%s"%(resource, sta_name) self.upstream_url = f"port/1/{self.upstream_resource}/{self.upstream_port}"
upstream_url = "port/1/%s/%s"%(upstream_resource, upstream_port) return self.upstream_url
def jsonReq(mgrURL, reqURL, data, exitWhenCalled=False): # Compare pre-test values to post-test values
lf_r = LFRequest.LFRequest(mgrURL + reqURL) @staticmethod
def compareVals(name, postVal):
# print(f"Comparing {name}")
if postVal > 0:
print("PASSED: %s %s" % (name, postVal))
else:
print("FAILED: %s did not report traffic: %s" % (name, postVal))
data['suppress_preexec_cli'] = True def run(self):
data['suppress_preexec_method'] = True self.checkConnect()
eth1IP = self.jsonGet(self.getUpstreamUrl())
if eth1IP['interface']['ip'] == "0.0.0.0":
print(f"Warning: {self.getUpstreamUrl()} lacks ip address")
lf_r.addPostData(data) url = self.getStaUrl()
response = super().jsonGet(url)
if response is not None:
if response["interface"] is not None:
print("removing old station")
LFUtils.removePort(self.resource, self.sta_name, self.mgr_url)
LFUtils.waitUntilPortsDisappear(self.resource, self.mgr_url, [self.sta_name])
json_response = lf_r.jsonPost(True) # Create stations and turn dhcp on
# Debugging print("Creating station %s and turning on dhcp..." % self.sta_name)
#if (json_response != None): url = "cli-json/add_sta"
# print("jsonReq: response: ") flags = 0x10000
# LFUtils.debug_printer.pprint(vars(json_response)) if "" != self.dut_passwd:
if exitWhenCalled: flags += 0x400
print("jsonReq: bye") data = {
sys.exit(1) "shelf": 1,
"resource": self.resource,
def getJsonInfo(mgrURL, reqURL, debug=False): "radio": self.radio,
lf_r = LFRequest.LFRequest(mgrURL + reqURL) "sta_name": self.sta_name,
json_response = lf_r.getAsJson(debug) "ssid": self.dut_ssid,
return json_response "key": self.dut_passwd,
#print(name) "mode": self.sta_mode,
#j_printer = pprint.PrettyPrinter(indent=2) "mac": "xx:xx:xx:xx:*:xx",
#j_printer.pprint(json_response) "flags": flags # verbose, wpa2
#for record in json_response[key]:
# j_printer.pprint(record)
print("Checking for LANforge GUI connection: %s"%(mgrURL))
response = getJsonInfo(mgrURL, radio_url);
duration = 0
while ((response == None) and (duration < 300)):
print("LANforge GUI connection not found sleeping 5 seconds, tried: %s"%(mgrURL))
duration += 2
time.sleep(2)
response = getJsonInfo(mgrURL, radio_url)
if duration >= 300:
print("Could not connect to LANforge GUI")
sys.exit(1)
#Create stations and turn dhcp on
print("Creating station and turning on dhcp")
url = sta_url
debugOn = True
response = getJsonInfo(mgrURL, url)
if (response is not None):
if (response["interface"] is not None):
print("removing old station")
LFUtils.removePort(resource, sta_name, mgrURL)
time.sleep(5)
# See add_sta in LANforge CLI user guide
url = "cli-json/add_sta"
data = {
"shelf":1,
"resource":resource,
"radio":radio,
"sta_name":sta_name,
"ssid":dut_ssid,
"key":dut_passwd,
"mode":sta_mode,
"mac":"xx:xx:xx:xx:*:xx",
"flags":0x10000 # verbose, open
}
print("adding new station")
jsonReq(mgrURL, url, data)
reqURL = "cli-json/set_port"
data = {
"shelf":1,
"resource":resource,
"port":sta_name,
"current_flags": 0x80000000, # use DHCP, not down
"interest":0x4002 # set dhcp, current flags
}
print("configuring port")
jsonReq(mgrURL, reqURL, data)
time.sleep(5)
eth1IP = getJsonInfo(mgrURL, upstream_url)
if eth1IP['interface']['ip'] == "0.0.0.0":
print("Warning: %s lacks ip address"%(upstream_url))
reqURL = "cli-json/nc_show_ports"
data = { "shelf":1,
"resource":resource,
"port":sta_name,
"probe_flags":1 }
jsonReq(mgrURL, reqURL, data)
station_info = getJsonInfo(mgrURL, "%s?fields=port,ip,ap"%(sta_url))
duration = 0
maxTime = 300
ip = "0.0.0.0"
ap = ""
while ((ip == "0.0.0.0") and (duration < maxTime)):
duration += 2
time.sleep(2)
station_info = getJsonInfo(mgrURL, "%s?fields=port,ip,ap"%(sta_url))
#LFUtils.debug_printer.pprint(station_info)
if ((station_info is not None) and ("interface" in station_info)):
if ("ip" in station_info["interface"]):
ip = station_info["interface"]["ip"]
if ("ap" in station_info["interface"]):
ap = station_info["interface"]["ap"]
if ((ap == "Not-Associated") or (ap == "")):
print("Station waiting to associate...")
else:
if (ip == "0.0.0.0"):
print("Station waiting for IP ...")
if ((ap != "") and (ap != "Not-Associated")):
print("Connected to AP: %s"%(ap))
if (dut_bssid != ""):
if (dut_bssid.lower() == ap.lower()):
print("PASSED: Connected to BSSID: %s"%(ap))
else:
print("FAILED: Connected to wrong BSSID, requested: %s Actual: %s"%(dut_bssid, ap))
else:
print("FAILED: Did not connect to AP");
sys.exit(3)
if (ip is "0.0.0.0"):
print("FAILED: %s did not get an ip. Ending test"%(sta_name))
print("Cleaning up...")
removePort(resource, sta_name, mgrURL)
sys.exit(1)
else:
print("PASSED: Connected to AP: %s With IP: %s"%(ap, ip))
#create endpoints and cxs
# Create UDP endpoints
reqURL = "cli-json/add_endp"
data = {
"alias":"testUDP-A",
"shelf":1,
"resource":resource,
"port":sta_name,
"type":"lf_udp",
"ip_port":"-1",
"min_rate":1000000
}
jsonReq(mgrURL, reqURL, data)
reqURL = "cli-json/add_endp"
data = {
"alias":"testUDP-B",
"shelf":1,
"resource":upstream_resource,
"port":upstream_port,
"type":"lf_udp",
"ip_port":"-1",
"min_rate":1000000
}
jsonReq(mgrURL, reqURL, data)
# Create CX
reqURL = "cli-json/add_cx"
data = {
"alias":"testUDP",
"test_mgr":"default_tm",
"tx_endp":"testUDP-A",
"rx_endp":"testUDP-B",
}
jsonReq(mgrURL, reqURL, data)
# Create TCP endpoints
reqURL = "cli-json/add_endp"
data = {
"alias":"testTCP-A",
"shelf":1,
"resource":resource,
"port":sta_name,
"type":"lf_tcp",
"ip_port":"0",
"min_rate":1000000
}
jsonReq(mgrURL, reqURL, data)
reqURL = "cli-json/add_endp"
data = {
"alias":"testTCP-B",
"shelf":1,
"resource":upstream_resource,
"port":upstream_port,
"type":"lf_tcp",
"ip_port":"-1",
"min_rate":1000000
}
jsonReq(mgrURL, reqURL, data)
# Create CX
reqURL = "cli-json/add_cx"
data = {
"alias":"testTCP",
"test_mgr":"default_tm",
"tx_endp":"testTCP-A",
"rx_endp":"testTCP-B",
}
jsonReq(mgrURL, reqURL, data)
cxNames = ["testTCP","testUDP"]
endpNames = ["testTCP-A", "testTCP-B",
"testUDP-A", "testUDP-B"]
#start cx traffic
print("\nStarting CX Traffic")
for name in range(len(cxNames)):
reqURL = "cli-json/set_cx_state"
data = {
"test_mgr":"ALL",
"cx_name":cxNames[name],
"cx_state":"RUNNING"
} }
jsonReq(mgrURL, reqURL, data) print("Adding new station %s " % self.sta_name)
super().jsonPost(url, data)
# Refresh stats reqURL = "cli-json/set_port"
print("\nRefresh CX stats") data = {
for name in range(len(cxNames)): "shelf": 1,
reqURL = "cli-json/show_cxe" "resource": self.resource,
data = { "port": self.sta_name,
"test_mgr":"ALL", "current_flags": 0x80000000, # use DHCP, not down
"cross_connect":cxNames[name] "interest": 0x4002 # set dhcp, current flags
} }
jsonReq(mgrURL, reqURL, data) print("Configuring %s..." % self.sta_name)
super().jsonPost(reqURL, data)
#print("Sleeping for 15 seconds") reqURL = "cli-json/nc_show_ports"
time.sleep(15) data = {"shelf": 1,
"resource": self.resource,
"port": self.sta_name,
"probe_flags": 1}
super().jsonPost(reqURL, data)
LFUtils.waitUntilPortsAdminUp(self.resource, self.mgr_url, [self.sta_name])
#stop cx traffic # station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
print("\nStopping CX Traffic") duration = 0
for name in range(len(cxNames)): maxTime = 300
reqURL = "cli-json/set_cx_state" ip = "0.0.0.0"
data = { ap = ""
"test_mgr":"ALL", while (ip == "0.0.0.0") and (duration < maxTime):
"cx_name":cxNames[name], duration += 2
"cx_state":"STOPPED" time.sleep(2)
station_info = super().jsonGet(f"{self.getStaUrl()}?fields=port,ip,ap")
# LFUtils.debug_printer.pprint(station_info)
if (station_info is not None) and ("interface" in station_info):
if "ip" in station_info["interface"]:
ip = station_info["interface"]["ip"]
if "ap" in station_info["interface"]:
ap = station_info["interface"]["ap"]
if (ap == "Not-Associated") or (ap == ""):
print("Waiting for %s associate to AP [%s]..." % (self.sta_name, ap))
else:
if ip == "0.0.0.0":
print("Waiting for %s to gain IP ..." % self.sta_name)
if (ap != "") and (ap != "Not-Associated"):
print(f"Connected to AP: {ap}")
if self.dut_bssid != "":
if self.dut_bssid.lower() == ap.lower():
print(f"PASSED: Connected to BSSID: {ap}")
else:
print("FAILED: Connected to wrong BSSID, requested: %s Actual: %s" % (self.dut_bssid, ap))
else:
print("FAILED: Did not connect to AP")
sys.exit(3)
if ip == "0.0.0.0":
print(f"FAILED: {self.sta_name} did not get an ip. Ending test")
print("Cleaning up...")
removePort(self.resource, self.sta_name, self.mgr_url)
sys.exit(1)
else:
print("PASSED: Connected to AP: %s With IP: %s" % (ap, ip))
# create endpoints and cxs
# Create UDP endpoints
reqURL = "cli-json/add_endp"
data = {
"alias": "testUDP-A",
"shelf": 1,
"resource": self.resource,
"port": self.sta_name,
"type": "lf_udp",
"ip_port": "-1",
"min_rate": 1000000
} }
jsonReq(mgrURL, reqURL, data) super().jsonPost(reqURL, data)
# Refresh stats reqURL = "cli-json/add_endp"
print("\nRefresh CX stats") data = {
for name in range(len(cxNames)): "alias": "testUDP-B",
reqURL = "cli-json/show_cxe" "shelf": 1,
data = { "resource": self.upstream_resource,
"test_mgr":"ALL", "port": self.upstream_port,
"cross_connect":cxNames[name] "type": "lf_udp",
"ip_port": "-1",
"min_rate": 1000000
} }
jsonReq(mgrURL, reqURL, data) super().jsonPost(reqURL, data)
#print("Sleeping for 5 seconds") # Create CX
time.sleep(5) reqURL = "cli-json/add_cx"
data = {
"alias": "testUDP",
"test_mgr": "default_tm",
"tx_endp": "testUDP-A",
"rx_endp": "testUDP-B",
}
super().jsonPost(reqURL, data)
#get data for endpoints JSON # Create TCP endpoints
print("Collecting Data") reqURL = "cli-json/add_endp"
try: data = {
ptestTCPA = getJsonInfo(mgrURL, "endp/testTCP-A?fields=tx+bytes,rx+bytes") "alias": "testTCP-A",
ptestTCPATX = ptestTCPA['endpoint']['tx bytes'] "shelf": 1,
ptestTCPARX = ptestTCPA['endpoint']['rx bytes'] "resource": self.resource,
"port": self.sta_name,
"type": "lf_tcp",
"ip_port": "0",
"min_rate": 1000000
}
super().jsonPost(reqURL, data)
ptestTCPB = getJsonInfo(mgrURL, "endp/testTCP-B?fields=tx+bytes,rx+bytes") reqURL = "cli-json/add_endp"
ptestTCPBTX = ptestTCPB['endpoint']['tx bytes'] data = {
ptestTCPBRX = ptestTCPB['endpoint']['rx bytes'] "alias": "testTCP-B",
"shelf": 1,
"resource": self.upstream_resource,
"port": self.upstream_port,
"type": "lf_tcp",
"ip_port": "-1",
"min_rate": 1000000
}
super().jsonPost(reqURL, data)
ptestUDPA = getJsonInfo(mgrURL, "endp/testUDP-A?fields=tx+bytes,rx+bytes") # Create CX
ptestUDPATX = ptestUDPA['endpoint']['tx bytes'] reqURL = "cli-json/add_cx"
ptestUDPARX = ptestUDPA['endpoint']['rx bytes'] data = {
"alias": "testTCP",
"test_mgr": "default_tm",
"tx_endp": "testTCP-A",
"rx_endp": "testTCP-B",
}
super().jsonPost(reqURL, data)
ptestUDPB = getJsonInfo(mgrURL, "endp/testUDP-B?fields=tx+bytes,rx+bytes") cxNames = ["testTCP", "testUDP"]
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes'] endpNames = ["testTCP-A", "testTCP-B",
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes'] "testUDP-A", "testUDP-B"]
except Exception as e:
print("Something went wrong")
print(e)
print("Cleaning up...")
reqURL = "cli-json/rm_vlan"
data = {
"shelf":1,
"resource":resource,
"port":sta_name
}
jsonReq(mgrURL, reqURL, data) # start cx traffic
print("\nStarting CX Traffic")
for name in range(len(cxNames)):
reqURL = "cli-json/set_cx_state"
data = {
"test_mgr": "ALL",
"cx_name": cxNames[name],
"cx_state": "RUNNING"
}
super().jsonPost(reqURL, data)
removeCX(mgrURL, cxNames) # Refresh stats
removeEndps(mgrURL, endpNames) print("\nRefresh CX stats")
sys.exit(1) for name in range(len(cxNames)):
reqURL = "cli-json/show_cxe"
data = {
"test_mgr": "ALL",
"cross_connect": cxNames[name]
}
super().jsonPost(reqURL, data)
#compare pre-test values to post-test values # print("Sleeping for 15 seconds")
time.sleep(15)
def compareVals(name, postVal): # stop cx traffic
#print(f"Comparing {name}") print("\nStopping CX Traffic")
if postVal > 0: for name in range(len(cxNames)):
print("PASSED: %s %s"%(name, postVal)) reqURL = "cli-json/set_cx_state"
else: data = {
print("FAILED: %s did not report traffic: %s"%(name, postVal)) "test_mgr": "ALL",
"cx_name": cxNames[name],
"cx_state": "STOPPED"
}
super().jsonPost(reqURL, data)
print("\n") # Refresh stats
compareVals("testTCP-A TX", ptestTCPATX) print("\nRefresh CX stats")
compareVals("testTCP-A RX", ptestTCPARX) for name in range(len(cxNames)):
reqURL = "cli-json/show_cxe"
data = {
"test_mgr": "ALL",
"cross_connect": cxNames[name]
}
super().jsonPost(reqURL, data)
compareVals("testTCP-B TX", ptestTCPBTX) # print("Sleeping for 5 seconds")
compareVals("testTCP-B RX", ptestTCPBRX) time.sleep(5)
compareVals("testUDP-A TX", ptestUDPATX) # get data for endpoints JSON
compareVals("testUDP-A RX", ptestUDPARX) print("Collecting Data")
try:
ptestTCPA = super().jsonGet("endp/testTCP-A?fields=tx+bytes,rx+bytes")
ptestTCPATX = ptestTCPA['endpoint']['tx bytes']
ptestTCPARX = ptestTCPA['endpoint']['rx bytes']
compareVals("testUDP-B TX", ptestUDPBTX) ptestTCPB = super().jsonGet("endp/testTCP-B?fields=tx+bytes,rx+bytes")
compareVals("testUDP-B RX", ptestUDPBRX) ptestTCPBTX = ptestTCPB['endpoint']['tx bytes']
print("\n") ptestTCPBRX = ptestTCPB['endpoint']['rx bytes']
ptestUDPA = super().jsonGet("endp/testUDP-A?fields=tx+bytes,rx+bytes")
ptestUDPATX = ptestUDPA['endpoint']['tx bytes']
ptestUDPARX = ptestUDPA['endpoint']['rx bytes']
ptestUDPB = super().jsonGet("endp/testUDP-B?fields=tx+bytes,rx+bytes")
ptestUDPBTX = ptestUDPB['endpoint']['tx bytes']
ptestUDPBRX = ptestUDPB['endpoint']['rx bytes']
except Exception as e:
print("Something went wrong")
print(e)
print("Cleaning up...")
reqURL = "cli-json/rm_vlan"
data = {
"shelf": 1,
"resource": self.resource,
"port": self.sta_name
}
self.jsonPost(reqURL, data)
removeCX(self.mgr_url, cxNames)
removeEndps(self.mgr_url, endpNames)
sys.exit(1)
print("\n")
self.compareVals("testTCP-A TX", ptestTCPATX)
self.compareVals("testTCP-A RX", ptestTCPARX)
self.compareVals("testTCP-B TX", ptestTCPBTX)
self.compareVals("testTCP-B RX", ptestTCPBRX)
self.compareVals("testUDP-A TX", ptestUDPATX)
self.compareVals("testUDP-A RX", ptestUDPARX)
self.compareVals("testUDP-B TX", ptestUDPBTX)
self.compareVals("testUDP-B RX", ptestUDPBRX)
print("\n")
# remove all endpoints and cxs
LFUtils.removePort(self.resource, self.sta_name, self.mgr_url)
removeCX(self.mgr_url, cxNames)
removeEndps(self.mgr_url, endpNames)
# ~class
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
#remove all endpoints and cxs
LFUtils.removePort(resource, sta_name, mgrURL)
removeCX(mgrURL, cxNames) def main():
removeEndps(mgrURL, endpNames) lfjson_host = "localhost"
lfjson_port = 8080
parser = argparse.ArgumentParser(
description="""LANforge Unit Test: Connect Station to AP
Example:
./sta_connect.py --dest 192.168.100.209 --dut_ssid OpenWrt-2 --dut_bssid 24:F5:A2:08:21:6C
""")
parser.add_argument("-d", "--dest", type=str, help="address of the LANforge GUI machine (localhost is default)")
parser.add_argument("-o", "--port", type=int, help="IP Port the LANforge GUI is listening on (8080 is default)")
parser.add_argument("-u", "--user", type=str, help="TBD: credential login/username")
parser.add_argument("-p", "--passwd", type=str, help="TBD: credential password")
parser.add_argument("--resource", type=str, help="LANforge Station resource ID to use, default is 1")
parser.add_argument("--upstream_resource", type=str, help="LANforge Ethernet port resource ID to use, default is 1")
parser.add_argument("--upstream_port", type=str, help="LANforge Ethernet port name, default is eth2")
parser.add_argument("--radio", type=str, help="LANforge radio to use, default is wiphy0")
parser.add_argument("--sta_mode", type=str,
help="LANforge station-mode setting (see add_sta LANforge CLI documentation, default is 0 (auto))")
parser.add_argument("--dut_ssid", type=str, help="DUT SSID")
parser.add_argument("--dut_passwd", type=str, help="DUT PSK password. Do not set for OPEN auth")
parser.add_argument("--dut_bssid", type=str, help="DUT BSSID to which we expect to connect.")
args = parser.parse_args()
if args.dest is not None:
lfjson_host = args.dest
if args.port is not None:
lfjson_port = args.port
staConnect = StaConnect(lfjson_host, lfjson_port)
if args.user is not None:
staConnect.user = args.user
if args.passwd is not None:
staConnect.passwd = args.passwd
if args.sta_mode is not None:
staConnect.sta_mode = args.sta_mode
if args.upstream_resource is not None:
staConnect.upstream_resource = args.upstream_resource
if args.upstream_port is not None:
staConnect.upstream_port = args.upstream_port
if args.radio is not None:
staConnect.radio = args.radio
if args.resource is not None:
staConnect.resource = args.resource
if args.dut_passwd is not None:
staConnect.dut_passwd = args.dut_passwd
if args.dut_bssid is not None:
staConnect.dut_bssid = args.dut_bssid
if args.dut_ssid is not None:
staConnect.dut_ssid = args.dut_ssid
staConnect.run()
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == "__main__":
main()