mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-02 19:58:03 +00:00
initial commit: lf_ftp_test.py ftp_html.py (using current reporting)
Signed-off-by: Chuck SmileyRekiere <chuck.smileyrekiere@candelatech.com>
This commit is contained in:
381
py-scripts/ftp_html.py
Normal file
381
py-scripts/ftp_html.py
Normal file
@@ -0,0 +1,381 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from matplotlib import pyplot as plt
|
||||
from datetime import datetime
|
||||
import numpy as np
|
||||
import os.path
|
||||
from os import path
|
||||
import sys
|
||||
import pdfkit
|
||||
sys.path.append('/home/lanforge/.local/lib/python3.6/site-packages')
|
||||
def report_banner(date):
|
||||
banner_data = """
|
||||
<!DOCTYPE html>
|
||||
<html lang='en'>
|
||||
<head>
|
||||
<meta charset='UTF-8'>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1' />
|
||||
<title>LANforge Report</title>
|
||||
</head>
|
||||
<title>FTP Test </title></head>
|
||||
<body>
|
||||
<div class='Section report_banner-1000x205' style='background-image:url("/home/lanforge/LANforgeGUI_5.4.3/images/report_banner-1000x205.jpg");background-repeat:no-repeat;padding:0;margin:0;min-width:1000px; min-height:205px;width:1000px; height:205px;max-width:1000px; max-height:205px;'>
|
||||
<br>
|
||||
<img align='right' style='padding:25;margin:5;width:200px;' src="/home/lanforge/LANforgeGUI_5.4.3/images/CandelaLogo2-90dpi-200x90-trans.png" border='0' />
|
||||
<div class='HeaderStyle'>
|
||||
<br>
|
||||
<h1 class='TitleFontPrint' style='color:darkgreen;'> FTP Test </h1>
|
||||
<h3 class='TitleFontPrint' style='color:darkgreen;'>""" + str(date) + """</h3>
|
||||
</div>
|
||||
</div>
|
||||
<br><br>
|
||||
"""
|
||||
return str(banner_data)
|
||||
def test_objective(objective= 'This FTP Test is used to "Verify that N clients connected on Specified band and can simultaneously download some amount of file from FTP server and measuring the time taken by client to Download/Upload the file."'):
|
||||
test_objective = """
|
||||
<!-- Test Objective -->
|
||||
<h3 align='left'>Objective</h3>
|
||||
<p align='left' width='900'>""" + str(objective) + """</p>
|
||||
<br>
|
||||
"""
|
||||
return str(test_objective)
|
||||
def test_setup_information(test_setup_data=None):
|
||||
if test_setup_data is None:
|
||||
return None
|
||||
else:
|
||||
var = ""
|
||||
for i in test_setup_data:
|
||||
var = var + "<tr><td>" + i + "</td><td colspan='3'>" + str(test_setup_data[i]) + "</td></tr>"
|
||||
|
||||
setup_information = """
|
||||
<!-- Test Setup Information -->
|
||||
<table width='700px' border='1' cellpadding='2' cellspacing='0' style='border-top-color: gray; border-top-style: solid; border-top-width: 1px; border-right-color: gray; border-right-style: solid; border-right-width: 1px; border-bottom-color: gray; border-bottom-style: solid; border-bottom-width: 1px; border-left-color: gray; border-left-style: solid; border-left-width: 1px'>
|
||||
<tr>
|
||||
<th colspan='2'>Test Setup Information</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Device Under Test</td>
|
||||
<td>
|
||||
<table width='100%' border='0' cellpadding='2' cellspacing='0' style='border-top-color: gray; border-top-style: solid; border-top-width: 1px; border-right-color: gray; border-right-style: solid; border-right-width: 1px; border-bottom-color: gray; border-bottom-style: solid; border-bottom-width: 1px; border-left-color: gray; border-left-style: solid; border-left-width: 1px'>
|
||||
""" + str(var) + """
|
||||
</table>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
<br>
|
||||
"""
|
||||
return str(setup_information)
|
||||
|
||||
|
||||
def pass_fail_description(data=" This Table will give Pass/Fail results. "):
|
||||
pass_fail_info = """
|
||||
<!-- Radar Detect status -->
|
||||
<h3 align='left'>PASS/FAIL Results</h3>
|
||||
<p align='left' width='900'>""" + str(data) + """</p>
|
||||
<br>
|
||||
"""
|
||||
return str(pass_fail_info)
|
||||
|
||||
|
||||
def download_upload_time_description(data=" This Table will FTP Download/Upload Time of Clients."):
|
||||
download_upload_time= """
|
||||
<!-- Radar Detect status -->
|
||||
<h3 align='left'>File Download/Upload Time (sec)</h3>
|
||||
<p align='left' width='900'>""" + str(data) + """</p>
|
||||
<br>
|
||||
"""
|
||||
return str(download_upload_time)
|
||||
|
||||
|
||||
def add_pass_fail_table(result_data, row_head_list, col_head_list):
|
||||
var_row = "<th></th>"
|
||||
for row in col_head_list:
|
||||
var_row = var_row + "<th>" + str(row) + "</th>"
|
||||
list_data = []
|
||||
dict_data = {}
|
||||
bands = result_data[1]["bands"]
|
||||
file_sizes = result_data[1]["file_sizes"]
|
||||
directions = result_data[1]["directions"]
|
||||
for b in bands:
|
||||
final_data = ""
|
||||
for size in file_sizes:
|
||||
for d in directions:
|
||||
for data in result_data.values():
|
||||
if data["band"] == b and data["direction"] == d and data["file_size"] == size:
|
||||
if data["result"] == "Pass":
|
||||
final_data = final_data + "<td style='background-color:Green'>Pass</td>"
|
||||
elif data["result"] == "Fail":
|
||||
final_data = final_data + "<td style='background-color:Red'>Fail</td>"
|
||||
|
||||
list_data.append(final_data)
|
||||
|
||||
#print(list_data)
|
||||
j = 0
|
||||
for i in row_head_list:
|
||||
dict_data[i] = list_data[j]
|
||||
j = j + 1
|
||||
#print(dict_data)
|
||||
var_col = ""
|
||||
for col in row_head_list:
|
||||
var_col = var_col + "<tr><td>" + str(col) + "</td><!-- Add Variable Here -->" + str(
|
||||
dict_data[col]) + "</tr>"
|
||||
|
||||
pass_fail_table = """
|
||||
<!-- Radar Detected Table -->
|
||||
<table width='1000px' border='1' cellpadding='2' cellspacing='0' >
|
||||
|
||||
<table width='1000px' border='1' >
|
||||
<tr>
|
||||
""" + str(var_row) + """
|
||||
</tr>
|
||||
""" + str(var_col) + """
|
||||
</table>
|
||||
</table>
|
||||
<br><br><br><br><br><br><br>
|
||||
"""
|
||||
return pass_fail_table
|
||||
|
||||
|
||||
def download_upload_time_table(result_data, row_head_list, col_head_list):
|
||||
var_row = "<th></th>"
|
||||
for row in col_head_list:
|
||||
var_row = var_row + "<th>" + str(row) + "</th>"
|
||||
list_data = []
|
||||
dict_data = {}
|
||||
bands = result_data[1]["bands"]
|
||||
file_sizes = result_data[1]["file_sizes"]
|
||||
directions = result_data[1]["directions"]
|
||||
for b in bands:
|
||||
final_data = ""
|
||||
for size in file_sizes:
|
||||
for d in directions:
|
||||
for data in result_data.values():
|
||||
data_time = data['time']
|
||||
if data_time.count(0) == 0:
|
||||
Min = min(data_time)
|
||||
Max = max(data_time)
|
||||
Sum = int(sum(data_time))
|
||||
Len = len(data_time)
|
||||
Avg = round(Sum / Len,2)
|
||||
elif data_time.count(0) == len(data_time):
|
||||
Min = "-"
|
||||
Max = "-"
|
||||
Avg = "-"
|
||||
else:
|
||||
data_time = [i for i in data_time if i != 0]
|
||||
Min = min(data_time)
|
||||
Max = max(data_time)
|
||||
Sum = int(sum(data_time))
|
||||
Len = len(data_time)
|
||||
Avg = round(Sum / Len,2)
|
||||
string_data = "Min=" + str(Min) + ",Max=" + str(Max) + ",Avg=" + str(Avg) + " (sec)"
|
||||
if data["band"] == b and data["direction"] == d and data["file_size"] == size:
|
||||
final_data = final_data + """<td>""" + string_data + """</td>"""
|
||||
|
||||
list_data.append(final_data)
|
||||
|
||||
#print(list_data)
|
||||
j = 0
|
||||
for i in row_head_list:
|
||||
dict_data[i] = list_data[j]
|
||||
j = j + 1
|
||||
#print(dict_data)
|
||||
var_col = ""
|
||||
for col in row_head_list:
|
||||
var_col = var_col + "<tr><td>" + str(col) + "</td><!-- Add Variable Here -->" + str(
|
||||
dict_data[col]) + "</tr>"
|
||||
|
||||
download_upload_table = """
|
||||
<!-- Radar Detected Table -->
|
||||
<table width='1000px' border='1' cellpadding='2' cellspacing='0' >
|
||||
|
||||
<table width='1000px' border='1' >
|
||||
<tr>
|
||||
""" + str(var_row) + """
|
||||
</tr>
|
||||
""" + str(var_col) + """
|
||||
</table>
|
||||
</table>
|
||||
<br><br><br><br><br><br><br>
|
||||
"""
|
||||
return download_upload_table
|
||||
def graph_html(graph_path="",graph_name="",graph_description=""):
|
||||
graph_html_obj = """
|
||||
<h3>""" +graph_name+ """</h3>
|
||||
<p>""" +graph_description+ """</p>
|
||||
<img align='center' style='padding:15;margin:5;width:1000px;' src=""" + graph_path + """ border='1' />
|
||||
<br><br>
|
||||
"""
|
||||
return str(graph_html_obj)
|
||||
|
||||
|
||||
def bar_plot(ax,x_axis, data, colors=None, total_width=0.8, single_width=1, legend=True):
|
||||
# Check if colors where provided, otherwhise use the default color cycle
|
||||
if colors is None:
|
||||
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
|
||||
|
||||
# Number of bars per group
|
||||
n_bars = len(data)
|
||||
|
||||
# The width of a single bar
|
||||
bar_width = total_width / n_bars
|
||||
|
||||
# List containing handles for the drawn bars, used for the legend
|
||||
bars = []
|
||||
|
||||
# Iterate over all data
|
||||
for i, (name, values) in enumerate(data.items()):
|
||||
# The offset in x direction of that bar
|
||||
x_offset = (i - n_bars / 2) * bar_width + bar_width / 2
|
||||
|
||||
# Draw a bar for every value of that type
|
||||
for x, y in enumerate(values):
|
||||
bar = ax.bar(x + x_offset, y, width=bar_width * single_width, color=colors[i % len(colors)])
|
||||
|
||||
# Add a handle to the last drawn bar, which we'll need for the legend
|
||||
bars.append(bar[0])
|
||||
|
||||
# Draw legend if we need
|
||||
if legend:
|
||||
ax.legend(bars, data.keys(),bbox_to_anchor=(1.1,1.05),loc='upper right')
|
||||
ax.set_ylabel('Time in seconds')
|
||||
ax.set_xlabel("stations")
|
||||
x_data = x_axis
|
||||
idx = np.asarray([i for i in range(len(x_data))])
|
||||
ax.set_xticks(idx)
|
||||
|
||||
ax.set_xticklabels(x_data)
|
||||
|
||||
def generate_graph(result_data, x_axis,band,size,graph_path):
|
||||
# bands = result_data[1]["bands"]
|
||||
# file_sizes = result_data[1]["file_sizes"]
|
||||
num_stations = result_data[1]["num_stations"]
|
||||
# for b in bands:
|
||||
# for size in file_sizes:
|
||||
|
||||
dict_of_graph = {}
|
||||
color = []
|
||||
graph_name = ""
|
||||
graph_description=""
|
||||
count = 0
|
||||
for data in result_data.values():
|
||||
if data["band"] == band and data["file_size"] == size and data["direction"] == "Download":
|
||||
dict_of_graph["Download"] = data["time"]
|
||||
color.append("Orange")
|
||||
graph_name = "File size "+ size +" " + str(num_stations) + " Clients " +band+ "-File Download Times(secs)"
|
||||
graph_description = "Out of "+ str(data["num_stations"])+ " clients, "+ str(data["num_stations"] - data["time"].count(0))+ " are able to download " + "within " + str(data["duration"]) + " min."
|
||||
count = count + 1
|
||||
if data["band"] == band and data["file_size"] == size and data["direction"] == "Upload":
|
||||
dict_of_graph["Upload"] = data["time"]
|
||||
color.append("Blue")
|
||||
graph_name = "File size "+ size +" " + str(num_stations) + " Clients " +band+ "-File Upload Times(secs)"
|
||||
graph_description = graph_description + "Out of " + str(data["num_stations"]) + " clients, " + str(
|
||||
data["num_stations"] - data["time"].count(0)) + " are able to upload " + "within " +str(data["duration"]) + " min."
|
||||
count = count + 1
|
||||
if count == 2:
|
||||
graph_name = "File size "+ size +" " + str(num_stations) + " Clients " +band+ "-File Download and Upload Times(secs)"
|
||||
if len(dict_of_graph) != 0:
|
||||
fig, ax = plt.subplots()
|
||||
bar_plot(ax, x_axis, dict_of_graph, total_width=.8, single_width=.9, colors=color)
|
||||
my_dpi = 96
|
||||
figure = plt.gcf() # get current figure
|
||||
figure.set_size_inches(18, 6)
|
||||
|
||||
# when saving, specify the DPI
|
||||
plt.savefig(graph_path + "/image"+band+size+".png", dpi=my_dpi)
|
||||
return str(graph_html(graph_path + "/image"+band+size+".png", graph_name,graph_description))
|
||||
else:
|
||||
return ""
|
||||
def input_setup_info_table(input_setup_info=None):
|
||||
if input_setup_info is None:
|
||||
return None
|
||||
else:
|
||||
var = ""
|
||||
for i in input_setup_info:
|
||||
var = var + "<tr><td>" + i + "</td><td colspan='3'>" + str(input_setup_info[i]) + "</td></tr>"
|
||||
|
||||
setup_information = """
|
||||
<!-- Test Setup Information -->
|
||||
<table width='700px' border='1' cellpadding='2' cellspacing='0' style='border-top-color: gray; border-top-style: solid; border-top-width: 1px; border-right-color: gray; border-right-style: solid; border-right-width: 1px; border-bottom-color: gray; border-bottom-style: solid; border-bottom-width: 1px; border-left-color: gray; border-left-style: solid; border-left-width: 1px'>
|
||||
<tr>
|
||||
<th colspan='2'>Input Setup Information</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Information</td>
|
||||
<td>
|
||||
<table width='100%' border='0' cellpadding='2' cellspacing='0' style='border-top-color: gray; border-top-style: solid; border-top-width: 1px; border-right-color: gray; border-right-style: solid; border-right-width: 1px; border-bottom-color: gray; border-bottom-style: solid; border-bottom-width: 1px; border-left-color: gray; border-left-style: solid; border-left-width: 1px'>
|
||||
""" + str(var) + """
|
||||
</table>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
<br>
|
||||
"""
|
||||
return str(setup_information)
|
||||
|
||||
|
||||
def generate_report(result_data=None,
|
||||
date=None,
|
||||
test_setup_info={},
|
||||
input_setup_info={},
|
||||
graph_path="/home/lanforge/html-reports/FTP-Test"):
|
||||
# Need to pass this to test_setup_information()
|
||||
input_setup_info = input_setup_info
|
||||
test_setup_data = test_setup_info
|
||||
x_axis = []
|
||||
num_stations = result_data[1]["num_stations"]
|
||||
for i in range(1, num_stations + 1, 1):
|
||||
x_axis.append(i)
|
||||
column_head = []
|
||||
rows_head = []
|
||||
bands = result_data[1]["bands"]
|
||||
file_sizes = result_data[1]["file_sizes"]
|
||||
directions = result_data[1]["directions"]
|
||||
|
||||
for size in file_sizes:
|
||||
for direction in directions:
|
||||
column_head.append(size + " " + direction)
|
||||
for band in bands:
|
||||
if band != "Both":
|
||||
rows_head.append(str(num_stations) + " Clients-" + band)
|
||||
else:
|
||||
rows_head.append(str(num_stations // 2) + "+" + str(num_stations // 2) + " Clients-2.4G+5G")
|
||||
|
||||
reports_root = graph_path + "/" + str(date)
|
||||
if path.exists(graph_path):
|
||||
os.mkdir(reports_root)
|
||||
print("Reports Root is Created")
|
||||
|
||||
else:
|
||||
os.mkdir(graph_path)
|
||||
os.mkdir(reports_root)
|
||||
print("Reports Root is created")
|
||||
print("Generating Reports in : ", reports_root)
|
||||
|
||||
html_report = report_banner(date) + \
|
||||
test_setup_information(test_setup_data) + \
|
||||
test_objective() + \
|
||||
pass_fail_description() + \
|
||||
add_pass_fail_table(result_data, rows_head, column_head) + \
|
||||
download_upload_time_description() + \
|
||||
download_upload_time_table(result_data, rows_head, column_head)
|
||||
|
||||
for b in bands:
|
||||
for size in file_sizes:
|
||||
html_report = html_report + \
|
||||
generate_graph(result_data, x_axis, b, size, graph_path=reports_root)
|
||||
|
||||
html_report = html_report + input_setup_info_table(input_setup_info)
|
||||
|
||||
# write the html_report into a file in /home/lanforge/html_reports in a directory named FTP-Test and html_report name should be having a timesnap with it
|
||||
f = open(reports_root + "/report.html", "a")
|
||||
# f = open("report.html", "a")
|
||||
f.write(html_report)
|
||||
f.close()
|
||||
# write logic to generate pdf here
|
||||
pdfkit.from_file(reports_root + "/report.html", reports_root + "/report.pdf")
|
||||
|
||||
|
||||
# test blocks from here
|
||||
if __name__ == '__main__':
|
||||
generate_report()
|
||||
516
py-scripts/lf_ftp_test.py
Normal file
516
py-scripts/lf_ftp_test.py
Normal file
@@ -0,0 +1,516 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
""" ftp_test.py will create stations and endpoints to generate and verify layer-4 traffic over an ftp connection.
|
||||
find out download/upload time of each client according to file size.
|
||||
This script will monitor the bytes-rd attribute of the endpoints.
|
||||
Use './ftp_test.py --help' to see command line usage and options
|
||||
-Jitendrakumar Kushavah
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
import sys
|
||||
from ftp_html import *
|
||||
import paramiko
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../py-json')
|
||||
|
||||
from LANforge import LFUtils
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
import realm
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
import time
|
||||
import os
|
||||
|
||||
class ftp_test(LFCliBase):
|
||||
def __init__(self, lfclient_host="localhost", lfclient_port=8080, sta_prefix="sta", start_id=0, num_sta= None,
|
||||
dut_ssid=None,dut_security=None, dut_passwd=None, file_size=None, band=None,
|
||||
upstream="eth1",_debug_on=False, _exit_on_error=False, _exit_on_fail=False, direction= None):
|
||||
super().__init__(lfclient_host, lfclient_port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
|
||||
print("Test is about to start")
|
||||
self.host = lfclient_host
|
||||
self.port = lfclient_port
|
||||
#self.radio = radio
|
||||
self.upstream = upstream
|
||||
self.sta_prefix = sta_prefix
|
||||
self.sta_start_id = start_id
|
||||
self.num_sta = num_sta
|
||||
self.ssid = dut_ssid
|
||||
self.security = dut_security
|
||||
self.password = dut_passwd
|
||||
self.requests_per_ten = 1
|
||||
self.band=band
|
||||
self.file_size=file_size
|
||||
self.direction=direction
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_http_profile()
|
||||
self.port_util = realm.PortUtils(self.local_realm)
|
||||
self.cx_profile.requests_per_ten = self.requests_per_ten
|
||||
|
||||
print("Test is Initialized")
|
||||
|
||||
def set_values(self):
|
||||
#This method will set values according user input
|
||||
|
||||
if self.band == "5G":
|
||||
self.radio = ["wiphy2"] # need to pass in the radios
|
||||
if self.file_size == "2MB":
|
||||
|
||||
#providing time duration for Pass or fail criteria
|
||||
self.duration = self.convert_min_in_time(1)
|
||||
elif self.file_size == "500MB":
|
||||
self.duration = self.convert_min_in_time(1) # 30
|
||||
elif self.file_size == "1000MB":
|
||||
self.duration = self.convert_min_in_time(1) # 50
|
||||
else:
|
||||
self.duration = self.convert_min_in_time(1) # 10
|
||||
elif self.band == "2.4G":
|
||||
self.radio = ["wiphy0"] # need to pass in the radios
|
||||
if self.file_size == "2MB":
|
||||
self.duration = self.convert_min_in_time(1) # 2
|
||||
elif self.file_size == "500MB":
|
||||
self.duration = self.convert_min_in_time(1) # 60
|
||||
elif self.file_size == "1000MB":
|
||||
self.duration = self.convert_min_in_time(1) # 80
|
||||
else:
|
||||
self.duration = self.convert_min_in_time(1) # 10
|
||||
elif self.band == "Both":
|
||||
self.radio = ["wiphy2", "wiphy0"] # need to pass in the radios
|
||||
|
||||
#if Both then number of stations are half for 2.4G and half for 5G
|
||||
self.num_sta = self.num_sta // 2
|
||||
print(self.num_sta)
|
||||
if self.file_size == "2MB":
|
||||
self.duration = self.convert_min_in_time(1) # 2
|
||||
elif self.file_size == "500MB":
|
||||
self.duration = self.convert_min_in_time(1) # 60
|
||||
elif self.file_size == "1000MB":
|
||||
self.duration = self.convert_min_in_time(1) # 80
|
||||
else:
|
||||
self.duration = self.convert_min_in_time(1) # 10
|
||||
|
||||
self.file_size_bytes=int(self.convert_file_size_in_Bytes(self.file_size))
|
||||
|
||||
|
||||
def precleanup(self):
|
||||
self.count=0
|
||||
|
||||
#delete everything in the GUI before starting the script
|
||||
try:
|
||||
self.local_realm.load("BLANK")
|
||||
except:
|
||||
print("Couldn't load 'BLANK' Test configurations")
|
||||
|
||||
for rad in self.radio:
|
||||
if rad == "wiphy2":
|
||||
|
||||
#select mode(All stations will connects to 5G)
|
||||
self.station_profile.mode = 10
|
||||
self.count=self.count+1
|
||||
|
||||
elif rad == "wiphy0": # This probably is not the best selection mode
|
||||
|
||||
# select mode(All stations will connects to 2.4G)
|
||||
self.station_profile.mode = 6
|
||||
self.count = self.count + 1
|
||||
|
||||
#check Both band if both band then for 2.4G station id start with 20
|
||||
if self.count == 2:
|
||||
self.sta_start_id = self.num_sta
|
||||
self.num_sta = 2 * (self.num_sta)
|
||||
|
||||
#if Both band then first 20 stations will connects to 5G
|
||||
self.station_profile.mode = 10
|
||||
self.cx_profile.cleanup()
|
||||
|
||||
#create station list with sta_id 20
|
||||
self.station_list1 = LFUtils.portNameSeries(prefix_=self.sta_prefix, start_id_=self.sta_start_id,
|
||||
end_id_=self.num_sta - 1, padding_number_=10000,
|
||||
radio=rad)
|
||||
|
||||
#cleanup station list which started sta_id 20
|
||||
self.station_profile.cleanup(self.station_list1, debug_=self.debug)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
|
||||
port_list=self.station_list,
|
||||
debug=self.debug)
|
||||
return
|
||||
#clean layer4 ftp traffic
|
||||
self.cx_profile.cleanup()
|
||||
self.station_list = LFUtils.portNameSeries(prefix_=self.sta_prefix, start_id_=self.sta_start_id,
|
||||
end_id_=self.num_sta - 1, padding_number_=10000,
|
||||
radio=rad)
|
||||
|
||||
#cleans stations
|
||||
self.station_profile.cleanup(self.station_list , delay=1, debug_=self.debug)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url,
|
||||
port_list=self.station_list,
|
||||
debug=self.debug)
|
||||
time.sleep(1)
|
||||
|
||||
print("precleanup done")
|
||||
|
||||
def build(self):
|
||||
|
||||
#set ftp
|
||||
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream)[2], resource=1, on=True)
|
||||
|
||||
for rad in self.radio:
|
||||
|
||||
#station build
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.set_number_template("00")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=rad, sta_names_=self.station_list, debug=self.debug)
|
||||
self.local_realm.wait_until_ports_appear(sta_list=self.station_list)
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(self.station_list):
|
||||
self._pass("All stations got IPs")
|
||||
else:
|
||||
self._fail("Stations failed to get IPs")
|
||||
exit(1)
|
||||
#building layer4
|
||||
self.cx_profile.direction ="dl"
|
||||
self.cx_profile.dest = "/dev/null"
|
||||
print('DIRECTION',self.direction)
|
||||
|
||||
if self.direction == "Download":
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, ftp_ip="10.40.0.1/ftp_test.txt",
|
||||
sleep_time=.5,debug_=self.debug,suppress_related_commands_=True, ftp=True, user="lanforge",
|
||||
passwd="lanforge", source="")
|
||||
|
||||
|
||||
elif self.direction == "Upload":
|
||||
dict_sta_and_ip = {}
|
||||
|
||||
#data from GUI for find out ip addr of each station
|
||||
data = self.json_get("ports/list?fields=IP")
|
||||
|
||||
# This loop for find out proper ip addr and station name
|
||||
for i in self.station_list:
|
||||
for j in data['interfaces']:
|
||||
for k in j:
|
||||
if i == k:
|
||||
dict_sta_and_ip[k] = j[i]['ip']
|
||||
|
||||
|
||||
|
||||
#list of ip addr of all stations
|
||||
ip = list(dict_sta_and_ip.values())
|
||||
|
||||
eth_list = []
|
||||
client_list = []
|
||||
|
||||
#list of all stations
|
||||
for i in range(len(self.station_list)):
|
||||
client_list.append(self.station_list[i][4:])
|
||||
|
||||
#list of upstream port
|
||||
eth_list.append(self.upstream)
|
||||
|
||||
#create layer for connection for upload
|
||||
for client_num in range(len(self.station_list)):
|
||||
self.cx_profile.create(ports=eth_list, ftp_ip=ip[client_num] + "/ftp_test.txt", sleep_time=.5,
|
||||
debug_=self.debug, suppress_related_commands_=True, ftp=True,
|
||||
user="lanforge", passwd="lanforge",
|
||||
source="", upload_name=client_list[client_num])
|
||||
|
||||
#check Both band present then build stations with another station list
|
||||
if self.count == 2:
|
||||
self.station_list = self.station_list1
|
||||
|
||||
# if Both band then another 20 stations will connects to 2.4G
|
||||
self.station_profile.mode = 6
|
||||
print("Test Build done")
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
for rad in self.radio:
|
||||
self.cx_profile.start_cx()
|
||||
|
||||
print("Test Started")
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
self.station_profile.admin_down()
|
||||
|
||||
def postcleanup(self):
|
||||
self.cx_profile.cleanup()
|
||||
self.local_realm.load("BLANK")
|
||||
self.station_profile.cleanup(self.station_profile.station_names, delay=1, debug_=self.debug)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
|
||||
debug=self.debug)
|
||||
|
||||
#Create file for given file size
|
||||
def file_create(self):
|
||||
if os.path.isfile("/home/lanforge/ftp_test.txt"):
|
||||
os.remove("/home/lanforge/ftp_test.txt")
|
||||
os.system("fallocate -l " +self.file_size +" /home/lanforge/ftp_test.txt")
|
||||
print("File creation done", self.file_size)
|
||||
|
||||
#convert file size MB or GB into Bytes
|
||||
def convert_file_size_in_Bytes(self,size):
|
||||
if (size.endswith("MB")) or (size.endswith("Mb")) or (size.endswith("GB")) or (size.endswith("Gb")):
|
||||
if (size.endswith("MB")) or (size.endswith("Mb")):
|
||||
return float(size[:-2]) * 10**6
|
||||
elif (size.endswith("GB")) or (size.endswith("Gb")):
|
||||
return float(size[:-2]) * 10**9
|
||||
|
||||
|
||||
def my_monitor(self,time1):
|
||||
#data in json format
|
||||
data = self.json_get("layer4/list?fields=bytes-rd")
|
||||
|
||||
print("layer4/list?fields=bytes-read: {}".format(data))
|
||||
|
||||
|
||||
#list of layer 4 connections name
|
||||
self.data1 = []
|
||||
for i in range(self.num_sta):
|
||||
if self.num_sta == 1:
|
||||
# the station num is 1 , yet the station is 0
|
||||
print("i: {} self.num_sta: {}".format(i,self.num_sta))
|
||||
print("data['endpoint'][{}]: {}".format(i,data['endpoint']))
|
||||
print("data list: {}".format((str(list(data['endpoint'].keys())))[2:-2]))
|
||||
print("data list: {}".format((str(list(data['endpoint'].keys())))[2:-2]))
|
||||
#self.data1.append((str(list(data['endpoint']['name']))))
|
||||
self.data1.append((str((data['endpoint']['name']))))
|
||||
|
||||
else:
|
||||
# the station num is 1 , yet the station is 0
|
||||
print("i: {} self.num_sta: {}".format(i,self.num_sta))
|
||||
print("data['endpoint'][{}]: {}".format(i,data['endpoint'][i]))
|
||||
print("data list: {}".format((str(list(data['endpoint'][i].keys())))[2:-2]))
|
||||
self.data1.append((str(list(data['endpoint'][i].keys())))[2:-2])
|
||||
|
||||
data2 = self.data1
|
||||
print("data1: {}".format(self.data1))
|
||||
print("data2: {}".format(data2))
|
||||
list_of_time = []
|
||||
list1 = []
|
||||
list2 = []
|
||||
|
||||
for i in range(self.num_sta):
|
||||
list_of_time.append(0)
|
||||
while list_of_time.count(0) != 0:
|
||||
|
||||
#run script upto given time
|
||||
if str(datetime.now()- time1) >= self.duration:
|
||||
break
|
||||
|
||||
for i in range(self.num_sta):
|
||||
data = self.json_get("layer4/list?fields=bytes-rd")
|
||||
#print("data from bytes-rd: {}".format(data))
|
||||
|
||||
if self.num_sta == 1:
|
||||
#reading uc-avg data in json format
|
||||
uc_avg= self.json_get("layer4/list?fields=uc-avg")
|
||||
#print("layer4/list?fields=uc-avg: {}".format(uc_avg))
|
||||
if data['endpoint']['bytes-rd'] <= self.file_size_bytes:
|
||||
data = self.json_get("layer4/list?fields=bytes-rd")
|
||||
if data['endpoint']['bytes-rd'] >= self.file_size_bytes:
|
||||
list1.append(i)
|
||||
if list1.count(i) == 1:
|
||||
list2.append(i)
|
||||
list1 = list2
|
||||
|
||||
#stop station after download or upload file with particular size
|
||||
self.json_post("/cli-json/set_cx_state", {
|
||||
"test_mgr": "default_tm",
|
||||
"cx_name": "CX_" + data2[0],
|
||||
"cx_state": "STOPPED"
|
||||
}, debug_=self.debug)
|
||||
|
||||
list_of_time[i] = round(int(uc_avg['endpoint']['uc-avg'])/1000,1)
|
||||
else:
|
||||
#reading uc-avg data in json format
|
||||
uc_avg= self.json_get("layer4/list?fields=uc-avg")
|
||||
if data['endpoint'][i][data2[i]]['bytes-rd'] <= self.file_size_bytes:
|
||||
data = self.json_get("layer4/list?fields=bytes-rd")
|
||||
if data['endpoint'][i][data2[i]]['bytes-rd'] >= self.file_size_bytes:
|
||||
list1.append(i)
|
||||
if list1.count(i) == 1:
|
||||
list2.append(i)
|
||||
list1 = list2
|
||||
|
||||
#stop station after download or upload file with particular size
|
||||
self.json_post("/cli-json/set_cx_state", {
|
||||
"test_mgr": "default_tm",
|
||||
"cx_name": "CX_" + data2[i],
|
||||
"cx_state": "STOPPED"
|
||||
}, debug_=self.debug)
|
||||
|
||||
list_of_time[i] = round(int(uc_avg['endpoint'][i][data2[i]]['uc-avg'])/1000,1)
|
||||
time.sleep(0.5)
|
||||
|
||||
#return list of download/upload time in seconds
|
||||
return list_of_time
|
||||
|
||||
#Method for arrange ftp download/upload time data in dictionary
|
||||
def ftp_test_data(self, list_time, pass_fail, bands, file_sizes, directions, num_stations):
|
||||
|
||||
#creating dictionary for single iteration
|
||||
create_dict={}
|
||||
|
||||
create_dict["band"] = self.band
|
||||
create_dict["direction"] = self.direction
|
||||
create_dict["file_size"] = self.file_size
|
||||
create_dict["time"] = list_time
|
||||
create_dict["duration"] = self.time_test
|
||||
create_dict["result"] = pass_fail
|
||||
create_dict["bands"] = bands
|
||||
create_dict["file_sizes"] = file_sizes
|
||||
create_dict["directions"] = directions
|
||||
create_dict["num_stations"] = num_stations
|
||||
|
||||
return create_dict
|
||||
|
||||
#Method for AP reboot
|
||||
def ap_reboot(self, ip, user, pswd):
|
||||
|
||||
print("starting AP reboot")
|
||||
|
||||
# creating shh client object we use this object to connect to router
|
||||
ssh = paramiko.SSHClient()
|
||||
|
||||
# automatically adds the missing host key
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
ssh.connect(ip, port=22, username=user, password=pswd, banner_timeout=600)
|
||||
stdin, stdout, stderr = ssh.exec_command('reboot')
|
||||
output = stdout.readlines()
|
||||
ssh.close()
|
||||
# print('\n'.join(output))
|
||||
|
||||
time.sleep(180)
|
||||
print("AP rebooted")
|
||||
|
||||
def convert_min_in_time(self,total_minutes):
|
||||
|
||||
#saving time in minutus
|
||||
self.time_test = total_minutes
|
||||
|
||||
# Get hours with floor division
|
||||
hours = total_minutes // 60
|
||||
|
||||
# Get additional minutes with modulus
|
||||
minutes = total_minutes % 60
|
||||
|
||||
# Create time as a string
|
||||
time_string = str("%d:%02d" % (divmod(total_minutes, 60))) + ":00" + ":000000"
|
||||
|
||||
return time_string
|
||||
|
||||
def pass_fail_check(self,time_list):
|
||||
if time_list.count(0) == 0:
|
||||
return "Pass"
|
||||
else:
|
||||
return "Fail"
|
||||
|
||||
def main():
|
||||
# This has --mgr, --mgr_port and --debug
|
||||
parser = LFCliBase.create_bare_argparse(prog="netgear-ftp", formatter_class=argparse.RawTextHelpFormatter, epilog="About This Script")
|
||||
# Adding More Arguments for custom use
|
||||
parser.add_argument('--ssid',type=str, help='--ssid', default="TestAP-Jitendra")
|
||||
parser.add_argument('--passwd',type=str, help='--passwd', default="BLANK")
|
||||
parser.add_argument('--security', type=str, help='--security', default="open")
|
||||
#parser.add_argument('--radios',nargs="+",help='--radio to use on LANforge for 5G and 2G', default=["wiphy0","wiphy1"])
|
||||
|
||||
# Test variables
|
||||
parser.add_argument('--bands', nargs="+", help='--bands defaults ["5G","2.4G","Both"]', default=["5G","2.4G","Both"])
|
||||
parser.add_argument('--directions', nargs="+", help='--directions defaults ["Download","Upload"]', default=["Download","Upload"])
|
||||
parser.add_argument('--file_sizes', nargs="+", help='--File Size defaults ["2MB","500MB","1000MB"]', default=["2MB","500MB","1000MB"])
|
||||
parser.add_argument('--num_stations', type=int, help='--num_client is number of stations', default=40)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# 1st time stamp for test duration
|
||||
time_stamp1 = datetime.now()
|
||||
|
||||
#use for creating ftp_test dictionary
|
||||
iteraration_num=0
|
||||
|
||||
#empty dictionary for whole test data
|
||||
ftp_data={}
|
||||
|
||||
#For all combinations ftp_data of directions, file size and client counts, run the test
|
||||
for band in args.bands:
|
||||
for direction in args.directions:
|
||||
for file_size in args.file_sizes:
|
||||
# Start Test
|
||||
obj = ftp_test(lfclient_host=args.mgr,
|
||||
lfclient_port=args.mgr_port,
|
||||
dut_ssid=args.ssid,
|
||||
dut_passwd=args.passwd,
|
||||
dut_security=args.security,
|
||||
num_sta= args.num_stations,
|
||||
band=band,
|
||||
file_size=file_size,
|
||||
direction=direction
|
||||
)
|
||||
|
||||
iteraration_num=iteraration_num+1
|
||||
obj.file_create()
|
||||
obj.set_values()
|
||||
obj.precleanup()
|
||||
#if file_size != "2MB":
|
||||
#obj.ap_reboot("192.168.213.190","root","Password@123xzsawq@!")
|
||||
obj.build()
|
||||
if not obj.passes():
|
||||
print(obj.get_fail_message())
|
||||
exit(1)
|
||||
|
||||
#First time stamp
|
||||
time1 = datetime.now()
|
||||
|
||||
obj.start(False, False)
|
||||
|
||||
#return list of download/upload completed time stamp
|
||||
time_list = obj.my_monitor(time1)
|
||||
|
||||
# check pass or fail
|
||||
pass_fail = obj.pass_fail_check(time_list)
|
||||
|
||||
#dictionary of whole data
|
||||
ftp_data[iteraration_num] = obj.ftp_test_data(time_list,pass_fail,args.bands,args.file_sizes,args.directions,args.num_stations)
|
||||
|
||||
obj.stop()
|
||||
obj.postcleanup()
|
||||
|
||||
#2nd time stamp for test duration
|
||||
time_stamp2 = datetime.now()
|
||||
|
||||
#total time for test duration
|
||||
test_duration = str(time_stamp2 - time_stamp1)[:-7]
|
||||
|
||||
print("FTP Test Data", ftp_data)
|
||||
|
||||
date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0]
|
||||
|
||||
test_setup_info = {
|
||||
"AP Name": "vap5",
|
||||
"SSID": args.ssid,
|
||||
"Number of Stations": args.num_stations,
|
||||
"Test Duration": test_duration
|
||||
}
|
||||
|
||||
input_setup_info = {
|
||||
"IP": "192.168.213.190" ,
|
||||
"user": "root",
|
||||
"Contact": "support@candelatech.com"
|
||||
}
|
||||
generate_report(ftp_data,
|
||||
date,
|
||||
test_setup_info,
|
||||
input_setup_info,
|
||||
graph_path="/home/lanforge/html-reports/FTP-Test")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user