mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-08 06:33:23 +00:00
sta-scan-test: Allow using existing stations.
Lots of changes to allow this script to scan on existing stations on multiple resources. Add --use_existing_station option. Fix help to show how to properly specify multiple stations with --sta_name Comment out convoluted logic that tried to find radio info for stations. This is not needed for any purpose that I can find, and it did not allow me to specify sta_name with EID format. Start scan on all stations, then do sleep time, then grab results. This allows concurrent scanning on multiple resources/radios. Print more info in text based output, add station resource and name to output so we can decipher multiple radio output. Add note about how .csv output is broken in this case, the logic needs to be improved to append results for all stations and write a single file. Add --scan_time argument to specify sleep time (defaults to 15 seconds to match the original hard-coded value). Signed-off-by: Ben Greear <greearb@candelatech.com>
This commit is contained in:
@@ -45,6 +45,8 @@ class StaScan(Realm):
|
|||||||
number_template="00000",
|
number_template="00000",
|
||||||
csv_output=False,
|
csv_output=False,
|
||||||
use_ht160=False,
|
use_ht160=False,
|
||||||
|
use_existing_station=False,
|
||||||
|
scan_time=15,
|
||||||
_debug_on=False,
|
_debug_on=False,
|
||||||
_exit_on_error=False,
|
_exit_on_error=False,
|
||||||
_exit_on_fail=False):
|
_exit_on_fail=False):
|
||||||
@@ -73,60 +75,105 @@ class StaScan(Realm):
|
|||||||
self.station_profile.debug = self.debug
|
self.station_profile.debug = self.debug
|
||||||
|
|
||||||
self.station_profile.use_ht160 = use_ht160
|
self.station_profile.use_ht160 = use_ht160
|
||||||
|
self.scan_time = scan_time
|
||||||
|
self.use_existing_station = use_existing_station
|
||||||
if self.station_profile.use_ht160:
|
if self.station_profile.use_ht160:
|
||||||
self.station_profile.mode = 9
|
self.station_profile.mode = 9
|
||||||
self.station_profile.mode = mode
|
self.station_profile.mode = mode
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
try:
|
if self.use_existing_station:
|
||||||
|
# bring up existing sta list
|
||||||
|
#print(self.sta_list)
|
||||||
|
for s in self.sta_list:
|
||||||
|
eid = LFUtils.name_to_eid(s)
|
||||||
|
up_request = LFUtils.port_up_request(resource_id=eid[1], port_name=eid[2])
|
||||||
|
self.json_post("/cli-json/set_port", up_request)
|
||||||
|
else:
|
||||||
self.station_profile.admin_up()
|
self.station_profile.admin_up()
|
||||||
print(self.sta_list)
|
self.sta_list = self.station_profile.station_names
|
||||||
LFUtils.wait_until_ports_admin_up(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
|
|
||||||
debug_=self.debug)
|
|
||||||
stations = [LFUtils.name_to_eid(x) for x in self.sta_list]
|
|
||||||
stations = pd.DataFrame(stations)
|
|
||||||
resources = stations[1].unique()
|
|
||||||
interfaces = list()
|
|
||||||
for resource in resources:
|
|
||||||
shelf = stations[0][0]
|
|
||||||
resource_station = list(stations[stations[1] == resource][2])
|
|
||||||
url = '/port/%s/%s/%s' % (shelf, resource, ','.join(resource_station))
|
|
||||||
response = self.json_get(url)
|
|
||||||
if 'interface' in response.keys():
|
|
||||||
interface = response['interface']
|
|
||||||
interfaces.append(interface)
|
|
||||||
elif 'interfaces' in response.keys():
|
|
||||||
response_interfaces = response['interfaces']
|
|
||||||
for interface in response_interfaces:
|
|
||||||
for item in interface.values():
|
|
||||||
interfaces.append(item)
|
|
||||||
df = pd.DataFrame(interfaces)
|
|
||||||
stations = df[df['port type'] == 'WIFI-STA']
|
|
||||||
stations = list(stations.drop_duplicates('parent dev')['alias'])
|
|
||||||
stations = [station for station in stations if station in self.sta_list]
|
|
||||||
|
|
||||||
for port in stations:
|
LFUtils.wait_until_ports_admin_up(base_url=self.lfclient_url, port_list=self.sta_list,
|
||||||
port = LFUtils.name_to_eid(port)
|
debug_=self.debug)
|
||||||
data = {
|
|
||||||
"shelf": port[0],
|
if self.debug:
|
||||||
"resource": port[1],
|
print("ports are admin-up, initiating scan requests.")
|
||||||
"port": port[2]
|
|
||||||
}
|
# Request port-table info for stations on each resource.
|
||||||
self.json_post("/cli-json/scan_wifi", data)
|
#stations = [LFUtils.name_to_eid(x) for x in self.sta_list]
|
||||||
time.sleep(15)
|
#stations = pd.DataFrame(stations)
|
||||||
scan_results = self.json_get("scanresults/%s/%s/%s" % (port[0], port[1], port[2]))
|
#resources = stations[1].unique()
|
||||||
if self.csv_output:
|
#interfaces = list()
|
||||||
results = scan_results['scan-results']
|
#for resource in resources:
|
||||||
df = pd.DataFrame([list(result.values())[0] for result in results])
|
# if self.debug:
|
||||||
df.to_csv(self.csv_output)
|
# print("Requesting port listing on resource: %s"%(resource))
|
||||||
print('CSV output saved at %s' % self.csv_output)
|
# shelf = stations[0][0]
|
||||||
else:
|
# resource_station = list(stations[stations[1] == resource][2])
|
||||||
print("{0:<23}".format("BSS"), "{0:<7}".format("Signal"), "{0:<5}".format("SSID"))
|
# url = '/port/%s/%s/%s' % (shelf, resource, ','.join(resource_station))
|
||||||
for result in scan_results['scan-results']:
|
# if self.debug:
|
||||||
for name, info in result.items():
|
# print("Requesting station scan on resource with url: %s"%(url))
|
||||||
print("%s\t%s\t%s" % (info['bss'], info['signal'], info['ssid']))
|
# response = self.json_get(url)
|
||||||
except Exception as e:
|
# if 'interface' in response.keys():
|
||||||
print(e)
|
# interface = response['interface']
|
||||||
|
# interfaces.append(interface)
|
||||||
|
# elif 'interfaces' in response.keys():
|
||||||
|
# response_interfaces = response['interfaces']
|
||||||
|
# for interface in response_interfaces:
|
||||||
|
# for item in interface.values():
|
||||||
|
# interfaces.append(item)
|
||||||
|
|
||||||
|
#df = pd.DataFrame(interfaces)
|
||||||
|
#stations = df[df['port type'] == 'WIFI-STA']
|
||||||
|
#stations = list(stations.drop_duplicates('parent dev')['alias'])
|
||||||
|
#stations = [station for station in stations if station in self.sta_list]
|
||||||
|
|
||||||
|
#if self.debug:
|
||||||
|
# print("interfaces: %s\nstations: %s"%(interfaces, stations))
|
||||||
|
|
||||||
|
# Start scan on all stations.
|
||||||
|
for port in self.sta_list:
|
||||||
|
port = LFUtils.name_to_eid(port)
|
||||||
|
data = {
|
||||||
|
"shelf": port[0],
|
||||||
|
"resource": port[1],
|
||||||
|
"port": port[2]
|
||||||
|
}
|
||||||
|
self.json_post("/cli-json/scan_wifi", data)
|
||||||
|
|
||||||
|
# TODO: Make configurable
|
||||||
|
# Wait for scans to complete.
|
||||||
|
if self.debug:
|
||||||
|
print("Waiting for %s seconds for scan to complete" %(self.scan_time))
|
||||||
|
time.sleep(self.scan_time)
|
||||||
|
|
||||||
|
# Get results for all stations.
|
||||||
|
fmt = "%08s\t%015s\t%023s\t%07s\t%020s\t%07s\t%09s\t%07s"
|
||||||
|
if not self.csv_output:
|
||||||
|
print(fmt % ("Resource", "Station", "BSS", "Signal", "SSID", "Channel", "Frequency", "Age"))
|
||||||
|
|
||||||
|
for p in self.sta_list:
|
||||||
|
port = LFUtils.name_to_eid(p)
|
||||||
|
data = {
|
||||||
|
"shelf": port[0],
|
||||||
|
"resource": port[1],
|
||||||
|
"port": port[2]
|
||||||
|
}
|
||||||
|
scan_results = self.json_get("scanresults/%s/%s/%s" % (port[0], port[1], port[2]))
|
||||||
|
if self.debug:
|
||||||
|
print("Scan results for port: %s\n%s"%(port, scan_results))
|
||||||
|
if self.csv_output:
|
||||||
|
# TODO: This clobbers output of previous station, need a way to
|
||||||
|
# append (and add resource and wlan to the csv output so that
|
||||||
|
# multiple stations can be reported.
|
||||||
|
results = scan_results['scan-results']
|
||||||
|
df = pd.DataFrame([list(result.values())[0] for result in results])
|
||||||
|
df.to_csv(self.csv_output)
|
||||||
|
print('CSV output saved at %s' % self.csv_output)
|
||||||
|
else:
|
||||||
|
for result in scan_results['scan-results']:
|
||||||
|
for name, info in result.items():
|
||||||
|
print(fmt % (port[1], port[2], info['bss'], info['signal'], info['ssid'],
|
||||||
|
info['channel'], info['frequency'], info['age']))
|
||||||
|
|
||||||
def pre_cleanup(self):
|
def pre_cleanup(self):
|
||||||
self.station_profile.cleanup(self.sta_list)
|
self.station_profile.cleanup(self.sta_list)
|
||||||
@@ -156,17 +203,23 @@ def main():
|
|||||||
Used to scan for ssids after creating a station
|
Used to scan for ssids after creating a station
|
||||||
''',
|
''',
|
||||||
description='''\
|
description='''\
|
||||||
Creates a station with specified ssid info (can be real or fake ssid, if fake use open for security), then
|
Optionally creates a station with specified ssid info (can be real or fake ssid, if fake use open for security).
|
||||||
starts a scan and waits 15 seconds, finally scan results are printed to console
|
If not creating a station, it can use existing station.
|
||||||
|
Then starts a scan and waits 15 seconds, finally scan results are printed to console.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
./sta_scan_test.py --ssid test_name --security open --radio wiphy0
|
./sta_scan_test.py --ssid test_name --security open --radio wiphy0
|
||||||
|
./sta_scan_test.py --sta_name 1.14.wlan0 1.1.wlan0 --use_existing_station --scan_time 5
|
||||||
''')
|
''')
|
||||||
|
|
||||||
|
parser.add_argument('--use_existing_station', action='store_true', help='Use existing station instead of trying to create stations.')
|
||||||
parser.add_argument('--mode', help='Used to force mode of stations')
|
parser.add_argument('--mode', help='Used to force mode of stations')
|
||||||
parser.add_argument('--sta_name', help='Optional: User defined station names, can be a comma or space separated list', nargs='+',
|
parser.add_argument('--sta_name', help='Optional: User defined station names: 1.2.wlan0 1.3.wlan0', nargs='+',
|
||||||
default=["sta0000"])
|
default=["sta0000"])
|
||||||
parser.add_argument('--csv_output', help='create CSV from scan results, otherwise print it in the terminal', default=None)
|
parser.add_argument('--csv_output', help='Specify file to which csv output will be saved, otherwise print it in the terminal',
|
||||||
|
default=None)
|
||||||
|
parser.add_argument('--scan_time', help='Specify time in seconds to wait for scan to complete. Default is 15',
|
||||||
|
default=15, type=int)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
@@ -181,20 +234,25 @@ def main():
|
|||||||
radio=args.radio,
|
radio=args.radio,
|
||||||
security=args.security,
|
security=args.security,
|
||||||
use_ht160=False,
|
use_ht160=False,
|
||||||
|
use_existing_station=args.use_existing_station,
|
||||||
|
scan_time=args.scan_time,
|
||||||
csv_output=args.csv_output,
|
csv_output=args.csv_output,
|
||||||
mode=args.mode,
|
mode=args.mode,
|
||||||
_debug_on=args.debug)
|
_debug_on=args.debug)
|
||||||
|
|
||||||
sta_scan_test.pre_cleanup()
|
if (not args.use_existing_station):
|
||||||
|
sta_scan_test.pre_cleanup()
|
||||||
|
|
||||||
sta_scan_test.build()
|
sta_scan_test.build()
|
||||||
# exit()
|
# exit()
|
||||||
if not sta_scan_test.passes():
|
if not sta_scan_test.passes():
|
||||||
print(sta_scan_test.get_fail_message())
|
print(sta_scan_test.get_fail_message())
|
||||||
sta_scan_test.exit_fail()
|
sta_scan_test.exit_fail()
|
||||||
|
|
||||||
sta_scan_test.start()
|
sta_scan_test.start()
|
||||||
sta_scan_test.cleanup()
|
|
||||||
|
if (not args.use_existing_station):
|
||||||
|
sta_scan_test.cleanup()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user