mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-10-31 18:58:01 +00:00
202 lines
7.8 KiB
Python
Executable File
202 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
NAME: sta_scan_test.py
|
|
|
|
PURPOSE:
|
|
Creates a station with specified ssid info (can be real or fake ssid, if fake use open for security), then
|
|
starts a scan and waits 15 seconds, finally scan results are printed to console
|
|
|
|
Use './sta_scan_test.py --help' to see command line usage and options
|
|
Copyright 2021 Candela Technologies Inc
|
|
License: Free to distribute and modify. LANforge systems must be licensed.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import importlib
|
|
import pandas as pd
|
|
|
|
if sys.version_info[0] != 3:
|
|
print("This script requires Python 3")
|
|
exit(1)
|
|
|
|
import argparse
|
|
import time
|
|
|
|
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
|
|
|
realm = importlib.import_module("py-json.realm")
|
|
Realm = realm.Realm
|
|
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
|
|
|
|
|
class StaScan(Realm):
|
|
def __init__(self,
|
|
ssid=None,
|
|
security=None,
|
|
password=None,
|
|
sta_list=None,
|
|
upstream=None,
|
|
radio=None,
|
|
host="localhost",
|
|
port=8080,
|
|
mode=0,
|
|
number_template="00000",
|
|
csv_output=False,
|
|
use_ht160=False,
|
|
_debug_on=False,
|
|
_exit_on_error=False,
|
|
_exit_on_fail=False):
|
|
if sta_list is None:
|
|
sta_list = []
|
|
super().__init__(lfclient_host=host,
|
|
lfclient_port=port),
|
|
self.upstream = upstream
|
|
self.host = host
|
|
self.port = port
|
|
self.ssid = ssid
|
|
self.sta_list = sta_list
|
|
self.security = security
|
|
self.password = password
|
|
self.radio = radio
|
|
self.mode = mode
|
|
self.number_template = number_template
|
|
self.csv_output = csv_output
|
|
self.debug = _debug_on
|
|
self.station_profile = self.new_station_profile()
|
|
self.station_profile.lfclient_url = self.lfclient_url
|
|
self.station_profile.ssid = self.ssid
|
|
self.station_profile.ssid_pass = self.password
|
|
self.station_profile.security = self.security
|
|
self.station_profile.number_template_ = self.number_template
|
|
self.station_profile.debug = self.debug
|
|
|
|
self.station_profile.use_ht160 = use_ht160
|
|
if self.station_profile.use_ht160:
|
|
self.station_profile.mode = 9
|
|
self.station_profile.mode = mode
|
|
|
|
def start(self):
|
|
try:
|
|
self.station_profile.admin_up()
|
|
print(self.sta_list)
|
|
LFUtils.wait_until_ports_admin_up(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
|
|
debug_=self.debug)
|
|
stations = [LFUtils.name_to_eid(x) for x in self.sta_list]
|
|
stations = pd.DataFrame(stations)
|
|
resources = stations[1].unique()
|
|
interfaces = list()
|
|
for resource in resources:
|
|
shelf = stations[0][0]
|
|
resource_station = list(stations[stations[1] == resource][2])
|
|
url = '/port/%s/%s/%s' % (shelf, resource, ','.join(resource_station))
|
|
response = self.json_get(url)
|
|
if 'interface' in response.keys():
|
|
interface = response['interface']
|
|
interfaces.append(interface)
|
|
elif 'interfaces' in response.keys():
|
|
response_interfaces = response['interfaces']
|
|
for interface in response_interfaces:
|
|
for item in interface.values():
|
|
interfaces.append(item)
|
|
df = pd.DataFrame(interfaces)
|
|
stations = df[df['port type'] == 'WIFI-STA']
|
|
stations = list(stations.drop_duplicates('parent dev')['alias'])
|
|
stations = [station for station in stations if station in self.sta_list]
|
|
|
|
for port in stations:
|
|
port = LFUtils.name_to_eid(port)
|
|
data = {
|
|
"shelf": port[0],
|
|
"resource": port[1],
|
|
"port": port[2]
|
|
}
|
|
self.json_post("/cli-json/scan_wifi", data)
|
|
time.sleep(15)
|
|
scan_results = self.json_get("scanresults/%s/%s/%s" % (port[0], port[1], port[2]))
|
|
if self.csv_output:
|
|
results = scan_results['scan-results']
|
|
df = pd.DataFrame([list(result.values())[0] for result in results])
|
|
df.to_csv(self.csv_output)
|
|
print('CSV output saved at %s' % self.csv_output)
|
|
else:
|
|
print("{0:<23}".format("BSS"), "{0:<7}".format("Signal"), "{0:<5}".format("SSID"))
|
|
for result in scan_results['scan-results']:
|
|
for name, info in result.items():
|
|
print("%s\t%s\t%s" % (info['bss'], info['signal'], info['ssid']))
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
def pre_cleanup(self):
|
|
self.station_profile.cleanup(self.sta_list)
|
|
|
|
def cleanup(self):
|
|
self.station_profile.cleanup()
|
|
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
|
|
debug=self.debug)
|
|
|
|
def build(self):
|
|
self.station_profile.use_security(self.security, self.ssid, self.password)
|
|
self.station_profile.set_number_template(self.number_template)
|
|
print("Creating stations")
|
|
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
|
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
|
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
|
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, num_stations=1, debug=self.debug)
|
|
self._pass("PASS: Station build finished")
|
|
LFUtils.wait_until_ports_appear(','.join(self.sta_list))
|
|
|
|
|
|
def main():
|
|
parser = Realm.create_basic_argparse(
|
|
prog='sta_scan_test.py',
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
epilog='''\
|
|
Used to scan for ssids after creating a station
|
|
''',
|
|
description='''\
|
|
Creates a station with specified ssid info (can be real or fake ssid, if fake use open for security), then
|
|
starts a scan and waits 15 seconds, finally scan results are printed to console
|
|
|
|
Example:
|
|
./sta_scan_test.py --ssid test_name --security open --radio wiphy0
|
|
''')
|
|
|
|
parser.add_argument('--mode', help='Used to force mode of stations')
|
|
parser.add_argument('--sta_name', help='Optional: User defined station names, can be a comma or space separated list', nargs='+',
|
|
default=["sta0000"])
|
|
parser.add_argument('--csv_output', help='create CSV from scan results, otherwise print it in the terminal', default=None)
|
|
|
|
args = parser.parse_args()
|
|
|
|
station_list = args.sta_name
|
|
sta_scan_test = StaScan(host=args.mgr,
|
|
port=args.mgr_port,
|
|
number_template="0000",
|
|
sta_list=station_list,
|
|
upstream=args.upstream_port,
|
|
ssid=args.ssid,
|
|
password=args.passwd,
|
|
radio=args.radio,
|
|
security=args.security,
|
|
use_ht160=False,
|
|
csv_output=args.csv_output,
|
|
mode=args.mode,
|
|
_debug_on=args.debug)
|
|
|
|
sta_scan_test.pre_cleanup()
|
|
|
|
sta_scan_test.build()
|
|
# exit()
|
|
if not sta_scan_test.passes():
|
|
print(sta_scan_test.get_fail_message())
|
|
sta_scan_test.exit_fail()
|
|
|
|
sta_scan_test.start()
|
|
sta_scan_test.cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|