mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 11:18:03 +00:00
fetch results for all bands 2.4g,5g,both
This commit is contained in:
@@ -61,6 +61,8 @@ class ThroughputQOS(Realm):
|
||||
number_template="00000",
|
||||
test_duration="2m",
|
||||
bands="2.4G, 5G, BOTH",
|
||||
radios="",
|
||||
test_case={},
|
||||
use_ht160=False,
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
@@ -87,6 +89,8 @@ class ThroughputQOS(Realm):
|
||||
self.traffic_type = traffic_type
|
||||
self.tos = tos.split(",")
|
||||
self.bands = bands.split(",")
|
||||
self.radios = radios
|
||||
self.test_case = test_case
|
||||
self.number_template = number_template
|
||||
self.debug = _debug_on
|
||||
self.name_prefix = name_prefix
|
||||
@@ -145,31 +149,61 @@ class ThroughputQOS(Realm):
|
||||
def build(self):
|
||||
for key in self.bands:
|
||||
if self.create_sta:
|
||||
if key == "2.4G" or key == "2,4g":
|
||||
if key == "2.4G" or key == "2.4g":
|
||||
if self.ssid is None:
|
||||
self.station_profile.use_security(self.security_2g, self.ssid_2g, self.password_2g)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
elif key == "5G" or key == "5G":
|
||||
|
||||
if key == "5G" or key == "5g":
|
||||
if self.ssid is None:
|
||||
self.station_profile.use_security(self.security_5g, self.ssid_5g, self.password_5g)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
|
||||
self.station_profile.set_number_template(self.number_template)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
|
||||
if key == "BOTH" or key == "both":
|
||||
if (self.ssid is None) and (self.password is None) and (self.security is None):
|
||||
self.station_profile.use_security(self.security_2g, self.ssid_2g, self.password_2g)
|
||||
keys = list(self.test_case.keys())
|
||||
if "BOTH" in self.test_case:
|
||||
self.radios = self.test_case["BOTH"].split(",")
|
||||
elif "both" in self.test_case:
|
||||
self.radios = self.test_case["both"].split(",")
|
||||
split = len(self.sta_list) // 2
|
||||
self.station_profile.mode = 2
|
||||
self.station_profile.create(radio=self.radio[0], sta_names_=self.sta_list[:split], debug=self.debug)
|
||||
self.station_profile.mode = 9
|
||||
self.station_profile.create(radio=self.radio[1], sta_names_=self.sta_list[split:], debug=self.debug)
|
||||
if keys[0] == "2.4G" or keys[0] == "2.4g":
|
||||
if self.ssid is None:
|
||||
self.station_profile.use_security(self.security_2g, self.ssid_2g, self.password_2g)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.mode = 11
|
||||
self.station_profile.create(radio=self.radios[0], sta_names_=self.sta_list[:split],
|
||||
debug=self.debug)
|
||||
if self.ssid is None:
|
||||
self.station_profile.use_security(self.security_5g, self.ssid_5g, self.password_5g)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.mode = 9
|
||||
self.station_profile.create(radio=self.radios[1], sta_names_=self.sta_list[split:],
|
||||
debug=self.debug)
|
||||
elif keys[0] == "5G" or keys[0] == "5g":
|
||||
if self.ssid is None:
|
||||
self.station_profile.use_security(self.security_5g, self.ssid_5g, self.password_5g)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.mode = 9
|
||||
self.station_profile.create(radio=self.radios[0], sta_names_=self.sta_list[:split],
|
||||
debug=self.debug)
|
||||
if self.ssid is None:
|
||||
self.station_profile.use_security(self.security_2g, self.ssid_2g, self.password_2g)
|
||||
else:
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.mode = 11
|
||||
self.station_profile.create(radio=self.radios[1], sta_names_=self.sta_list[split:],
|
||||
debug=self.debug)
|
||||
else:
|
||||
self.station_profile.create(radio=self.radio[0], sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
@@ -282,7 +316,7 @@ python3 ./throughput_QOS.py
|
||||
parser.add_argument('--tos', help='used to provide different ToS settings: BK | BE | VI | VO | numeric',
|
||||
default="Best Effort")
|
||||
parser.add_argument('--bands', help='used to run on multiple radio bands,can be used with multiple stations',
|
||||
default="2,4G, 5G, BOTH")
|
||||
default="2,4G, 5G, BOTH", required=True)
|
||||
parser.add_argument('--ssid_2g', help="ssid for 2.4Ghz band")
|
||||
parser.add_argument('--security_2g', help="security type for 2.4Ghz band")
|
||||
parser.add_argument('--passwd_2g', help="password for 2.4Ghz band")
|
||||
@@ -295,6 +329,10 @@ python3 ./throughput_QOS.py
|
||||
print("--------------------------------------------")
|
||||
args.test_case = {}
|
||||
test_results = []
|
||||
loads = {}
|
||||
bands = []
|
||||
radios = []
|
||||
station_list = []
|
||||
if (args.a_min is not None) and (args.b_min is not None):
|
||||
args.a_min = args.a_min.split(',')
|
||||
args.b_min = args.b_min.split(',')
|
||||
@@ -357,67 +395,67 @@ python3 ./throughput_QOS.py
|
||||
radio=radios[1]))
|
||||
else:
|
||||
station_list = args.sta_names.split(",")
|
||||
print("-----------------")
|
||||
# print(bands[i])
|
||||
print(args.radio)
|
||||
# print(args.mode)
|
||||
# print(station_list)
|
||||
print(args.test_case)
|
||||
print("-----------------")
|
||||
exit(1)
|
||||
# for index in range(len(loads["a_min"])):
|
||||
# throughput_qos = ThroughputQOS(host=args.mgr,
|
||||
# port=args.mgr_port,
|
||||
# number_template="0000",
|
||||
# sta_list=station_list,
|
||||
# create_sta=args.create_sta,
|
||||
# name_prefix="TOS-",
|
||||
# upstream=args.upstream_port,
|
||||
# ssid=args.ssid,
|
||||
# password=args.passwd,
|
||||
# security=args.security,
|
||||
# ssid_2g=args.ssid_2g,
|
||||
# password_2g=args.passwd_2g,
|
||||
# security_2g=args.security_2g,
|
||||
# ssid_5g=args.ssid_5g,
|
||||
# password_5g=args.passwd_5g,
|
||||
# security_5g=args.security_5g,
|
||||
# radio=args.radio,
|
||||
# test_duration=args.test_duration,
|
||||
# use_ht160=False,
|
||||
# side_a_min_rate=loads['a_min'][index],
|
||||
# side_b_min_rate=loads['b_min'][index],
|
||||
# mode=args.mode,
|
||||
# bands=args.bands,
|
||||
# ap=args.ap,
|
||||
# traffic_type=args.traffic_type,
|
||||
# tos=args.tos,
|
||||
# _debug_on=args.debug)
|
||||
# throughput_qos.pre_cleanup()
|
||||
# throughput_qos.build()
|
||||
#
|
||||
# if args.create_sta:
|
||||
# if not throughput_qos.passes():
|
||||
# print(throughput_qos.get_fail_message())
|
||||
# throughput_qos.exit_fail()
|
||||
# # try:
|
||||
# # layer3connections = ','.join([[*x.keys()][0] for x in throughput_qos.json_get('endp')['endpoint']])
|
||||
# # except:
|
||||
# # raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
|
||||
#
|
||||
# throughput_qos.start(False, False)
|
||||
# time.sleep(int(args.test_duration) * 60)
|
||||
# throughput_qos.stop()
|
||||
# test_results.append(throughput_qos.evaluate_throughput())
|
||||
# if args.create_sta:
|
||||
# if not throughput_qos.passes():
|
||||
# print(throughput_qos.get_fail_message())
|
||||
# throughput_qos.exit_fail()
|
||||
# LFUtils.wait_until_ports_admin_up(port_list=station_list)
|
||||
# if throughput_qos.passes():
|
||||
# throughput_qos.success()
|
||||
# throughput_qos.cleanup()
|
||||
# ---------------------------------------#
|
||||
print("-----------------")
|
||||
# print(bands[i])
|
||||
print(args.radio)
|
||||
# print(args.mode)
|
||||
# print(station_list)
|
||||
print(args.test_case)
|
||||
print("-----------------")
|
||||
# ---------------------------------------#
|
||||
for index in range(len(loads["a_min"])):
|
||||
throughput_qos = ThroughputQOS(host=args.mgr,
|
||||
port=args.mgr_port,
|
||||
number_template="0000",
|
||||
sta_list=station_list,
|
||||
create_sta=args.create_sta,
|
||||
name_prefix="TOS-",
|
||||
upstream=args.upstream_port,
|
||||
ssid=args.ssid,
|
||||
password=args.passwd,
|
||||
security=args.security,
|
||||
ssid_2g=args.ssid_2g,
|
||||
password_2g=args.passwd_2g,
|
||||
security_2g=args.security_2g,
|
||||
ssid_5g=args.ssid_5g,
|
||||
password_5g=args.passwd_5g,
|
||||
security_5g=args.security_5g,
|
||||
radio=args.radio,
|
||||
test_duration=args.test_duration,
|
||||
use_ht160=False,
|
||||
side_a_min_rate=loads['a_min'][index],
|
||||
side_b_min_rate=loads['b_min'][index],
|
||||
mode=args.mode,
|
||||
bands=args.bands,
|
||||
ap=args.ap,
|
||||
traffic_type=args.traffic_type,
|
||||
tos=args.tos,
|
||||
test_case=args.test_case,
|
||||
_debug_on=args.debug)
|
||||
throughput_qos.pre_cleanup()
|
||||
throughput_qos.build()
|
||||
|
||||
if args.create_sta:
|
||||
if not throughput_qos.passes():
|
||||
print(throughput_qos.get_fail_message())
|
||||
throughput_qos.exit_fail()
|
||||
# try:
|
||||
# layer3connections = ','.join([[*x.keys()][0] for x in throughput_qos.json_get('endp')['endpoint']])
|
||||
# except:
|
||||
# raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
|
||||
|
||||
throughput_qos.start(False, False)
|
||||
time.sleep(int(args.test_duration) * 60)
|
||||
throughput_qos.stop()
|
||||
test_results.append(throughput_qos.evaluate_throughput())
|
||||
if args.create_sta:
|
||||
if not throughput_qos.passes():
|
||||
print(throughput_qos.get_fail_message())
|
||||
throughput_qos.exit_fail()
|
||||
LFUtils.wait_until_ports_admin_up(port_list=station_list)
|
||||
if throughput_qos.passes():
|
||||
throughput_qos.success()
|
||||
throughput_qos.cleanup()
|
||||
print('+++++++++++++++++')
|
||||
print(test_results)
|
||||
print('+++++++++++++++++')
|
||||
|
||||
Reference in New Issue
Block a user