multiple bands scenario added

This commit is contained in:
anil-tegala
2021-06-10 02:05:34 +05:30
parent 78109710ef
commit 7552324ec1

View File

@@ -44,7 +44,7 @@ class ThroughputQOS(Realm):
create_sta=True,
name_prefix=None,
upstream=None,
radio=None,
radio="wiphy0",
host="localhost",
port=8080,
mode=0,
@@ -54,7 +54,7 @@ class ThroughputQOS(Realm):
side_b_min_rate=56, side_b_max_rate=0,
number_template="00000",
test_duration="2m",
modes="0",
bands="2.4G, 5G, BOTH",
use_ht160=False,
_debug_on=False,
_exit_on_error=False,
@@ -69,12 +69,12 @@ class ThroughputQOS(Realm):
self.create_sta = create_sta
self.security = security
self.password = password
self.radio = radio
self.radio = radio.split(",")
self.mode = mode
self.ap = ap
self.traffic_type = traffic_type
self.tos = tos.split(",")
self.modes = modes.split(",")
self.bands = bands.split(",")
self.number_template = number_template
self.debug = _debug_on
self.name_prefix = name_prefix
@@ -104,10 +104,9 @@ class ThroughputQOS(Realm):
def start(self, print_pass=False, print_fail=False):
if self.create_sta:
self.station_profile.admin_up()
# to-do- check here if upstream port got IP
temp_stas = self.station_profile.station_names.copy()
if self.wait_for_ip(temp_stas):
# check here if upstream port got IP
temp_stations = self.station_profile.station_names.copy()
if self.wait_for_ip(temp_stations):
self._pass("All stations got IPs")
else:
self._fail("Stations failed to get IPs")
@@ -132,17 +131,23 @@ class ThroughputQOS(Realm):
debug=self.debug)
def build(self):
if self.create_sta:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.create_cx()
print("cx build finished")
for key in self.bands:
if self.create_sta:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
if key == "BOTH" or key == "both":
split = len(self.sta_list) // 2
self.station_profile.create(radio=self.radio[0], sta_names_=self.sta_list[:split], debug=self.debug)
self.station_profile.create(radio=self.radio[1], sta_names_=self.sta_list[split:], debug=self.debug)
else:
self.station_profile.create(radio=self.radio[0], sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.create_cx()
print("cx build finished")
def create_cx(self):
_tos = "BK,BE,VI,VO"
@@ -219,7 +224,7 @@ python3 ./throughput_QOS.py
"bgn" : "6",
"bg" : "7",
"abgnAC" : "8",
"anAC" : "9",
"anAC" : "9",
"an" : "10",
"bgnAC" : "11",
"abgnAX" : "12",
@@ -249,109 +254,114 @@ python3 ./throughput_QOS.py
parser.add_argument('--sta_names', help='Used to force a connection to a particular AP', default="sta0000")
parser.add_argument('--tos', help='used to provide different ToS settings: BK | BE | VI | VO | numeric',
default="Best Effort")
parser.add_argument('--modes', help='used to run on multiple radio modes,can be used with multiple stations',
default="0")
parser.add_argument('--bands', help='used to run on multiple radio bands,can be used with multiple stations',
default="2,4G, 5G, BOTH")
args = parser.parse_args()
print("--------------------------------------------")
print(args)
print("--------------------------------------------")
results = []
loads = {}
# for multiple test conditions #
if args.mode is not None:
modes = args.mode.split(',')
test_results = []
test_cases = {}
if args.bands is not None:
bands = args.bands.split(',')
if args.radio is not None:
radios = args.radio.split(',')
if args.a_min is not None or args.b_min is not None:
args.a_min = args.a_min.split(',')
args.b_min = args.b_min.split(',')
loads = {"a_min": args.a_min, "b_min": args.b_min}
# if radios is not None:
# try:
# if len(args.a_min) != len(args.b_min):
# raise print("The values of a_min and b_min should be same.")
# finally:
# print("")
if len(radios) < 2:
radios.append(radios[0])
if args.test_duration is not None:
args.test_duration = args.test_duration.strip('m')
for key in modes:
if key == "2.4G" or key == "2.4g":
for i in range(len(bands)):
if bands[i] == "2.4G" or bands[i] == "2.4g":
args.bands = bands[i]
args.mode = 9
if i == 0:
args.radio = radios[0]
if i == 1:
args.radio = radios[1]
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=int(args.num_stations) - 1,
padding_number_=10000, radio=args.radio)
else:
station_list = args.sta_names.split(",")
elif bands[i] == "5G" or bands[i] == "5g":
args.bands = bands[i]
args.mode = 11
if i == 0:
args.radio = radios[0]
if i == 1:
args.radio = radios[1]
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=int(args.num_stations) - 1,
padding_number_=10000,
radio=args.radio)
else:
station_list = args.sta_names.split(",")
elif bands[i] == "BOTH" or bands[i] == "both":
args.bands = bands[i]
args.radio = radios
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=(int(args.num_stations) // 2),
padding_number_=10000,
radio=radios[0])
else:
station_list = args.sta_names.split(",")
elif key == "5G" or key == "5g":
if radios[1] is None:
radios[1] = "wiphy0"
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=int(args.num_stations) - 1,
padding_number_=10000,
radio=radios[1])
else:
station_list = args.sta_names.split(",")
elif key == "BOTH" or key == "both":
if radios[2] is not None:
radios[2] = "wiphy0"
if args.create_sta:
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=int(args.num_stations) // 2,
padding_number_=10000,
radio=radios[2])
station_list.append(LFUtils.portNameSeries(prefix_="sta", start_id_=(int(args.num_stations) // 2) + 1,
end_id_=int(args.num_stations) - 1,
padding_number_=10000,
radio=radios[2]))
radio=radios[1]))
else:
station_list = args.sta_names.split(",")
for index in range(len(loads["a_min"])):
throughput_qos = ThroughputQOS(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
create_sta=args.create_sta,
name_prefix="TOS-",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=radios[key],
security=args.security,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=loads["a_min"][index],
side_b_min_rate=loads["b_min"][index],
mode=int(modes[key]),
ap=args.ap,
traffic_type=args.traffic_type,
tos=args.tos,
_debug_on=args.debug)
throughput_qos.pre_cleanup()
throughput_qos.build()
if args.create_sta:
if not throughput_qos.passes():
print(throughput_qos.get_fail_message())
throughput_qos.exit_fail()
# try:
# layer3connections = ','.join([[*x.keys()][0] for x in throughput_qos.json_get('endp')['endpoint']])
# except:
# raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
throughput_qos = ThroughputQOS(host=args.mgr,
port=args.mgr_port,
number_template="0000",
sta_list=station_list,
create_sta=args.create_sta,
name_prefix="TOS-",
upstream=args.upstream_port,
ssid=args.ssid,
password=args.passwd,
radio=args.radio,
security=args.security,
test_duration=args.test_duration,
use_ht160=False,
side_a_min_rate=args.a_min,
side_b_min_rate=args.b_min,
mode=args.mode,
bands=args.bands,
ap=args.ap,
traffic_type=args.traffic_type,
tos=args.tos,
_debug_on=args.debug)
throughput_qos.pre_cleanup()
throughput_qos.build()
throughput_qos.start(False, False)
time.sleep(int(args.test_duration) * 60)
throughput_qos.stop()
results.append(throughput_qos.evaluate_throughput())
if args.create_sta:
if not throughput_qos.passes():
print(throughput_qos.get_fail_message())
throughput_qos.exit_fail()
LFUtils.wait_until_ports_admin_up(port_list=station_list)
if throughput_qos.passes():
throughput_qos.success()
throughput_qos.cleanup()
if args.create_sta:
if not throughput_qos.passes():
print(throughput_qos.get_fail_message())
throughput_qos.exit_fail()
# try:
# layer3connections = ','.join([[*x.keys()][0] for x in throughput_qos.json_get('endp')['endpoint']])
# except:
# raise ValueError('Try setting the upstream port flag if your device does not have an eth1 port')
throughput_qos.start(False, False)
time.sleep(int(args.test_duration) * 60)
throughput_qos.stop()
test_results.append(throughput_qos.evaluate_throughput())
if args.create_sta:
if not throughput_qos.passes():
print(throughput_qos.get_fail_message())
throughput_qos.exit_fail()
LFUtils.wait_until_ports_admin_up(port_list=station_list)
if throughput_qos.passes():
throughput_qos.success()
throughput_qos.cleanup()
# ---------------------------------------#
print('+++++++++++++++++')
print(results)
print(test_results)
print('+++++++++++++++++')