mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-03 12:27:53 +00:00
This is pure 'git mv'. No changes were made to files. They will not work until we fix up the include paths again, that will be handled in future commits. Signed-off-by: Ben Greear <greearb@candelatech.com>
1065 lines
45 KiB
Python
Executable File
1065 lines
45 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
####################################################################################
|
|
# Script is based off of LANForge sta_connect2.py
|
|
# Script built for max throughput testing on a single client
|
|
# The main function of the script creates a station, then tests:
|
|
# 1. UDP Downstream (AP to STA)
|
|
# 2. UDP Upstream (STA to AP)
|
|
# 3. TCP Downstream (AP to STA)
|
|
# 4. TCP Upstream (STA to AP)
|
|
# The script will clean up the station and connections at the end of the test.
|
|
#
|
|
# Used by Throughput_Test ###########################################################
|
|
####################################################################################
|
|
|
|
# Script is based off of sta_connect2.py
|
|
# Script built for max throughput testing on a single client
|
|
# The main function of the script creates a station, then tests:
|
|
# 1. UDP Downstream (AP to STA)
|
|
# 2. UDP Upstream (STA to AP)
|
|
# 3. TCP Downstream (AP to STA)
|
|
# 4. TCP Upstream (STA to AP)
|
|
# The script will clean up the station and connections at the end of the test.
|
|
|
|
import sys
|
|
import csv
|
|
|
|
if sys.version_info[0] != 3:
|
|
print("This script requires Python 3")
|
|
exit(1)
|
|
|
|
if 'py-json' not in sys.path:
|
|
sys.path.append('../../py-json')
|
|
|
|
import argparse
|
|
from LANforge import LFUtils
|
|
# from LANforge import LFCliBase
|
|
from LANforge import lfcli_base
|
|
from LANforge.lfcli_base import LFCliBase
|
|
from LANforge.LFUtils import *
|
|
import realm
|
|
from realm import Realm
|
|
import pprint
|
|
|
|
OPEN="open"
|
|
WEP="wep"
|
|
WPA="wpa"
|
|
WPA2="wpa2"
|
|
MODE_AUTO=0
|
|
|
|
class SingleClient(LFCliBase):
|
|
def __init__(self, host, port, _dut_ssid="jedway-open-1", _dut_passwd="NA", _dut_bssid="",
|
|
_user="", _passwd="", _sta_mode="0", _radio="wiphy0",
|
|
_resource=1, _upstream_resource=1, _upstream_port="eth1",
|
|
_sta_name=None, debug_=False, _dut_security=OPEN, _exit_on_error=False,
|
|
_cleanup_on_exit=True, _runtime_sec=60, _exit_on_fail=False):
|
|
# do not use `super(LFCLiBase,self).__init__(self, host, port, _debugOn)
|
|
# that is py2 era syntax and will force self into the host variable, making you
|
|
# very confused.
|
|
super().__init__(host, port, _debug=debug_, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
|
|
self.debug = debug_
|
|
self.dut_security = _dut_security
|
|
self.dut_ssid = _dut_ssid
|
|
self.dut_passwd = _dut_passwd
|
|
self.dut_bssid = _dut_bssid
|
|
self.user = _user
|
|
self.passwd = _passwd
|
|
self.sta_mode = _sta_mode # See add_sta LANforge CLI users guide entry
|
|
self.radio = _radio
|
|
self.resource = _resource
|
|
self.upstream_resource = _upstream_resource
|
|
self.upstream_port = _upstream_port
|
|
self.runtime_secs = _runtime_sec
|
|
self.cleanup_on_exit = _cleanup_on_exit
|
|
self.sta_url_map = None # defer construction
|
|
self.upstream_url = None # defer construction
|
|
self.station_names = []
|
|
if _sta_name is not None:
|
|
self.station_names = [ _sta_name ]
|
|
# self.localrealm :Realm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6
|
|
self.localrealm = Realm(lfclient_host=host, lfclient_port=port) # py > 3.6
|
|
self.resulting_stations = {}
|
|
self.resulting_endpoints = {}
|
|
self.station_profile = None
|
|
self.l3_udp_profile = None
|
|
self.l3_tcp_profile = None
|
|
|
|
# def get_realm(self) -> Realm: # py > 3.6
|
|
def get_realm(self):
|
|
return self.localrealm
|
|
|
|
def get_station_url(self, sta_name_=None):
|
|
if sta_name_ is None:
|
|
raise ValueError("get_station_url wants a station name")
|
|
if self.sta_url_map is None:
|
|
self.sta_url_map = {}
|
|
for sta_name in self.station_names:
|
|
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name)
|
|
return self.sta_url_map[sta_name_]
|
|
|
|
def get_upstream_url(self):
|
|
if self.upstream_url is None:
|
|
self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port)
|
|
return self.upstream_url
|
|
|
|
# Compare pre-test values to post-test values
|
|
def compare_vals(self, name, postVal, print_pass=False, print_fail=True):
|
|
# print(f"Comparing {name}")
|
|
if postVal > 0:
|
|
self._pass("%s %s" % (name, postVal), print_pass)
|
|
else:
|
|
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
|
|
|
|
def remove_stations(self):
|
|
for name in self.station_names:
|
|
LFUtils.removePort(self.resource, name, self.lfclient_url)
|
|
|
|
def num_associated(self, bssid):
|
|
counter = 0
|
|
# print("there are %d results" % len(self.station_results))
|
|
fields = "_links,port,alias,ip,ap,port+type"
|
|
self.station_results = self.localrealm.find_ports_like("sta*", fields, debug_=False)
|
|
if (self.station_results is None) or (len(self.station_results) < 1):
|
|
self.get_failed_result_list()
|
|
for eid,record in self.station_results.items():
|
|
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
|
|
#pprint(eid)
|
|
#pprint(record)
|
|
if record["ap"] == bssid:
|
|
counter += 1
|
|
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
|
|
return counter
|
|
|
|
def clear_test_results(self):
|
|
self.resulting_stations = {}
|
|
self.resulting_endpoints = {}
|
|
super().clear_test_results()
|
|
#super(StaConnect, self).clear_test_results().test_results.clear()
|
|
|
|
def setup(self):
|
|
self.clear_test_results()
|
|
self.check_connect()
|
|
upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False)
|
|
|
|
if upstream_json is None:
|
|
self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True)
|
|
return False
|
|
|
|
if upstream_json['interface']['ip'] == "0.0.0.0":
|
|
if self.debug:
|
|
pprint.pprint(upstream_json)
|
|
self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True)
|
|
return False
|
|
|
|
# remove old stations
|
|
print("Removing old stations")
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
response = self.json_get(sta_url)
|
|
if (response is not None) and (response["interface"] is not None):
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names)
|
|
|
|
# Create stations and turn dhcp on
|
|
self.station_profile = self.localrealm.new_station_profile()
|
|
|
|
if self.dut_security == WPA2:
|
|
self.station_profile.use_security(security_type="wpa2", ssid=self.dut_ssid, passwd=self.dut_passwd)
|
|
elif self.dut_security == WPA:
|
|
self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd)
|
|
elif self.dut_security == OPEN:
|
|
self.station_profile.use_security(security_type="open", ssid=self.dut_ssid, passwd="[BLANK]")
|
|
elif self.dut_security == WPA:
|
|
self.station_profile.use_security(security_type="wpa", ssid=self.dut_ssid, passwd=self.dut_passwd)
|
|
elif self.dut_security == WEP:
|
|
self.station_profile.use_security(security_type="wep", ssid=self.dut_ssid, passwd=self.dut_passwd)
|
|
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
|
|
|
print("Adding new stations ", end="")
|
|
self.station_profile.create(radio=self.radio, sta_names_=self.station_names, up_=False, debug=self.debug, suppress_related_commands_=True)
|
|
LFUtils.wait_until_ports_appear(self.lfclient_url, self.station_names, debug=self.debug)
|
|
|
|
def start(self):
|
|
if self.station_profile is None:
|
|
self._fail("Incorrect setup")
|
|
pprint.pprint(self.station_profile)
|
|
if self.station_profile.up is None:
|
|
self._fail("Incorrect station profile, missing profile.up")
|
|
if self.station_profile.up == False:
|
|
print("\nBringing ports up...")
|
|
data = {"shelf": 1,
|
|
"resource": self.resource,
|
|
"port": "ALL",
|
|
"probe_flags": 1}
|
|
self.json_post("/cli-json/nc_show_ports", data)
|
|
self.station_profile.admin_up()
|
|
LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names)
|
|
|
|
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
|
|
duration = 0
|
|
maxTime = 100
|
|
ip = "0.0.0.0"
|
|
ap = ""
|
|
print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="")
|
|
connected_stations = {}
|
|
while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime):
|
|
duration += 3
|
|
time.sleep(10)
|
|
print(".", end="")
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
|
|
|
|
# LFUtils.debug_printer.pprint(station_info)
|
|
if (station_info is not None) and ("interface" in station_info):
|
|
if "ip" in station_info["interface"]:
|
|
ip = station_info["interface"]["ip"]
|
|
if "ap" in station_info["interface"]:
|
|
ap = station_info["interface"]["ap"]
|
|
|
|
if (ap == "Not-Associated") or (ap == ""):
|
|
if self.debug:
|
|
print(" -%s," % sta_name, end="")
|
|
else:
|
|
if ip == "0.0.0.0":
|
|
if self.debug:
|
|
print(" %s (0.0.0.0)" % sta_name, end="")
|
|
else:
|
|
connected_stations[sta_name] = sta_url
|
|
data = {
|
|
"shelf":1,
|
|
"resource": self.resource,
|
|
"port": "ALL",
|
|
"probe_flags": 1
|
|
}
|
|
self.json_post("/cli-json/nc_show_ports", data)
|
|
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
station_info = self.json_get(sta_url) # + "?fields=port,ip,ap")
|
|
if station_info is None:
|
|
print("unable to query %s" % sta_url)
|
|
self.resulting_stations[sta_url] = station_info
|
|
ap = station_info["interface"]["ap"]
|
|
ip = station_info["interface"]["ip"]
|
|
if (ap != "") and (ap != "Not-Associated"):
|
|
print(" %s +AP %s, " % (sta_name, ap), end="")
|
|
if self.dut_bssid != "":
|
|
if self.dut_bssid.lower() == ap.lower():
|
|
self._pass(sta_name+" connected to BSSID: " + ap)
|
|
# self.test_results.append("PASSED: )
|
|
# print("PASSED: Connected to BSSID: "+ap)
|
|
else:
|
|
self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap))
|
|
else:
|
|
self._fail(sta_name+" did not connect to AP")
|
|
return False
|
|
|
|
if ip == "0.0.0.0":
|
|
self._fail("%s did not get an ip. Ending test" % sta_name)
|
|
else:
|
|
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
|
|
|
|
if self.passes() == False:
|
|
if self.cleanup_on_exit:
|
|
print("Cleaning up...")
|
|
self.remove_stations()
|
|
return False
|
|
|
|
def udp_profile(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu):
|
|
# Create UDP endpoint - Alex's code!
|
|
self.l3_udp_tput_profile = self.localrealm.new_l3_cx_profile()
|
|
self.l3_udp_tput_profile.side_a_min_bps = side_a_min_bps
|
|
self.l3_udp_tput_profile.side_b_min_bps = side_b_min_bps
|
|
self.l3_udp_tput_profile.side_a_min_pdu = side_a_min_pdu
|
|
self.l3_udp_tput_profile.side_b_min_pdu = side_b_min_pdu
|
|
self.l3_udp_tput_profile.report_timer = 1000
|
|
self.l3_udp_tput_profile.name_prefix = "udp"
|
|
self.l3_udp_tput_profile.create(endp_type="lf_udp",
|
|
side_a=list(self.localrealm.find_ports_like("tput+")),
|
|
side_b="%d.%s" % (self.resource, self.upstream_port),
|
|
suppress_related_commands=True)
|
|
|
|
def tcp_profile(self, side_a_min_bps, side_b_min_bps):
|
|
# Create TCP endpoints - original code!
|
|
self.l3_tcp_tput_profile = self.localrealm.new_l3_cx_profile()
|
|
self.l3_tcp_tput_profile.side_a_min_bps = side_a_min_bps
|
|
self.l3_tcp_tput_profile.side_b_min_bps = side_b_min_bps
|
|
self.l3_tcp_tput_profile.name_prefix = "tcp"
|
|
self.l3_tcp_tput_profile.report_timer = 1000
|
|
self.l3_tcp_tput_profile.create(endp_type="lf_tcp",
|
|
side_a=list(self.localrealm.find_ports_like("tput+")),
|
|
side_b="%d.%s" % (self.resource, self.upstream_port),
|
|
suppress_related_commands=True)
|
|
|
|
# Start UDP Downstream Traffic
|
|
def udp_throughput(self):
|
|
print("\nStarting UDP Traffic")
|
|
self.l3_udp_tput_profile.start_cx()
|
|
time.sleep(1)
|
|
self.l3_udp_tput_profile.refresh_cx()
|
|
|
|
def tcp_throughput(self):
|
|
print("\nStarting TCP Traffic")
|
|
self.l3_tcp_tput_profile.start_cx()
|
|
time.sleep(1)
|
|
self.l3_tcp_tput_profile.refresh_cx()
|
|
|
|
def udp_stop(self):
|
|
# stop cx traffic
|
|
print("Stopping CX Traffic")
|
|
self.l3_udp_tput_profile.stop_cx()
|
|
|
|
# Refresh stats
|
|
print("\nRefresh CX stats")
|
|
self.l3_udp_tput_profile.refresh_cx()
|
|
|
|
print("Sleeping for 5 seconds")
|
|
time.sleep(5)
|
|
|
|
# get data for endpoints JSON
|
|
return self.collect_client_stats(self.l3_udp_tput_profile.created_cx)
|
|
# print("\n")
|
|
|
|
def tcp_stop(self):
|
|
# stop cx traffic
|
|
print("Stopping CX Traffic")
|
|
self.l3_tcp_tput_profile.stop_cx()
|
|
|
|
# Refresh stats
|
|
print("\nRefresh CX stats")
|
|
self.l3_tcp_tput_profile.refresh_cx()
|
|
|
|
print("Sleeping for 5 seconds")
|
|
time.sleep(5)
|
|
|
|
# get data for endpoints JSON
|
|
return self.collect_client_stats(self.l3_tcp_tput_profile.created_cx)
|
|
# print("\n")
|
|
|
|
# New Endpoint code to print TX and RX numbers
|
|
def collect_client_stats(self, endp_map):
|
|
print("Collecting Data")
|
|
fields="?fields=name,tx+bytes,rx+bytes"
|
|
for (cx_name, endps) in endp_map.items():
|
|
try:
|
|
endp_url = "/endp/%s%s" % (endps[0], fields)
|
|
endp_json = self.json_get(endp_url)
|
|
self.resulting_endpoints[endp_url] = endp_json
|
|
ptest_a_tx = endp_json['endpoint']['tx bytes']
|
|
ptest_a_rx = endp_json['endpoint']['rx bytes']
|
|
|
|
# ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
|
|
endp_url = "/endp/%s%s" % (endps[1], fields)
|
|
endp_json = self.json_get(endp_url)
|
|
self.resulting_endpoints[endp_url] = endp_json
|
|
ptest_b_tx = endp_json['endpoint']['tx bytes']
|
|
ptest_b_rx = endp_json['endpoint']['rx bytes']
|
|
|
|
byte_values = []
|
|
byte_values.append("Station TX: " + str(ptest_a_tx))
|
|
byte_values.append("Station RX: " + str(ptest_a_rx))
|
|
byte_values.append("AP TX: " + str(ptest_b_tx))
|
|
byte_values.append("AP RX: " + str(ptest_b_rx))
|
|
|
|
return byte_values
|
|
|
|
except Exception as e:
|
|
self.error(e)
|
|
|
|
def cleanup_udp(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
|
|
|
|
def cleanup_tcp(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
|
|
|
|
def cleanup(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
|
|
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug=self.debug)
|
|
|
|
def udp_unidirectional(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line):
|
|
self.udp_profile(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu)
|
|
self.start()
|
|
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
|
|
self.udp_throughput()
|
|
print("napping %f sec" % self.runtime_secs)
|
|
time.sleep(self.runtime_secs)
|
|
values = self.udp_stop()
|
|
print(values)
|
|
# Get value required for measurement
|
|
bytes = values[values_line]
|
|
# Get value in Bits and convert to Mbps
|
|
bits = (int(bytes.split(": ", 1)[1])) * 8
|
|
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
|
|
return mpbs
|
|
|
|
def tcp_unidirectional(self, side_a_min_bps, side_b_min_bps, direction, values_line):
|
|
self.tcp_profile(side_a_min_bps, side_b_min_bps)
|
|
self.start()
|
|
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
|
|
self.tcp_throughput()
|
|
print("napping %f sec" % self.runtime_secs)
|
|
time.sleep(self.runtime_secs)
|
|
values = self.tcp_stop()
|
|
print(values)
|
|
# Get value required for measurement
|
|
bytes = values[values_line]
|
|
# Get value in Bits and convert to Mbps
|
|
bits = (int(bytes.split(": ", 1)[1])) * 8
|
|
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
|
|
return mpbs
|
|
|
|
def throughput_csv(csv_file, ssid_name, ap_model, firmware, security, udp_ds, udp_us, tcp_ds, tcp_us):
|
|
# Find band for CSV ---> This code is not great, it SHOULD get that info from LANForge!
|
|
if "5G" in ssid_name:
|
|
frequency = "5 GHz"
|
|
|
|
elif "2dot4G" in ssid_name:
|
|
frequency = "2.4 GHz"
|
|
|
|
else:
|
|
frequency = "Unknown"
|
|
|
|
# Append row to top of CSV file
|
|
row = [ap_model, firmware, frequency, security, udp_ds, udp_us, tcp_ds, tcp_us]
|
|
|
|
with open(csv_file, 'r') as readFile:
|
|
reader = csv.reader(readFile)
|
|
lines = list(reader)
|
|
lines.insert(1, row)
|
|
|
|
with open(csv_file, 'w') as writeFile:
|
|
writer = csv.writer(writeFile)
|
|
writer.writerows(lines)
|
|
|
|
readFile.close()
|
|
writeFile.close()
|
|
|
|
|
|
class SingleClientEAP(LFCliBase):
|
|
def __init__(self, host, port, security=None, ssid=None, sta_list=None, number_template="00000", _debug_on=False, _dut_bssid="",
|
|
_exit_on_error=False, _sta_name=None, _resource=1, radio="wiphy0", key_mgmt="WPA-EAP", eap="", identity="",
|
|
ttls_passwd="", hessid=None, ttls_realm="", domain="", _exit_on_fail=False, _cleanup_on_exit=True):
|
|
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
|
|
self.host = host
|
|
self.port = port
|
|
self.ssid = ssid
|
|
self.radio = radio
|
|
self.security = security
|
|
#self.password = password
|
|
self.sta_list = sta_list
|
|
self.key_mgmt = key_mgmt
|
|
self.eap = eap
|
|
self.identity = identity
|
|
self.ttls_passwd = ttls_passwd
|
|
self.ttls_realm = ttls_realm
|
|
self.domain = domain
|
|
self.hessid = hessid
|
|
self.dut_bssid = _dut_bssid
|
|
self.timeout = 120
|
|
self.number_template = number_template
|
|
self.debug = _debug_on
|
|
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
|
self.station_profile = self.local_realm.new_station_profile()
|
|
self.station_profile.lfclient_url = self.lfclient_url
|
|
self.station_profile.ssid = self.ssid
|
|
self.station_profile.security = self.security
|
|
self.station_profile.number_template_ = self.number_template
|
|
self.station_profile.mode = 0
|
|
#Added to test_ipv4_ttls code
|
|
self.upstream_url = None # defer construction
|
|
self.sta_url_map = None
|
|
self.upstream_resource = None
|
|
self.upstream_port = None
|
|
self.station_names = []
|
|
if _sta_name is not None:
|
|
self.station_names = [_sta_name]
|
|
self.localrealm = Realm(lfclient_host=host, lfclient_port=port)
|
|
self.resource = _resource
|
|
self.cleanup_on_exit = _cleanup_on_exit
|
|
self.resulting_stations = {}
|
|
self.resulting_endpoints = {}
|
|
self.station_profile = None
|
|
self.l3_udp_profile = None
|
|
self.l3_tcp_profile = None
|
|
|
|
# def get_realm(self) -> Realm: # py > 3.6
|
|
def get_realm(self):
|
|
return self.localrealm
|
|
|
|
def get_station_url(self, sta_name_=None):
|
|
if sta_name_ is None:
|
|
raise ValueError("get_station_url wants a station name")
|
|
if self.sta_url_map is None:
|
|
self.sta_url_map = {}
|
|
for sta_name in self.station_names:
|
|
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name)
|
|
return self.sta_url_map[sta_name_]
|
|
|
|
def get_upstream_url(self):
|
|
if self.upstream_url is None:
|
|
self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port)
|
|
return self.upstream_url
|
|
|
|
# Compare pre-test values to post-test values
|
|
def compare_vals(self, name, postVal, print_pass=False, print_fail=True):
|
|
# print(f"Comparing {name}")
|
|
if postVal > 0:
|
|
self._pass("%s %s" % (name, postVal), print_pass)
|
|
else:
|
|
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
|
|
|
|
def remove_stations(self):
|
|
for name in self.station_names:
|
|
LFUtils.removePort(self.resource, name, self.lfclient_url)
|
|
|
|
def num_associated(self, bssid):
|
|
counter = 0
|
|
# print("there are %d results" % len(self.station_results))
|
|
fields = "_links,port,alias,ip,ap,port+type"
|
|
self.station_results = self.localrealm.find_ports_like("eap*", fields, debug_=False)
|
|
if (self.station_results is None) or (len(self.station_results) < 1):
|
|
self.get_failed_result_list()
|
|
for eid,record in self.station_results.items():
|
|
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
|
|
#pprint(eid)
|
|
#pprint(record)
|
|
if record["ap"] == bssid:
|
|
counter += 1
|
|
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
|
|
return counter
|
|
|
|
def clear_test_results(self):
|
|
self.resulting_stations = {}
|
|
self.resulting_endpoints = {}
|
|
super().clear_test_results()
|
|
#super(StaConnect, self).clear_test_results().test_results.clear()
|
|
|
|
def setup(self):
|
|
self.clear_test_results()
|
|
self.check_connect()
|
|
upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False)
|
|
|
|
if upstream_json is None:
|
|
self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True)
|
|
return False
|
|
|
|
if upstream_json['interface']['ip'] == "0.0.0.0":
|
|
if self.debug:
|
|
pprint.pprint(upstream_json)
|
|
self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True)
|
|
return False
|
|
|
|
# remove old stations
|
|
print("Removing old stations")
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
response = self.json_get(sta_url)
|
|
if (response is not None) and (response["interface"] is not None):
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names)
|
|
|
|
# Create stations and turn dhcp on
|
|
self.station_profile = self.localrealm.new_station_profile()
|
|
|
|
# Build stations
|
|
self.station_profile.use_security(self.security, self.ssid, passwd="[BLANK]")
|
|
self.station_profile.set_number_template(self.number_template)
|
|
print("Creating stations")
|
|
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
|
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
|
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
|
self.station_profile.set_wifi_extra(key_mgmt=self.key_mgmt, eap=self.eap, identity=self.identity,
|
|
passwd=self.ttls_passwd,
|
|
realm=self.ttls_realm, domain=self.domain,
|
|
hessid=self.hessid)
|
|
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug, use_radius=True, hs20_enable=False)
|
|
self._pass("PASS: Station build finished")
|
|
|
|
def start(self):
|
|
if self.station_profile is None:
|
|
self._fail("Incorrect setup")
|
|
pprint.pprint(self.station_profile)
|
|
if self.station_profile.up is None:
|
|
self._fail("Incorrect station profile, missing profile.up")
|
|
if self.station_profile.up == False:
|
|
print("\nBringing ports up...")
|
|
data = {"shelf": 1,
|
|
"resource": self.resource,
|
|
"port": "ALL",
|
|
"probe_flags": 1}
|
|
self.json_post("/cli-json/nc_show_ports", data)
|
|
self.station_profile.admin_up()
|
|
LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names)
|
|
|
|
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
|
|
duration = 0
|
|
maxTime = 30
|
|
ip = "0.0.0.0"
|
|
ap = ""
|
|
print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="")
|
|
connected_stations = {}
|
|
while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime):
|
|
duration += 3
|
|
time.sleep(10)
|
|
print(".", end="")
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
|
|
|
|
# LFUtils.debug_printer.pprint(station_info)
|
|
if (station_info is not None) and ("interface" in station_info):
|
|
if "ip" in station_info["interface"]:
|
|
ip = station_info["interface"]["ip"]
|
|
if "ap" in station_info["interface"]:
|
|
ap = station_info["interface"]["ap"]
|
|
|
|
if (ap == "Not-Associated") or (ap == ""):
|
|
if self.debug:
|
|
print(" -%s," % sta_name, end="")
|
|
else:
|
|
if ip == "0.0.0.0":
|
|
if self.debug:
|
|
print(" %s (0.0.0.0)" % sta_name, end="")
|
|
else:
|
|
connected_stations[sta_name] = sta_url
|
|
data = {
|
|
"shelf":1,
|
|
"resource": self.resource,
|
|
"port": "ALL",
|
|
"probe_flags": 1
|
|
}
|
|
self.json_post("/cli-json/nc_show_ports", data)
|
|
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
station_info = self.json_get(sta_url) # + "?fields=port,ip,ap")
|
|
if station_info is None:
|
|
print("unable to query %s" % sta_url)
|
|
self.resulting_stations[sta_url] = station_info
|
|
ap = station_info["interface"]["ap"]
|
|
ip = station_info["interface"]["ip"]
|
|
if (ap != "") and (ap != "Not-Associated"):
|
|
print(" %s +AP %s, " % (sta_name, ap), end="")
|
|
if self.dut_bssid != "":
|
|
if self.dut_bssid.lower() == ap.lower():
|
|
self._pass(sta_name+" connected to BSSID: " + ap)
|
|
# self.test_results.append("PASSED: )
|
|
# print("PASSED: Connected to BSSID: "+ap)
|
|
else:
|
|
self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap))
|
|
else:
|
|
self._fail(sta_name+" did not connect to AP")
|
|
return False
|
|
|
|
if ip == "0.0.0.0":
|
|
self._fail("%s did not get an ip. Ending test" % sta_name)
|
|
else:
|
|
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
|
|
|
|
if self.passes() == False:
|
|
if self.cleanup_on_exit:
|
|
print("Cleaning up...")
|
|
self.remove_stations()
|
|
return False
|
|
|
|
def udp_profile(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu):
|
|
# Create UDP endpoint - Alex's code!
|
|
self.l3_udp_tput_profile = self.localrealm.new_l3_cx_profile()
|
|
self.l3_udp_tput_profile.side_a_min_bps = side_a_min_bps
|
|
self.l3_udp_tput_profile.side_b_min_bps = side_b_min_bps
|
|
self.l3_udp_tput_profile.side_a_min_pdu = side_a_min_pdu
|
|
self.l3_udp_tput_profile.side_b_min_pdu = side_b_min_pdu
|
|
self.l3_udp_tput_profile.report_timer = 1000
|
|
self.l3_udp_tput_profile.name_prefix = "udp"
|
|
self.l3_udp_tput_profile.create(endp_type="lf_udp",
|
|
side_a=list(self.localrealm.find_ports_like("tput+")),
|
|
side_b="%d.%s" % (self.resource, self.upstream_port),
|
|
suppress_related_commands=True)
|
|
|
|
def tcp_profile(self, side_a_min_bps, side_b_min_bps):
|
|
# Create TCP endpoints - original code!
|
|
self.l3_tcp_tput_profile = self.localrealm.new_l3_cx_profile()
|
|
self.l3_tcp_tput_profile.side_a_min_bps = side_a_min_bps
|
|
self.l3_tcp_tput_profile.side_b_min_bps = side_b_min_bps
|
|
self.l3_tcp_tput_profile.name_prefix = "tcp"
|
|
self.l3_tcp_tput_profile.report_timer = 1000
|
|
self.l3_tcp_tput_profile.create(endp_type="lf_tcp",
|
|
side_a=list(self.localrealm.find_ports_like("tput+")),
|
|
side_b="%d.%s" % (self.resource, self.upstream_port),
|
|
suppress_related_commands=True)
|
|
|
|
# Start UDP Downstream Traffic
|
|
def udp_throughput(self):
|
|
print("\nStarting UDP Traffic")
|
|
self.l3_udp_tput_profile.start_cx()
|
|
time.sleep(1)
|
|
self.l3_udp_tput_profile.refresh_cx()
|
|
|
|
def tcp_throughput(self):
|
|
print("\nStarting TCP Traffic")
|
|
self.l3_tcp_tput_profile.start_cx()
|
|
time.sleep(1)
|
|
self.l3_tcp_tput_profile.refresh_cx()
|
|
|
|
def udp_stop(self):
|
|
# stop cx traffic
|
|
print("Stopping CX Traffic")
|
|
self.l3_udp_tput_profile.stop_cx()
|
|
|
|
# Refresh stats
|
|
print("\nRefresh CX stats")
|
|
self.l3_udp_tput_profile.refresh_cx()
|
|
|
|
print("Sleeping for 5 seconds")
|
|
time.sleep(5)
|
|
|
|
# get data for endpoints JSON
|
|
return self.collect_client_stats(self.l3_udp_tput_profile.created_cx)
|
|
# print("\n")
|
|
|
|
def tcp_stop(self):
|
|
# stop cx traffic
|
|
print("Stopping CX Traffic")
|
|
self.l3_tcp_tput_profile.stop_cx()
|
|
|
|
# Refresh stats
|
|
print("\nRefresh CX stats")
|
|
self.l3_tcp_tput_profile.refresh_cx()
|
|
|
|
print("Sleeping for 5 seconds")
|
|
time.sleep(5)
|
|
|
|
# get data for endpoints JSON
|
|
return self.collect_client_stats(self.l3_tcp_tput_profile.created_cx)
|
|
# print("\n")
|
|
|
|
# New Endpoint code to print TX and RX numbers
|
|
def collect_client_stats(self, endp_map):
|
|
print("Collecting Data")
|
|
fields="?fields=name,tx+bytes,rx+bytes"
|
|
for (cx_name, endps) in endp_map.items():
|
|
try:
|
|
endp_url = "/endp/%s%s" % (endps[0], fields)
|
|
endp_json = self.json_get(endp_url)
|
|
self.resulting_endpoints[endp_url] = endp_json
|
|
ptest_a_tx = endp_json['endpoint']['tx bytes']
|
|
ptest_a_rx = endp_json['endpoint']['rx bytes']
|
|
|
|
# ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
|
|
endp_url = "/endp/%s%s" % (endps[1], fields)
|
|
endp_json = self.json_get(endp_url)
|
|
self.resulting_endpoints[endp_url] = endp_json
|
|
ptest_b_tx = endp_json['endpoint']['tx bytes']
|
|
ptest_b_rx = endp_json['endpoint']['rx bytes']
|
|
|
|
byte_values = []
|
|
byte_values.append("Station TX: " + str(ptest_a_tx))
|
|
byte_values.append("Station RX: " + str(ptest_a_rx))
|
|
byte_values.append("AP TX: " + str(ptest_b_tx))
|
|
byte_values.append("AP RX: " + str(ptest_b_rx))
|
|
|
|
return byte_values
|
|
|
|
except Exception as e:
|
|
self.error(e)
|
|
|
|
def cleanup_udp(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
|
|
|
|
def cleanup_tcp(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
|
|
|
|
def cleanup(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.l3_tcp_tput_profile.get_cx_names())
|
|
removeCX(self.lfclient_url, self.l3_udp_tput_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.l3_tcp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
for (cx_name, endp_names) in self.l3_udp_tput_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug=self.debug)
|
|
|
|
def udp_unidirectional(self, side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line):
|
|
self.udp_profile(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu)
|
|
self.start()
|
|
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
|
|
self.udp_throughput()
|
|
print("napping %f sec" % self.runtime_secs)
|
|
time.sleep(self.runtime_secs)
|
|
values = self.udp_stop()
|
|
print(values)
|
|
# Get value required for measurement
|
|
bytes = values[values_line]
|
|
# Get value in Bits and convert to Mbps
|
|
bits = (int(bytes.split(": ", 1)[1])) * 8
|
|
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
|
|
return mpbs
|
|
|
|
def tcp_unidirectional(self, side_a_min_bps, side_b_min_bps, direction, values_line):
|
|
self.tcp_profile(side_a_min_bps, side_b_min_bps)
|
|
self.start()
|
|
print("Running", direction, "Traffic for %s seconds" % self.runtime_secs)
|
|
self.tcp_throughput()
|
|
print("napping %f sec" % self.runtime_secs)
|
|
time.sleep(self.runtime_secs)
|
|
values = self.tcp_stop()
|
|
print(values)
|
|
# Get value required for measurement
|
|
bytes = values[values_line]
|
|
# Get value in Bits and convert to Mbps
|
|
bits = (int(bytes.split(": ", 1)[1])) * 8
|
|
mpbs = round((bits / 1000000) / self.runtime_secs, 2)
|
|
return mpbs
|
|
# ~class
|
|
|
|
|
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
########################## Test Code ################################
|
|
|
|
##Main will perform 4 throughput tests on SSID provided by input and return a list with the values
|
|
|
|
def main(ap_model, firmware, radio, ssid_name, ssid_psk, security, station, runtime, upstream_port):
|
|
######## Establish Client Connection #########################
|
|
singleClient = SingleClient("10.10.10.201", 8080, debug_=False)
|
|
singleClient.sta_mode = 0
|
|
singleClient.upstream_resource = 1
|
|
singleClient.upstream_port = upstream_port
|
|
singleClient.radio = radio
|
|
singleClient.resource = 1
|
|
singleClient.dut_ssid = ssid_name
|
|
singleClient.dut_passwd = ssid_psk
|
|
singleClient.dut_security = security
|
|
singleClient.station_names = station
|
|
singleClient.runtime_secs = runtime
|
|
singleClient.cleanup_on_exit = True
|
|
|
|
#Create List for Throughput Data
|
|
tput_data = []
|
|
|
|
####### Setup UDP Profile and Run Traffic Downstream (AP to STA) #######################
|
|
singleClient.setup()
|
|
side_a_min_bps = 56000
|
|
side_b_min_bps = 500000000
|
|
side_a_min_pdu = 1200
|
|
side_b_min_pdu = 1500
|
|
direction = "Downstream"
|
|
values_line = 1 # 1 = Station Rx
|
|
try:
|
|
udp_ds = singleClient.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
|
|
print("UDP Downstream:", udp_ds, "Mbps")
|
|
tput_data.append("UDP Downstream: " + str(udp_ds))
|
|
except:
|
|
udp_ds = "error"
|
|
print("UDP Downstream Test Error")
|
|
tput_data.append("UDP Downstream: Error")
|
|
|
|
|
|
####### Setup UDP Profile and Run Traffic Upstream (STA to AP) #######################
|
|
#singleClient.setup()
|
|
side_a_min_bps = 500000000
|
|
side_b_min_bps = 0
|
|
side_a_min_pdu = 1200
|
|
side_b_min_pdu = 1500
|
|
direction = "Upstream"
|
|
values_line = 3 # 3 = AP Rx
|
|
try:
|
|
udp_us = singleClient.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
|
|
print("UDP Upstream:",udp_us,"Mbps")
|
|
tput_data.append("UDP Upstream: " + str(udp_us))
|
|
except:
|
|
udp_us = "error"
|
|
print("UDP Upstream Test Error")
|
|
tput_data.append("UDP Upstream: Error")
|
|
#Cleanup UDP Endpoints
|
|
#singleClient.cleanup_udp()
|
|
|
|
|
|
####### Setup TCP Profile and Run Traffic Downstream (AP to STA) #######################
|
|
#singleClient.setup()
|
|
side_a_min_bps = 0
|
|
side_b_min_bps = 500000000
|
|
direction = "Downstream"
|
|
values_line = 1 # 1 = Station Rx
|
|
try:
|
|
tcp_ds = singleClient.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
|
|
print("TCP Downstream:",tcp_ds,"Mbps")
|
|
tput_data.append("TCP Downstream: " + str(tcp_ds))
|
|
except:
|
|
tcp_ds = "error"
|
|
print("TCP Downstream Test Error")
|
|
tput_data.append("TCP Downstream: Error")
|
|
|
|
####### Setup TCP Profile and Run Traffic Upstream (STA to AP) #######################
|
|
#singleClient.setup()
|
|
side_a_min_bps = 500000000
|
|
side_b_min_bps = 0
|
|
direction = "Upstream"
|
|
values_line = 3 # 3 = AP Rx
|
|
try:
|
|
tcp_us = singleClient.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
|
|
print("TCP Upstream:",tcp_us,"Mbps")
|
|
tput_data.append("TCP Upstream: " + str(tcp_us))
|
|
except:
|
|
tcp_us = "error"
|
|
print("TCP Upstream Test Error")
|
|
tput_data.append("TCP Uptream: Error")
|
|
|
|
#Cleanup TCP Endpoints
|
|
#singleClient.cleanup_tcp()
|
|
|
|
#Cleanup Endpoints
|
|
singleClient.cleanup()
|
|
|
|
return(tput_data)
|
|
|
|
def eap_tput(sta_list, ssid_name, radio, security, eap_type, identity, ttls_password, upstream_port):
|
|
eap_connect = SingleClientEAP("10.10.10.201", 8080, _debug_on=False)
|
|
eap_connect.upstream_resource = 1
|
|
eap_connect.upstream_port = upstream_port
|
|
eap_connect.security = security
|
|
eap_connect.sta_list = sta_list
|
|
eap_connect.station_names = sta_list
|
|
eap_connect.ssid = ssid_name
|
|
eap_connect.radio = radio
|
|
eap_connect.eap = eap_type
|
|
eap_connect.identity = identity
|
|
eap_connect.ttls_passwd = ttls_password
|
|
eap_connect.runtime_secs = 10
|
|
|
|
#Create List for Throughput Data
|
|
tput_data = []
|
|
|
|
####### Setup UDP Profile and Run Traffic Downstream (AP to STA) #######################
|
|
eap_connect.setup()
|
|
side_a_min_bps = 56000
|
|
side_b_min_bps = 500000000
|
|
side_a_min_pdu = 1200
|
|
side_b_min_pdu = 1500
|
|
direction = "Downstream"
|
|
values_line = 1 # 1 = Station Rx
|
|
try:
|
|
udp_ds = eap_connect.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
|
|
print("UDP Downstream:", udp_ds, "Mbps")
|
|
tput_data.append("UDP Downstream: " + str(udp_ds))
|
|
except:
|
|
udp_ds = "error"
|
|
print("UDP Downstream Test Error")
|
|
tput_data.append("UDP Downstream: Error")
|
|
|
|
|
|
####### Setup UDP Profile and Run Traffic Upstream (STA to AP) #######################
|
|
#singleClient.setup()
|
|
side_a_min_bps = 500000000
|
|
side_b_min_bps = 0
|
|
side_a_min_pdu = 1200
|
|
side_b_min_pdu = 1500
|
|
direction = "Upstream"
|
|
values_line = 3 # 3 = AP Rx
|
|
try:
|
|
udp_us = eap_connect.udp_unidirectional(side_a_min_bps, side_b_min_bps, side_a_min_pdu, side_b_min_pdu, direction, values_line)
|
|
print("UDP Upstream:",udp_us,"Mbps")
|
|
tput_data.append("UDP Upstream: " + str(udp_us))
|
|
except:
|
|
udp_us = "error"
|
|
print("UDP Upstream Test Error")
|
|
tput_data.append("UDP Upstream: Error")
|
|
#Cleanup UDP Endpoints
|
|
#singleClient.cleanup_udp()
|
|
|
|
|
|
####### Setup TCP Profile and Run Traffic Downstream (AP to STA) #######################
|
|
#singleClient.setup()
|
|
side_a_min_bps = 0
|
|
side_b_min_bps = 500000000
|
|
direction = "Downstream"
|
|
values_line = 1 # 1 = Station Rx
|
|
try:
|
|
tcp_ds = eap_connect.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
|
|
print("TCP Downstream:",tcp_ds,"Mbps")
|
|
tput_data.append("TCP Downstream: " + str(tcp_ds))
|
|
except:
|
|
tcp_ds = "error"
|
|
print("TCP Downstream Test Error")
|
|
tput_data.append("TCP Downstream: Error")
|
|
|
|
####### Setup TCP Profile and Run Traffic Upstream (STA to AP) #######################
|
|
#singleClient.setup()
|
|
side_a_min_bps = 500000000
|
|
side_b_min_bps = 0
|
|
direction = "Upstream"
|
|
values_line = 3 # 3 = AP Rx
|
|
try:
|
|
tcp_us = eap_connect.tcp_unidirectional(side_a_min_bps, side_b_min_bps, direction, values_line)
|
|
print("TCP Upstream:",tcp_us,"Mbps")
|
|
tput_data.append("TCP Upstream: " + str(tcp_us))
|
|
except:
|
|
tcp_us = "error"
|
|
print("TCP Upstream Test Error")
|
|
tput_data.append("TCP Uptream: Error")
|
|
|
|
#Cleanup TCP Endpoints
|
|
#singleClient.cleanup_tcp()
|
|
|
|
#Cleanup Endpoints
|
|
eap_connect.cleanup()
|
|
|
|
return(tput_data)
|