#!/usr/bin/env python3 import sys import os if sys.version_info[0] != 3: print("This script requires Python 3") exit(1) if 'py-json' not in sys.path: sys.path.append(os.path.join(os.path.abspath('..'), 'py-json')) import argparse from LANforge import LFUtils import time import test_l3_longevity as DFS def valid_endp_types(_endp_type): etypes = _endp_type.split() for endp_type in etypes: valid_endp_type=['lf_udp','lf_udp6','lf_tcp','lf_tcp6','mc_udp','mc_udp6'] if not (str(endp_type) in valid_endp_type): print('invalid endp_type: %s. Valid types lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6' % endp_type) exit(1) return _endp_type def main(): lfjson_host = "localhost" lfjson_port = 8080 endp_types = "lf_udp" debug_on = False parser = argparse.ArgumentParser( prog='lf_cisco_dfs.py', #formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawTextHelpFormatter, epilog='''\ Useful Information: 1. Polling interval for checking traffic is fixed at 1 minute 2. The test will generate csv file 3. The tx/rx rates are fixed at 256000 bits per second 4. Maximum stations per radio is 64 ''', description='''\ lf_cisco_dfs.py: -------------------- Summary : ---------- create stations, create traffic between upstream port and stations, run traffic. The traffic on the stations will be checked once per minute to verify that traffic is transmitted and recieved. Generic command layout: ----------------------- python .\\lf_cisco_dfs.py --test_duration --endp_type --upstream_port --radio "radio== stations== ssid== ssid_pw== security==" --debug Multiple radios may be entered with individual --radio switches generiic command with controller setting channel and channel width test duration 5 min python3 lf_cisco_dfs.py --cisco_ctlr --cisco_dfs True/False --mgr --cisco_channel --cisco_chan_width <20,40,80,120> --endp_type 'lf_udp lf_tcp mc_udp' --upstream_port <1.ethX> --radio "radio== stations== ssid== ssid_pw== security==" --radio "radio== stations== ssid== ssid_pw== security==" --duration 5m : number followed by one of the following d - days h - hours m - minutes s - seconds : lf_udp : IPv4 UDP traffic lf_tcp : IPv4 TCP traffic lf_udp6 : IPv6 UDP traffic lf_tcp6 : IPv6 TCP traffic mc_udp : IPv4 multi cast UDP traffic mc_udp6 : IPv6 multi cast UDP traffic : BK, BE, VI, VO: Optional wifi related Tos Settings. Or, use your preferred numeric values. ################################# #Command switches ################################# --cisco_ctlr ',default=None --cisco_user ',default="admin" --cisco_passwd ',default="Cisco123 --cisco_prompt ',default="(Cisco Controller) > --cisco_ap ',default="APA453.0E7B.CF9C" --cisco_dfs ',default=False --cisco_channel ',default=None , no change --cisco_chan_width <20 40 80 160>',default="20",choices=["20","40","80","160"] --cisco_band ',default="a",choices=["a", "b", "abgn"] --mgr ',default='localhost' -d / --test_duration example --time 5d (5 days) default: 3m options: number followed by d, h, m or s',default='3m' --tos: Support different ToS settings: BK | BE | VI | VO | numeric',default="BE" --debug: Enable debugging',default=False -t / --endp_type example --endp_type \"lf_udp lf_tcp mc_udp\" Default: lf_udp , options: lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6', default='lf_udp', type=valid_endp_types -u / --upstream_port example: --upstream_port eth1',default='eth1') -o / --outfile ", default='longevity_results' ######################################### # Examples # ####################################### Example #1 running traffic with two radios 1. Test duration 4 minutes 2. Traffic IPv4 TCP 3. Upstream-port eth1 4. Radio #0 wiphy0 has 32 stations, ssid = candelaTech-wpa2-x2048-4-1, ssid password = candelaTech-wpa2-x2048-4-1 5. Radio #1 wiphy1 has 64 stations, ssid = candelaTech-wpa2-x2048-5-3, ssid password = candelaTech-wpa2-x2048-5-3 6. Create connections with TOS of BK and VI Command: (remove carriage returns) python3 .\\lf_cisco_dfs.py --test_duration 4m --endp_type \"lf_tcp lf_udp mc_udp\" --tos \"BK VI\" --upstream_port eth1 --radio "radio==wiphy0 stations==32 ssid==candelaTech-wpa2-x2048-4-1 ssid_pw==candelaTech-wpa2-x2048-4-1 security==wpa2" --radio "radio==wiphy1 stations==64 ssid==candelaTech-wpa2-x2048-5-3 ssid_pw==candelaTech-wpa2-x2048-5-3 security==wpa2" Example #2 using cisco controller 1. cisco controller at 192.168.100.112 2. cisco dfs True 3. cisco channel 52 4. cisco channel width 20 5. traffic 'lf_udp lf_tcp mc_udp' 6. upstream port eth3 7. radio #0 wiphy0 stations 3 ssid test_candela ssid_pw [BLANK] secruity Open 8. radio #1 wiphy1 stations 16 ssid test_candela ssid_pw [BLANK] security Open 9. lanforge manager at 192.168.100.178 10. duration 5m Command: python3 lf_cisco_dfs.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr 192.168.100.178 --cisco_channel 52 --cisco_chan_width 20 --endp_type 'lf_udp lf_tcp mc_udp' --upstream_port 1.eth3 --radio "radio==1.wiphy0 stations==3 ssid==test_candela ssid_pw==[BLANK] security==open" --radio "radio==1.wiphy1 stations==16 ssid==test_candela ssid_pw==[BLANK] security==open" --test_duration 5m ''') parser.add_argument('--cisco_ctlr', help='--cisco_ctlr ',default=None) parser.add_argument('--cisco_user', help='--cisco_user ',default="admin") parser.add_argument('--cisco_passwd', help='--cisco_passwd ',default="Cisco123") parser.add_argument('--cisco_prompt', help='--cisco_prompt ',default="\(Cisco Controller\) >") parser.add_argument('--cisco_ap', help='--cisco_ap ',default="APA453.0E7B.CF9C") parser.add_argument('--cisco_dfs', help='--cisco_dfs ',default=False) parser.add_argument('--cisco_channel', help='--cisco_channel ',default=None) parser.add_argument('--cisco_chan_width', help='--cisco_chan_width <20 40 80 160>',default="20",choices=["20","40","80","160"]) parser.add_argument('--cisco_band', help='--cisco_band ',default="a",choices=["a", "b", "abgn"]) parser.add_argument('--cisco_series', help='--cisco_series <9800 | 3504>',default="3504",choices=["9800","3504"]) parser.add_argument('--cisco_scheme', help='--cisco_scheme (serial|telnet|ssh): connect via serial, ssh or telnet',default="ssh",choices=["serial","telnet","ssh"]) parser.add_argument('--cisco_wlan', help='--cisco_wlan default: NA, NA means no change',default="NA") parser.add_argument('--cisco_wlanID', help='--cisco_wlanID default: NA , NA means not change',default="NA") parser.add_argument('--cisco_tx_power', help='--cisco_tx_power <1 | 2 | 3 | 4 | 5 | 6 | 7 | 8> 1 is highest power default NA NA means no change',default="NA" ,choices=["1","2","3","4","5","6","7","8","NA"]) parser.add_argument('--amount_ports_to_reset', help='--amount_ports_to_reset \" \" ', default=None) parser.add_argument('--port_reset_seconds', help='--ports_reset_seconds \" \" ', default="10 30") parser.add_argument('--mgr', help='--mgr ',default='localhost') parser.add_argument('-d','--test_duration', help='--test_duration example --time 5d (5 days) default: 3m options: number followed by d, h, m or s',default='3m') parser.add_argument('--tos', help='--tos: Support different ToS settings: BK | BE | VI | VO | numeric',default="BE") parser.add_argument('--debug', help='--debug: Enable debugging',default=False) parser.add_argument('-t', '--endp_type', help='--endp_type example --endp_type \"lf_udp lf_tcp mc_udp\" Default: lf_udp , options: lf_udp, lf_udp6, lf_tcp, lf_tcp6, mc_udp, mc_udp6', default='lf_udp', type=valid_endp_types) parser.add_argument('-u', '--upstream_port', help='--upstream_port example: --upstream_port eth1',default='eth1') parser.add_argument('-o','--csv_outfile', help="--csv_outfile ", default='longevity_results') parser.add_argument('--polling_interval', help="--polling_interval ", default='60s') #parser.add_argument('-c','--csv_output', help="Generate csv output", default=False) parser.add_argument('-r','--radio', action='append', nargs=1, help='--radio \ \"radio== ssid== ssid_pw== security==\" '\ , required=True) parser.add_argument("--cap_ctl_out", help="--cap_ctl_out , switch the cisco controller output will be captured", action='store_true') args = parser.parse_args() #print("args: {}".format(args)) debug_on = args.debug if args.test_duration: test_duration = args.test_duration if args.polling_interval: polling_interval = args.polling_interval if args.endp_type: endp_types = args.endp_type if args.mgr: lfjson_host = args.mgr if args.upstream_port: side_b = args.upstream_port if args.radio: radios = args.radio if args.csv_outfile != None: current_time = time.strftime("%m_%d_%Y_%H_%M_%S", time.localtime()) csv_outfile = "{}_{}.csv".format(args.csv_outfile,current_time) print("csv output file : {}".format(csv_outfile)) MAX_NUMBER_OF_STATIONS = 64 radio_name_list = [] number_of_stations_per_radio_list = [] ssid_list = [] ssid_password_list = [] ssid_security_list = [] #optional radio configuration reset_port_enable_list = [] reset_port_time_min_list = [] reset_port_time_max_list = [] print("radios {}".format(radios)) for radio_ in radios: radio_keys = ['radio','stations','ssid','ssid_pw','security'] radio_info_dict = dict(map(lambda x: x.split('=='), str(radio_).replace('[','').replace(']','').replace("'","").split())) print("radio_dict {}".format(radio_info_dict)) for key in radio_keys: if key not in radio_info_dict: print("missing config, for the {}, all of the following need to be present {} ".format(key,radio_keys)) exit(1) radio_name_list.append(radio_info_dict['radio']) number_of_stations_per_radio_list.append(radio_info_dict['stations']) ssid_list.append(radio_info_dict['ssid']) ssid_password_list.append(radio_info_dict['ssid_pw']) ssid_security_list.append(radio_info_dict['security']) optional_radio_reset_keys = ['reset_port_enable'] radio_reset_found = True for key in optional_radio_reset_keys: if key not in radio_info_dict: #print("port reset test not enabled") radio_reset_found = False break if radio_reset_found: reset_port_enable_list.append(True) reset_port_time_min_list.append(radio_info_dict['reset_port_time_min']) reset_port_time_max_list.append(radio_info_dict['reset_port_time_max']) else: reset_port_enable_list.append(False) reset_port_time_min_list.append('0s') reset_port_time_max_list.append('0s') index = 0 station_lists = [] for (radio_name_, number_of_stations_per_radio_) in zip(radio_name_list,number_of_stations_per_radio_list): number_of_stations = int(number_of_stations_per_radio_) if number_of_stations > MAX_NUMBER_OF_STATIONS: print("number of stations per radio exceeded max of : {}".format(MAX_NUMBER_OF_STATIONS)) quit(1) station_list = LFUtils.portNameSeries(prefix_="sta", start_id_= 1 + index*1000, end_id_= number_of_stations + index*1000, padding_number_=10000, radio=radio_name_) station_lists.append(station_list) index += 1 #print("endp-types: %s"%(endp_types)) dfs = DFS.L3VariableTime( lfjson_host, lfjson_port, args=args, number_template="00", station_lists= station_lists, name_prefix="LT-", endp_types=endp_types, tos=args.tos, side_b=side_b, radio_name_list=radio_name_list, number_of_stations_per_radio_list=number_of_stations_per_radio_list, ssid_list=ssid_list, ssid_password_list=ssid_password_list, ssid_security_list=ssid_security_list, test_duration=test_duration, polling_interval= polling_interval, reset_port_enable_list=reset_port_enable_list, reset_port_time_min_list=reset_port_time_min_list, reset_port_time_max_list=reset_port_time_max_list, side_a_min_rate=256000, side_b_min_rate=256000, debug_on=debug_on, outfile=csv_outfile) dfs.pre_cleanup() dfs.build() if not dfs.passes(): print("build step failed.") print(dfs.get_fail_message()) exit(1) dfs.start(False, False) dfs.stop() if not dfs.passes(): print("stop test failed") print(dfs.get_fail_message()) print("Pausing 30 seconds after run for manual inspection before we clean up.") time.sleep(30) dfs.cleanup() if dfs.passes(): print("Full test passed, all connections increased rx bytes") if __name__ == "__main__": main()