mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-10-30 18:27:53 +00:00
JAG: added methods for events_X and alerts_X
This commit is contained in:
@@ -70,9 +70,7 @@
|
||||
class which appends subclasses to it.
|
||||
|
||||
----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----"""
|
||||
import http.client
|
||||
import sys
|
||||
|
||||
if sys.version_info[0] != 3:
|
||||
print("This script requires Python 3")
|
||||
exit()
|
||||
@@ -81,14 +79,14 @@ import urllib
|
||||
from urllib import request
|
||||
from urllib import error
|
||||
from urllib import parse
|
||||
import http.client
|
||||
from http.client import HTTPResponse
|
||||
|
||||
import json
|
||||
import logging
|
||||
from logging import Logger
|
||||
import time
|
||||
import traceback
|
||||
# from typing import Type
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
from enum import IntFlag
|
||||
from pprint import pprint
|
||||
@@ -496,17 +494,20 @@ class BaseLFJsonRequest:
|
||||
url: str = "",
|
||||
post_data: dict = None,
|
||||
debug: bool = False,
|
||||
wait_sec: float = None,
|
||||
max_timeout_sec: float = None,
|
||||
die_on_error: bool = False,
|
||||
errors_warnings: list[str] = None,
|
||||
response_json_list: list = None,
|
||||
method_: str = 'POST',
|
||||
session_id_: str = ""):
|
||||
session_id_: str = "") -> Optional[HTTPResponse]:
|
||||
"""
|
||||
|
||||
:param url: URL to post to
|
||||
:param post_data: data to send in post
|
||||
:param debug: turn on diagnostics
|
||||
:param die_on_error: exit() if the return status is not 200
|
||||
:param response_json_list: list of result data
|
||||
:param response_json_list: pass in a list to store json data in the response
|
||||
:param method_: override HTTP method, please do not override
|
||||
:param session_id_: insert a session to the header; this is useful in the case where we are
|
||||
operating outside a session context, like during the __del__ constructor
|
||||
@@ -520,7 +521,7 @@ class BaseLFJsonRequest:
|
||||
% (self.session_id, self.session_instance.get_session_id()))
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
responses = []
|
||||
responses : list[HTTPResponse] = []
|
||||
url = self.get_corrected_url(url)
|
||||
if (post_data is not None) and (post_data is not self.No_Data):
|
||||
myrequest = request.Request(url=url,
|
||||
@@ -548,66 +549,89 @@ class BaseLFJsonRequest:
|
||||
|
||||
# https://stackoverflow.com/a/59635684/11014343
|
||||
|
||||
resp: http.client.HTTPResponse
|
||||
try:
|
||||
resp = urllib.request.urlopen(myrequest)
|
||||
resp_data = resp.read().decode('utf-8')
|
||||
if debug and die_on_error:
|
||||
self.logger.debug(__name__+" ----- json_post: debug: --------------------------------------------")
|
||||
self.logger.debug("URL: %s :%d " % (url, resp.status))
|
||||
self.logger.debug(__name__+" ----- headers -------------------------------------------------")
|
||||
if resp.status != 200:
|
||||
self.logger.error(pformat(resp.getheaders()))
|
||||
self.logger.error(__name__+" ----- response -------------------------------------------------")
|
||||
self.logger.error(pformat(resp_data))
|
||||
self.logger.error(" ----- -------------------------------------------------")
|
||||
responses.append(resp)
|
||||
headers = resp.getheaders()
|
||||
if debug:
|
||||
self.logger.warning("response headers:")
|
||||
self.logger.warning(pformat(headers))
|
||||
if SESSION_HEADER in headers:
|
||||
if self.session_id != resp.getheader(SESSION_HEADER):
|
||||
self.logger.warning("established session header [%s] different from response session header[%s]"
|
||||
% (self.session_id, resp.getheader(SESSION_HEADER)))
|
||||
if response_json_list is not None:
|
||||
if type(response_json_list) is not list:
|
||||
raise ValueError("reponse_json_list needs to be type list")
|
||||
jzon_str = json.loads(resp_data)
|
||||
if debug:
|
||||
self.logger.debug(__name__+":----- json_post debug: --------------------------------------------")
|
||||
self.logger.debug("URL: %s :%d " % (url, resp.status))
|
||||
response: http.client.HTTPResponse
|
||||
|
||||
if wait_sec:
|
||||
time.sleep(wait_sec)
|
||||
begin_time_ms = time.time() * 1000
|
||||
if not max_timeout_sec:
|
||||
max_timeout_sec = self.session_instance.max_timeout_sec
|
||||
finish_time_ms = (max_timeout_sec * 1000) + begin_time_ms
|
||||
attempt = 1
|
||||
while (time.time() * 1000) < finish_time_ms:
|
||||
try:
|
||||
response = urllib.request.urlopen(myrequest)
|
||||
resp_data = response.read().decode('utf-8')
|
||||
jzon_data = None
|
||||
if debug and die_on_error:
|
||||
self.logger.debug(__name__+" ----- json_post: %d debug: --------------------------------------------" % attempt)
|
||||
self.logger.debug("URL: %s :%d " % (url, response.status))
|
||||
self.logger.debug(__name__+" ----- headers -------------------------------------------------")
|
||||
self.logger.debug(pformat(resp.getheaders()))
|
||||
self.logger.debug(__name__+" ----- response -------------------------------------------------")
|
||||
self.logger.debug(pformat(jzon_str))
|
||||
self.logger.debug("-------------------------------------------------")
|
||||
response_json_list.append(jzon_str)
|
||||
if resp.status not in self.OK_STATUSES:
|
||||
self.logger.debug("----------------- BAD STATUS --------------------------------")
|
||||
if response.status != 200:
|
||||
self.logger.error(pformat(response.getheaders()))
|
||||
self.logger.error(__name__+" ----- response -------------------------------------------------")
|
||||
self.logger.error(pformat(resp_data))
|
||||
self.logger.error(" ----- -------------------------------------------------")
|
||||
responses.append(response)
|
||||
header_items = response.getheaders()
|
||||
if debug:
|
||||
self.logger.warning("BaseJsonRequest::json_post: response headers:")
|
||||
self.logger.warning(pformat(header_items))
|
||||
if SESSION_HEADER in header_items:
|
||||
if self.session_id != response.getheader(SESSION_HEADER):
|
||||
self.logger.warning("established session header [%s] different from response session header[%s]"
|
||||
% (self.session_id, response.getheader(SESSION_HEADER)))
|
||||
if errors_warnings:
|
||||
for header in header_items:
|
||||
if header[0].startswith("X-Error") == 0:
|
||||
errors_warnings.append(header)
|
||||
if header[0].startswith("X-Warning") == 0:
|
||||
errors_warnings.append(header)
|
||||
|
||||
if response_json_list is not None:
|
||||
if type(response_json_list) is not list:
|
||||
raise ValueError("reponse_json_list needs to be type list")
|
||||
jzon_data = json.loads(resp_data)
|
||||
if debug:
|
||||
self.logger.debug(__name__+":----- json_post debug: ------------------------------------------")
|
||||
self.logger.debug("URL: %s :%d " % (url, response.status))
|
||||
self.logger.debug(__name__+" ----- headers -------------------------------------------------")
|
||||
self.logger.debug(pformat(response.getheaders()))
|
||||
self.logger.debug(__name__+" ----- response -------------------------------------------------")
|
||||
self.logger.debug(pformat(jzon_data))
|
||||
self.logger.debug("-------------------------------------------------")
|
||||
response_json_list.append(jzon_data)
|
||||
|
||||
if response.status not in self.OK_STATUSES:
|
||||
if errors_warnings:
|
||||
if "errors" in jzon_data:
|
||||
errors_warnings.extend(jzon_data["errors"])
|
||||
if "warnings" in jzon_data:
|
||||
errors_warnings.extend(jzon_data["warnings"])
|
||||
self.logger.debug("----------------- BAD STATUS --------------------------------")
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
return responses[0]
|
||||
|
||||
except urllib.error.HTTPError as herror:
|
||||
print_diagnostics(url_=url,
|
||||
request_=myrequest,
|
||||
responses_=responses,
|
||||
error_=herror,
|
||||
debug_=debug,
|
||||
die_on_error_=die_on_error)
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
return responses[0]
|
||||
|
||||
except urllib.error.HTTPError as herror:
|
||||
print_diagnostics(url_=url,
|
||||
request_=myrequest,
|
||||
responses_=responses,
|
||||
error_=herror,
|
||||
debug_=debug,
|
||||
die_on_error_=die_on_error)
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
|
||||
except urllib.error.URLError as uerror:
|
||||
print_diagnostics(url_=url,
|
||||
request_=myrequest,
|
||||
responses_=responses,
|
||||
error_=uerror,
|
||||
debug_=debug,
|
||||
die_on_error_=die_on_error)
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
except urllib.error.URLError as uerror:
|
||||
print_diagnostics(url_=url,
|
||||
request_=myrequest,
|
||||
responses_=responses,
|
||||
error_=uerror,
|
||||
debug_=debug,
|
||||
die_on_error_=die_on_error)
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
|
||||
if die_on_error:
|
||||
exit(1)
|
||||
@@ -616,14 +640,16 @@ class BaseLFJsonRequest:
|
||||
def json_put(self,
|
||||
url: str = None,
|
||||
debug: bool = False,
|
||||
wait_sec: float = None,
|
||||
request_timeout_sec: float = None,
|
||||
max_timeout_sec: float = None,
|
||||
errors_warnings: list = None,
|
||||
die_on_error: bool = False,
|
||||
response_json_list: list = None):
|
||||
response_json_list: list = None) -> Optional[HTTPResponse]:
|
||||
if not url:
|
||||
raise ValueError("json_put requires url")
|
||||
return self.json_post(url=url,
|
||||
debug=debug | self.debug_on,
|
||||
die_on_error=die_on_error | self.die_on_error,
|
||||
response_json_list=response_json_list,
|
||||
return self.json_post(url=url, debug=debug | self.debug_on, wait_sec=wait_sec, max_timeout_sec=max_timeout_sec,
|
||||
die_on_error=die_on_error | self.die_on_error, response_json_list=response_json_list,
|
||||
method_='PUT')
|
||||
|
||||
def json_delete(self,
|
||||
@@ -634,6 +660,17 @@ class BaseLFJsonRequest:
|
||||
request_timeout_sec: float = None,
|
||||
max_timeout_sec: float = None,
|
||||
errors_warnings: list = None):
|
||||
"""
|
||||
Perform a HTTP DELETE call
|
||||
:param url: fully qualified URL to request
|
||||
:param debug: turn on diagnostic info
|
||||
:param die_on_error: call exit if response is nither 100, 200, or 404
|
||||
:param wait_sec: time to pause before making call
|
||||
:param request_timeout_sec: time to override default request timeout
|
||||
:param max_timeout_sec: time after which to stop making more requests
|
||||
:param errors_warnings: provide a list into which API errors and warnings are placed
|
||||
:return: as get_as_json() returns (a native decoding of JSON document: dict, list, str, float or int)
|
||||
"""
|
||||
if wait_sec and (wait_sec > 0):
|
||||
time.sleep(wait_sec)
|
||||
return self.get_as_json(url=url,
|
||||
@@ -649,7 +686,7 @@ class BaseLFJsonRequest:
|
||||
debug: bool = False,
|
||||
die_on_error: bool = False,
|
||||
method_: str = 'GET',
|
||||
connection_timeout_sec: int = None):
|
||||
connection_timeout_sec: int = None) -> Optional[HTTPResponse]:
|
||||
"""
|
||||
Makes a HTTP GET request with specified timeout.
|
||||
:param url: Fully qualified URL to request
|
||||
@@ -680,7 +717,7 @@ class BaseLFJsonRequest:
|
||||
if connection_timeout_sec:
|
||||
myrequest.timeout = connection_timeout_sec
|
||||
|
||||
myresponses = []
|
||||
myresponses: list[HTTPResponse] = []
|
||||
try:
|
||||
myresponses.append(request.urlopen(myrequest))
|
||||
return myresponses[0]
|
||||
@@ -727,7 +764,8 @@ class BaseLFJsonRequest:
|
||||
:param method_: Overrides the HTTP method used. Please do not override.
|
||||
:param errors_warnings: if present, this list gets populated with errors and warnings from the result
|
||||
:param max_timeout_sec: if there is no response, this request can retry every request_timeout_sec
|
||||
:return: get request as Json data
|
||||
:return: get response as a python object decoded from Json data
|
||||
This often is a dict, but it could be any primitive Python type such as str, int, float or list.
|
||||
"""
|
||||
begin_sec = time.time() * 1000
|
||||
responses = []
|
||||
@@ -787,12 +825,15 @@ class BaseLFJsonRequest:
|
||||
self.error_list.clear()
|
||||
attempt_counter = 1
|
||||
while _now_sec() < deadline_sec:
|
||||
time.sleep(wait_sec)
|
||||
if wait_sec:
|
||||
time.sleep(wait_sec)
|
||||
try:
|
||||
json_response = self.get_as_json(url=url,
|
||||
debug=debug,
|
||||
die_on_error=False,
|
||||
request_timeout_sec=request_timeout_sec)
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if debug:
|
||||
self.logger.debug("[%s] json_get: URL[%s]" % (attempt_counter, url))
|
||||
self.logger.debug(pformat(json_response))
|
||||
@@ -982,9 +1023,7 @@ class JsonCommand(BaseLFJsonRequest):
|
||||
raise ValueError("JsonCommand::start_session lacks self.session_instance")
|
||||
|
||||
first_response: HTTPResponse
|
||||
first_response = self.json_post(url="/newsession",
|
||||
debug=True,
|
||||
response_json_list=responses)
|
||||
first_response = self.json_post(url="/newsession", debug=True, response_json_list=responses)
|
||||
if not first_response:
|
||||
self.logger.warning("No session established.")
|
||||
self.logger.debug(pformat(first_response))
|
||||
@@ -1176,10 +1215,11 @@ class BaseSession:
|
||||
@classmethod
|
||||
def end_session(cls,
|
||||
command_obj: JsonCommand = None,
|
||||
session_id_: str = ""):
|
||||
session_id_: str = "",
|
||||
debug: bool = False):
|
||||
responses = []
|
||||
first_response = command_obj.json_post(url="endsession",
|
||||
debug=True,
|
||||
debug=debug,
|
||||
response_json_list=responses,
|
||||
session_id_=session_id_)
|
||||
pprint(first_response)
|
||||
@@ -12352,6 +12392,259 @@ class LFJsonQuery(JsonQuery):
|
||||
|
||||
# Auto generated methods follow:
|
||||
|
||||
"""----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
|
||||
Notes for <ALERTS> type requests
|
||||
|
||||
If you need to call the URL directly,
|
||||
request one of these URLs:
|
||||
/alerts/
|
||||
/alerts/$event_id
|
||||
/alerts/before/$event_id
|
||||
/alerts/between/$start_event_id/$end_event_id
|
||||
/alerts/last/$event_count
|
||||
/alerts/since/$event_id
|
||||
|
||||
When requesting specific column names, they need to be URL encoded:
|
||||
eid, entity+id, event, event+description, id, name, priority, time-stamp,
|
||||
type
|
||||
Example URL: /alerts?fields=eid,entity+id
|
||||
|
||||
Example py-json call (it knows the URL):
|
||||
record = LFJsonGet.get_alerts(eid_list=['1.234', '1.344'],
|
||||
requested_col_names=['entity id'],
|
||||
debug=True)
|
||||
|
||||
The record returned will have these members:
|
||||
{
|
||||
'eid': # Time at which this event was created.This uses the clock on the source
|
||||
# machine.
|
||||
'entity id': # Entity IdentifierExact format depends on the
|
||||
# type.(shelf.resource.port.endpoint.extra)
|
||||
'event': # Event Type
|
||||
'event description': # Text description for this event.
|
||||
'id': # Unique ID for this event.
|
||||
'name': # Name of the entity associated with this event.
|
||||
'priority': # Event priority.
|
||||
'time-stamp': # Time at which this event was created.This uses the clock on the source
|
||||
# machine.
|
||||
'type': # Entity type.
|
||||
}
|
||||
----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----"""
|
||||
|
||||
def get_alerts(self,
|
||||
eid_list: list = None,
|
||||
requested_col_names: list = None,
|
||||
wait_sec: float = 0.01,
|
||||
timeout_sec: float = 5.0,
|
||||
errors_warnings: list = None,
|
||||
debug: bool = False):
|
||||
"""
|
||||
:param eid_list: list of entity IDs to query for
|
||||
:param requested_col_names: list of column names to return
|
||||
:param wait_sec: duration to wait between retries if no response or response is HTTP 404
|
||||
:param timeout_sec: duration in which to keep querying before returning
|
||||
:param errors_warnings: optional list to extend with errors and warnings from response
|
||||
:param debug: print diagnostic info if true
|
||||
:return: dictionary of results
|
||||
"""
|
||||
debug |= self.debug_on
|
||||
url = "/alerts"
|
||||
if (eid_list is None) or (len(eid_list) < 1):
|
||||
raise ValueError("no entity id in request")
|
||||
trimmed_fields = []
|
||||
if isinstance(requested_col_names, str):
|
||||
if not requested_col_names.strip():
|
||||
raise ValueError("column name cannot be blank")
|
||||
trimmed_fields.append(requested_col_names.strip())
|
||||
if isinstance(requested_col_names, list):
|
||||
for field in requested_col_names:
|
||||
if not field.strip():
|
||||
raise ValueError("column names cannot be blank")
|
||||
field = field.strip()
|
||||
if field.find(" ") > -1:
|
||||
raise ValueError("field should be URL encoded: [%s]" % field)
|
||||
trimmed_fields.append(field)
|
||||
url += self.create_port_eid_url(eid_list=eid_list)
|
||||
|
||||
if len(trimmed_fields) > 0:
|
||||
url += "?fields=%s" % (",".join(trimmed_fields))
|
||||
|
||||
response = self.json_get(url=url,
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=timeout_sec,
|
||||
max_timeout_sec=timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if response is None:
|
||||
return None
|
||||
return self.extract_values(response=response,
|
||||
singular_key="alert",
|
||||
plural_key="alerts")
|
||||
#
|
||||
"""
|
||||
Below are 6 methods defined by LFClient URL Responders
|
||||
"""
|
||||
|
||||
def alerts_between(self,
|
||||
start_event_id : int = None,
|
||||
end_event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select alerts between start and end IDs
|
||||
:param start_event_id: alerts id to start at
|
||||
:param end_event_id: alerts id to end at
|
||||
"""
|
||||
response = self.json_get(url="/alerts/alerts_between/{start_event_id}/{end_event_id}".format(start_event_id=start_event_id, end_event_id=end_event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def alerts_last_events(self,
|
||||
event_count : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select last event_count alerts
|
||||
:param event_count: number since end to select
|
||||
"""
|
||||
response = self.json_get(url="/alerts/last/{event_count}".format(event_count=event_count),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def alerts_before(self,
|
||||
event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select first alerts before alert_id
|
||||
:param event_id: id to stop selecting at
|
||||
"""
|
||||
response = self.json_get(url="/alerts/before/{event_id}".format(event_id=event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def events_between(self,
|
||||
start_event_id : int = None,
|
||||
end_event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select events between start and end IDs, inclusive
|
||||
:param start_event_id: start selection at this id
|
||||
:param end_event_id: end selection at this id
|
||||
"""
|
||||
response = self.json_get(url="/events/between/{start_event_id}/{end_event_id}".format(start_event_id=start_event_id, end_event_id=end_event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def events_get_event(self,
|
||||
event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Query an event by id
|
||||
:param event_id: id to select
|
||||
"""
|
||||
response = self.json_get(url="/events/{event_id}".format(event_id=event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def events_last_events(self,
|
||||
event_count : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select last event_count events
|
||||
:param event_count: number since end to select
|
||||
"""
|
||||
response = self.json_get(url="/events/last/{event_count}".format(event_count=event_count),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
"""----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
|
||||
Notes for <ATTENUATOR> type requests
|
||||
|
||||
@@ -13170,6 +13463,170 @@ class LFJsonQuery(JsonQuery):
|
||||
singular_key="alert",
|
||||
plural_key="alerts")
|
||||
#
|
||||
"""
|
||||
Below are 6 methods defined by LFClient URL Responders
|
||||
"""
|
||||
|
||||
def alerts_between(self,
|
||||
start_event_id : int = None,
|
||||
end_event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select alerts between start and end IDs
|
||||
:param start_event_id: alerts id to start at
|
||||
:param end_event_id: alerts id to end at
|
||||
"""
|
||||
response = self.json_get(url="/alerts/alerts_between/{start_event_id}/{end_event_id}".format(start_event_id=start_event_id, end_event_id=end_event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def alerts_last_events(self,
|
||||
event_count : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select last event_count alerts
|
||||
:param event_count: number since end to select
|
||||
"""
|
||||
response = self.json_get(url="/alerts/last/{event_count}".format(event_count=event_count),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def alerts_before(self,
|
||||
event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select first alerts before alert_id
|
||||
:param event_id: id to stop selecting at
|
||||
"""
|
||||
response = self.json_get(url="/alerts/before/{event_id}".format(event_id=event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def events_between(self,
|
||||
start_event_id : int = None,
|
||||
end_event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select events between start and end IDs, inclusive
|
||||
:param start_event_id: start selection at this id
|
||||
:param end_event_id: end selection at this id
|
||||
"""
|
||||
response = self.json_get(url="/events/between/{start_event_id}/{end_event_id}".format(start_event_id=start_event_id, end_event_id=end_event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def events_get_event(self,
|
||||
event_id : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Query an event by id
|
||||
:param event_id: id to select
|
||||
"""
|
||||
response = self.json_get(url="/events/{event_id}".format(event_id=event_id),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
def events_last_events(self,
|
||||
event_count : int = None,
|
||||
debug : bool = False,
|
||||
wait_sec : float = None,
|
||||
request_timeout_sec : float = None,
|
||||
max_timeout_sec : float = None,
|
||||
errors_warnings : list = None):
|
||||
"""
|
||||
Select last event_count events
|
||||
:param event_count: number since end to select
|
||||
"""
|
||||
response = self.json_get(url="/events/last/{event_count}".format(event_count=event_count),
|
||||
debug=debug,
|
||||
wait_sec=wait_sec,
|
||||
request_timeout_sec=request_timeout_sec,
|
||||
max_timeout_sec=max_timeout_sec,
|
||||
errors_warnings=errors_warnings)
|
||||
if not response:
|
||||
return None
|
||||
errors_warnings.extend(response['errors'])
|
||||
errors_warnings.extend(response['warnings'])
|
||||
return self.extract_values(response=response,
|
||||
singular_key="",
|
||||
plural_key="")
|
||||
#
|
||||
|
||||
"""----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
|
||||
Notes for <FILEIO> type requests
|
||||
|
||||
|
||||
Reference in New Issue
Block a user