mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
Moved files being replaced by test_l4.py, Commit 2 of 2
This commit is contained in:
248
py-scripts/scripts_deprecated/test_ipv4_l4.py
Executable file
248
py-scripts/scripts_deprecated/test_ipv4_l4.py
Executable file
@@ -0,0 +1,248 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
NAME: test_ipv4_l4.py
|
||||
|
||||
PURPOSE:
|
||||
test_ipv4_l4.py will create stations and endpoints to generate and verify layer-4 traffic
|
||||
|
||||
This script will monitor the bytes-rd attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
|
||||
|
||||
EXAMPLE:
|
||||
./test_ipv4_l4.py --upstream_port eth1 (optional) --radio wiphy0 (required) --num_stations 3 (optional)
|
||||
--security {open|wep|wpa|wpa2|wpa3} (required) --ssid netgear (required)
|
||||
--url "dl http://10.40.0.1 /dev/null" (required) --password admin123 (required)
|
||||
--test_duration 2m (optional) --debug (optional)
|
||||
|
||||
Use './test_ipv4_l4.py --help' to see command line usage and options
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../../py-json')
|
||||
|
||||
import argparse
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
from LANforge import LFUtils
|
||||
import argparse
|
||||
import realm
|
||||
import time
|
||||
import datetime
|
||||
|
||||
|
||||
class IPV4L4(LFCliBase):
|
||||
def __init__(self, host, port, ssid, security, password, url,
|
||||
station_list,
|
||||
number_template="00000", radio="wiphy0",
|
||||
test_duration="5m", upstream_port="eth1",
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.radio = radio
|
||||
self.upstream_port = upstream_port
|
||||
self.ssid = ssid
|
||||
self.security = security
|
||||
self.password = password
|
||||
self.url = url
|
||||
self.number_template = number_template
|
||||
self.sta_list = station_list
|
||||
self.test_duration = test_duration
|
||||
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_l4_cx_profile()
|
||||
|
||||
self.station_profile.lfclient_url = self.lfclient_url
|
||||
self.station_profile.ssid = self.ssid
|
||||
self.station_profile.ssid_pass = self.password,
|
||||
self.station_profile.security = self.security
|
||||
self.station_profile.number_template_ = self.number_template
|
||||
self.station_profile.mode = 0
|
||||
|
||||
self.cx_profile.url = self.url
|
||||
|
||||
def __compare_vals(self, old_list, new_list):
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
if len(old_list) == len(new_list):
|
||||
for item, value in old_list.items():
|
||||
expected_passes += 1
|
||||
if new_list[item] > old_list[item]:
|
||||
passes += 1
|
||||
# print(item, old_list[item], new_list[item], passes, expected_passes)
|
||||
|
||||
if passes == expected_passes:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def __get_values(self):
|
||||
time.sleep(1)
|
||||
cx_list = self.json_get("layer4/list?fields=name,bytes-rd", debug_=self.debug)
|
||||
# print("==============\n", cx_list, "\n==============")
|
||||
cx_map = {}
|
||||
for cx_name in cx_list['endpoint']:
|
||||
if cx_name != 'uri' and cx_name != 'handler':
|
||||
for item, value in cx_name.items():
|
||||
for value_name, value_rx in value.items():
|
||||
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-rd':
|
||||
cx_map[item] = value_rx
|
||||
return cx_map
|
||||
|
||||
def build(self):
|
||||
# Build stations
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.set_number_template(self.number_template)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
|
||||
suppress_related_commands_=None)
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
temp_stas = self.station_profile.station_names.copy()
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(temp_stas):
|
||||
self._pass("All stations got IPs", print_pass)
|
||||
else:
|
||||
self._fail("Stations failed to get IPs", print_fail)
|
||||
exit(1)
|
||||
cur_time = datetime.datetime.now()
|
||||
old_rx_values = self.__get_values()
|
||||
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
|
||||
self.cx_profile.start_cx()
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
print("Starting Test...")
|
||||
while cur_time < end_time:
|
||||
interval_time = cur_time + datetime.timedelta(minutes=1)
|
||||
while cur_time < interval_time:
|
||||
cur_time = datetime.datetime.now()
|
||||
time.sleep(1)
|
||||
|
||||
new_rx_values = self.__get_values()
|
||||
# print(old_rx_values, new_rx_values)
|
||||
# print("\n-----------------------------------")
|
||||
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
|
||||
# print("-----------------------------------\n")
|
||||
expected_passes += 1
|
||||
if self.__compare_vals(old_rx_values, new_rx_values):
|
||||
passes += 1
|
||||
else:
|
||||
self._fail("FAIL: Not all stations increased traffic", print_fail)
|
||||
break
|
||||
old_rx_values = new_rx_values
|
||||
cur_time = datetime.datetime.now()
|
||||
if passes == expected_passes:
|
||||
self._pass("PASS: All tests passes", print_pass)
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
self.station_profile.admin_down()
|
||||
|
||||
def cleanup(self, sta_list):
|
||||
self.cx_profile.cleanup()
|
||||
self.station_profile.cleanup(sta_list)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
|
||||
debug=self.debug)
|
||||
|
||||
|
||||
def main():
|
||||
lfjson_host = "localhost"
|
||||
lfjson_port = 8080
|
||||
|
||||
parser = LFCliBase.create_basic_argparse(
|
||||
prog='test_generic.py',
|
||||
# formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Create layer-4 endpoints and test that the bytes-rd from the chosen URL are increasing over the
|
||||
duration of the test
|
||||
''',
|
||||
|
||||
description='''\
|
||||
test_ipv4_l4.py
|
||||
--------------------
|
||||
Generic command layout:
|
||||
python ./test_ipv4_l4.py --upstream_port <port> --radio <radio 0> <stations> <ssid> <ssid password> <security type: wpa2, open, wpa3> --debug
|
||||
|
||||
Command Line Example:
|
||||
python3 ./test_ipv4_l4.py
|
||||
--upstream_port eth1 (optional)
|
||||
--radio wiphy0 (required)
|
||||
--num_stations 3 (optional)
|
||||
--security {open|wep|wpa|wpa2|wpa3} (required)
|
||||
--ssid netgear (required)
|
||||
--url "dl http://10.40.0.1 /dev/null" (required)
|
||||
--password admin123 (required)
|
||||
--test_duration 2m (optional)
|
||||
--debug (optional)
|
||||
|
||||
''')
|
||||
|
||||
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
|
||||
parser.add_argument('--url', help='--url specifies upload/download, address, and dest',
|
||||
default="dl http://10.40.0.1 /dev/null")
|
||||
|
||||
args = parser.parse_args()
|
||||
num_sta = 2
|
||||
if (args.num_stations is not None) and (int(args.num_stations) > 0):
|
||||
num_stations_converted = int(args.num_stations)
|
||||
num_sta = num_stations_converted
|
||||
|
||||
station_list = LFUtils.portNameSeries(prefix_="sta",
|
||||
start_id_=0,
|
||||
end_id_=num_sta - 1,
|
||||
padding_number_=10000,
|
||||
radio=args.radio)
|
||||
|
||||
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
|
||||
ssid=args.ssid,
|
||||
radio=args.radio,
|
||||
password=args.passwd,
|
||||
security=args.security,
|
||||
station_list=station_list,
|
||||
url=args.url,
|
||||
test_duration=args.test_duration,
|
||||
upstream_port=args.upstream_port,
|
||||
_debug_on=args.debug)
|
||||
|
||||
ip_test.cleanup(station_list)
|
||||
ip_test.build()
|
||||
print('Stations built')
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
ip_test.exit_fail()
|
||||
print('Starting Stations')
|
||||
ip_test.start(False, False)
|
||||
print('Stopping Stations')
|
||||
ip_test.stop()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
ip_test.exit_fail()
|
||||
time.sleep(30)
|
||||
ip_test.cleanup(station_list)
|
||||
if ip_test.passes():
|
||||
print("Full test passed, all endpoints had increased bytes-rd throughout test duration")
|
||||
ip_test.exit_success()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
240
py-scripts/scripts_deprecated/test_ipv4_l4_ftp_upload.py
Executable file
240
py-scripts/scripts_deprecated/test_ipv4_l4_ftp_upload.py
Executable file
@@ -0,0 +1,240 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
NAME: test_ipv4_l4_ftp_upload.py
|
||||
|
||||
PURPOSE:
|
||||
test_ipv4_l4_ftp_upload.py will create stations and endpoints to generate and verify layer-4 upload traffic over an ftp connection
|
||||
|
||||
This script will monitor the bytes-wr attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
|
||||
|
||||
EXAMPLE:
|
||||
./test_ipv4_l4_ftp_upload.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
|
||||
--ssid netgear --passwd admin123 --test_duration 2m --url "ul ftp://10.40.0.1 /dev/null" --requests_per_ten 600
|
||||
--debug
|
||||
|
||||
Use './test_ipv4_l4_ftp_upload.py --help' to see command line usage and options
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../../py-json')
|
||||
|
||||
import argparse
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
from LANforge import LFUtils
|
||||
import realm
|
||||
import time
|
||||
import datetime
|
||||
|
||||
|
||||
class IPV4L4(LFCliBase):
|
||||
def __init__(self, host, port, ssid, security, password, url, requests_per_ten, station_list, number_template="00000",
|
||||
upstream_port="eth1", radio="wiphy0",
|
||||
test_duration="5m",
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ssid = ssid
|
||||
self.radio = radio
|
||||
self.upstream_port = upstream_port
|
||||
self.security = security
|
||||
self.password = password
|
||||
self.url = url
|
||||
self.requests_per_ten = requests_per_ten
|
||||
self.number_template = number_template
|
||||
self.sta_list = station_list
|
||||
self.test_duration = test_duration
|
||||
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_l4_cx_profile()
|
||||
|
||||
self.station_profile.lfclient_url = self.lfclient_url
|
||||
print("##### station_profile.lfclient_url: {}".format(self.station_profile.lfclient_url))
|
||||
self.station_profile.ssid = self.ssid
|
||||
self.station_profile.ssid_pass = self.password,
|
||||
self.station_profile.security = self.security
|
||||
self.station_profile.number_template_ = self.number_template
|
||||
self.station_profile.mode = 0
|
||||
|
||||
self.cx_profile.url = self.url
|
||||
self.cx_profile.requests_per_ten = self.requests_per_ten
|
||||
|
||||
self.port_util = realm.PortUtils(self.local_realm)
|
||||
|
||||
def __compare_vals(self, old_list, new_list):
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
if len(old_list) == len(new_list):
|
||||
for item, value in old_list.items():
|
||||
expected_passes += 1
|
||||
if new_list[item] > old_list[item]:
|
||||
passes += 1
|
||||
# print(item, old_list[item], new_list[item], passes, expected_passes)
|
||||
|
||||
if passes == expected_passes:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def __get_values(self):
|
||||
time.sleep(1)
|
||||
cx_list = self.json_get("layer4/list?fields=name,bytes-rd", debug_=self.debug)
|
||||
# print("==============\n", cx_list, "\n==============")
|
||||
cx_map = {}
|
||||
for cx_name in cx_list['endpoint']:
|
||||
if cx_name != 'uri' and cx_name != 'handler':
|
||||
for item, value in cx_name.items():
|
||||
for value_name, value_rx in value.items():
|
||||
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-wr':
|
||||
cx_map[item] = value_rx
|
||||
return cx_map
|
||||
|
||||
def build(self):
|
||||
# Build stations
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.set_number_template(self.number_template)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=True)
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=True)
|
||||
temp_stas = self.sta_list.copy()
|
||||
self.station_profile.admin_up()
|
||||
# temp_stas.append(self.local_realm.name_to_eid(self.upstream_port)[2])
|
||||
if self.local_realm.wait_for_ip(temp_stas):
|
||||
self._pass("All stations got IPs", print_pass)
|
||||
else:
|
||||
self._fail("Stations failed to get IPs", print_fail)
|
||||
exit(1)
|
||||
cur_time = datetime.datetime.now()
|
||||
old_rx_values = self.__get_values()
|
||||
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
|
||||
self.cx_profile.start_cx()
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
print("Starting Test...")
|
||||
while cur_time < end_time:
|
||||
interval_time = cur_time + datetime.timedelta(minutes=1)
|
||||
while cur_time < interval_time:
|
||||
cur_time = datetime.datetime.now()
|
||||
time.sleep(1)
|
||||
|
||||
new_rx_values = self.__get_values()
|
||||
# print(old_rx_values, new_rx_values)
|
||||
# print("\n-----------------------------------")
|
||||
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
|
||||
# print("-----------------------------------\n")
|
||||
expected_passes += 1
|
||||
if self.__compare_vals(old_rx_values, new_rx_values):
|
||||
passes += 1
|
||||
else:
|
||||
self._fail("FAIL: Not all stations increased traffic", print_fail)
|
||||
break
|
||||
old_rx_values = new_rx_values
|
||||
cur_time = datetime.datetime.now()
|
||||
if passes == expected_passes:
|
||||
self._pass("PASS: All tests passes", print_pass)
|
||||
|
||||
def stop(self):
|
||||
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=False)
|
||||
self.cx_profile.stop_cx()
|
||||
self.station_profile.admin_down()
|
||||
|
||||
def cleanup(self, sta_list):
|
||||
self.cx_profile.cleanup()
|
||||
self.station_profile.cleanup(sta_list)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
|
||||
debug=self.debug)
|
||||
|
||||
|
||||
def main():
|
||||
lfjson_port = 8080
|
||||
|
||||
parser = LFCliBase.create_basic_argparse(
|
||||
prog='test_ipv4_l4_ftp',
|
||||
# formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Create layer-4 endpoints to an ftp server and test that the bytes-wr are increasing over the duration
|
||||
of the test
|
||||
''',
|
||||
|
||||
description='''\
|
||||
test_ipv4_l4_ftp_upload.py
|
||||
--------------------
|
||||
Generic command example:
|
||||
./test_ipv4_l4_ftp_upload.py --upstream_port eth1 \\
|
||||
--radio wiphy0 \\
|
||||
--num_stations 3 \\
|
||||
--security {open|wep|wpa|wpa2|wpa3} \\
|
||||
--ssid netgear \\
|
||||
--passwd admin123 \\
|
||||
--test_duration 2m \\
|
||||
--url "ul ftp://10.40.0.1 /dev/null" \\
|
||||
--requests_per_ten 600 \\
|
||||
--debug
|
||||
''')
|
||||
|
||||
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
|
||||
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
|
||||
parser.add_argument('--url', help='--url specifies upload/download, address, and dest',
|
||||
default="ul ftp://10.40.0.1 /dev/null")
|
||||
|
||||
args = parser.parse_args()
|
||||
num_sta = 2
|
||||
if (args.num_stations is not None) and (int(args.num_stations) > 0):
|
||||
num_stations_converted = int(args.num_stations)
|
||||
num_sta = num_stations_converted
|
||||
|
||||
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
|
||||
radio=args.radio)
|
||||
|
||||
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
|
||||
ssid=args.ssid,
|
||||
password=args.passwd,
|
||||
security=args.security,
|
||||
station_list=station_list,
|
||||
url=args.url,
|
||||
test_duration=args.test_duration,
|
||||
requests_per_ten=args.requests_per_ten,
|
||||
_debug_on=args.debug,
|
||||
upstream_port=args.upstream_port)
|
||||
ip_test.cleanup(station_list)
|
||||
ip_test.build()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
exit(1)
|
||||
ip_test.start(False, False)
|
||||
ip_test.stop()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
exit(1)
|
||||
time.sleep(30)
|
||||
ip_test.cleanup(station_list)
|
||||
if ip_test.passes():
|
||||
print("Full test passed, all endpoints had increased bytes-rd throughout test duration")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
251
py-scripts/scripts_deprecated/test_ipv4_l4_ftp_urls_per_ten.py
Executable file
251
py-scripts/scripts_deprecated/test_ipv4_l4_ftp_urls_per_ten.py
Executable file
@@ -0,0 +1,251 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
NAME: test_ipv4_l4_ftp_urls_per_ten.py
|
||||
|
||||
PURPOSE:
|
||||
test_ipv4_l4_ftp_urls_per_ten.py will create stations and endpoints to generate and verify layer-4 traffic over an ftp connection
|
||||
|
||||
This script will monitor the urls/s attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
|
||||
|
||||
EXAMPLE:
|
||||
./test_ipv4_l4_ftp_urls_per_ten.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
|
||||
--ssid netgear --passwd admin123 --test_duration 2m --interval 1s --mode 1 --ap "00:0e:8e:78:e1:76" --requests_per_ten 600
|
||||
--num_tests 1 --url "ul ftp://lanforge:lanforge@10.40.0.1/example.txt /home/lanforge/example.txt" --debug
|
||||
|
||||
Use './test_ipv4_l4_ftp_urls_per_ten.py --help' to see command line usage and options
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
|
||||
|
||||
import argparse
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
from LANforge import LFUtils
|
||||
import realm
|
||||
import time
|
||||
import datetime
|
||||
|
||||
|
||||
class IPV4L4(LFCliBase):
|
||||
def __init__(self, ssid, security, password, url, requests_per_ten, station_list,test_duration="2m",host="localhost", port=8080,
|
||||
number_template="00000", num_tests=1, radio="wiphy0", mode=0, ap=None,
|
||||
_debug_on=False, upstream_port="eth1",
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host,
|
||||
port,
|
||||
_debug=_debug_on,
|
||||
_local_realm = realm.Realm(lfclient_host=host, lfclient_port=port),
|
||||
_exit_on_fail=_exit_on_fail)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.radio = radio
|
||||
self.upstream_port = upstream_port
|
||||
self.ssid = ssid
|
||||
self.security = security
|
||||
self.password = password
|
||||
self.mode = mode
|
||||
self.ap=ap
|
||||
self.url = url
|
||||
self.requests_per_ten = requests_per_ten
|
||||
self.number_template = number_template
|
||||
self.sta_list = station_list
|
||||
self.num_tests = num_tests
|
||||
self.target_requests_per_ten = requests_per_ten
|
||||
self.test_duration = test_duration
|
||||
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_l4_cx_profile()
|
||||
|
||||
self.station_profile.lfclient_url = self.lfclient_url
|
||||
self.station_profile.ssid = self.ssid
|
||||
self.station_profile.ssid_pass = self.password,
|
||||
self.station_profile.security = self.security
|
||||
self.station_profile.number_template_ = self.number_template
|
||||
self.station_profile.mode = self.mode
|
||||
if self.ap is not None:
|
||||
self.station_profile.set_command_param("add_sta", "ap",self.ap)
|
||||
self.cx_profile.url = self.url
|
||||
print(self.cx_profile.url)
|
||||
self.cx_profile.requests_per_ten = self.requests_per_ten
|
||||
|
||||
self.port_util = realm.PortUtils(self.local_realm)
|
||||
|
||||
def __check_request_rate(self):
|
||||
endp_list = self.json_get("layer4/list?fields=urls/s")
|
||||
expected_passes = 0
|
||||
passes = 0
|
||||
if endp_list is not None and endp_list['endpoint'] is not None:
|
||||
endp_list = endp_list['endpoint']
|
||||
for item in endp_list:
|
||||
for name, info in item.items():
|
||||
if name in self.cx_profile.created_cx.keys():
|
||||
expected_passes += 1
|
||||
if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9:
|
||||
# print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9)
|
||||
passes += 1
|
||||
|
||||
return passes == expected_passes
|
||||
|
||||
def build(self):
|
||||
# Build stations
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=None)
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=True)
|
||||
temp_stas = self.sta_list.copy()
|
||||
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(temp_stas):
|
||||
self._pass("All stations got IPs", print_pass)
|
||||
else:
|
||||
self._fail("Stations failed to get IPs", print_fail)
|
||||
exit(1)
|
||||
self.cx_profile.start_cx()
|
||||
print("Starting test")
|
||||
curr_time = datetime.datetime.now()
|
||||
end_time = self.local_realm.parse_time(self.test_duration) + curr_time
|
||||
sleep_interval = self.local_realm.parse_time(self.test_duration) // 5
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
for test in range(self.num_tests):
|
||||
expected_passes += 1
|
||||
while curr_time < end_time:
|
||||
time.sleep(sleep_interval.total_seconds())
|
||||
curr_time = datetime.datetime.now()
|
||||
|
||||
if self.cx_profile.check_errors(self.debug):
|
||||
if self.__check_request_rate():
|
||||
passes += 1
|
||||
else:
|
||||
self._fail("FAIL: Request rate did not exceed 90% target rate", print_fail)
|
||||
break
|
||||
else:
|
||||
self._fail("FAIL: Errors found getting to %s " % self.url, print_fail)
|
||||
break
|
||||
if passes == expected_passes:
|
||||
self._pass("PASS: All tests passes", print_pass)
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
self.port_util.set_ftp(port_name=self.local_realm.name_to_eid(self.upstream_port)[2], resource=1, on=False)
|
||||
self.station_profile.admin_down()
|
||||
|
||||
def cleanup(self, sta_list):
|
||||
self.cx_profile.cleanup()
|
||||
self.station_profile.cleanup(sta_list)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
|
||||
debug=self.debug)
|
||||
|
||||
|
||||
def main():
|
||||
lfjson_port = 8080
|
||||
parser = LFCliBase.create_basic_argparse(
|
||||
prog='test_ipv4_l4_urls_per_ten',
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Create layer-4 endpoints to connect to an ftp server and test that urls/s are meeting or exceeding the target rate
|
||||
''',
|
||||
|
||||
description='''\
|
||||
test_ipv4_l4_ftp_urls_per_ten.py
|
||||
--------------------
|
||||
Generic command example:
|
||||
python3 ./test_ipv4_l4_ftp_urls_per_ten.py --upstream_port eth1 \\
|
||||
--radio wiphy0 \\
|
||||
--num_stations 3 \\
|
||||
--security {open|wep|wpa|wpa2|wpa3} \\
|
||||
--ssid netgear \\
|
||||
--passwd admin123 \\
|
||||
--test_duration 2m \\ {2m | 30s | 3h | 1d ...etc}
|
||||
--interval 1s \\
|
||||
--mode 1
|
||||
{"auto" : "0",
|
||||
"a" : "1",
|
||||
"b" : "2",
|
||||
"g" : "3",
|
||||
"abg" : "4",
|
||||
"abgn" : "5",
|
||||
"bgn" : "6",
|
||||
"bg" : "7",
|
||||
"abgnAC" : "8",
|
||||
"anAC" : "9",
|
||||
"an" : "10",
|
||||
"bgnAC" : "11",
|
||||
"abgnAX" : "12",
|
||||
"bgnAX" : "13",
|
||||
--ap "00:0e:8e:78:e1:76"
|
||||
--requests_per_ten 600 \\
|
||||
--num_tests 1 \\
|
||||
--url "ul ftp://lanforge:lanforge@10.40.0.1/example.txt /home/lanforge/example.txt"
|
||||
--debug
|
||||
''')
|
||||
optional = parser.add_argument_group('optional arguments')
|
||||
required = parser.add_argument_group('required arguments')
|
||||
required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True)
|
||||
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
|
||||
parser.add_argument('--test_duration', help='--test duration of a single test', default=600)
|
||||
parser.add_argument('--num_tests', help='--num_tests number of tests to run. Each test runs 10 minutes', default=1)
|
||||
parser.add_argument('--url', help='--url specifies upload/download, address, and dest',
|
||||
default="dl http://10.40.0.1 /dev/null")
|
||||
optional.add_argument('--mode',help='Used to force mode of stations')
|
||||
optional.add_argument('--ap',help='Used to force a connection to a particular AP')
|
||||
#parser.add_argument('--target_per_ten', help='--target_per_ten target number of request per ten minutes. test will check for 90 percent of this value',
|
||||
#default=600)
|
||||
args = parser.parse_args()
|
||||
|
||||
num_sta = 2
|
||||
if (args.num_stations is not None) and (int(args.num_stations) > 0):
|
||||
num_stations_converted = int(args.num_stations)
|
||||
num_sta = num_stations_converted
|
||||
|
||||
|
||||
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
|
||||
radio=args.radio)
|
||||
|
||||
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
|
||||
ssid=args.ssid,
|
||||
password=args.passwd,
|
||||
upstream_port=args.upstream_port,
|
||||
radio= args.radio,
|
||||
security=args.security,
|
||||
station_list=station_list,
|
||||
url=args.url,
|
||||
mode=args.mode,
|
||||
ap=args.ap,
|
||||
num_tests=args.num_tests,
|
||||
requests_per_ten=args.requests_per_ten,
|
||||
test_duration=args.test_duration,
|
||||
_debug_on=args.debug)
|
||||
ip_test.cleanup(station_list)
|
||||
ip_test.build()
|
||||
ip_test.start()
|
||||
ip_test.stop()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
exit(1)
|
||||
time.sleep(30)
|
||||
ip_test.cleanup(station_list)
|
||||
if ip_test.passes():
|
||||
print("Full test passed, all endpoints met or exceeded 90 percent of the target rate")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
254
py-scripts/scripts_deprecated/test_ipv4_l4_ftp_wifi.py
Executable file
254
py-scripts/scripts_deprecated/test_ipv4_l4_ftp_wifi.py
Executable file
@@ -0,0 +1,254 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
NAME: test_ipv4_l4_ftp_wifi.py
|
||||
|
||||
PURPOSE:
|
||||
test_ipv4_l4_ftp_wifi.py will create stations and endpoints to generate and verify layer-4 traffic over an ftp connection
|
||||
|
||||
This script will monitor the bytes-wr attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
|
||||
|
||||
EXAMPLE:
|
||||
./test_ipv4_l4_ftp_wifi.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
|
||||
--ssid netgear --passwd admin123 --dest 10.40.0.1 --test_duration 2m --interval 1s --requests_per_ten 600
|
||||
--dest /var/www/html/data_slug_4K.bin --source /tmp/data_slug_4K.bin --ftp_user lanforge --ftp_passwd lanforge
|
||||
--debug
|
||||
|
||||
Use './test_ipv4_l4_ftp_wifi.py --help' to see command line usage and options
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../../py-json')
|
||||
|
||||
import argparse
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
from LANforge import LFUtils
|
||||
import realm
|
||||
import time
|
||||
import datetime
|
||||
|
||||
|
||||
class IPV4L4(LFCliBase):
|
||||
def __init__(self, host, port, ssid, security, password, requests_per_ten, station_list, number_template="00000",
|
||||
upstream_port="eth1", radio="wiphy0", ftp_user="localhost", ftp_passwd="localhost",
|
||||
source="", dest="",
|
||||
test_duration="5m",
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ssid = ssid
|
||||
self.radio = radio
|
||||
self.upstream_port = upstream_port
|
||||
self.security = security
|
||||
self.password = password
|
||||
self.requests_per_ten = requests_per_ten
|
||||
self.number_template = number_template
|
||||
self.sta_list = station_list
|
||||
self.test_duration = test_duration
|
||||
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_http_profile()
|
||||
|
||||
self.station_profile.lfclient_url = self.lfclient_url
|
||||
self.ftp_user = ftp_user
|
||||
self.ftp_passwd = ftp_passwd
|
||||
self.source = source
|
||||
self.dest = dest
|
||||
self.station_profile.ssid = self.ssid
|
||||
self.station_profile.ssid_pass = self.password,
|
||||
self.station_profile.security = self.security
|
||||
self.station_profile.number_template_ = self.number_template
|
||||
self.station_profile.mode = 0
|
||||
|
||||
self.cx_profile.requests_per_ten = self.requests_per_ten
|
||||
|
||||
self.port_util = realm.PortUtils(self.local_realm)
|
||||
|
||||
def __compare_vals(self, old_list, new_list):
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
if len(old_list) == len(new_list):
|
||||
for item, value in old_list.items():
|
||||
expected_passes += 1
|
||||
if new_list[item] > old_list[item]:
|
||||
passes += 1
|
||||
# print(item, old_list[item], new_list[item], passes, expected_passes)
|
||||
|
||||
if passes == expected_passes:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def __get_values(self):
|
||||
time.sleep(1)
|
||||
cx_list = self.json_get("layer4/list?fields=name,bytes-wr", debug_=self.debug)
|
||||
# print("==============\n", cx_list, "\n==============")
|
||||
cx_map = {}
|
||||
for cx_name in cx_list['endpoint']:
|
||||
if cx_name != 'uri' and cx_name != 'handler':
|
||||
for item, value in cx_name.items():
|
||||
for value_name, value_rx in value.items():
|
||||
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-wr':
|
||||
cx_map[item] = value_rx
|
||||
return cx_map
|
||||
|
||||
def build(self):
|
||||
# Build stations
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.set_number_template(self.number_template)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(self.sta_list):
|
||||
self._pass("All stations got IPs")
|
||||
else:
|
||||
self._fail("Stations failed to get IPs")
|
||||
exit(1)
|
||||
self.cx_profile.direction = "ul"
|
||||
self.cx_profile.dest = self.dest
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
|
||||
suppress_related_commands_=True, ftp=True, user=self.ftp_user, passwd=self.ftp_passwd,
|
||||
source=self.source)
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
print("Starting Test...")
|
||||
self.cx_profile.start_cx()
|
||||
cur_time = datetime.datetime.now()
|
||||
old_rx_values = self.__get_values()
|
||||
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
while cur_time < end_time:
|
||||
interval_time = cur_time + datetime.timedelta(minutes=1)
|
||||
while cur_time < interval_time:
|
||||
cur_time = datetime.datetime.now()
|
||||
time.sleep(1)
|
||||
|
||||
new_rx_values = self.__get_values()
|
||||
# print(old_rx_values, new_rx_values)
|
||||
# print("\n-----------------------------------")
|
||||
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
|
||||
# print("-----------------------------------\n")
|
||||
expected_passes += 1
|
||||
if self.__compare_vals(old_rx_values, new_rx_values):
|
||||
passes += 1
|
||||
else:
|
||||
self._fail("FAIL: Not all stations increased traffic", print_fail)
|
||||
break
|
||||
old_rx_values = new_rx_values
|
||||
cur_time = datetime.datetime.now()
|
||||
if passes == expected_passes:
|
||||
self._pass("PASS: All tests passes", print_pass)
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
self.station_profile.admin_down()
|
||||
|
||||
def cleanup(self, sta_list):
|
||||
self.cx_profile.cleanup()
|
||||
self.station_profile.cleanup(sta_list)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
|
||||
debug=self.debug)
|
||||
|
||||
|
||||
def main():
|
||||
lfjson_port = 8080
|
||||
|
||||
parser = LFCliBase.create_basic_argparse(
|
||||
prog='test_ipv4_l4_ftp_wifi.py',
|
||||
# formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Create layer-4 endpoints and test that the bytes-wr from the chosen URL are increasing over the
|
||||
duration of the test
|
||||
''',
|
||||
|
||||
description='''\
|
||||
test_ipv4_l4_ftp_wifi.py
|
||||
--------------------
|
||||
Generic command example:
|
||||
python3 ./test_ipv4_l4_ftp_wifi.py --upstream_port eth1 \\
|
||||
--radio wiphy0 \\
|
||||
--num_stations 3 \\
|
||||
--security {open|wep|wpa|wpa2|wpa3} \\
|
||||
--ssid netgear \\
|
||||
--passwd admin123 \\
|
||||
--dest 10.40.0.1 \\
|
||||
--test_duration 2m \\
|
||||
--interval 1s \\
|
||||
--requests_per_ten 600 \\
|
||||
--dest /var/www/html/data_slug_4K.bin \\
|
||||
--source /tmp/data_slug_4K.bin \\
|
||||
--ftp_user lanforge \\
|
||||
--ftp_passwd lanforge \\
|
||||
--debug
|
||||
''')
|
||||
|
||||
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
|
||||
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
|
||||
parser.add_argument('--dest', help='--dest specifies the destination for the file', default="/var/www/html/data_slug_4K.bin")
|
||||
parser.add_argument('--source', help='--source specifies the source of the file',
|
||||
default="/tmp/data_slug_4K.bin")
|
||||
parser.add_argument('--ftp_user', help='--ftp_user sets the username to be used for ftp', default="lanforge")
|
||||
parser.add_argument('--ftp_passwd', help='--ftp_user sets the password to be used for ftp', default="lanforge")
|
||||
|
||||
args = parser.parse_args()
|
||||
num_sta = 2
|
||||
if (args.num_stations is not None) and (int(args.num_stations) > 0):
|
||||
num_stations_converted = int(args.num_stations)
|
||||
num_sta = num_stations_converted
|
||||
|
||||
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
|
||||
radio=args.radio)
|
||||
|
||||
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
|
||||
ssid=args.ssid,
|
||||
password=args.passwd,
|
||||
security=args.security,
|
||||
station_list=station_list,
|
||||
test_duration=args.test_duration,
|
||||
requests_per_ten=args.requests_per_ten,
|
||||
_debug_on=args.debug,
|
||||
upstream_port=args.upstream_port,
|
||||
ftp_user=args.ftp_user,
|
||||
ftp_passwd=args.ftp_passwd,
|
||||
dest=args.dest,
|
||||
source=args.source)
|
||||
ip_test.cleanup(station_list)
|
||||
ip_test.build()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
exit(1)
|
||||
ip_test.start(False, False)
|
||||
ip_test.stop()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
exit(1)
|
||||
time.sleep(30)
|
||||
ip_test.cleanup(station_list)
|
||||
if ip_test.passes():
|
||||
print("Full test passed, all endpoints had increased bytes-wr throughout test duration")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
299
py-scripts/scripts_deprecated/test_ipv4_l4_urls_per_ten.py
Executable file
299
py-scripts/scripts_deprecated/test_ipv4_l4_urls_per_ten.py
Executable file
@@ -0,0 +1,299 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
NAME: test_ipv4_l4_urls_per_ten.py
|
||||
|
||||
PURPOSE:
|
||||
test_ipv4_l4_urls_per_ten.py will create stations and endpoints to generate and verify layer-4 traffic
|
||||
|
||||
This script will monitor the urls/s attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
|
||||
|
||||
EXAMPLE:
|
||||
./test_ipv4_l4_urls_per_ten.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security {open|wep|wpa|wpa2|wpa3}
|
||||
--ssid netgear --passwd admin123 --requests_per_ten 600 --mode 1 --num_tests 1 --url "dl http://10.40.0.1 /dev/null"
|
||||
--ap "00:0e:8e:78:e1:76" --target_per_ten 600 --output_format csv --report_file ~/Documents/results.csv --test_duration 2m
|
||||
--debug
|
||||
|
||||
|
||||
Use './test_ipv4_l4_urls_per_ten.py --help' to see command line usage and options
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append(os.path.join(os.path.abspath('../..'), 'py-json'))
|
||||
|
||||
import argparse
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge import LFUtils
|
||||
import realm
|
||||
import time
|
||||
import datetime
|
||||
from realm import TestGroupProfile
|
||||
|
||||
|
||||
class IPV4L4(LFCliBase):
|
||||
def __init__(self,
|
||||
host="localhost",
|
||||
port=8080,
|
||||
ssid=None,
|
||||
security=None,
|
||||
password=None,
|
||||
url=None,
|
||||
requests_per_ten=None,
|
||||
station_list=None,
|
||||
test_duration="2m",
|
||||
ap=None,
|
||||
mode=0,
|
||||
target_requests_per_ten=60,
|
||||
number_template="00000",
|
||||
num_tests=1,
|
||||
radio="wiphy0",
|
||||
_debug_on=False,
|
||||
upstream_port="eth1",
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.radio = radio
|
||||
self.upstream_port = upstream_port
|
||||
self.ssid = ssid
|
||||
self.security = security
|
||||
self.password = password
|
||||
self.url = url
|
||||
self.mode=mode
|
||||
self.ap=ap
|
||||
self.debug=_debug_on
|
||||
self.requests_per_ten = int(requests_per_ten)
|
||||
self.number_template = number_template
|
||||
self.test_duration=test_duration
|
||||
self.sta_list = station_list
|
||||
self.num_tests = int(num_tests)
|
||||
self.target_requests_per_ten = int(target_requests_per_ten)
|
||||
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.l4cxprofile=realm.L4CXProfile(lfclient_host=host,
|
||||
lfclient_port=port,local_realm=self.local_realm)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_l4_cx_profile()
|
||||
|
||||
self.station_profile.lfclient_url = self.lfclient_url
|
||||
self.station_profile.ssid = self.ssid
|
||||
self.station_profile.ssid_pass = self.password,
|
||||
self.station_profile.security = self.security
|
||||
self.station_profile.number_template_ = self.number_template
|
||||
self.station_profile.mode = self.mode
|
||||
if self.ap is not None:
|
||||
self.station_profile.set_command_param("add_sta", "ap",self.ap)
|
||||
|
||||
self.cx_profile.url = self.url
|
||||
self.cx_profile.requests_per_ten = self.requests_per_ten
|
||||
|
||||
|
||||
def build(self):
|
||||
# Build stations
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug, suppress_related_commands_=None)
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
temp_stas = self.sta_list.copy()
|
||||
# temp_stas.append(self.local_realm.name_to_eid(self.upstream_port)[2])
|
||||
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(temp_stas):
|
||||
self._pass("All stations got IPs", print_pass)
|
||||
else:
|
||||
self._fail("Stations failed to get IPs", print_fail)
|
||||
exit(1)
|
||||
self.cx_profile.start_cx()
|
||||
print("Starting test...")
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
self.station_profile.admin_down()
|
||||
|
||||
def cleanup(self, sta_list):
|
||||
self.cx_profile.cleanup()
|
||||
self.station_profile.cleanup(sta_list)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
|
||||
debug=self.debug)
|
||||
|
||||
|
||||
def main():
|
||||
parser = LFCliBase.create_basic_argparse(
|
||||
prog='test_ipv4_l4_urls_per_ten',
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Create layer-4 endpoints to connect to a url and test that urls/s are meeting or exceeding the target rate
|
||||
''',
|
||||
description='''\
|
||||
test_ipv4_l4_urls_per_ten.py:
|
||||
--------------------
|
||||
Generic command example:
|
||||
python3 ./test_ipv4_l4_urls_per_ten.py
|
||||
--upstream_port eth1 \\
|
||||
--radio wiphy0 \\
|
||||
--num_stations 3 \\
|
||||
--security {open|wep|wpa|wpa2|wpa3} \\
|
||||
--ssid netgear \\
|
||||
--passwd admin123 \\
|
||||
--requests_per_ten 600 \\
|
||||
--mode 1
|
||||
{"auto" : "0",
|
||||
"a" : "1",
|
||||
"b" : "2",
|
||||
"g" : "3",
|
||||
"abg" : "4",
|
||||
"abgn" : "5",
|
||||
"bgn" : "6",
|
||||
"bg" : "7",
|
||||
"abgnAC" : "8",
|
||||
"anAC" : "9",
|
||||
"an" : "10",
|
||||
"bgnAC" : "11",
|
||||
"abgnAX" : "12",
|
||||
"bgnAX" : "13"} \\
|
||||
--num_tests 1 \\
|
||||
--url "dl http://10.40.0.1 /dev/null" \\
|
||||
--ap "00:0e:8e:78:e1:76"
|
||||
--target_per_ten 600 \\
|
||||
--output_format csv \\
|
||||
--report_file ~/Documents/results.csv \\
|
||||
--test_duration 2m \\
|
||||
--debug
|
||||
''')
|
||||
required = None
|
||||
for agroup in parser._action_groups:
|
||||
if agroup.title == "required arguments":
|
||||
required = agroup
|
||||
#if required is not None:
|
||||
|
||||
optional = None
|
||||
for agroup in parser._action_groups:
|
||||
if agroup.title == "optional arguments":
|
||||
optional = agroup
|
||||
|
||||
if optional is not None:
|
||||
optional.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
|
||||
optional.add_argument('--num_tests', help='--num_tests number of tests to run. Each test runs 10 minutes', default=1)
|
||||
optional.add_argument('--url', help='--url specifies upload/download, address, and dest',default="dl http://10.40.0.1 /dev/null")
|
||||
optional.add_argument('--test_duration', help='duration of test',default="2m")
|
||||
optional.add_argument('--target_per_ten', help='--target_per_ten target number of request per ten minutes. test will check for 90 percent this value',default=600)
|
||||
optional.add_argument('--mode',help='Used to force mode of stations')
|
||||
optional.add_argument('--ap',help='Used to force a connection to a particular AP')
|
||||
optional.add_argument('--report_file',help='where you want to store results')
|
||||
optional.add_argument('--output_format', help='choose csv or xlsx') #update once other forms are completed
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
num_sta = 2
|
||||
if (args.num_stations is not None) and (int(args.num_stations) > 0):
|
||||
num_stations_converted = int(args.num_stations)
|
||||
num_sta = num_stations_converted
|
||||
if args.report_file is None:
|
||||
if args.output_format in ['csv','json','html','hdf','stata','pickle','pdf','parquet','png','df','xlsx']:
|
||||
output_form=args.output_format.lower()
|
||||
print("Defaulting file output placement to /home/lanforge.")
|
||||
rpt_file='/home/data.' + output_form
|
||||
else:
|
||||
print("Defaulting data file output type to Excel")
|
||||
rpt_file='/home/lanforge/data.xlsx'
|
||||
output_form='xlsx'
|
||||
|
||||
else:
|
||||
rpt_file=args.report_file
|
||||
if args.output_format is None:
|
||||
output_form=str(args.report_file).split('.')[-1]
|
||||
else:
|
||||
output_form=args.output_format
|
||||
|
||||
|
||||
#Create directory
|
||||
if args.report_file is None:
|
||||
try:
|
||||
homedir = str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':','-')+'test_ipv4_l4_urls_per_ten'
|
||||
path = os.path.join('/home/lanforge/report-data/',homedir)
|
||||
os.mkdir(path)
|
||||
except:
|
||||
path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
print('Saving file to local directory')
|
||||
else:
|
||||
pass
|
||||
|
||||
if args.report_file is None:
|
||||
if args.output_format in ['csv','json','html','hdf','stata','pickle','pdf','png','df','parquet','xlsx']:
|
||||
rpt_file=path+'/data.' + args.output_format
|
||||
output=args.output_format
|
||||
else:
|
||||
print('Defaulting data file output type to Excel')
|
||||
rpt_file=path+'/data.xlsx'
|
||||
output='xlsx'
|
||||
else:
|
||||
rpt_file=args.report_file
|
||||
if args.output_format is None:
|
||||
output=str(args.report_file).split('.')[-1]
|
||||
else:
|
||||
output=args.output_format
|
||||
|
||||
|
||||
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
|
||||
radio=args.radio)
|
||||
|
||||
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
|
||||
ssid=args.ssid,
|
||||
password=args.passwd,
|
||||
radio=args.radio,
|
||||
upstream_port=args.upstream_port,
|
||||
security=args.security,
|
||||
station_list=station_list,
|
||||
url=args.url,
|
||||
mode=args.mode,
|
||||
ap=args.ap,
|
||||
_debug_on=args.debug,
|
||||
test_duration=args.test_duration,
|
||||
num_tests=args.num_tests,
|
||||
target_requests_per_ten=args.target_per_ten,
|
||||
requests_per_ten=args.requests_per_ten)
|
||||
ip_test.cleanup(station_list)
|
||||
ip_test.build()
|
||||
ip_test.start()
|
||||
|
||||
try:
|
||||
layer4traffic=','.join([[*x.keys()][0] for x in ip_test.local_realm.json_get('layer4')['endpoint']])
|
||||
except:
|
||||
pass
|
||||
ip_test.l4cxprofile.monitor(col_names=['bytes-rd', 'urls/s'],
|
||||
report_file=rpt_file,
|
||||
duration_sec=ip_test.local_realm.parse_time(args.test_duration).total_seconds(),
|
||||
created_cx=layer4traffic,
|
||||
output_format=output_form,
|
||||
script_name='test_ipv4_l4_urls_per_ten',
|
||||
arguments=args,
|
||||
debug=args.debug)
|
||||
ip_test.stop()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
exit(1)
|
||||
time.sleep(30)
|
||||
ip_test.cleanup(station_list)
|
||||
if ip_test.passes():
|
||||
print("Full test passed, all endpoints met or exceeded 90 percent of the target rate")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
246
py-scripts/scripts_deprecated/test_ipv4_l4_wifi.py
Executable file
246
py-scripts/scripts_deprecated/test_ipv4_l4_wifi.py
Executable file
@@ -0,0 +1,246 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
NAME: test_ipv4_l4_wifi.py
|
||||
|
||||
PURPOSE:
|
||||
test_ipv4_l4_wifi.py will create stations and endpoints to generate and verify layer-4 upload traffic
|
||||
|
||||
This script will monitor the bytes-rd attribute of the endpoints. If the the monitored value does not continually increase, this test will not pass.
|
||||
|
||||
EXAMPLE:
|
||||
./test_ipv4_l4_wifi.py --upstream_port eth1 --radio wiphy0 --num_stations 3 --security open --ssid netgear
|
||||
--passwd admin123 --test_duration 2m --requests_per_ten 600 --direction {ul | dl}
|
||||
--dest /dev/null (or 10.40.0.1) --debug
|
||||
|
||||
Use './test_ipv4_l4_wifi.py --help' to see command line usage and options
|
||||
Copyright 2021 Candela Technologies Inc
|
||||
License: Free to distribute and modify. LANforge systems must be licensed.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit(1)
|
||||
|
||||
if 'py-json' not in sys.path:
|
||||
sys.path.append('../../py-json')
|
||||
|
||||
import argparse
|
||||
from LANforge.lfcli_base import LFCliBase
|
||||
from LANforge.LFUtils import *
|
||||
from LANforge import LFUtils
|
||||
import argparse
|
||||
import realm
|
||||
import time
|
||||
import datetime
|
||||
|
||||
|
||||
class IPV4L4(LFCliBase):
|
||||
def __init__(self, host, port, ssid, security, password, requests_per_ten, station_list,
|
||||
number_template="00000", radio="wiphy0", direction="dl", dest="/dev/null",
|
||||
test_duration="5m", upstream_port="eth1",
|
||||
_debug_on=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False):
|
||||
super().__init__(host, port, _debug=_debug_on, _exit_on_fail=_exit_on_fail)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.radio = radio
|
||||
self.upstream_port = upstream_port
|
||||
self.ssid = ssid
|
||||
self.direction = direction
|
||||
self.dest = dest
|
||||
self.security = security
|
||||
self.password = password
|
||||
self.requests_per_ten = requests_per_ten
|
||||
self.number_template = number_template
|
||||
self.sta_list = station_list
|
||||
self.test_duration = test_duration
|
||||
|
||||
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
|
||||
self.station_profile = self.local_realm.new_station_profile()
|
||||
self.cx_profile = self.local_realm.new_http_profile()
|
||||
|
||||
self.station_profile.lfclient_url = self.lfclient_url
|
||||
self.station_profile.ssid = self.ssid
|
||||
self.station_profile.ssid_pass = self.password,
|
||||
self.station_profile.security = self.security
|
||||
self.station_profile.number_template_ = self.number_template
|
||||
self.station_profile.mode = 0
|
||||
|
||||
self.cx_profile.requests_per_ten = self.requests_per_ten
|
||||
|
||||
def __compare_vals(self, old_list, new_list):
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
if len(old_list) == len(new_list):
|
||||
for item, value in old_list.items():
|
||||
expected_passes += 1
|
||||
if new_list[item] > old_list[item]:
|
||||
passes += 1
|
||||
|
||||
if passes == expected_passes:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def __get_values(self):
|
||||
time.sleep(1)
|
||||
cx_list = self.json_get("layer4/list?fields=name,bytes-rd", debug_=self.debug)
|
||||
if self.debug:
|
||||
print("==============\n", cx_list, "\n==============")
|
||||
cx_map = {}
|
||||
for cx_name in cx_list['endpoint']:
|
||||
if cx_name != 'uri' and cx_name != 'handler':
|
||||
for item, value in cx_name.items():
|
||||
for value_name, value_rx in value.items():
|
||||
if item in self.cx_profile.created_cx.keys() and value_name == 'bytes-rd':
|
||||
cx_map[item] = value_rx
|
||||
return cx_map
|
||||
|
||||
def build(self):
|
||||
# Build stations
|
||||
self.station_profile.use_security(self.security, self.ssid, self.password)
|
||||
self.station_profile.set_number_template(self.number_template)
|
||||
print("Creating stations")
|
||||
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
|
||||
self.station_profile.set_command_param("set_port", "report_timer", 1500)
|
||||
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
|
||||
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
|
||||
self._pass("PASS: Station build finished")
|
||||
|
||||
self.station_profile.admin_up()
|
||||
if self.local_realm.wait_for_ip(self.sta_list):
|
||||
self._pass("All stations got IPs")
|
||||
else:
|
||||
self._fail("Stations failed to get IPs")
|
||||
self.cx_profile.direction = self.direction
|
||||
self.cx_profile.dest = self.dest
|
||||
self.cx_profile.create(ports=self.station_profile.station_names, sleep_time=.5, debug_=self.debug,
|
||||
suppress_related_commands_=None, http=True)
|
||||
|
||||
def start(self, print_pass=False, print_fail=False):
|
||||
cur_time = datetime.datetime.now()
|
||||
old_rx_values = self.__get_values()
|
||||
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
|
||||
self.cx_profile.start_cx()
|
||||
passes = 0
|
||||
expected_passes = 0
|
||||
print("Starting Test...")
|
||||
while cur_time < end_time:
|
||||
interval_time = cur_time + datetime.timedelta(minutes=1)
|
||||
while cur_time < interval_time:
|
||||
cur_time = datetime.datetime.now()
|
||||
time.sleep(1)
|
||||
|
||||
new_rx_values = self.__get_values()
|
||||
if self.debug:
|
||||
print(old_rx_values, new_rx_values)
|
||||
print("\n-----------------------------------")
|
||||
print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
|
||||
print("-----------------------------------\n")
|
||||
expected_passes += 1
|
||||
if self.__compare_vals(old_rx_values, new_rx_values):
|
||||
passes += 1
|
||||
else:
|
||||
self._fail("FAIL: Not all stations increased traffic", print_fail)
|
||||
break
|
||||
old_rx_values = new_rx_values
|
||||
cur_time = datetime.datetime.now()
|
||||
if passes == expected_passes:
|
||||
self._pass("PASS: All tests passes", print_pass)
|
||||
|
||||
def stop(self):
|
||||
self.cx_profile.stop_cx()
|
||||
for sta_name in self.sta_list:
|
||||
data = LFUtils.port_down_request(1, self.local_realm.name_to_eid(sta_name)[2])
|
||||
url = "cli-json/set_port"
|
||||
self.json_post(url, data)
|
||||
|
||||
def cleanup(self, sta_list):
|
||||
self.cx_profile.cleanup()
|
||||
self.station_profile.cleanup(sta_list)
|
||||
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
|
||||
debug=self.debug)
|
||||
|
||||
|
||||
def main():
|
||||
lfjson_host = "localhost"
|
||||
lfjson_port = 8080
|
||||
|
||||
parser = LFCliBase.create_basic_argparse(
|
||||
prog='test_ipv4_l4_wifi.py',
|
||||
# formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog='''\
|
||||
Create layer-4 endpoints and test that the bytes-rd from the chosen URL are increasing over the
|
||||
duration of the test
|
||||
''',
|
||||
|
||||
description='''\
|
||||
test_ipv4_l4_wifi.py:
|
||||
--------------------
|
||||
Generic command example:
|
||||
python3 ./test_ipv4_l4_wifi.py --upstream_port eth1 \\
|
||||
--radio wiphy0 \\
|
||||
--num_stations 3 \\
|
||||
--security {open|wep|wpa|wpa2|wpa3} \\
|
||||
--ssid netgear \\
|
||||
--passwd admin123 \\
|
||||
--test_duration 2m \\
|
||||
--requests_per_ten 600 \\
|
||||
--direction {ul | dl} \\
|
||||
--dest /dev/null (or 10.40.0.1)\\
|
||||
--debug
|
||||
''')
|
||||
|
||||
parser.add_argument('--test_duration', help='--test_duration sets the duration of the test', default="5m")
|
||||
parser.add_argument('--requests_per_ten', help='--requests_per_ten number of request per ten minutes', default=600)
|
||||
parser.add_argument('--direction', help='--direction <ul | dl> specifies upload or download', default="dl")
|
||||
parser.add_argument('--dest', help='--dest specifies the destination (dl) or source (ul) for the file', default="/dev/null")
|
||||
|
||||
args = parser.parse_args()
|
||||
num_sta = 2
|
||||
if (args.num_stations is not None) and (int(args.num_stations) > 0):
|
||||
num_stations_converted = int(args.num_stations)
|
||||
num_sta = num_stations_converted
|
||||
|
||||
|
||||
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
|
||||
radio=args.radio)
|
||||
|
||||
ip_test = IPV4L4(host=args.mgr, port=args.mgr_port,
|
||||
ssid=args.ssid,
|
||||
password=args.passwd,
|
||||
radio=args.radio,
|
||||
security=args.security,
|
||||
station_list=station_list,
|
||||
direction=args.direction,
|
||||
dest=args.dest,
|
||||
test_duration=args.test_duration,
|
||||
upstream_port=args.upstream_port,
|
||||
requests_per_ten=args.requests_per_ten,
|
||||
_debug_on=args.debug)
|
||||
ip_test.cleanup(station_list)
|
||||
ip_test.build()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
ip_test.exit_fail()
|
||||
ip_test.start(False, False)
|
||||
ip_test.stop()
|
||||
if not ip_test.passes():
|
||||
print(ip_test.get_fail_message())
|
||||
ip_test.exit_fail()
|
||||
time.sleep(30)
|
||||
ip_test.cleanup(station_list)
|
||||
if ip_test.passes():
|
||||
("Full test passed, all endpoints had increased bytes-rd throughout test duration")
|
||||
ip_test.exit_success()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user