Files
wlan-lanforge-scripts/py-scripts/sta_scan_test.py
2022-07-12 15:45:45 +05:30

260 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
NAME: sta_scan_test.py
PURPOSE:
Creates a station with specified ssid info (can be real or fake ssid, if fake use open for security), then
starts a scan and waits 15 seconds, finally scan results are printed to console
Use './sta_scan_test.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
import importlib
import pandas as pd
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
import argparse
import time
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
realm = importlib.import_module("py-json.realm")
Realm = realm.Realm
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
class StaScan(Realm):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
mode=0,
number_template="00000",
csv_output=False,
use_ht160=False,
use_existing_station=False,
scan_time=15,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
if sta_list is None:
sta_list = []
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.security = security
self.password = password
self.radio = radio
self.mode = mode
self.number_template = number_template
self.csv_output = csv_output
self.debug = _debug_on
self.station_profile = self.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
self.scan_time = scan_time
self.use_existing_station = use_existing_station
if self.station_profile.use_ht160:
self.station_profile.mode = 9
self.station_profile.mode = mode
def start(self):
if self.use_existing_station:
# bring up existing sta list
#print(self.sta_list)
for s in self.sta_list:
eid = LFUtils.name_to_eid(s)
up_request = LFUtils.port_up_request(resource_id=eid[1], port_name=eid[2])
self.json_post("/cli-json/set_port", up_request)
else:
self.station_profile.admin_up()
self.sta_list = self.station_profile.station_names
LFUtils.wait_until_ports_admin_up(base_url=self.lfclient_url, port_list=self.sta_list,
debug_=self.debug)
if self.debug:
print("ports are admin-up, initiating scan requests.")
# Request port-table info for stations on each resource.
#stations = [LFUtils.name_to_eid(x) for x in self.sta_list]
#stations = pd.DataFrame(stations)
#resources = stations[1].unique()
#interfaces = list()
#for resource in resources:
# if self.debug:
# print("Requesting port listing on resource: %s"%(resource))
# shelf = stations[0][0]
# resource_station = list(stations[stations[1] == resource][2])
# url = '/port/%s/%s/%s' % (shelf, resource, ','.join(resource_station))
# if self.debug:
# print("Requesting station scan on resource with url: %s"%(url))
# response = self.json_get(url)
# if 'interface' in response.keys():
# interface = response['interface']
# interfaces.append(interface)
# elif 'interfaces' in response.keys():
# response_interfaces = response['interfaces']
# for interface in response_interfaces:
# for item in interface.values():
# interfaces.append(item)
#df = pd.DataFrame(interfaces)
#stations = df[df['port type'] == 'WIFI-STA']
#stations = list(stations.drop_duplicates('parent dev')['alias'])
#stations = [station for station in stations if station in self.sta_list]
#if self.debug:
# print("interfaces: %s\nstations: %s"%(interfaces, stations))
# Start scan on all stations.
for port in self.sta_list:
port = LFUtils.name_to_eid(port)
data = {
"shelf": port[0],
"resource": port[1],
"port": port[2]
}
self.json_post("/cli-json/scan_wifi", data)
# TODO: Make configurable
# Wait for scans to complete.
if self.debug:
print("Waiting for %s seconds for scan to complete" %(self.scan_time))
time.sleep(self.scan_time)
# Get results for all stations.
fmt = "%08s\t%015s\t%023s\t%07s\t%020s\t%07s\t%09s\t%07s"
if not self.csv_output:
print(fmt % ("Resource", "Station", "BSS", "Signal", "SSID", "Channel", "Frequency", "Age"))
for p in self.sta_list:
port = LFUtils.name_to_eid(p)
data = {
"shelf": port[0],
"resource": port[1],
"port": port[2]
}
scan_results = self.json_get("scanresults/%s/%s/%s" % (port[0], port[1], port[2]))
if self.debug:
print("Scan results for port: %s\n%s"%(port, scan_results))
if self.csv_output:
# TODO: This clobbers output of previous station, need a way to
# append (and add resource and wlan to the csv output so that
# multiple stations can be reported.
results = scan_results['scan-results']
df = pd.DataFrame([list(result.values())[0] for result in results])
df.to_csv(self.csv_output)
print('CSV output saved at %s' % self.csv_output)
else:
for result in scan_results['scan-results']:
for name, info in result.items():
print(fmt % (port[1], port[2], info['bss'], info['signal'], info['ssid'],
info['channel'], info['frequency'], info['age']))
def pre_cleanup(self):
self.station_profile.cleanup(self.sta_list)
def cleanup(self):
self.station_profile.cleanup()
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
debug=self.debug)
def build(self):
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, num_stations=1, debug=self.debug)
self._pass("PASS: Station build finished")
LFUtils.wait_until_ports_appear(base_url=self.lfclient_url, port_list=self.sta_list, debug=self.debug)
def main():
parser = Realm.create_basic_argparse(
prog='sta_scan_test.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Used to scan for ssids after creating a station
''',
description='''\
Optionally creates a station with specified ssid info (can be real or fake ssid, if fake use open for security).
If not creating a station, it can use existing station.
Then starts a scan and waits 15 seconds, finally scan results are printed to console.
Example:
./sta_scan_test.py --ssid test_name --security open --radio wiphy0
./sta_scan_test.py --sta_name 1.14.wlan0 1.1.wlan0 --use_existing_station --scan_time 5
''')
parser.add_argument('--use_existing_station', action='store_true', help='Use existing station instead of trying to create stations.')
parser.add_argument('--mode', help='Used to force mode of stations')
parser.add_argument('--sta_name', help='Optional: User defined station names: 1.2.wlan0 1.3.wlan0', nargs='+',
default=["sta0000"])
parser.add_argument('--csv_output', help='Specify file to which csv output will be saved, otherwise print it in the terminal',
default=None)
parser.add_argument('--scan_time', help='Specify time in seconds to wait for scan to complete. Default is 15',
default=15, type=int)
args = parser.parse_args()
station_list = args.sta_name
sta_scan_test = StaScan(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
use_ht160=False,
use_existing_station=args.use_existing_station,
scan_time=args.scan_time,
csv_output=args.csv_output,
mode=args.mode,
_debug_on=args.debug)
if (not args.use_existing_station):
sta_scan_test.pre_cleanup()
sta_scan_test.build()
# exit()
if not sta_scan_test.passes():
print(sta_scan_test.get_fail_message())
sta_scan_test.exit_fail()
sta_scan_test.start()
if (not args.use_existing_station):
sta_scan_test.cleanup()
if __name__ == "__main__":
main()