mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
339 lines
13 KiB
Python
Executable File
339 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
NAME: lf_we_can_scan.py
|
|
|
|
|
|
PURPOSE:
|
|
This program is used for scanning the ssid using real client.
|
|
The program will generate an output directory based on date and time in the /home/lanforge/html-reports/ .
|
|
|
|
|
|
EXAMPLE: ./we_can_scan.py --mgr 192.168.200.220 --mgr_port 8080 --ssid wecan --security wpa2 --radio wiphy0
|
|
|
|
Note: To Run this script
|
|
WE-CAN app should be installed on the phone and should be Connected to lanforge server.
|
|
|
|
LICENSE:
|
|
Free to distribute and modify. LANforge systems must be licensed.
|
|
Copyright 2021 Candela Technologies Inc
|
|
"""
|
|
import datetime
|
|
import sys
|
|
import os
|
|
import importlib
|
|
import pandas as pd
|
|
|
|
# Refuse to run under anything other than Python 3.
if sys.version_info[0] != 3:
    print("This script requires Python 3")
    # Use sys.exit rather than the bare exit() helper: exit() is injected by
    # the site module for interactive use and is not guaranteed to exist.
    sys.exit(1)
|
|
|
|
import argparse
|
|
import time
|
|
from lf_report import lf_report
|
|
|
|
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
|
|
|
realm = importlib.import_module("py-json.realm")
|
|
Realm = realm.Realm
|
|
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
|
|
|
# lf_report = importlib.import_module("py-scripts.lf_report")
|
|
|
|
|
|
class WeCanStaScan(Realm):
    """Check whether real clients can see a target SSID in their Wi-Fi scan.

    For every port aliased ``wlan0`` on a resource id greater than 1, the test
    triggers a scan through the LANforge JSON API, reads the scan results back
    and records whether ``ssid`` is among them. The outcome is written to an
    HTML/PDF/CSV report via ``lf_report``.
    """

    def __init__(self,
                 ssid=None,
                 security=None,
                 password=None,
                 sta_list=None,
                 upstream=None,
                 radio=None,
                 host="localhost",
                 port=8080,
                 resource=1,
                 use_ht160=False,
                 _debug_on=False,
                 _exit_on_error=False,
                 _exit_on_fail=False):
        """Store the test parameters and prepare a station profile.

        :param ssid: SSID that every client is expected to find.
        :param security: security type, copied to the station profile and
            shown in the report.
        :param password: passphrase, copied to the station profile.
        :param sta_list: station names; stored but not used by the scan.
        :param upstream: upstream port; stored only.
        :param radio: radio name; only shown in the report.
        :param host: LANforge manager host.
        :param port: LANforge manager port.
        :param resource: accepted for interface compatibility; unused here.
        :param use_ht160: copied to the station profile.
        :param _debug_on: debug flag, also copied to the station profile.
        :param _exit_on_error: accepted for interface compatibility; unused.
        :param _exit_on_fail: accepted for interface compatibility; unused.
        """
        if sta_list is None:
            sta_list = []
        # NOTE: the original had a trailing comma after this call, which
        # wrapped the result in a throwaway tuple.
        super().__init__(lfclient_host=host,
                         lfclient_port=port)
        self.upstream = upstream
        self.host = host
        self.port = port
        self.ssid = ssid
        self.sta_list = sta_list
        self.security = security
        self.password = password
        self.radio = radio
        self.debug = _debug_on
        self.station_profile = self.new_station_profile()
        self.station_profile.lfclient_url = self.lfclient_url
        self.station_profile.ssid = self.ssid
        self.station_profile.ssid_pass = self.password
        self.station_profile.security = self.security
        self.station_profile.debug = self.debug
        self.station_profile.use_ht160 = use_ht160

    def _scan_resource(self, res_id, port_name='wlan0'):
        """Scan on one resource and evaluate the result for ``self.ssid``.

        :param res_id: resource id of the client to scan with.
        :param port_name: port on that resource used for the scan.
        :return: ``(status, signal, scan_data)`` where ``status`` is
            ``"Success"`` or ``"Failed"``, ``signal`` is the reported signal
            of the target SSID (``'NA'`` when not found) and ``scan_data`` is
            either an HTML table of everything seen or a one-element list
            holding an error message.
        """
        eid = LFUtils.name_to_eid(port_name)
        data = {
            "shelf": eid[0],
            "resource": res_id,
            "port": eid[2]
        }
        self.json_post("/cli-json/scan_wifi", data)
        # Wait before fetching the results, as the original code did.
        time.sleep(3)
        scan_results = self.json_get(
            "scanresults/%s/%s/%s" % (data['shelf'], str(data['resource']), data['port']))

        if scan_results is None or 'scan-results' not in scan_results:
            print("Unable to scan properly please check it manually (Either in Phontom Mode or Disconnected)")
            return "Failed", 'NA', ['Unable to scan properly please check it manually (Real Client is '
                                    'Either in Phantom Mode or Disconnected)']

        seen_ssid = []
        seen_bss = []
        seen_signal = []
        # Kept local so that one phone's results can never make another phone
        # pass (the original shared a single dict across all resources).
        visible = {}
        for result in scan_results['scan-results']:
            for info in result.values():
                if info['ssid'] != '':
                    seen_ssid.append(info['ssid'])
                    seen_bss.append(info['bss'])
                    seen_signal.append(info['signal'])
                    # SSIDs containing escaped bytes are listed in the table
                    # but not used for matching.
                    if "\\x" not in info['ssid']:
                        visible[info['ssid']] = info['signal']
        table = pd.DataFrame({"SSID": seen_ssid,
                              "BSSI": seen_bss,
                              "Signal": seen_signal,
                              }).to_html(index=False)

        if self.ssid not in visible:
            print("[FAILED] ssid not found in the scan \nResource id %s is unable to scan the ssid: \" %s \""
                  % (res_id, self.ssid))
            return "Failed", 'NA', table
        print("[PASSED] for Resouce id ", res_id)
        return "Success", visible[self.ssid], table

    def start(self):
        """Run the scan on every discovered client and generate the report.

        Exactly one pass/fail entry, one signal entry and one scan-data entry
        is recorded per resource so that all report columns stay the same
        length.
        """
        pass_fail = []
        signal_str = []
        full_scan_data = []
        resource_id, phone_name, mac_address, user_name, phone_radio = self.get_resource_data()
        print("Phone Name: ", phone_name)
        print("Resource Id ", resource_id)
        for res_id in resource_id:
            status, signal, scan_data = self._scan_resource(res_id)
            pass_fail.append(status)
            signal_str.append(signal)
            full_scan_data.append(scan_data)

        print("Generating Reports ")
        print("RESOURCE ID: ", resource_id, "\nPHONE NAME: ", phone_name, "\nMAC Address: ", mac_address,
              "\nPASS/FAIL: ", pass_fail, "\nSignal: ", signal_str, "\nUser Name: ", user_name,
              "\nPhone Radio: ", phone_radio)

        data = pd.DataFrame({
            'Phone Name': phone_name,
            'User Name': user_name,
            'MAC Address': mac_address,
            'Resource ID': resource_id,
            'Signal Strength': signal_str,
            'Phone Radio': phone_radio,
            'Passed/Failed': pass_fail,
        })
        self.generate_report(data, full_scan_data, phone_name, pass_fail)

    def build(self):
        """Mark the (empty) build phase as passed."""
        self._pass("PASS: Station build finished")

    def get_resource_data(self):
        """Collect details for every ``wlan0`` port on a resource id > 1.

        :return: five parallel lists ``(resource_ids, phone_names,
            mac_addresses, user_names, phone_radios)``. ``phone_radios`` holds
            the port mode when the parent device is ``wiphy0`` and ``'NA'``
            otherwise, so all lists always have the same length. All lists
            are empty when the port query returns no ``interfaces`` entry.
        """
        resource_id_list = []
        phone_name_list = []
        mac_address = []
        user_name = []
        phone_radio = []
        eid_data = self.json_get("ports?fields=alias,mac,mode,Parent Dev")
        if not eid_data or "interfaces" not in eid_data:
            return resource_id_list, phone_name_list, mac_address, user_name, phone_radio

        for entry in eid_data["interfaces"]:
            for eid, port_info in entry.items():
                eid_parts = eid.split(".")
                shelf, resource = eid_parts[0], eid_parts[1]
                # Only wlan0 ports on resources above 1 are considered
                # (presumably resource 1 is the LANforge itself -- confirm).
                if int(resource) <= 1 or port_info["alias"] != 'wlan0':
                    continue
                resource_id_list.append(resource)
                resource_hw_data = self.json_get("/resource/" + shelf + "/" + resource)
                user_name.append(resource_hw_data['resource']['user'])
                # The phone name is the first two words of the hw version string.
                hw_name = resource_hw_data['resource']['hw version'].split(" ")
                phone_name_list.append(" ".join(hw_name[0:2]))
                mac_address.append(port_info["mac"])
                if port_info.get("parent dev") == 'wiphy0':
                    phone_radio.append(port_info['mode'])
                else:
                    # Placeholder keeps this column aligned with the others.
                    phone_radio.append('NA')
        return resource_id_list, phone_name_list, mac_address, user_name, phone_radio

    def generate_report(self, dataset, full_scan_data_list, phone_name, pass_fail):
        """Write the HTML, PDF and CSV report for a finished scan run.

        :param dataset: summary DataFrame with one row per phone.
        :param full_scan_data_list: per-phone scan data (HTML table string or
            a list holding an error message).
        :param phone_name: list of phone names, parallel to the scan data.
        :param pass_fail: list of ``"Success"`` / ``"Failed"`` strings,
            parallel to ``phone_name``.
        """
        input_table = pd.DataFrame({
            "Server IP": [self.host],
            "Target SSID": [self.ssid],
            "Security": [self.security],
            "radio": [self.radio],
        })
        save_to_csv = pd.DataFrame(
            {"Phone Name": phone_name,
             "Full Scan List": full_scan_data_list,
             "Pass/Fail": pass_fail, }
        )
        # Anything that is not "Success" counts as a failure.
        pass_count = sum(1 for outcome in pass_fail if outcome == "Success")
        fail_count = sum(1 for outcome in pass_fail if outcome != "Success")
        pie_chart = pd.DataFrame({'Pass/Fail': [pass_count, fail_count]}, index=['Success', 'Failed'])

        report = lf_report(_output_html="we-can-scan.html", _output_pdf="we-can-scan.pdf",
                           _results_dir_name="we-can scan result")

        report_path = report.get_path()
        report_path_date_time = report.get_path_date_time()

        print("path: {}".format(report_path))
        print("path_date_time: {}".format(report_path_date_time))

        report.set_title("WE-CAN Real Client Scan ")
        report.build_banner()

        report.start_content_div()
        report.set_text("<h3>Objective:" + "<h4>The scan test in the WE-CAN APP is designed to scan the SSID of the "
                        "different Access Points in the particular network within its range so all "
                        "the connected clients in the LANforge WE-CAN server should scan the given "
                        "SSID within its range")
        report.build_date_time()
        report.build_text()

        report.save_pie_chart(pie_chart)

        report.start_content_div()
        report.set_table_title("<h3>Testing Data (User Input)")
        report.build_table_title()
        report.set_table_dataframe(input_table)
        report.build_table()

        report.start_content_div()
        report.set_table_title("<h3>Phone Scan Table ")
        report.build_table_title()
        report.end_content_div()

        report.set_table_dataframe(dataset)
        report.pass_failed_build_table()

        report.start_content_div()
        report.build_chart_title("Pie Chart of Success and Failure")
        report.build_chart("pie-chart.png")

        # Per-phone detail tables are only emitted when names and scan data
        # line up one-to-one.
        if len(phone_name) == len(full_scan_data_list):
            for name, scan_data, outcome in zip(phone_name, full_scan_data_list, pass_fail):
                tmp_dataset = pd.DataFrame({
                    'Phone Name': [name],
                    # join() leaves an HTML string unchanged and flattens the
                    # one-element error-message list into a plain string.
                    'Scan List': [''.join(scan_data)],
                    'Pass/Fail': [outcome],
                })
                report.start_content_div()
                report.set_table_title("<h3>Scan Result for %s" % name)
                report.build_table_title()
                report.set_table_dataframe(tmp_dataset)
                report.pass_failed_build_table()
                report.end_content_div()

        report.save_csv("we-can-scan.csv", save_to_csv)

        report.build_footer()
        html_file = report.write_html()
        print("returned file {}".format(html_file))
        print(html_file)

        report.write_pdf(_page_size='Legal', _orientation='Portrait')
|
|
|
|
|
|
def main():
    """Parse the command line, build the scan test and run it."""
    arg_parser = Realm.create_basic_argparse(
        prog='we_can_scan.py',
        formatter_class=argparse.RawTextHelpFormatter,
        epilog='''\
Used to verify if a ssid is available in a scan of a real client(Mobile Phones)
''',
        description='''\
Verifies that wheather a station is available in a scan of a phone with specified resource id, for each
resouce id it performs a scan and compares if given ssid is present in the scan.

Example:
./we_can_scan.py --ssid ssid_for_test --security open --radio wiphy0
./we_can_scan.py --mgr 192.168.200.218 --mgr_port 8080 --ssid Candela-Office --security wpa2 --radio 2G
''')

    arg_parser.add_argument(
        '--sta_name',
        help='Optional: User defined station names, can be a comma or space separated list',
        nargs='+',
        default=["sta0000"])

    cli = arg_parser.parse_args()

    scan_test = WeCanStaScan(host=cli.mgr,
                             port=cli.mgr_port,
                             sta_list=cli.sta_name,
                             upstream=cli.upstream_port,
                             ssid=cli.ssid,
                             password=cli.passwd,
                             radio=cli.radio,
                             security=cli.security,
                             use_ht160=False,
                             _debug_on=cli.debug)

    # Stop early if the build phase did not pass.
    scan_test.build()
    if not scan_test.passes():
        print(scan_test.get_fail_message())
        scan_test.exit_fail()

    scan_test.start()
    del scan_test


if __name__ == "__main__":
    main()
|