addition of test output logic for urls_per_ten , arg parsing changes to var_time

This commit is contained in:
Dipti
2021-01-22 01:33:24 -08:00
parent 49a1e75b0a
commit 5c18c4edfd
3 changed files with 41 additions and 45 deletions

View File

@@ -34,6 +34,7 @@ class IPV4L4(LFCliBase):
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.radio = radio
@@ -52,6 +53,8 @@ class IPV4L4(LFCliBase):
self.target_requests_per_ten = int(target_requests_per_ten)
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.l4cxprofile=realm.L4CXProfile(lfclient_host=host,
lfclient_port=port,local_realm=self.local_realm)
self.station_profile = self.local_realm.new_station_profile()
self.cx_profile = self.local_realm.new_l4_cx_profile()
@@ -67,20 +70,6 @@ class IPV4L4(LFCliBase):
self.cx_profile.url = self.url
self.cx_profile.requests_per_ten = self.requests_per_ten
# def __check_request_rate(self):
# endp_list = self.json_get("layer4/list?fields=urls/s")
# expected_passes = 0
# passes = 0
# if endp_list is not None and endp_list['endpoint'] is not None:
# endp_list = endp_list['endpoint']
# for item in endp_list:
# for name, info in item.items():
# if name in self.cx_profile.created_cx.keys():
# expected_passes += 1
# if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9:
# print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9)
# passes += 1
# return passes == expected_passes
def build(self):
# Build stations
@@ -105,32 +94,8 @@ class IPV4L4(LFCliBase):
self._fail("Stations failed to get IPs", print_fail)
exit(1)
self.cx_profile.start_cx()
print("Starting test")
curr_time = datetime.datetime.now()
end_time = self.local_realm.parse_time(self.test_duration) + curr_time
sleep_interval = self.local_realm.parse_time(self.test_duration) // 5
passes = 0
expected_passes = 0
for test in range(self.num_tests):
expected_passes += 1
while curr_time < end_time:
time.sleep(sleep_interval.total_seconds())
if self.debug:
print(".",end="")
curr_time = datetime.datetime.now()
if self.cx_profile.check_errors(self.debug):
if self.__check_request_rate():
passes += 1
else:
self._fail("FAIL: Request rate did not exceed 90% target rate", print_fail)
break
else:
self._fail("FAIL: Errors found getting to %s " % self.url, print_fail)
break
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
print("Starting test...")
def stop(self):
self.cx_profile.stop_cx()
self.station_profile.admin_down()
@@ -202,12 +167,31 @@ python3 ./test_ipv4_l4_urls_per_ten.py
optional.add_argument('--target_per_ten', help='--target_per_ten target number of request per ten minutes. test will check for 90 percent this value',default=600)
optional.add_argument('--mode',help='Used to force mode of stations')
optional.add_argument('--ap',help='Used to force a connection to a particular AP')
optional.add_argument('--report_file',help='where you want to store results')
optional.add_argument('--output_format', help='choose either csv or xlsx')
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
if args.report_file is None:
if args.output_format in ['csv','json','html','hdf','stata','pickle','pdf','parquet','xlsx']:
output_form=args.output_format
print("Defaulting file output placement to /home/lanforge.")
rpt_file='/home/lanforge/data.' + output_form
else:
print("Defaulting data file output type to Excel")
rpt_file='/home/lanforge/data.xlsx'
output_form='xlsx'
else:
rpt_file=args.report_file
if args.output_format is None:
output_form=str(args.report_file).split('.')[-1]
else:
output_form=args.output_format
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
@@ -230,6 +214,16 @@ python3 ./test_ipv4_l4_urls_per_ten.py
ip_test.cleanup(station_list)
ip_test.build()
ip_test.start()
try:
layer4traffic=','.join([[*x.keys()][0] for x in ip_test.local_realm.json_get('layer4')['endpoint']])
except:
pass
ip_test.l4cxprofile.monitor(report_file=rpt_file, duration_sec=ip_test.local_realm.parse_time(args.test_duration).total_seconds(),
created_cx=layer4traffic,
output_format=output_form,
script_name='test_ipv4_l4_urls_per_ten',
arguments=args)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())