Files
wlan-lanforge-scripts/py-scripts/tip-cicd-sanity/single_client_throughput.py
2021-09-17 15:29:15 -07:00

1067 lines
45 KiB
Python
Executable File

#!/usr/bin/env python3
####################################################################################
# Script is based off of LANForge sta_connect2.py
# Script built for max throughput testing on a single client
# The main function of the script creates a station, then tests:
# 1. UDP Downstream (AP to STA)
# 2. UDP Upstream (STA to AP)
# 3. TCP Downstream (AP to STA)
# 4. TCP Upstream (STA to AP)
# The script will clean up the station and connections at the end of the test.
#
# Used by Throughput_Test ###########################################################
####################################################################################
# Script is based off of sta_connect2.py
# Script built for max throughput testing on a single client
# The main function of the script creates a station, then tests:
# 1. UDP Downstream (AP to STA)
# 2. UDP Upstream (STA to AP)
# 3. TCP Downstream (AP to STA)
# 4. TCP Upstream (STA to AP)
# The script will clean up the station and connections at the end of the test.
import sys
import os
import importlib
import csv
import argparse
import pprint
import time
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../../")))
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
removeCX = LFUtils.removeCX
removeEndps = LFUtils.removeEndps
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
LFCliBase = lfcli_base.LFCliBase
realm = importlib.import_module("py-json.realm")
Realm = realm.Realm
OPEN = "open"
WEP = "wep"
WPA = "wpa"
WPA2 = "wpa2"
MODE_AUTO = 0
class SingleClient(LFCliBase):
def __init__(self, host, port, _dut_ssid="jedway-open-1", _dut_passwd="NA", _dut_bssid="",
_user="", _passwd="", _sta_mode="0", _radio="wiphy0",
_resource=1, _upstream_resource=1, _upstream_port="eth1",
_sta_name=None, debug_=False, _dut_security=OPEN, _exit_on_error=False,
_cleanup_on_exit=True, _runtime_sec=60, _exit_on_fail=False):
# do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn)
# that is py2 era syntax and will force self into the host variable, making you
# very confused.
super().__init__(host, port, _debug=debug_, _exit_on_fail=_exit_on_fail)
self.debug = debug_
self.dut_security = _dut_security
self.dut_ssid = _dut_ssid
self.dut_passwd = _dut_passwd
self.dut_bssid = _dut_bssid
self.user = _user
self.passwd = _passwd
self.sta_mode = _sta_mode # See add_sta LANforge CLI users guide entry
self.radio = _radio
self.resource = _resource
self.upstream_resource = _upstream_resource
self.upstream_port = _upstream_port
self.runtime_secs = _runtime_sec
self.cleanup_on_exit = _cleanup_on_exit
self.sta_url_map = None # defer construction
self.upstream_url = None # defer construction
self.station_names = []
if _sta_name is not None:
self.station_names = [ _sta_name ]
# self.localrealm :Realm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6
self.localrealm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6
self.resulting_stations = {}
self.resulting_endpoints = {}
self.station_profile = None
self.l3_udp_profile = None
self.l3_tcp_profile = None
# def get_realm(self) -> Realm: # py > 3.6
def get_realm(self):
return self.localrealm
def get_station_url(self, sta_name_=None):
if sta_name_ is None:
raise ValueError("get_station_url wants a station name")
if self.sta_url_map is None:
self.sta_url_map = {}
for sta_name in self.station_names:
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name)
return self.sta_url_map[sta_name_]
def get_upstream_url(self):
if self.upstream_url is None:
self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port)
return self.upstream_url
# Compare pre-test values to post-test values
def compare_vals(self, name, postVal, print_pass=False, print_fail=True):
# print(f"Comparing {name}")
if postVal > 0:
self._pass("%s %s" % (name, postVal), print_pass)
else:
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
def remove_stations(self):
for name in self.station_names:
LFUtils.removePort(self.resource, name, self.lfclient_url)
def num_associated(self, bssid):
counter = 0
# print("there are %d results" % len(self.station_results))
fields = "_links,port,alias,ip,ap,port+type"
self.station_results = self.localrealm.find_ports_like("sta*", fields, debug_=False)
if (self.station_results is None) or (len(self.station_results) < 1):
self.get_failed_result_list()
for eid,record in self.station_results.items():
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
#pprint(eid)
#pprint(record)
if record["ap"] == bssid:
counter += 1
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
return counter
def clear_test_results(self):
self.resulting_stations = {}
self.resulting_endpoints = {}
super().clear_test_results()
#super(StaConnect, self).clear_test_results().test_results.clear()
def setup(self):
self.clear_test_results()
self.check_connect()
upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False)
if upstream_json is None:
self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True)
return False
if upstream_json['interface']['ip'] == "0.0.0.0":
if self.debug:
pprint.pprint(upstream_json)
self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True)
return False
# remove old stations
print("Removing old stations")
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
response = self.json_get(sta_url)
if (response is not None) and (response["interface"] is not None):
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names)
# Create stations and turn dhcp on
self.station_profile = self.localrealm.new_station_profile()
if self.dut_security == WPA2:
self.station_profile.use_security(security_type="wpa2", ssid=self.dut_ssid, passwd=self.dut_passwd)
elif self.dut_security == WPA:
self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd)
elif self.dut_security == OPEN:
self.station_profile.use_security(security_type="open", ssid=self.dut_ssid, passwd="[BLANK]")
elif self.dut_security == WPA:
self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd)
elif self.dut_security == WEP:
self.station_profile.use_security(security_type="wep", ssid=self.dut_ssid, passwd=self.dut_passwd)
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
print("Adding new stations ", end="")
self.station_profile.create(radio=self.radio, sta_names_=self.station_names, up_=False, debug=self.debug, suppress_related_commands_=True)
LFUtils.wait_until_ports_appear(self.lfclient_url, self.station_names, debug=self.debug)
def start(self):
if self.station_profile is None:
self._fail("Incorrect setup")
pprint.pprint(self.station_profile)
if self.station_profile.up is None:
self._fail("Incorrect station profile, missing profile.up")
if self.station_profile.up == False:
print("\nBringing ports up...")
data = {"shelf": 1,
"resource": self.resource,
"port": "ALL",
"probe_flags": 1}
self.json_post("/cli-json/nc_show_ports", data)
self.station_profile.admin_up()
LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names)
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
duration = 0
maxTime = 100
ip = "0.0.0.0"
ap = ""
print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="")
connected_stations = {}
while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime):
duration += 3
time.sleep(10)
print(".", end="")
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
# LFUtils.debug_printer.pprint(station_info)
if (station_info is not None) and ("interface" in station_info):
if "ip" in station_info["interface"]:
ip = station_info["interface"]["ip"]
if "ap" in station_info["interface"]:
ap = station_info["interface"]["ap"]
if (ap == "Not-Associated") or (ap == ""):
if self.debug:
print(" -%s," % sta_name, end="")
else:
if ip == "0.0.0.0":
if self.debug:
print(" %s (0.0.0.0)" % sta_name, end="")
else:
connected_stations[sta_name] = sta_url
data = {
"shelf":1,
"resource": self.resource,
"port": "ALL",
"probe_flags": 1
}
self.json_post("/cli-json/nc_show_ports", data)
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
station_info = self.json_get(sta_url) # + "?fields=port,ip,ap")
if station_info is None:
print("unable to query %s" % sta_url)
self.resulting_stations[sta_url] = station_info
ap = station_info["interface"]["ap"]
ip = station_info["interface"]["ip"]
if (ap != "") and (ap != "Not-Associated"):
print(" %s +AP %s, " % (sta_name, ap), end="")
if self.dut_bssid != "":
if self.dut_bssid.lower() == ap.lower():
self._pass(sta_name+" connected to BSSID: " + ap)
# self.test_results.append("PASSED: )
# print("PASSED: Connected to BSSID: "+ap)
else:
self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap))
else:
self._fail(sta_name+" did not connect to AP")
return False
if ip == "0.0.0.0":
self._fail("%s did not get an ip. Ending test" % sta_name)
else:
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
if self.passes() == False:
if self.cleanup_on_exit:
print("Cleaning up...")
self.remove_stations()
return False
def udp_profile(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu):
# Create UDP endpoint - Alex's code!
self.l3_udp_tput_profile = self.localrealm.new_l3_cx_profile()
self.l3_udp_tput_profile.side_a_min_bps = side_a_min_bps
self.l3_udp_tput_profile.side_b_min_bps = side_b_min_bps
self.l3_udp_tput_profile.side_a_min_pdu = side_a_min_pdu
self.l3_udp_tput_profile.side_b_min_pdu = side_b_min_pdu
self.l3_udp_tput_profile.report_timer = 1000
self.l3_udp_tput_profile.name_prefix = "udp"
self.l3_udp_tput_profile.create(endp_type="lf_udp",
side_a=list(self.localrealm.find_ports_like("tput+")),
side_b="%d.%s" % (self.resource, self.upstream_port),
suppress_related_commands=True)
def tcp_profile(self, side_a_min_bps, side_b_min_bps):
# Create TCP endpoints - original code!
self.l3_tcp_tput_profile = self.localrealm.new_l3_cx_profile()
self.l3_tcp_tput_profile.side_a_min_bps = side_a_min_bps
self.l3_tcp_tput_profile.side_b_min_bps = side_b_min_bps
self.l3_tcp_tput_profile.name_prefix = "tcp"
self.l3_tcp_tput_profile.report_timer = 1000
self.l3_tcp_tput_profile.create(endp_type="lf_tcp",
side_a=list(self.localrealm.find_ports_like("tput+")),
side_b="%d.%s" % (self.resource, self.upstream_port),
suppress_related_commands=True)
# Start UDP Downstream Traffic
def udp_throughput(self):
print("\nStarting UDP Traffic")
self.l3_udp_tput_profile.start_cx()
time.sleep(1)
self.l3_udp_tput_profile.refresh_cx()
def tcp_throughput(self):
print("\nStarting TCP Traffic")
self.l3_tcp_tput_profile.start_cx()
time.sleep(1)
self.l3_tcp_tput_profile.refresh_cx()
def udp_stop(self):
# stop cx traffic
print("Stopping CX Traffic")
self.l3_udp_tput_profile.stop_cx()
# Refresh stats
print("\nRefresh CX stats")
self.l3_udp_tput_profile.refresh_cx()
print("Sleeping for 5 seconds")
time.sleep(5)
# get data for endpoints JSON
return self.collect_client_stats(self.l3_udp_tput_profile.created_cx)
# print("\n")
def tcp_stop(self):
# stop cx traffic
print("Stopping CX Traffic")
self.l3_tcp_tput_profile.stop_cx()
# Refresh stats
print("\nRefresh CX stats")
self.l3_tcp_tput_profile.refresh_cx()
print("Sleeping for 5 seconds")
time.sleep(5)
# get data for endpoints JSON
return self.collect_client_stats(self.l3_tcp_tput_profile.created_cx)
# print("\n")
# New Endpoint code to print TX and RX numbers
def collect_client_stats(self, endp_map):
print("Collecting Data")
fields="?fields=name,tx+bytes,rx+bytes"
for (cx_name, endps) in endp_map.items():
try:
endp_url = "/endp/%s%s" % (endps[0], fields)
endp_json = self.json_get(endp_url)
self.resulting_endpoints[endp_url] = endp_json
ptest_a_tx = endp_json['endpoint']['tx bytes']
ptest_a_rx = endp_json['endpoint']['rx bytes']
# ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
endp_url = "/endp/%s%s" % (endps[1], fields)
endp_json = self.json_get(endp_url)
self.resulting_endpoints[endp_url] = endp_json
ptest_b_tx = endp_json['endpoint']['tx bytes']
ptest_b_rx = endp_json['endpoint']['rx bytes']
byte_values = []
byte_values.append("Station TX: " + str(ptest_a_tx))
byte_values.append("Station RX: " + str(ptest_a_rx))
byte_values.append("AP TX: " + str(ptest_b_tx))
byte_values.append("AP RX: " + str(ptest_b_rx))
return byte_values
except Exception as e:
self.error(e)
def cleanup_udp(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
def cleanup_tcp(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
def cleanup(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug=self.debug)
def udp_unidirectional(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line):
self.udp_profile(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu)
self.start()
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
self.udp_throughput()
print("napping %f sec" % self.runtime_secs)
time.sleep(self.runtime_secs)
values = self.udp_stop()
print(values)
# Get value required for measurement
bytes = values[values_line]
# Get value in Bits and convert to Mbps
bits = (int(bytes.split(": ", 1)[1])) * 8
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
return mpbs
def tcp_unidirectional(self, side_a_min_bps, side_b_min_bps, direction, values_line):
self.tcp_profile(side_a_min_bps, side_b_min_bps)
self.start()
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
self.tcp_throughput()
print("napping %f sec" % self.runtime_secs)
time.sleep(self.runtime_secs)
values = self.tcp_stop()
print(values)
# Get value required for measurement
bytes = values[values_line]
# Get value in Bits and convert to Mbps
bits = (int(bytes.split(": ", 1)[1])) * 8
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
return mpbs
def throughput_csv(csv_file, ssid_name, ap_model, firmware, security, udp_ds, udp_us, tcp_ds, tcp_us):
# Find band for CSV ---> This code is not great, it SHOULD get that info from LANForge!
if "5G" in ssid_name:
frequency = "5 GHz"
elif "2dot4G" in ssid_name:
frequency = "2.4 GHz"
else:
frequency = "Unknown"
# Append row to top of CSV file
row = [ap_model, firmware, frequency, security, udp_ds, udp_us, tcp_ds, tcp_us]
with open(csv_file, 'r') as readFile:
reader = csv.reader(readFile)
lines = list(reader)
lines.insert(1, row)
with open(csv_file, 'w') as writeFile:
writer = csv.writer(writeFile)
writer.writerows(lines)
readFile.close()
writeFile.close()
class SingleClientEAP(LFCliBase):
def __init__(self, host, port, security=None, ssid=None, sta_list=None, number_template="00000", _debug_on=False, _dut_bssid="",
_exit_on_error=False, _sta_name=None, _resource=1, radio="wiphy0", key_mgmt="WPA-EAP", eap="", identity="",
ttls_passwd="", hessid=None, ttls_realm="", domain="", _exit_on_fail=False, _cleanup_on_exit=True):
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.ssid = ssid
self.radio = radio
self.security = security
#self.password = password
self.sta_list = sta_list
self.key_mgmt = key_mgmt
self.eap = eap
self.identity = identity
self.ttls_passwd = ttls_passwd
self.ttls_realm = ttls_realm
self.domain = domain
self.hessid = hessid
self.dut_bssid = _dut_bssid
self.timeout = 120
self.number_template = number_template
self.debug = _debug_on
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
#Added to test_ipv4_ttls code
self.upstream_url = None # defer construction
self.sta_url_map = None
self.upstream_resource = None
self.upstream_port = None
self.station_names = []
if _sta_name is not None:
self.station_names = [_sta_name]
self.localrealm = Realm(lfclient_host=host, lfclient_port=port)
self.resource = _resource
self.cleanup_on_exit = _cleanup_on_exit
self.resulting_stations = {}
self.resulting_endpoints = {}
self.station_profile = None
self.l3_udp_profile = None
self.l3_tcp_profile = None
# def get_realm(self) -> Realm: # py > 3.6
def get_realm(self):
return self.localrealm
def get_station_url(self, sta_name_=None):
if sta_name_ is None:
raise ValueError("get_station_url wants a station name")
if self.sta_url_map is None:
self.sta_url_map = {}
for sta_name in self.station_names:
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name)
return self.sta_url_map[sta_name_]
def get_upstream_url(self):
if self.upstream_url is None:
self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port)
return self.upstream_url
# Compare pre-test values to post-test values
def compare_vals(self, name, postVal, print_pass=False, print_fail=True):
# print(f"Comparing {name}")
if postVal > 0:
self._pass("%s %s" % (name, postVal), print_pass)
else:
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
def remove_stations(self):
for name in self.station_names:
LFUtils.removePort(self.resource, name, self.lfclient_url)
def num_associated(self, bssid):
counter = 0
# print("there are %d results" % len(self.station_results))
fields = "_links,port,alias,ip,ap,port+type"
self.station_results = self.localrealm.find_ports_like("eap*", fields, debug_=False)
if (self.station_results is None) or (len(self.station_results) < 1):
self.get_failed_result_list()
for eid,record in self.station_results.items():
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
#pprint(eid)
#pprint(record)
if record["ap"] == bssid:
counter += 1
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
return counter
def clear_test_results(self):
self.resulting_stations = {}
self.resulting_endpoints = {}
super().clear_test_results()
#super(StaConnect, self).clear_test_results().test_results.clear()
def setup(self):
self.clear_test_results()
self.check_connect()
upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False)
if upstream_json is None:
self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True)
return False
if upstream_json['interface']['ip'] == "0.0.0.0":
if self.debug:
pprint.pprint(upstream_json)
self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True)
return False
# remove old stations
print("Removing old stations")
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
response = self.json_get(sta_url)
if (response is not None) and (response["interface"] is not None):
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names)
# Create stations and turn dhcp on
self.station_profile = self.localrealm.new_station_profile()
# Build stations
self.station_profile.use_security(self.security, self.ssid, passwd="[BLANK]")
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.set_wifi_extra(key_mgmt=self.key_mgmt, eap=self.eap, identity=self.identity,
passwd=self.ttls_passwd,
realm=self.ttls_realm, domain=self.domain,
hessid=self.hessid)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug, use_radius=True, hs20_enable=False)
self._pass("PASS: Station build finished")
def start(self):
if self.station_profile is None:
self._fail("Incorrect setup")
pprint.pprint(self.station_profile)
if self.station_profile.up is None:
self._fail("Incorrect station profile, missing profile.up")
if self.station_profile.up == False:
print("\nBringing ports up...")
data = {"shelf": 1,
"resource": self.resource,
"port": "ALL",
"probe_flags": 1}
self.json_post("/cli-json/nc_show_ports", data)
self.station_profile.admin_up()
LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names)
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
duration = 0
maxTime = 30
ip = "0.0.0.0"
ap = ""
print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="")
connected_stations = {}
while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime):
duration += 3
time.sleep(10)
print(".", end="")
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
# LFUtils.debug_printer.pprint(station_info)
if (station_info is not None) and ("interface" in station_info):
if "ip" in station_info["interface"]:
ip = station_info["interface"]["ip"]
if "ap" in station_info["interface"]:
ap = station_info["interface"]["ap"]
if (ap == "Not-Associated") or (ap == ""):
if self.debug:
print(" -%s," % sta_name, end="")
else:
if ip == "0.0.0.0":
if self.debug:
print(" %s (0.0.0.0)" % sta_name, end="")
else:
connected_stations[sta_name] = sta_url
data = {
"shelf":1,
"resource": self.resource,
"port": "ALL",
"probe_flags": 1
}
self.json_post("/cli-json/nc_show_ports", data)
for sta_name in self.station_names:
sta_url = self.get_station_url(sta_name)
station_info = self.json_get(sta_url) # + "?fields=port,ip,ap")
if station_info is None:
print("unable to query %s" % sta_url)
self.resulting_stations[sta_url] = station_info
ap = station_info["interface"]["ap"]
ip = station_info["interface"]["ip"]
if (ap != "") and (ap != "Not-Associated"):
print(" %s +AP %s, " % (sta_name, ap), end="")
if self.dut_bssid != "":
if self.dut_bssid.lower() == ap.lower():
self._pass(sta_name+" connected to BSSID: " + ap)
# self.test_results.append("PASSED: )
# print("PASSED: Connected to BSSID: "+ap)
else:
self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap))
else:
self._fail(sta_name+" did not connect to AP")
return False
if ip == "0.0.0.0":
self._fail("%s did not get an ip. Ending test" % sta_name)
else:
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
if self.passes() == False:
if self.cleanup_on_exit:
print("Cleaning up...")
self.remove_stations()
return False
def udp_profile(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu):
# Create UDP endpoint - Alex's code!
self.l3_udp_tput_profile = self.localrealm.new_l3_cx_profile()
self.l3_udp_tput_profile.side_a_min_bps = side_a_min_bps
self.l3_udp_tput_profile.side_b_min_bps = side_b_min_bps
self.l3_udp_tput_profile.side_a_min_pdu = side_a_min_pdu
self.l3_udp_tput_profile.side_b_min_pdu = side_b_min_pdu
self.l3_udp_tput_profile.report_timer = 1000
self.l3_udp_tput_profile.name_prefix = "udp"
self.l3_udp_tput_profile.create(endp_type="lf_udp",
side_a=list(self.localrealm.find_ports_like("tput+")),
side_b="%d.%s" % (self.resource, self.upstream_port),
suppress_related_commands=True)
def tcp_profile(self, side_a_min_bps, side_b_min_bps):
# Create TCP endpoints - original code!
self.l3_tcp_tput_profile = self.localrealm.new_l3_cx_profile()
self.l3_tcp_tput_profile.side_a_min_bps = side_a_min_bps
self.l3_tcp_tput_profile.side_b_min_bps = side_b_min_bps
self.l3_tcp_tput_profile.name_prefix = "tcp"
self.l3_tcp_tput_profile.report_timer = 1000
self.l3_tcp_tput_profile.create(endp_type="lf_tcp",
side_a=list(self.localrealm.find_ports_like("tput+")),
side_b="%d.%s" % (self.resource, self.upstream_port),
suppress_related_commands=True)
# Start UDP Downstream Traffic
def udp_throughput(self):
print("\nStarting UDP Traffic")
self.l3_udp_tput_profile.start_cx()
time.sleep(1)
self.l3_udp_tput_profile.refresh_cx()
def tcp_throughput(self):
print("\nStarting TCP Traffic")
self.l3_tcp_tput_profile.start_cx()
time.sleep(1)
self.l3_tcp_tput_profile.refresh_cx()
def udp_stop(self):
# stop cx traffic
print("Stopping CX Traffic")
self.l3_udp_tput_profile.stop_cx()
# Refresh stats
print("\nRefresh CX stats")
self.l3_udp_tput_profile.refresh_cx()
print("Sleeping for 5 seconds")
time.sleep(5)
# get data for endpoints JSON
return self.collect_client_stats(self.l3_udp_tput_profile.created_cx)
# print("\n")
def tcp_stop(self):
# stop cx traffic
print("Stopping CX Traffic")
self.l3_tcp_tput_profile.stop_cx()
# Refresh stats
print("\nRefresh CX stats")
self.l3_tcp_tput_profile.refresh_cx()
print("Sleeping for 5 seconds")
time.sleep(5)
# get data for endpoints JSON
return self.collect_client_stats(self.l3_tcp_tput_profile.created_cx)
# print("\n")
# New Endpoint code to print TX and RX numbers
def collect_client_stats(self, endp_map):
print("Collecting Data")
fields="?fields=name,tx+bytes,rx+bytes"
for (cx_name, endps) in endp_map.items():
try:
endp_url = "/endp/%s%s" % (endps[0], fields)
endp_json = self.json_get(endp_url)
self.resulting_endpoints[endp_url] = endp_json
ptest_a_tx = endp_json['endpoint']['tx bytes']
ptest_a_rx = endp_json['endpoint']['rx bytes']
# ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
endp_url = "/endp/%s%s" % (endps[1], fields)
endp_json = self.json_get(endp_url)
self.resulting_endpoints[endp_url] = endp_json
ptest_b_tx = endp_json['endpoint']['tx bytes']
ptest_b_rx = endp_json['endpoint']['rx bytes']
byte_values = []
byte_values.append("Station TX: " + str(ptest_a_tx))
byte_values.append("Station RX: " + str(ptest_a_rx))
byte_values.append("AP TX: " + str(ptest_b_tx))
byte_values.append("AP RX: " + str(ptest_b_rx))
return byte_values
except Exception as e:
self.error(e)
def cleanup_udp(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
def cleanup_tcp(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
def cleanup(self):
# remove all endpoints and cxs
if self.cleanup_on_exit:
for sta_name in self.station_names:
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
curr_endp_names = []
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
curr_endp_names.append(endp_names[0])
curr_endp_names.append(endp_names[1])
removeEndps(self.lfclient_url, curr_endp_names, debug=self.debug)
def udp_unidirectional(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line):
self.udp_profile(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu)
self.start()
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
self.udp_throughput()
print("napping %f sec" % self.runtime_secs)
time.sleep(self.runtime_secs)
values = self.udp_stop()
print(values)
# Get value required for measurement
bytes = values[values_line]
# Get value in Bits and convert to Mbps
bits = (int(bytes.split(": ", 1)[1])) * 8
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
return mpbs
def tcp_unidirectional(self, side_a_min_bps, side_b_min_bps, direction, values_line):
self.tcp_profile(side_a_min_bps, side_b_min_bps)
self.start()
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
self.tcp_throughput()
print("napping %f sec" % self.runtime_secs)
time.sleep(self.runtime_secs)
values = self.tcp_stop()
print(values)
# Get value required for measurement
bytes = values[values_line]
# Get value in Bits and convert to Mbps
bits = (int(bytes.split(": ", 1)[1])) * 8
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
return mpbs
# ~class
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
########################## Test Code ################################
##Main will perform 4 throughput tests on SSID provided by input and return a list with the values
def main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime, upstream_port):
######## Establish Client Connection #########################
singleClient = SingleClient("10.10.10.201", 8080, debug_=False)
singleClient.sta_mode = 0
singleClient.upstream_resource = 1
singleClient.upstream_port = upstream_port
singleClient.radio = radio
singleClient.resource = 1
singleClient.dut_ssid = ssid_name
singleClient.dut_passwd = ssid_psk
singleClient.dut_security = security
singleClient.station_names = station
singleClient.runtime_secs = runtime
singleClient.cleanup_on_exit = True
#Create List for Throughput Data
tput_data = []
####### Setup UDP Profile and Run Traffic Downstream (AP to STA) #######################
singleClient.setup()
side_a_min_bps = 56000
side_b_min_bps = 500000000
side_a_min_pdu = 1200
side_b_min_pdu = 1500
direction = "Downstream"
values_line = 1 # 1 = Station Rx
try:
udp_ds = singleClient.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
print("UDP Downstream:", udp_ds, "Mbps")
tput_data.append("UDP Downstream: " + str(udp_ds))
except:
udp_ds = "error"
print("UDP Downstream Test Error")
tput_data.append("UDP Downstream: Error")
####### Setup UDP Profile and Run Traffic Upstream (STA to AP) #######################
#singleClient.setup()
side_a_min_bps = 500000000
side_b_min_bps = 0
side_a_min_pdu = 1200
side_b_min_pdu = 1500
direction = "Upstream"
values_line = 3 # 3 = AP Rx
try:
udp_us = singleClient.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
print("UDP Upstream:",udp_us,"Mbps")
tput_data.append("UDP Upstream: " + str(udp_us))
except:
udp_us = "error"
print("UDP Upstream Test Error")
tput_data.append("UDP Upstream: Error")
#Cleanup UDP Endpoints
#singleClient.cleanup_udp()
####### Setup TCP Profile and Run Traffic Downstream (AP to STA) #######################
#singleClient.setup()
side_a_min_bps = 0
side_b_min_bps = 500000000
direction = "Downstream"
values_line = 1 # 1 = Station Rx
try:
tcp_ds = singleClient.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
print("TCP Downstream:",tcp_ds,"Mbps")
tput_data.append("TCP Downstream: " + str(tcp_ds))
except:
tcp_ds = "error"
print("TCP Downstream Test Error")
tput_data.append("TCP Downstream: Error")
####### Setup TCP Profile and Run Traffic Upstream (STA to AP) #######################
#singleClient.setup()
side_a_min_bps = 500000000
side_b_min_bps = 0
direction = "Upstream"
values_line = 3 # 3 = AP Rx
try:
tcp_us = singleClient.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
print("TCP Upstream:",tcp_us,"Mbps")
tput_data.append("TCP Upstream: " + str(tcp_us))
except:
tcp_us = "error"
print("TCP Upstream Test Error")
tput_data.append("TCP Uptream: Error")
#Cleanup TCP Endpoints
#singleClient.cleanup_tcp()
#Cleanup Endpoints
singleClient.cleanup()
return(tput_data)
def eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, ttls_password, upstream_port):
eap_connect = SingleClientEAP("10.10.10.201", 8080, _debug_on=False)
eap_connect.upstream_resource = 1
eap_connect.upstream_port = upstream_port
eap_connect.security = security
eap_connect.sta_list = sta_list
eap_connect.station_names = sta_list
eap_connect.ssid = ssid_name
eap_connect.radio = radio
eap_connect.eap = eap_type
eap_connect.identity = identity
eap_connect.ttls_passwd = ttls_password
eap_connect.runtime_secs = 10
#Create List for Throughput Data
tput_data = []
####### Setup UDP Profile and Run Traffic Downstream (AP to STA) #######################
eap_connect.setup()
side_a_min_bps = 56000
side_b_min_bps = 500000000
side_a_min_pdu = 1200
side_b_min_pdu = 1500
direction = "Downstream"
values_line = 1 # 1 = Station Rx
try:
udp_ds = eap_connect.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
print("UDP Downstream:", udp_ds, "Mbps")
tput_data.append("UDP Downstream: " + str(udp_ds))
except:
udp_ds = "error"
print("UDP Downstream Test Error")
tput_data.append("UDP Downstream: Error")
####### Setup UDP Profile and Run Traffic Upstream (STA to AP) #######################
#singleClient.setup()
side_a_min_bps = 500000000
side_b_min_bps = 0
side_a_min_pdu = 1200
side_b_min_pdu = 1500
direction = "Upstream"
values_line = 3 # 3 = AP Rx
try:
udp_us = eap_connect.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
print("UDP Upstream:",udp_us,"Mbps")
tput_data.append("UDP Upstream: " + str(udp_us))
except:
udp_us = "error"
print("UDP Upstream Test Error")
tput_data.append("UDP Upstream: Error")
#Cleanup UDP Endpoints
#singleClient.cleanup_udp()
####### Setup TCP Profile and Run Traffic Downstream (AP to STA) #######################
#singleClient.setup()
side_a_min_bps = 0
side_b_min_bps = 500000000
direction = "Downstream"
values_line = 1 # 1 = Station Rx
try:
tcp_ds = eap_connect.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
print("TCP Downstream:",tcp_ds,"Mbps")
tput_data.append("TCP Downstream: " + str(tcp_ds))
except:
tcp_ds = "error"
print("TCP Downstream Test Error")
tput_data.append("TCP Downstream: Error")
####### Setup TCP Profile and Run Traffic Upstream (STA to AP) #######################
#singleClient.setup()
side_a_min_bps = 500000000
side_b_min_bps = 0
direction = "Upstream"
values_line = 3 # 3 = AP Rx
try:
tcp_us = eap_connect.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
print("TCP Upstream:",tcp_us,"Mbps")
tput_data.append("TCP Upstream: " + str(tcp_us))
except:
tcp_us = "error"
print("TCP Upstream Test Error")
tput_data.append("TCP Uptream: Error")
#Cleanup TCP Endpoints
#singleClient.cleanup_tcp()
#Cleanup Endpoints
eap_connect.cleanup()
return(tput_data)