diff --git a/py-json/realm.py b/py-json/realm.py index dcb56178..f3c650d4 100755 --- a/py-json/realm.py +++ b/py-json/realm.py @@ -1102,7 +1102,7 @@ class L3CXProfile(LFCliBase): if output_format is not None: if output_format.lower() != report_file.split('.')[-1]: if output_format.lower() != 'excel': - raise ValueError('Filename %s does not match output format %s' (report_file, output_format)) + raise ValueError('Filename %s does not match output format %s' % (report_file, output_format)) else: output_format = report_file.split('.')[-1] @@ -1125,7 +1125,7 @@ class L3CXProfile(LFCliBase): print(response) value_map[datetime.datetime.now()]=response if datetime.datetime.now() > end_time: - break; + break new_cx_rx_values = self.__get_rx_values() if self.debug: print(old_cx_rx_values, new_cx_rx_values) diff --git a/py-scripts/docstrings.py b/py-scripts/docstrings.py new file mode 100755 index 00000000..310decd3 --- /dev/null +++ b/py-scripts/docstrings.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 + +""" Script for mapping module names to their docstrings, will print output in json format. Will look for all python + files in the current directory and map filename:docstring +""" + +import ast +import os +import pprint + + +class DocstringCollector: + def __init__(self): + self.docstring_map = {} + self.cur_path = os.getcwd() + self.files = [] + + def get_python_files(self): + for file in os.listdir(self.cur_path): + if file.endswith('.py'): + self.files.append(file) + + def map_docstrings(self): + if len(self.files) > 0: + for file in self.files: + try: + with open(file, 'r') as f: + tree = ast.parse(f.read()) + docstring = ast.get_docstring(tree) + if docstring is not None: + self.docstring_map[file] = docstring + except Exception as e: + continue + # print("Exception %s on %s" % (e, file)) + else: + raise ValueError("No python files found in directory") + + +def main(): + collector = DocstringCollector() + collector.get_python_files() + collector.map_docstrings() + # print(collector.docstring_map.keys()) + print(collector.docstring_map) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/py-scripts/test_ipv4_connection.py b/py-scripts/test_ipv4_connection.py deleted file mode 100755 index 89a5e5c6..00000000 --- a/py-scripts/test_ipv4_connection.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python3 - - -import sys -import os -import argparse - -if sys.version_info[0] != 3: - print("This script requires Python 3") - exit(1) - -if 'py-json' not in sys.path: - sys.path.append(os.path.join(os.path.abspath('..'), 'py-json')) -import LANforge -from LANforge.lfcli_base import LFCliBase -from LANforge import LFUtils -import realm -import time -import pprint - - -class IPv4Test(LFCliBase): - def __init__(self, - _ssid=None, - _security=None, - _password=None, - _host=None, - _port=None, - _sta_list=None, - _number_template="00000", - _radio="wiphy0", - _proxy_str=None, - _debug_on=False, - _exit_on_error=False, - _exit_on_fail=False): - super().__init__(_host, - _port, - _proxy_str=_proxy_str, - _local_realm=realm.Realm(lfclient_host=_host, - lfclient_port=_port, - halt_on_error_=_exit_on_error, - _exit_on_error=_exit_on_error, - _exit_on_fail=_exit_on_fail, - _proxy_str=_proxy_str, - debug_=_debug_on), - _debug=_debug_on, - _halt_on_error=_exit_on_error, - _exit_on_fail=_exit_on_fail) - self.host = _host - self.port = _port - self.ssid = _ssid - self.security = _security - self.password = _password - self.sta_list = _sta_list - self.radio = _radio - self.timeout = 120 - self.number_template = _number_template - self.debug = _debug_on - self.station_profile = self.local_realm.new_station_profile() - self.station_profile.lfclient_url = self.lfclient_url - self.station_profile.ssid = self.ssid - self.station_profile.ssid_pass = self.password, - self.station_profile.security = self.security - self.station_profile.number_template_ = self.number_template - self.station_profile.mode = 0 - if self.debug: - print("----- Station List ----- ----- ----- ----- ----- ----- \n") - pprint.pprint(self.sta_list) - print("---- ~Station List ----- ----- ----- ----- ----- ----- \n") - - - def build(self): - # Build stations - self.station_profile.use_security(self.security, self.ssid, self.password) - self.station_profile.set_number_template(self.number_template) - - print("Creating stations") - self.station_profile.set_command_flag("add_sta", "create_admin_down", 1) - self.station_profile.set_command_param("set_port", "report_timer", 1500) - self.station_profile.set_command_flag("set_port", "rpt_timer", 1) - self.station_profile.create(radio=self.radio, sta_names_=self.sta_list, debug=self.debug) - self._pass("PASS: Station build finished") - - def start(self, sta_list, print_pass, print_fail): - self.station_profile.admin_up() - associated_map = {} - ip_map = {} - print("Starting test...") - for sec in range(self.timeout): - for sta_name in sta_list: - eidn = self.local_realm.name_to_eid(sta_name) - url = "/port/1/%s/%s" % (eidn[1], eidn[2]) - sta_status = self.json_get(url + "?fields=port,alias,ip,ap", debug_=self.debug) - # print(sta_status) - if (sta_status is None) or (sta_status['interface'] is None) or (sta_status['interface']['ap'] is None): - continue - if (len(sta_status['interface']['ap']) == 17) and (sta_status['interface']['ap'][-3] == ':'): - if self.debug: - print("Associated", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip']) - associated_map[sta_name] = 1 - if sta_status['interface']['ip'] != '0.0.0.0': - if self.debug: - print("IP", sta_name, sta_status['interface']['ap'], sta_status['interface']['ip']) - ip_map[sta_name] = 1 - if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)): - break - else: - time.sleep(1) - - if self.debug: - print("sta_list", len(sta_list), sta_list) - print("ip_map", len(ip_map), ip_map) - print("associated_map", len(associated_map), associated_map) - if (len(sta_list) == len(ip_map)) and (len(sta_list) == len(associated_map)): - self._pass("PASS: All stations associated with IP", print_pass) - else: - self._fail("FAIL: Not all stations able to associate/get IP", print_fail) - print("sta_list", sta_list) - print("ip_map", ip_map) - print("associated_map", associated_map) - - return self.passes() - - def stop(self): - # Bring stations down - self.station_profile.admin_down() - - def cleanup(self, sta_list): - self.station_profile.cleanup(sta_list, debug_=self.debug) - LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, - port_list=sta_list, - debug=self.debug) - time.sleep(1) - -def main(): - parser = LFCliBase.create_basic_argparse( - prog='test_ipv4_connection.py', - formatter_class=argparse.RawTextHelpFormatter, - epilog='''\ - Create stations that attempt to authenticate, associate, and receive IP addresses on the - chosen SSID - ''', - - description='''\ - test_ipv4_connection.py --------------------- -Command example: -./test_ipv4_connection.py - --upstream_port eth1 - --radio wiphy0 - --num_stations 3 - --security open - --ssid netgear - --passwd BLANK - --debug - ''') - required = parser.add_argument_group('required arguments') - #required.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', required=True) - - args = parser.parse_args() - #if args.debug: - # pprint.pprint(args) - # time.sleep(5) - if (args.radio is None): - raise ValueError("--radio required") - - num_sta = 2 - if (args.num_stations is not None) and (int(args.num_stations) > 0): - num_stations_converted = int(args.num_stations) - num_sta = num_stations_converted - - station_list = LFUtils.port_name_series(prefix="sta", - start_id=0, - end_id=num_sta-1, - padding_number=10000, - radio=args.radio) - if args.debug: - print("args.proxy: %s" % args.proxy) - ip_test = IPv4Test(_host=args.mgr, - _port=args.mgr_port, - _ssid=args.ssid, - _password=args.passwd, - _security=args.security, - _sta_list=station_list, - _radio=args.radio, - _proxy_str=args.proxy, - _debug_on=args.debug) - - ip_test.cleanup(station_list) - ip_test.build() - if not ip_test.passes(): - print(ip_test.get_fail_message()) - ip_test.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message()) - ip_test.exit_fail() - ip_test.start(station_list, False, False) - ip_test.stop() - if not ip_test.passes(): - print(ip_test.get_fail_message()) - ip_test.add_event(name="test_ipv4_connection.py", message=ip_test.get_fail_message()) - ip_test.exit_fail() - time.sleep(30) - ip_test.cleanup(station_list) - if ip_test.passes(): - ip_test.add_event(name="test_ipv4_connection.py", message="Full test passed, all stations associated and got IP") - ip_test.exit_success() - - -if __name__ == "__main__": - main() diff --git a/py-scripts/test_ipv4_l4.py b/py-scripts/test_ipv4_l4.py index ae88bf6c..35b0f324 100755 --- a/py-scripts/test_ipv4_l4.py +++ b/py-scripts/test_ipv4_l4.py @@ -1,5 +1,13 @@ #!/usr/bin/env python3 +""" + This script will create a variable number of stations that will attempt to use layer 4 endpoints to generate traffic. + A test will run for a specified amount of time with periodic checks for traffic successfully being received. The test will + monitor the bytes-rd column of the endpoints. An increase at every test interval will pass the test, otherwise it will fail + + Use './test_ipv4_l4.py --help' to see command line usage and options +""" + import sys if sys.version_info[0] != 3: diff --git a/py-scripts/test_ipv4_l4_ftp_upload.py b/py-scripts/test_ipv4_l4_ftp_upload.py index c172d780..7db91924 100755 --- a/py-scripts/test_ipv4_l4_ftp_upload.py +++ b/py-scripts/test_ipv4_l4_ftp_upload.py @@ -1,5 +1,13 @@ #!/usr/bin/env python3 +""" + This script will create a variable number of stations that will attempt to use layer 4 endpoints to generate traffic over ftp. + A test will run for a specified amount of time with periodic checks for traffic successfully being received. The test will + monitor the bytes-wr column of the endpoints. An increase at every test interval will pass the test, otherwise it will fail + + Use './test_ipv4_l4_ftp_upload.py --help' to see command line usage and options +""" + import sys if sys.version_info[0] != 3: diff --git a/py-scripts/test_ipv4_l4_ftp_urls_per_ten.py b/py-scripts/test_ipv4_l4_ftp_urls_per_ten.py index fd18e361..c2e2a9f3 100755 --- a/py-scripts/test_ipv4_l4_ftp_urls_per_ten.py +++ b/py-scripts/test_ipv4_l4_ftp_urls_per_ten.py @@ -1,5 +1,14 @@ #!/usr/bin/env python3 +""" + This script will create a variable number of stations that will attempt to use layer 4 endpoints to generate traffic over ftp. + A test will run a specified number of times with periodic checks for traffic successfully being received every ten minutes. + The test will monitor the urls/s column of the endpoints. Being at or above 90% of the chosen url rate at each interval + will pass the test, otherwise it will fail + + Use './test_ipv4_l4_ftp_urls_per_ten.py --help' to see command line usage and options +""" + import sys import os if sys.version_info[0] != 3: diff --git a/py-scripts/test_ipv4_l4_ftp_wifi.py b/py-scripts/test_ipv4_l4_ftp_wifi.py index 6b39a574..fc08c7cf 100755 --- a/py-scripts/test_ipv4_l4_ftp_wifi.py +++ b/py-scripts/test_ipv4_l4_ftp_wifi.py @@ -1,5 +1,13 @@ #!/usr/bin/env python3 +""" + This script will create a variable number of stations that will attempt to use layer 4 endpoints to generate traffic over ftp. + A test will run for a specified amount of time with periodic checks for traffic successfully being received. The test will + monitor the bytes-wr column of the endpoints. An increase at every test interval will pass the test, otherwise it will fail + + Use './test_ipv4_l4_ftp_wifi.py --help' to see command line usage and options +""" + import sys if sys.version_info[0] != 3: @@ -160,7 +168,7 @@ def main(): # formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawTextHelpFormatter, epilog='''\ - Create layer-4 endpoints and test that the bytes-rd from the chosen URL are increasing over the + Create layer-4 endpoints and test that the bytes-wr from the chosen URL are increasing over the duration of the test ''', diff --git a/py-scripts/test_ipv4_l4_urls_per_ten.py b/py-scripts/test_ipv4_l4_urls_per_ten.py index dbe5ec8f..eacfdc5b 100755 --- a/py-scripts/test_ipv4_l4_urls_per_ten.py +++ b/py-scripts/test_ipv4_l4_urls_per_ten.py @@ -1,5 +1,14 @@ #!/usr/bin/env python3 +""" + This script will create a variable number of stations that will attempt to use layer 4 endpoints to generate traffic. + A test will run a specified number of times with periodic checks for traffic successfully being received every ten minutes. + The test will monitor the urls/s column of the endpoints. Being at or above 90% of the chosen url rate at each interval + will pass the test, otherwise it will fail + + Use './test_ipv4_l4_urls_per_ten.py --help' to see command line usage and options +""" + import sys import os if sys.version_info[0] != 3: diff --git a/py-scripts/test_ipv4_l4_wifi.py b/py-scripts/test_ipv4_l4_wifi.py index 59e71b94..b16f7c48 100755 --- a/py-scripts/test_ipv4_l4_wifi.py +++ b/py-scripts/test_ipv4_l4_wifi.py @@ -1,5 +1,13 @@ #!/usr/bin/env python3 +""" + This script will create a variable number of stations that will attempt to use layer 4 endpoints to generate traffic. + A test will run for a specified amount of time with periodic checks for traffic successfully being received. The test will + monitor the bytes-rd column of the endpoints. An increase at every test interval will pass the test, otherwise it will fail + + Use './test_ipv4_l4_wifi.py --help' to see command line usage and options +""" + import sys if sys.version_info[0] != 3: