lf_csv added to py-scripts

This commit is contained in:
anil-tegala
2021-07-13 23:46:23 +05:30
parent bcbaff86e6
commit 02d937d1ea

View File

@@ -1,847 +0,0 @@
#!/usr/bin/env python3
"""
NAME: throughput_qos.py
PURPOSE: throughput_qos.py will create stations and endpoints which evaluates l3 traffic for a particular type of service.
EXAMPLE:
python3 throughput_qos.py --mgr 192.168.200.37 --mgr_port 8080 -u eth1 --num_stations 1
--radio wiphy1 --ssid TestAP5-71 --passwd lanforge --security wpa2 --mode 11 --a_min 1000000 --b_min 1000000 --traffic_type lf_udp
python3 throughput_qos.py --num_stations 1 --radio wiphy1 --ssid ct523c-vap --passwd ct523c-vap --security wpa2 --mode 11 --a_min 1000000 --b_min 1000000 --traffic_type lf_udp
Use './throughput_qos.py --help' to see command line usage and options
Copyright 2021 Candela Technologies Inc
License: Free to distribute and modify. LANforge systems must be licensed.
"""
import sys
import os
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
import pdfkit
from lf_report import lf_report
from lf_graph import lf_bar_graph
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import argparse
from LANforge import LFUtils
from realm import Realm
import time
import datetime
class ThroughputQOS(Realm):
def __init__(self,
tos,
ssid=None,
security=None,
password=None,
ssid_2g=None,
security_2g=None,
password_2g=None,
ssid_5g=None,
security_5g=None,
password_5g=None,
sta_list=[],
create_sta=True,
name_prefix=None,
upstream=None,
radio_2g="wiphy0",
radio_5g="wiphy1",
host="localhost",
port=8080,
mode=0,
ap=None,
ap_name="",
traffic_type=None,
side_a_min_rate=56, side_a_max_rate=0,
side_b_min_rate=56, side_b_max_rate=0,
number_template="00000",
test_duration="2m",
bands="2.4G, 5G, BOTH",
test_case={},
use_ht160=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(lfclient_host=host,
lfclient_port=port),
self.upstream = upstream
self.host = host
self.port = port
self.ssid = ssid
self.security = security
self.password = password
self.ssid_2g = ssid_2g
self.security_2g = security_2g
self.password_2g = password_2g
self.ssid_5g = ssid_5g
self.security_5g = security_5g
self.password_5g = password_5g
self.radio_2g = radio_2g
self.radio_5g = radio_5g
self.sta_list = sta_list
self.create_sta = create_sta
self.mode = mode
self.ap = ap
self.ap_name = ap_name
self.traffic_type = traffic_type
self.tos = tos.split(",")
self.bands = bands.split(",")
self.test_case = test_case
self.number_template = number_template
self.debug = _debug_on
self.name_prefix = name_prefix
self.test_duration = test_duration
self.station_profile = self.new_station_profile()
self.cx_profile = self.new_l3_cx_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.debug = self.debug
self.station_profile.use_ht160 = use_ht160
if self.station_profile.use_ht160:
self.station_profile.mode = 9
# self.station_profile.mode = mode
if self.ap is not None:
self.station_profile.set_command_param("add_sta", "ap", self.ap)
self.cx_profile.host = self.host
self.cx_profile.port = self.port
self.cx_profile.name_prefix = self.name_prefix
self.cx_profile.side_a_min_bps = side_a_min_rate
self.cx_profile.side_a_max_bps = side_a_max_rate
self.cx_profile.side_b_min_bps = side_b_min_rate
self.cx_profile.side_b_max_bps = side_b_max_rate
def start(self, print_pass=False, print_fail=False):
self.cx_profile.start_cx()
def stop(self):
self.cx_profile.stop_cx()
self.station_profile.admin_down()
def pre_cleanup(self):
self.cx_profile.cleanup_prefix()
if self.create_sta:
for sta in self.sta_list:
self.rm_port(sta, check_exists=True)
def cleanup(self):
self.cx_profile.cleanup()
if self.create_sta:
self.station_profile.cleanup()
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.station_profile.station_names,
debug=self.debug)
def build(self):
for key in self.bands:
if self.create_sta:
if key == "2.4G" or key == "2.4g":
self.station_profile.mode = 11
if self.ssid is None:
self.station_profile.use_security(self.security_2g, self.ssid_2g, self.password_2g)
else:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio_2g, sta_names_=self.sta_list, debug=self.debug)
if key == "5G" or key == "5g":
self.station_profile.mode = 9
if self.ssid is None:
self.station_profile.use_security(self.security_5g, self.ssid_5g, self.password_5g)
else:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio_5g, sta_names_=self.sta_list, debug=self.debug)
if key == "BOTH" or key == "both":
split = len(self.sta_list) // 2
if self.ssid is None:
self.station_profile.use_security(self.security_2g, self.ssid_2g, self.password_2g)
else:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.mode = 11
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio_2g, sta_names_=self.sta_list[:split],
debug=self.debug)
if self.ssid is None:
self.station_profile.use_security(self.security_5g, self.ssid_5g, self.password_5g)
else:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.mode = 9
self.station_profile.set_number_template(self.number_template)
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio_5g, sta_names_=self.sta_list[split:],
debug=self.debug)
self.station_profile.admin_up()
# check here if upstream port got IP
temp_stations = self.station_profile.station_names.copy()
if self.wait_for_ip(temp_stations):
self._pass("All stations got IPs")
else:
self._fail("Stations failed to get IPs")
self.exit_fail()
self._pass("PASS: Station build finished")
self.create_cx()
print("cx build finished")
def create_cx(self):
_tos = "BK,BE,VI,VO"
self.tos = _tos.split(",")
print("tos: {}".format(self.tos))
for ip_tos in self.tos:
print("## ip_tos: {}".format(ip_tos))
print("Creating connections for endpoint type: %s TOS: %s cx-count: %s" % (
self.traffic_type, ip_tos, self.cx_profile.get_cx_count()))
self.cx_profile.create(endp_type=self.traffic_type, side_a=self.sta_list,
side_b=self.upstream,
sleep_time=0, tos=ip_tos)
print("cross connections with TOS type created.")
def evaluate_qos(self):
case = ""
tos_download = {'video': [], 'voice': [], 'bk': [], 'be': []}
tx_b = {'bk': [], 'be': [], 'video': [], 'voice': []}
rx_a = {'bk': [], 'be': [], 'video': [], 'voice': []}
delay = {'bk': [], 'be': [], 'video': [], 'voice': []}
pkt_loss = {}
tx_endps = {}
rx_endps = {}
if int(self.cx_profile.side_b_min_bps) != 0:
case = str(int(self.cx_profile.side_b_min_bps) // 1000000)
elif int(self.cx_profile.side_a_min_bps) != 0:
case = str(int(self.cx_profile.side_a_min_bps) // 1000000)
if len(self.cx_profile.created_cx.keys()) > 0:
endp_data = self.json_get('endp/all?fields=name,tx+pkts+ll,rx+pkts+ll,delay')
endp_data.pop("handler")
endp_data.pop("uri")
endps = endp_data['endpoint']
for i in range(len(endps)):
if i < len(endps) // 2:
tx_endps.update(endps[i])
if i >= len(endps) // 2:
rx_endps.update(endps[i])
for sta in self.cx_profile.created_cx.keys():
temp = int(sta[12:])
if temp % 4 == 0:
if int(self.cx_profile.side_b_min_bps) != 0:
tos_download['bk'].append(float(
f"{list((self.json_get('/cx/%s?fields=bps+rx+a' % sta)).values())[2]['bps rx a'] / 1000000:.2f}"))
tx_b['bk'].append(int(f"{tx_endps['%s-B' % sta]['tx pkts ll']}"))
rx_a['bk'].append(int(f"{rx_endps['%s-A' % sta]['rx pkts ll']}"))
delay['bk'].append(int(f"{rx_endps['%s-A' % sta]['delay']}"))
else:
tos_download['bk'].append(float(0))
tx_b['bk'].append(int(0))
rx_a['bk'].append(int(0))
delay['bk'].append(int(0))
elif temp % 4 == 1:
if int(self.cx_profile.side_b_min_bps) != 0:
tos_download['be'].append(float(
f"{list((self.json_get('/cx/%s?fields=bps+rx+a' % sta)).values())[2]['bps rx a'] / 1000000:.2f}"))
tx_b['be'].append(int(f"{tx_endps['%s-B' % sta]['tx pkts ll']}"))
rx_a['be'].append(int(f"{rx_endps['%s-A' % sta]['rx pkts ll']}"))
delay['be'].append(int(f"{rx_endps['%s-A' % sta]['delay']}"))
else:
tos_download['be'].append(float(0))
tx_b['be'].append(int(0))
rx_a['be'].append(int(0))
delay['be'].append(int(0))
elif temp % 4 == 2:
if int(self.cx_profile.side_b_min_bps) != 0:
tos_download['voice'].append(float(
f"{list((self.json_get('/cx/%s?fields=bps+rx+a' % sta)).values())[2]['bps rx a'] / 1000000:.2f}"))
tx_b['voice'].append(int(f"{tx_endps['%s-B' % sta]['tx pkts ll']}"))
rx_a['voice'].append(int(f"{rx_endps['%s-A' % sta]['rx pkts ll']}"))
delay['voice'].append(int(f"{rx_endps['%s-A' % sta]['delay']}"))
else:
tos_download['voice'].append(float(0))
tx_b['voice'].append(int(0))
rx_a['voice'].append(int(0))
delay['voice'].append(int(0))
elif temp % 4 == 3:
if int(self.cx_profile.side_b_min_bps) != 0:
tos_download['video'].append(float(
f"{list((self.json_get('/cx/%s?fields=bps+rx+a' % sta)).values())[2]['bps rx a'] / 1000000:.2f}"))
tx_b['video'].append(int(f"{tx_endps['%s-B' % sta]['tx pkts ll']}"))
rx_a['video'].append(int(f"{rx_endps['%s-A' % sta]['rx pkts ll']}"))
delay['video'].append(int(f"{rx_endps['%s-A' % sta]['delay']}"))
else:
tos_download['video'].append(float(0))
tx_b['video'].append(int(0))
rx_a['video'].append(int(0))
delay['video'].append(int(0))
tos_download.update({"bkQOS": float(f"{sum(tos_download['bk']):.2f}")})
tos_download.update({"beQOS": float(f"{sum(tos_download['be']):.2f}")})
tos_download.update({"videoQOS": float(f"{sum(tos_download['video']):.2f}")})
tos_download.update({"voiceQOS": float(f"{sum(tos_download['voice']):.2f}")})
tos_download.update({"bkDELAY": float(f"{sum(delay['bk']) / 1000:.2f}")})
tos_download.update({"beDELAY": float(f"{sum(delay['be']) / 1000:.2f}")})
tos_download.update({"videoDELAY": float(f"{sum(delay['video']) / 1000:.2f}")})
tos_download.update({"voiceDELAY": float(f"{sum(delay['voice']) / 1000:.2f}")})
if sum(tx_b['bk']) != 0 or sum(tx_b['be']) != 0 or sum(tx_b['video']) != 0 or sum(tx_b['voice']) != 0:
tos_download.update(
{"bkLOSS": float(f"{((sum(tx_b['bk']) - sum(rx_a['bk'])) / sum(tx_b['bk'])) * 100:.2f}")})
tos_download.update(
{"beLOSS": float(f"{((sum(tx_b['be']) - sum(rx_a['be'])) / sum(tx_b['be'])) * 100:.2f}")})
tos_download.update({"videoLOSS": float(
f"{((sum(tx_b['video']) - sum(rx_a['video'])) / sum(tx_b['video'])) * 100:.2f}")})
tos_download.update({"voiceLOSS": float(
f"{((sum(tx_b['voice']) - sum(rx_a['voice'])) / sum(tx_b['voice'])) * 100:.2f}")})
tos_download.update({'tx_b': tx_b})
tos_download.update({'rx_a': rx_a})
tos_download.update({'delay': delay})
else:
print("no RX values available to evaluate QOS")
key = case + " " + "Mbps"
return {key: tos_download}
def set_report_data(self, data):
print(data)
res = {}
if data is not None:
for i in range(len(data)):
res.update({self.test_case[i]: data[i]})
else:
print("No Data found to generate report!")
exit(1)
if self.test_case is not None:
table_df = {}
num_stations = []
throughput = []
graph_df = {}
for case in self.test_case:
throughput_df = [[], [], [], []]
pkt_loss_df = [[], [], [], []]
latency_df = [[], [], [], []]
if case == "2.4g" or case == "2.4G":
num_stations.append("{} - bgn-AC".format(str(len(self.sta_list))))
elif case == "5g" or case == "5G":
num_stations.append("{} - an-AC".format(str(len(self.sta_list))))
elif case == "both" or case == "BOTH":
num_stations.append(
"{} - bgn-AC + {} - an-AC ".format(str(len(self.sta_list) // 2), str(len(self.sta_list) // 2)))
for key in res[case]:
throughput.append(
"BK : {}, BE : {}, VI: {}, VO: {}".format(res[case][key]["bkQOS"],
res[case][key]["beQOS"],
res[case][key][
"videoQOS"],
res[case][key][
"voiceQOS"]))
throughput_df[0].append(res[case][key]['bkQOS'])
throughput_df[1].append(res[case][key]['beQOS'])
throughput_df[2].append(res[case][key]['videoQOS'])
throughput_df[3].append(res[case][key]['voiceQOS'])
pkt_loss_df[0].append(res[case][key]['bkLOSS'])
pkt_loss_df[1].append(res[case][key]['beLOSS'])
pkt_loss_df[2].append(res[case][key]['videoLOSS'])
pkt_loss_df[3].append(res[case][key]['voiceLOSS'])
latency_df[0].append(res[case][key]['bkDELAY'])
latency_df[1].append(res[case][key]['beDELAY'])
latency_df[2].append(res[case][key]['videoDELAY'])
latency_df[3].append(res[case][key]['voiceDELAY'])
table_df.update({"No of Stations": num_stations})
table_df.update({"Throughput for Load {}".format(key): throughput})
graph_df.update({case: [throughput_df, pkt_loss_df, latency_df]})
res.update({"throughput_table_df": table_df})
res.update({"graph_df": graph_df})
return res
def generate_report(self, data, test_setup_info, input_setup_info):
res = self.set_report_data(data)
report = lf_report(_output_pdf="throughput_qos.pdf", _output_html="throughput_qos.html")
report_path = report.get_path()
report_path_date_time = report.get_path_date_time()
print("path: {}".format(report_path))
print("path_date_time: {}".format(report_path_date_time))
report.set_title("Throughput QOS")
report.build_banner()
# objective title and description
report.set_obj_html(_obj_title="Objective",
_obj="Through this test we can evaluate the throughput for given number of clients which"
" runs the traffic with a particular Type of Service i.e BK,BE,VI,VO")
report.build_objective()
report.test_setup_table(test_setup_data=test_setup_info, value="Device Under Test")
report.set_table_title(
"Overall download Throughput for all TOS i.e BK | BE | Video (VI) | Voice (VO)")
report.build_table_title()
df_throughput = pd.DataFrame(res["throughput_table_df"])
report.set_table_dataframe(df_throughput)
report.build_table()
for key in res["graph_df"]:
report.set_graph_title(f"Overall Download throughput for {len(self.sta_list)} - {key} clients with different TOS.")
report.set_obj_html(_obj_title="", _obj="The below graph represents overall download throughput for all "
"connected stations running BK, BE, VO, VI traffic with different "
"intended loads per station {}".format(res["graph_df"][key].keys()))
report.build_graph_title()
report.build_objective()
graph = lf_bar_graph(_data_set=res["graph_df"][key][0],
_xaxis_name="Load per Type of Service",
_yaxis_name="Throughput (Mbps)",
_xaxis_categories=[1],
_xaxis_label=['1 Mbps', '2 Mbps', '3 Mbps', '4 Mbps', '5 Mbps'],
_graph_image_name=f"tos_download_{key}Hz",
_label=["BK", "BE", "VI", "VO"],
_xaxis_step=1,
_graph_title="Overall download throughput BK,BE,VO,VI traffic streams",
_title_size=16,
_color=['orangered', 'greenyellow', 'steelblue', 'blueviolet'],
_color_edge='black',
_bar_width=0.15,
_dpi=96,
_enable_csv=True,
_color_name=['orangered', 'greenyellow', 'steelblue', 'blueviolet'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results directory
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
report.set_graph_title(f"Overall Packet loss for {len(self.sta_list)} - {key} clients with different TOS.")
report.set_obj_html(_obj_title="",
_obj="This graph shows the overall packet loss for the connected stations "
"for BK,BE,VO,VI traffic with intended load per station {}".format(res["graph_df"][key].keys()))
report.build_graph_title()
report.build_objective()
graph = lf_bar_graph(_data_set=res["graph_df"][key][1],
_xaxis_name="Load per Type of Service",
_yaxis_name="Packet Loss (%)",
_xaxis_categories=[1],
_xaxis_label=['1 Mbps', '2 Mbps', '3 Mbps', '4 Mbps', '5 Mbps'],
_graph_image_name=f"pkt_loss_{key}Hz",
_label=["BK", "BE", "VI", "VO"],
_xaxis_step=1,
_graph_title="Load vs Packet Loss",
_title_size=16,
_color=['orangered', 'greenyellow', 'steelblue', 'blueviolet'],
_color_edge='black',
_bar_width=0.15,
_dpi=96,
_enable_csv=True,
_color_name=['orangered', 'greenyellow', 'steelblue', 'blueviolet'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results directory
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
report.set_obj_html(
_obj_title=f"Overall Latency for {len(self.sta_list)} - {key} clients with different TOS.",
_obj="This graph shows the overall Latency for the connected stations "
"for BK,BE,VO,VI traffic with intended load per station {}".format(res["graph_df"][key].keys()))
report.build_objective()
graph = lf_bar_graph(_data_set=res["graph_df"][key][2],
_xaxis_name="Load per Type of Service",
_yaxis_name="Average Latency (in seconds)",
_xaxis_categories=[1],
_xaxis_label=['1 Mbps', '2 Mbps', '3 Mbps', '4 Mbps', '5 Mbps'],
_graph_image_name=f"latency_{key}Hz",
_label=["BK", "BE", "VI", "VO"],
_xaxis_step=1,
_graph_title="Overall Download Latency BK,BE,VO,VI traffic streams",
_title_size=16,
_show_bar_value=True,
_color=['orangered', 'yellowgreen', 'steelblue', 'blueviolet'],
_color_edge='black',
_bar_width=0.15,
_dpi=96,
_enable_csv=True,
_color_name=['orangered', 'yellowgreen', 'steelblue', 'blueviolet'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results directory
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
self.generate_individual_graph(res, report)
report.test_setup_table(test_setup_data=input_setup_info, value="Information")
report.build_custom()
report.write_html()
report.write_pdf()
def generate_individual_graph(self, res, report):
if len(res.keys()) > 0:
if "throughput_table_df" in res:
res.pop("throughput_table_df")
if "graph_df" in res:
res.pop("graph_df")
for key in res:
for load in res[key]:
report.set_obj_html(
_obj_title=f"Individual download throughput with intended load {load}/station for traffic BK(WiFi).",
_obj=f"The below graph represents individual throughput for {len(self.sta_list)} clients running BK "
f"(WiFi) traffic. X- axis shows “number of clients” and Y-axis shows “"
f"Throughput in Mbps”.")
report.build_objective()
graph = lf_bar_graph(_data_set=[res[key][load]['bk']], _xaxis_name="Clients running - BK",
_yaxis_name="Throughput in Mbps",
_xaxis_categories=[i + 1 for i in range(len(self.sta_list))],
_xaxis_label=[i + 1 for i in range(len(self.sta_list))],
_label=["BK"],
_xaxis_step=1,
_xticks_font=8,
_graph_title="Individual download throughput for BK(WIFI) traffic - {} clients".format(
key),
_title_size=16,
_bar_width=0.15, _color_name=['orangered'],
_enable_csv=True,
_graph_image_name="{}_bk_{}".format(key, load), _color_edge=['black'],
_color=['orangered'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
report.set_obj_html(
_obj_title=f"Individual download throughput with intended load {load}/station for traffic BE(WiFi).",
_obj=f"The below graph represents individual throughput for {len(self.sta_list)} clients running BE "
f"(WiFi) traffic. X- axis shows “number of clients” and Y-axis shows "
f"“Throughput in Mbps”.")
report.build_objective()
graph = lf_bar_graph(_data_set=[res[key][load]['be']], _xaxis_name="Clients running - BE",
_yaxis_name="Throughput in Mbps",
_xaxis_categories=[i + 1 for i in range(len(self.sta_list))],
_xaxis_label=[i + 1 for i in range(len(self.sta_list))],
_label=["BE"],
_xaxis_step=1,
_xticks_font=8,
_graph_title="Individual download throughput for BE(WIFI) traffic - {} clients".format(
key),
_title_size=16,
_bar_width=0.15, _color_name=['yellowgreen'],
_enable_csv=True,
_graph_image_name="{}_be_{}".format(key, load), _color_edge=['black'],
_color=['yellowgreen'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
report.set_obj_html(
_obj_title=f"Individual download throughput with intended load {load}/station for traffic VI(WiFi).",
_obj=f"The below graph represents individual throughput for {len(self.sta_list)} clients running VI "
f"(WiFi) traffic. X- axis shows “number of clients” and Y-axis shows "
f"“Throughput in Mbps”.")
report.build_objective()
graph = lf_bar_graph(_data_set=[res[key][load]['video']], _xaxis_name="Clients running - VI",
_yaxis_name="Throughput in Mbps",
_xaxis_categories=[i + 1 for i in range(len(self.sta_list))],
_xaxis_label=[i + 1 for i in range(len(self.sta_list))],
_label=["Video"],
_xaxis_step=1,
_xticks_font=8,
_graph_title="Individual download throughput for VI(WIFI) traffic - {} clients".format(
key),
_title_size=16,
_bar_width=0.15, _color_name=['steelblue'],
_enable_csv=True,
_graph_image_name="{}_video_{}".format(key, load),
_color_edge=['black'],
_color=['steelblue'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
report.set_obj_html(
_obj_title=f"Individual download throughput with intended load {load}/station for traffic VO(WiFi).",
_obj=f"The below graph represents individual throughput for {len(self.sta_list)} clients running VO "
f"(WiFi) traffic. X- axis shows “number of clients” and Y-axis shows "
f"“Throughput in Mbps”.")
report.build_objective()
graph = lf_bar_graph(_data_set=[res[key][load]['voice']], _xaxis_name="Clients running - VO",
_yaxis_name="Throughput in Mbps",
_xaxis_categories=[i + 1 for i in range(len(self.sta_list))],
_xaxis_label=[i + 1 for i in range(len(self.sta_list))],
_label=['Voice'],
_xaxis_step=1,
_xticks_font=8,
_graph_title="Individual download throughput for VO(WIFI) traffic - {} clients".format(
key),
_title_size=16,
_bar_width=0.15, _color_name=['blueviolet'],
_enable_csv=True,
_graph_image_name="{}_voice_{}".format(key, load),
_color_edge=['black'],
_color=['blueviolet'])
graph_png = graph.build_bar_graph()
print("graph name {}".format(graph_png))
report.set_graph_image(graph_png)
# need to move the graph image to the results
report.move_graph_image()
report.set_csv_filename(graph_png)
report.move_csv_file()
report.build_graph()
else:
print("No individual graph to generate.")
def main():
parser = Realm.create_basic_argparse(
prog='throughput_QOS.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Create stations and endpoints and runs L3 traffic with various IP type of service(BK | BE | Video | Voice)
''',
description='''\
throughput_QOS.py:
--------------------
Generic command layout:
python3 ./throughput_QOS.py
--upstream_port eth1
--radio_2g wiphy0
--num_stations 32
--security {open|wep|wpa|wpa2|wpa3}
--mode 1
{"auto" : "0",
"a" : "1",
"b" : "2",
"g" : "3",
"abg" : "4",
"abgn" : "5",
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
"bgnAX" : "13"}
--ssid netgear
--password admin123
--a_min 3000
--b_min 1000
--ap "00:0e:8e:78:e1:76"
--debug
--ap_name Netgear RC6700
--upstream_port eth1 (upstream Port)
--traffic_type lf_udp (traffic type, lf_udp | lf_tcp)
--test_duration 5m (duration to run traffic 5m --> 5 Minutes)
--create_sta False (False, means it will not create stations and use the sta_names specified below)
--sta_names sta000,sta001,sta002 (used if --create_sta is False, comma separated names of stations)
''')
parser.add_argument('--mode', help='Used to force mode of stations', default="0")
parser.add_argument('--ap', help='Used to force a connection to a particular AP')
parser.add_argument('--traffic_type', help='Select the Traffic Type [lf_udp, lf_tcp]', required=True)
parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000)
parser.add_argument('--b_min', help='--b_min bps rate minimum for side_b', default=256000)
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="2m")
parser.add_argument('--create_sta', help='Used to force a connection to a particular AP', default=True)
parser.add_argument('--sta_names', help='Used to force a connection to a particular AP', default="sta0000")
parser.add_argument('--tos', help='used to provide different ToS settings: BK | BE | VI | VO | numeric',
default="Best Effort")
parser.add_argument('--ap_name', help="AP Model Name", default="Test-AP")
parser.add_argument('--bands', help='used to run on multiple radio bands,can be used with multiple stations',
default="2.4G, 5G, BOTH", required=True)
parser.add_argument('--ssid_2g', help="ssid for 2.4Ghz band")
parser.add_argument('--security_2g', help="security type for 2.4Ghz band")
parser.add_argument('--passwd_2g', help="password for 2.4Ghz band")
parser.add_argument('--ssid_5g', help="ssid for 5Ghz band")
parser.add_argument('--security_5g', help="security type for 5Ghz band")
parser.add_argument('--passwd_5g', help="password for 5Ghz band")
parser.add_argument('--radio_2g', help="radio which supports 2.4G bandwidth", default="wiphy0")
parser.add_argument('--radio_5g', help="radio which supports 5G bandwidth", default="wiphy1")
args = parser.parse_args()
print("--------------------------------------------")
print(args)
print("--------------------------------------------")
args.test_case = args.bands.split(',')
test_results = []
loads = {}
bands = []
station_list = []
if (args.a_min is not None) and (args.b_min is not None):
if (type(args.a_min) is not int) and (type(args.b_min) is not int):
args.a_min = args.a_min.split(',')
args.b_min = args.b_min.split(',')
loads = {"a_min": args.a_min, "b_min": args.b_min}
else:
args.a_min = str(args.a_min).split(",")
args.b_min = str(args.b_min).split(",")
loads = {"a_min": args.a_min, "b_min": args.b_min}
if args.bands is not None:
bands = args.bands.split(',')
if args.test_duration is not None:
args.test_duration = args.test_duration.strip('m')
test_start_time = datetime.datetime.now().strftime("%b %d %H:%M:%S")
print("Test started at: ", test_start_time)
for i in range(len(bands)):
if bands[i] == "2.4G" or bands[i] == "2.4g":
args.bands = bands[i]
args.mode = 11
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=int(args.num_stations) - 1,
padding_number_=10000, radio=args.radio_2g)
else:
station_list = args.sta_names.split(",")
elif bands[i] == "5G" or bands[i] == "5g":
args.bands = bands[i]
args.mode = 9
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=int(args.num_stations) - 1,
padding_number_=10000,
radio=args.radio_5g)
else:
station_list = args.sta_names.split(",")
elif bands[i] == "BOTH" or bands[i] == "both":
args.bands = bands[i]
args.mode = 0
if (int(args.num_stations) % 2) != 0:
print("Number of stations for Both Band should be even in number.")
exit(1)
mid = int(args.num_stations) // 2
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=mid - 1,
padding_number_=10000,
radio=args.radio_2g)
station_list.extend(LFUtils.portNameSeries(prefix_="sta", start_id_=mid,
end_id_=int(args.num_stations) - 1,
padding_number_=10000,
radio=args.radio_5g))
else:
station_list = args.sta_names.split(",")
else:
print("Band " + bands[i] + " Not Exist")
exit(1)
print("------------------------")
print(args.bands)
print("------------------------")
# ---------------------------------------#
for index in range(len(loads["b_min"])):
throughput_qos = ThroughputQOS(host=args.mgr,
port=args.mgr_port,
number_template="0000",
ap_name=args.ap_name,
sta_list=station_list,
create_sta=args.create_sta,
name_prefix="TOS-",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
security=args.security,
ssid_2g=args.ssid_2g,
password_2g=args.passwd_2g,
security_2g=args.security_2g,
ssid_5g=args.ssid_5g,
password_5g=args.passwd_5g,
security_5g=args.security_5g,
radio_2g=args.radio_2g,
radio_5g=args.radio_5g,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=int(loads['a_min'][index]),
side_b_min_rate=int(loads['b_min'][index]),
mode=args.mode,
bands=args.bands,
ap=args.ap,
traffic_type=args.traffic_type,
tos=args.tos,
test_case=args.test_case,
_debug_on=args.debug)
throughput_qos.pre_cleanup()
throughput_qos.build()
if args.create_sta:
if not throughput_qos.passes():
print(throughput_qos.get_fail_message())
throughput_qos.exit_fail()
try:
layer3connections = ','.join([[*x.keys()][0] for x in throughput_qos.json_get('endp')['endpoint']])
except:
raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
throughput_qos.start(False, False)
time.sleep(int(args.test_duration) * 60)
throughput_qos.stop()
test_results.append(throughput_qos.evaluate_qos())
if args.create_sta:
if not throughput_qos.passes():
print(throughput_qos.get_fail_message())
throughput_qos.exit_fail()
LFUtils.wait_until_ports_admin_up(port_list=station_list)
if throughput_qos.passes():
throughput_qos.success()
throughput_qos.cleanup()
test_end_time = datetime.datetime.now().strftime("%b %d %H:%M:%S")
print("Test ended at: ", test_end_time)
test_setup_info = {
"AP Model": throughput_qos.ap_name,
"SSID": throughput_qos.ssid,
"SSID - 2.4 Ghz": throughput_qos.ssid_2g,
"SSID - 5 Ghz": throughput_qos.ssid_5g,
"Test Duration": datetime.datetime.strptime(test_end_time, "%b %d %H:%M:%S") - datetime.datetime.strptime(
test_start_time, "%b %d %H:%M:%S")
}
if throughput_qos.ssid is None:
test_setup_info.pop("SSID")
else:
if throughput_qos.ssid_2g is None:
test_setup_info.pop("SSID - 2.4 Ghz")
if throughput_qos.ssid_5g is None:
test_setup_info.pop("SSID - 5 Ghz")
input_setup_info = {
"contact": "support@candelatech.com"
}
data = test_results
throughput_qos.generate_report(data=data, test_setup_info=test_setup_info, input_setup_info=input_setup_info)
if __name__ == "__main__":
main()