Files
wlan-lanforge-scripts/py-scripts/sta_scan_test.py
Ben Greear 5056a01e94 sta-scan-test: Allow using existing stations.
Lots of changes to allow this script to scan on existing stations
on multiple resources.

Add --use_existing_station option.

Fix help to show how to properly specify multiple stations with
--sta_name

Comment out convoluted logic that tried to find radio info for stations.
This is not needed for any purpose that I can find, and it did not
allow me to specify sta_name with EID format.

Start scan on all stations, then do sleep time, then grab results.
This allows concurrent scanning on multiple resources/radios.

Print more info in text based output, add station resource and name
to output so we can decipher multiple radio output.

Add note about how .csv output is broken in this case, the logic
needs to be improved to append results for all stations and write a single file.

Add --scan_time argument to specify sleep time (defaults to 15 seconds
to match the original hard-coded value).

Signed-off-by: Ben Greear <greearb@candelatech.com>
2022-07-12 15:45:23 +05:30

260 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
NAME: sta_scan_test.py
PURPOSE:
Creates a station with specified ssid info (can be real or fake ssid, if fake use open for security), then
starts a scan and waits 15 seconds, finally scan results are printed to console
Use './sta_scan_test.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
import importlib
import pandas as pd
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
import argparse
import time
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
realm = importlib.import_module("py-json.realm")
Realm = realm.Realm
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
class StaScan(Realm):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
mode=0,
number_template="00000",
csv_output=False,
use_ht160=False,
use_existing_station=False,
scan_time=15,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
if sta_list is None:
sta_list = []
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.security = security
self.password = password
self.radio = radio
self.mode = mode
self.number_template = number_template
self.csv_output = csv_output
self.debug = _debug_on
self.station_profile = self.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
self.scan_time = scan_time
self.use_existing_station = use_existing_station
if self.station_profile.use_ht160:
self.station_profile.mode = 9
self.station_profile.mode = mode
def start(self):
if self.use_existing_station:
# bring up existing sta list
#print(self.sta_list)
for s in self.sta_list:
eid = LFUtils.name_to_eid(s)
up_request = LFUtils.port_up_request(resource_id=eid[1], port_name=eid[2])
self.json_post("/cli-json/set_port", up_request)
else:
self.station_profile.admin_up()
self.sta_list = self.station_profile.station_names
LFUtils.wait_until_ports_admin_up(base_url=self.lfclient_url, port_list=self.sta_list,
debug_=self.debug)
if self.debug:
print("ports are admin-up, initiating scan requests.")
# Request port-table info for stations on each resource.
#stations = [LFUtils.name_to_eid(x) for x in self.sta_list]
#stations = pd.DataFrame(stations)
#resources = stations[1].unique()
#interfaces = list()
#for resource in resources:
# if self.debug:
# print("Requesting port listing on resource: %s"%(resource))
# shelf = stations[0][0]
# resource_station = list(stations[stations[1] == resource][2])
# url = '/port/%s/%s/%s' % (shelf, resource, ','.join(resource_station))
# if self.debug:
# print("Requesting station scan on resource with url: %s"%(url))
# response = self.json_get(url)
# if 'interface' in response.keys():
# interface = response['interface']
# interfaces.append(interface)
# elif 'interfaces' in response.keys():
# response_interfaces = response['interfaces']
# for interface in response_interfaces:
# for item in interface.values():
# interfaces.append(item)
#df = pd.DataFrame(interfaces)
#stations = df[df['port type'] == 'WIFI-STA']
#stations = list(stations.drop_duplicates('parent dev')['alias'])
#stations = [station for station in stations if station in self.sta_list]
#if self.debug:
# print("interfaces: %s\nstations: %s"%(interfaces, stations))
# Start scan on all stations.
for port in self.sta_list:
port = LFUtils.name_to_eid(port)
data = {
"shelf": port[0],
"resource": port[1],
"port": port[2]
}
self.json_post("/cli-json/scan_wifi", data)
# TODO: Make configurable
# Wait for scans to complete.
if self.debug:
print("Waiting for %s seconds for scan to complete" %(self.scan_time))
time.sleep(self.scan_time)
# Get results for all stations.
fmt = "%08s\t%015s\t%023s\t%07s\t%020s\t%07s\t%09s\t%07s"
if not self.csv_output:
print(fmt % ("Resource", "Station", "BSS", "Signal", "SSID", "Channel", "Frequency", "Age"))
for p in self.sta_list:
port = LFUtils.name_to_eid(p)
data = {
"shelf": port[0],
"resource": port[1],
"port": port[2]
}
scan_results = self.json_get("scanresults/%s/%s/%s" % (port[0], port[1], port[2]))
if self.debug:
print("Scan results for port: %s\n%s"%(port, scan_results))
if self.csv_output:
# TODO: This clobbers output of previous station, need a way to
# append (and add resource and wlan to the csv output so that
# multiple stations can be reported.
results = scan_results['scan-results']
df = pd.DataFrame([list(result.values())[0] for result in results])
df.to_csv(self.csv_output)
print('CSV output saved at %s' % self.csv_output)
else:
for result in scan_results['scan-results']:
for name, info in result.items():
print(fmt % (port[1], port[2], info['bss'], info['signal'], info['ssid'],
info['channel'], info['frequency'], info['age']))
def pre_cleanup(self):
self.station_profile.cleanup(self.sta_list)
def cleanup(self):
self.station_profile.cleanup()
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
debug=self.debug)
def build(self):
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, num_stations=1, debug=self.debug)
self._pass("PASS: Station build finished")
LFUtils.wait_until_ports_appear(','.join(self.sta_list))
def main():
parser = Realm.create_basic_argparse(
prog='sta_scan_test.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Used to scan for ssids after creating a station
''',
description='''\
Optionally creates a station with specified ssid info (can be real or fake ssid, if fake use open for security).
If not creating a station, it can use existing station.
Then starts a scan and waits 15 seconds, finally scan results are printed to console.
Example:
./sta_scan_test.py --ssid test_name --security open --radio wiphy0
./sta_scan_test.py --sta_name 1.14.wlan0 1.1.wlan0 --use_existing_station --scan_time 5
''')
parser.add_argument('--use_existing_station', action='store_true', help='Use existing station instead of trying to create stations.')
parser.add_argument('--mode', help='Used to force mode of stations')
parser.add_argument('--sta_name', help='Optional: User defined station names: 1.2.wlan0 1.3.wlan0', nargs='+',
default=["sta0000"])
parser.add_argument('--csv_output', help='Specify file to which csv output will be saved, otherwise print it in the terminal',
default=None)
parser.add_argument('--scan_time', help='Specify time in seconds to wait for scan to complete. Default is 15',
default=15, type=int)
args = parser.parse_args()
station_list = args.sta_name
sta_scan_test = StaScan(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
use_ht160=False,
use_existing_station=args.use_existing_station,
scan_time=args.scan_time,
csv_output=args.csv_output,
mode=args.mode,
_debug_on=args.debug)
if (not args.use_existing_station):
sta_scan_test.pre_cleanup()
sta_scan_test.build()
# exit()
if not sta_scan_test.passes():
print(sta_scan_test.get_fail_message())
sta_scan_test.exit_fail()
sta_scan_test.start()
if (not args.use_existing_station):
sta_scan_test.cleanup()
if __name__ == "__main__":
main()