mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
328 lines
15 KiB
Python
328 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import os
|
|
|
|
if sys.version_info[0] != 3:
|
|
print("This script requires Python 3")
|
|
exit(1)
|
|
|
|
if 'py-json' not in sys.path:
|
|
sys.path.append(os.path.join(os.path.abspath('..'), 'py-json'))
|
|
|
|
import argparse
|
|
from LANforge import LFUtils
|
|
import time
|
|
import test_l3_longevity as DFS
|
|
|
|
|
|
def valid_endp_types(_endp_type):
|
|
etypes = _endp_type.split()
|
|
for endp_type in etypes:
|
|
valid_endp_type=['lf_udp','lf_udp6','lf_tcp','lf_tcp6','mc_udp','mc_udp6']
|
|
if not (str(endp_type) in valid_endp_type):
|
|
print('invalid endp_type: %s. Valid types lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6' % endp_type)
|
|
exit(1)
|
|
return _endp_type
|
|
|
|
def main():
|
|
lfjson_host = "localhost"
|
|
lfjson_port = 8080
|
|
endp_types = "lf_udp"
|
|
debug_on = False
|
|
|
|
parser = argparse.ArgumentParser(
|
|
prog='lf_cisco_dfs.py',
|
|
#formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
epilog='''\
|
|
Useful Information:
|
|
1. Polling interval for checking traffic is fixed at 1 minute
|
|
2. The test will generate csv file
|
|
3. The tx/rx rates are fixed at 256000 bits per second
|
|
4. Maximum stations per radio is 64
|
|
''',
|
|
|
|
description='''\
|
|
lf_cisco_dfs.py:
|
|
--------------------
|
|
|
|
Summary :
|
|
----------
|
|
create stations, create traffic between upstream port and stations, run traffic.
|
|
The traffic on the stations will be checked once per minute to verify that traffic is transmitted
|
|
and recieved.
|
|
|
|
Generic command layout:
|
|
-----------------------
|
|
python .\\lf_cisco_dfs.py --test_duration <duration> --endp_type <traffic types> --upstream_port <port>
|
|
--radio "radio==<radio> stations==<number staions> ssid==<ssid> ssid_pw==<ssid password> security==<security type: wpa2, open, wpa3>" --debug
|
|
Multiple radios may be entered with individual --radio switches
|
|
|
|
generiic command with controller setting channel and channel width test duration 5 min
|
|
python3 lf_cisco_dfs.py --cisco_ctlr <IP> --cisco_dfs True/False --mgr <Lanforge IP>
|
|
--cisco_channel <channel> --cisco_chan_width <20,40,80,120> --endp_type 'lf_udp lf_tcp mc_udp' --upstream_port <1.ethX>
|
|
--radio "radio==<radio 0 > stations==<number stations> ssid==<ssid> ssid_pw==<ssid password> security==<wpa2 , open>"
|
|
--radio "radio==<radio 1 > stations==<number stations> ssid==<ssid> ssid_pw==<ssid password> security==<wpa2 , open>"
|
|
--duration 5m
|
|
|
|
|
|
<duration>: number followed by one of the following
|
|
d - days
|
|
h - hours
|
|
m - minutes
|
|
s - seconds
|
|
|
|
<traffic type>:
|
|
lf_udp : IPv4 UDP traffic
|
|
lf_tcp : IPv4 TCP traffic
|
|
lf_udp6 : IPv6 UDP traffic
|
|
lf_tcp6 : IPv6 TCP traffic
|
|
mc_udp : IPv4 multi cast UDP traffic
|
|
mc_udp6 : IPv6 multi cast UDP traffic
|
|
|
|
<tos>:
|
|
BK, BE, VI, VO: Optional wifi related Tos Settings. Or, use your preferred numeric values.
|
|
|
|
#################################
|
|
#Command switches
|
|
#################################
|
|
--cisco_ctlr <IP of Cisco Controller>',default=None
|
|
--cisco_user <User-name for Cisco Controller>',default="admin"
|
|
--cisco_passwd <Password for Cisco Controller>',default="Cisco123
|
|
--cisco_prompt <Prompt for Cisco Controller>',default="(Cisco Controller) >
|
|
--cisco_ap <Cisco AP in question>',default="APA453.0E7B.CF9C"
|
|
|
|
--cisco_dfs <True/False>',default=False
|
|
--cisco_channel <channel>',default=None , no change
|
|
--cisco_chan_width <20 40 80 160>',default="20",choices=["20","40","80","160"]
|
|
--cisco_band <a | b | abgn>',default="a",choices=["a", "b", "abgn"]
|
|
|
|
--mgr <hostname for where LANforge GUI is running>',default='localhost'
|
|
-d / --test_duration <how long to run> example --time 5d (5 days) default: 3m options: number followed by d, h, m or s',default='3m'
|
|
--tos: Support different ToS settings: BK | BE | VI | VO | numeric',default="BE"
|
|
--debug: Enable debugging',default=False
|
|
-t / --endp_type <types of traffic> example --endp_type \"lf_udp lf_tcp mc_udp\" Default: lf_udp , options: lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6',
|
|
default='lf_udp', type=valid_endp_types
|
|
-u / --upstream_port <cross connect upstream_port> example: --upstream_port eth1',default='eth1')
|
|
-o / --outfile <Output file for csv data>", default='longevity_results'
|
|
|
|
#########################################
|
|
# Examples
|
|
# #######################################
|
|
Example #1 running traffic with two radios
|
|
1. Test duration 4 minutes
|
|
2. Traffic IPv4 TCP
|
|
3. Upstream-port eth1
|
|
4. Radio #0 wiphy0 has 32 stations, ssid = candelaTech-wpa2-x2048-4-1, ssid password = candelaTech-wpa2-x2048-4-1
|
|
5. Radio #1 wiphy1 has 64 stations, ssid = candelaTech-wpa2-x2048-5-3, ssid password = candelaTech-wpa2-x2048-5-3
|
|
6. Create connections with TOS of BK and VI
|
|
|
|
Command: (remove carriage returns)
|
|
python3 .\\lf_cisco_dfs.py --test_duration 4m --endp_type \"lf_tcp lf_udp mc_udp\" --tos \"BK VI\" --upstream_port eth1
|
|
--radio "radio==wiphy0 stations==32 ssid==candelaTech-wpa2-x2048-4-1 ssid_pw==candelaTech-wpa2-x2048-4-1 security==wpa2"
|
|
--radio "radio==wiphy1 stations==64 ssid==candelaTech-wpa2-x2048-5-3 ssid_pw==candelaTech-wpa2-x2048-5-3 security==wpa2"
|
|
|
|
Example #2 using cisco controller
|
|
1. cisco controller at 192.168.100.112
|
|
2. cisco dfs True
|
|
3. cisco channel 52
|
|
4. cisco channel width 20
|
|
5. traffic 'lf_udp lf_tcp mc_udp'
|
|
6. upstream port eth3
|
|
7. radio #0 wiphy0 stations 3 ssid test_candela ssid_pw [BLANK] secruity Open
|
|
8. radio #1 wiphy1 stations 16 ssid test_candela ssid_pw [BLANK] security Open
|
|
9. lanforge manager at 192.168.100.178
|
|
10. duration 5m
|
|
|
|
Command:
|
|
python3 lf_cisco_dfs.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr 192.168.100.178
|
|
--cisco_channel 52 --cisco_chan_width 20 --endp_type 'lf_udp lf_tcp mc_udp' --upstream_port 1.eth3
|
|
--radio "radio==1.wiphy0 stations==3 ssid==test_candela ssid_pw==[BLANK] security==open"
|
|
--radio "radio==1.wiphy1 stations==16 ssid==test_candela ssid_pw==[BLANK] security==open"
|
|
--test_duration 5m
|
|
|
|
|
|
''')
|
|
|
|
|
|
parser.add_argument('--cisco_ctlr', help='--cisco_ctlr <IP of Cisco Controller>',default=None)
|
|
parser.add_argument('--cisco_user', help='--cisco_user <User-name for Cisco Controller>',default="admin")
|
|
parser.add_argument('--cisco_passwd', help='--cisco_passwd <Password for Cisco Controller>',default="Cisco123")
|
|
parser.add_argument('--cisco_prompt', help='--cisco_prompt <Prompt for Cisco Controller>',default="\(Cisco Controller\) >")
|
|
parser.add_argument('--cisco_ap', help='--cisco_ap <Cisco AP in question>',default="APA453.0E7B.CF9C")
|
|
|
|
parser.add_argument('--cisco_dfs', help='--cisco_dfs <True/False>',default=False)
|
|
|
|
parser.add_argument('--cisco_channel', help='--cisco_channel <channel>',default=None)
|
|
parser.add_argument('--cisco_chan_width', help='--cisco_chan_width <20 40 80 160>',default="20",choices=["20","40","80","160"])
|
|
parser.add_argument('--cisco_band', help='--cisco_band <a | b | abgn>',default="a",choices=["a", "b", "abgn"])
|
|
parser.add_argument('--cisco_series', help='--cisco_series <9800 | 3504>',default="3504",choices=["9800","3504"])
|
|
parser.add_argument('--cisco_scheme', help='--cisco_scheme (serial|telnet|ssh): connect via serial, ssh or telnet',default="ssh",choices=["serial","telnet","ssh"])
|
|
|
|
parser.add_argument('--cisco_wlan', help='--cisco_wlan <wlan name> default: NA, NA means no change',default="NA")
|
|
parser.add_argument('--cisco_wlanID', help='--cisco_wlanID <wlanID> default: NA , NA means not change',default="NA")
|
|
parser.add_argument('--cisco_tx_power', help='--cisco_tx_power <1 | 2 | 3 | 4 | 5 | 6 | 7 | 8> 1 is highest power default NA NA means no change',default="NA"
|
|
,choices=["1","2","3","4","5","6","7","8","NA"])
|
|
|
|
|
|
|
|
parser.add_argument('--amount_ports_to_reset', help='--amount_ports_to_reset \"<min amount ports> <max amount ports>\" ', default=None)
|
|
parser.add_argument('--port_reset_seconds', help='--ports_reset_seconds \"<min seconds> <max seconds>\" ', default="10 30")
|
|
|
|
parser.add_argument('--mgr', help='--mgr <hostname for where LANforge GUI is running>',default='localhost')
|
|
parser.add_argument('-d','--test_duration', help='--test_duration <how long to run> example --time 5d (5 days) default: 3m options: number followed by d, h, m or s',default='3m')
|
|
parser.add_argument('--tos', help='--tos: Support different ToS settings: BK | BE | VI | VO | numeric',default="BE")
|
|
parser.add_argument('--debug', help='--debug: Enable debugging',default=False)
|
|
parser.add_argument('-t', '--endp_type', help='--endp_type <types of traffic> example --endp_type \"lf_udp lf_tcp mc_udp\" Default: lf_udp , options: lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6',
|
|
default='lf_udp', type=valid_endp_types)
|
|
parser.add_argument('-u', '--upstream_port', help='--upstream_port <cross connect upstream_port> example: --upstream_port eth1',default='eth1')
|
|
parser.add_argument('-o','--csv_outfile', help="--csv_outfile <Output file for csv data>", default='longevity_results')
|
|
parser.add_argument('--polling_interval', help="--polling_interval <seconds>", default='60s')
|
|
#parser.add_argument('-c','--csv_output', help="Generate csv output", default=False)
|
|
|
|
parser.add_argument('-r','--radio', action='append', nargs=1, help='--radio \
|
|
\"radio==<number_of_wiphy stations=<=number of stations> ssid==<ssid> ssid_pw==<ssid password> security==<security>\" '\
|
|
, required=True)
|
|
parser.add_argument("--cap_ctl_out", help="--cap_ctl_out , switch the cisco controller output will be captured", action='store_true')
|
|
|
|
args = parser.parse_args()
|
|
|
|
#print("args: {}".format(args))
|
|
debug_on = args.debug
|
|
|
|
if args.test_duration:
|
|
test_duration = args.test_duration
|
|
|
|
if args.polling_interval:
|
|
polling_interval = args.polling_interval
|
|
|
|
if args.endp_type:
|
|
endp_types = args.endp_type
|
|
|
|
if args.mgr:
|
|
lfjson_host = args.mgr
|
|
|
|
if args.upstream_port:
|
|
side_b = args.upstream_port
|
|
|
|
if args.radio:
|
|
radios = args.radio
|
|
|
|
if args.csv_outfile != None:
|
|
current_time = time.strftime("%m_%d_%Y_%H_%M_%S", time.localtime())
|
|
csv_outfile = "{}_{}.csv".format(args.csv_outfile,current_time)
|
|
print("csv output file : {}".format(csv_outfile))
|
|
|
|
|
|
MAX_NUMBER_OF_STATIONS = 64
|
|
|
|
radio_name_list = []
|
|
number_of_stations_per_radio_list = []
|
|
ssid_list = []
|
|
ssid_password_list = []
|
|
ssid_security_list = []
|
|
|
|
#optional radio configuration
|
|
reset_port_enable_list = []
|
|
reset_port_time_min_list = []
|
|
reset_port_time_max_list = []
|
|
|
|
print("radios {}".format(radios))
|
|
for radio_ in radios:
|
|
radio_keys = ['radio','stations','ssid','ssid_pw','security']
|
|
radio_info_dict = dict(map(lambda x: x.split('=='), str(radio_).replace('[','').replace(']','').replace("'","").split()))
|
|
print("radio_dict {}".format(radio_info_dict))
|
|
|
|
for key in radio_keys:
|
|
if key not in radio_info_dict:
|
|
print("missing config, for the {}, all of the following need to be present {} ".format(key,radio_keys))
|
|
exit(1)
|
|
|
|
radio_name_list.append(radio_info_dict['radio'])
|
|
number_of_stations_per_radio_list.append(radio_info_dict['stations'])
|
|
ssid_list.append(radio_info_dict['ssid'])
|
|
ssid_password_list.append(radio_info_dict['ssid_pw'])
|
|
ssid_security_list.append(radio_info_dict['security'])
|
|
|
|
optional_radio_reset_keys = ['reset_port_enable']
|
|
radio_reset_found = True
|
|
for key in optional_radio_reset_keys:
|
|
if key not in radio_info_dict:
|
|
#print("port reset test not enabled")
|
|
radio_reset_found = False
|
|
break
|
|
|
|
if radio_reset_found:
|
|
reset_port_enable_list.append(True)
|
|
reset_port_time_min_list.append(radio_info_dict['reset_port_time_min'])
|
|
reset_port_time_max_list.append(radio_info_dict['reset_port_time_max'])
|
|
else:
|
|
reset_port_enable_list.append(False)
|
|
reset_port_time_min_list.append('0s')
|
|
reset_port_time_max_list.append('0s')
|
|
|
|
|
|
|
|
index = 0
|
|
station_lists = []
|
|
for (radio_name_, number_of_stations_per_radio_) in zip(radio_name_list,number_of_stations_per_radio_list):
|
|
number_of_stations = int(number_of_stations_per_radio_)
|
|
if number_of_stations > MAX_NUMBER_OF_STATIONS:
|
|
print("number of stations per radio exceeded max of : {}".format(MAX_NUMBER_OF_STATIONS))
|
|
quit(1)
|
|
station_list = LFUtils.portNameSeries(prefix_="sta", start_id_= 1 + index*1000, end_id_= number_of_stations + index*1000,
|
|
padding_number_=10000, radio=radio_name_)
|
|
station_lists.append(station_list)
|
|
index += 1
|
|
|
|
#print("endp-types: %s"%(endp_types))
|
|
|
|
dfs = DFS.L3VariableTime(
|
|
lfjson_host,
|
|
lfjson_port,
|
|
args=args,
|
|
number_template="00",
|
|
station_lists= station_lists,
|
|
name_prefix="LT-",
|
|
endp_types=endp_types,
|
|
tos=args.tos,
|
|
side_b=side_b,
|
|
radio_name_list=radio_name_list,
|
|
number_of_stations_per_radio_list=number_of_stations_per_radio_list,
|
|
ssid_list=ssid_list,
|
|
ssid_password_list=ssid_password_list,
|
|
ssid_security_list=ssid_security_list,
|
|
test_duration=test_duration,
|
|
polling_interval= polling_interval,
|
|
reset_port_enable_list=reset_port_enable_list,
|
|
reset_port_time_min_list=reset_port_time_min_list,
|
|
reset_port_time_max_list=reset_port_time_max_list,
|
|
side_a_min_rate=256000,
|
|
side_b_min_rate=256000,
|
|
debug_on=debug_on,
|
|
outfile=csv_outfile)
|
|
|
|
dfs.pre_cleanup()
|
|
|
|
dfs.build()
|
|
if not dfs.passes():
|
|
print("build step failed.")
|
|
print(dfs.get_fail_message())
|
|
exit(1)
|
|
dfs.start(False, False)
|
|
dfs.stop()
|
|
if not dfs.passes():
|
|
print("stop test failed")
|
|
print(dfs.get_fail_message())
|
|
|
|
|
|
print("Pausing 30 seconds after run for manual inspection before we clean up.")
|
|
time.sleep(30)
|
|
dfs.cleanup()
|
|
if dfs.passes():
|
|
print("Full test passed, all connections increased rx bytes")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|