mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 11:18:03 +00:00
lfcli_base.py: adding basis for signal handlers in scripts
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
#!env /usr/bin/python
|
||||
|
||||
import sys
|
||||
import signal
|
||||
import traceback
|
||||
# Extend this class to use common set of debug and request features for your script
|
||||
from pprint import pprint
|
||||
@@ -12,6 +13,11 @@ from LANforge import LFRequest
|
||||
import LANforge.LFRequest
|
||||
|
||||
class LFCliBase:
|
||||
|
||||
SHOULD_RUN = 0 # indicates normal operation
|
||||
SHOULD_QUIT = 1 # indicates to quit loops, close files, send SIGQUIT to threads and return
|
||||
SHOULD_HALT = 2 # indicates to quit loops, send SIGABRT to threads and exit
|
||||
|
||||
# do not use `super(LFCLiBase,self).__init__(self, host, port, _debug)
|
||||
# that is py2 era syntax and will force self into the host variable, making you
|
||||
# very confused.
|
||||
@@ -20,7 +26,8 @@ class LFCliBase:
|
||||
_halt_on_error=False,
|
||||
_exit_on_error=False,
|
||||
_exit_on_fail=False,
|
||||
_local_realm=False):
|
||||
_local_realm=False,
|
||||
_capture_signal_list=[]):
|
||||
self.fail_pref = "FAILED: "
|
||||
self.pass_pref = "PASSED: "
|
||||
self.lfclient_host = _lfjson_host
|
||||
@@ -34,12 +41,110 @@ class LFCliBase:
|
||||
self.halt_on_error = _halt_on_error
|
||||
self.exit_on_error = _exit_on_error
|
||||
self.exit_on_fail = _exit_on_fail
|
||||
self.capture_signals = _capture_signal_list
|
||||
# toggle using preexec_cli, preexec_method; the preexec_X parameters are useful
|
||||
# when you desire the lfclient to check for existance of entities to run commands on,
|
||||
# like when developing; you might toggle this with use_preexec = _debug
|
||||
# Otherwise, preexec methods use more processing time because they add an extra CLI call
|
||||
# into the queue, and inspect it -- typically nc_show_port
|
||||
self.suppress_related_commands = None
|
||||
self.finish = self.SHOULD_RUN
|
||||
self.thread_map = {}
|
||||
|
||||
if len(_capture_signal_list) > 0:
|
||||
for zignal in _capture_signal_list:
|
||||
captured_signal(zignal, my_captured_signal)
|
||||
#
|
||||
|
||||
def _finish(self):
|
||||
"""
|
||||
call this to indicate SIGQUIT
|
||||
"""
|
||||
self.finish = self.SHOULD_QUIT
|
||||
|
||||
def _halt(self):
|
||||
"""
|
||||
call this to indicate SIGABRT
|
||||
"""
|
||||
self.finish = self.SHOULD_HALT
|
||||
|
||||
def _should_finish(self):
|
||||
"""
|
||||
check this when in a run loop if SIGQUIT has been indicated
|
||||
"""
|
||||
if self.finish == self.SHOULD_RUN:
|
||||
return False
|
||||
if self.finish == self.SHOULD_QUIT:
|
||||
return True
|
||||
if self.finish == self.SHOULD_HALT:
|
||||
return False
|
||||
|
||||
def _should_halt(self):
|
||||
"""
|
||||
check this when in a run loop if SIGABRT has been indicated
|
||||
"""
|
||||
if self.finish == self.SHOULD_RUN:
|
||||
return False
|
||||
if self.finish == self.SHOULD_QUIT:
|
||||
return False
|
||||
if self.finish == self.SHOULD_HALT:
|
||||
return True
|
||||
|
||||
def track_thread(self, name, thread):
|
||||
if self.thread_map is None:
|
||||
self.thread_map = {}
|
||||
self.thread_map[name] = thread
|
||||
|
||||
def get_thread(self, name):
|
||||
if self.thread_map is None:
|
||||
return None
|
||||
if name in self.thread_map.keys():
|
||||
return self.thread_map[name]
|
||||
return None
|
||||
|
||||
def remove_thread(self, name):
|
||||
if self.thread_map is None:
|
||||
return None
|
||||
if name not in self.thread_map.keys():
|
||||
return None
|
||||
thrud = self.thread_map[name]
|
||||
del self.thread_map[name]
|
||||
return thrud
|
||||
|
||||
def send_thread_signals(signum, fname):
|
||||
if len(self.thread_map) < 1:
|
||||
print("no threads to signal")
|
||||
return
|
||||
for (name, thread) in self.thread_map.items():
|
||||
if self.debug:
|
||||
print("sending signal %s to thread %s" % (signum, name))
|
||||
# do a thing
|
||||
|
||||
def my_captured_signal(signum, frame):
|
||||
"""
|
||||
Override me to process signals, otherwise superclass signal handler is called.
|
||||
You may use _finish() or _halt() to indicate finishing soon or halting immediately.
|
||||
|
||||
:return: True if we processed this signal
|
||||
"""
|
||||
print("my_captured_signal should be overridden")
|
||||
return False
|
||||
|
||||
def caputured_signal(signum):
|
||||
"""
|
||||
Here is your opportunity to decide what to do on things like KeyboardInterrupt or other UNIX signals
|
||||
Check that your subclass handled the signal or not. You may use _finish() or _halt() to indicate
|
||||
finishing soon or halting immediately. Use signal.signal(signal.STOP) to enable this.
|
||||
"""
|
||||
if self.debug:
|
||||
print("Captured signal %s" % signum)
|
||||
if my_captured_signal(signum):
|
||||
if self.debug:
|
||||
print("subclass processed signal")
|
||||
else:
|
||||
if self.debug:
|
||||
print("subclass ignored signal")
|
||||
|
||||
|
||||
def clear_test_results(self):
|
||||
self.test_results.clear()
|
||||
|
||||
Reference in New Issue
Block a user