mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-02 03:37:55 +00:00
merge conflict lf_check, dpt solved
Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
@@ -5,9 +5,11 @@ Note: This script is working as library for chamberview tests.
|
||||
|
||||
import time
|
||||
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from realm import Realm
|
||||
import json
|
||||
from pprint import pprint
|
||||
import argparse
|
||||
from cv_test_reports import lanforge_reports as lf_rpt
|
||||
from csv_to_influx import *
|
||||
import os.path
|
||||
@@ -66,14 +68,12 @@ class cv_test(Realm):
|
||||
def __init__(self,
|
||||
lfclient_host="localhost",
|
||||
lfclient_port=8080,
|
||||
lf_report_dir="",
|
||||
debug=False
|
||||
lf_report_dir=""
|
||||
):
|
||||
super().__init__(lfclient_host=lfclient_host,
|
||||
lfclient_port=lfclient_port)
|
||||
self.lf_report_dir = lf_report_dir
|
||||
self.report_name = None
|
||||
self.debug = debug
|
||||
|
||||
# Add a config line to a text blob. Will create new text blob
|
||||
# if none exists already.
|
||||
@@ -128,7 +128,7 @@ class cv_test(Realm):
|
||||
"cmd": command
|
||||
}
|
||||
debug_par = ""
|
||||
rsp = self.json_post("/gui-json/cmd%s" % debug_par, data, debug_=self.debug, response_json_list_=response_json)
|
||||
rsp = self.json_post("/gui-json/cmd%s" % debug_par, data, debug_=False, response_json_list_=response_json)
|
||||
try:
|
||||
if response_json[0]["LAST"]["warnings"].startswith("Unknown"):
|
||||
print("Unknown command?\n");
|
||||
|
||||
@@ -22,7 +22,7 @@ import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class LfCSV:
|
||||
class lf_csv:
|
||||
def __init__(self,
|
||||
_columns=['Stations', 'bk', 'be', 'vi', 'vo'],
|
||||
_rows=[['sta0001', 'sta0002', 'sta0003', 'sta0004', 'sta0005'],
|
||||
@@ -45,5 +45,5 @@ class LfCSV:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test = LfCSV()
|
||||
test = lf_csv()
|
||||
test.generate_csv()
|
||||
|
||||
@@ -139,8 +139,7 @@ class DataplaneTest(cv_test):
|
||||
sets=[],
|
||||
graph_groups=None,
|
||||
report_dir="",
|
||||
test_rig="",
|
||||
debug=False
|
||||
test_rig=""
|
||||
):
|
||||
super().__init__(lfclient_host=lf_host, lfclient_port=lf_port)
|
||||
|
||||
@@ -169,7 +168,6 @@ class DataplaneTest(cv_test):
|
||||
self.ssh_port = ssh_port
|
||||
self.local_lf_report_dir = local_lf_report_dir
|
||||
self.test_rig = test_rig
|
||||
self.debug = debug
|
||||
|
||||
def setup(self):
|
||||
# Nothing to do at this time.
|
||||
@@ -216,7 +214,7 @@ class DataplaneTest(cv_test):
|
||||
self.config_name, self.sets,
|
||||
self.pull_report, self.lf_host, self.lf_user, self.lf_password,
|
||||
cv_cmds, ssh_port=self.ssh_port, local_lf_report_dir=self.local_lf_report_dir,
|
||||
graph_groups_file=self.graph_groups, debug=self.debug)
|
||||
graph_groups_file=self.graph_groups)
|
||||
self.rm_text_blob(self.config_name, blob_test) # To delete old config with same name
|
||||
|
||||
|
||||
@@ -311,7 +309,6 @@ def main():
|
||||
parser.add_argument("--local_lf_report_dir",
|
||||
help="--local_lf_report_dir <where to pull reports to> default '' put where dataplane script run from",
|
||||
default="")
|
||||
parser.add_argument("--debug", default=False)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -398,8 +395,7 @@ def main():
|
||||
raw_lines_file=args.raw_lines_file,
|
||||
sets=args.set,
|
||||
graph_groups=args.graph_groups,
|
||||
test_rig=args.test_rig,
|
||||
debug=args.debug
|
||||
test_rig=args.test_rig
|
||||
)
|
||||
CV_Test.setup()
|
||||
CV_Test.run()
|
||||
|
||||
@@ -25,7 +25,7 @@ import pandas as pd
|
||||
import pdfkit
|
||||
import math
|
||||
from matplotlib.colors import ListedColormap
|
||||
from lf_csv import LfCSV
|
||||
from lf_csv import lf_csv
|
||||
|
||||
|
||||
# internal candela references included during intial phases, to be deleted at future date
|
||||
@@ -78,7 +78,7 @@ class lf_bar_graph():
|
||||
self.text_rotation = _text_rotation
|
||||
self.grp_title = _grp_title
|
||||
self.enable_csv = _enable_csv
|
||||
self.lf_csv = LfCSV()
|
||||
self.lf_csv = lf_csv()
|
||||
|
||||
def build_bar_graph(self):
|
||||
if self.color is None:
|
||||
@@ -162,7 +162,7 @@ class lf_scatter_graph():
|
||||
self.label = _label
|
||||
self.values = _values
|
||||
self.enable_csv = _enable_csv
|
||||
self.lf_csv = LfCSV()
|
||||
self.lf_csv = lf_csv()
|
||||
|
||||
def build_scatter_graph(self):
|
||||
if self.color is None:
|
||||
@@ -214,7 +214,7 @@ class lf_stacked_graph():
|
||||
self.label = _label
|
||||
self.color = _color
|
||||
self.enable_csv = _enable_csv
|
||||
self.lf_csv = LfCSV()
|
||||
self.lf_csv = lf_csv()
|
||||
|
||||
def build_stacked_graph(self):
|
||||
fig = plt.subplots(figsize=self.figsize)
|
||||
@@ -266,7 +266,7 @@ class lf_horizontal_stacked_graph():
|
||||
self.color = _color
|
||||
self.disable_xaxis = _disable_xaxis
|
||||
self.enable_csv = _enable_csv
|
||||
self.lf_csv = LfCSV()
|
||||
self.lf_csv = lf_csv()
|
||||
|
||||
def build_horizontal_stacked_graph(self):
|
||||
def sumzip(items):
|
||||
|
||||
643
py-scripts/lf_webpage.py
Normal file
643
py-scripts/lf_webpage.py
Normal file
@@ -0,0 +1,643 @@
|
||||
"""
|
||||
This script will create 40 clients on 5Ghz , 2.4Ghz and Both and generate layer4 traffic on LANforge ,The Webpage Download Test is designed to test the performance of the Access Point.The goal is to check whether the
|
||||
webpage loading time meets the expectation when clients connected on single radio as well as dual radio.
|
||||
|
||||
how to run -
|
||||
sudo python3 web.py --upstream_port eth1 --num_stations 40 --security open --ssid ASUSAP --passwd [BLANK] --target_per_ten 1 --bands 5G --file_size 10Mb --fiveg_radio wiphy0 --twog_radio wiphy1 --duration 1
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
04 - April - 2021
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import paramiko
|
||||
import argparse
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../py-json')
|
||||
from LANforge import LFUtils
|
||||
from LANforge import lfcli_base
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
import realm
|
||||
from realm import Realm
|
||||
from realm import PortUtils
|
||||
|
||||
from webpage_report import *
|
||||
from lf_report import *
|
||||
from lf_graph import *
|
||||
|
||||
class HttpDownload(Realm):
|
||||
def __init__(self, lfclient_host, lfclient_port, upstream, num_sta, security, ssid, password,
|
||||
target_per_ten, file_size, bands, start_id=0, twog_radio=None, fiveg_radio=None, _debug_on=False, _exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
self.host = lfclient_host
|
||||
self.port = lfclient_port
|
||||
self.upstream = upstream
|
||||
self.num_sta = num_sta
|
||||
#self.radio = radio
|
||||
self.security = security
|
||||
self.ssid = ssid
|
||||
self.sta_start_id = start_id
|
||||
self.password = password
|
||||
#self.url = url
|
||||
self.twog_radio = twog_radio
|
||||
self.fiveg_radio = fiveg_radio
|
||||
self.target_per_ten = target_per_ten
|
||||
self.file_size = file_size
|
||||
self.bands = bands
|
||||
self.debug = _debug_on
|
||||
print(bands)
|
||||
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.http_profile = self.local_realm.new_http_profile()
|
||||
self.http_profile.requests_per_ten = self.target_per_ten
|
||||
#self.http_profile.url = self.url
|
||||
self.port_util = PortUtils(self.local_realm)
|
||||
self.http_profile.debug = _debug_on
|
||||
self.created_cx = {}
|
||||
|
||||
def set_values(self):
|
||||
# This method will set values according user input
|
||||
if self.bands == "5G":
|
||||
self.radio = [self.fiveg_radio]
|
||||
elif self.bands == "2.4G":
|
||||
self.radio = [self.twog_radio]
|
||||
elif self.bands == "Both":
|
||||
self.radio = [self.fiveg_radio, self.twog_radio]
|
||||
print("hello", self.radio)
|
||||
self.num_sta = self.num_sta // 2
|
||||
|
||||
def precleanup(self):
|
||||
self.count = 0
|
||||
try:
|
||||
self.local_realm.load("BLANK")
|
||||
except:
|
||||
print("couldn't load 'BLANK' Test Configuration")
|
||||
|
||||
#print("hi", self.radio)
|
||||
|
||||
for rad in self.radio:
|
||||
print("radio", rad)
|
||||
if rad == self.fiveg_radio:
|
||||
# select an mode
|
||||
self.station_profile.mode = 10
|
||||
self.count = self.count + 1
|
||||
elif rad == self.twog_radio:
|
||||
# select an mode
|
||||
self.station_profile.mode = 6
|
||||
self.count = self.count + 1
|
||||
|
||||
if self.count == 2:
|
||||
self.sta_start_id = self.num_sta
|
||||
self.num_sta = 2 * (self.num_sta)
|
||||
self.station_profile.mode = 10
|
||||
self.http_profile.cleanup()
|
||||
self.station_list1 = LFUtils.portNameSeries(prefix_="sta", start_id_=self.sta_start_id,
|
||||
end_id_=self.num_sta - 1, padding_number_=10000,
|
||||
radio=rad)
|
||||
# cleanup station list which started sta_id 20
|
||||
self.station_profile.cleanup(self.station_list1, debug_=self.local_realm.debug)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.local_realm.lfclient_url,
|
||||
port_list=self.station_list,
|
||||
debug=self.local_realm.debug)
|
||||
return
|
||||
# clean dlayer4 ftp traffic
|
||||
self.http_profile.cleanup()
|
||||
self.station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=self.sta_start_id,
|
||||
end_id_=self.num_sta - 1, padding_number_=10000,
|
||||
radio=rad)
|
||||
# cleans stations
|
||||
self.station_profile.cleanup(self.station_list, delay=1, debug_=self.local_realm.debug)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.local_realm.lfclient_url,
|
||||
port_list=self.station_list,
|
||||
debug=self.local_realm.debug)
|
||||
time.sleep(1)
|
||||
print("precleanup done")
|
||||
|
||||
def build(self):
|
||||
# enable http on ethernet
|
||||
self.port_util.set_http(port_name=self.local_realm.name_to_eid(self.upstream)[2], resource=1, on=True)
|
||||
|
||||
# enable dhcp to ethernet
|
||||
data = {
|
||||
"shelf": 1,
|
||||
"resource": 1,
|
||||
"port": "eth1",
|
||||
"current_flags": 2147483648,
|
||||
"interest": 16384
|
||||
|
||||
}
|
||||
url = "/cli-json/set_port"
|
||||
self.local_realm.json_post(url, data, debug_=True)
|
||||
time.sleep(10)
|
||||
|
||||
for rad in self.radio:
|
||||
|
||||
# station build
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=rad, sta_names_=self.station_list, debug=self.local_realm.debug)
|
||||
self.local_realm.wait_until_ports_appear(sta_list=self.station_list)
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(self.station_list,timeout_sec=60):
|
||||
self.local_realm._pass("All stations got IPs")
|
||||
else:
|
||||
self.local_realm._fail("Stations failed to get IPs")
|
||||
# building layer4
|
||||
self.http_profile.direction = 'dl'
|
||||
self.http_profile.dest = '/dev/null'
|
||||
data = self.local_realm.json_get("ports/list?fields=IP")
|
||||
|
||||
# getting eth ip
|
||||
for i in data["interfaces"]:
|
||||
for j in i:
|
||||
if "1.1." + self.upstream == j:
|
||||
ip_upstream = i["1.1." + self.upstream]['ip']
|
||||
|
||||
# create http profile
|
||||
self.http_profile.create(ports=self.station_profile.station_names, sleep_time=.5,
|
||||
suppress_related_commands_=None, http=True,
|
||||
http_ip=ip_upstream + "/webpage.html")
|
||||
if self.count == 2:
|
||||
self.station_list = self.station_list1
|
||||
self.station_profile.mode = 6
|
||||
print("Test Build done")
|
||||
|
||||
def start(self):
|
||||
print("Test Started")
|
||||
self.http_profile.start_cx()
|
||||
try:
|
||||
for i in self.http_profile.created_cx.keys():
|
||||
while self.local_realm.json_get("/cx/" + i).get(i).get('state') != 'Run':
|
||||
continue
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
def my_monitor(self):
|
||||
# data in json format
|
||||
data = self.local_realm.json_get("layer4/list?fields=uc-avg")
|
||||
data1 = []
|
||||
for i in range(len(data['endpoint'])):
|
||||
data1.append(str(list(data['endpoint'][i]))[2:-2])
|
||||
# print(data1)
|
||||
data2 = []
|
||||
for i in range(self.num_sta):
|
||||
data = self.local_realm.json_get("layer4/list?fields=uc-avg")
|
||||
# print(type(data['endpoint'][i][data1[i]]['uc-avg']))
|
||||
data2.append((data['endpoint'][i][data1[i]]['uc-avg']))
|
||||
|
||||
# print("downloading time for all clients", data2)
|
||||
return data2
|
||||
|
||||
def postcleanup(self):
|
||||
# for rad in self.radio
|
||||
self.http_profile.cleanup()
|
||||
self.station_profile.cleanup()
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
|
||||
debug=self.debug)
|
||||
|
||||
def file_create(self):
|
||||
os.chdir('/usr/local/lanforge/nginx/html/')
|
||||
if os.path.isfile("/usr/local/lanforge/nginx/html/webpage.html"):
|
||||
os.system("sudo rm /usr/local/lanforge/nginx/html/webpage.html")
|
||||
os.system("sudo fallocate -l " + self.file_size + " /usr/local/lanforge/nginx/html/webpage.html")
|
||||
print("File creation done", self.file_size)
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="lanforge webpage download Test Script")
|
||||
parser.add_argument('--mgr', help='hostname for where LANforge GUI is running', default='localhost')
|
||||
parser.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080)
|
||||
parser.add_argument('--upstream_port', help='non-station port that generates traffic: eg: eth1', default='eth2')
|
||||
parser.add_argument('--num_stations', type=int, help='number of stations to create', default=1)
|
||||
parser.add_argument('--twog_radio', help='specify radio for 2.4G clients', default='wiphy3')
|
||||
parser.add_argument('--fiveg_radio', help='specify radio for 5 GHz client', default='wiphy0')
|
||||
parser.add_argument('--security', help='WiFi Security protocol: {open|wep|wpa2|wpa3')
|
||||
parser.add_argument('--ssid', help='WiFi SSID for script object to associate to')
|
||||
parser.add_argument('--passwd', help='WiFi passphrase/password/key')
|
||||
parser.add_argument('--target_per_ten', help='number of request per 10 minutes', default=100)
|
||||
parser.add_argument('--file_size', type=str, help='specify the size of file you want to download', default='5Mb')
|
||||
parser.add_argument('--bands', nargs="+", help='--bands', default=["5G", "2.4G", "Both"])
|
||||
parser.add_argument('--duration', type=int, help='time to run traffic')
|
||||
parser.add_argument('--threshold_5g',help="Enter the threshold value for 5G Pass/Fail criteria", default="60")
|
||||
parser.add_argument('--threshold_2g',help="Enter the threshold value for 2.4G Pass/Fail criteria",default="90")
|
||||
parser.add_argument('--threshold_both',help="Enter the threshold value for Both Pass/Fail criteria" , default="50")
|
||||
parser.add_argument('--ap_name', help="specify the ap model ", default="TestAP")
|
||||
|
||||
args = parser.parse_args()
|
||||
test_time = datetime.datetime.now()
|
||||
test_time = test_time.strftime("%b %d %H:%M:%S")
|
||||
print("Test started at ", test_time)
|
||||
list5G = []
|
||||
list2G = []
|
||||
Both = []
|
||||
dict_keys = []
|
||||
dict_keys.extend(args.bands)
|
||||
# print(dict_keys)
|
||||
final_dict = dict.fromkeys(dict_keys)
|
||||
# print(final_dict)
|
||||
dict1_keys = ['dl_time', 'min', 'max', 'avg']
|
||||
for i in final_dict:
|
||||
final_dict[i] = dict.fromkeys(dict1_keys)
|
||||
print(final_dict)
|
||||
min5 = []
|
||||
min2 = []
|
||||
min_both = []
|
||||
max5 = []
|
||||
max2 = []
|
||||
max_both = []
|
||||
avg2 = []
|
||||
avg5 = []
|
||||
avg_both = []
|
||||
|
||||
#ap = AP_automate(args.ap_ip, args.user, args.pswd)
|
||||
for bands in args.bands:
|
||||
http = HttpDownload(lfclient_host=args.mgr, lfclient_port=args.mgr_port,
|
||||
upstream=args.upstream_port, num_sta=args.num_stations,
|
||||
security=args.security,
|
||||
ssid=args.ssid, password=args.passwd,
|
||||
target_per_ten=args.target_per_ten,
|
||||
file_size=args.file_size, bands=bands,
|
||||
twog_radio=args.twog_radio,
|
||||
fiveg_radio=args.fiveg_radio
|
||||
)
|
||||
#chttp.file_create()
|
||||
http.set_values()
|
||||
http.precleanup()
|
||||
http.build()
|
||||
http.start()
|
||||
duration = args.duration
|
||||
duration = 60 * duration
|
||||
print("time in seconds ", duration)
|
||||
time.sleep(duration)
|
||||
value = http.my_monitor()
|
||||
|
||||
if bands == "5G":
|
||||
list5G.extend(value)
|
||||
# print("5G", list5G)
|
||||
final_dict['5G']['dl_time'] = list5G
|
||||
min5.append(min(list5G))
|
||||
final_dict['5G']['min'] = min5
|
||||
max5.append(max(list5G))
|
||||
final_dict['5G']['max'] = max5
|
||||
avg5.append((sum(list5G) / args.num_stations))
|
||||
final_dict['5G']['avg'] = avg5
|
||||
elif bands == "2.4G":
|
||||
list2G.extend(value)
|
||||
print("2.4G", list2G)
|
||||
final_dict['2.4G']['dl_time'] = list2G
|
||||
min2.append(min(list2G))
|
||||
final_dict['2.4G']['min'] = min2
|
||||
max2.append(max(list2G))
|
||||
final_dict['2.4G']['max'] = max2
|
||||
avg2.append((sum(list2G) / args.num_stations))
|
||||
final_dict['2.4G']['avg'] = avg2
|
||||
elif bands == "Both":
|
||||
Both.extend(value)
|
||||
print("both", Both)
|
||||
final_dict['Both']['dl_time'] = Both
|
||||
min_both.append(min(Both))
|
||||
final_dict['Both']['min'] = min_both
|
||||
max_both.append(max(Both))
|
||||
final_dict['Both']['max'] = max_both
|
||||
avg_both.append((sum(Both) /args.num_stations))
|
||||
final_dict['Both']['avg'] = avg_both
|
||||
print("final list", final_dict)
|
||||
result_data = final_dict
|
||||
print("result", result_data)
|
||||
print("Test Finished")
|
||||
test_end = datetime.datetime.now()
|
||||
test_end = test_end.strftime("%b %d %H:%M:%S")
|
||||
print("Test ended at ", test_end)
|
||||
s1 = test_time
|
||||
s2 = test_end # for example
|
||||
FMT = '%b %d %H:%M:%S'
|
||||
test_duration = datetime.datetime.strptime(s2, FMT) - datetime.datetime.strptime(s1, FMT)
|
||||
#test_duration = "2:04:34"
|
||||
print("total test duration ", test_duration)
|
||||
date = str(datetime.datetime.now()).split(",")[0].replace(" ", "-").split(".")[0]
|
||||
test_setup_info = {
|
||||
"DUT Name": args.ap_name,
|
||||
"SSID": args.ssid,
|
||||
"Test Duration": test_duration,
|
||||
|
||||
}
|
||||
test_input_infor={
|
||||
"File Size" : args.file_size,
|
||||
"Bands" : args.bands,
|
||||
"Upstream": args.upstream_port,
|
||||
"Stations": args.num_stations,
|
||||
"SSID" :args.ssid,
|
||||
"Security": args.security,
|
||||
"Duration" : args.duration,
|
||||
"Contact": "support@candelatech.com"
|
||||
}
|
||||
|
||||
report = lf_report(_results_dir_name="webpage_test")
|
||||
report.set_title("WEBPAGE DOWNLOAD TEST")
|
||||
report.set_date(date)
|
||||
report.build_banner()
|
||||
report.set_table_title("Test Setup Information")
|
||||
report.build_table_title()
|
||||
report.test_setup_table(value="Device under test",test_setup_data= test_setup_info)
|
||||
|
||||
report.set_obj_html("Objective",
|
||||
"The Webpage Download Test is designed to test the performance of the Access Point.The goal is to check whether the webpage loading time meets the expectation when clients connected on single radio as well as dual radio")
|
||||
report.build_objective()
|
||||
report.set_obj_html("Download Time Graph","The below graph provides information about the average download time taken by each client for duration of "+ str(args.duration) + " min")
|
||||
report.build_objective()
|
||||
|
||||
download_time = dict.fromkeys(result_data.keys())
|
||||
for i in download_time:
|
||||
try:
|
||||
download_time[i] = result_data[i]['dl_time']
|
||||
except:
|
||||
download_time[i] = []
|
||||
print(download_time)
|
||||
lst = []
|
||||
lst1 = []
|
||||
lst2 = []
|
||||
dwnld_time = dict.fromkeys(result_data.keys())
|
||||
dataset = []
|
||||
for i in download_time:
|
||||
if i == "5G":
|
||||
for j in download_time[i]:
|
||||
x = (j / 1000)
|
||||
y = round(x, 2)
|
||||
lst.append(y)
|
||||
print(lst)
|
||||
dwnld_time["5G"] = lst
|
||||
dataset.append(dwnld_time["5G"])
|
||||
if i == "2.4G":
|
||||
for j in download_time[i]:
|
||||
x = (j / 1000)
|
||||
y = round(x, 2)
|
||||
lst1.append(y)
|
||||
print(lst)
|
||||
dwnld_time["2.4G"] = lst1
|
||||
dataset.append(dwnld_time["2.4G"])
|
||||
if i == "Both":
|
||||
# print("yes", download_time[i])
|
||||
for j in download_time[i]:
|
||||
x = (j / 1000)
|
||||
y = round(x, 2)
|
||||
lst2.append(y)
|
||||
# print(lst2)
|
||||
dwnld_time["Both"] = lst2
|
||||
dataset.append(dwnld_time["Both"] )
|
||||
|
||||
print("download time",dwnld_time)
|
||||
#dataset = [dwnld_time['5G'], dwnld_time['2.4G'], dwnld_time['Both']]
|
||||
lis = []
|
||||
for i in range(1, args.num_stations + 1):
|
||||
print(i)
|
||||
lis.append(i)
|
||||
|
||||
graph = lf_bar_graph(_data_set=dataset, _xaxis_name="Stations", _yaxis_name="Time in Seconds",
|
||||
_xaxis_categories=lis, _label=args.bands, _xticks_font=7,
|
||||
_graph_image_name="webpage",
|
||||
_color=['forestgreen', 'darkorange', 'blueviolet'], _color_edge='black', _figsize=(10, 5),
|
||||
_grp_title="Download time taken by each client", _xaxis_step=1, )
|
||||
graph_png = graph.build_bar_graph()
|
||||
print("graph name {}".format(graph_png))
|
||||
report.set_graph_image(graph_png)
|
||||
report.move_graph_image()
|
||||
report.build_graph()
|
||||
|
||||
report.set_obj_html("Summary Table Description",
|
||||
"This Table shows you the summary result of Webpage Download Test as PASS or FAIL criteria. If the average time taken by " + str(args.num_stations) + " clients to access the webpage is less than " + str(args.threshold_2g) + "s it's a PASS criteria for 2.4 ghz clients, If the average time taken by " + "" + str(args.num_stations) + " clients to access the webpage is less than " + str(args.threshold_5g) + "s it's a PASS criteria for 5 ghz clients and If the average time taken by " + str(args.num_stations) + " clients to access the webpage is less than " + str(args.threshold_both) + "s it's a PASS criteria for 2.4 ghz and 5ghz clients")
|
||||
|
||||
report.build_objective()
|
||||
x1= []
|
||||
html_struct = dict.fromkeys(list(result_data.keys()))
|
||||
for fcc in list(result_data.keys()):
|
||||
fcc_type = result_data[fcc]["avg"]
|
||||
print(fcc_type)
|
||||
for i in fcc_type:
|
||||
x1.append(i)
|
||||
# print(x)
|
||||
y11 = []
|
||||
for i in x1:
|
||||
i = i / 1000
|
||||
y11.append(i)
|
||||
# print(y)
|
||||
z11 = []
|
||||
for i in y11:
|
||||
i = str(round(i, 2))
|
||||
z11.append(i)
|
||||
print(z11)
|
||||
pass_fail_list = []
|
||||
# print(list(result_data.keys()))
|
||||
sumry2 = []
|
||||
sumry5 = []
|
||||
sumryB = []
|
||||
data = []
|
||||
if args.bands == ["5G", "2.4G", "Both"]:
|
||||
print("yes")
|
||||
# 5G
|
||||
if float(z11[0]) < float(args.threshold_5g):
|
||||
print("PASS")
|
||||
pass_fail_list.append("PASS")
|
||||
sumry5.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_5g):
|
||||
print("FAIL")
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry5.append("FAIL")
|
||||
# 2.4g
|
||||
if float(z11[1]) < float(args.threshold_2g):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumry2.append("PASS")
|
||||
elif float(z11[1]) == 0.0 or float(z11[1]) > float(args.threshold_2g):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry2.append("FAIL")
|
||||
# BOTH
|
||||
if float(z11[2]) < float(args.threshold_both):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumryB.append("PASS")
|
||||
elif float(z11[2]) == 0.0 or float(z11[2]) > float(args.threshold_both):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumryB.append("FAIL")
|
||||
|
||||
data.append(','.join(sumry5))
|
||||
data.append(','.join(sumry2))
|
||||
data.append(','.join(sumryB))
|
||||
|
||||
elif args.bands == ['5G']:
|
||||
if float(z11[0]) < float(args.threshold_5g):
|
||||
print("PASS")
|
||||
pass_fail_list.append("PASS")
|
||||
sumry5.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_5g):
|
||||
print("FAIL")
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry5.append("FAIL")
|
||||
data.append(','.join(sumry5))
|
||||
|
||||
elif args.bands == ['2.4G']:
|
||||
if float(z11[0]) < float(args.threshold_2g):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumry2.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_2g):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry2.append("FAIL")
|
||||
data.append(','.join(sumry2))
|
||||
elif args.bands == ["Both"]:
|
||||
if float(z11[0]) < float(args.threshold_both):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumryB.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_both):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumryB.append("FAIL")
|
||||
data.append(','.join(sumryB))
|
||||
elif args.bands == ['5G', '2.4G']:
|
||||
if float(z11[0]) < float(args.threshold_5g):
|
||||
print("PASS")
|
||||
pass_fail_list.append("PASS")
|
||||
sumry5.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_5g):
|
||||
print("FAIL")
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry5.append("FAIL")
|
||||
# 2.4g
|
||||
if float(z11[1]) < float(args.threshold_2g):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumry2.append("PASS")
|
||||
elif float(z11[1]) == 0.0 or float(z11[1]) > float(args.threshold_2g):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry2.append("FAIL")
|
||||
data.append(','.join(sumry5))
|
||||
data.append(','.join(sumry2))
|
||||
|
||||
elif args.bands == ['5G', 'Both']:
|
||||
if float(z11[0]) < float(args.threshold_5g):
|
||||
print("PASS")
|
||||
pass_fail_list.append("PASS")
|
||||
sumry5.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_5g):
|
||||
print("FAIL")
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry5.append("FAIL")
|
||||
if float(z11[1]) < float(args.threshold_both):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumryB.append("PASS")
|
||||
elif float(z11[1]) == 0.0 or float(z11[1]) > float(args.threshold_both):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumryB.append("FAIL")
|
||||
|
||||
data.append(','.join(sumry5))
|
||||
data.append(','.join(sumryB))
|
||||
|
||||
elif args.bands == ['2.4G', 'Both']:
|
||||
if float(z11[0]) < float(args.threshold_2g):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumry2.append("PASS")
|
||||
elif float(z11[0]) == 0.0 or float(z11[0]) > float(args.threshold_2g):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumry2.append("FAIL")
|
||||
if float(z11[1]) < float(args.threshold_both):
|
||||
var = "PASS"
|
||||
pass_fail_list.append(var)
|
||||
sumryB.append("PASS")
|
||||
elif float(z11[1]) == 0.0 or float(z11[1]) > float(args.threshold_both):
|
||||
pass_fail_list.append("FAIL")
|
||||
sumryB.append("FAIL")
|
||||
data.append(','.join(sumry2))
|
||||
data.append(','.join(sumryB))
|
||||
|
||||
#print("SUMMARRY",sumry2, sumry5, sumryB)
|
||||
#print(data)
|
||||
#[result_data.keys()]
|
||||
summary_table_value = {
|
||||
"": args.bands,
|
||||
"PASS/FAIL": data
|
||||
}
|
||||
test_setup1 = pd.DataFrame(summary_table_value)
|
||||
report.set_table_dataframe(test_setup1)
|
||||
report.build_table()
|
||||
|
||||
report.set_obj_html("Download Time Table Description",
|
||||
"This Table will provide you information of the minimum, maximum and the average time taken by clients to download a webpage in seconds")
|
||||
|
||||
report.build_objective()
|
||||
x = []
|
||||
for fcc in list(result_data.keys()):
|
||||
fcc_type = result_data[fcc]["min"]
|
||||
#print(fcc_type)
|
||||
for i in fcc_type:
|
||||
x.append(i)
|
||||
#print(x)
|
||||
y = []
|
||||
for i in x:
|
||||
i = i / 1000
|
||||
y.append(i)
|
||||
z = []
|
||||
for i in y:
|
||||
i = str(round(i, 2))
|
||||
z.append(i)
|
||||
#rint(z)
|
||||
x1 = []
|
||||
|
||||
for fcc in list(result_data.keys()):
|
||||
fcc_type = result_data[fcc]["max"]
|
||||
#print(fcc_type)
|
||||
for i in fcc_type:
|
||||
x1.append(i)
|
||||
#print(x1)
|
||||
y1 = []
|
||||
for i in x1:
|
||||
i = i / 1000
|
||||
y1.append(i)
|
||||
z1 = []
|
||||
for i in y1:
|
||||
i = str(round(i, 2))
|
||||
z1.append(i)
|
||||
#print(z1)
|
||||
x2 = []
|
||||
|
||||
for fcc in list(result_data.keys()):
|
||||
fcc_type = result_data[fcc]["avg"]
|
||||
#print(fcc_type)
|
||||
for i in fcc_type:
|
||||
x2.append(i)
|
||||
#print(x2)
|
||||
y2 = []
|
||||
for i in x2:
|
||||
i = i / 1000
|
||||
y2.append(i)
|
||||
z2 = []
|
||||
for i in y2:
|
||||
i = str(round(i, 2))
|
||||
z2.append(i)
|
||||
#print(z2)
|
||||
#[result_data.keys()]
|
||||
download_table_value= {
|
||||
"" : args.bands,
|
||||
"Minimum" : z,
|
||||
"Maximum" : z1,
|
||||
"Average" : z2
|
||||
|
||||
}
|
||||
test_setup = pd.DataFrame(download_table_value)
|
||||
report.set_table_dataframe(test_setup)
|
||||
report.build_table()
|
||||
report.set_table_title("Test input Information")
|
||||
report.build_table_title()
|
||||
report.test_setup_table(value="Information", test_setup_data=test_input_infor)
|
||||
report.build_footer()
|
||||
html_file = report.write_html()
|
||||
print("returned file {}".format(html_file))
|
||||
print(html_file)
|
||||
report.write_pdf()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -357,8 +357,7 @@ class WiFiCapacityTest(cv_test):
|
||||
report_dir="",
|
||||
graph_groups=None,
|
||||
test_rig="",
|
||||
local_lf_report_dir="",
|
||||
debug=False,
|
||||
local_lf_report_dir=""
|
||||
):
|
||||
super().__init__(lfclient_host=lfclient_host, lfclient_port=lf_port)
|
||||
|
||||
@@ -397,7 +396,6 @@ class WiFiCapacityTest(cv_test):
|
||||
self.graph_groups = graph_groups
|
||||
self.test_rig = test_rig
|
||||
self.local_lf_report_dir = local_lf_report_dir
|
||||
self.debug = debug
|
||||
|
||||
def setup(self):
|
||||
if self.create_stations and self.stations != "":
|
||||
@@ -475,8 +473,7 @@ class WiFiCapacityTest(cv_test):
|
||||
self.create_and_run_test(self.load_old_cfg, self.test_name, self.instance_name,
|
||||
self.config_name, self.sets,
|
||||
self.pull_report, self.lfclient_host, self.lf_user, self.lf_password,
|
||||
cv_cmds, graph_groups_file=self.graph_groups, local_lf_report_dir=self.local_lf_report_dir,
|
||||
debug=self.debug)
|
||||
cv_cmds, graph_groups_file=self.graph_groups, local_lf_report_dir=self.local_lf_report_dir)
|
||||
|
||||
self.rm_text_blob(self.config_name, blob_test) # To delete old config with same name
|
||||
|
||||
@@ -531,7 +528,6 @@ def main():
|
||||
parser.add_argument("--scenario", default="")
|
||||
parser.add_argument("--graph_groups", help="File to save graph groups to", default=None)
|
||||
parser.add_argument("--local_lf_report_dir", help="--local_lf_report_dir <where to pull reports to> default '' put where dataplane script run from",default="")
|
||||
parser.add_argument("--debug", default=False)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -566,8 +562,7 @@ def main():
|
||||
sets=args.set,
|
||||
graph_groups=args.graph_groups,
|
||||
test_rig=args.test_rig,
|
||||
local_lf_report_dir=args.local_lf_report_dir,
|
||||
debug=args.debug
|
||||
local_lf_report_dir=args.local_lf_report_dir
|
||||
)
|
||||
WFC_Test.setup()
|
||||
WFC_Test.run()
|
||||
|
||||
@@ -58,6 +58,7 @@ import csv
|
||||
import shutil
|
||||
from os import path
|
||||
import shlex
|
||||
import paramiko
|
||||
|
||||
# lf_report is from the parent of the current file
|
||||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||||
@@ -132,8 +133,10 @@ class lf_check():
|
||||
self.host_ip_test = None
|
||||
self.email_title_txt = ""
|
||||
self.email_txt = ""
|
||||
self.lf_mgr_ip = ""
|
||||
self.lf_mgr_ip = "192.168.0.102"
|
||||
self.lf_mgr_port = ""
|
||||
self.lf_mgr_user = "lanforge"
|
||||
self.lf_mgr_pass = "lanforge"
|
||||
self.dut_name = "" # "ASUSRT-AX88U" note this is not dut_set_name
|
||||
self.dut_bssid = "" # "3c:7c:3f:55:4d:64" - this is the mac for the radio this may be seen with a scan
|
||||
|
||||
@@ -173,6 +176,40 @@ class lf_check():
|
||||
|
||||
self.test_run = ""
|
||||
|
||||
def get_scripts_git_sha(self):
|
||||
# get git sha
|
||||
process = subprocess.Popen(["git", "rev-parse", "HEAD"], stdout=subprocess.PIPE)
|
||||
(commit_hash, err) = process.communicate()
|
||||
exit_code = process.wait()
|
||||
scripts_git_sha = commit_hash.decode('utf-8', 'ignore')
|
||||
return scripts_git_sha
|
||||
|
||||
def get_lanforge_kernel_version(self):
|
||||
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
|
||||
# ssh.connect(self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass, banner_timeout=600)
|
||||
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
|
||||
banner_timeout=600)
|
||||
stdin, stdout, stderr = ssh.exec_command('uname -r')
|
||||
output = stdout.readlines()
|
||||
# print('\n'.join(output))
|
||||
ssh.close()
|
||||
time.sleep(1)
|
||||
return output
|
||||
|
||||
def get_lanforge_gui_version(self):
|
||||
output = ""
|
||||
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
|
||||
ssh.connect(hostname=self.lf_mgr_ip, port=22, username=self.lf_mgr_user, password=self.lf_mgr_pass,
|
||||
banner_timeout=600)
|
||||
stdin, stdout, stderr = ssh.exec_command('./btserver --version | grep Version')
|
||||
output = stdout.readlines()
|
||||
# print('\n'.join(output))
|
||||
ssh.close()
|
||||
time.sleep(1)
|
||||
return output
|
||||
|
||||
# NOT complete : will send the email results
|
||||
def send_results_email(self, report_file=None):
|
||||
if (report_file is None):
|
||||
@@ -963,8 +1000,7 @@ Example :
|
||||
default="lf_check_config.ini")
|
||||
parser.add_argument('--json', help="--json <lf_ckeck_config.json file> ", default="lf_check_config.json")
|
||||
parser.add_argument('--use_json', help="--use_json ", action='store_true')
|
||||
parser.add_argument('--suite', '--test_suite', help="--suite <suite name> default TEST_DICTIONARY",
|
||||
default="TEST_DICTIONARY")
|
||||
parser.add_argument('--suite', help="--suite <suite name> default TEST_DICTIONARY", default="TEST_DICTIONARY")
|
||||
parser.add_argument('--production', help="--production stores true, sends email results to production email list",
|
||||
action='store_true')
|
||||
parser.add_argument('--outfile', help="--outfile <Output Generic Name> used as base name for all files generated",
|
||||
@@ -1031,7 +1067,25 @@ Example :
|
||||
exit_code = process.wait()
|
||||
git_sha = commit_hash.decode('utf-8', 'ignore')
|
||||
|
||||
# set up logging
|
||||
try:
|
||||
scripts_git_sha = check.get_scripts_git_sha()
|
||||
print("git_sha {sha}".format(sha=scripts_git_sha))
|
||||
except:
|
||||
print("git_sha read exception ")
|
||||
|
||||
try:
|
||||
lanforge_kernel_version = check.get_lanforge_kernel_version()
|
||||
print("lanforge_kernel_version {kernel_ver}".format(kernel_ver=lanforge_kernel_version))
|
||||
except:
|
||||
print("lanforge_kernel_version exception")
|
||||
|
||||
try:
|
||||
lanforge_gui_version = check.get_lanforge_gui_version()
|
||||
print("lanforge_gui_version {gui_ver}".format(gui_ver=lanforge_gui_version))
|
||||
except:
|
||||
print("lanforge_gui_version exception")
|
||||
|
||||
# set up logging
|
||||
logfile = args.logfile[:-4]
|
||||
print("logfile: {}".format(logfile))
|
||||
logfile = "{}-{}.log".format(logfile, current_time)
|
||||
@@ -1053,6 +1107,8 @@ Example :
|
||||
check.read_config()
|
||||
check.run_script_test()
|
||||
|
||||
# read lanforge
|
||||
|
||||
# generate output reports
|
||||
report.set_title("LF Check: lf_check.py")
|
||||
report.build_banner()
|
||||
|
||||
Reference in New Issue
Block a user