Merge /home/greearb/btbits/x64_btbits/server/lf_scripts

This commit is contained in:
Ben Greear
2020-12-10 08:02:41 -08:00
29 changed files with 2210 additions and 664 deletions

View File

@@ -50,8 +50,14 @@ class IPv4Test(LFCliBase):
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio="wiphy0", sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(station_list=self.sta_list, debug=self.debug, timeout_sec=30):
self._pass("Station build finished")
self.passed()
else:
self._fail("Stations not able to acquire IP. Please check network input.")
self.failed()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)

View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
import sys
import os
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import LANforge
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import realm
import argparse
import time
import pprint
class IPv4Test(LFCliBase):
def __init__(self, host, port, ssid, security, password, sta_list=None, number_template="00000", radio = "wiphy0",_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
self.host = host
self.port = port
self.ssid = ssid
self.radio = radio
self.security = security
self.password = password
self.sta_list = sta_list
self.timeout = 120
self.number_template = number_template
self.debug = _debug_on
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
def build(self):
# Build stations
#print("We've gotten into the build stations function")
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(station_list=self.sta_list, debug=self.debug, timeout_sec=30):
self._pass("Station build finished")
self.exit_success()
else:
self._fail("Stations not able to acquire IP. Please check network input.")
self.exit_fail()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
debug=self.debug)
def main():
lfjson_host = "localhost"
lfjson_port = 8080
parser = LFCliBase.create_basic_argparse(
prog='example_security_connection.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Example flags and command line input to run the script.
''',
description='''\
example_security_connection.py
--------------------
This python script creates an inputted number of stations using user-inputted security. This verifies that the most basic form of security works with the LANforge device.
--------------------
Generic command example:
python3 ./example_security_connection.py \\
--host localhost (optional) \\
--port 8080 (optional) \\
--num_stations 6 \\
--radio wiphy2
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear-wpa3 \\
--passwd admin123-wpa3 \\
--debug
''')
args = parser.parse_args()
num_sta = 2
if (args.num_stations is not None) and (int(args.num_stations) > 0):
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
station_list = LFUtils.portNameSeries(prefix_="sta",
start_id_=0,
end_id_=num_sta-1,
padding_number_=10000,
radio=args.radio)
ip_test = IPv4Test(lfjson_host, lfjson_port, ssid=args.ssid, password=args.passwd, radio=args.radio,
security=args.security, sta_list=station_list)
ip_test.cleanup(station_list)
ip_test.timeout = 60
ip_test.build()
if __name__ == "__main__":
main()

View File

@@ -51,7 +51,13 @@ class IPv4Test(LFCliBase):
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(station_list=self.sta_list, debug=self.debug, timeout_sec=30):
self._pass("Station build finished")
self.passed()
else:
self._fail("Stations not able to acquire IP. Please check network input.")
self.failed()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)

View File

@@ -51,8 +51,14 @@ class IPv4Test(LFCliBase):
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(station_list=self.sta_list, debug=self.debug, timeout_sec=30):
self._pass("Station build finished")
self.passed()
else:
self._fail("Stations not able to acquire IP. Please check network input.")
self.failed()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list,
@@ -71,7 +77,7 @@ def main():
''',
description='''\
example_wpa_connection.py
example_wpa2_connection.py
--------------------
Generic command example:

View File

@@ -51,7 +51,14 @@ class IPv4Test(LFCliBase):
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.station_profile.admin_up()
if self.local_realm.wait_for_ip(station_list=self.sta_list, debug=self.debug, timeout_sec=30):
self._pass("Station build finished")
self.passed()
else:
self._fail("Stations not able to acquire IP. Please check network input.")
self.failed()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)

View File

@@ -52,8 +52,14 @@ class IPv4Test(LFCliBase):
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self.station_profile.admin_up()
self._pass("PASS: Station build finished")
if self.local_realm.wait_for_ip(station_list=self.sta_list, debug=self.debug, timeout_sec=30):
self._pass("Station build finished")
self.passed()
else:
self._fail("Stations not able to acquire IP. Please check network input.")
self.failed()
def cleanup(self, sta_list):
self.station_profile.cleanup(sta_list)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
import os
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
from realm import Realm
from LANforge.lfcli_base import LFCliBase
from LANforge import LFUtils
import argparse
import time
import datetime
import pprint
class Test1KClients(LFCliBase):
def __init__(self,
host,
port,
upstream,
side_a_min_rate=0, side_a_max_rate=56000,
side_b_min_rate=0, side_b_max_rate=56000,
num_sta_=200,
_debug_on=True,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host,
port,
_debug=_debug_on,
_local_realm=Realm(lfclient_host=host, lfclient_port=port),
_halt_on_error=_exit_on_error,
_exit_on_fail=_exit_on_fail)
self.ssid_radio_map = {
'1.1.wiphy0' : ("wpa2", "jedway-wpa2-x2048-4-4", "jedway-wpa2-x2048-4-4"),
'1.1.wiphy1' : ("wpa2", "jedway-wpa2-x2048-5-1", "jedway-wpa2-x2048-5-1"),
'1.1.wiphy2' : ("wpa2", "jedway-wpa2-x2048-4-1", "jedway-wpa2-x2048-4-1"),
'1.2.wiphy0' : ("wpa2", "jedway-wpa2-x2048-5-3", "jedway-wpa2-x2048-5-3"),
'1.2.wiphy1' : ("wpa2", "jedway-wpa2-x2048-4-4", "jedway-wpa2-x2048-4-4"),
'1.2.wiphy2' : ("wpa2", "jedway-wpa2-x2048-4-1", "jedway-wpa2-x2048-4-1"),
}
if num_sta_ is None:
raise ValueError("need a number of stations per radio")
self.num_sta = int(num_sta_)
self.station_radio_map = {
# port_name_series(prefix=prefix_, start_id=start_id_, end_id=end_id_, padding_number=padding_number_, radio=radio)
"1.1.wiphy0" : LFUtils.port_name_series(start_id=0, end_id=self.num_sta-1, padding_number=10000, radio="1.1.wiphy0"),
"1.1.wiphy1" : LFUtils.port_name_series(start_id=1000, end_id=1000+self.num_sta-1, padding_number=10000, radio="1.1.wiphy1"),
"1.1.wiphy2" : LFUtils.port_name_series(start_id=2000, end_id=2000+self.num_sta-1, padding_number=10000, radio="1.1.wiphy2"),
"1.2.wiphy0" : LFUtils.port_name_series(start_id=3000, end_id=3000+self.num_sta-1, padding_number=10000, radio="1.2.wiphy0"),
"1.2.wiphy1" : LFUtils.port_name_series(start_id=4000, end_id=4000+self.num_sta-1, padding_number=10000, radio="1.2.wiphy1"),
"1.2.wiphy2" : LFUtils.port_name_series(start_id=5000, end_id=5000+self.num_sta-1, padding_number=10000, radio="1.2.wiphy2")
}
self.upstream=upstream
self.name_prefix = "1k"
self.cx_profile = self.local_realm.new_l3_cx_profile()
self.cx_profile.name_prefix = self.name_prefix
self.cx_profile.side_a_min_bps = side_a_min_rate
self.cx_profile.side_a_max_bps = side_a_max_rate
self.cx_profile.side_b_min_bps = side_b_min_rate
self.cx_profile.side_b_max_bps = side_b_max_rate
self.station_profile_map = {}
def build(self):
for (radio, name_series) in self.station_radio_map.items():
print("building stations for %s"%radio)
if (name_series is None) or len(name_series) < 1:
print("No name series for %s"%radio)
continue
station_profile = self.local_realm.new_station_profile()
station_profile.use_security(self.ssid_radio_map[radio][0],
self.ssid_radio_map[radio][1],
self.ssid_radio_map[radio][2])
self.station_profile_map[radio] = station_profile
self._pass("defined %s station profiles" % len(self.station_radio_map))
for (radio, station_profile) in self.station_profile_map.items():
station_profile.create(radio=radio,
sta_names_=self.station_radio_map[radio],
dry_run=False,
up_=False,
debug=self.debug,
suppress_related_commands_=True,
use_radius=False,
hs20_enable=False,
sleep_time=.02)
station_profile.set_command_param("set_port", "report_timer", 1500)
station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.cx_profile.create(endp_type="lf_udp", side_a=station_profile.station_names, side_b=self.upstream, sleep_time=0)
self._pass("built stations on %s radios" % len(self.station_radio_map))
def __get_rx_values(self):
cx_list = self.json_get("endp?fields=name,rx+bytes", debug_=self.debug)
# print(self.cx_profile.created_cx.values())
#print("==============\n", cx_list, "\n==============")
cx_rx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
for item, value in cx_name.items():
for value_name, value_rx in value.items():
if value_name == 'rx bytes' and item in self.cx_profile.created_cx.values():
cx_rx_map[item] = value_rx
return cx_rx_map
def __compare_vals(self, old_list, new_list):
passes = 0
expected_passes = 0
if len(old_list) == len(new_list):
for item, value in old_list.items():
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
# print(item, new_list[item], old_list[item], passes, expected_passes)
if passes == expected_passes:
return True
else:
return False
else:
return False
def start(self):
print("Bringing stations up...")
prev_ip_num=0
total_num_sta=6*self.num_sta
for (radio, station_profile) in self.station_profile_map.items():
station_profile.admin_up()
self.local_realm.wait_for_ip(station_list=self.station_radio_map[radio], debug=self.debug, timeout_sec=30)
curr_ip_num = self.local_realm.get_curr_num_ips(num_sta_with_ips=prev_ip_num,station_list=self.station_radio_map[radio], debug=self.debug)
while ((prev_ip_num < curr_ip_num) and (curr_ip_num < total_num_sta)):
self.local_realm.wait_for_ip(station_list=self.station_radio_map[radio], debug=self.debug, timeout_sec=60)
prev_ip_num = curr_ip_num
curr_ip_num = self.local_realm.get_curr_num_ips(num_sta_with_ips=prev_ip_num,station_list=self.station_radio_map[radio], debug=self.debug)
if curr_ip_num == total_num_sta:
self._pass("stations on radio %s up" % radio)
else:
self._fail("FAIL: Not all stations on radio %s up" % radio)
exit(1)
cur_time = datetime.datetime.now()
# old_cx_rx_values = self.__get_rx_values() #
end_time = self.local_realm.parse_time("3m") + cur_time
self.cx_profile.start_cx() #
passes = 0
expected_passes = 0
while cur_time < end_time:
interval_time = cur_time + datetime.timedelta(minutes=1)
while cur_time < interval_time:
cur_time = datetime.datetime.now()
time.sleep(1)
cur_time = datetime.datetime.now()
if passes == expected_passes:
self._pass("PASS: All tests passed", print_pass)
def stop(self):
pass
def cleanup(self):
#self.cx_profile.cleanup_prefix()
for (radio, name_series) in self.station_radio_map.items():
sta_list= self.station_radio_map[radio]
for sta in sta_list:
self.local_realm.rm_port(sta, check_exists=True)
def main():
num_sta=200
lfjson_host = "localhost"
lfjson_port = 8080
argparser = LFCliBase.create_basic_argparse(prog=__file__,
description="creates lots of stations across multiple radios",
formatter_class=argparse.RawTextHelpFormatter)
argparser.add_argument("--sta_per_radio",
type=int,
help="number of stations per radio")
argparser.add_argument("--test_duration",
type=int,
help="length of test duration")
args = argparser.parse_args()
kilo_test = Test1KClients(lfjson_host,
lfjson_port,
upstream=args.upstream_port,
num_sta_=args.sta_per_radio,
_debug_on=args.debug)
kilo_test.cleanup()
kilo_test.build()
if not kilo_test.passes():
print("test fails")
exit(1)
kilo_test.start()
if not kilo_test.passes():
print("test fails")
exit(1)
kilo_test.stop()
if not kilo_test.passes():
print("test fails")
exit(1)
kilo_test.cleanup()
if __name__ == "__main__":
main()

View File

@@ -1,79 +1,90 @@
#!/bin/bash
#Security connection examples#########################################
echo "Beginning example_wpa_connection.py test..."
chmod +x example_wpa2_connection.py
./example_wpa2_connection.py --num_stations 3 --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio wiphy0
chmod +x scenario.py
./scenario.py --load BLANK
chmod +x example_wep_connection.py
./example_wep_connection.py --ssid jedway-wep-48 --passwd jedway-wep-48 --radio wiphy1
./scenario.py --load BLANK
chmod +x example_wpa3_connection.py
./example_wpa3_connection.py
./scenario.py --load BLANK
chmod +x example_wpa_connection.py
./example_wpa_connection.py --radio wiphy1
./scenario.py --load BLANK
#test generic and test fileio######################################################
#chmod +x test_fileio.py
#./test_fileio.py
chmod +x test_generic.py
#lfping
./test_generic.py --num_stations 3 --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio wiphy0
#lfcurl
./test_generic.py --num_stations 3 --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio wiphy0
#generic
./test_generic.py --num_stations 3 --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio wiphy0
#speedtest
./test_generic.py --num_stations 3 --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio wiphy0
#iperf3
./test_generic.py --num_stations 3 --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio wiphy0
#Ipv4 connection tests##########################################
chmod +x test_ipv4_connection.py
./test_ipv4_connection.py
chmod +x test_ipv4_l4_ftp_upload.py
./test_ipv4_l4_ftp_upload.py
chmod +x test_ipv4_l4_ftp_urls_per_ten.py
./test_ipv4_l4_ftp_urls_per_ten.py
chmod +x test_ipv4_l4_ftp_wifi.py
./test_ipv4_l4_ftp_wifi.py
chmod +x test_ipv4_l4_urls_per_ten.py
./test_ipv4_l4_urls_per_ten.py
chmod +x test_ipv4_l4_wifi.py
./test_ipv4_l4_wifi.py
chmod +x test_ipv4_l4
./test_ipv4_l4.py
chmod +x test_ipv4_ps.py
./test_ipv4_l4_ps.py
chmod +x test_ipv4_ttls.py
./test_ipv4_l4_ttls.py
chmod +x test_ipv4_variable_time.py
./test_ipv4_variable_time.py
#Layer 3 tests################################################
chmod +x test_l3_longevity.py
./test_l3_longevity.py
chmod +x test_l3_powersave_traffic
./test_l3_powersave_traffic.py
chmod +x test_l3_scenario_traffic.py
./test_l3_scenario_traffic.py
chmod +x test_l3_unicast_traffic_gen.py
./test_l3_unicast_traffic_gen.py
chmod +x test_l3_WAN_LAN.py
./test_l3_WAN_LAN.py
#WANlink######################################################
chmod +x test_wanlink.py
./test_wanlink.py
#IPv6 connection tests########################################
chmod +x test_ipv6_connection.py
./test_ipv6_variable_connection.py
chmod +x test_ipv6_variable_time.py
./test_ipv6_variable_time.py
#STA Connect examples#########################################
chmod +x sta_connect_example.py
./sta_connect_example.py
chmod +x sta_connect_multi_example.py
./sta_connect_multi_example.py
chmod +x sta_connect.py
./sta_connect.py
chmod +x sta_connect2.py
./sta_connect2.py
#This bash script aims to automate the test process of all Candela Technologies's test_* scripts in the lanforge-scripts directory. The script can be run 2 ways and may include (via user input) the "start_num" and "stop_num" variables to select which tests should be run.
# OPTION ONE: ./test_all_scripts.sh : this command runs all the scripts in the array "testCommands"
# OPTION TWO: ./test_all_scripts.sh 4 5 : this command runs py-script commands (in testCommands array) that include the py-script options beginning with 4 and 5 (inclusive) in case function ret_case_num.
#Variables
NUM_STA=4
SSID_USED="jedway-wpa2-x2048-5-3"
PASSWD_USED="jedway-wpa2-x2048-5-3"
RADIO_USED="wiphy1"
SECURITY="wpa2"
CURR_TEST_NAME="BLANK"
CURR_TEST_NUM=0
STOP_NUM=9
START_NUM=0
#Test array
testCommands=("./example_wpa_connection.py --num_stations $NUM_STA --ssid jedway-r8000-36 --passwd jedway-r8000-36 --radio $RADIO_USED --security wpa"
"./example_wpa2_connection.py --num_stations $NUM_STA --ssid $SSID_USED --passwd $SSID_USED --radio $RADIO_USED --security wpa2"
"./example_wep_connection.py --num_stations $NUM_STA --ssid jedway-wep-48 --passwd jedway-wep-48 --radio $RADIO_USED --security wep"
"./example_wpa3_connection.py --num_stations $NUM_STA --ssid jedway-wpa3-1 --passwd jedway-wpa3-1 --radio $RADIO_USED --security wpa3"
"./test_ipv4_connection.py --radio wiphy2 --num_stations $NUM_STA --ssid $SSID_USED --passwd $PASSWD_USED --security $SECURITY --debug --upstream_port eth1"
"./test_generic.py --mgr localhost --mgr_port 4122 --radio $RADIO_USED --ssid SSID_USED --passwd $PASSWD_USED --num_stations $NUM_STA --type lfping --dest 10.40.0.1 --security $SECURITY"
"./test_generic.py --mgr localhost --mgr_port 4122 --radio $RADIO_USED --ssid $SSID_USED --passwd $PASSWD_USED --num_stations $NUM_STA --type speedtest --speedtest_min_up 20 --speedtest_min_dl 20 --speedtest_max_ping 150 --security $SECURITY"
"./test_ipv4_l4_urls_per_ten.py --upstream_port eth1 --radio $RADIO_USED --num_stations $NUM_STA --security $SECURITY --ssid $SSID_USED --passwd $PASSWD_USED --num_tests 1 --requests_per_ten 600 --target_per_ten 600"
"./test_ipv4_l4_wifi.py --upstream_port eth1 --radio wiphy0 --num_stations $NUM_STA --security $SECURITY --ssid jedway-wpa2-x2048-4-4 --passwd jedway-wpa2-x2048-4-4 --test_duration 3m"
"./test_ipv4_l4.py --radio wiphy3 --num_stations 4 --security wpa2 --ssid jedway-wpa2-x2048-4-1 --passwd jedway-wpa2-x2048-4-1 --url \"dl http://10.40.0.1 /dev/null\" --test_duration 2m --debug"
)
function ret_case_num(){
case $1 in
"example_wpa_connection")
echo 1 ;;
"example_wpa2_connection")
echo 2 ;;
"example_wpa3_connection")
echo 4 ;;
"example_wep_connection")
echo 3 ;;
"test_ipv4_connection")
echo 5 ;;
"test_generic")
echo 6 ;;
"test_ipv4_l4_urls_per_ten")
echo 7 ;;
"test_ipv4_l4_wifi")
echo 8 ;;
"test_ipv4_l4")
echo 9 ;;
esac
}
function blank_db() {
echo "Loading blank scenario..." >> ~/test_all_output_file.txt
./scenario.py --load BLANK >> ~/test_all_output_file.txt
#check_blank.py
}
function echo_print(){
echo "Beginning $CURR_TEST_NAME test..." >> ~/test_all_output_file.txt
}
function run_test(){
for i in "${testCommands[@]}"; do
CURR_TEST_NAME=${i%%.py*}
CURR_TEST_NAME=${CURR_TEST_NAME#./*}
CURR_TEST_NUM=$(ret_case_num $CURR_TEST_NAME)
if [[ $CURR_TEST_NUM -gt $STOP_NUM ]] || [[ $STOP_NUM -eq $CURR_NUM && $STOP_NUM -ne 0 ]]; then
exit 1
fi
if [[ $CURR_TEST_NUM -gt $START_NUM ]] || [[ $CURR_TEST_NUM -eq $START_NUM ]]; then
echo_print
eval $i >> ~/test_all_output_file.txt
if [ $? -ne 0 ]; then
echo $CURR_TEST_NAME failure
else
echo $CURR_TEST_NAME success
fi
if [[ "${CURR_TEST_NAME}" = @(example_wpa_connection|example_wpa2_connection|example_wpa3_connection|example_wep_connection) ]]; then
blank_db
fi
fi
done
}
function check_args(){
if [ ! -z $1 ]; then
START_NUM=$1
fi
if [ ! -z $2 ]; then
STOP_NUM=$2
fi
}
true > ~/test_all_output_file.txt
check_args $1 $2
run_test
#test generic and fileio are for macvlans

View File

@@ -18,6 +18,7 @@ import argparse
import realm
import time
import datetime
import pprint
class FileIOTest(LFCliBase):
@@ -44,7 +45,12 @@ class FileIOTest(LFCliBase):
gateway=None,
dhcp=True,
use_macvlans=False,
use_test_groups=False,
write_only_test_group=None,
read_only_test_group=None,
port_list=[],
ip_list=None,
mode=None,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
@@ -58,14 +64,39 @@ class FileIOTest(LFCliBase):
self.password = password
self.number_template = number_template
self.test_duration = test_duration
self.sta_list = []
self.port_list = []
self.use_macvlans = use_macvlans
self.mode = mode
self.ip_list = ip_list
self.netmask = netmask
self.gateway = gateway
if self.use_macvlans:
if macvlan_parent != None:
if macvlan_parent is not None:
self.macvlan_parent = macvlan_parent
self.port_list = port_list
else:
self.sta_list = port_list
self.port_list = port_list
self.use_test_groups = use_test_groups
if self.use_test_groups:
if self.mode == "write":
if write_only_test_group is not None:
self.write_only_test_group = write_only_test_group
else:
raise ValueError("--write_only_test_group must be used to set test group name")
if self.mode == "read":
if read_only_test_group is not None:
self.read_only_test_group = read_only_test_group
else:
raise ValueError("--read_only_test_group must be used to set test group name")
if self.mode == "both":
if write_only_test_group is not None and read_only_test_group is not None:
self.write_only_test_group = write_only_test_group
self.read_only_test_group = read_only_test_group
else:
raise ValueError("--write_only_test_group and --read_only_test_group "
"must be used to set test group names")
#self.min_rw_size = self.parse_size(min_rw_size)
#self.max_rw_size = self.parse_size(max_rw_size)
#self.min_file_size = self.parse_size(min_file_size)
@@ -76,10 +107,10 @@ class FileIOTest(LFCliBase):
#self.max_write_rate_bps = self.parse_size_bps(max_write_rate_bps)
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.endp_profile = self.local_realm.new_fio_endp_profile()
self.wo_profile = self.local_realm.new_fio_endp_profile()
self.mvlan_profile = self.local_realm.new_mvlan_profile()
if len(self.sta_list) > 0:
if not self.use_macvlans and len(self.port_list) > 0:
self.station_profile = self.local_realm.new_station_profile()
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
@@ -88,29 +119,44 @@ class FileIOTest(LFCliBase):
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
self.endp_profile.fs_type = fs_type
self.endp_profile.min_rw_size = LFUtils.parse_size(min_rw_size)
self.endp_profile.max_rw_size = LFUtils.parse_size(max_rw_size)
self.endp_profile.min_file_size = LFUtils.parse_size(min_file_size)
self.endp_profile.min_file_size = LFUtils.parse_size(min_file_size)
self.endp_profile.min_read_rate_bps = LFUtils.parse_size(min_read_rate_bps)
self.endp_profile.max_read_rate_bps = LFUtils.parse_size(max_read_rate_bps)
self.endp_profile.min_write_rate_bps = LFUtils.parse_size(min_write_rate_bps)
self.endp_profile.max_write_rate_bps = LFUtils.parse_size(max_write_rate_bps)
self.endp_profile.directory = directory
self.endp_profile.server_mount = server_mount
self.wo_profile.fs_type = fs_type
self.wo_profile.min_rw_size = LFUtils.parse_size(min_rw_size)
self.wo_profile.max_rw_size = LFUtils.parse_size(max_rw_size)
self.wo_profile.min_file_size = LFUtils.parse_size(min_file_size)
self.wo_profile.max_file_size = LFUtils.parse_size(max_file_size)
self.wo_profile.min_read_rate_bps = LFUtils.parse_size(min_read_rate_bps)
self.wo_profile.max_read_rate_bps = LFUtils.parse_size(max_read_rate_bps)
self.wo_profile.min_write_rate_bps = LFUtils.parse_size(min_write_rate_bps)
self.wo_profile.max_write_rate_bps = LFUtils.parse_size(max_write_rate_bps)
self.wo_profile.directory = directory
self.wo_profile.server_mount = server_mount
self.ro_profile = self.endp_profile.create_ro_profile()
self.ro_profile = self.wo_profile.create_ro_profile()
self.mvlan_profile.num_macvlans = int(num_ports)
self.mvlan_profile.desired_macvlans = self.port_list
self.mvlan_profile.macvlan_parent = self.macvlan_parent
self.mvlan_profile.dhcp = dhcp
self.mvlan_profile.netmask = netmask
self.mvlan_profile.first_ip_addr = first_mvlan_ip
self.mvlan_profile.gateway = gateway
if self.use_macvlans:
self.mvlan_profile.num_macvlans = int(num_ports)
self.mvlan_profile.desired_macvlans = self.port_list
self.mvlan_profile.macvlan_parent = self.macvlan_parent
self.mvlan_profile.dhcp = dhcp
self.mvlan_profile.netmask = netmask
self.mvlan_profile.first_ip_addr = first_mvlan_ip
self.mvlan_profile.gateway = gateway
self.created_ports = []
if self.use_test_groups:
if self.mode is not None:
if self.mode.lower() == "write":
self.wo_tg_profile = self.local_realm.new_test_group_profile()
elif self.mode.lower() == "read":
self.ro_tg_profile = self.local_realm.new_test_group_profile()
elif self.mode.lower() == "both":
self.wo_tg_profile = self.local_realm.new_test_group_profile()
self.ro_tg_profile = self.local_realm.new_test_group_profile()
else:
raise ValueError("Unknown mode given ", self.mode)
else:
raise ValueError("Mode (read,write,both) must be specified")
def __compare_vals(self, val_list):
passes = 0
expected_passes = 0
@@ -121,18 +167,18 @@ class FileIOTest(LFCliBase):
if item[0] == 'r':
# print("TEST", item,
# val_list[item]['read-bps'],
# self.endp_profile.min_read_rate_bps,
# val_list[item]['read-bps'] > self.endp_profile.min_read_rate_bps)
# self.ro_profile.min_read_rate_bps,
# val_list[item]['read-bps'] > self.ro_profile.min_read_rate_bps)
if val_list[item]['read-bps'] > self.endp_profile.min_read_rate_bps:
if val_list[item]['read-bps'] > self.wo_profile.min_read_rate_bps:
passes += 1
else:
# print("TEST", item,
# val_list[item]['write-bps'],
# self.endp_profile.min_write_rate_bps,
# val_list[item]['write-bps'] > self.endp_profile.min_write_rate_bps)
# self.wo_profile.min_write_rate_bps,
# val_list[item]['write-bps'] > self.wo_profile.min_write_rate_bps)
if val_list[item]['write-bps'] > self.endp_profile.min_write_rate_bps:
if val_list[item]['write-bps'] > self.wo_profile.min_write_rate_bps:
passes += 1
if passes == expected_passes:
return True
@@ -143,11 +189,20 @@ class FileIOTest(LFCliBase):
def __get_values(self):
time.sleep(3)
cx_list = self.json_get("fileio/%s,%s?fields=write-bps,read-bps" % (','.join(self.endp_profile.created_cx.keys()), ','.join(self.ro_profile.created_cx.keys())),
debug_=self.debug)
if self.mode == "write":
cx_list = self.json_get("fileio/%s?fields=write-bps,read-bps" % (
','.join(self.wo_profile.created_cx.keys())), debug_=self.debug)
elif self.mode == "read":
cx_list = self.json_get("fileio/%s?fields=write-bps,read-bps" % (
','.join(self.ro_profile.created_cx.keys())), debug_=self.debug)
else:
cx_list = self.json_get("fileio/%s,%s?fields=write-bps,read-bps" % (
','.join(self.wo_profile.created_cx.keys()),
','.join(self.ro_profile.created_cx.keys())), debug_=self.debug)
# print(cx_list)
# print("==============\n", cx_list, "\n==============")
cx_map = {}
# pprint.pprint(cx_list)
if cx_list is not None:
cx_list = cx_list['endpoint']
for i in cx_list:
@@ -159,45 +214,114 @@ class FileIOTest(LFCliBase):
def build(self):
# Build stations
# print(self.min_tx_bps, self.min_rx_bps)
# TODO: Check for upstream port as child to bridge before macvlan create
if self.use_macvlans:
self.mvlan_profile.create(admin_down=True, sleep_time=.5, debug=self.debug)
print("Creating MACVLANs")
self.mvlan_profile.create(admin_down=False, sleep_time=.5, debug=self.debug)
self._pass("PASS: MACVLAN build finished")
self.created_ports += self.mvlan_profile.created_macvlans
if len(self.sta_list) > 0:
elif not self.use_macvlans and self.ip_list is None:
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
self.station_profile.set_command_param("set_port", "report_timer", 1500)
self.station_profile.set_command_flag("set_port", "rpt_timer", 1)
self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug)
self.station_profile.create(radio=self.radio, sta_names_=self.port_list, debug=self.debug)
self._pass("PASS: Station build finished")
self.created_ports += self.station_profile.station_names
if self.ip_list is not None:
if self.gateway is not None and self.netmask is not None:
for num_port in range(len(self.port_list)):
shelf = self.local_realm.name_to_eid(self.port_list[num_port])[0]
resource = self.local_realm.name_to_eid(self.port_list[num_port])[1]
port = self.local_realm.name_to_eid(self.port_list[num_port])[2]
req_url = "/cli-json/set_port"
data = {
"shelf": shelf,
"resource": resource,
"port": port,
"ip_addr": self.ip_list[num_port],
"netmask": self.netmask,
"gateway": self.gateway
}
self.local_realm.json_post(req_url, data)
self.created_ports.append("%s.%s.%s" % (shelf, resource, port))
else:
raise ValueError("Netmask and gateway must be specified")
if self.mode is not None:
if self.mode.lower() == "write":
print("Creating Write Only CXs")
self.wo_profile.create(ports=self.created_ports, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
elif self.mode.lower() == "read":
print("Creating Read Only CXs")
self.ro_profile.create(ports=self.created_ports, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
elif self.mode.lower() == "both":
print("Creating Write Only CXs")
print("Creating Read Only CXs")
self.wo_profile.create(ports=self.created_ports, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
self.ro_profile.create(ports=self.created_ports, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
if self.use_test_groups:
print("Creating test groups...")
if self.mode is not None:
if self.mode.lower() == "write":
self.wo_tg_profile.cx_list = self.wo_profile.created_cx.values()
self.wo_tg_profile.create_group(group_name=self.write_only_test_group)
elif self.mode.lower() == "read":
self.ro_tg_profile.cx_list = self.ro_profile.created_cx.values()
self.ro_tg_profile.create_group(group_name=self.read_only_test_group)
elif self.mode.lower() == "both":
self.ro_tg_profile.cx_list = self.ro_profile.created_cx.values()
self.wo_tg_profile.cx_list = self.wo_profile.created_cx.values()
self.ro_tg_profile.create_group(group_name=self.read_only_test_group)
self.wo_tg_profile.create_group(group_name=self.write_only_test_group)
else:
raise ValueError("Unknown mode given ", self.mode)
else:
raise ValueError("Mode (read,write,both) must be specified")
self.endp_profile.create(ports=self.created_ports, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
self.ro_profile.create(ports=self.created_ports, sleep_time=.5, debug_=self.debug,
suppress_related_commands_=None)
def start(self, print_pass=False, print_fail=False):
temp_ports = self.port_list.copy()
temp_ports = self.created_ports.copy()
#temp_stas.append(self.local_realm.name_to_eid(self.upstream_port)[2])
self.station_profile.admin_up()
self.mvlan_profile.admin_up()
exit(1)
if self.local_realm.wait_for_ip(temp_ports):
if not self.use_macvlans:
self.station_profile.admin_up()
else:
self.mvlan_profile.admin_up()
if self.local_realm.wait_for_ip(temp_ports, debug=self.debug):
self._pass("All ports got IPs", print_pass)
else:
self._fail("Ports failed to get IPs", print_fail)
cur_time = datetime.datetime.now()
# print("Got Values")
end_time = self.local_realm.parse_time(self.test_duration) + cur_time
self.endp_profile.start_cx()
time.sleep(2)
self.ro_profile.start_cx()
if self.use_test_groups:
if self.mode == "write":
self.wo_tg_profile.start_group(group_name=self.write_only_test_group)
elif self.mode == "read":
self.ro_tg_profile.start_group(group_name=self.read_only_test_group)
else:
self.wo_tg_profile.start_group(group_name=self.write_only_test_group)
time.sleep(2)
self.ro_tg_profile.start_group(group_name=self.read_only_test_group)
else:
if self.mode == "write":
self.wo_profile.start_cx()
elif self.mode == "read":
self.ro_profile.start_cx()
else:
self.wo_profile.start_cx()
time.sleep(2)
self.ro_profile.start_cx()
passes = 0
expected_passes = 0
print("Starting Test...")
@@ -225,18 +349,51 @@ class FileIOTest(LFCliBase):
self._pass("PASS: All tests passes", print_pass)
def stop(self):
self.endp_profile.stop_cx()
self.ro_profile.stop_cx()
self.station_profile.admin_down()
self.mvlan_profile.admin_down()
if self.use_test_groups:
if self.mode == "write":
self.wo_tg_profile.stop_group(group_name=self.write_only_test_group)
elif self.mode == "read":
self.ro_tg_profile.stop_group(group_name=self.read_only_test_group)
else:
self.wo_tg_profile.stop_group(group_name=self.write_only_test_group)
time.sleep(2)
self.ro_tg_profile.stop_group(group_name=self.read_only_test_group)
else:
if self.mode == "write":
self.wo_profile.stop_cx()
elif self.mode == "read":
self.ro_profile.stop_cx()
else:
self.wo_profile.stop_cx()
time.sleep(2)
self.ro_profile.stop_cx()
def cleanup(self, sta_list=None):
self.endp_profile.cleanup()
if not self.use_macvlans:
self.station_profile.admin_down()
else:
self.mvlan_profile.admin_down()
def cleanup(self, port_list=None):
if self.use_test_groups:
if self.mode == "read":
self.ro_tg_profile.remove_group(group_name=self.read_only_test_group)
elif self.mode == "write":
self.wo_tg_profile.remove_group(group_name=self.write_only_test_group)
else:
self.ro_tg_profile.remove_group(group_name=self.read_only_test_group)
self.wo_tg_profile.remove_group(group_name=self.write_only_test_group)
time.sleep(1)
self.wo_profile.cleanup()
self.ro_profile.cleanup()
if len(self.sta_list) > 0:
self.station_profile.cleanup(sta_list)
LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=sta_list, debug=self.debug)
self.mvlan_profile.cleanup()
if not self.use_macvlans and self.ip_list is None:
self.station_profile.cleanup(port_list)
elif self.use_macvlans:
self.mvlan_profile.cleanup()
# LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=port_list, debug=self.debug)
def main():
@@ -253,9 +410,11 @@ def main():
test_fileio.py:
--------------------
Generic command layout:
python ./test_fileio.py --upstream_port <port> --radio <radio 0> <stations> <ssid> <ssid password> <security type: wpa2, open, wpa3> --debug
./test_fileio.py --macvlan_parent <port> --num_ports <num ports> --use_macvlans
--first_mvlan_ip <first ip in series> --netmask <netmask to use> --gateway <gateway ip addr>
python3 ./test_fileio.py --upstream_port eth1 --fio_type fe_nfs4 --min_read 1Mbps --min_write 1Gbps --server_mount 192.168.93.195:/tmp/test
./test_fileio.py --macvlan_parent eth2 --num_ports 3 --use_macvlans --first_mvlan_ip 192.168.92.13
--netmask 255.255.255.0 --gateway 192.168.92.1
''')
parser.add_argument('--test_duration', help='sets the duration of the test', default="5m")
@@ -268,7 +427,8 @@ python3 ./test_fileio.py --upstream_port eth1 --fio_type fe_nfs4 --min_read 1Mbp
parser.add_argument('--max_read_rate_bps', help='maximum bps read rate', default=10e9)
parser.add_argument('--min_write_rate_bps', help='minimum bps write rate', default=10e9)
parser.add_argument('--max_write_rate_bps', help='maximum bps write rate', default="1G")
parser.add_argument('--directory', help='--directory directory to read/write in. Absolute path suggested', default="AUTO")
parser.add_argument('--directory', help='--directory directory to read/write in. Absolute path suggested',
default="AUTO")
parser.add_argument('--server_mount', help='--server_mount The server to mount, ex: 192.168.100.5/exports/test1',
default="10.40.0.1:/var/tmp/test")
parser.add_argument('--macvlan_parent', help='specifies parent port for macvlan creation', default=None)
@@ -278,13 +438,21 @@ python3 ./test_fileio.py --upstream_port eth1 --fio_type fe_nfs4 --min_read 1Mbp
parser.add_argument('--first_mvlan_ip', help='specifies first static ip address to be used or dhcp', default=None)
parser.add_argument('--netmask', help='specifies netmask to be used with static ip addresses', default=None)
parser.add_argument('--gateway', help='specifies default gateway to be used with static addressing', default=None)
parser.add_argument('--use_test_groups', help='will use test groups to start/stop instead of single endps/cxs',
action='store_true', default=False)
parser.add_argument('--read_only_test_group', help='specifies name to use for read only test group', default=None)
parser.add_argument('--write_only_test_group', help='specifies name to use for write only test group', default=None)
parser.add_argument('--mode', help='write,read,both', default='both', type=str)
parser.add_argument('--use_ports', help='list of comma separated ports to use with ips, \'=\' separates name and ip'
'{ port_name1=ip_addr1,port_name1=ip_addr2 }', default=None)
args = parser.parse_args()
port_list = []
if args.first_port is not None:
ip_list = []
if args.first_port is not None and args.use_ports is not None:
if args.first_port.startswith("sta"):
if (args.num_ports is not None) and (int(args.num_ports) > 0):
start_num = int(args.first_port[4:])
start_num = int(args.first_port[3:])
num_ports = int(args.num_ports)
port_list = LFUtils.port_name_series(prefix="sta", start_id=start_num, end_id=start_num+num_ports-1,
padding_number=10000,
@@ -302,27 +470,45 @@ python3 ./test_fileio.py --upstream_port eth1 --fio_type fe_nfs4 --min_read 1Mbp
"first_port must contain parent port and num_ports must be greater than 0"
% (args.num_ports, args.macvlan_parent, args.first_port))
else:
num_ports = int(args.num_ports)
if not args.use_macvlans:
port_list = LFUtils.port_name_series(prefix="sta", start_id=0, end_id=num_ports - 1,
padding_number=10000,
radio=args.radio)
else:
port_list = LFUtils.port_name_series(prefix=args.macvlan_parent + "#", start_id=0,
if args.use_ports is None:
num_ports = int(args.num_ports)
if not args.use_macvlans:
port_list = LFUtils.port_name_series(prefix="sta", start_id=0, end_id=num_ports - 1,
padding_number=10000,
radio=args.radio)
else:
port_list = LFUtils.port_name_series(prefix=args.macvlan_parent + "#", start_id=0,
end_id=num_ports - 1, padding_number=100000,
radio=args.radio)
if args.first_mvlan_ip.lower() == "dhcp":
dhcp = True
else:
dhcp = False
print(port_list)
else:
temp_list = args.use_ports.split(',')
for port in temp_list:
port_list.append(port.split('=')[0])
ip_list.append(port.split('=')[1])
if len(port_list) != len(ip_list):
raise ValueError(temp_list, " ports must have matching ip addresses!")
print(port_list)
print(ip_list)
if args.first_mvlan_ip is not None:
if args.first_mvlan_ip.lower() == "dhcp":
dhcp = True
else:
dhcp = False
else:
dhcp = True
# print(port_list)
# exit(1)
ip_test = FileIOTest(args.mgr,
args.mgr_port,
ssid=args.ssid,
password=args.passwd,
security=args.security,
port_list=port_list,
ip_list=ip_list,
test_duration=args.test_duration,
upstream_port=args.upstream_port,
_debug_on=args.debug,
@@ -344,7 +530,11 @@ python3 ./test_fileio.py --upstream_port eth1 --fio_type fe_nfs4 --min_read 1Mbp
max_write_rate_bps=args.max_write_rate_bps,
directory=args.directory,
server_mount=args.server_mount,
num_ports=args.num_ports
num_ports=args.num_ports,
use_test_groups=args.use_test_groups,
write_only_test_group=args.write_only_test_group,
read_only_test_group=args.read_only_test_group,
mode=args.mode
# want a mount options param
)
@@ -352,7 +542,6 @@ python3 ./test_fileio.py --upstream_port eth1 --fio_type fe_nfs4 --min_read 1Mbp
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
exit(1)
ip_test.start(False, False)
ip_test.stop()
if not ip_test.passes():

View File

@@ -19,7 +19,7 @@ import datetime
import json
class GenTest(LFCliBase):
def __init__(self, host, port, ssid, security, password, sta_list, name_prefix, upstream,
def __init__(self, host, port, ssid, security, passwd, sta_list, name_prefix, upstream,
number_template="000", test_duration="5m", type="lfping", dest=None, cmd =None,
interval=1, radio=None, speedtest_min_up=None, speedtest_min_dl=None, speedtest_max_ping=None,
_debug_on=False,
@@ -31,7 +31,7 @@ class GenTest(LFCliBase):
self.upstream = upstream
self.sta_list = sta_list
self.security = security
self.password = password
self.passwd = passwd
self.number_template = number_template
self.name_prefix = name_prefix
self.test_duration = test_duration
@@ -48,7 +48,7 @@ class GenTest(LFCliBase):
self.station_profile.lfclient_url = self.lfclient_url
self.station_profile.ssid = self.ssid
self.station_profile.ssid_pass = self.password,
self.station_profile.ssid_pass = self.passwd,
self.station_profile.security = self.security
self.station_profile.number_template_ = self.number_template
self.station_profile.mode = 0
@@ -84,8 +84,8 @@ class GenTest(LFCliBase):
if last_results['download'] is None and last_results['upload'] is None and last_results['ping'] is None:
return False, v['name']
elif last_results['download'] >= self.speedtest_min_dl and \
last_results['upload'] >= self.speedtest_min_up and \
last_results['ping'] <= self.speedtest_max_ping:
last_results['upload'] >= self.speedtest_min_up and \
last_results['ping'] <= self.speedtest_max_ping:
return True, v['name']
def choose_generic_command(self):
@@ -155,7 +155,7 @@ class GenTest(LFCliBase):
self.station_profile.admin_down()
def build(self):
self.station_profile.use_security(self.security, self.ssid, self.password)
self.station_profile.use_security(self.security, self.ssid, self.passwd)
self.station_profile.set_number_template(self.number_template)
print("Creating stations")
self.station_profile.set_command_flag("add_sta", "create_admin_down", 1)
@@ -194,6 +194,20 @@ python3 ./test_generic.py --upstream_port eth1
--test_duration 2m
--interval 1s
--debug
Example commands:
LFPING:
./test_generic.py --mgr localhost --mgr_port 4122 --radio wiphy0 --num_stations 3 --ssid jedway-wpa2-x2048-4-1 --passwd jedway-wpa2-x2048-4-1 --type lfping --dest 10.40.0.1 --security wpa2
LFCURL (under construction):
./test_generic.py --mgr localhost --mgr_port 4122 --radio wiphy1 --num_stations 3 --ssid jedway-wpa2-x2048-4-1 --passwd jedway-wpa2-x2048-4-1 --security wpa2 --type lfcurl --dest 10.40.0.1
GENERIC:
./test_generic.py --mgr localhost--mgr_port 4122 --radio wiphy1 --num_stations 3 --ssid jedway-wpa2-x2048-4-1 --passwd jedway-wpa2-x2048-4-1 --security wpa2 --type generic
SPEEDTEST:
./test_generic.py --mgr localhost --mgr_port 4122 --radio wiphy2 --num_stations 3 --ssid jedway-wpa2-x2048-4-1 --passwd jedway-wpa2-x2048-4-1 --type speedtest --speedtest_min_up 20
--speedtest_min_dl 20 --speedtest_max_ping 150 --security wpa2
IPERF3 (under construction):
./test_generic.py --mgr localhost --mgr_port 4122 --radio wiphy1 --num_stations 3 --ssid jedway-wpa2-x2048-4-1 --passwd jedway-wpa2-x2048-4-1 --security wpa2 --type iperf3
''')
parser.add_argument('--type', help='type of command to run: generic, lfping, iperf3-client, iperf3-server, lfcurl', default="lfping")
@@ -228,7 +242,7 @@ python3 ./test_generic.py --upstream_port eth1
interval=1,
ssid=args.ssid,
upstream=args.upstream_port,
password=args.passwd,
passwd=args.passwd,
security=args.security,
test_duration=args.test_duration,
speedtest_min_up=args.speedtest_min_up,

View File

@@ -126,12 +126,9 @@ Generic command example:
./test_ipv4_connection.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--security open {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--dest 10.40.0.1 \\
--test_duration 2m \\
--interval 1s \\
--passwd BLANK \\
--debug
''')
@@ -142,7 +139,6 @@ Generic command example:
num_stations_converted = int(args.num_stations)
num_sta = num_stations_converted
ws_event = LFCliBase("192.168.200.15", 8080)
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000)
ip_test = IPv4Test(lfjson_host, lfjson_port, ssid=args.ssid, password=args.passwd,
@@ -152,19 +148,20 @@ Generic command example:
ip_test.build()
if not ip_test.passes():
print(ip_test.get_fail_message())
ws_event.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message())
ip_test.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message())
exit(1)
ip_test.start(station_list, False, False)
ip_test.stop()
if not ip_test.passes():
print(ip_test.get_fail_message())
ws_event.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message())
ip_test.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message())
exit(1)
time.sleep(30)
ip_test.cleanup(station_list)
if ip_test.passes():
print("Full test passed, all stations associated and got IP")
ws_event.add_event(name="test_ipv4_connection.py", message="Full test passed, all stations associated and got IP")
ip_test.add_event(name="test_ipv4_connection.py", message="Full test passed, all stations associated and got IP")
#exit(1)
if __name__ == "__main__":
main()

View File

@@ -173,7 +173,7 @@ def main():
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--url dl http://10.40.0.1 /dev/null \\
--url "dl http://10.40.0.1 /dev/null" \\
--password admin123 \\
--test_duration 2m \\
--debug

View File

@@ -165,15 +165,13 @@ def main():
test_ipv4_l4_ftp_upload.py
--------------------
Generic command example:
python3 ./test_ipv4_l4_ftp_wifi.py --upstream_port eth1 \\
./test_ipv4_l4_ftp_upload.py --upstream_port eth1 \\
--radio wiphy0 \\
--num_stations 3 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--dest 10.40.0.1 \\
--test_duration 2m \\
--interval 1s \\
--url "ul ftp://10.40.0.1 /dev/null" \\
--requests_per_ten 600 \\
--debug

View File

@@ -33,11 +33,11 @@ class IPV4L4(LFCliBase):
self.security = security
self.password = password
self.url = url
self.requests_per_ten = requests_per_ten
self.requests_per_ten = int(requests_per_ten)
self.number_template = number_template
self.sta_list = station_list
self.num_tests = num_tests
self.target_requests_per_ten = target_requests_per_ten
self.num_tests = int(num_tests)
self.target_requests_per_ten = int(target_requests_per_ten)
self.local_realm = realm.Realm(lfclient_host=self.host, lfclient_port=self.port)
self.station_profile = self.local_realm.new_station_profile()
@@ -63,7 +63,7 @@ class IPV4L4(LFCliBase):
if name in self.cx_profile.created_cx.keys():
expected_passes += 1
if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9:
# print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9)
print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9)
passes += 1
return passes == expected_passes
@@ -84,7 +84,7 @@ class IPV4L4(LFCliBase):
temp_stas = self.sta_list.copy()
# temp_stas.append(self.local_realm.name_to_eid(self.upstream_port)[2])
cur_time = datetime.datetime.now()
interval_time = cur_time + datetime.timedelta(minutes=10)
interval_time = cur_time + datetime.timedelta(minutes=2)
passes = 0
expected_passes = 0
self.station_profile.admin_up()
@@ -99,6 +99,7 @@ class IPV4L4(LFCliBase):
expected_passes += 1
while cur_time < interval_time:
time.sleep(1)
print(".",end="")
cur_time = datetime.datetime.now()
if self.cx_profile.check_errors(self.debug):
@@ -110,7 +111,7 @@ class IPV4L4(LFCliBase):
else:
self._fail("FAIL: Errors found getting to %s " % self.url, print_fail)
break
interval_time = cur_time + datetime.timedelta(minutes=10)
interval_time = cur_time + datetime.timedelta(minutes=2)
if passes == expected_passes:
self._pass("PASS: All tests passes", print_pass)
@@ -146,9 +147,6 @@ python3 ./test_ipv4_l4_urls_per_ten.py --upstream_port eth1 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--dest 10.40.0.1 \\
--test_duration 2m \\
--interval 1s \\
--requests_per_ten 600 \\
--num_tests 1 \\
--url "dl http://10.40.0.1 /dev/null" \\
@@ -173,7 +171,7 @@ python3 ./test_ipv4_l4_urls_per_ten.py --upstream_port eth1 \\
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(args.mgr, lfjson_port, ssid=args.ssid, password=args.passwd, upstream_port=args.upstream_port,
ip_test = IPV4L4(args.mgr, lfjson_port, ssid=args.ssid, password=args.passwd, radio=args.radio, upstream_port=args.upstream_port,
security=args.security, station_list=station_list, url=args.url, num_tests=args.num_tests,
target_requests_per_ten=args.target_per_ten,
requests_per_ten=args.requests_per_ten)

View File

@@ -173,12 +173,10 @@ python3 ./test_ipv4_l4_wifi.py --upstream_port eth1 \\
--security {open|wep|wpa|wpa2|wpa3} \\
--ssid netgear \\
--passwd admin123 \\
--dest 10.40.0.1 \\
--test_duration 2m \\
--interval 1s \\
--requests_per_ten 600 \\
--direction {ul | dl} \\
--dest /dev/null \\
--dest /dev/null (or 10.40.0.1)\\
--debug
''')
@@ -197,7 +195,7 @@ python3 ./test_ipv4_l4_wifi.py --upstream_port eth1 \\
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=num_sta-1, padding_number_=10000,
radio=args.radio)
ip_test = IPV4L4(args.mgr, lfjson_port, ssid=args.ssid, password=args.passwd,
ip_test = IPV4L4(args.mgr, lfjson_port, ssid=args.ssid, password=args.passwd, radio=args.radio,
security=args.security, station_list=station_list, direction=args.direction,dest=args.dest,
test_duration=args.test_duration, upstream_port=args.upstream_port,
requests_per_ten=args.requests_per_ten, _debug_on=args.debug)

View File

@@ -66,8 +66,6 @@ class IPV4VariableTime(LFCliBase):
def __get_rx_values(self):
cx_list = self.json_get("endp?fields=name,rx+bytes", debug_=self.debug)
# print(self.cx_profile.created_cx.values())
# print("==============\n", cx_list, "\n==============")
cx_rx_map = {}
for cx_name in cx_list['endpoint']:
if cx_name != 'uri' and cx_name != 'handler':
@@ -85,8 +83,6 @@ class IPV4VariableTime(LFCliBase):
expected_passes += 1
if new_list[item] > old_list[item]:
passes += 1
# print(item, new_list[item], old_list[item], passes, expected_passes)
if passes == expected_passes:
return True
else:
@@ -142,10 +138,6 @@ class IPV4VariableTime(LFCliBase):
time.sleep(1)
new_cx_rx_values = self.__get_rx_values()
# print(old_cx_rx_values, new_cx_rx_values)
# print("\n-----------------------------------")
# print(cur_time, end_time, cur_time + datetime.timedelta(minutes=1))
# print("-----------------------------------\n")
expected_passes += 1
if self.__compare_vals(old_cx_rx_values, new_cx_rx_values):
passes += 1
@@ -183,24 +175,22 @@ def main():
prog='test_ipv4_variable_time.py',
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
epilog='''\
Useful Information:
1. TBD
''',
description='''\
test_ipv4_variable_time.py:
--------------------
TBD
---------------------
Generic command layout:
python ./test_ipv4_variable_time.py --upstream_port <port> --radio <radio 0> <stations> <ssid> <ssid password> <security type: wpa2, open, wpa3> --debug
Note: multiple --radio switches may be entered up to the number of radios available:
--radio <radio 0> <stations> <ssid> <ssid password> --radio <radio 01> <number of last station> <ssid> <ssid password>
python3 ./test_ipv4_variable_time.py --upstream_port eth1 --radio wiphy0 32 candelaTech-wpa2-x2048-4-1 candelaTech-wpa2-x2048-4-1 wpa2 --radio wiphy1 64 candelaTech-wpa2-x2048-5-3 candelaTech-wpa2-x2048-5-3 wpa2
./test_ipv4_variable_time.py
--upstream_port eth1
--radio wiphy3
--num_stations 4
-ssid jedway-wpa2-x2048-4-1
-passwd jedway-wpa2-x2048-4-1
--security {wpa2|open|wpa|wpa3}
--a_min 250000
--b_min 260000
--test_duration 2m
--debug
''')
parser.add_argument('--a_min', help='--a_min bps rate minimum for side_a', default=256000)

View File

@@ -288,7 +288,6 @@ test_ipv4_ttls.py:
enable_pkc=args.enable_pkc,
)
ttls_test.cleanup(station_list)
#ttls_test.timeout = 60
ttls_test.build()
if not ttls_test.passes():
print(ttls_test.get_fail_message())

0
py-scripts/test_l3_scenario_throughput.py Normal file → Executable file
View File

345
py-scripts/test_status_msg.py Executable file
View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python3
import os
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
import argparse
from LANforge.lfcli_base import LFCliBase
import time
from uuid import uuid1
import pprint
from pprint import pprint
class TestStatusMessage(LFCliBase):
def __init__(self, host, port,
_deep_clean=False,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False):
super().__init__(host, port, _debug=_debug_on, _halt_on_error=_exit_on_error, _exit_on_fail=_exit_on_fail)
self.deep_clean = _deep_clean
self.check_connect()
def build(self):
"""create a new session"""
new_session = uuid1()
self.status_msg_url = "/status-msg"
self.session_url = "/status-msg/"+str(new_session)
# print("----- ----- ----- ----- ----- PUT ----- ----- ----- ----- ----- ----- ")
self.json_put(self.session_url, _data={})
# we should see list of sessions
try:
#print("----- ----- ----- ----- ----- GET ----- ----- ----- ----- ----- ----- ")
session_response = self.json_get(self.status_msg_url)
if self.debug:
pprint(session_response)
if "sessions" not in session_response:
print("----- ----- ----- ----- ----- BAD ----- ----- ----- ----- ----- ----- ")
self._fail("response lacks sessions element")
if len(session_response["sessions"]) < 2:
self._fail("why do we have less than two sessions?")
for session in session_response["sessions"]:
#print("----- ----- ----- ----- ----- SESSION ----- ----- ----- ----- ----- ----- ")
pprint(session)
self._pass("session created")
except ValueError as ve:
print("----- ----- ----- ----- ----- what??? ----- ----- ----- ----- ----- ----- ")
self._fail(ve)
return
def start(self, print_pass=False, print_fail=False):
"""
create a series of messages
:return: None
"""
#print("----- ----- ----- ----- ----- START ----- %s ----- ----- ----- ----- ----- " % self.session_url)
message_response = self.json_get(self.session_url)
if self.debug:
pprint(message_response)
if "empty" in message_response:
self._pass("empty response, zero messages")
elif "messages" in message_response:
messages_a = message_response["messages"]
if len(messages_a) > 0:
self._fail("we should have zero messages")
for msg_num in ( 1, 2, 3, 4, 5 ):
#print("----- ----- ----- ----- ----- ----- %s ----- ----- ----- ----- ----- " % msg_num)
#print("session url: "+self.session_url)
self.msg_count = msg_num
self.json_post(self.session_url, {
"key": "test_status_message.py",
"content-type":"application/json",
"message":"message %s"%msg_num
})
message_response = self.json_get(self.session_url)
if len(message_response["messages"]) != msg_num:
pprint(message_response)
self._fail("we should have %s messages"%msg_num)
self._pass("created and listed %s messages counted"%msg_num)
def stop(self):
"""
make sure we read those messages
:return: None
"""
message_list_response = self.json_get(self.session_url)
if "empty" in message_list_response:
self._fail("empty response, we expect 1 or more messages")
msg_num = 0
for message_o in message_list_response["messages"]:
msg_url = message_o["_links"]
print("Message url: "+msg_url)
message_response = self.json_get(msg_url)
if self.debug:
pprint(message_response)
for message_o in message_response["messages"]:
msg_num += 1
content_o = message_o
print("id %s" % content_o["message_id"])
print("key %s" % content_o["message"]["key"])
print("content-type %s" % content_o["message"]["content-type"])
print("message %s" % content_o["message"]["message"])
if msg_num != self.msg_count:
self._fail("(stop) expected %s messages, saw %s" % (self.msg_count, msg_num))
else:
self._pass("saw correct number of messages")
def cleanup(self):
"""delete messages and delete the session"""
message_list_response = self.json_get(self.session_url)
if "empty" in message_list_response:
self._fail("empty response, we expect 1 or more messages")
last_link = ""
msg_num = 0
for message_o in message_list_response["messages"]:
msg_url = message_o["_links"]
# print("Delete Message url: "+msg_url)
last_link = message_o["_links"]
msg_num += 1
if msg_num != self.msg_count:
self._fail("(cleanup) expected %s messages, saw %s" % (self.msg_count, msg_num))
message_response = self.json_delete(last_link)
if self.debug:
pprint(message_response)
# check message removal
message_list_response = self.json_get(self.session_url)
msg_num = len(message_list_response["messages"])
if msg_num != (self.msg_count - 1):
self._fail("(cleanup) expected %s messages, saw %s" % ((self.msg_count - 1), msg_num))
else:
self._pass("(cleanup) messages decreased by one")
all_url = self.session_url + "/all"
message_response = self.json_delete(all_url)
if self.debug:
pprint(message_response)
message_list_response = self.json_get(self.session_url)
if self.debug:
pprint(message_list_response)
if "messages" in message_list_response:
msg_num = len(message_list_response["messages"])
elif "empty" in message_list_response:
msg_num = 0
if (msg_num == 0):
self._pass("deleted all messages in session")
else:
self._fail("failed to delete all messages in session")
# make sure we fail on removing session incorrectly
try:
if self.debug:
print("--- del -------------------- -------------------- --------------------")
self.exit_on_error=False
self.halt_on_error=False
message_response = self.json_delete(self.session_url, debug_=False)
if self.debug:
print("--- ~del -------------------- -------------------- --------------------")
except ValueError as ve:
print("- - - - - - - - - - - - - - - - - - - - - - -")
print(ve)
print("- - - - - - - - - - - - - - - - - - - - - - -")
sessions_list_response = self.json_get("/status-msg")
if self.debug:
pprint(sessions_list_response)
session_list = sessions_list_response["sessions"]
counter = 0
for session_o in session_list:
if self.debug:
print("session %s link %s " % (self.session_url, session_o["_links"]))
if session_o["_links"] == self.session_url:
counter += 1
self._pass("session not deleted")
break
if counter == 0:
self._fail("session incorrectly deleted")
try:
if self.debug:
print("--- del -------------------- -------------------- --------------------")
self.exit_on_error=False
self.halt_on_error=False
message_response = self.json_delete(self.session_url+"/this", debug_=False)
if self.debug:
print("--- ~del -------------------- -------------------- --------------------")
except ValueError as ve:
print(ve)
sessions_list_response = self.json_get("/status-msg")
if self.debug:
pprint(sessions_list_response)
session_list = sessions_list_response["sessions"]
counter = 0
for session_o in session_list:
if session_o["_links"] == self.session_url:
counter += 1
self._fail("session not deleted: "+session_o["_links"])
break
if counter == 0:
self._pass("session correctly deleted")
# make sure we fail on removing session zero
if not self.deep_clean:
return True
print("Deleting all sessions...")
counter = 0
for session_o in session_list:
counter += 1
self.json_delete(session_o["_links"]+"/all")
print("cleaned %s sessions" % counter)
counter = 0
for session_o in session_list:
if session_o["session-id"] == "0":
continue
counter += 1
self.json_delete(session_o["_links"]+"/this")
print("deleted %s sessions" % counter)
def main():
lfjson_port = 8080
parser = LFCliBase.create_bare_argparse(
prog=__file__,
# formatter_class=argparse.RawDescriptionHelpFormatter,
formatter_class=argparse.RawTextHelpFormatter,
description="""
Test the status message passing functions of /status-msg:
- create a session: PUT /status-msg/<new-session-id>
- post message: POST /status-msg/<new-session-id>
- list sessions: GET /status-msg/
- list messages for session: GET /status-msg/<new-session-id>
- delete message: DELETE /status-msg/<new-session-id>/message-id
- delete session: DELETE /status-msg/<new-session-id>/this
- delete all messages in session: DELETE /status-msg/<new-session-id>/all
""")
parser.add_argument('--action', default="run_test", help="""
Actions can be:
run_test : run a messaging test
new : create new session
update : add message to session, requires --session, --key, --message
read : read message(s) from session, requires --session
list : list messages from session
delete : delete message, all messages using session/all or session using session/this
""")
parser.add_argument('--session', type=str, help='explicit session or session/message-id')
parser.add_argument('--deep_clean', type=bool, help='remove all messages and all sessions')
parser.add_argument('--key', type=str, help='how to key the message')
parser.add_argument('--message', type=str, help='message to include')
args = parser.parse_args()
status_messages = TestStatusMessage(args.mgr,
lfjson_port,
_debug_on=False,
_exit_on_error=False,
_exit_on_fail=False)
if args.action == "new":
if args.session is not None:
status_messages.json_put("/status-msg/"+args.session, {})
else:
u = uuid1()
status_messages.json_put("/status-msg/"+u, {})
print("created session /status-msg/"+u)
return
if args.action == "update":
if args.session is None:
print("requires --session")
return
if args.key is None:
print("requires --key")
return
if args.message is None:
print("requires --message")
return
status_messages.json_post("/status-msg/"+args.session, {
"key": args.key,
"content-type": "text/plain",
"message": args.message
})
return
if args.action == "list":
if args.session is None:
response_o = status_messages.json_get("/status-msg/")
pprint(response_o["sessions"])
else:
response_o = status_messages.json_get("/status-msg/"+args.session)
pprint(response_o["messages"])
return
if args.action == "read":
if args.session is None:
print("requires --session")
return
response_o = status_messages.json_get("/status-msg/"+args.session)
pprint(response_o)
return
if args.action == "delete":
if args.session is None:
print("requires --session")
return
response_o = status_messages.json_delete("/status-msg/"+args.session)
pprint(response_o)
return
if args.action == "run_test":
if args.deep_clean:
status_messages.deep_clean = True
status_messages.build()
if not status_messages.passes():
print(status_messages.get_fail_message())
exit(1)
status_messages.start(False, False)
status_messages.stop()
if not status_messages.passes():
print(status_messages.get_fail_message())
exit(1)
status_messages.cleanup()
if status_messages.passes():
print("Full test passed, all messages read and cleaned up")
exit(0)
if __name__ == "__main__":
main()

140
py-scripts/testgroup.py Executable file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python3
import sys
if sys.version_info[0] != 3:
print("This script requires Python 3")
exit(1)
if 'py-json' not in sys.path:
sys.path.append('../py-json')
from LANforge.lfcli_base import LFCliBase
from LANforge.LFUtils import *
from LANforge import LFUtils
import argparse
import realm
class TestGroup(LFCliBase):
def __init__(self, host, port,
group_name=None,
add_cx_list=[],
rm_cx_list=[],
tg_action=None,
cx_action=None,
list_groups=None,
show_group=None):
self.local_realm = realm.Realm(lfclient_host=host, lfclient_port=port)
self.tg_profile = self.local_realm.new_test_group_profile()
if group_name is None and list_groups is None and (tg_action is not None or cx_action is not None or
add_cx_list is not None or rm_cx_list is not None or show_group is not None):
raise ValueError("Group name must be set if manipulating test groups")
else:
self.tg_profile.group_name = group_name
self.tg_action = tg_action
self.cx_action = cx_action
self.list_groups = list_groups
self.show_group = show_group
self.add_cx_list = add_cx_list
self.rm_cx_list = rm_cx_list
def do_cx_action(self):
if self.cx_action == 'start':
print("Starting %s" % self.tg_profile.group_name)
self.tg_profile.start_group()
elif self.cx_action == 'stop':
print("Stopping %s" % self.tg_profile.group_name)
self.tg_profile.stop_group()
elif self.cx_action == 'quiesce':
print("Quiescing %s" % self.tg_profile.group_name)
self.tg_profile.quiesce_group()
def do_tg_action(self):
if self.tg_action == 'add':
print("Creating %s" % self.tg_profile.group_name)
self.tg_profile.create_group()
if self.tg_action == 'del':
print("Removing %s" % self.tg_profile.group_name)
if self.tg_profile.check_group_exists():
self.tg_profile.remove_group()
else:
print("%s not found, no action taken" % self.tg_profile.group_name)
def show_info(self):
if self.list_groups:
print("Current Test Groups: ")
for group in self.tg_profile.list_groups():
print(group)
if self.show_group:
print("show_group not yet implemented")
def update_cxs(self):
if len(self.add_cx_list) > 0:
for cx in self.add_cx_list:
self.tg_profile.add_cx(cx)
self.tg_profile.cx_list.append(cx)
if len(self.rm_cx_list) > 0:
for cx in self.rm_cx_list:
self.tg_profile.rm_cx(cx)
if cx in self.tg_profile.cx_list:
self.tg_profile.cx_list.remove(cx)
def main():
parser = LFCliBase.create_bare_argparse(
prog='testgroup.py',
formatter_class=argparse.RawTextHelpFormatter,
epilog='''Control and query test groups\n''',
description='''testgroup.py
--------------------
Generic command example:
''')
parser.add_argument('--group_name', help='specify the name of the test group to use', default=None)
parser.add_argument('--list_groups', help='list all existing test groups', action='store_true', default=False)
tg_group = parser.add_mutually_exclusive_group()
tg_group.add_argument('--add_group', help='add new test group', action='store_true', default=False)
tg_group.add_argument('--del_group', help='delete test group', action='store_true', default=False)
parser.add_argument('--show_group', help='show connections in current test group', action='store_true', default=False)
cx_group = parser.add_mutually_exclusive_group()
cx_group.add_argument('--start_group', help='start all cxs in chosen test group', default=None)
cx_group.add_argument('--stop_group', help='stop all cxs in chosen test group', default=None)
cx_group.add_argument('--quiesce_group', help='quiesce all cxs in chosen test groups', default=None)
parser.add_argument('--add_cx', help='add cx to chosen test group', nargs='*', default=[])
parser.add_argument('--remove_cx', help='remove cx from chosen test group', nargs='*', default=[])
args = parser.parse_args()
tg_action = None
cx_action = None
if args.add_group:
tg_action = 'add'
elif args.del_group:
tg_action = 'del'
if args.start_group:
cx_action = 'start'
elif args.stop_group:
cx_action = 'stop'
elif args.quiesce_group:
cx_action = 'quiesce'
tg = TestGroup(host=args.mgr, port=args.mgr_port,
group_name=args.group_name,
add_cx_list=args.add_cx, rm_cx_list=args.remove_cx, cx_action=cx_action,
tg_action=tg_action, list_groups=args.list_groups, show_group=args.show_group)
tg.do_tg_action()
tg.update_cxs()
tg.do_cx_action()
tg.show_info()
if __name__ == "__main__":
main()

0
py-scripts/wlan_capacity_calculator.py Normal file → Executable file
View File

0
py-scripts/ws_generic_monitor_test.py Normal file → Executable file
View File