jbr_monitor_bssids.py: WIP attempting to recreate load scenario logic with lanforge_api

This commit is contained in:
Jed Reynolds
2021-10-20 17:38:58 -07:00
parent 7e53c4722b
commit 8ff674f1f2

View File

@@ -44,7 +44,8 @@ if sys.version_info[0] != 3:
import os
import importlib
import argparse
from http.client import HTTPResponse
from typing import Optional
path_hunks = os.path.abspath(__file__).split('/')
while( path_hunks[-1] != 'lanforge-scripts'):
@@ -56,6 +57,7 @@ lanforge_api = importlib.import_module("lanforge_client.lanforge_api")
LFSession = lanforge_api.LFSession
LFJsonCommand = lanforge_api.LFJsonCommand
LFJsonQuery = lanforge_api.LFJsonQuery
# from scenario import LoadScenario
import pprint
# print(pprint.pformat(("ospath", sys.path)))
@@ -123,7 +125,7 @@ class BssidMonitor(Realm):
self.report_file: str = report_file
self.output_format: str = output_format
self.port_mgr_cols: list[str] = port_mgr_cols
self.layer3_cols: list[str] = layer3_cols,
self.layer3_cols: tuple[Optional[list[str]]] = layer3_cols,
self.monitor_interval_sec: int = monitor_interval_sec
if not bssid_list:
@@ -139,13 +141,32 @@ class BssidMonitor(Realm):
},
debug=True,
die_on_error=False);'''
self.lf_session = lanforge_api.LFSession(lfclient_url="http://{host}:{port}/".format(host=host, port=port))
self.lf_command = self.lf_session.get_command()
self.lf_query = self.lf_session.get_query()
self.lf_session: LFSession =\
lanforge_api.LFSession(lfclient_url="http://{host}:{port}/".format(host=host, port=port),
debug=debug,
connection_timeout_sec=2,
stream_errors=True,
stream_warnings=True,
require_session=True,
exit_on_error=True)
self.lf_command : LFJsonCommand = self.lf_session.get_command()
self.lf_query : LFJsonQuery = self.lf_session.get_query()
def build(self):
err_warn_list = []
# query for the last response
event_response = self.lf_query.events_last_events(event_count=1,
debug=self.debug,
errors_warnings=err_warn_list)
pprint(("event_response", dir(event_response)))
# load a database
response: HTTPResponse = self.lf_command.post_load(name="BLANK",
action="overwrite",
clean_dut="NA",
clean_chambers="NA",
debug=self.debug_)
if not response:
raise ConnectionError("lf_command::post_load returned no response")
# create a series of stations
for bssid in self.bssid_list: