mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 19:28:00 +00:00
885 lines
39 KiB
Python
Executable File
885 lines
39 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""This script will create 40 clients on 5Ghz , 2.4Ghz and Both and generate layer4 traffic on LANforge ,The Webpage
|
|
Download Test is designed to test the performance of the Access Point.The goal is to check whether the webpage
|
|
loading time meets the expectation when clients connected on single radio as well as dual radio.
|
|
|
|
how to run - python3 lf_webpage.py --mgr 192.168.200.29 --mgr_port 8080 --upstream_port eth1 --num_stations 10
|
|
--security open --ssid testap210 --passwd [BLANK] --target_per_ten 1 --bands 5G --file_size 10MB --fiveg_radio
|
|
wiphy0 --twog_radio wiphy1 --duration 1 Copyright 2021 Candela Technologies Inc 04 - April - 2021 """
|
|
import sys
|
|
import os
|
|
import importlib
|
|
import time
|
|
import argparse
|
|
import paramiko
|
|
from datetime import datetime
|
|
import pandas as pd
|
|
|
|
|
|
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
|
|
|
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
|
realm = importlib.import_module("py-json.realm")
|
|
Realm = realm.Realm
|
|
PortUtils = realm.PortUtils
|
|
lf_report = importlib.import_module("py-scripts.lf_report")
|
|
lf_graph = importlib.import_module("py-scripts.lf_graph")
|
|
lf_kpi_csv = importlib.import_module("py-scripts.lf_kpi_csv")
|
|
|
|
|
|
class HttpDownload(Realm):
|
|
def __init__(self, lfclient_host, lfclient_port, upstream, num_sta, security, ssid, password,
|
|
target_per_ten, file_size, bands, start_id=0, twog_radio=None, fiveg_radio=None, _debug_on=False, _exit_on_error=False,
|
|
_exit_on_fail=False):
|
|
self.host = lfclient_host
|
|
self.port = lfclient_port
|
|
self.upstream = upstream
|
|
self.num_sta = num_sta
|
|
self.security = security
|
|
self.ssid = ssid
|
|
self.sta_start_id = start_id
|
|
self.password = password
|
|
self.twog_radio = twog_radio
|
|
self.fiveg_radio = fiveg_radio
|
|
self.target_per_ten = target_per_ten
|
|
self.file_size = file_size
|
|
self.bands = bands
|
|
self.debug = _debug_on
|
|
print(bands)
|
|
|
|
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
|
self.station_profile = self.local_realm.new_station_profile()
|
|
self.http_profile = self.local_realm.new_http_profile()
|
|
self.http_profile.requests_per_ten = self.target_per_ten
|
|
# self.http_profile.url = self.url
|
|
self.port_util = PortUtils(self.local_realm)
|
|
self.http_profile.debug = _debug_on
|
|
self.created_cx = {}
|
|
self.station_list = []
|
|
self.radio = []
|
|
|
|
def set_values(self):
|
|
# This method will set values according user input
|
|
if self.bands == "5G":
|
|
self.radio = [self.fiveg_radio]
|
|
self.station_list = [LFUtils.portNameSeries(prefix_="fiveg_sta", start_id_=self.sta_start_id,
|
|
end_id_=self.num_sta - 1, padding_number_=10000,
|
|
radio=self.fiveg_radio)]
|
|
elif self.bands == "2.4G":
|
|
self.radio = [self.twog_radio]
|
|
self.station_list = [LFUtils.portNameSeries(prefix_="twog_sta", start_id_=self.sta_start_id,
|
|
end_id_=self.num_sta - 1, padding_number_=10000,
|
|
radio=self.twog_radio)]
|
|
elif self.bands == "Both":
|
|
self.radio = [self.fiveg_radio, self.twog_radio]
|
|
print(self.radio)
|
|
# self.num_sta = self.num_sta // 2
|
|
self.station_list = [
|
|
LFUtils.portNameSeries(prefix_="fiveg_sta", start_id_=self.sta_start_id,
|
|
end_id_=self.num_sta - 1, padding_number_=10000,
|
|
radio=self.fiveg_radio),
|
|
LFUtils.portNameSeries(prefix_="twog_sta", start_id_=self.sta_start_id,
|
|
end_id_=self.num_sta - 1, padding_number_=10000,
|
|
radio=self.twog_radio)
|
|
]
|
|
|
|
def precleanup(self):
|
|
self.count = 0
|
|
for rad in range(len(self.radio)):
|
|
print("radio", self.radio[rad])
|
|
if self.radio[rad] == self.fiveg_radio:
|
|
# select an mode
|
|
self.station_profile.mode = 10
|
|
self.count = self.count + 1
|
|
elif self.radio[rad] == self.twog_radio:
|
|
# select an mode
|
|
self.station_profile.mode = 6
|
|
self.count = self.count + 1
|
|
|
|
if self.count == 2:
|
|
self.sta_start_id = self.num_sta
|
|
self.num_sta = 2 * (self.num_sta)
|
|
self.station_profile.mode = 10
|
|
self.http_profile.cleanup()
|
|
# cleanup station list which started sta_id 20
|
|
self.station_profile.cleanup(self.station_list[rad], debug_=self.local_realm.debug)
|
|
LFUtils.wait_until_ports_disappear(base_url=self.local_realm.lfclient_url,
|
|
port_list=self.station_list[rad],
|
|
debug=self.local_realm.debug)
|
|
return
|
|
# clean dlayer4 ftp traffic
|
|
self.http_profile.cleanup()
|
|
|
|
# cleans stations
|
|
self.station_profile.cleanup(self.station_list[rad], delay=1, debug_=self.local_realm.debug)
|
|
LFUtils.wait_until_ports_disappear(base_url=self.local_realm.lfclient_url,
|
|
port_list=self.station_list[rad],
|
|
debug=self.local_realm.debug)
|
|
time.sleep(1)
|
|
print("precleanup done")
|
|
|
|
def build(self):
|
|
# enable http on ethernet
|
|
self.port_util.set_http(port_name=self.local_realm.name_to_eid(self.upstream)[2], resource=1, on=True)
|
|
for rad in range(len(self.radio)):
|
|
print(self.station_list[rad])
|
|
self.station_profile.use_security(self.security[rad], self.ssid[rad], self.password[rad])
|
|
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
|
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
|
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
|
self.station_profile.create(radio=self.radio[rad], sta_names_=self.station_list[rad], debug=self.local_realm.debug)
|
|
self.local_realm.wait_until_ports_appear(sta_list=self.station_list[rad])
|
|
self.station_profile.admin_up()
|
|
if self.local_realm.wait_for_ip(self.station_list[rad], timeout_sec=60):
|
|
self.local_realm._pass("All stations got IPs")
|
|
else:
|
|
self.local_realm._fail("Stations failed to get IPs")
|
|
# building layer4
|
|
self.http_profile.direction = 'dl'
|
|
self.http_profile.dest = '/dev/null'
|
|
data = self.local_realm.json_get("ports/list?fields=IP")
|
|
|
|
# getting eth ip
|
|
for i in data["interfaces"]:
|
|
for j in i:
|
|
if "1.1." + self.upstream == j:
|
|
ip_upstream = i["1.1." + self.upstream]['ip']
|
|
|
|
# create http profile
|
|
self.http_profile.create(ports=self.station_profile.station_names, sleep_time=.5,
|
|
suppress_related_commands_=None, http=True,
|
|
http_ip=ip_upstream + "/webpage.html")
|
|
if self.count == 2:
|
|
self.station_profile.mode = 6
|
|
print("Test Build done")
|
|
|
|
def start(self):
|
|
print("Test Started")
|
|
self.http_profile.start_cx()
|
|
try:
|
|
for i in self.http_profile.created_cx.keys():
|
|
while self.local_realm.json_get("/cx/" + i).get(i).get('state') != 'Run':
|
|
continue
|
|
except Exception as e:
|
|
pass
|
|
|
|
def stop(self):
|
|
self.http_profile.stop_cx()
|
|
|
|
def my_monitor(self, data_mon):
|
|
# data in json format
|
|
data = self.local_realm.json_get("layer4/%s/list?fields=%s" %
|
|
(','.join(self.http_profile.created_cx.keys()), data_mon.replace(' ', '+')))
|
|
# print(data)
|
|
data1 = []
|
|
data = data['endpoint']
|
|
if self.num_sta == 1:
|
|
data1.append(data[data_mon])
|
|
else:
|
|
for cx in self.http_profile.created_cx.keys():
|
|
for info in data:
|
|
if cx in info:
|
|
data1.append(info[cx][data_mon])
|
|
# print(data_mon, data1)
|
|
return data1
|
|
|
|
def postcleanup(self):
|
|
self.http_profile.cleanup()
|
|
self.station_profile.cleanup()
|
|
LFUtils.wait_until_ports_disappear(base_url=self.local_realm.lfclient_url, port_list=self.station_profile.station_names,
|
|
debug=self.debug)
|
|
|
|
def file_create(self, ssh_port):
|
|
ip = self.host
|
|
user = "root"
|
|
pswd = "lanforge"
|
|
port = ssh_port
|
|
ssh = paramiko.SSHClient() # creating shh client object we use this object to connect to router
|
|
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # automatically adds the missing host key
|
|
ssh.connect(ip, port=port, username=user, password=pswd, banner_timeout=600)
|
|
cmd = '[ -f /usr/local/lanforge/nginx/html/webpage.html ] && echo "True" || echo "False"'
|
|
stdin, stdout, stderr = ssh.exec_command(str(cmd))
|
|
output = stdout.readlines()
|
|
print(output)
|
|
if output == ["True\n"]:
|
|
cmd1 = "rm /usr/local/lanforge/nginx/html/webpage.html"
|
|
stdin, stdout, stderr = ssh.exec_command(str(cmd1))
|
|
output = stdout.readlines()
|
|
time.sleep(10)
|
|
cmd2 = "sudo fallocate -l " + self.file_size + " /usr/local/lanforge/nginx/html/webpage.html"
|
|
stdin, stdout, stderr = ssh.exec_command(str(cmd2))
|
|
print("File creation done", self.file_size)
|
|
output = stdout.readlines()
|
|
else:
|
|
cmd2 = "sudo fallocate -l " + self.file_size + " /usr/local/lanforge/nginx/html/webpage.html"
|
|
stdin, stdout, stderr = ssh.exec_command(str(cmd2))
|
|
print("File creation done", self.file_size)
|
|
output = stdout.readlines()
|
|
ssh.close()
|
|
time.sleep(1)
|
|
return output
|
|
|
|
def download_time_in_sec(self, result_data):
|
|
self.resullt_data = result_data
|
|
download_time = dict.fromkeys(result_data.keys())
|
|
for i in download_time:
|
|
try:
|
|
download_time[i] = result_data[i]['dl_time']
|
|
except BaseException:
|
|
download_time[i] = []
|
|
print(download_time)
|
|
lst = []
|
|
lst1 = []
|
|
lst2 = []
|
|
dwnld_time = dict.fromkeys(result_data.keys())
|
|
dataset = []
|
|
for i in download_time:
|
|
if i == "5G":
|
|
for j in download_time[i]:
|
|
x = (j / 1000)
|
|
y = round(x, 1)
|
|
lst.append(y)
|
|
print(lst)
|
|
dwnld_time["5G"] = lst
|
|
dataset.append(dwnld_time["5G"])
|
|
if i == "2.4G":
|
|
for j in download_time[i]:
|
|
x = (j / 1000)
|
|
y = round(x, 1)
|
|
lst1.append(y)
|
|
print(lst)
|
|
dwnld_time["2.4G"] = lst1
|
|
dataset.append(dwnld_time["2.4G"])
|
|
if i == "Both":
|
|
# print("yes", download_time[i])
|
|
for j in download_time[i]:
|
|
x = (j / 1000)
|
|
y = round(x, 1)
|
|
lst2.append(y)
|
|
# print(lst2)
|
|
dwnld_time["Both"] = lst2
|
|
dataset.append(dwnld_time["Both"])
|
|
return dataset
|
|
|
|
def speed_in_Mbps(self, result_data):
|
|
self.resullt_data = result_data
|
|
speed = dict.fromkeys(result_data.keys())
|
|
for i in speed:
|
|
try:
|
|
speed[i] = result_data[i]['speed']
|
|
except BaseException:
|
|
speed[i] = []
|
|
print(speed)
|
|
lst = []
|
|
lst1 = []
|
|
lst2 = []
|
|
speed_ = dict.fromkeys(result_data.keys())
|
|
dataset = []
|
|
for i in speed:
|
|
if i == "5G":
|
|
for j in speed[i]:
|
|
x = (j / 1000000)
|
|
y = round(x, 1)
|
|
lst.append(y)
|
|
print("hi", lst)
|
|
speed_["5G"] = lst
|
|
dataset.append(speed_["5G"])
|
|
if i == "2.4G":
|
|
for j in speed[i]:
|
|
x = (j / 1000000)
|
|
y = round(x, 1)
|
|
lst1.append(y)
|
|
print("yes", lst1)
|
|
speed_["2.4G"] = lst1
|
|
dataset.append(speed_["2.4G"])
|
|
if i == "Both":
|
|
# print("yes", speed[i])
|
|
for j in speed[i]:
|
|
x = (j / 1000000)
|
|
y = round(x, 1)
|
|
lst2.append(y)
|
|
print(lst2)
|
|
speed_["Both"] = lst2
|
|
dataset.append(speed_["Both"])
|
|
return dataset
|
|
|
|
def summary_calculation(self, result_data, bands, threshold_5g, threshold_2g, threshold_both):
|
|
self.resullt_data = result_data
|
|
x1 = []
|
|
html_struct = dict.fromkeys(list(result_data.keys()))
|
|
for fcc in list(result_data.keys()):
|
|
fcc_type = result_data[fcc]["avg"]
|
|
print(fcc_type)
|
|
for i in fcc_type:
|
|
x1.append(i)
|
|
# print(x)
|
|
y11 = []
|
|
for i in x1:
|
|
i = i / 1000
|
|
y11.append(i)
|
|
# print(y)
|
|
z11 = []
|
|
for i in y11:
|
|
i = str(round(i, 1))
|
|
z11.append(i)
|
|
print(z11)
|
|
pass_fail_list = []
|
|
# print(list(result_data.keys()))
|
|
sumry2 = []
|
|
sumry5 = []
|
|
sumryB = []
|
|
data = []
|
|
if bands == ["5G", "2.4G", "Both"]:
|
|
print("yes")
|
|
# 5G
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_5g):
|
|
print("FAIL")
|
|
pass_fail_list.append("FAIL")
|
|
sumry5.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_5g):
|
|
print("PASS")
|
|
pass_fail_list.append("PASS")
|
|
sumry5.append("PASS")
|
|
# 2.4g
|
|
if float(z11[1]) == 0.0 or float(z11[1]) > float(threshold_2g):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumry2.append("FAIL")
|
|
elif float(z11[1]) < float(threshold_2g):
|
|
pass_fail_list.append("PASS")
|
|
sumry2.append("PASS")
|
|
# BOTH
|
|
if float(z11[2]) == 0.0 or float(z11[2]) > float(threshold_both):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumryB.append("FAIL")
|
|
elif float(z11[2]) < float(threshold_both):
|
|
pass_fail_list.append("PASS")
|
|
sumryB.append("PASS")
|
|
|
|
data.append(','.join(sumry5))
|
|
data.append(','.join(sumry2))
|
|
data.append(','.join(sumryB))
|
|
|
|
elif bands == ['5G']:
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_5g):
|
|
print("FAIL")
|
|
pass_fail_list.append("FAIL")
|
|
sumry5.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_5g):
|
|
print("PASS")
|
|
pass_fail_list.append("PASS")
|
|
sumry5.append("PASS")
|
|
data.append(','.join(sumry5))
|
|
|
|
elif bands == ['2.4G']:
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_2g):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumry2.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_2g):
|
|
pass_fail_list.append("PASS")
|
|
sumry2.append("PASS")
|
|
data.append(','.join(sumry2))
|
|
elif bands == ["Both"]:
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_both):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumryB.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_both):
|
|
pass_fail_list.append("PASS")
|
|
sumryB.append("PASS")
|
|
data.append(','.join(sumryB))
|
|
elif bands == ['5G', '2.4G']:
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_5g):
|
|
print("FAIL")
|
|
pass_fail_list.append("FAIL")
|
|
sumry5.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_5g):
|
|
print("PASS")
|
|
pass_fail_list.append("PASS")
|
|
sumry5.append("PASS")
|
|
# 2.4g
|
|
if float(z11[1]) == 0.0 or float(z11[1]) > float(threshold_2g):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumry2.append("FAIL")
|
|
elif float(z11[1]) < float(threshold_2g):
|
|
pass_fail_list.append("PASS")
|
|
sumry2.append("PASS")
|
|
data.append(','.join(sumry5))
|
|
data.append(','.join(sumry2))
|
|
|
|
elif bands == ['5G', 'Both']:
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_5g):
|
|
print("FAIL")
|
|
pass_fail_list.append("FAIL")
|
|
sumry5.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_5g):
|
|
print("PASS")
|
|
pass_fail_list.append("PASS")
|
|
sumry5.append("PASS")
|
|
if float(z11[1]) == 0.0 or float(z11[1]) > float(threshold_both):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumryB.append("FAIL")
|
|
elif float(z11[1]) < float(threshold_both):
|
|
pass_fail_list.append("PASS")
|
|
sumryB.append("PASS")
|
|
|
|
data.append(','.join(sumry5))
|
|
data.append(','.join(sumryB))
|
|
|
|
elif bands == ['2.4G', 'Both']:
|
|
if float(z11[0]) == 0.0 or float(z11[0]) > float(threshold_2g):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumry2.append("FAIL")
|
|
elif float(z11[0]) < float(threshold_2g):
|
|
pass_fail_list.append("PASS")
|
|
sumry2.append("PASS")
|
|
if float(z11[1]) == 0.0 or float(z11[1]) > float(threshold_both):
|
|
var = "FAIL"
|
|
pass_fail_list.append(var)
|
|
sumryB.append("FAIL")
|
|
elif float(z11[1]) < float(threshold_both):
|
|
pass_fail_list.append("PASS")
|
|
sumryB.append("PASS")
|
|
data.append(','.join(sumry2))
|
|
data.append(','.join(sumryB))
|
|
return data
|
|
|
|
def check_station_ip(self):
|
|
pass
|
|
|
|
def generate_graph(self, dataset, lis, bands):
|
|
graph = lf_graph.lf_bar_graph(_data_set=dataset, _xaxis_name="Stations", _yaxis_name="Time in Seconds",
|
|
_xaxis_categories=lis, _label=bands, _xticks_font=8,
|
|
_graph_image_name="webpage download time graph",
|
|
_color=['forestgreen', 'darkorange', 'blueviolet'], _color_edge='black', _figsize=(14, 5),
|
|
_grp_title="Download time taken by each client", _xaxis_step=1, _show_bar_value=True,
|
|
_text_font=6, _text_rotation=60,
|
|
_legend_loc="upper right",
|
|
_legend_box=(1, 1.15),
|
|
_enable_csv=True
|
|
)
|
|
graph_png = graph.build_bar_graph()
|
|
print("graph name {}".format(graph_png))
|
|
return graph_png
|
|
|
|
def graph_2(self, dataset2, lis, bands):
|
|
graph_2 = lf_graph.lf_bar_graph(_data_set=dataset2, _xaxis_name="Stations", _yaxis_name="Download Rate in Mbps",
|
|
_xaxis_categories=lis, _label=bands, _xticks_font=8,
|
|
_graph_image_name="webpage_speed_graph",
|
|
_color=['forestgreen', 'darkorange', 'blueviolet'], _color_edge='black',
|
|
_figsize=(14, 5),
|
|
_grp_title="Download rate for each client (Mbps)", _xaxis_step=1, _show_bar_value=True,
|
|
_text_font=6, _text_rotation=60,
|
|
_legend_loc="upper right",
|
|
_legend_box=(1, 1.15),
|
|
_enable_csv=True
|
|
)
|
|
graph_png = graph_2.build_bar_graph()
|
|
return graph_png
|
|
|
|
def generate_report(self, date, num_stations, duration, test_setup_info, dataset, lis, bands, threshold_2g,
|
|
threshold_5g, threshold_both, dataset2, summary_table_value, result_data, test_rig,
|
|
test_tag, dut_hw_version, dut_sw_version, dut_model_num, dut_serial_num, test_id,
|
|
test_input_infor, csv_outfile):
|
|
report = lf_report.lf_report(_results_dir_name="webpage_test", _output_html="Webpage.html", _output_pdf="Webpage.pdf")
|
|
|
|
# Section commented because graphing breaks two band report generation
|
|
# TODO: Fix graphing bug with multiple bands being recorded
|
|
if bands == "Both":
|
|
num_stations = num_stations * 2
|
|
report.set_title("WEBPAGE DOWNLOAD TEST")
|
|
report.set_date(date)
|
|
report.build_banner()
|
|
report.set_table_title("Test Setup Information")
|
|
report.build_table_title()
|
|
|
|
report.test_setup_table(value="Device under test", test_setup_data=test_setup_info)
|
|
|
|
report.set_obj_html("Objective", "The Webpage Download Test is designed to test the performance of the "
|
|
"Access Point.The goal is to check whether the webpage loading time of all the "
|
|
+ str(num_stations) +
|
|
"clients which are downloading at the same time meets the expectation when clients"
|
|
"connected on single radio as well as dual radio")
|
|
report.build_objective()
|
|
report.set_obj_html("Download Time Graph", "The below graph provides information about the download time taken "
|
|
"by each client to download webpage for test duration of " + str(duration) + " min")
|
|
report.build_objective()
|
|
|
|
graph = self.generate_graph(dataset=dataset, lis=lis, bands=bands)
|
|
report.set_graph_image(graph)
|
|
report.set_csv_filename(graph)
|
|
report.move_csv_file()
|
|
report.move_graph_image()
|
|
report.build_graph()
|
|
report.set_obj_html("Download Rate Graph", "The below graph provides information about the download rate in "
|
|
"Mbps of each client to download the webpage for test duration of " + str(duration) + " min")
|
|
report.build_objective()
|
|
graph2 = self.graph_2(dataset2, lis=lis, bands=bands)
|
|
print("graph name {}".format(graph2))
|
|
report.set_graph_image(graph2)
|
|
report.set_csv_filename(graph2)
|
|
report.move_csv_file()
|
|
report.move_graph_image()
|
|
report.build_graph()
|
|
report.set_obj_html("Summary Table Description", "This Table shows you the summary "
|
|
"result of Webpage Download Test as PASS or FAIL criteria. If the average time taken by " +
|
|
str(num_stations) + " clients to access the webpage is less than " + str( threshold_2g) +
|
|
"s it's a PASS criteria for 2.4 ghz clients, If the average time taken by " + "" +
|
|
str( num_stations) + " clients to access the webpage is less than " + str( threshold_5g) +
|
|
"s it's a PASS criteria for 5 ghz clients and If the average time taken by " + str( num_stations) +
|
|
" clients to access the webpage is less than " + str(threshold_both) +
|
|
"s it's a PASS criteria for 2.4 ghz and 5ghz clients")
|
|
|
|
report.build_objective()
|
|
test_setup1 = pd.DataFrame(summary_table_value)
|
|
report.set_table_dataframe(test_setup1)
|
|
report.build_table()
|
|
|
|
report.set_obj_html("Download Time Table Description", "This Table will provide you information of the "
|
|
"minimum, maximum and the average time taken by clients to download a webpage in seconds")
|
|
|
|
report.build_objective()
|
|
x = []
|
|
for fcc in list(result_data.keys()):
|
|
fcc_type = result_data[fcc]["min"]
|
|
# print(fcc_type)
|
|
for i in fcc_type:
|
|
x.append(i)
|
|
# print(x)
|
|
y = []
|
|
for i in x:
|
|
i = i / 1000
|
|
y.append(i)
|
|
z = []
|
|
for i in y:
|
|
i = str(round(i, 1))
|
|
z.append(i)
|
|
# rint(z)
|
|
x1 = []
|
|
|
|
for fcc in list(result_data.keys()):
|
|
fcc_type = result_data[fcc]["max"]
|
|
# print(fcc_type)
|
|
for i in fcc_type:
|
|
x1.append(i)
|
|
# print(x1)
|
|
y1 = []
|
|
for i in x1:
|
|
i = i / 1000
|
|
y1.append(i)
|
|
z1 = []
|
|
for i in y1:
|
|
i = str(round(i, 1))
|
|
z1.append(i)
|
|
# print(z1)
|
|
x2 = []
|
|
|
|
for fcc in list(result_data.keys()):
|
|
fcc_type = result_data[fcc]["avg"]
|
|
# print(fcc_type)
|
|
for i in fcc_type:
|
|
x2.append(i)
|
|
# print(x2)
|
|
y2 = []
|
|
for i in x2:
|
|
i = i / 1000
|
|
y2.append(i)
|
|
z2 = []
|
|
for i in y2:
|
|
i = str(round(i, 1))
|
|
z2.append(i)
|
|
|
|
download_table_value = {
|
|
"Band": bands,
|
|
"Minimum": z,
|
|
"Maximum": z1,
|
|
"Average": z2
|
|
}
|
|
|
|
# Get the report path to create the kpi.csv path
|
|
kpi_path = report.get_report_path()
|
|
print("kpi_path :{kpi_path}".format(kpi_path=kpi_path))
|
|
|
|
kpi_csv = lf_kpi_csv.lf_kpi_csv(
|
|
_kpi_path=kpi_path,
|
|
_kpi_test_rig=test_rig,
|
|
_kpi_test_tag=test_tag,
|
|
_kpi_dut_hw_version=dut_hw_version,
|
|
_kpi_dut_sw_version=dut_sw_version,
|
|
_kpi_dut_model_num=dut_model_num,
|
|
_kpi_dut_serial_num=dut_serial_num,
|
|
_kpi_test_id=test_id)
|
|
kpi_csv.kpi_dict['Units'] = "Mbps"
|
|
for band in range(len(download_table_value["Band"])):
|
|
kpi_csv.kpi_csv_get_dict_update_time()
|
|
kpi_csv.kpi_dict['Graph-Group'] = "Webpage Download {band}".format(
|
|
band=download_table_value['Band'][band])
|
|
kpi_csv.kpi_dict['short-description'] = "Webpage download {band} Minimum".format(
|
|
band=download_table_value['Band'][band])
|
|
kpi_csv.kpi_dict['numeric-score'] = "{min}".format(min=download_table_value['Minimum'][band])
|
|
kpi_csv.kpi_csv_write_dict(kpi_csv.kpi_dict)
|
|
kpi_csv.kpi_dict['short-description'] = "Webpage download {band} Maximum".format(
|
|
band=download_table_value['Band'][band])
|
|
kpi_csv.kpi_dict['numeric-score'] = "{max}".format(max=download_table_value['Maximum'][band])
|
|
kpi_csv.kpi_csv_write_dict(kpi_csv.kpi_dict)
|
|
kpi_csv.kpi_dict['short-description'] = "Webpage download {band} Average".format(
|
|
band=download_table_value['Band'][band])
|
|
kpi_csv.kpi_dict['numeric-score'] = "{avg}".format(avg=download_table_value['Average'][band])
|
|
kpi_csv.kpi_csv_write_dict(kpi_csv.kpi_dict)
|
|
|
|
if csv_outfile is not None:
|
|
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
|
csv_outfile = "{}_{}-test_l3_longevity.csv".format(
|
|
csv_outfile, current_time)
|
|
csv_outfile = report.file_add_path(csv_outfile)
|
|
print("csv output file : {}".format(csv_outfile))
|
|
|
|
test_setup = pd.DataFrame(download_table_value)
|
|
report.set_table_dataframe(test_setup)
|
|
report.build_table()
|
|
report.set_table_title("Test input Information")
|
|
report.build_table_title()
|
|
report.test_setup_table(value="Information", test_setup_data=test_input_infor)
|
|
report.build_footer()
|
|
html_file = report.write_html()
|
|
print("returned file {}".format(html_file))
|
|
print(html_file)
|
|
report.write_pdf_with_timestamp(_page_size='A4', _orientation='Landscape')
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
prog="lf_webpage.py",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
description="lanforge webpage download Test Script")
|
|
parser.add_argument('--mgr', help='hostname for where LANforge GUI is running', default='localhost')
|
|
parser.add_argument('--mgr_port', help='port LANforge GUI HTTP service is running on', default=8080)
|
|
parser.add_argument('--upstream_port', help='non-station port that generates traffic: eg: eth1', default='eth2')
|
|
parser.add_argument('--num_stations', type=int, help='number of stations to create', default=1)
|
|
parser.add_argument('--twog_radio', help='specify radio for 2.4G clients', default='wiphy3')
|
|
parser.add_argument('--fiveg_radio', help='specify radio for 5 GHz client', default='wiphy0')
|
|
parser.add_argument('--twog_security', help='WiFi Security protocol: {open|wep|wpa2|wpa3} for 2.4G clients')
|
|
parser.add_argument('--twog_ssid', help='WiFi SSID for script object to associate for 2.4G clients')
|
|
parser.add_argument('--twog_passwd', help='WiFi passphrase/password/key for 2.4G clients')
|
|
parser.add_argument('--fiveg_security', help='WiFi Security protocol: {open|wep|wpa2|wpa3} for 5G clients')
|
|
parser.add_argument('--fiveg_ssid', help='WiFi SSID for script object to associate for 5G clients')
|
|
parser.add_argument('--fiveg_passwd', help='WiFi passphrase/password/key for 5G clients')
|
|
parser.add_argument('--target_per_ten', help='number of request per 10 minutes', default=100)
|
|
parser.add_argument('--file_size', type=str, help='specify the size of file you want to download', default='5MB')
|
|
parser.add_argument('--bands', nargs="+", help='specify which band testing you want to run eg 5G OR 2.4G OR Both',
|
|
default=["5G", "2.4G", "Both"])
|
|
parser.add_argument('--duration', type=int, help='time to run traffic')
|
|
parser.add_argument('--threshold_5g', help="Enter the threshold value for 5G Pass/Fail criteria", default="60")
|
|
parser.add_argument('--threshold_2g', help="Enter the threshold value for 2.4G Pass/Fail criteria", default="90")
|
|
parser.add_argument('--threshold_both', help="Enter the threshold value for Both Pass/Fail criteria", default="50")
|
|
parser.add_argument('--ap_name', help="specify the ap model ", default="TestAP")
|
|
parser.add_argument('--ssh_port', type=int, help="specify the ssh port eg 22", default=22)
|
|
parser.add_argument("--test_rig", default="", help="test rig for kpi.csv, testbed that the tests are run on")
|
|
parser.add_argument("--test_tag", default="",
|
|
help="test tag for kpi.csv, test specific information to differentiate the test")
|
|
parser.add_argument("--dut_hw_version", default="",
|
|
help="dut hw version for kpi.csv, hardware version of the device under test")
|
|
parser.add_argument("--dut_sw_version", default="",
|
|
help="dut sw version for kpi.csv, software version of the device under test")
|
|
parser.add_argument("--dut_model_num", default="",
|
|
help="dut model for kpi.csv, model number / name of the device under test")
|
|
parser.add_argument("--dut_serial_num", default="",
|
|
help="dut serial for kpi.csv, serial number / serial number of the device under test")
|
|
parser.add_argument("--test_priority", default="", help="dut model for kpi.csv, test-priority is arbitrary number")
|
|
parser.add_argument("--test_id", default="lf_webpage", help="test-id for kpi.csv, script or test name")
|
|
parser.add_argument('--csv_outfile', help="--csv_outfile <Output file for csv data>", default="")
|
|
|
|
args = parser.parse_args()
|
|
test_time = datetime.now()
|
|
test_time = test_time.strftime("%b %d %H:%M:%S")
|
|
print("Test started at ", test_time)
|
|
list5G = []
|
|
list5G_bytes = []
|
|
list5G_speed = []
|
|
list2G = []
|
|
list2G_bytes = []
|
|
list2G_speed = []
|
|
Both = []
|
|
Both_bytes = []
|
|
Both_speed = []
|
|
dict_keys = []
|
|
dict_keys.extend(args.bands)
|
|
# print(dict_keys)
|
|
final_dict = dict.fromkeys(dict_keys)
|
|
# print(final_dict)
|
|
dict1_keys = ['dl_time', 'min', 'max', 'avg', 'bytes_rd', 'speed']
|
|
for i in final_dict:
|
|
final_dict[i] = dict.fromkeys(dict1_keys)
|
|
print(final_dict)
|
|
min5 = []
|
|
min2 = []
|
|
min_both = []
|
|
max5 = []
|
|
max2 = []
|
|
max_both = []
|
|
avg2 = []
|
|
avg5 = []
|
|
avg_both = []
|
|
|
|
for bands in args.bands:
|
|
if bands == "2.4G":
|
|
security = [args.twog_security]
|
|
ssid = [args.twog_ssid]
|
|
passwd = [args.twog_passwd]
|
|
elif bands == "5G":
|
|
security = [args.fiveg_security]
|
|
ssid = [args.fiveg_ssid]
|
|
passwd = [args.fiveg_passwd]
|
|
elif bands == "Both":
|
|
security = [args.fiveg_security, args.twog_security]
|
|
ssid = [args.fiveg_ssid, args.twog_ssid]
|
|
passwd = [args.fiveg_passwd, args.twog_passwd]
|
|
http = HttpDownload(lfclient_host=args.mgr, lfclient_port=args.mgr_port,
|
|
upstream=args.upstream_port, num_sta=args.num_stations,
|
|
security=security,
|
|
ssid=ssid, password=passwd,
|
|
target_per_ten=args.target_per_ten,
|
|
file_size=args.file_size, bands=bands,
|
|
twog_radio=args.twog_radio,
|
|
fiveg_radio=args.fiveg_radio
|
|
)
|
|
http.file_create(ssh_port=args.ssh_port)
|
|
http.set_values()
|
|
http.precleanup()
|
|
http.build()
|
|
http.start()
|
|
duration = args.duration
|
|
duration = 60 * duration
|
|
print("time in seconds ", duration)
|
|
time.sleep(duration)
|
|
http.stop()
|
|
uc_avg_val = http.my_monitor('uc-avg')
|
|
rx_bytes_val = http.my_monitor('bytes-rd')
|
|
rx_rate_val = http.my_monitor('rx rate')
|
|
http.postcleanup()
|
|
|
|
if bands == "5G":
|
|
print("yes")
|
|
list5G.extend(uc_avg_val)
|
|
list5G_bytes.extend(rx_bytes_val)
|
|
list5G_speed.extend(rx_rate_val)
|
|
print(list5G)
|
|
print(list5G_bytes)
|
|
print(list5G_speed)
|
|
final_dict['5G']['dl_time'] = list5G
|
|
min5.append(min(list5G))
|
|
final_dict['5G']['min'] = min5
|
|
max5.append(max(list5G))
|
|
final_dict['5G']['max'] = max5
|
|
avg5.append((sum(list5G) / args.num_stations))
|
|
final_dict['5G']['avg'] = avg5
|
|
final_dict['5G']['bytes_rd'] = list5G_bytes
|
|
final_dict['5G']['speed'] = list5G_speed
|
|
elif bands == "2.4G":
|
|
print("no")
|
|
list2G.extend(uc_avg_val)
|
|
list2G_bytes.extend(rx_bytes_val)
|
|
list2G_speed.extend(rx_rate_val)
|
|
print(list2G)
|
|
print(list2G_bytes)
|
|
print(list2G_speed)
|
|
final_dict['2.4G']['dl_time'] = list2G
|
|
min2.append(min(list2G))
|
|
final_dict['2.4G']['min'] = min2
|
|
max2.append(max(list2G))
|
|
final_dict['2.4G']['max'] = max2
|
|
avg2.append((sum(list2G) / args.num_stations))
|
|
final_dict['2.4G']['avg'] = avg2
|
|
final_dict['2.4G']['bytes_rd'] = list2G_bytes
|
|
final_dict['2.4G']['speed'] = list2G_speed
|
|
elif bands == "Both":
|
|
Both.extend(uc_avg_val)
|
|
Both_bytes.extend(rx_bytes_val)
|
|
Both_speed.extend(rx_rate_val)
|
|
final_dict['Both']['dl_time'] = Both
|
|
min_both.append(min(Both))
|
|
final_dict['Both']['min'] = min_both
|
|
max_both.append(max(Both))
|
|
final_dict['Both']['max'] = max_both
|
|
avg_both.append((sum(Both) / args.num_stations))
|
|
final_dict['Both']['avg'] = avg_both
|
|
final_dict['Both']['bytes_rd'] = Both_bytes
|
|
final_dict['Both']['speed'] = Both_speed
|
|
|
|
result_data = final_dict
|
|
print("result", result_data)
|
|
print("Test Finished")
|
|
test_end = datetime.now()
|
|
test_end = test_end.strftime("%b %d %H:%M:%S")
|
|
print("Test ended at ", test_end)
|
|
s1 = test_time
|
|
s2 = test_end # for example
|
|
FMT = '%b %d %H:%M:%S'
|
|
test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
|
|
|
|
print("total test duration ", test_duration)
|
|
date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0]
|
|
test_setup_info = {
|
|
"DUT Name": args.ap_name,
|
|
"SSID": ','.join(ssid),
|
|
"Test Duration": test_duration,
|
|
}
|
|
test_input_infor = {
|
|
"LANforge ip": args.mgr,
|
|
"File Size": args.file_size,
|
|
"Bands": args.bands,
|
|
"Upstream": args.upstream_port,
|
|
"Stations": args.num_stations,
|
|
"SSID": ','.join(ssid),
|
|
"Security": ','.join(security),
|
|
"Duration": args.duration,
|
|
"Contact": "support@candelatech.com"
|
|
}
|
|
http1 = HttpDownload(lfclient_host=args.mgr, lfclient_port=args.mgr_port,
|
|
upstream=args.upstream_port, num_sta=args.num_stations,
|
|
security=security,
|
|
ssid=ssid, password=passwd,
|
|
target_per_ten=args.target_per_ten,
|
|
file_size=args.file_size, bands=args.bands,
|
|
twog_radio=args.twog_radio,
|
|
fiveg_radio=args.fiveg_radio)
|
|
dataset = http1.download_time_in_sec(result_data=result_data)
|
|
lis = []
|
|
if bands == "Both":
|
|
for i in range(1, args.num_stations*2 + 1):
|
|
lis.append(i)
|
|
else:
|
|
for i in range(1, args.num_stations + 1):
|
|
lis.append(i)
|
|
|
|
dataset2 = http1.speed_in_Mbps(result_data=result_data)
|
|
data = http1.summary_calculation(
|
|
result_data=result_data,
|
|
bands=args.bands,
|
|
threshold_5g=args.threshold_5g,
|
|
threshold_2g=args.threshold_2g,
|
|
threshold_both=args.threshold_both)
|
|
|
|
summary_table_value = {
|
|
"": args.bands,
|
|
"PASS/FAIL": data
|
|
}
|
|
http1.generate_report(date, num_stations=args.num_stations,
|
|
duration=args.duration, test_setup_info=test_setup_info, dataset=dataset, lis=lis,
|
|
bands=args.bands, threshold_2g=args.threshold_2g, threshold_5g=args.threshold_5g,
|
|
threshold_both=args.threshold_both, dataset2=dataset2,
|
|
summary_table_value=summary_table_value, result_data=result_data,
|
|
test_rig=args.test_rig, test_tag=args.test_tag, dut_hw_version=args.dut_hw_version,
|
|
dut_sw_version=args.dut_sw_version, dut_model_num=args.dut_model_num,
|
|
dut_serial_num=args.dut_serial_num, test_id=args.test_id,
|
|
test_input_infor=test_input_infor, csv_outfile=args.csv_outfile)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|