lf_report.py: modified headers properly in neat and clean format

Signed-off-by: amrit <amrit.raj@candelatech.com>
This commit is contained in:
amrit
2022-05-12 01:04:29 +05:30
committed by shivam
parent ae984564b2
commit ee8c0b57c7
3 changed files with 767 additions and 2 deletions

View File

@@ -37,6 +37,8 @@ import traceback
import logging import logging
import importlib import importlib
from matplotlib import pyplot as plt
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../"))) sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -495,11 +497,51 @@ class lf_report:
self.path_date_time = os.path.join(curr_dir_path, self.date_time) self.path_date_time = os.path.join(curr_dir_path, self.date_time)
os.mkdir(self.path_date_time) os.mkdir(self.path_date_time)
def pass_fail_background(self, cell_value):
highlight_success = 'background-color: #4af84a;'
highlight_fail = 'background-color: #ff1300;'
if type(cell_value) in [str]:
if cell_value == "Success":
return highlight_success
elif cell_value == "Failed":
return highlight_fail
def build_table(self): def build_table(self):
self.dataframe_html = self.dataframe.to_html(index=False, self.dataframe_html = self.dataframe.to_html(index=False,
justify='center') # have the index be able to be passed in. justify='center') # have the index be able to be passed in.
self.html += self.dataframe_html self.html += self.dataframe_html
def pass_failed_build_table(self):
self.dataframe_html = self.dataframe.style.hide_index(subset=None, level=None, names=False).applymap \
(self.pass_fail_background).to_html(index=False,
justify='center') # have the index be able to be passed in.
self.html += self.dataframe_html
def save_csv(self, file_name, save_to_csv_data):
save_to_csv_data.to_csv(str(self.path_date_time) + "/" + file_name)
def save_pie_chart(self, pie_chart_data):
explode = (0, 0.1)
pie_chart = pie_chart_data.plot.pie(y='Pass/Fail', autopct="%.2f%%", explode=explode, figsize=(10, 10),
shadow=True, startangle=90,
colors=['#4af84a', '#ff1300'])
plt.savefig(str(self.path_date_time) + '/pie-chart.png')
def save_bar_chart(self, bar_chart_data, name):
plot = bar_chart_data.plot.bar(alpha=0.3)
plot.legend(bbox_to_anchor=(1.0, 1.0))
plot.set_title(name)
for p in plot.patches:
height = p.get_height()
plot.annotate('{}'.format(height),
xy=(p.get_x() + p.get_width() / 2, height),
xytext=(0, 0), # 3 points vertical offset
textcoords="offset points",
ha='center', va='bottom')
plt.tight_layout()
plt.savefig(str(self.path_date_time) + '/' + name + '.png')
def test_setup_table(self, test_setup_data, value): def test_setup_table(self, test_setup_data, value):
if test_setup_data is None: if test_setup_data is None:
@@ -614,6 +656,19 @@ function copyTextToClipboard(ele) {
def end_content_div(self): def end_content_div(self):
self.html += "\n</div><!-- end contentDiv -->\n" self.html += "\n</div><!-- end contentDiv -->\n"
def build_chart_title(self, chart_title):
self.chart_title_html = """
<div class='HeaderStyle'>
<h3 class='TitleFontPrint' style='color:darkgreen;'>{title}</h3>
""".format(title=chart_title)
self.html += self.chart_title_html
def build_chart(self, name):
self.chart_html_obj = """
<img align='center' style='padding:15px;margin:5px 5px 2em 5px;width:500px;' src='{image}'/>
""".format(image=name)
self.html += self.chart_html_obj
# Unit Test # Unit Test
if __name__ == "__main__": if __name__ == "__main__":
@@ -622,7 +677,7 @@ if __name__ == "__main__":
formatter_class=argparse.RawTextHelpFormatter, formatter_class=argparse.RawTextHelpFormatter,
description="Reporting library Unit Test") description="Reporting library Unit Test")
parser.add_argument('--lfmgr', help='sample argument: where LANforge GUI is running', default='localhost') parser.add_argument('--lfmgr', help='sample argument: where LANforge GUI is running', default='localhost')
# the args parser is not really used , this is so the report is not generated when testing # the args parser is not really used , this is so the report is not generated when testing
# the imports with --help # the imports with --help
args = parser.parse_args() args = parser.parse_args()
logger.info("LANforge manager {lfmgr}".format(lfmgr=args.lfmgr)) logger.info("LANforge manager {lfmgr}".format(lfmgr=args.lfmgr))
@@ -639,7 +694,7 @@ if __name__ == "__main__":
logger.info(dataframe) logger.info(dataframe)
# Testing: generate data frame # Testing: generate data frame
dataframe2 = pd.DataFrame({ dataframe2 = pd.DataFrame({
'station': [1, 2, 3, 4, 5, 6, 7], 'station': [1, 2, 3, 4, 5, 6, 7],
'time_seconds': [23, 78, 22, 19, 45, 22, 25] 'time_seconds': [23, 78, 22, 19, 45, 22, 25]

View File

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
"""
NAME: lf_we_can_scan.py
PURPOSE:
This program is used for scanning the ssid using real client.
The program will generate an output directory based on date and time in the /home/lanforge/html-reports/ .
EXAMPLE: ./python we_can_scan.py --mgr 192.168.200.220 --mgr_port 8080 --ssid wecan --security wpa2 --radio wiphy0
Note: To Run this script
WE-CAN app should be installed on the phone and should be Connected to lanforge server.
LICENSE:
Free to distribute and modify. LANforge systems must be licensed.
Copyright 2021 Candela Technologies Inc
"""
import datetime
import sys
import os
import importlib
import pandas as pd
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
import argparse
import time
from lf_report import lf_report
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
realm = importlib.import_module("py-json.realm")
Realm = realm.Realm
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
# lf_report = importlib.import_module("py-scripts.lf_report")
class WeCanStaScan(Realm):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
resource=1,
use_ht160=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
if sta_list is None:
sta_list = []
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.security = security
self.password = password
self.radio = radio
self.debug = _debug_on
self.station_profile = self.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
def start(self):
stations = ['wlan0']
scan_list = {}
pass_fail = []
signal_str = []
full_scan_data = []
resource_id, phone_name, mac_address, user_name, phone_radio = self.get_resource_data()
print("Phone Name: ", phone_name)
print("Resource Id ",resource_id)
for id in resource_id:
for port in stations:
port = LFUtils.name_to_eid(port)
data = {
"shelf": port[0],
"resource": id,
"port": port[2]
}
self.json_post("/cli-json/scan_wifi", data)
time.sleep(3)
scan_results = self.json_get(
"scanresults/%s/%s/%s" % (data['shelf'], str(data['resource']), data['port']))
if (scan_results == None):
print(f"Unable to scan properly please check it manually (Either in Phontom Mode or Disconnected)")
pass_fail.append("Failed")
signal_str.append('NA')
full_scan_data.append(['Unable to scan properly please check it manually (Real Client is '
'Either in Phantom Mode or Disconnected)'])
break
tmp_ssid = []
tmp_bss = []
tmp_signal = []
for result in scan_results['scan-results']:
for name, info in result.items():
if info['ssid'] != '':
tmp_ssid.append(info['ssid'])
tmp_bss.append(info['bss'])
tmp_signal.append(info['signal'])
if "\\x" not in info['ssid']:
scan_list[info['ssid']] = info['signal']
table = pd.DataFrame({"SSID":tmp_ssid,
"BSSI":tmp_bss,
"Signal":tmp_signal,
}).to_html(index=False)
full_scan_data.append(table)
if (self.ssid not in scan_list):
print("[FAILED] ssid not found in the scan \nResource id %s is unable to scan the ssid: \" %s \""
% (id, self.ssid))
pass_fail.append("Failed")
signal_str.append('NA')
# return None
else:
pass_fail.append("Success")
signal_str.append(scan_list[self.ssid])
print("[PASSED] for Resouce id ", id)
# print("All Phones are able scan the provided ssid: ", self.ssid)
print("Generating Reports ")
print("RESOURCE ID: ", resource_id, "\nPHONE NAME: ", phone_name, "\nMAC Address: ", mac_address,
"\nPASS/FAIL: ", pass_fail, "\nSignal: ", signal_str, "\nUser Name: ", user_name,
"\nPhone Radio: ", phone_radio)
radio = ["2G/5G" for i in range(len(pass_fail))]
data = pd.DataFrame({
'Phone Name': phone_name,
'User Name': user_name,
'MAC Address': mac_address,
'Resource ID': resource_id,
'Signal Strength': signal_str,
'Phone Radio': phone_radio,
'Passed/Failed': pass_fail,
})
self.generate_report(data, full_scan_data, phone_name, pass_fail)
def build(self):
self._pass("PASS: Station build finished")
def get_resource_data(self):
resource_id_list = []
phone_name_list = []
mac_address = []
user_name = []
phone_radio = []
eid_data = self.json_get("ports?fields=alias,mac,mode,Parent Dev")
for alias in eid_data["interfaces"]:
for i in alias:
if (int(i.split(".")[1]) > 1 and alias[i]["alias"] == 'wlan0'):
resource_id_list.append(i.split(".")[1])
resource_hw_data = self.json_get("/resource/" + i.split(".")[0] + "/" + i.split(".")[1])
# Getting MAC address
mac = alias[i]["mac"]
# Getting user name
user = resource_hw_data['resource']['user']
user_name.append(user)
# Getting user Hardware details/Name
hw_name = resource_hw_data['resource']['hw version'].split(" ")
name = " ".join(hw_name[0:2])
phone_name_list.append(name)
mac_address.append(mac)
if int(i.split(".")[1]) > 1 and alias[i]["alias"] == 'wlan0' and alias[i]["parent dev"] == 'wiphy0':
phone_radio.append(alias[i]['mode'])
# Mapping Radio Name in human readable format
# if 'a' in alias[i]['mode']:
# phone_radio.append('2G/5G')
# elif 'AUTO' in alias[i]['mode']:
# phone_radio.append("AUTO")
# else:
# phone_radio.append('2G')
return resource_id_list, phone_name_list, mac_address, user_name, phone_radio
def generate_report(self, dataset, full_scan_data_list, phone_name, pass_fail):
input_table = pd.DataFrame({
"Server IP": ["192.168.200.218"],
"Target SSID": ["Candela-Office"],
"Security": ["wpa2"],
"radio": ["2G"],
})
save_to_csv = pd.DataFrame(
{"Phone Name": phone_name,
"Full Scan List": full_scan_data_list,
"Pass/Fail": pass_fail, }
)
pass_fail_count = []
pass_count = 0
fail_count = 0
for i in pass_fail:
if i == "Success":
pass_count += 1
else:
fail_count += 1
pass_fail_count.append(int(pass_count))
pass_fail_count.append(int(fail_count))
pie_chart = pd.DataFrame({'Pass/Fail': pass_fail_count}, index=['Success', 'Failed'])
report = lf_report(_output_html="we-can-scan.html", _output_pdf="we-can-scan.pdf",
_results_dir_name="we-can scan result")
report_path = report.get_path()
report_path_date_time = report.get_path_date_time()
print("path: {}".format(report_path))
print("path_date_time: {}".format(report_path_date_time))
report.set_title("WE-CAN Real Client Scan ")
report.build_banner()
report.start_content_div()
report.set_text("<h3>Objective:" + "<h4>The WE-CAN scan Test is designed to test the user's Phone Wi-Fi radio scan "
"the script Verifies that wheather a station is available in a scan of a phone with specified resource "
"id, for each resouce id it performs a scan and compares if given ssid is present in the scan.")
report.build_date_time()
report.build_text()
report.save_pie_chart(pie_chart)
report.start_content_div()
report.set_table_title("<h3>Testing Data (User Input)")
report.build_table_title()
report.set_table_dataframe(input_table)
report.build_table()
# report.end_content_div()
report.start_content_div()
report.set_table_title("<h3>Phone Scan Table ")
report.build_table_title()
report.end_content_div()
report.set_table_dataframe(dataset)
report.pass_failed_build_table()
report.start_content_div()
report.build_chart_title("Pie Chart of Success and Failure")
report.build_chart("pie-chart.png")
if len(phone_name) == len(full_scan_data_list):
for i in range(len(phone_name)):
tmp_Phone_name = [phone_name[i]]
tmp_scan_data = [''.join(full_scan_data_list[i])]
tmp_dataset = pd.DataFrame({
'Phone Name': tmp_Phone_name,
'Scan List': tmp_scan_data,
'Pass/Fail': pass_fail[i],
})
report.start_content_div()
report.set_table_title("<h3>Scan Result for %s" % phone_name[i])
report.build_table_title()
report.set_table_dataframe(tmp_dataset)
report.pass_failed_build_table()
# if (all(dataset['Passed/Failed'])):
# report.set_text("Description: All the phones are able to scan the given ssid as we can see from the table")
# else:
# report.set_text(
# "Description: Some of the phones are not able to scan the given ssid as we can see from the table")
# report.start_content_div()
# report.build_text()
report.save_csv("we-can-scan.csv", save_to_csv)
report.build_footer()
html_file = report.write_html()
print("returned file {}".format(html_file))
print(html_file)
report.write_pdf(_page_size='Legal', _orientation='Portrait')
def main():
parser = Realm.create_basic_argparse(
prog='we_can_scan.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Used to verify if a ssid is available in a scan of a real client(Mobile Phones)
''',
description='''\
Verifies that wheather a station is available in a scan of a phone with specified resource id, for each
resouce id it performs a scan and compares if given ssid is present in the scan.
Example:
./we_can_scan.py --ssid ssid_for_test --security open --radio wiphy0
./we_can_scan.py --mgr 192.168.200.218 --mgr_port 8080 --ssid Candela-Office --security wpa2 --radio 2G
''')
# parser.add_argument('--mode', help='Used to force mode of stations')
parser.add_argument('--sta_name',
help='Optional: User defined station names, can be a comma or space separated list', nargs='+',
default=["sta0000"])
# parser.add_argument('--radio', help='Radio of real client', nargs='+',
# default=1)
args = parser.parse_args()
station_list = args.sta_name
sta_scan_test = WeCanStaScan(host=args.mgr,
port=args.mgr_port,
sta_list=station_list,
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
use_ht160=False,
_debug_on=args.debug)
sta_scan_test.build()
if not sta_scan_test.passes():
print(sta_scan_test.get_fail_message())
sta_scan_test.exit_fail()
sta_scan_test.start()
del sta_scan_test
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,374 @@
#!/usr/bin/env python3
"""
NAME: lf_we_can_wifi_capacity_test.py
Use './lf_we_can_wifi_capacity_test.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
example: ./python lf_we_can_wifi_capacity_test.py --mgr 192.168.200.220 --mgr_port 8080 --ssid wecan --security wpa2 --radio wiphy0
Note: To Run this script gui should be opened with
path: cd LANforgeGUI_5.4.3 (5.4.3 can be changed with GUI version)
pwd (Output : /home/lanforge/LANforgeGUI_5.4.3)
./lfclient.bash -cli-socket 3990
"""
import importlib
import os
import sys
import pandas as pd
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
import argparse
from lf_report import lf_report
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
realm = importlib.import_module("py-json.realm")
Realm = realm.Realm
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
cv_test_manager = importlib.import_module("py-json.cv_test_manager")
cv_test = cv_test_manager.cv_test
cv_add_base_parser = cv_test_manager.cv_add_base_parser
cv_base_adjust_parser = cv_test_manager.cv_base_adjust_parser
lf_graph = importlib.import_module("py-scripts.lf_graph")
lf_bar_graph = lf_graph.lf_bar_graph
lf_logger_config = importlib.import_module("py-scripts.lf_logger_config")
from lf_wifi_capacity_test import WiFiCapacityTest
class we_can_wifi_capacity((Realm)):
def __init__(self,
ssid=None,
security=None,
password=None,
sta_list=None,
upstream=None,
radio=None,
host="localhost",
port=8080,
resource=1,
use_ht160=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
if sta_list is None:
sta_list = []
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.sta_list = sta_list
self.security = security
self.password = password
self.radio = radio
self.debug = _debug_on
self.station_profile = self.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
def get_data(self):
resource_id_real, phone_name, mac_address, user_name, phone_radio, rx_rate, tx_rate = self.get_resource_data()
rx_rate = [(i.split(" ")[0]) if (i.split(" ")[0]) != '' else '0' for i in rx_rate]
tx_rate = [(i.split(" ")[0]) if (i.split(" ")[0]) != '' else '0' for i in tx_rate]
print("DATAAAAAAAAAAAA:\n", resource_id_real, phone_name, mac_address, user_name, phone_radio, rx_rate, tx_rate)
dataframe = pd.read_csv("/home/amrit-candela/Desktop/wifi-capacity-2022-04-26-10-52-21/csv-data/data-"
"Combined_Mbps__60_second_running_average-1.csv", header=1)
# dataFrame.drop("Unnamed: 2",inplace=True, axis=1)
del dataframe["Unnamed: 2"]
print(dataframe)
download_rate = []
upload_rate = []
resource_id = []
for column in dataframe:
# for each resource id getting upload and Download Data
resource_id.append(column.split('.')[0])
download_rate.append(float(dataframe[column].loc[0]))
upload_rate.append(float(dataframe[column].loc[1]))
# Plotting Graph 01
# Creating DataFrames
rx_tx_df = pd.DataFrame({
"upload": upload_rate,
"download": download_rate,
}, index=[phone_name[0], phone_name[0]])
# rx_tx_plot = rx_tx_df.plot.bar(alpha=0.5)
# for p in rx_tx_plot.patches:
# height = p.get_height()
# rx_tx_plot.annotate('{}Mbps'.format(height),
# xy=(p.get_x() + p.get_width() / 2, height),
# xytext=(0, 10), # 3 points vertical offset
# textcoords="offset points",
# ha='center', va='bottom')
# plt.tight_layout()
# plt.show()
# Plotting Graph 03
# Creating DataFrames
# print(rx_rate, tx_rate)
Band_2G_5G_df = pd.DataFrame({
"upload": upload_rate,
"download": download_rate,
}, index=["( " + phone_radio[0] + " )" + phone_name[0], "( " + phone_radio[0] + " )" + phone_name[0]])
# band_2G_5G_plot = Band_2G_5G_df.plot.bar(alpha=0.5)
# for p in band_2G_5G_plot.patches:
# height = p.get_height()
# band_2G_5G_plot.annotate('{}'.format(height),
# xy=(p.get_x() + p.get_width() / 2, height),
# xytext=(0, 3), # 3 points vertical offset
# textcoords="offset points",
# ha='center', va='bottom')
# plt.tight_layout()
# plt.show()
# Plotting Graph 04
# Creating DataFrames
print(rx_rate, tx_rate)
link_rate_df = pd.DataFrame({
"Link Rx Rate": rx_rate,
"Actual Rx Rate": download_rate,
"Link Tx Rate": tx_rate,
"Actual Tx Rate": upload_rate,
}, index=[phone_name[0], phone_name[0]])
# rx_tx_plot = rx_tx_df.plot.bar(alpha=0.5)
# for p in rx_tx_plot.patches:
# height = p.get_height()
# rx_tx_plot.annotate('{}'.format(height),
# xy=(p.get_x() + p.get_width() / 2, height),
# xytext=(0, 3), # 3 points vertical offset
# textcoords="offset points",
# ha='center', va='bottom')
# plt.tight_layout()
# plt.show()
# Plotting Graph 05 (User Name)
avg_rate = []
for i in range(len(upload_rate)):
avg_rate.append((upload_rate[i] + download_rate[i]) / 2)
# Creating DataFrames
user_name_df = pd.DataFrame({
"upload": upload_rate,
"download": download_rate,
"Average": avg_rate,
}, index=[user_name[0], user_name[0]])
# user_name_plot = user_name_df.plot.bar(alpha=0.5)
# for p in user_name_plot.patches:
# height = p.get_height()
# user_name_plot.annotate('{}'.format(height),
# xy=(p.get_x() + p.get_width() / 2, height),
# xytext=(0, 3), # 3 points vertical offset
# textcoords="offset points",
# ha='center', va='bottom')
# plt.tight_layout()
# plt.show()
phone_data = [resource_id_real, phone_name, mac_address, user_name, phone_radio, rx_rate, tx_rate]
self.generate_report(phone_data, rx_tx_df, Band_2G_5G_df, link_rate_df, user_name_df)
# exit(0)
# Getting Resource id, phone name, mac address, username, phone radio
# resource_id, phone_name, mac_address, user_name, phone_radio = self.get_resource_data()
# resource_id_phone_company_dict_map = {}
# resource_id_phone_radio_dict_map = {}
# for i in range(len(phone_name)):
# resource_id_phone_company_dict_map[resource_id[i]] = phone_name[i]
# resource_id_phone_radio_dict_map[resource_id[i]] = phone_radio[i]
# print("Company Name: ", resource_id_phone_company_dict_map)
# print("Radio Name: ", resource_id_phone_radio_dict_map)
#
# print(self.get_resource_data())
def generate_report(self, get_data, rx_tx_df, Band_2G_5G_df, link_rate_df, user_name_df):
resource_id = get_data[0]
phone_name = get_data[1]
mac_address = get_data[2]
user_name = get_data[3]
phone_radio = get_data[4]
rx_rate = get_data[5]
tx_rate = get_data[6]
report = lf_report(_output_html="we-can-wifi-capacity.html", _output_pdf="we-can-wifi-capacity.pdf",
_results_dir_name="we-can wifi-capacity result")
report_path = report.get_path()
report_path_date_time = report.get_path_date_time()
print("path: {}".format(report_path))
print("path_date_time: {}".format(report_path_date_time))
report.set_title("WE-CAN Wi-Fi Capacity test for Real Clients")
report.build_banner()
report.start_content_div()
report.set_text(
"<h3>Objective:" + "<h4>The WE-CAN wifi-capacity Test is designed to measure the performance of an Access "
"Point when handling different types of real clients.")
report.build_date_time()
report.build_text()
# report.start_content_div()
# report.set_text("<h3>Phone Details:" + "<h4>All the Phone Details are providede in the table below.")
# report.build_text()
data = {
"Resource ID": resource_id,
"Phone Name": phone_name,
"MAC Address": mac_address,
"User Name": user_name,
"Phone Radio": phone_radio,
"Rx Rate (Mbps) ": rx_rate,
"Tx Rate (Mbps)" : tx_rate,
}
phone_details = pd.DataFrame(data)
report.start_content_div()
report.set_table_title("<h3>Phone Details")
report.build_table_title()
report.set_table_dataframe(phone_details)
report.build_table()
report.save_bar_chart(rx_tx_df, "rx-tx")
report.start_content_div()
report.build_chart_title("Rx Tx Chart")
report.build_chart("rx-tx.png")
report.save_bar_chart(Band_2G_5G_df, "2G-5G")
report.start_content_div()
report.build_chart_title("2G vs 5G ")
report.build_chart("2G-5G.png")
report.save_bar_chart(link_rate_df, "link_rate")
report.start_content_div()
report.build_chart_title("Link Rate Chart")
report.build_chart("link_rate.png")
report.save_bar_chart(user_name_df, "user_name")
report.start_content_div()
report.build_chart_title("User Name ")
report.build_chart("user_name.png")
report.build_footer()
html_file = report.write_html()
print("returned file {}".format(html_file))
print(html_file)
report.write_pdf(_page_size='Legal', _orientation='Portrait')
def get_resource_data(self):
resource_id_list = []
phone_name_list = []
mac_address = []
user_name = []
phone_radio = []
rx_rate = []
tx_rate = []
eid_data = self.json_get("ports?fields=alias,mac,mode,Parent Dev,rx-rate,tx-rate")
for alias in eid_data["interfaces"]:
for i in alias:
if int(i.split(".")[1]) > 1 and alias[i]["alias"] == 'wlan0':
resource_id_list.append(i.split(".")[1])
resource_hw_data = self.json_get("/resource/" + i.split(".")[0] + "/" + i.split(".")[1])
# Getting MAC address
mac = alias[i]["mac"]
rx = alias[i]["rx-rate"]
tx = alias[i]["tx-rate"]
rx_rate.append(rx)
tx_rate.append(tx)
# Getting username
user = resource_hw_data['resource']['user']
user_name.append(user)
# Getting user Hardware details/Name
hw_name = resource_hw_data['resource']['hw version'].split(" ")
name = " ".join(hw_name[0:2])
phone_name_list.append(name)
mac_address.append(mac)
if int(i.split(".")[1]) > 1 and alias[i]["alias"] == 'wlan0' and alias[i]["parent dev"] == 'wiphy0':
# phone_radio.append(alias[i]['mode'])
# Mapping Radio Name in human readable format
if 'a' not in alias[i]['mode'] or "20" in alias[i]['mode']:
phone_radio.append('2G')
elif 'AUTO' in alias[i]['mode']:
phone_radio.append("AUTO")
else:
phone_radio.append('2G/5G')
return resource_id_list, phone_name_list, mac_address, user_name, phone_radio, rx_rate, tx_rate
def main():
parser = argparse.ArgumentParser(
prog="we_can_wifi_capacity_test.py",
formatter_class=argparse.RawTextHelpFormatter,
description="""
./we_can_wifi_capacity_test.py --mgr localhost --port 8080 --lf_user lanforge --lf_password lanforge \
--instance_name wct_instance --config_name wifi_config --upstream 1.1.eth1 --batch_size 1 --loop_iter 1 \
--protocol UDP-IPv4 --duration 6000 --pull_report --stations 1.1.sta0000,1.1.sta0001 \
--create_stations --radio wiphy0 --ssid test-ssid --security open --paswd [BLANK] \
""")
cv_add_base_parser(parser) # see cv_test_manager.py
parser = argparse.ArgumentParser(description="Netgear AP DFS Test Script")
parser.add_argument('--mgr', type=str, help='host name', default="localhost")
parser.add_argument('--port', type=str, help='port number', default="8080")
parser.add_argument("--upstream", type=str, default="", help="Upstream port for wifi capacity test ex. 1.1.eth1")
parser.add_argument("--batch_size", type=str, default="", help="station increment ex. 1,2,3")
parser.add_argument("--protocol", type=str, default="", help="Protocol ex.TCP-IPv4")
parser.add_argument("--lf_user", type=str, default="", help="lanforge user name ex. root,lanforge")
parser.add_argument("--lf_password", type=str, default="", help="lanforge user password ex. root,lanforge")
parser.add_argument("--duration", type=str, default="", help="duration in ms. ex. 5000")
parser.add_argument("--download_rate", type=str, default="10Mbps",
help="Select requested download rate. Kbps, Mbps, Gbps units supported. Default is 10Mbps")
parser.add_argument("--upload_rate", type=str, default="10Mbps",
help="Select requested upload rate. Kbps, Mbps, Gbps units supported. Default is 10Mbps")
parser.add_argument("--influx_host", type=str, default="localhost", help="NA")
parser.add_argument("--local_lf_report_dir",
help="--local_lf_report_dir <where to pull reports to> default '' put where dataplane script run from",
default="")
args = parser.parse_args()
WFC_Test = WiFiCapacityTest(lfclient_host=args.mgr,
lf_port=args.port,
lf_user=args.lf_user,
lf_password=args.lf_password,
upstream=args.upstream,
batch_size=args.batch_size,
protocol=args.protocol,
duration=args.duration,
# pull_report=args.pull_report,
download_rate=args.download_rate,
upload_rate=args.upload_rate,
influx_host=args.mgr,
influx_port=8086,
local_lf_report_dir=args.local_lf_report_dir,
)
# WFC_Test.setup()
# time1 = datetime.datetime.now() - timedelta(minutes=30)
# print("time 12 hr format : ", (datetime.datetime.now() - timedelta(minutes=30)).strftime('%Y-%m-%d-%I-%M-%S'))
# WFC_Test.run()
# print("Code Stopped")
# print("Time2: ", (datetime.datetime.now()).strftime('%Y-%m-%d-%I-%M-%S'))
wifi_capacity = we_can_wifi_capacity(host=args.mgr, port=args.port)
wifi_capacity.get_data()
# WFC_Test.check_influx_kpi(args)
if __name__ == "__main__":
main()