mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
317 lines
12 KiB
Python
Executable File
317 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# This script will set the LANforge to a BLANK database then it will load the specified database
|
|
# and start a graphical report
|
|
import argparse
|
|
import importlib
|
|
import os
|
|
import pprint
|
|
import sys
|
|
import time
|
|
from time import sleep
|
|
|
|
if sys.version_info[0] != 3:
|
|
print("This script requires Python 3")
|
|
exit(1)
|
|
|
|
|
|
sys.path.append(os.path.join(os.path.abspath(__file__ + "../../../")))
|
|
|
|
LFUtils = importlib.import_module("py-json.LANforge.LFUtils")
|
|
lfcli_base = importlib.import_module("py-json.LANforge.lfcli_base")
|
|
LFCliBase = lfcli_base.LFCliBase
|
|
realm = importlib.import_module("py-json.realm")
|
|
Realm = realm.Realm
|
|
|
|
"""
|
|
cvScenario.scenario_db = args.scenario_db
|
|
if args.cv_test is not None:
|
|
cvScenario.cv_test = args.cv_test
|
|
if args.test_scenario is not None:
|
|
cvScenario.test_scenario = args.test_scenario
|
|
"""
|
|
|
|
class RunCvScenario(LFCliBase):
|
|
def __init__(self, lfhost="localhost", lfport=8080, debug_=False, lanforge_db_=None, cv_scenario_=None, cv_test_=None, test_scenario_=None):
|
|
super().__init__( _lfjson_host=lfhost, _lfjson_port=lfport, _debug=debug_, _exit_on_error=True, _exit_on_fail=True)
|
|
self.lanforge_db = lanforge_db_
|
|
self.cv_scenario = cv_scenario_
|
|
self.cv_test = cv_test_
|
|
self.test_profile = test_scenario_
|
|
self.localrealm = Realm(lfclient_host=lfhost, lfclient_port=lfport, debug_=debug_)
|
|
self.report_name = None
|
|
self.load_timeout_sec = 2 * 60
|
|
|
|
def get_report_file_name(self):
|
|
return self.report_name
|
|
|
|
def build(self):
|
|
data = {
|
|
"name": "BLANK",
|
|
"action":"overwrite",
|
|
"clean_dut":"yes",
|
|
"clean_chambers": "yes"
|
|
}
|
|
self.json_post("/cli-json/load", data)
|
|
self.wait_for_db_load_and_sync()
|
|
|
|
port_counter = 0;
|
|
attempts = 6
|
|
while (attempts > 0) and (port_counter > 0):
|
|
sleep(1)
|
|
attempts -= 1
|
|
print("looking for ports like vap+")
|
|
port_list = self.localrealm.find_ports_like("vap+")
|
|
alias_map = LFUtils.portListToAliasMap(port_list)
|
|
port_counter = len(alias_map)
|
|
|
|
port_list = self.localrealm.find_ports_like("sta+")
|
|
alias_map = LFUtils.portListToAliasMap(port_list)
|
|
port_counter += len(alias_map)
|
|
if port_counter == 0:
|
|
break
|
|
|
|
if (port_counter != 0) and (attempts == 0):
|
|
print("There appears to be a vAP in this database, quitting.")
|
|
pprint(alias_map);
|
|
exit(1)
|
|
|
|
data = {
|
|
"name": self.lanforge_db,
|
|
"action":"overwrite",
|
|
"clean_dut":"yes",
|
|
"clean_chambers": "yes"
|
|
}
|
|
self.json_post("/cli-json/load", data)
|
|
self.wait_for_db_load_and_sync()
|
|
self._pass("Loaded scenario %s" % self.lanforge_db, True)
|
|
return True
|
|
|
|
def wait_for_db_load_and_sync(self):
|
|
events_response = self.json_get("/events/last")
|
|
if "event" not in events_response:
|
|
raise ValueError("Unable to find last event")
|
|
if "id" not in events_response["event"]:
|
|
pprint.pprint(events_response["event"])
|
|
raise ValueError("bad event format")
|
|
previous_event_id = events_response["event"]["id"]
|
|
|
|
# check for scenario (db) load message
|
|
begin_time: int = round(time.time() * 1000)
|
|
load_completed = False
|
|
while not load_completed:
|
|
if time.time() > (begin_time + self.load_timeout_sec):
|
|
print("Unable to load database within %d sec" % self.load_timeout_sec)
|
|
exit(1)
|
|
events_response = self.json_get("/events/since/%s" % previous_event_id)
|
|
pronoun = None
|
|
if "events" in events_response:
|
|
pronoun = "events"
|
|
elif "event" in events_response:
|
|
pronoun = "event"
|
|
if not pronoun:
|
|
pprint.pprint(("events response", events_response))
|
|
raise ValueError("incorrect events response")
|
|
for event_o in events_response[pronoun]:
|
|
if load_completed:
|
|
break
|
|
for (key, record) in event_o.items():
|
|
if "event description" not in record:
|
|
continue
|
|
if not record["event description"]:
|
|
continue
|
|
if record["event description"].startswith("LOAD COMPLETED at "):
|
|
print("load completed: %s " % record["event description"])
|
|
load_completed = True
|
|
break
|
|
if not load_completed:
|
|
sleep(1)
|
|
|
|
blobs_last_updated = begin_time
|
|
status_response = self.json_get("/")
|
|
if "text_records_last_updated_ms" in status_response:
|
|
blobs_last_updated = int(status_response["text_records_last_updated_ms"])
|
|
#print("*** blobs updated at %d" % blobs_last_updated)
|
|
else:
|
|
begin_time = round(time.time() * 1000)
|
|
print("no text_records_last_updated_ms, using %d " % begin_time)
|
|
# next we will want to sync our text blobs up
|
|
self.json_post("/cli-json/show_text_blob", {
|
|
"type": "ALL",
|
|
"name": "ALL"
|
|
})
|
|
|
|
load_completed = False
|
|
while not load_completed:
|
|
sleep(1)
|
|
if time.time() > (begin_time + (6 * 1000)):
|
|
print("waited %d sec for text blobs to update" % self.load_timeout_sec)
|
|
load_completed = True
|
|
break
|
|
status_response = self.json_get("/")
|
|
if "text_records_last_updated_ms" in status_response:
|
|
updated = int(status_response["text_records_last_updated_ms"])
|
|
print(", , , , , , , , , updated at %d" % updated)
|
|
if updated > blobs_last_updated:
|
|
load_completed = True
|
|
break
|
|
else:
|
|
pprint.pprint(status_response)
|
|
self.json_post("/cli-json/show_text_blob", {
|
|
"type": "ALL",
|
|
"name": "ALL"
|
|
})
|
|
delta: int = (time.time() * 1000) - begin_time
|
|
print("blobs loaded in %d ms" % delta)
|
|
|
|
# next show duts
|
|
self.json_post("/cli-json/show_dut", {"name": "ALL"})
|
|
self.json_post("/cli-json/show_profile", {"name": "ALL"})
|
|
self.json_post("/cli-json/show_traffic_profile", {"name": "ALL"})
|
|
sleep(5)
|
|
|
|
|
|
def start(self, debug_=False):
|
|
# /gui_cli takes commands keyed on 'cmd', so we create an array of commands
|
|
commands = [
|
|
"cv sync",
|
|
"sleep 4",
|
|
"cv apply '%s'" % self.cv_scenario,
|
|
"sleep 4",
|
|
"cv build",
|
|
"sleep 4",
|
|
"cv is_built",
|
|
"cv sync",
|
|
"sleep 4",
|
|
"cv list_instances",
|
|
"cv create '%s' 'test_ref' 'true'" % self.cv_test,
|
|
"sleep 5",
|
|
"cv load test_ref '%s'" % self.test_profile,
|
|
"sleep 5",
|
|
"cv click test_ref 'Auto Save Report'",
|
|
"sleep 1",
|
|
"cv click test_ref Start",
|
|
"sleep 60",
|
|
"cv get test_ref 'Report Location:'",
|
|
"sleep 5",
|
|
#"cv click test_ref 'Save HTML'",
|
|
"cv click test_ref 'Close'",
|
|
"sleep 1",
|
|
"cv click test_ref Cancel",
|
|
"sleep 1",
|
|
"exit"
|
|
]
|
|
response_json = []
|
|
for command in commands:
|
|
data = {
|
|
"cmd": command
|
|
}
|
|
try:
|
|
debug_par = ""
|
|
if debug_:
|
|
debug_par="?_debug=1"
|
|
if command.endswith("is_built"):
|
|
print("Waiting for scenario to build...", end='')
|
|
self.localrealm.wait_while_building(debug_=False)
|
|
print("...proceeding")
|
|
elif command.startswith("sleep "):
|
|
nap = int(command.split(" ")[1])
|
|
print("sleeping %d..." % nap)
|
|
sleep(nap)
|
|
print("...proceeding")
|
|
elif command == "cv list_instances":
|
|
response_json = []
|
|
print("running %s..." % command, end='')
|
|
response = self.json_post("/gui-json/cmd%s" % debug_par,
|
|
data,
|
|
debug_=debug_,
|
|
response_json_list_=response_json)
|
|
if debug_:
|
|
LFUtils.debug_printer.pprint(response)
|
|
else:
|
|
response_json = []
|
|
print("running %s..." % command, end='')
|
|
response = self.json_post("/gui-json/cmd%s" % debug_par, data, debug_=False, response_json_list_=response_json)
|
|
if debug_:
|
|
LFUtils.debug_printer.pprint(response_json)
|
|
print("...proceeding")
|
|
except Exception as x:
|
|
print(x)
|
|
|
|
self._pass("report finished", print_=True)
|
|
|
|
|
|
def stop(self):
|
|
pass
|
|
|
|
def cleanup(self):
|
|
pass
|
|
|
|
|
|
def main():
|
|
lfjson_host = "localhost"
|
|
lfjson_port = 8080
|
|
parser = argparse.ArgumentParser(
|
|
prog="run_cv_scenario.py",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
description="""LANforge Reporting Script: Load a scenario and run a RvR report
|
|
Example:
|
|
./load_ap_scenario.py --lfmgr 127.0.0.1 --lanforge_db 'handsets' --cv_test 'WiFi Capacity' --test_profile 'test-20'
|
|
""")
|
|
parser.add_argument("-m", "--lfmgr", type=str, help="address of the LANforge GUI machine (localhost is default)")
|
|
parser.add_argument("-o", "--port", type=int, help="IP Port the LANforge GUI is listening on (8080 is default)")
|
|
parser.add_argument("-d", "--lanforge_db", type=str, help="Name of test scenario database (see Status Tab)")
|
|
parser.add_argument("-c", "--cv_scenario", type=str, help="Name of Chamber View test scenario (see CV Manage Scenarios)")
|
|
parser.add_argument("-n", "--cv_test", type=str, help="Chamber View test")
|
|
parser.add_argument("-s", "--test_profile", type=str, help="Name of the saved CV test profile")
|
|
parser.add_argument("--debug", help='Enable debugging', default=False, action="store_true")
|
|
|
|
args = parser.parse_args()
|
|
if args.lfmgr is not None:
|
|
lfjson_host = args.lfmgr
|
|
if args.port is not None:
|
|
lfjson_port = args.port
|
|
debug = False
|
|
if args.debug is not None:
|
|
debug = args.debug
|
|
run_cv_scenario = RunCvScenario(lfjson_host, lfjson_port, debug_=debug)
|
|
|
|
if args.lanforge_db is not None:
|
|
run_cv_scenario.lanforge_db = args.lanforge_db
|
|
if args.cv_scenario is not None:
|
|
run_cv_scenario.cv_scenario = args.cv_scenario
|
|
if args.cv_test is not None:
|
|
run_cv_scenario.cv_test = args.cv_test
|
|
if args.test_profile is not None:
|
|
run_cv_scenario.test_profile = args.test_profile
|
|
|
|
if (run_cv_scenario.lanforge_db is None) or (run_cv_scenario.lanforge_db == ""):
|
|
raise ValueError("Please specificy scenario database name with --scenario_db")
|
|
|
|
if not (run_cv_scenario.build() and run_cv_scenario.passes()):
|
|
print("scenario failed to build.")
|
|
print(run_cv_scenario.get_fail_message())
|
|
exit(1)
|
|
|
|
if not (run_cv_scenario.start() and run_cv_scenario.passes()):
|
|
print("scenario failed to start.")
|
|
print(run_cv_scenario.get_fail_message())
|
|
exit(1)
|
|
|
|
if not (run_cv_scenario.stop() and run_cv_scenario.passes()):
|
|
print("scenario failed to stop:")
|
|
print(run_cv_scenario.get_fail_message())
|
|
exit(1)
|
|
|
|
if not (run_cv_scenario.cleanup() and run_cv_scenario.passes()):
|
|
print("scenario failed to clean up:")
|
|
print(run_cv_scenario.get_fail_message())
|
|
exit(1)
|
|
|
|
report_file = run_cv_scenario.get_report_file_name()
|
|
print("Report file saved to "+report_file)
|
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|