run_cv_scenario.py: adds more timing and refresh logic

Use the refresh() method between loading databases to force updates of text blobs, DUTs and profiles.
Look for last-updated times so as to not wait overly long for text blobs to update.
Adds more cv sync calls to make sure that CV has gotten refreshed.

Signed-off-by: Jed Reynolds <jed@bitratchet.com>
This commit is contained in:
Jed Reynolds
2021-10-01 15:23:35 -07:00
parent 2ddf6e10a9
commit ad188af0ba

View File

@@ -1,12 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# This script will set the LANforge to a BLANK database then it will load the specified database # This script will set the LANforge to a BLANK database then it will load the specified database
# and start a graphical report # and start a graphical report
import sys
import os
import importlib
import argparse import argparse
from time import sleep import importlib
import os
import pprint import pprint
import sys
import time
from time import sleep
if sys.version_info[0] != 3: if sys.version_info[0] != 3:
print("This script requires Python 3") print("This script requires Python 3")
@@ -38,6 +39,7 @@ class RunCvScenario(LFCliBase):
self.test_profile = test_scenario_ self.test_profile = test_scenario_
self.localrealm = Realm(lfclient_host=lfhost, lfclient_port=lfport, debug_=debug_) self.localrealm = Realm(lfclient_host=lfhost, lfclient_port=lfport, debug_=debug_)
self.report_name = None self.report_name = None
self.load_timeout_sec = 2 * 60
def get_report_file_name(self): def get_report_file_name(self):
return self.report_name return self.report_name
@@ -50,7 +52,8 @@ class RunCvScenario(LFCliBase):
"clean_chambers": "yes" "clean_chambers": "yes"
} }
self.json_post("/cli-json/load", data) self.json_post("/cli-json/load", data)
sleep(1) self.refresh()
port_counter = 0; port_counter = 0;
attempts = 6 attempts = 6
while (attempts > 0) and (port_counter > 0): while (attempts > 0) and (port_counter > 0):
@@ -79,25 +82,107 @@ class RunCvScenario(LFCliBase):
"clean_chambers": "yes" "clean_chambers": "yes"
} }
self.json_post("/cli-json/load", data) self.json_post("/cli-json/load", data)
sleep(1) self.refresh()
self._pass("Loaded scenario %s" % self.lanforge_db, True) self._pass("Loaded scenario %s" % self.lanforge_db, True)
return True return True
def refresh(self):
events_response = self.json_get("/events/last")
if "event" not in events_response:
raise ValueError("Unable to find last event")
if "id" not in events_response["event"]:
pprint.pprint(events_response["event"])
raise ValueError("bad event format")
previous_event_id = events_response["event"]["id"]
# check for scenario (db) load message
begin_time: int = round(time.time() * 1000)
load_completed = False
while not load_completed:
if time.time() > (begin_time + self.load_timeout_sec):
print("Unable to load database within %d sec" % self.load_timeout_sec)
exit(1)
events_response = self.json_get("/events/since/%s" % previous_event_id)
if "events" not in events_response:
pprint.pprint(("events response", events_response))
raise ValueError("incorrect events response")
for event_o in events_response["events"]:
if load_completed:
break
for (key, record) in event_o.items():
if "event description" not in record:
continue
if not record["event description"]:
continue
if record["event description"].startswith("LOAD COMPLETED at "):
print("load completed: %s " % record["event description"])
load_completed = True
break
if not load_completed:
sleep(1)
blobs_last_updated = begin_time
status_response = self.json_get("/")
if "text_records_last_updated_ms" in status_response:
blobs_last_updated = int(status_response["text_records_last_updated_ms"])
#print("*** blobs updated at %d" % blobs_last_updated)
else:
begin_time = round(time.time() * 1000)
print("no text_records_last_updated_ms, using %d " % begin_time)
# next we will want to sync our text blobs up
self.json_post("/cli-json/show_text_blob", {
"type": "ALL",
"name": "ALL"
})
load_completed = False
while not load_completed:
sleep(1)
if time.time() > (begin_time + (6 * 1000)):
print("waited %d sec for text blobs to update" % self.load_timeout_sec)
load_completed = True
break
status_response = self.json_get("/")
if "text_records_last_updated_ms" in status_response:
updated = int(status_response["text_records_last_updated_ms"])
print(", , , , , , , , , updated at %d" % updated)
if updated > blobs_last_updated:
load_completed = True
break
else:
pprint.pprint(status_response)
self.json_post("/cli-json/show_text_blob", {
"type": "ALL",
"name": "ALL"
})
delta: int = (time.time() * 1000) - begin_time
print("blobs loaded in %d ms" % delta)
# next show duts
self.json_post("/cli-json/show_dut", {"name": "ALL"})
self.json_post("/cli-json/show_profile", {"name": "ALL"})
self.json_post("/cli-json/show_traffic_profile", {"name": "ALL"})
sleep(5)
def start(self, debug_=False): def start(self, debug_=False):
# /gui_cli takes commands keyed on 'cmd', so we create an array of commands # /gui_cli takes commands keyed on 'cmd', so we create an array of commands
commands = [ commands = [
"cv sync",
"sleep 4",
"cv apply '%s'" % self.cv_scenario, "cv apply '%s'" % self.cv_scenario,
"sleep 4",
"cv build", "cv build",
"sleep 4", "sleep 4",
"cv is_built", "cv is_built",
"cv sync", "cv sync",
"sleep 2", "sleep 4",
"cv create '%s' 'test_ref' 'true'" % self.cv_test, "cv create '%s' 'test_ref' 'true'" % self.cv_test,
"sleep 2",
"cv load test_ref '%s'" % self.test_profile,
"sleep 1",
"cv click test_ref 'Auto Save Report'",
"sleep 5", "sleep 5",
"cv load test_ref '%s'" % self.test_profile,
"sleep 5",
"cv click test_ref 'Auto Save Report'",
"sleep 1",
"cv click test_ref Start", "cv click test_ref Start",
"sleep 60", "sleep 60",
"cv get test_ref 'Report Location:'", "cv get test_ref 'Report Location:'",