mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-02 20:07:57 +00:00
This is pure 'git mv'. No changes were made to files. They will not work until we fix up the include paths again, that will be handled in future commits. Signed-off-by: Ben Greear <greearb@candelatech.com>
355 lines
15 KiB
Python
Executable File
355 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
#########################################################################################################
|
|
# Built to allow connection and test of clients using EAP-TTLS.
|
|
# Functions can be called to create a station, create TCP and UDP traffic, run it a short amount of time.
|
|
#
|
|
# Used by Nightly_Sanity and Throughput_Test ############################################################
|
|
#########################################################################################################
|
|
|
|
# This will create a station, create TCP and UDP traffic, run it a short amount of time,
|
|
# and verify whether traffic was sent and received. It also verifies the station connected
|
|
# to the requested BSSID if bssid is specified as an argument.
|
|
# The script will clean up the station and connections at the end of the test.
|
|
|
|
import sys
|
|
|
|
if sys.version_info[0] != 3:
|
|
print("This script requires Python 3")
|
|
exit(1)
|
|
|
|
if 'py-json' not in sys.path:
|
|
sys.path.append('../../py-json')
|
|
|
|
import argparse
|
|
import LANforge
|
|
from LANforge import LFUtils
|
|
# from LANforge import LFCliBase
|
|
from LANforge import lfcli_base
|
|
from LANforge.lfcli_base import LFCliBase
|
|
from LANforge.LFUtils import *
|
|
import realm
|
|
from realm import Realm
|
|
from lf_lib import *
|
|
import pprint
|
|
|
|
OPEN="open"
|
|
WEP="wep"
|
|
WPA="wpa"
|
|
WPA2="wpa2"
|
|
MODE_AUTO=0
|
|
|
|
class EAPConnect(LFCliBase):
|
|
def __init__(self, host, port, security=None, ssid=None, sta_list=None, number_template="00000", _debug_on=False, _dut_bssid="",
|
|
_exit_on_error=False, _sta_name=None, _resource=1, radio="wiphy0", key_mgmt="WPA-EAP", eap="", identity="",
|
|
ttls_passwd="", hessid=None, ttls_realm="", domain="", _sta_prefix='eap', _exit_on_fail=False, _cleanup_on_exit=True):
|
|
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
|
|
self.host = host
|
|
self.port = port
|
|
self.ssid = ssid
|
|
self.radio = radio
|
|
self.security = security
|
|
#self.password = password
|
|
self.sta_list = sta_list
|
|
self.key_mgmt = key_mgmt
|
|
self.eap = eap
|
|
self.sta_prefix = _sta_prefix
|
|
self.identity = identity
|
|
self.ttls_passwd = ttls_passwd
|
|
self.ttls_realm = ttls_realm
|
|
self.domain = domain
|
|
self.hessid = hessid
|
|
self.dut_bssid = _dut_bssid
|
|
self.timeout = 120
|
|
self.number_template = number_template
|
|
self.debug = _debug_on
|
|
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
|
self.station_profile = self.local_realm.new_station_profile()
|
|
self.station_profile.lfclient_url = self.lfclient_url
|
|
self.station_profile.ssid = self.ssid
|
|
self.station_profile.security = self.security
|
|
self.station_profile.number_template_ = self.number_template
|
|
self.station_profile.mode = 0
|
|
#Added to test_ipv4_ttls code
|
|
self.upstream_url = None # defer construction
|
|
self.sta_url_map = None
|
|
self.upstream_resource = None
|
|
self.upstream_port = "eth2"
|
|
self.station_names = []
|
|
if _sta_name is not None:
|
|
self.station_names = [_sta_name]
|
|
self.localrealm = Realm(lfclient_host=host, lfclient_port=port)
|
|
self.resource = _resource
|
|
self.cleanup_on_exit = _cleanup_on_exit
|
|
self.resulting_stations = {}
|
|
self.resulting_endpoints = {}
|
|
self.station_profile = None
|
|
self.l3_udp_profile = None
|
|
self.l3_tcp_profile = None
|
|
|
|
# def get_realm(self) -> Realm: # py > 3.6
|
|
def get_realm(self):
|
|
return self.localrealm
|
|
|
|
def get_station_url(self, sta_name_=None):
|
|
if sta_name_ is None:
|
|
raise ValueError("get_station_url wants a station name")
|
|
if self.sta_url_map is None:
|
|
self.sta_url_map = {}
|
|
for sta_name in self.station_names:
|
|
self.sta_url_map[sta_name] = "port/1/%s/%s" % (self.resource, sta_name)
|
|
return self.sta_url_map[sta_name_]
|
|
|
|
def get_upstream_url(self):
|
|
if self.upstream_url is None:
|
|
self.upstream_url = "port/1/%s/%s" % (self.upstream_resource, self.upstream_port)
|
|
return self.upstream_url
|
|
|
|
# Compare pre-test values to post-test values
|
|
def compare_vals(self, name, postVal, print_pass=False, print_fail=True):
|
|
# print(f"Comparing {name}")
|
|
if postVal > 0:
|
|
self._pass("%s %s" % (name, postVal), print_pass)
|
|
else:
|
|
self._fail("%s did not report traffic: %s" % (name, postVal), print_fail)
|
|
|
|
def remove_stations(self):
|
|
for name in self.station_names:
|
|
LFUtils.removePort(self.resource, name, self.lfclient_url)
|
|
|
|
def num_associated(self, bssid):
|
|
counter = 0
|
|
# print("there are %d results" % len(self.station_results))
|
|
fields = "_links,port,alias,ip,ap,port+type"
|
|
self.station_results = self.localrealm.find_ports_like("%s*"%self.sta_prefix, fields, debug_=False)
|
|
if (self.station_results is None) or (len(self.station_results) < 1):
|
|
self.get_failed_result_list()
|
|
for eid,record in self.station_results.items():
|
|
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
|
|
#pprint(eid)
|
|
#pprint(record)
|
|
if record["ap"] == bssid:
|
|
counter += 1
|
|
#print("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- ")
|
|
return counter
|
|
|
|
def clear_test_results(self):
|
|
self.resulting_stations = {}
|
|
self.resulting_endpoints = {}
|
|
super().clear_test_results()
|
|
#super(StaConnect, self).clear_test_results().test_results.clear()
|
|
|
|
def setup(self):
|
|
self.clear_test_results()
|
|
self.check_connect()
|
|
upstream_json = self.json_get("%s?fields=alias,phantom,down,port,ip" % self.get_upstream_url(), debug_=False)
|
|
|
|
if upstream_json is None:
|
|
self._fail(message="Unable to query %s, bye" % self.upstream_port, print_=True)
|
|
return False
|
|
|
|
if upstream_json['interface']['ip'] == "0.0.0.0":
|
|
if self.debug:
|
|
pprint.pprint(upstream_json)
|
|
self._fail("Warning: %s lacks ip address" % self.get_upstream_url(), print_=True)
|
|
return False
|
|
|
|
# remove old stations
|
|
print("Removing old stations")
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
response = self.json_get(sta_url)
|
|
if (response is not None) and (response["interface"] is not None):
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
LFUtils.wait_until_ports_disappear(self.lfclient_url, self.station_names)
|
|
|
|
# Create stations and turn dhcp on
|
|
self.station_profile = self.localrealm.new_station_profile()
|
|
|
|
# Build stations
|
|
self.station_profile.use_security(self.security, self.ssid, passwd="[BLANK]")
|
|
self.station_profile.set_number_template(self.number_template)
|
|
print("Creating stations")
|
|
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
|
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
|
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
|
self.station_profile.set_wifi_extra(key_mgmt=self.key_mgmt, eap=self.eap, identity=self.identity,
|
|
passwd=self.ttls_passwd,
|
|
realm=self.ttls_realm, domain=self.domain,
|
|
hessid=self.hessid)
|
|
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug, use_radius=True, hs20_enable=False)
|
|
self._pass("PASS: Station build finished")
|
|
|
|
self.create_traffic = createTraffic(self.localrealm, self.sta_prefix, self.resource, self.upstream_port)
|
|
self.create_traffic.lf_l3_udp_profile()
|
|
self.create_traffic.lf_l3_tcp_profile()
|
|
|
|
def start(self):
|
|
if self.station_profile is None:
|
|
self._fail("Incorrect setup")
|
|
pprint.pprint(self.station_profile)
|
|
if self.station_profile.up is None:
|
|
self._fail("Incorrect station profile, missing profile.up")
|
|
if self.station_profile.up == False:
|
|
print("\nBringing ports up...")
|
|
data = {"shelf": 1,
|
|
"resource": self.resource,
|
|
"port": "ALL",
|
|
"probe_flags": 1}
|
|
self.json_post("/cli-json/nc_show_ports", data)
|
|
self.station_profile.admin_up()
|
|
LFUtils.waitUntilPortsAdminUp(self.resource, self.lfclient_url, self.station_names)
|
|
|
|
# station_info = self.jsonGet(self.mgr_url, "%s?fields=port,ip,ap" % (self.getStaUrl()))
|
|
duration = 0
|
|
maxTime = 60
|
|
ip = "0.0.0.0"
|
|
ap = ""
|
|
print("Waiting for %s stations to associate to AP: " % len(self.station_names), end="")
|
|
connected_stations = {}
|
|
while (len(connected_stations.keys()) < len(self.station_names)) and (duration < maxTime):
|
|
duration += 3
|
|
time.sleep(3)
|
|
print(".", end="")
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
station_info = self.json_get(sta_url + "?fields=port,ip,ap")
|
|
|
|
# LFUtils.debug_printer.pprint(station_info)
|
|
if (station_info is not None) and ("interface" in station_info):
|
|
if "ip" in station_info["interface"]:
|
|
ip = station_info["interface"]["ip"]
|
|
if "ap" in station_info["interface"]:
|
|
ap = station_info["interface"]["ap"]
|
|
|
|
if (ap == "Not-Associated") or (ap == ""):
|
|
if self.debug:
|
|
print(" -%s," % sta_name, end="")
|
|
else:
|
|
if ip == "0.0.0.0":
|
|
if self.debug:
|
|
print(" %s (0.0.0.0)" % sta_name, end="")
|
|
else:
|
|
connected_stations[sta_name] = sta_url
|
|
data = {
|
|
"shelf":1,
|
|
"resource": self.resource,
|
|
"port": "ALL",
|
|
"probe_flags": 1
|
|
}
|
|
self.json_post("/cli-json/nc_show_ports", data)
|
|
|
|
for sta_name in self.station_names:
|
|
sta_url = self.get_station_url(sta_name)
|
|
station_info = self.json_get(sta_url) # + "?fields=port,ip,ap")
|
|
if station_info is None:
|
|
print("unable to query %s" % sta_url)
|
|
self.resulting_stations[sta_url] = station_info
|
|
ap = station_info["interface"]["ap"]
|
|
ip = station_info["interface"]["ip"]
|
|
if (ap != "") and (ap != "Not-Associated"):
|
|
print(" %s +AP %s, " % (sta_name, ap), end="")
|
|
if self.dut_bssid != "":
|
|
if self.dut_bssid.lower() == ap.lower():
|
|
self._pass(sta_name+" connected to BSSID: " + ap)
|
|
# self.test_results.append("PASSED: )
|
|
# print("PASSED: Connected to BSSID: "+ap)
|
|
else:
|
|
self._fail("%s connected to wrong BSSID, requested: %s Actual: %s" % (sta_name, self.dut_bssid, ap))
|
|
else:
|
|
self._fail(sta_name+" did not connect to AP")
|
|
return False
|
|
|
|
if ip == "0.0.0.0":
|
|
self._fail("%s did not get an ip. Ending test" % sta_name)
|
|
else:
|
|
self._pass("%s connected to AP: %s With IP: %s" % (sta_name, ap, ip))
|
|
|
|
if self.passes() == False:
|
|
if self.cleanup_on_exit:
|
|
print("Cleaning up...")
|
|
self.remove_stations()
|
|
return False
|
|
|
|
# start cx traffic
|
|
print("\nStarting CX Traffic")
|
|
|
|
self.create_traffic.l3_udp_profile.start_cx()
|
|
self.create_traffic.l3_tcp_profile.start_cx()
|
|
time.sleep(1)
|
|
self.create_traffic.l3_tcp_profile.refresh_cx()
|
|
self.create_traffic.l3_udp_profile.refresh_cx()
|
|
|
|
def collect_endp_stats(self, endp_map):
|
|
print("Collecting Data")
|
|
fields="?fields=name,tx+bytes,rx+bytes"
|
|
for (cx_name, endps) in endp_map.items():
|
|
try:
|
|
endp_url = "/endp/%s%s" % (endps[0], fields)
|
|
endp_json = self.json_get(endp_url)
|
|
self.resulting_endpoints[endp_url] = endp_json
|
|
ptest_a_tx = endp_json['endpoint']['tx bytes']
|
|
ptest_a_rx = endp_json['endpoint']['rx bytes']
|
|
|
|
#ptest = self.json_get("/endp/%s?fields=tx+bytes,rx+bytes" % cx_names[cx_name]["b"])
|
|
endp_url = "/endp/%s%s" % (endps[1], fields)
|
|
endp_json = self.json_get(endp_url)
|
|
self.resulting_endpoints[endp_url] = endp_json
|
|
|
|
ptest_b_tx = endp_json['endpoint']['tx bytes']
|
|
ptest_b_rx = endp_json['endpoint']['rx bytes']
|
|
|
|
self.compare_vals("testTCP-A TX", ptest_a_tx)
|
|
self.compare_vals("testTCP-A RX", ptest_a_rx)
|
|
|
|
self.compare_vals("testTCP-B TX", ptest_b_tx)
|
|
self.compare_vals("testTCP-B RX", ptest_b_rx)
|
|
|
|
except Exception as e:
|
|
print("Is this the function having the error?")
|
|
self.error(e)
|
|
|
|
|
|
def stop(self):
|
|
# stop cx traffic
|
|
print("Stopping CX Traffic")
|
|
self.create_traffic.l3_udp_profile.stop_cx()
|
|
self.create_traffic.l3_tcp_profile.stop_cx()
|
|
|
|
# Refresh stats
|
|
print("\nRefresh CX stats")
|
|
self.create_traffic.l3_udp_profile.refresh_cx()
|
|
self.create_traffic.l3_tcp_profile.refresh_cx()
|
|
|
|
print("Sleeping for 5 seconds")
|
|
time.sleep(5)
|
|
|
|
# get data for endpoints JSON
|
|
self.collect_endp_stats(self.create_traffic.l3_udp_profile.created_cx)
|
|
self.collect_endp_stats(self.create_traffic.l3_tcp_profile.created_cx)
|
|
# print("\n")
|
|
|
|
def cleanup(self):
|
|
# remove all endpoints and cxs
|
|
if self.cleanup_on_exit:
|
|
for sta_name in self.station_names:
|
|
LFUtils.removePort(self.resource, sta_name, self.lfclient_url)
|
|
curr_endp_names = []
|
|
removeCX(self.lfclient_url, self.create_traffic.l3_udp_profile.get_cx_names())
|
|
removeCX(self.lfclient_url, self.create_traffic.l3_tcp_profile.get_cx_names())
|
|
for (cx_name, endp_names) in self.create_traffic.l3_udp_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
for (cx_name, endp_names) in self.create_traffic.l3_tcp_profile.created_cx.items():
|
|
curr_endp_names.append(endp_names[0])
|
|
curr_endp_names.append(endp_names[1])
|
|
removeEndps(self.lfclient_url, curr_endp_names, debug= self.debug)
|
|
|
|
# ~class
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|