mirror of
				https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
				synced 2025-10-31 10:48:02 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			354 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			354 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| NAME: jbr_monitor_bssids.py
 | |
| 
 | |
| PURPOSE: Creates a series of stations and L3 connections
 | |
| Uses lanforge_api.
 | |
| 
 | |
| EXAMPLE:
 | |
| $ ./jbr_monitor_bssids.py --host ct521a-manx \
 | |
|     --radio 1.1.wiphy0 \
 | |
|     --traffic_dur_sec 60 \
 | |
|     --test_duration_min 120 \
 | |
|     --security wpa2 \
 | |
|     --ssid cyrus \
 | |
|     --passwd HelloRudy \
 | |
|     --bssid cc:32:e5:4b:ef:b3 \
 | |
|     --bssid cc:32:e5:4b:ef:b4 \
 | |
|     --bssid 70:4f:57:1e:15:af \
 | |
|     --bssid 70:4f:57:1e:15:ae \
 | |
|     --bssid d8:0d:17:be:ab:36 \
 | |
|     --bssid d8:0d:17:be:ab:37 \
 | |
|     --bssid 54:83:3a:9c:38:7f \
 | |
| 
 | |
| Notes:
 | |
|   # cyrus1 161-    3x3 MCS 0-9 AC WPA2     cc:32:e5:4b:ef:b3 -31.0  5805      100    5.76 m
 | |
|   cyrus1 11      3x3 MIMO       WPA2     cc:32:e5:4b:ef:b4 -32.0  2462      100    5.76 m
 | |
|   cyrus  1       3x3 MIMO       WPA2     70:4f:57:1e:15:af -48.0  2412      100    5.76 m
 | |
|   cyrus  149+    4x4 MCS 0-9 AC WPA2     70:4f:57:1e:15:ae -53.0  5745      100    5.76 m
 | |
|   cyrus  157+    3x3 MCS 0-9 AC WPA2     d8:0d:17:be:ab:36 -64.0  5785      100    5.76 m
 | |
|   cyrus  11      3x3 MIMO       WPA2     d8:0d:17:be:ab:37 -65.0  2462      100    5.76 m
 | |
|   cyrus  11      3x3 MIMO       WPA WPA2 54:83:3a:9c:38:7f -72.0  2462      100    5.76 m
 | |
| 
 | |
| 
 | |
| 
 | |
| TO DO NOTES:
 | |
| 
 | |
| """
 | |
| import logging
 | |
| import sys
 | |
| 
 | |
| if sys.version_info[0] != 3:
 | |
|     print("This script requires Python3")
 | |
|     exit()
 | |
| 
 | |
| import os
 | |
| import importlib
 | |
| import argparse
 | |
| import time
 | |
| from http.client import HTTPResponse
 | |
| from typing import Optional
 | |
| from pprint import pprint
 | |
| 
 | |
| path_hunks = os.path.abspath(__file__).split('/')
 | |
| while (path_hunks[-1] != 'lanforge-scripts'):
 | |
|     path_hunks.pop()
 | |
| sys.path.append(os.path.join("/".join(path_hunks)))
 | |
| realm = importlib.import_module("py-json.realm")
 | |
| Realm = realm.Realm
 | |
| lanforge_api = importlib.import_module("lanforge_client.lanforge_api")
 | |
| Logg = lanforge_api.Logg
 | |
| LFSession = lanforge_api.LFSession
 | |
| LFJsonCommand = lanforge_api.LFJsonCommand
 | |
| LFJsonQuery = lanforge_api.LFJsonQuery
 | |
| 
 | |
| 
 | |
| # from scenario import LoadScenario
 | |
| 
 | |
| 
 | |
| # print(pprint.pformat(("ospath", sys.path)))
 | |
| 
 | |
| class BssidMonitor(Realm):
 | |
|     def __init__(self,
 | |
|                  host: str = "localhost",
 | |
|                  port: int = 8080,
 | |
|                  debug: bool = False,
 | |
|                  args: argparse = None,
 | |
|                  radio: str = None,
 | |
|                  ssid: str = None,
 | |
|                  security: str = None,
 | |
|                  password: str = None,
 | |
|                  upstream: str = None,
 | |
|                  mode: int = 0,
 | |
|                  side_a_min_rate: int = 256000,
 | |
|                  side_b_min_rate: int = 256000,
 | |
|                  test_duration_min: str = "5m",
 | |
|                  report_file: str = None,
 | |
|                  output_format: str = None,
 | |
|                  layer3_cols: list = None,  # p3.9 list[str]
 | |
|                  port_mgr_cols=None,
 | |
|                  monitor_interval_sec: int = 10,
 | |
|                  bssid_list: list = None):  # p3.9 list[str]
 | |
|         """
 | |
|                          lfclient_host="localhost",
 | |
|                  lfclient_port=8080,
 | |
|                  debug_=False,
 | |
|                  _exit_on_error=False,
 | |
|                  _exit_on_fail=False,
 | |
|                  _proxy_str=None,
 | |
|                  _capture_signal_list=[]
 | |
|         :param host:
 | |
|         :param port:
 | |
|         :param radio:
 | |
|         :param ssid:
 | |
|         :param security:
 | |
|         :param password:
 | |
|         :param upstream:
 | |
|         :param mode:
 | |
|         :param side_a_min_rate:
 | |
|         :param side_b_min_rate:
 | |
|         :param test_duration_min:
 | |
|         :param report_file:
 | |
|         :param output_format:
 | |
|         :param layer3_cols:
 | |
|         :param port_mgr_cols:
 | |
|         :param monitor_interval_sec:
 | |
|         :param bssid_list:
 | |
|         """
 | |
|         super().__init__(lfclient_host=host,
 | |
|                          lfclient_port=port,
 | |
|                          debug_=debug,
 | |
|                          _exit_on_error=True,
 | |
|                          _exit_on_fail=False)
 | |
|         self.radio: str = radio
 | |
|         self.ssid: str = ssid
 | |
|         self.security: str = security
 | |
|         self.password: str = password
 | |
|         self.upstream: str = upstream
 | |
|         self.mode: int = mode
 | |
|         self.side_a_min_rate: int = side_a_min_rate
 | |
|         self.side_b_min_rate: int = side_b_min_rate
 | |
|         self.test_duration = test_duration_min
 | |
|         self.report_file: str = report_file
 | |
|         self.output_format: str = output_format
 | |
|         self.port_mgr_cols: list = port_mgr_cols  # p3.9 list[str]
 | |
|         self.layer3_cols = layer3_cols  # py 3.9.x tuple[Optional[list[str]]]
 | |
|         # self.layer3_cols: tuple = layer3_cols # py 3.7.7
 | |
|         self.monitor_interval_sec: int = monitor_interval_sec
 | |
| 
 | |
|         if not bssid_list:
 | |
|             raise ValueError("bssid list necessary to continue")
 | |
|         self.bssid_list: list = bssid_list  # p 3.9: list[str]
 | |
|         self.bssid_sta_profiles: dict = {}  #p3.9: dict[str,str]
 | |
| 
 | |
|         '''
 | |
|         session = LFSession(lfclient_url="http://localhost:8080",
 | |
|                         connect_timeout_sec=20,
 | |
|                         proxy_map={
 | |
|                                 'http':'http://192.168.1.250:3128'
 | |
|                             },
 | |
|                         debug=True,
 | |
|                         die_on_error=False);'''
 | |
| 
 | |
|         self.lf_session: LFSession = \
 | |
|             lanforge_api.LFSession(lfclient_url="http://{host}:{port}/".format(host=host, port=port),
 | |
|                                    debug=debug,
 | |
|                                    connection_timeout_sec=2,
 | |
|                                    stream_errors=True,
 | |
|                                    stream_warnings=True,
 | |
|                                    require_session=True,
 | |
|                                    exit_on_error=True)
 | |
|         if args and args.debugging:
 | |
|             pprint(args.debugging)
 | |
|             # we could have a nested list here?
 | |
|             for item in args.debugging:
 | |
|                 if (type(item) is list):
 | |
|                     item = item[0]
 | |
|                 if item.startswith("tag:"):
 | |
|                     Logg.register_tag(item[item.rindex(":"):])
 | |
|                 if item.startswith("method:"):
 | |
|                     Logg.register_method_name(item[item.rindex(":"):])
 | |
| 
 | |
|         self.lf_command: LFJsonCommand = self.lf_session.get_command()
 | |
|         self.lf_query: LFJsonQuery = self.lf_session.get_query()
 | |
| 
 | |
|     def build(self):
 | |
|         # get last event id
 | |
|         last_event_id = self.before_load_action()
 | |
| 
 | |
|         # load a database
 | |
|         response: HTTPResponse = self.lf_command.post_load(name="BLANK",
 | |
|                                                            action="overwrite",
 | |
|                                                            clean_dut="NA",
 | |
|                                                            clean_chambers="NA",
 | |
|                                                            debug=self.debug)
 | |
| 
 | |
|         if not self.wait_for_load_to_finish(since_id=last_event_id):
 | |
|             exit(1)
 | |
| 
 | |
|         if not response:
 | |
|             raise ConnectionError("lf_command::post_load returned no response")
 | |
| 
 | |
|         # create a series of stations
 | |
|         for bssid in self.bssid_list:
 | |
|             print("build: bssid: %s" % bssid)
 | |
| 
 | |
|     def before_load_action(self):
 | |
|         """
 | |
|         Use this
 | |
|         :return: last event ID in event list
 | |
|         """
 | |
|         err_warn_list = []
 | |
|         self.lf_command.post_show_events(p_type='all',
 | |
|                                          shelf=1,
 | |
|                                          card='all',
 | |
|                                          port='all',
 | |
|                                          endp='all')
 | |
|         time.sleep(0.2)
 | |
| 
 | |
|         event_response = self.lf_query.events_last_events(event_count=1,
 | |
|                                                           debug=self.debug,
 | |
|                                                           wait_sec=1,
 | |
|                                                           max_timeout_sec=120,
 | |
|                                                           errors_warnings=err_warn_list)
 | |
|         if not event_response:
 | |
|             Logg.logg(level=logging.ERROR, msg="No event_response, we should have retried that")
 | |
|             return
 | |
|         # pprint(("event_response:", event_response))
 | |
|         if "id" not in event_response:
 | |
|             pprint(("event_response:", event_response))
 | |
|             return
 | |
|         return event_response["id"]
 | |
| 
 | |
|     def wait_for_load_to_finish(self, since_id: int = None):
 | |
|         """
 | |
|         TODO: make this a standard method outside this module
 | |
|         :param since_id:
 | |
|         :return:
 | |
|         """
 | |
|         completed = False
 | |
|         timer = 0
 | |
|         timeout = 60
 | |
|         while not completed:
 | |
|             new_events = self.lf_query.events_since(event_id=since_id)
 | |
|             if new_events:
 | |
|                 for event_tup in new_events:
 | |
|                     for event_id in event_tup.keys():
 | |
|                         event_record = event_tup[event_id]
 | |
|                         if self.debug:
 | |
|                             pprint("\n        wait_for_load_to_finish: {since} -> {id}: {descr}\n".format(
 | |
|                                 since=since_id,
 | |
|                                 id=event_id,
 | |
|                                 descr=event_record['event description']
 | |
|                             ))
 | |
|                         if event_record['event description'].startswith('LOAD COMPLETED'):
 | |
|                             completed = True
 | |
|                             self.lf_query.logger.warning('Scenario loaded after %s seconds' % timer)
 | |
|                             break
 | |
|             if completed:
 | |
|                 break
 | |
|             else:
 | |
|                 if (timer % 5) == 0:
 | |
|                     self.lf_command.post_show_events(p_type='all',
 | |
|                                                      shelf=1,
 | |
|                                                      card='all',
 | |
|                                                      port='all',
 | |
|                                                      endp='all')
 | |
|                 timer += 1
 | |
|                 time.sleep(1)
 | |
|                 if timer > timeout:
 | |
|                     completed = True
 | |
|                     print('Scenario failed to load after %s seconds' % timeout)
 | |
|                     break
 | |
|                 else:
 | |
|                     print('Waiting %s out of %s seconds to load scenario' % (timer, timeout))
 | |
|         return completed
 | |
| 
 | |
|     def start(self):
 | |
|         pass
 | |
| 
 | |
|     def stop(self):
 | |
|         pass
 | |
| 
 | |
|     def cleanup(self):
 | |
|         pass
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     parser = Realm.create_basic_argparse(
 | |
|         formatter_class=argparse.RawTextHelpFormatter,
 | |
|         prog=__file__,
 | |
|         epilog="",
 | |
|         description="Monitor traffic to different APs and report connection stability")
 | |
| 
 | |
|     parser.add_argument('--mode', help='Used to force mode of stations')
 | |
|     parser.add_argument('--bssid',
 | |
|                         action='append', # action='extend' appears in py 3.9
 | |
|                         type=str,
 | |
|                         nargs="+",
 | |
|                         help='Add an AP to the list of APs to test connections to')
 | |
|     #parser.add_argument('--debugging',
 | |
|     #                    action='extend',
 | |
|     #                    type=str,
 | |
|     #                    nargs="+",
 | |
|     #                    help='Debugging for specific areas: "tag:keyword" or "method:methodname" ')
 | |
|     parser.add_argument('--output_format',
 | |
|                         type=str,
 | |
|                         help='choose either csv or xlsx')
 | |
|     parser.add_argument('--report_file',
 | |
|                         default=None,
 | |
|                         type=str,
 | |
|                         help='where you want to store results')
 | |
|     parser.add_argument('--a_min',
 | |
|                         default=256000,
 | |
|                         type=int,
 | |
|                         help='bps rate minimum for side_a')
 | |
|     parser.add_argument('--b_min',
 | |
|                         default=256000,
 | |
|                         type=int,
 | |
|                         help='bps rate minimum for side_b')
 | |
|     parser.add_argument('--test_duration_min',
 | |
|                         default="2",
 | |
|                         type=int,
 | |
|                         help='duration of the test in minutes')
 | |
|     parser.add_argument('--layer3_cols',
 | |
|                         type=list,  # py3.9 list[str]
 | |
|                         help='titles of columns to report')
 | |
|     parser.add_argument('--port_mgr_cols',
 | |
|                         type=list,  # py3.9 list[str]
 | |
|                         help='titles of columns to report')
 | |
| 
 | |
|     args = parser.parse_args()
 | |
| 
 | |
|     # pprint.pprint(("args.members:", dir(args)))
 | |
| 
 | |
|     bssid_monitor = BssidMonitor(host=args.mgr,
 | |
|                                  port=args.mgr_port,
 | |
|                                  debug=args.debug,
 | |
|                                  args=args,
 | |
|                                  radio=args.radio,
 | |
|                                  ssid=args.ssid,
 | |
|                                  security=args.security,
 | |
|                                  password=args.passwd,
 | |
|                                  upstream=args.upstream_port,
 | |
|                                  mode=args.mode,
 | |
|                                  side_a_min_rate=args.a_min,
 | |
|                                  side_b_min_rate=args.b_min,
 | |
|                                  test_duration_min=args.test_duration_min,
 | |
|                                  report_file=args.report_file,
 | |
|                                  output_format=args.output_format,
 | |
|                                  layer3_cols=args.layer3_cols,
 | |
|                                  port_mgr_cols=args.port_mgr_cols,
 | |
|                                  monitor_interval_sec=10,
 | |
|                                  bssid_list=args.bssid)
 | |
| 
 | |
|     bssid_monitor.build()
 | |
|     bssid_monitor.start()
 | |
|     bssid_monitor.stop()
 | |
|     bssid_monitor.cleanup()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | |
| #
 | 
