#!/usr/bin/env python3
""" AFC Load Test Tool """
#
# Copyright (C) 2023 Broadcom. All rights reserved. The term "Broadcom"
# refers solely to the Broadcom Inc. corporate affiliate that owns
# the software below. This work is licensed under the OpenAFC Project License,
# a copy of which is included with this software program
#

# pylint: disable=too-many-lines, too-many-arguments, invalid-name
# pylint: disable=consider-using-f-string, wrong-import-order, too-many-locals
# pylint: disable=too-few-public-methods, logging-fstring-interpolation
# pylint: disable=too-many-instance-attributes, broad-exception-caught
# pylint: disable=too-many-branches, too-many-nested-blocks
# pylint: disable=too-many-statements, eval-used

import argparse
from collections.abc import Iterable, Iterator
import copy
import csv
import datetime
import hashlib
import http
import inspect
import json
import logging
import multiprocessing
import os
import random
try:
    import requests
except ImportError:
    pass
import re
import signal
import sqlite3
import subprocess
import sys
import time
import traceback
from typing import Any, Callable, cast, List, Dict, NamedTuple, Optional, \
    Tuple, Union
import urllib.error
import urllib.parse
import urllib.request

has_yaml = True
try:
    import yaml
except ImportError:
    has_yaml = False


def dp(*args, **kwargs) -> None:  # pylint: disable=invalid-name
    """Print debug message

    Arguments:
    args   -- Format string and optional positional arguments. If positional
              arguments are present, the format string is formatted with %
    kwargs -- Keyword arguments. If present, the message is additionally
              formatted with format()
    """
    msg = args[0] if args else ""
    if len(args) > 1:
        msg = msg % args[1:]
    if args and kwargs:
        msg = msg.format(**kwargs)
    cur_frame = inspect.currentframe()
    assert (cur_frame is not None) and (cur_frame.f_back is not None)
    frameinfo = inspect.getframeinfo(cur_frame.f_back)
    print(f"DP {frameinfo.function}()@{frameinfo.lineno}: {msg}")


def error(msg: str) -> None:
    """ Prints given msg as an error message and exits abnormally """
    logging.error(msg)
    sys.exit(1)


def error_if(cond: Any, msg: str) -> None:
    """ If condition evaluates to true, prints given msg as an error message
    and exits abnormally """
    if cond:
        error(msg)


def exc_if(predicate: Any, exc: Exception) -> None:
    """ Raises given exception if predicate evaluates to true """
    if predicate:
        raise exc


def expandpath(path: Optional[str]) -> Optional[str]:
    """ Expands ~ and environment variables ($VAR/${VAR}) in given path.
    For None/empty returns None/empty """
    return os.path.expandvars(os.path.expanduser(path)) if path else path


def unused_argument(arg: Any) -> None:  # pylint: disable=unused-argument
    """ Sink for all unused arguments """
    return


def yaml_load(yaml_s: str) -> Any:
    """ Parses given YAML/JSON content, returns the resulting dictionary """
    kwargs: Dict[str, Any] = {}
    if has_yaml:
        if hasattr(yaml, "CLoader"):
            kwargs["Loader"] = yaml.CLoader
        elif hasattr(yaml, "FullLoader"):
            kwargs["Loader"] = yaml.FullLoader
        return yaml.load(yaml_s, **kwargs)
    return json.loads(yaml_s)


class Config(Iterable):
    """ Node of config structure that provides dot-based access

    Private attributes:
    _data -- Data that corresponds to node (list or string-indexed dictionary)
    _path -- Path to node ("foo.bar[42].baz") for use in error messages
    """

    JSON_EXT = ".json"
    YAML_EXT = ".yaml"

    # Default config file name
    DEFAULT_CONFIG = \
        os.path.splitext(__file__)[0] + (YAML_EXT if has_yaml else JSON_EXT)

def __init__(
|
|
self, argv: Optional[List[str]] = None,
|
|
arg_name: Optional[str] = None, config_env: Optional[str] = None,
|
|
data: Optional[Union[List[Any], Dict[str, Any]]] = None,
|
|
path: Optional[str] = None) -> None:
|
|
""" Constructor (for both root and nonroot nodes)
|
|
|
|
Arguments:
|
|
argv -- Argument for root node: Command line arguments
|
|
arg_name -- Argument for root node - name of command line parameter
|
|
with config
|
|
env_config -- Optional argument for root node - name of environment
|
|
variable for main config
|
|
data -- Argument for nonroot node: data that corresponds to node
|
|
(list or string-indexed dictionary)
|
|
path -- Argument for nonroot node: path to node
|
|
("foo.bar[42].baz") for use in error messages
|
|
"""
|
|
self._data: Union[List[Any], Dict[str, Any]]
|
|
self._path: str
|
|
# Nonroot node
|
|
if data is not None:
|
|
assert path is not None
|
|
assert (argv or arg_name or config_env) is None
|
|
self._path = path
|
|
self._data = data
|
|
return
|
|
|
|
# Root node
|
|
assert path is None
|
|
assert argv is not None
|
|
assert arg_name is not None
|
|
self._path = ""
|
|
argument_parser = argparse.ArgumentParser()
|
|
argument_parser.add_argument(
|
|
"--" + arg_name, action="append", default=[])
|
|
configs = getattr(argument_parser.parse_known_args(argv)[0], arg_name)
|
|
# Finding root config
|
|
if (not configs) or any(c.startswith("+") for c in configs):
|
|
for config_file in \
|
|
([expandpath(os.environ.get(config_env))]
|
|
if config_env else []) + [self.DEFAULT_CONFIG]:
|
|
assert config_file is not None
|
|
if os.path.isfile(config_file):
|
|
configs = [config_file] + configs
|
|
break
|
|
else:
|
|
error("This script's config file not found")
|
|
self._data = {}
|
|
# Merging all configs
|
|
for config in configs:
|
|
if config.startswith("+"):
|
|
config = config[1:]
|
|
error_if(not os.path.isfile(config),
|
|
f"Config file '{config}' not found")
|
|
with open(config, encoding="utf-8") as f:
|
|
yaml_s = f.read()
|
|
try:
|
|
config_yaml_dict = yaml_load(yaml_s)
|
|
except (yaml.YAMLError if has_yaml else json.JSONDecodeError) \
|
|
as ex:
|
|
error(f"Error reading '{config}': {repr(ex)}")
|
|
error_if(not isinstance(config_yaml_dict, dict),
|
|
f"Content of config file '{config}' is not a dictionary")
|
|
assert isinstance(config_yaml_dict, dict)
|
|
self._data = {**self._data, **config_yaml_dict}
|
|
|
|
def __getattr__(self, attr: str) -> Any:
|
|
""" Returns given attribute of node """
|
|
exc_if(not (isinstance(self._data, dict) and (attr in self._data)),
|
|
AttributeError(f"Config item '{self._path}' does not have "
|
|
f"attribute '{attr}'"))
|
|
assert isinstance(self._data, dict)
|
|
return self._subitem(value=self._data[attr], attr=attr)
|
|
|
|
def __getitem__(self, key: Union[int, str]) -> Any:
|
|
""" Access by index - integer or string """
|
|
if isinstance(self._data, list):
|
|
exc_if(not isinstance(key, (int, slice)),
|
|
IndexError(f"Config item '{self._path}' is a list, it "
|
|
f"can't be subscribed with '{key}'"))
|
|
assert isinstance(key, (int, slice))
|
|
exc_if(isinstance(key, int) and (key >= len(self._data)),
|
|
IndexError(f"Index {key} is out of range for config item "
|
|
f"'{self._path}'"))
|
|
else:
|
|
assert isinstance(self._data, dict)
|
|
exc_if(not isinstance(key, str),
|
|
KeyError(f"Config item '{self._path}' is a dictionary, it "
|
|
f"can't be subscribed with '{key}'"))
|
|
assert isinstance(key, str)
|
|
exc_if(key not in self._data,
|
|
KeyError(f"Config item '{self._path}' does not have "
|
|
f"attribute '{key}'"))
|
|
return self._subitem(value=self._data[key], attr=key)
|
|
|
|
    def get(self, attr: str, default: Any = None) -> Any:
        """ Returns given attribute of dictionary-type node or default if
        absent """
        exc_if(not isinstance(self._data, dict),
               AttributeError(f"Config item '{self._path}' is not a "
                              f"dictionary, can't be queried for '{attr}'"))
        assert isinstance(self._data, dict)
        exc_if(not isinstance(attr, str),
               AttributeError(f"Non-string attribute '{attr}' can't be looked "
                              f"up in config item '{self._path}'"))
        return self._subitem(value=self._data.get(attr, default), attr=attr)
|
|
|
|
def __len__(self) -> int:
|
|
""" Number of subitems in node """
|
|
return len(self._data)
|
|
|
|
def keys(self) -> Iterable:
|
|
""" Keys of dictionary-type node """
|
|
exc_if(not isinstance(self._data, dict),
|
|
AttributeError(f"Config item '{self._path}' is not a "
|
|
f"dictionary, it doesn't have keys"))
|
|
assert isinstance(self._data, dict)
|
|
return self._data.keys()
|
|
|
|
def values(self) -> Iterable:
|
|
""" Values of dictionary-type node """
|
|
exc_if(not isinstance(self._data, dict),
|
|
AttributeError(f"Config item '{self._path}' is not a "
|
|
f"dictionary, it doesn't have values"))
|
|
assert isinstance(self._data, dict)
|
|
for key, value in self._data.items():
|
|
yield self._subitem(value=value, attr=key)
|
|
|
|
def items(self) -> Iterable:
|
|
""" Items of dictionary-type node """
|
|
exc_if(not isinstance(self._data, dict),
|
|
AttributeError(f"Config item '{self._path}' is not a "
|
|
f"dictionary, it doesn't have items"))
|
|
assert isinstance(self._data, dict)
|
|
for key, value in self._data.items():
|
|
yield (key, self._subitem(value=value, attr=key))
|
|
|
|
def data(self) -> Union[List[Any], Dict[str, Any]]:
|
|
""" Returns underlying data structure """
|
|
return self._data
|
|
|
|
def __iter__(self) -> Iterator:
|
|
""" Iterator over node subitems """
|
|
if isinstance(self._data, list):
|
|
for idx, value in enumerate(self._data):
|
|
yield self._subitem(value=value, attr=idx)
|
|
else:
|
|
assert isinstance(self._data, dict)
|
|
for key in self._data.keys():
|
|
yield key
|
|
|
|
    def __contains__(self, attr: Any) -> bool:
        """ True if dictionary node contains given attribute """
        exc_if(not isinstance(self._data, dict),
               AttributeError(f"Config item '{self._path}' is not a "
                              f"dictionary, 'in' check can't be performed"))
        exc_if(not isinstance(attr, str),
               AttributeError(f"Non-string attribute '{attr}' can't be looked "
                              f"up in config item '{self._path}'"))
        return attr in self._data
|
|
|
|
def _subitem(self, value: Any, attr: Union[int, str]) -> Any:
|
|
""" Returns given subitem as attribute of given name """
|
|
if not isinstance(value, (list, dict)):
|
|
return value
|
|
return \
|
|
self.__class__(
|
|
data=value,
|
|
path=f"{self._path}{'.' if self._path else ''}{attr}"
|
|
if isinstance(attr, str) else f"{self._path}[{attr}]")
|
|
|
|
|
|
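# Illustrative sketch (not executed): how Config merging and dot-based access
# are expected to work. The option name "--config" and the keys used below
# ("rest_api", "afc_req", "url") are hypothetical - actual names come from the
# config file shipped with this script.
#
#   cfg = Config(argv=["--config", "base.yaml", "--config", "+extra.yaml"],
#                arg_name="config")
#   cfg.rest_api.afc_req.url         # dot access, AttributeError if absent
#   cfg.rest_api.get("timeout", 30)  # dictionary-style access with a default
#
# A "+"-prefixed config is merged on top of the default one; top-level keys of
# later files override those of earlier files.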
def run_docker(args: List[str]) -> str:
|
|
""" Runs docker with given parameters, returns stdout content """
|
|
try:
|
|
return \
|
|
subprocess.check_output(["docker"] + args, universal_newlines=True,
|
|
encoding="utf-8")
|
|
except subprocess.CalledProcessError as ex:
|
|
error(f"Failed to run 'docker {' '.join(args)}': {repr(ex)}. Please "
|
|
"specify all hosts explicitly")
|
|
return "" # Unreachable code to make pylint happy
|
|
|
|
|
|
class ServiceDiscovery:
|
|
""" Container and IP discovery for services
|
|
|
|
Private attributes:
|
|
_compose_project -- Compose project name
|
|
_containers -- Dictionary of _ContainerInfo object, indexed by service
|
|
name. None before first access
|
|
"""
|
|
class _ContainerInfo:
|
|
""" Information about single service
|
|
|
|
Public attributes:
|
|
name -- Container name
|
|
ip -- Container IP (if known) or None
|
|
"""
|
|
|
|
def __init__(self, name: str) -> None:
|
|
self.name = name
|
|
self.ip: Optional[str] = None
|
|
|
|
def __init__(self, compose_project: str) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
compose_project -- Docker compose project name
|
|
"""
|
|
self._compose_project = compose_project
|
|
self._containers: \
|
|
Optional[Dict[str, "ServiceDiscovery._ContainerInfo"]] = None
|
|
|
|
def get_container(self, service: str) -> str:
|
|
""" Returns container name for given service name """
|
|
ci = self._get_cont_info(service)
|
|
return ci.name
|
|
|
|
def get_ip(self, service: str) -> str:
|
|
""" Returns container IP for given service name """
|
|
ci = self._get_cont_info(service)
|
|
if ci.ip:
|
|
return ci.ip
|
|
try:
|
|
inspect_dict = json.loads(run_docker(["inspect", ci.name]))
|
|
except json.JSONDecodeError as ex:
|
|
error(f"Error parsing 'docker inspect {ci.name}' output: "
|
|
f"{repr(ex)}")
|
|
try:
|
|
for net_name, net_info in \
|
|
inspect_dict[0]["NetworkSettings"]["Networks"].items():
|
|
if net_name.endswith("_default"):
|
|
ci.ip = net_info["IPAddress"]
|
|
break
|
|
else:
|
|
error(f"Default network not found in container '{ci.name}'")
|
|
except (AttributeError, LookupError) as ex:
|
|
error(f"Unsupported structure of 'docker inspect {ci.name}' "
|
|
f"output: {repr(ex)}")
|
|
assert ci.ip is not None
|
|
return ci.ip
|
|
|
|
def _get_cont_info(self, service: str) \
|
|
-> "ServiceDiscovery._ContainerInfo":
|
|
""" Returns _ContainerInfo object for given service name """
|
|
if self._containers is None:
|
|
self._containers = {}
|
|
name_offset: Optional[int] = None
|
|
for line in run_docker(["ps"]).splitlines():
|
|
if name_offset is None:
|
|
name_offset = line.find("NAMES")
|
|
error_if(name_offset < 0,
|
|
"Unsupported structure of 'docker ps' output")
|
|
continue
|
|
assert name_offset is not None
|
|
for names in re.split(r",\s*", line[name_offset:]):
|
|
m = re.search(r"(%s_(.+)_\d+)" %
|
|
re.escape(self._compose_project),
|
|
names)
|
|
if m:
|
|
self._containers[m.group(2)] = \
|
|
self._ContainerInfo(name=m.group(1))
|
|
error_if(not self._containers,
|
|
f"No running containers found for compose project "
|
|
f"'{self._compose_project}'")
|
|
ret = self._containers.get(service)
|
|
error_if(ret is None,
|
|
f"Service name '{service}' not found among containers of "
|
|
f"compose project '{self._compose_project}'")
|
|
assert ret is not None
|
|
return ret
|
|
|
|
|
|
def get_url(base_url: str, param_host: Optional[str],
|
|
service_discovery: Optional[ServiceDiscovery]) -> str:
|
|
""" Construct URL from base URL in config and command line host or compose
|
|
project name
|
|
|
|
Arguments:
|
|
base_url -- Base URL from config
|
|
param_host -- Optional host name from command line parameter
|
|
service_discovery -- Optional ServiceDiscovery object. None means that
|
|
service discovery is not possible (--comp_proj was
|
|
not specified) and hostname must be provided
|
|
explicitly
|
|
Returns actionable URL with host either specified or retrieved from compose
|
|
container inspection and tail from base URL
|
|
"""
|
|
base_parts = urllib.parse.urlparse(base_url)
|
|
if param_host:
|
|
if "://" not in param_host:
|
|
param_host = f"{base_parts.scheme}://{param_host}"
|
|
param_parts = urllib.parse.urlparse(param_host)
|
|
replacements = {field: getattr(param_parts, field)
|
|
for field in base_parts._fields
|
|
if getattr(param_parts, field)}
|
|
return urllib.parse.urlunparse(base_parts._replace(**replacements))
|
|
if service_discovery is None:
|
|
return base_url
|
|
service = base_parts.netloc.split(":")[0]
|
|
return \
|
|
urllib.parse.urlunparse(
|
|
base_parts._replace(
|
|
netloc=service_discovery.get_ip(service) +
|
|
base_parts.netloc[len(service):]))
|
|
|
|
|
|
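# Illustrative examples of get_url() behavior (host names and paths below are
# hypothetical):
#   get_url(base_url="http://rat_server/ap-afc/inquiry", param_host="1.2.3.4",
#           service_discovery=None)
#     -> "http://1.2.3.4/ap-afc/inquiry"   (host from the command line, the
#                                           rest from the base URL)
#   With param_host=None and a ServiceDiscovery object, the host part of the
#   base URL ("rat_server") is treated as a compose service name and replaced
#   with the container IP obtained via 'docker inspect'.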
def ratdb(cfg: Config, command: str, service_discovery: ServiceDiscovery) \
|
|
-> Union[int, List[Dict[str, Any]]]:
|
|
""" Executes SQL statement in ratdb
|
|
|
|
Arguments:
|
|
cfg -- Config object
|
|
command -- SQL command to execute
|
|
service_discovery -- ServiceDiscovery object
|
|
Returns number of affected records for INSERT/UPDATE/DELETE, list of row
|
|
dictionaries on SELECT
|
|
"""
|
|
cont = service_discovery.get_container(cfg.ratdb.service)
|
|
result = run_docker(["exec", cont, "psql", "-U", cfg.ratdb.username,
|
|
"-d", cfg.ratdb.dbname, "--csv", "-c", command])
|
|
if command.upper().startswith("SELECT "):
|
|
try:
|
|
return list(csv.DictReader(result.splitlines()))
|
|
except csv.Error as ex:
|
|
error(f"CSV parse error on output of '{command}': {repr(ex)}")
|
|
for begin, pattern in [("INSERT ", r"INSERT\s+\d+\s+(\d+)"),
|
|
("UPDATE ", r"UPDATE\s+(\d+)"),
|
|
("DELETE ", r"DELETE\s+(\d+)")]:
|
|
if command.upper().startswith(begin):
|
|
m = re.search(pattern, result)
|
|
error_if(not m,
|
|
f"Can't fetch result of '{command}'")
|
|
assert m is not None
|
|
return int(m.group(1))
|
|
error(f"Unknown SQL command '{command}'")
|
|
return 0 # Unreachable code, appeasing pylint
|
|
|
|
|
|
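# Illustrative sketch of ratdb() usage (table and column names below are
# hypothetical):
#   rows = ratdb(cfg, "SELECT serial_number FROM aps LIMIT 5",
#                service_discovery)   # SELECT -> list of per-row dictionaries
#   n = ratdb(cfg, "DELETE FROM aps WHERE serial_number LIKE 'AFC_LOAD_%'",
#             service_discovery)      # INSERT/UPDATE/DELETE -> affected rows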
def path_get(obj: Any, path: List[Union[str, int]]) -> Any:
|
|
""" Read value from multilevel dictionary/list structure
|
|
|
|
Arguments:
|
|
obj -- Multilevel dictionary/list structure
|
|
path -- Sequence of indices
|
|
Returns value at the end of sequence
|
|
"""
|
|
for idx in path:
|
|
obj = obj[idx]
|
|
return obj
|
|
|
|
|
|
def path_set(obj: Any, path: List[Union[str, int]], value: Any) -> None:
|
|
""" Write value to multilevel dictionary/list structure
|
|
|
|
Arguments:
|
|
obj -- Multilevel dictionary/list structure
|
|
path -- Sequence of indices
|
|
value -- Value to insert to point at the end of sequence
|
|
"""
|
|
for idx in path[: -1]:
|
|
obj = obj[idx]
|
|
obj[path[-1]] = value
|
|
|
|
|
|
def path_del(obj: Any, path: List[Union[str, int]]) -> Any:
|
|
""" Delete value from multilevel dictionary/list structure
|
|
|
|
Arguments:
|
|
obj -- Multilevel dictionary/list structure
|
|
path -- Sequence of indices, value at end of it is deleted
|
|
"""
|
|
ret = obj
|
|
for idx in path[: -1]:
|
|
obj = obj[idx]
|
|
if isinstance(path[-1], int):
|
|
obj.pop(path[-1])
|
|
else:
|
|
del obj[path[-1]]
|
|
return ret
|
|
|
|
|
|
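# Illustrative sketch of the path helpers above (the message layout is
# hypothetical):
#   msg = {"availableSpectrumInquiryRequests": [{"location": {}}]}
#   path_get(msg, ["availableSpectrumInquiryRequests", 0, "location"])  # {}
#   path_set(msg, ["availableSpectrumInquiryRequests", 0, "requestId"], "1")
#   path_del(msg, ["availableSpectrumInquiryRequests", 0, "location"])
# Paths mix string keys (for dictionaries) and integer indices (for lists),
# matching the 'paths' entries in the config.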
class AfcReqRespGenerator:
|
|
""" Generator of AFC Request/Response messages for given request indices
|
|
|
|
Private attributes
|
|
_paths -- Copy of cfg.paths
|
|
_req_msg_pattern -- AFC Request message pattern as JSON dictionary
|
|
_resp_msg_pattern -- AFC Response message pattern as JSON dictionary
|
|
_grid_size -- Copy of cfg.region.grid_size
|
|
_min_lat -- Copy of cfg.region.min_lat
|
|
_max_lat -- Copy of cfg.region.max_lat
|
|
_min_lon -- Copy of cfg.region.min_lon
|
|
_max_lon -- Copy of cfg.region.max_lon
|
|
_default_height -- AP height to use when there is no randomization
|
|
    _channels_20mhz  -- Copy of cfg.channels_20mhz
|
|
_randomize -- True to choose AP positions randomly (uniformly or
|
|
according to population density). False to use request
|
|
index (to make caching possible)
|
|
_random_height -- Formula for random height in string form. Evaluated
|
|
with 'r' local value, randomly distributed in [0, 1]
|
|
_population_db -- Population density database name. None for uniform
|
|
_db_conn -- None or SQLite3 connection
|
|
_db_cur -- None or SQLite3 cursor
|
|
|
|
"""
|
|
|
|
def __init__(self, cfg: Config, randomize: bool,
|
|
population_db: Optional[str],
|
|
req_msg_pattern: Optional[Dict[str, Any]]) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
cfg -- Config object
|
|
randomize -- True to choose AP positions randomly (uniformly or
|
|
according to population density). False to use
|
|
request index (to make caching possible)
|
|
population_db -- Population density database name. None for uniform
|
|
req_msg_pattern -- Optional Request message pattern to use instead of
|
|
default
|
|
"""
|
|
self._paths = cfg.paths
|
|
self._req_msg_pattern = \
|
|
req_msg_pattern or json.loads(cfg.req_msg_pattern)
|
|
self._resp_msg_pattern = json.loads(cfg.resp_msg_pattern)
|
|
path_set(path_get(self._req_msg_pattern, self._paths.reqs_in_msg)[0],
|
|
self._paths.ruleset_in_req, cfg.region.rulesest_id)
|
|
path_set(path_get(self._req_msg_pattern, self._paths.reqs_in_msg)[0],
|
|
self._paths.cert_in_req, cfg.region.cert_id)
|
|
path_set(path_get(self._resp_msg_pattern, self._paths.resps_in_msg)[0],
|
|
self._paths.ruleset_in_resp, cfg.region.rulesest_id)
|
|
self._grid_size = cfg.region.grid_size
|
|
self._min_lat = cfg.region.min_lat
|
|
self._max_lat = cfg.region.max_lat
|
|
self._min_lon = cfg.region.min_lon
|
|
self._max_lon = cfg.region.max_lon
|
|
self._default_height = \
|
|
path_get(
|
|
path_get(self._req_msg_pattern, self._paths.reqs_in_msg)[0],
|
|
self._paths.height_in_req)
|
|
self._channels_20mhz = cfg.channels_20mhz
|
|
self._randomize = randomize
|
|
self._random_height = cfg.region.random_height
|
|
self._population_db = population_db
|
|
self._db_conn: Optional[sqlite3.Connection] = None
|
|
self._db_cur: Optional[sqlite3.Cursor] = None
|
|
|
|
    def request_msg(self, req_indices: Union[int, List[int]]) \
            -> Dict[str, Any]:
|
|
""" Generate AFC Request message for given request index range
|
|
|
|
Arguments:
|
|
req_indices -- Request index or list of request indices
|
|
Returns Request message
|
|
"""
|
|
if isinstance(req_indices, int):
|
|
req_indices = [req_indices]
|
|
assert not isinstance(req_indices, int)
|
|
|
|
# Whole message to be
|
|
msg = copy.deepcopy(self._req_msg_pattern)
|
|
pattern_req = path_get(msg, self._paths.reqs_in_msg)[0]
|
|
# List of requests
|
|
reqs = []
|
|
for idx_in_msg, req_idx in enumerate(req_indices):
|
|
# Individual request
|
|
req = copy.deepcopy(pattern_req) if len(req_indices) > 1 \
|
|
else pattern_req
|
|
|
|
path_set(req, self._paths.id_in_req, self._req_id(req_idx,
|
|
idx_in_msg))
|
|
path_set(req, self._paths.serial_in_req, self._serial(req_idx))
|
|
lat, lon, height = self._get_position(req_idx=req_idx)
|
|
path_set(req, self._paths.lat_in_req, lat)
|
|
path_set(req, self._paths.lon_in_req, lon)
|
|
path_set(req, self._paths.height_in_req, height)
|
|
reqs.append(req)
|
|
path_set(msg, self._paths.reqs_in_msg, reqs)
|
|
return msg
|
|
|
|
def response_msg(self, req_idx: int) -> Dict[str, Any]:
|
|
""" Generate (fake) AFC Response message (for rcache - hence
|
|
single-item)
|
|
|
|
Arguments:
|
|
req_idx -- Request index
|
|
Returns Fake AFC Response message
|
|
"""
|
|
# AFC Response message to be
|
|
msg = copy.deepcopy(self._resp_msg_pattern)
|
|
# Response inside it
|
|
resp = path_get(msg, self._paths.resps_in_msg)[0]
|
|
path_set(resp, self._paths.id_in_resp, self._req_id(req_idx, 0))
|
|
# To add uniqueness to message - set 20MHz channels according to bits
|
|
# in request index binary representation
|
|
channels = []
|
|
powers = []
|
|
chan_idx = 0
|
|
while req_idx:
|
|
if req_idx & 1:
|
|
channels.append(self._channels_20mhz[chan_idx])
|
|
powers.append(30.)
|
|
chan_idx += 1
|
|
req_idx //= 2
|
|
path_set(resp, self._paths.var_chans_in_resp, channels)
|
|
path_set(resp, self._paths.var_pwrs_in_resp, powers)
|
|
return msg
|
|
|
|
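    # Note on response_msg() above: 20MHz channels are taken from the set bits
    # of req_idx, so each fake response is unique. For example, with
    # cfg.channels_20mhz == [1, 5, 9, ...] (hypothetical values) and
    # req_idx == 5 (binary 101), the response lists channels [1, 9], each at
    # 30 dBm.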
def _req_id(self, req_idx: int, idx_in_msg: int) -> str:
|
|
""" Request ID in message for given request index
|
|
|
|
Arguments:
|
|
req_idx -- Request index
|
|
idx_in_msg -- Index of request in AFC message
|
|
Returns Request ID to use
|
|
"""
|
|
unused_argument(idx_in_msg)
|
|
return str(req_idx)
|
|
|
|
def _serial(self, req_idx: int) -> str:
|
|
""" AP Serial Number for given request index """
|
|
return f"AFC_LOAD_{req_idx:08}"
|
|
|
|
def _get_position(self, req_idx: int) -> Tuple[float, float, float]:
|
|
""" Returns (lat_deg, lon_deg, height_m) position for given request
|
|
index """
|
|
height = \
|
|
eval(self._random_height, None, {"r": random.uniform(0, 1)}) \
|
|
if self._randomize else self._default_height
|
|
if self._population_db is None:
|
|
if self._randomize:
|
|
return (random.uniform(self._min_lat, self._max_lat),
|
|
random.uniform(self._min_lon, self._max_lon),
|
|
height)
|
|
            return (self._min_lat +
                    (req_idx // self._grid_size) *
                    (self._max_lat - self._min_lat) / self._grid_size,
                    self._min_lon +
                    (req_idx % self._grid_size) *
                    (self._max_lon - self._min_lon) / self._grid_size,
                    height)
|
|
if self._db_conn is None:
|
|
self._db_conn = \
|
|
sqlite3.connect(f"file:{self._population_db}?mode=ro",
|
|
uri=True)
|
|
self._db_cur = self._db_conn.cursor()
|
|
assert self._db_cur is not None
|
|
cumulative_density = random.uniform(0, 1) if self._randomize \
|
|
else req_idx / (self._grid_size * self._grid_size)
|
|
rows = \
|
|
self._db_cur.execute(
|
|
f"SELECT min_lat, max_lat, min_lon, max_lon "
|
|
f"FROM population_density "
|
|
f"WHERE cumulative_density >= {cumulative_density} "
|
|
f"ORDER BY cumulative_density "
|
|
f"LIMIT 1")
|
|
min_lat, max_lat, min_lon, max_lon = rows.fetchall()[0]
|
|
if self._randomize:
|
|
return (random.uniform(min_lat, max_lat),
|
|
random.uniform(min_lon, max_lon), height)
|
|
return ((min_lat + max_lat) / 2, (min_lon + max_lon) / 2, height)
|
|
|
|
|
|
class RestDataHandlerBase:
|
|
""" Base class for generate/process REST API request/response data payloads
|
|
"""
|
|
|
|
def __init__(self, cfg: Config, randomize: bool = False,
|
|
population_db: Optional[str] = None,
|
|
req_msg_pattern: Optional[Dict[str, Any]] = None) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
cfg -- Config object
|
|
randomize -- True to choose AP positions randomly (uniformly or
|
|
according to population density). False to use
|
|
request index (to make caching possible)
|
|
population_db -- Population density database name. None for uniform
|
|
req_msg_pattern -- Optional Request message pattern to use instead of
|
|
default
|
|
"""
|
|
self.cfg = cfg
|
|
self.afc_req_resp_gen = \
|
|
AfcReqRespGenerator(
|
|
cfg=cfg, randomize=randomize, population_db=population_db,
|
|
req_msg_pattern=req_msg_pattern)
|
|
|
|
def make_req_data(self, req_indices: List[int]) -> bytes:
|
|
""" Abstract method that generates REST API POST data payload
|
|
|
|
Arguments:
|
|
req_indices -- List of request indices to generate payload for
|
|
Returns POST payload as byte string
|
|
"""
|
|
unused_argument(req_indices)
|
|
raise \
|
|
NotImplementedError(
|
|
f"{self.__class__}.make_req_data() must be implemented")
|
|
|
|
def make_error_map(self, result_data: Optional[bytes]) -> Dict[int, str]:
|
|
""" Virtual method for error map generation
|
|
|
|
Arguments:
|
|
result_data -- Response in form of optional bytes string
|
|
Returns error map - dictionary of error messages, indexed by request
|
|
indices. This default implementation returns empty dictionary
|
|
"""
|
|
unused_argument(result_data)
|
|
return {}
|
|
|
|
def dry_result_data(self, batch: Optional[int]) -> bytes:
|
|
""" Virtual method returning bytes string to pass to make_error_map on
|
|
dry run. This default implementation returns empty string """
|
|
unused_argument(batch)
|
|
return b""
|
|
|
|
|
|
class PreloadRestDataHandler(RestDataHandlerBase):
|
|
""" REST API data handler for 'preload' operation - i.e. Rcache update
|
|
messages (no response)
|
|
|
|
Private attributes:
|
|
_hash_base -- MD5 hash computed over AFC Config, awaiting AFC Request tail
|
|
"""
|
|
|
|
def __init__(self, cfg: Config, afc_config: Dict[str, Any],
|
|
req_msg_pattern: Optional[Dict[str, Any]] = None) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
cfg -- Config object
|
|
afc_config -- AFC Config that will be used in request hash
|
|
computation
|
|
req_msg_pattern -- Optional Request message pattern to use instead of
|
|
default
|
|
"""
|
|
self._hash_base = hashlib.md5()
|
|
self._hash_base.update(json.dumps(afc_config,
|
|
sort_keys=True).encode("utf-8"))
|
|
super().__init__(cfg=cfg, req_msg_pattern=req_msg_pattern)
|
|
|
|
def make_req_data(self, req_indices: List[int]) -> bytes:
|
|
""" Generates REST API POST payload (RcacheUpdateReq - see
|
|
rcache_models.py, not included here)
|
|
|
|
Arguments:
|
|
req_indices -- List of request indices to generate payload for
|
|
Returns POST payload as byte string
|
|
"""
|
|
rrks = []
|
|
for req_idx in req_indices:
|
|
req_msg = self.afc_req_resp_gen.request_msg(req_indices=[req_idx])
|
|
resp_msg = self.afc_req_resp_gen.response_msg(req_idx=req_idx)
|
|
hash_req = \
|
|
copy.deepcopy(path_get(req_msg, self.cfg.paths.reqs_in_msg)[0])
|
|
path_del(hash_req, self.cfg.paths.id_in_req)
|
|
h = self._hash_base.copy()
|
|
h.update(json.dumps(hash_req, sort_keys=True).encode("utf-8"))
|
|
rrks.append({"afc_req": json.dumps(req_msg),
|
|
"afc_resp": json.dumps(resp_msg),
|
|
"req_cfg_digest": h.hexdigest()})
|
|
return json.dumps({"req_resp_keys": rrks}).encode("utf-8")
|
|
|
|
|
|
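# Note on make_req_data() above: req_cfg_digest is an MD5 over the AFC Config
# followed by the individual request with its ID removed, both JSON-serialized
# with sorted keys - presumably the same key the response cache computes.
# A rough sketch of the equivalent computation (afc_config and request_sans_id
# are placeholders):
#
#   h = hashlib.md5(json.dumps(afc_config, sort_keys=True).encode("utf-8"))
#   h.update(json.dumps(request_sans_id, sort_keys=True).encode("utf-8"))
#   digest = h.hexdigest()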
class LoadRestDataHandler(RestDataHandlerBase):
|
|
""" REST API data handler for 'load' operation - i.e. AFC Request/Response
|
|
messages """
|
|
|
|
def __init__(self, cfg: Config, randomize: bool,
|
|
population_db: Optional[str],
|
|
req_msg_pattern: Optional[Dict[str, Any]] = None) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
cfg -- Config object
|
|
randomize -- True to choose AP positions randomly (uniformly or
|
|
according to population density). False to use
|
|
request index (to make caching possible)
|
|
population_db -- Population density database name. None for uniform
|
|
req_msg_pattern -- Optional Request message pattern to use instead of
|
|
default
|
|
"""
|
|
super().__init__(cfg=cfg, randomize=randomize,
|
|
population_db=population_db,
|
|
req_msg_pattern=req_msg_pattern)
|
|
|
|
def make_req_data(self, req_indices: List[int]) -> bytes:
|
|
""" Generates REST API POST payload (AFC Request message)
|
|
|
|
Arguments:
|
|
req_indices -- Request indices to generate payload for
|
|
Returns POST payload as byte string
|
|
"""
|
|
return \
|
|
json.dumps(
|
|
self.afc_req_resp_gen.request_msg(req_indices=req_indices)).\
|
|
encode("utf-8")
|
|
|
|
def make_error_map(self, result_data: Optional[bytes]) -> Dict[int, str]:
|
|
""" Generate error map for given AFC Response message
|
|
|
|
Arguments:
|
|
result_data -- AFC Response as byte string
|
|
Returns error map - dictionary of error messages, indexed by request
|
|
indices
|
|
"""
|
|
paths = self.cfg.paths
|
|
ret: Dict[int, str] = {}
|
|
for resp in path_get(json.loads(result_data or b""),
|
|
paths.resps_in_msg):
|
|
if path_get(resp, paths.code_in_resp):
|
|
ret[int(path_get(resp, paths.id_in_resp))] = \
|
|
str(path_get(resp, paths.response_in_resp))
|
|
return ret
|
|
|
|
def dry_result_data(self, batch: Optional[int]) -> bytes:
|
|
""" Returns byte string, containing AFC Response to parse on dry run
|
|
"""
|
|
assert batch is not None
|
|
resp_msg = json.loads(self.cfg.resp_msg_pattern)
|
|
path_set(resp_msg, self.cfg.paths.resps_in_msg,
|
|
path_get(resp_msg, self.cfg.paths.resps_in_msg) * batch)
|
|
return json.dumps(resp_msg).encode("utf-8")
|
|
|
|
|
|
# POST Worker request data and supplementary information
|
|
PostWorkerReqInfo = \
|
|
NamedTuple("PostWorkerReqInfo",
|
|
[
|
|
# Request indices, contained in REST API request data
|
|
("req_indices", List[int]),
|
|
# REST API Request data
|
|
("req_data", bytes)])
|
|
|
|
|
|
# GET Worker request data and supplementary information
|
|
GetWorkerReqInfo = \
|
|
NamedTuple("GetWorkerReqInfo",
|
|
[
|
|
# Number of GET requests to send
|
|
("num_gets", int)])
|
|
|
|
|
|
# REST API request results
|
|
class WorkerResultInfo(NamedTuple):
|
|
""" Data, returned by REST API workers in result queue """
|
|
# Retries made (0 - from first attempt)
|
|
retries: int
|
|
# CPU time consumed by worker in nanoseconds. Negative if not available
|
|
worker_cpu_consumed_ns: int
|
|
    # Time spent processing the current request, in seconds
|
|
req_time_spent_sec: float
|
|
# Request indices. None for netload
|
|
req_indices: Optional[List[int]] = None
|
|
# REST API Response data (None if error or N/A)
|
|
result_data: Optional[bytes] = None
|
|
# Error message for failed requests, None for succeeded
|
|
error_msg: Optional[str] = None
|
|
# Optional request data
|
|
req_data: Optional[bytes] = None
|
|
|
|
|
|
# Message from Tick worker for EMA rate computation
|
|
TickInfo = NamedTuple("TickInfo", [("tick", int)])
|
|
|
|
# Type for result queue items
|
|
ResultQueueDataType = Optional[Union[WorkerResultInfo, TickInfo]]
|
|
|
|
|
|
class Ticker:
|
|
""" Second ticker (used for EMA computations). Puts TickInfo to result
|
|
queue
|
|
|
|
Private attributes:
|
|
_worker -- Tick worker process (generates TickInfo once per second)
|
|
"""
|
|
|
|
def __init__(self,
|
|
result_queue: "multiprocessing.Queue[ResultQueueDataType]") \
|
|
-> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
result_queue -- Queue for tick worker to put TickInfo objects
|
|
"""
|
|
self._worker = \
|
|
multiprocessing.Process(target=Ticker._tick_worker,
|
|
kwargs={"result_queue": result_queue})
|
|
self._worker.start()
|
|
|
|
@classmethod
|
|
def _tick_worker(
|
|
cls, result_queue: "multiprocessing.Queue[ResultQueueDataType]") \
|
|
-> None:
|
|
""" Tick worker process
|
|
|
|
Arguments:
|
|
result_queue -- Queue to put TickInfo to
|
|
"""
|
|
count = 0
|
|
while True:
|
|
time.sleep(1)
|
|
count += 1
|
|
result_queue.put(TickInfo(tick=count))
|
|
|
|
def stop(self) -> None:
|
|
""" Stops tick worker """
|
|
self._worker.terminate()
|
|
|
|
|
|
class RateEma:
|
|
""" Exponential Moving Average for rate of change
|
|
|
|
Public attributes:
|
|
rate_ema -- Rate Average computed on last tick
|
|
|
|
Private attributes:
|
|
_weight -- Weight for EMA computation
|
|
_prev_value -- Value on previous tick
|
|
"""
|
|
|
|
def __init__(self, win_size_sec: float = 20) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
result_queue -- Queue for tick worker to put TickInfo objects
|
|
win_size_sec -- Averaging window size in seconds
|
|
"""
|
|
self._weight = 2 / (win_size_sec + 1)
|
|
self.rate_ema: float = 0
|
|
self._prev_value: float = 0
|
|
|
|
def on_tick(self, new_value: float) -> None:
|
|
""" Call on arrival of TickInfo message
|
|
|
|
Arguments:
|
|
new_value - Measured data value on this tick
|
|
"""
|
|
increment = new_value - self._prev_value
|
|
self._prev_value = new_value
|
|
self.rate_ema += self._weight * (increment - self.rate_ema)
|
|
|
|
|
|
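# Note on RateEma above: on every one-second tick the estimate is updated as
# an exponential moving average of per-tick increments:
#
#   rate_ema += weight * (increment - rate_ema),   weight = 2 / (win_size + 1)
#
# With the default 20-second window weight is 2/21, so roughly the last twenty
# ticks dominate the estimate.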
class StatusPrinter:
|
|
""" Prints status on a single line:
|
|
|
|
Private attributes:
|
|
_prev_len -- Length of previously printed line
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
enabled -- True if status print is enabled
|
|
"""
|
|
self._prev_len = 0
|
|
|
|
def pr(self, s: Optional[str] = None) -> None:
|
|
""" Print string (if given) or cleans up after previous print """
|
|
if s is None:
|
|
if self._prev_len:
|
|
print(" " * self._prev_len, flush=True)
|
|
self._prev_len = 0
|
|
else:
|
|
print(s + " " * (max(0, self._prev_len - len(s))), end="\r",
|
|
flush=True)
|
|
self._prev_len = len(s)
|
|
|
|
|
|
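# Note on StatusPrinter above: pr("...") rewrites the current terminal line
# (the print ends with "\r" and pads with spaces to erase any longer previous
# text), while pr() with no argument clears the line so that subsequent log
# output starts on a clean line.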
class ResultsProcessor:
|
|
""" Prints important summary of request execution results
|
|
|
|
Private attributes:
|
|
_netload -- True for netload testing, False for preload/load
|
|
testing
|
|
_total_requests -- Total number of individual requests (not batches)
|
|
that will be executed
|
|
_result_queue -- Queue with execution results, second ticks, Nones
|
|
for completed workers
|
|
_status_period -- Period of status printing (in terms of request
|
|
count), e.g. 1000 for once in 1000 requests, 0 to
|
|
not print status (except in the end)
|
|
_rest_data_handler -- Generator/interpreter of REST request/response
|
|
data. None for netload test
|
|
_num_workers -- Number of worker processes
|
|
_err_dir -- None or directory for failed requests
|
|
_status_printer -- StatusPrinter
|
|
"""
|
|
|
|
def __init__(
|
|
self, netload: bool, total_requests: int,
|
|
result_queue: "multiprocessing.Queue[ResultQueueDataType]",
|
|
num_workers: int, status_period: int,
|
|
rest_data_handler: Optional[RestDataHandlerBase],
|
|
err_dir: Optional[str]) -> None:
|
|
""" Constructor
|
|
|
|
Arguments:
|
|
netload -- True for netload testing, False for preload/load
|
|
testing
|
|
total_requests -- Total number of individual requests (not batches)
|
|
that will be executed
|
|
result_queue -- Queue with execution results, Nones for completed
|
|
workers
|
|
num_workers -- Number of worker processes
|
|
status_period -- Period of status printing (in terms of request
|
|
count), e.g. 1000 for once in 1000 requests, 0 to
|
|
not print status (except in the end)
|
|
rest_data_handler -- Generator/interpreter of REST request/response
|
|
data
|
|
err_dir -- None or directory for failed requests
|
|
"""
|
|
self._netload = netload
|
|
self._total_requests = total_requests
|
|
self._result_queue = result_queue
|
|
self._status_period = status_period
|
|
self._num_workers = num_workers
|
|
self._rest_data_handler = rest_data_handler
|
|
self._err_dir = err_dir
|
|
self._status_printer = StatusPrinter()
|
|
|
|
def process(self) -> None:
|
|
""" Keep processing results until all worker will stop """
|
|
start_time = datetime.datetime.now()
|
|
requests_sent: int = 0
|
|
requests_failed: int = 0
|
|
retries: int = 0
|
|
cpu_consumed_ns: int = 0
|
|
time_spent_sec: float = 0
|
|
req_rate_ema = RateEma()
|
|
cpu_consumption_ema = RateEma()
|
|
|
|
def status_message(intermediate: bool) -> str:
|
|
""" Returns status message (intermediate or final) """
|
|
now = datetime.datetime.now()
|
|
elapsed = now - start_time
|
|
elapsed_sec = elapsed.total_seconds()
|
|
elapsed_str = re.sub(r"\.\d*$", "", str(elapsed))
|
|
global_req_rate = requests_sent / elapsed_sec if elapsed_sec else 0
|
|
|
|
cpu_consumption: float
|
|
if cpu_consumed_ns < 0:
|
|
cpu_consumption = -1
|
|
elif intermediate:
|
|
cpu_consumption = cpu_consumption_ema.rate_ema
|
|
elif elapsed_sec:
|
|
cpu_consumption = cpu_consumed_ns * 1e-9 / elapsed_sec
|
|
else:
|
|
cpu_consumption = 0
|
|
|
|
if requests_sent:
|
|
req_duration = time_spent_sec / requests_sent
|
|
req_duration_str = \
|
|
f"{req_duration:.3g} sec" if req_duration > 0.1 \
|
|
else f"{req_duration * 1000:.3g} ms"
|
|
else:
|
|
req_duration_str = "unknown"
|
|
ret = \
|
|
f"{'Progress: ' if intermediate else ''}" \
|
|
f"{requests_sent} requests completed " \
|
|
f"({requests_sent * 100 / self._total_requests:.1f}%), " \
|
|
f"{requests_failed} failed " \
|
|
f"({requests_failed * 100 / (requests_sent or 1):.3f}%), " \
|
|
f"{retries} retries made. " \
|
|
f"{elapsed_str} elapsed, " \
|
|
f"rate is {global_req_rate:.3f} req/sec " \
|
|
f"(avg req proc time {req_duration_str})"
|
|
if cpu_consumption >= 0:
|
|
ret += f", CPU consumption is {cpu_consumption:.3f}"
|
|
if intermediate and elapsed_sec and requests_sent:
|
|
total_duration = \
|
|
datetime.timedelta(
|
|
seconds=self._total_requests * elapsed_sec /
|
|
requests_sent)
|
|
eta_dt = start_time + total_duration
|
|
tta_sec = int((eta_dt - now).total_seconds())
|
|
tta: str
|
|
if tta_sec < 60:
|
|
tta = f"{tta_sec} seconds"
|
|
elif tta_sec < 3600:
|
|
tta = f"{tta_sec // 60} minutes"
|
|
else:
|
|
tta_minutes = tta_sec // 60
|
|
tta = \
|
|
f"{tta_minutes // 60} hours {tta_minutes % 60} minutes"
|
|
ret += f", current rate is " \
|
|
f"{req_rate_ema.rate_ema:.3f} req/sec. " \
|
|
f"ETA: {eta_dt.strftime('%X')} (in {tta})"
|
|
return ret
|
|
|
|
try:
|
|
while True:
|
|
result_info = self._result_queue.get()
|
|
if result_info is None:
|
|
self._num_workers -= 1
|
|
if self._num_workers == 0:
|
|
break
|
|
continue
|
|
if isinstance(result_info, TickInfo):
|
|
req_rate_ema.on_tick(requests_sent)
|
|
cpu_consumption_ema.on_tick(cpu_consumed_ns * 1e-9)
|
|
continue
|
|
error_msg = result_info.error_msg
|
|
if error_msg:
|
|
self._status_printer.pr()
|
|
if self._netload:
|
|
indices_clause = ""
|
|
else:
|
|
assert result_info.req_indices is not None
|
|
indices = \
|
|
", ".join(str(i) for i in result_info.req_indices)
|
|
indices_clause = f" with indices ({indices})"
|
|
logging.error(
|
|
f"Request{indices_clause} failed: {error_msg}")
|
|
|
|
error_map: Dict[int, str] = {}
|
|
if self._rest_data_handler is not None:
|
|
if error_msg is None:
|
|
try:
|
|
error_map = \
|
|
self._rest_data_handler.make_error_map(
|
|
result_data=result_info.result_data)
|
|
except Exception as ex:
|
|
error_msg = f"Error decoding message " \
|
|
f"{result_info.result_data!r}: {repr(ex)}"
|
|
for idx, error_msg in error_map.items():
|
|
self._status_printer.pr()
|
|
logging.error(f"Request {idx} failed: {error_msg}")
|
|
|
|
prev_sent = requests_sent
|
|
num_requests = \
|
|
1 if self._netload \
|
|
else len(cast(List[int], result_info.req_indices))
|
|
requests_sent += num_requests
|
|
retries += result_info.retries
|
|
if result_info.error_msg:
|
|
requests_failed += num_requests
|
|
else:
|
|
requests_failed += len(error_map)
|
|
if self._err_dir and result_info.req_data and \
|
|
(result_info.error_msg or error_map):
|
|
try:
|
|
filename = \
|
|
os.path.join(
|
|
self._err_dir,
|
|
datetime.datetime.now().strftime(
|
|
"err_req_%y%m%d_%H%M%S_%f.json"))
|
|
with open(filename, "wb") as f:
|
|
f.write(result_info.req_data)
|
|
except OSError as ex:
|
|
error(f"Failed to write failed request file "
|
|
f"'{filename}': {repr(ex)}")
|
|
cpu_consumed_ns += result_info.worker_cpu_consumed_ns
|
|
time_spent_sec += result_info.req_time_spent_sec
|
|
|
|
if self._status_period and \
|
|
((prev_sent // self._status_period) !=
|
|
(requests_sent // self._status_period)):
|
|
self._status_printer.pr(status_message(intermediate=True))
|
|
finally:
|
|
self._status_printer.pr()
|
|
logging.info(status_message(intermediate=False))
|
|
|
|
def _print(self, s: str, newline: bool, is_error: bool) -> None:
|
|
""" Print message
|
|
|
|
Arguments:
|
|
s -- Message to print
|
|
newline -- True to go to new line, False to remain on same line
|
|
"""
|
|
if newline:
|
|
self._status_printer.pr()
|
|
(logging.error if is_error else logging.info)(s)
|
|
else:
|
|
self._status_printer.pr(s)
|
|
|
|
|
|
def post_req_worker(
|
|
url: str, retries: int, backoff: float, dry: bool,
|
|
post_req_queue: multiprocessing.Queue,
|
|
result_queue: "multiprocessing.Queue[ResultQueueDataType]",
|
|
dry_result_data: Optional[bytes], use_requests: bool,
|
|
return_requests: bool = False, delay_sec: float = 0) -> None:
|
|
""" REST API POST worker
|
|
|
|
Arguments:
|
|
url -- REST API URL to send POSTs to
|
|
retries -- Number of retries
|
|
backoff -- Initial backoff window in seconds
|
|
dry -- True to dry run
|
|
post_req_queue -- Request queue. Elements are PostWorkerReqInfo objects
|
|
corresponding to single REST API post or None to stop
|
|
operation
|
|
result_queue -- Result queue. Elements added are WorkerResultInfo for
|
|
operation results, None to signal that worker finished
|
|
use_requests -- True to use requests, False to use urllib.request
|
|
return_requests -- True to return requests in WorkerResultInfo
|
|
delay_sec -- Delay start by this number of seconds
|
|
"""
|
|
try:
|
|
time.sleep(delay_sec)
|
|
session: Optional[requests.Session] = \
|
|
requests.Session() if use_requests and (not dry) else None
|
|
has_proc_time = hasattr(time, "process_time_ns")
|
|
prev_proc_time_ns = time.process_time_ns() if has_proc_time else 0
|
|
while True:
|
|
req_info: PostWorkerReqInfo = post_req_queue.get()
|
|
if req_info is None:
|
|
result_queue.put(None)
|
|
return
|
|
|
|
start_time = datetime.datetime.now()
|
|
error_msg = None
|
|
if dry:
|
|
result_data = dry_result_data
|
|
attempts = 1
|
|
else:
|
|
result_data = None
|
|
last_error: Optional[str] = None
|
|
for attempt in range(retries + 1):
|
|
if use_requests:
|
|
assert session is not None
|
|
try:
|
|
resp = \
|
|
session.post(
|
|
url=url, data=req_info.req_data,
|
|
headers={
|
|
"Content-Type":
|
|
"application/json; charset=utf-8",
|
|
"Content-Length":
|
|
str(len(req_info.req_data))},
|
|
timeout=180)
|
|
if not resp.ok:
|
|
last_error = \
|
|
f"{resp.status_code}: {resp.reason}"
|
|
continue
|
|
result_data = resp.content
|
|
break
|
|
except requests.RequestException as ex:
|
|
last_error = repr(ex)
|
|
else:
|
|
req = urllib.request.Request(url)
|
|
req.add_header("Content-Type",
|
|
"application/json; charset=utf-8")
|
|
req.add_header("Content-Length",
|
|
str(len(req_info.req_data)))
|
|
try:
|
|
with urllib.request.urlopen(
|
|
req, req_info.req_data, timeout=180) as f:
|
|
result_data = f.read()
|
|
break
|
|
except (urllib.error.HTTPError, urllib.error.URLError,
|
|
urllib.error.ContentTooShortError, OSError) \
|
|
as ex:
|
|
last_error = repr(ex)
|
|
time.sleep(
|
|
random.uniform(0, (1 << attempt)) * backoff)
|
|
else:
|
|
error_msg = last_error
|
|
attempts = attempt + 1
|
|
new_proc_time_ns = time.process_time_ns() if has_proc_time else 0
|
|
result_queue.put(
|
|
WorkerResultInfo(
|
|
req_indices=req_info.req_indices, retries=attempts - 1,
|
|
result_data=result_data, error_msg=error_msg,
|
|
worker_cpu_consumed_ns=(new_proc_time_ns -
|
|
prev_proc_time_ns)
|
|
if has_proc_time else -1,
|
|
req_time_spent_sec=(datetime.datetime.now() - start_time).
|
|
total_seconds(),
|
|
req_data=req_info.req_data if return_requests and (not dry)
|
|
else None))
|
|
prev_proc_time_ns = new_proc_time_ns
|
|
except Exception as ex:
|
|
logging.error(f"Worker failed: {repr(ex)}\n"
|
|
f"{traceback.format_exc()}")
|
|
result_queue.put(None)
|
|
|
|
|
|
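# Note on retry handling in post_req_worker() above: after a failed attempt
# the worker sleeps for random.uniform(0, 1 << attempt) * backoff seconds,
# i.e. a randomized exponential backoff. With backoff=1 the sleep is drawn
# from [0, 1) sec after the first failure, [0, 2) after the second, [0, 4)
# after the third, and so on.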
def get_req_worker(
|
|
url: str, expected_code: Optional[int], retries: int, backoff: float,
|
|
dry: bool, get_req_queue: multiprocessing.Queue,
|
|
result_queue: "multiprocessing.Queue[ResultQueueDataType]",
|
|
use_requests: bool) -> None:
|
|
""" REST API GET worker
|
|
|
|
Arguments:
|
|
    url           -- REST API URL to send GETs to
|
|
expected_code -- None or expected non-200 status code
|
|
retries -- Number of retries
|
|
backoff -- Initial backoff window in seconds
|
|
dry -- True to dry run
|
|
get_req_queue -- Request queue. Elements are GetWorkerReqInfo objects
|
|
corresponding to bunch of single REST API GETs or None to
|
|
stop operation
|
|
result_queue -- Result queue. Elements added are WorkerResultInfo for
|
|
operation results, None to signal that worker finished
|
|
use_requests -- True to use requests, False to use urllib.request
|
|
"""
|
|
try:
|
|
if expected_code is None:
|
|
expected_code = http.HTTPStatus.OK.value
|
|
has_proc_time = hasattr(time, "process_time_ns")
|
|
prev_proc_time_ns = time.process_time_ns() if has_proc_time else 0
|
|
session: Optional[requests.Session] = \
|
|
requests.Session() if use_requests and (not dry) else None
|
|
while True:
|
|
req_info: GetWorkerReqInfo = get_req_queue.get()
|
|
if req_info is None:
|
|
result_queue.put(None)
|
|
return
|
|
start_time = datetime.datetime.now()
|
|
for _ in range(req_info.num_gets):
|
|
error_msg = None
|
|
if dry:
|
|
attempts = 1
|
|
else:
|
|
last_error: Optional[str] = None
|
|
for attempt in range(retries + 1):
|
|
if use_requests:
|
|
assert session is not None
|
|
try:
|
|
resp = session.get(url=url, timeout=30)
|
|
if resp.status_code != expected_code:
|
|
last_error = \
|
|
f"{resp.status_code}: {resp.reason}"
|
|
continue
|
|
break
|
|
except requests.RequestException as ex:
|
|
last_error = repr(ex)
|
|
else:
|
|
req = urllib.request.Request(url)
|
|
status_code: Optional[int] = None
|
|
status_reason: str = ""
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30):
|
|
pass
|
|
status_code = http.HTTPStatus.OK.value
|
|
status_reason = http.HTTPStatus.OK.name
|
|
except urllib.error.HTTPError as http_ex:
|
|
status_code = http_ex.code
|
|
status_reason = http_ex.reason
|
|
except (urllib.error.URLError,
|
|
urllib.error.ContentTooShortError) as ex:
|
|
last_error = repr(ex)
|
|
if status_code is not None:
|
|
if status_code == expected_code:
|
|
break
|
|
last_error = f"{status_code}: {status_reason}"
|
|
time.sleep(
|
|
random.uniform(0, (1 << attempt)) * backoff)
|
|
else:
|
|
error_msg = last_error
|
|
attempts = attempt + 1
|
|
new_proc_time_ns = \
|
|
time.process_time_ns() if has_proc_time else 0
|
|
result_queue.put(
|
|
WorkerResultInfo(
|
|
retries=attempts - 1, error_msg=error_msg,
|
|
worker_cpu_consumed_ns=(new_proc_time_ns -
|
|
prev_proc_time_ns)
|
|
if has_proc_time else -1,
|
|
req_time_spent_sec=(datetime.datetime.now() -
|
|
start_time).
|
|
total_seconds(),))
|
|
prev_proc_time_ns = new_proc_time_ns
|
|
except Exception as ex:
|
|
logging.error(f"Worker failed: {repr(ex)}\n"
|
|
f"{traceback.format_exc()}")
|
|
result_queue.put(None)
|
|
|
|
|
|
def get_idx_range(idx_range_arg: str) -> Tuple[int, int]:
|
|
""" Parses --idx_range command line parameter to index range tuple """
|
|
parts = idx_range_arg.split("-", maxsplit=1)
|
|
try:
|
|
ret = (0, int(parts[0])) if len(parts) == 1 \
|
|
else (int(parts[0]), int(parts[1]))
|
|
error_if(ret[0] >= ret[1], f"Invalid index range: {idx_range_arg}")
|
|
return ret
|
|
except ValueError as ex:
|
|
error(f"Invalid index range syntax: {repr(ex)}")
|
|
return (0, 0) # Appeasing pylint, will never happen
|
|
|
|
|
|
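# Illustrative examples of get_idx_range():
#   get_idx_range("1000")      -> (0, 1000)
#   get_idx_range("100-2000")  -> (100, 2000)
# A single number means "from 0 up to that number"; the second bound is
# treated as one past the maximum request index.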
def producer_worker(
|
|
count: Optional[int], batch: int, parallel: int,
|
|
req_queue: multiprocessing.Queue, netload: bool = False,
|
|
min_idx: Optional[int] = None, max_idx: Optional[int] = None,
|
|
rest_data_handler: Optional[RestDataHandlerBase] = None) -> None:
|
|
""" POST Producer (request queue filler)
|
|
|
|
Arguments:
|
|
batch -- Batch size (number of requests per queue element)
|
|
min_idx -- Minimum request index. None for netload
|
|
max_idx -- Aftremaximum request index. None for netload
|
|
count -- Optional count of requests to send. None means that
|
|
requests will be sent sequentially according to range.
|
|
If specified - request indices will be randomized
|
|
parallel -- Number of worker processes to use
|
|
dry -- Dry run
|
|
rest_data_handler -- Generator/interpreter of REST request/response data
|
|
req_queue -- Requests queue to fill
|
|
"""
|
|
try:
|
|
if netload:
|
|
assert count is not None
|
|
for min_count in range(0, count, batch):
|
|
req_queue.put(
|
|
GetWorkerReqInfo(num_gets=min(count - min_count, batch)))
|
|
elif count is not None:
|
|
assert (min_idx is not None) and (max_idx is not None) and \
|
|
(rest_data_handler is not None)
|
|
for min_count in range(0, count, batch):
|
|
req_indices = \
|
|
[random.randrange(min_idx, max_idx)
|
|
for _ in range(min(count - min_count, batch))]
|
|
req_queue.put(
|
|
PostWorkerReqInfo(
|
|
req_indices=req_indices,
|
|
req_data=rest_data_handler.make_req_data(req_indices)))
|
|
else:
|
|
assert (min_idx is not None) and (max_idx is not None) and \
|
|
(rest_data_handler is not None)
|
|
for min_req_idx in range(min_idx, max_idx, batch):
|
|
req_indices = list(range(min_req_idx,
|
|
min(min_req_idx + batch, max_idx)))
|
|
req_queue.put(
|
|
PostWorkerReqInfo(
|
|
req_indices=req_indices,
|
|
req_data=rest_data_handler.make_req_data(req_indices)))
|
|
except Exception as ex:
|
|
error(f"Producer terminated: {repr(ex)}")
|
|
finally:
|
|
for _ in range(parallel):
|
|
req_queue.put(None)
|
|
|
|
|
|
def run(url: str, parallel: int, backoff: int, retries: int, dry: bool,
|
|
status_period: int, batch: int,
|
|
rest_data_handler: Optional[RestDataHandlerBase] = None,
|
|
netload_target: Optional[str] = None,
|
|
expected_code: Optional[int] = None, min_idx: Optional[int] = None,
|
|
max_idx: Optional[int] = None, count: Optional[int] = None,
|
|
use_requests: bool = False, err_dir: Optional[str] = None,
|
|
ramp_up: Optional[float] = None, randomize: Optional[bool] = None,
|
|
population_db: Optional[str] = None) -> None:
|
|
""" Run the POST operation
|
|
|
|
Arguments:
|
|
rest_data_handler -- REST API payload data generator/interpreter. None for
|
|
netload
|
|
url -- REST API URL to send POSTs to (GET in case of netload)
|
|
parallel -- Number of worker processes to use
|
|
backoff -- Initial size of backoff windows in seconds
|
|
retries -- Number of retries
|
|
dry -- True to dry run
|
|
status_period -- Period (in terms of count) of status message prints (0
|
|
to not at all)
|
|
batch -- Batch size (number of requests per element of request
|
|
queue)
|
|
netload_target -- None for POST test, tested destination for netload
|
|
test
|
|
expected_code -- None or non-200 netload test HTTP status code
|
|
min_idx -- Minimum request index. None for netload test
|
|
    max_idx           -- One past maximum request index. None for netload test
|
|
count -- Optional count of requests to send. None means that
|
|
requests will be sent sequentially according to range.
|
|
If specified - request indices will be randomized
|
|
use_requests -- Use requests to send requests (default is to use
|
|
urllib.request)
|
|
err_dir -- None or directory for failed requests
|
|
ramp_up -- Ramp up parallel streams for this number of seconds
|
|
randomize -- True to random points, False for predefined points,
|
|
None if irrelevant. Only used in banner printing
|
|
population_db -- Population database file name or None. Only used for
|
|
banner printing
|
|
"""
|
|
    error_if(use_requests and ("requests" not in sys.modules),
             "'requests' Python3 module has to be installed to use "
             "'--no_reconnect' option")
|
|
total_requests = count if count is not None \
|
|
else (cast(int, max_idx) - cast(int, min_idx))
|
|
if netload_target:
|
|
logging.info(f"Netload test of {netload_target}")
|
|
logging.info(f"URL: {'N/A' if dry else url}")
|
|
logging.info(f"Streams: {parallel}")
|
|
logging.info(f"Backoff: {backoff} sec, {retries} retries")
|
|
if not netload_target:
|
|
logging.info(f"Index range: {min_idx:_} - {max_idx:_}")
|
|
logging.info(f"Batch size: {batch}")
|
|
logging.info(f"Requests to send: {total_requests:_}")
|
|
logging.info(f"Nonreconnect mode: {use_requests}")
|
|
if ramp_up is not None:
|
|
logging.info(f"Ramp up for: {ramp_up} seconds")
|
|
if status_period:
|
|
logging.info(f"Intermediate status is printed every "
|
|
f"{status_period} requests completed")
|
|
else:
|
|
logging.info("Intermediate status is not printed")
|
|
if err_dir:
|
|
logging.info(f"Directory for failed requests: {err_dir}")
|
|
if randomize is not None:
|
|
logging.info(
|
|
f"Point selection is {'random' if randomize else 'predefined'}")
|
|
if population_db:
|
|
logging.info(f"Point density chosen according to population database: "
|
|
f"{population_db}")
|
|
if dry:
|
|
logging.info("Dry mode")
|
|
|
|
req_queue: multiprocessing.Queue = multiprocessing.Queue()
|
|
result_queue: "multiprocessing.Queue[ResultQueueDataType]" = \
|
|
multiprocessing.Queue()
|
|
workers: List[multiprocessing.Process] = []
|
|
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
ticker: Optional[Ticker] = None
|
|
if err_dir:
|
|
try:
|
|
os.makedirs(err_dir, exist_ok=True)
|
|
except OSError as ex:
|
|
error(f"Failed to create directory '{err_dir}': {repr(ex)}")
|
|
try:
|
|
req_worker_kwargs: Dict[str, Any] = {
|
|
"url": url, "backoff": backoff, "retries": retries,
|
|
"dry": dry, "result_queue": result_queue,
|
|
"use_requests": use_requests}
|
|
req_worker: Callable
|
|
if netload_target:
|
|
req_worker = get_req_worker
|
|
req_worker_kwargs["get_req_queue"] = req_queue
|
|
req_worker_kwargs["expected_code"] = expected_code
|
|
else:
|
|
assert rest_data_handler is not None
|
|
req_worker = post_req_worker
|
|
req_worker_kwargs["post_req_queue"] = req_queue
|
|
req_worker_kwargs["dry_result_data"] = \
|
|
rest_data_handler.dry_result_data(batch)
|
|
req_worker_kwargs["return_requests"] = err_dir is not None
|
|
for idx in range(parallel):
|
|
kwargs = dict(req_worker_kwargs)
|
|
if ramp_up is not None:
|
|
kwargs["delay_sec"] = \
|
|
(idx * ramp_up / (parallel - 1)) if idx else 0
|
|
|
|
workers.append(
|
|
multiprocessing.Process(target=req_worker, kwargs=kwargs))
|
|
workers[-1].start()
|
|
workers.append(
|
|
multiprocessing.Process(
|
|
target=producer_worker,
|
|
kwargs={"netload": netload_target is not None,
|
|
"min_idx": min_idx, "max_idx": max_idx,
|
|
"count": count, "batch": batch,
|
|
"parallel": parallel,
|
|
"rest_data_handler": rest_data_handler,
|
|
"req_queue": req_queue}))
|
|
workers[-1].start()
|
|
ticker = Ticker(result_queue)
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
results_processor = \
|
|
ResultsProcessor(
|
|
netload=netload_target is not None,
|
|
total_requests=total_requests, result_queue=result_queue,
|
|
num_workers=parallel, status_period=status_period,
|
|
rest_data_handler=rest_data_handler, err_dir=err_dir)
|
|
results_processor.process()
|
|
for worker in workers:
|
|
worker.join()
|
|
finally:
|
|
for worker in workers:
|
|
if worker.is_alive():
|
|
worker.terminate()
|
|
if ticker:
|
|
ticker.stop()
|
|
|
|
|
|


def wait_rcache_flush(cfg: Config, args: Any,
                      service_discovery: Optional[ServiceDiscovery]) -> None:
    """ Wait for rcache preload updates to be flushed to the DB

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    service_discovery -- Optional ServiceDiscovery object
    """
    logging.info("Waiting for updates to flush to DB")
    start_time = datetime.datetime.now()
    start_queue_len: Optional[int] = None
    prev_queue_len: Optional[int] = None
    status_printer = StatusPrinter()
    rcache_status_url = \
        get_url(base_url=cfg.rest_api.rcache_status.url,
                param_host=args.rcache, service_discovery=service_discovery)
    while True:
        try:
            with urllib.request.urlopen(rcache_status_url, timeout=30) as f:
                rcache_status = json.loads(f.read())
        except (urllib.error.HTTPError, urllib.error.URLError) as ex:
            error(f"Error retrieving Rcache status using "
                  f"rest_api.rcache_status.url: {repr(ex)}")
        queue_len: int = rcache_status["update_queue_len"]
        if not queue_len:
            status_printer.pr()
            break
        if start_queue_len is None:
            start_queue_len = prev_queue_len = queue_len
        elif (args.status_period is not None) and \
                ((cast(int, prev_queue_len) - queue_len) >=
                 args.status_period):
            now = datetime.datetime.now()
            elapsed_sec = (now - start_time).total_seconds()
            written = start_queue_len - queue_len
            total_duration = \
                datetime.timedelta(
                    seconds=start_queue_len * elapsed_sec / written)
            eta_dt = start_time + total_duration
            status_printer.pr(
                f"{queue_len} records not yet written. "
                f"Write rate {written / elapsed_sec:.2f} rec/sec. "
                f"ETA {eta_dt.strftime('%X')} (in "
                f"{int((eta_dt - now).total_seconds()) // 60} minutes)")
            # Remember the queue length at which status was last printed, so
            # that the next status line only appears after another
            # 'status_period' records have been written
            prev_queue_len = queue_len
        time.sleep(1)
    status_printer.pr()
    now = datetime.datetime.now()
    elapsed_sec = (now - start_time).total_seconds()
    elapsed_str = re.sub(r"\.\d*$", "", str((now - start_time)))
    msg = f"Flushing took {elapsed_str}"
    if start_queue_len is not None:
        msg += f". Flush rate {start_queue_len / elapsed_sec: .2f} rec/sec"
    logging.info(msg)
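# Note for get_afc_config() below: the AFC Config is fetched with a plain GET
# of rest_api.get_config.url with the configured Ruleset ID appended to the
# URL. The returned dictionary is used both by the 'preload' subcommand (to
# build fake responses) and by the 'afc_config' subcommand (as the base for
# patching).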


def get_afc_config(cfg: Config, args: Any,
                   service_discovery: Optional[ServiceDiscovery]) \
        -> Dict[str, Any]:
    """ Retrieve AFC Config for configured Ruleset ID

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    service_discovery -- Optional ServiceDiscovery object
    Returns AFC Config as dictionary
    """
    get_config_url = \
        get_url(base_url=cfg.rest_api.get_config.url,
                param_host=getattr(args, "rat_server", None),
                service_discovery=service_discovery)
    try:
        get_config_url += cfg.region.rulesest_id
        with urllib.request.urlopen(get_config_url, timeout=30) as f:
            afc_config_str = f.read().decode('utf-8')
    except (urllib.error.HTTPError, urllib.error.URLError) as ex:
        error(f"Error retrieving AFC Config for Ruleset ID "
              f"'{cfg.region.rulesest_id}' using URL get_config.url: "
              f"{repr(ex)}")
    try:
        return json.loads(afc_config_str)
    except json.JSONDecodeError as ex:
        error(f"Error decoding AFC Config JSON: {repr(ex)}")
    return {}  # Unreachable code to appease pylint
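# Illustrative use of patch_json() below (the field path is taken from the
# --req help text, the value is made up for the example):
#   patch_json(patch_arg=["location.ellipse.majorAxis=500"],
#              json_dict=req_dict, data_type="AFC Request")
# returns a deep copy of req_dict in which
# req_dict["location"]["ellipse"]["majorAxis"] is set to 500. The value is
# parsed as a number because the existing value at that path is numeric.
# Several settings may be combined in a single argument, separated by ';'.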


def patch_json(patch_arg: Optional[List[str]], json_dict: Dict[str, Any],
               data_type: str, new_type: Optional[str] = None) \
        -> Dict[str, Any]:
    """ Modify JSON object with patches from command line

    Arguments:
    patch_arg -- Optional list of FIELD1=VALUE1[;FIELD2=VALUE2...] patches
    json_dict -- JSON dictionary to modify
    data_type -- What is being patched - used in error messages
    new_type -- None or type of new values for previously nonexistent keys
    Returns modified dictionary
    """
    if not patch_arg:
        return json_dict
    ret = copy.deepcopy(json_dict)
    for patches in patch_arg:
        for patch in patches.split(";"):
            error_if("=" not in patch,
                     f"Invalid syntax: {data_type} setting '{patch}' doesn't "
                     f"have '='")
            field, value = patch.split("=", 1)
            new_key = False
            super_obj: Any = None
            obj: Any = ret
            last_idx: Any = None
            idx: Any
            # Walk the dot-separated path, keeping track of the parent object
            # and the index/key of the current object within it
            for idx in field.split("."):
                super_obj = obj
                if re.match(r"^\d+$", idx):
                    int_idx = int(idx)
                    error_if(not isinstance(obj, list),
                             f"Integer index '{idx}' in {data_type} setting "
                             f"'{patch}' is applied to nonlist entity")
                    error_if(not (0 <= int_idx < len(obj)),
                             f"Integer index '{idx}' in {data_type} setting "
                             f"'{patch}' is outside of [0, {len(obj)}) valid "
                             f"range")
                    idx = int_idx
                else:
                    error_if(not isinstance(obj, dict),
                             f"Key '{idx}' in {data_type} setting '{patch}' "
                             f"can't be applied to nondictionary entity")
                    if idx not in obj:
                        error_if(not new_type,
                                 f"Key '{idx}' of setting '{patch}' not found "
                                 f"in {data_type}")
                        obj[idx] = {}
                        new_key = True
                obj = obj[idx]
                last_idx = idx
            error_if(
                (isinstance(obj, dict) and (not new_key)) or
                (isinstance(obj, list) and (not value.startswith("["))),
                f"'{field}' of {data_type} setting '{patch}' does not address "
                f"scalar value")
            # Convert the string value according to the type of the value
            # being replaced (or to 'new_type' for newly created keys).
            # bool is checked before int because bool is a subclass of int
            try:
                if isinstance(obj, bool) or \
                        (new_key and (new_type == "bool")):
                    if value.lower() in ("1", "y", "t", "yes", "true", "+"):
                        super_obj[last_idx] = True
                    elif value.lower() in ("0", "n", "f", "no", "false", "-"):
                        super_obj[last_idx] = False
                    else:
                        raise TypeError(f"'{value}' is not a valid boolean "
                                        "representation")
                elif isinstance(obj, int) or \
                        (new_key and (new_type == "int")):
                    try:
                        super_obj[last_idx] = int(value)
                    except ValueError:
                        if new_key and (new_type == "int"):
                            raise
                        super_obj[last_idx] = float(value)
                elif isinstance(obj, float) or \
                        (new_key and (new_type == "float")):
                    super_obj[last_idx] = float(value)
                elif isinstance(obj, list) or \
                        (new_key and (new_type == "list")):
                    super_obj[last_idx] = json.loads(value)
                else:
                    super_obj[last_idx] = value
            except (TypeError, ValueError, json.JSONDecodeError) as ex:
                error(f"'{value}' of {data_type} setting '{patch}' has "
                      f"invalid type/formatting: {repr(ex)}")
    return ret


def patch_req(cfg: Config, args_req: Optional[List[str]]) -> Dict[str, Any]:
    """ Get AFC Request pattern, patched according to --req switch

    Arguments:
    cfg -- Config object
    args_req -- Optional --req switch value
    Returns patched AFC Request pattern
    """
    req_msg_dict = json.loads(cfg.req_msg_pattern)
    req_dict = path_get(obj=req_msg_dict, path=cfg.paths.reqs_in_msg)[0]
    req_dict = patch_json(patch_arg=args_req, json_dict=req_dict,
                          data_type="AFC Request")
    path_set(obj=req_msg_dict, path=cfg.paths.reqs_in_msg, value=[req_dict])
    return req_msg_dict
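# Note for do_preload() below - a sketch of the overall flow, as inferred from
# the code that follows: optionally protect the rcache from invalidation
# (rest_api.protect_rcache.url), retrieve the AFC Config for the configured
# Ruleset ID, generate fake responses and POST them to
# rest_api.update_rcache.url via run(), then wait until the rcache service
# reports an empty update queue (wait_rcache_flush()).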


def do_preload(cfg: Config, args: Any) -> None:
    """ Execute "preload" command.

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    """
    if args.dry:
        afc_config = {}
        worker_url = ""
    else:
        service_discovery = None if args.comp_proj is None \
            else ServiceDiscovery(compose_project=args.comp_proj)
        if args.protect_cache:
            try:
                with urllib.request.urlopen(
                        urllib.request.Request(
                            get_url(base_url=cfg.rest_api.protect_rcache.url,
                                    param_host=args.rcache,
                                    service_discovery=service_discovery),
                            method="POST"),
                        timeout=30):
                    pass
            except (urllib.error.HTTPError, urllib.error.URLError) as ex:
                error(f"Error attempting to protect Rcache from invalidation "
                      f"using protect_rcache.url: {repr(ex)}")

        worker_url = \
            get_url(base_url=cfg.rest_api.update_rcache.url,
                    param_host=args.rcache,
                    service_discovery=service_discovery)

        afc_config = get_afc_config(cfg=cfg, args=args,
                                    service_discovery=service_discovery)

    min_idx, max_idx = get_idx_range(args.idx_range)

    run(rest_data_handler=PreloadRestDataHandler(
            cfg=cfg, afc_config=afc_config,
            req_msg_pattern=patch_req(cfg=cfg, args_req=args.req)),
        url=worker_url, parallel=args.parallel, backoff=args.backoff,
        retries=args.retries, dry=args.dry, batch=args.batch, min_idx=min_idx,
        max_idx=max_idx, status_period=args.status_period,
        use_requests=args.no_reconnect)

    if not args.dry:
        wait_rcache_flush(cfg=cfg, args=args,
                          service_discovery=service_discovery)
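# Note for get_afc_worker_url() below: with --localhost the externally
# published AFC port is looked up in 'docker ps' output. The regular
# expression expects a line shaped roughly like this (project name and host
# port are made-up placeholders):
#   0.0.0.0:8080->80/tcp ... myproj_dispatcher_1
# i.e. a 0.0.0.0:<host_port>-><80|443>/tcp mapping on the project's
# dispatcher container.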


def get_afc_worker_url(args: Any, base_url: str,
                       service_discovery: Optional[ServiceDiscovery]) -> str:
    """ REST API URL to AFC server

    Arguments:
    args -- Parsed command line arguments
    base_url -- Base URL from config
    service_discovery -- Optional ServiceDiscovery object
    Returns REST API URL
    """
    if args.localhost:
        error_if(not args.comp_proj,
                 "--comp_proj parameter must be specified")
        m = re.search(r"0\.0\.0\.0:(\d+)->%d/tcp.*%s_dispatcher_\d+" %
                      (80 if args.localhost == "http" else 443,
                       re.escape(args.comp_proj)),
                      run_docker(["ps"]))
        error_if(not m,
                 "AFC port not found. Please specify AFC server address "
                 "explicitly and remove --localhost switch")
        assert m is not None
        ret = get_url(base_url=base_url,
                      param_host=f"localhost:{m.group(1)}",
                      service_discovery=service_discovery)
    else:
        ret = get_url(base_url=base_url, param_host=args.afc,
                      service_discovery=service_discovery)
    return ret
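# Example invocation of the 'load' subcommand handled by do_load() below
# (illustrative only - the script name and project name are placeholders, the
# switches are the ones defined in main()):
#   <this_script>.py load --comp_proj myproj --count 10000 --parallel 50 \
#       --no_cache --err_dir failed_reqs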


def do_load(cfg: Config, args: Any) -> None:
    """ Execute "load" command

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    """
    if args.dry:
        worker_url = ""
    else:
        service_discovery = None if args.comp_proj is None \
            else ServiceDiscovery(compose_project=args.comp_proj)
        worker_url = \
            get_afc_worker_url(args=args, base_url=cfg.rest_api.afc_req.url,
                               service_discovery=service_discovery)
        if args.no_cache:
            # Append 'nocache=True' to the query string, preserving whatever
            # query parameters the configured URL already has
            parsed_worker_url = urllib.parse.urlparse(worker_url)
            worker_url = \
                parsed_worker_url._replace(
                    query="&".join(
                        p for p in [parsed_worker_url.query, "nocache=True"]
                        if p)).geturl()

    min_idx, max_idx = get_idx_range(args.idx_range)
    run(rest_data_handler=LoadRestDataHandler(
            cfg=cfg, randomize=args.random, population_db=args.population,
            req_msg_pattern=patch_req(cfg=cfg, args_req=args.req)),
        url=worker_url, parallel=args.parallel, backoff=args.backoff,
        retries=args.retries, dry=args.dry, batch=args.batch, min_idx=min_idx,
        max_idx=max_idx, status_period=args.status_period, count=args.count,
        use_requests=args.no_reconnect, err_dir=args.err_dir,
        ramp_up=args.ramp_up, randomize=args.random,
        population_db=args.population)
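# Note for do_netload() below: when --target is omitted, the target is guessed
# from the addressing switches that were given (this mirrors the --target help
# text in main()): --localhost alone selects 'dispatcher', --afc alone selects
# 'msghnd', --rcache alone selects 'rcache'; any ambiguous combination
# requires an explicit --target.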


def do_netload(cfg: Config, args: Any) -> None:
    """ Execute "netload" command.

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    """
    expected_code: Optional[int] = None
    worker_url = ""
    if not args.dry:
        service_discovery = None if args.comp_proj is None \
            else ServiceDiscovery(compose_project=args.comp_proj)
        if args.target is None:
            # Guess the target from the addressing switches that were given
            if args.localhost and (not args.rcache):
                args.target = "dispatcher"
            elif args.afc and (not (args.localhost or args.rcache)):
                args.target = "msghnd"
            elif args.rcache and (not (args.localhost or args.afc)):
                args.target = "rcache"
            else:
                error("'--target' argument must be explicitly specified")
        if args.target == "rcache":
            worker_url = \
                get_url(base_url=cfg.rest_api.rcache_get.url,
                        param_host=args.rcache,
                        service_discovery=service_discovery)
            expected_code = cfg.rest_api.rcache_get.get("code")
        elif args.target == "msghnd":
            worker_url = \
                get_url(base_url=cfg.rest_api.msghnd_get.url,
                        param_host=args.afc,
                        service_discovery=service_discovery)
            expected_code = cfg.rest_api.msghnd_get.get("code")
        else:
            assert args.target == "dispatcher"
            worker_url = \
                get_afc_worker_url(
                    args=args, base_url=cfg.rest_api.dispatcher_get.url,
                    service_discovery=service_discovery)
            expected_code = cfg.rest_api.dispatcher_get.get("code")
    run(netload_target=args.target, url=worker_url,
        parallel=args.parallel, backoff=args.backoff, retries=args.retries,
        dry=args.dry, batch=1000, expected_code=expected_code,
        status_period=args.status_period, count=args.count,
        use_requests=args.no_reconnect)
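# Example invocations of the 'cache' subcommand handled by do_cache() below
# (illustrative; the script name, project name and rcache address are
# placeholders):
#   <this_script>.py cache --comp_proj myproj --unprotect --invalidate
#   <this_script>.py cache --rcache http://localhost:8000 --protect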


def do_cache(cfg: Config, args: Any) -> None:
    """ Execute "cache" command.

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    """
    error_if(args.protect and args.unprotect,
             "--protect and --unprotect are mutually exclusive")
    error_if(not (args.protect or args.unprotect or args.invalidate),
             "Nothing to do")
    service_discovery = None if args.comp_proj is None \
        else ServiceDiscovery(compose_project=args.comp_proj)
    json_data: Any
    # Requested operations are performed in this order: protect, invalidate,
    # unprotect
    for arg, attr, json_data in \
            [("protect", "protect_rcache", None),
             ("invalidate", "invalidate_rcache", {}),
             ("unprotect", "unprotect_rcache", None)]:
        try:
            if not getattr(args, arg):
                continue
            data: Optional[bytes] = None
            url = get_url(base_url=getattr(cfg.rest_api, attr).url,
                          param_host=args.rcache,
                          service_discovery=service_discovery)
            req = urllib.request.Request(url, method="POST")
            if json_data is not None:
                data = json.dumps(json_data).encode(encoding="ascii")
                req.add_header("Content-Type", "application/json")
            urllib.request.urlopen(req, data, timeout=30)
        except (urllib.error.HTTPError, urllib.error.URLError) as ex:
            error(f"Error attempting to perform cache {arg} using "
                  f"rest_api.{attr}.url: {repr(ex)}")
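# Example invocation of the 'afc_config' subcommand handled by do_afc_config()
# below (illustrative; the field path comes from the FIELD_VALUE help text,
# the value and the project name are made up):
#   <this_script>.py afc_config --comp_proj myproj \
#       freqBands.0.startFreqMHz=5925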


def do_afc_config(cfg: Config, args: Any) -> None:
    """ Execute "afc_config" command.

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    """
    service_discovery = ServiceDiscovery(compose_project=args.comp_proj)
    afc_config = get_afc_config(cfg=cfg, args=args,
                                service_discovery=service_discovery)
    afc_config_str = \
        json.dumps(
            patch_json(patch_arg=args.FIELD_VALUE, json_dict=afc_config,
                       new_type=args.new, data_type="AFC Config"))
    result = \
        ratdb(
            cfg=cfg,
            command=cfg.ratdb.update_config_by_id.format(
                afc_config=afc_config_str, region_str=afc_config["regionStr"]),
            service_discovery=service_discovery)
    error_if(not (isinstance(result, int) and result > 0),
             "AFC Config update failed")
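# Example invocation of the 'json_config' subcommand handled by
# do_json_config() below (illustrative; the file name is a placeholder):
#   <this_script>.py json_config my_config.json
# With no argument the JSON file is written next to the default YAML config,
# with the same base name and the Config.JSON_EXT extension.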


def do_json_config(cfg: Config, args: Any) -> None:
    """ Execute "json_config" command.

    Arguments:
    cfg -- Config object
    args -- Parsed command line arguments
    """
    s = json.dumps(cfg.data(), indent=2)
    filename = args.JSON_CONFIG if args.JSON_CONFIG else \
        (os.path.splitext(Config.DEFAULT_CONFIG)[0] + Config.JSON_EXT)
    with open(filename, "w", encoding="utf-8") as f:
        f.write(s)


def do_help(cfg: Config, args: Any) -> None:
    """ Execute "help" command.

    Arguments:
    cfg -- Config object (not used)
    args -- Parsed command line arguments (also contains 'argument_parser' and
            'subparsers' fields)
    """
    unused_argument(cfg)
    if args.subcommand is None:
        args.argument_parser.print_help()
    else:
        args.subparsers.choices[args.subcommand].print_help()


def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments
    """

    cfg = Config(argv=argv, arg_name="config")

    # Switch groups shared by several subcommands
    switches_common = argparse.ArgumentParser(add_help=False)
    switches_common.add_argument(
        "--parallel", metavar="N", type=int, default=cfg.defaults.parallel,
        help=f"Number of requests to execute in parallel. Default is "
        f"{cfg.defaults.parallel}")
    switches_common.add_argument(
        "--backoff", metavar="SECONDS", type=float,
        default=cfg.defaults.backoff,
        help=f"Initial backoff window (in seconds) to use on request failure. "
        f"It is doubled on each retry. Default is {cfg.defaults.backoff} "
        f"seconds")
    switches_common.add_argument(
        "--retries", metavar="N", type=int, default=cfg.defaults.retries,
        help=f"Maximum number of retries before giving up. Default is "
        f"{cfg.defaults.retries}")
    switches_common.add_argument(
        "--dry", action="store_true",
        help="Dry run (no communication with server) to estimate overhead")
    switches_common.add_argument(
        "--status_period", metavar="N", type=int,
        default=cfg.defaults.status_period,
        help=f"How often to print status information. Default is once in "
        f"{cfg.defaults.status_period} requests. 0 means no status print")
    switches_common.add_argument(
        "--no_reconnect", action="store_true",
        help="Do not reconnect on each request (requires the 'requests' "
        "Python3 library to be installed: 'pip install requests')")

    switches_req = argparse.ArgumentParser(add_help=False)
    switches_req.add_argument(
        "--idx_range", metavar="[FROM-]TO", default=cfg.defaults.idx_range,
        help=f"Range of AP indices. FROM is the initial index (0 if "
        f"omitted), TO is the 'after last' (exclusive) index. Default is "
        f"'{cfg.defaults.idx_range}'")
    switches_req.add_argument(
        "--batch", metavar="N", type=int, default=cfg.defaults.batch,
        help=f"Number of requests in one REST API call. Default is "
        f"{cfg.defaults.batch}")
    switches_req.add_argument(
        "--req", metavar="FIELD1=VALUE1[;FIELD2=VALUE2...]", action="append",
        default=[],
        help="Change field(s) in request body (compared to req_msg_pattern "
        "in config file). FIELD is a dot-separated path to a field inside "
        "the request (e.g. 'location.ellipse.majorAxis'), VALUE is the field "
        "value; if the field value is a list it should be surrounded by [] "
        "and formatted as in JSON. Several semicolon-separated settings may "
        "be specified, also this parameter may be specified several times")

    switches_count = argparse.ArgumentParser(add_help=False)
    switches_count.add_argument(
        "--count", metavar="NUM_REQS", type=int, default=cfg.defaults.count,
        help=f"Number of requests to send. Default is {cfg.defaults.count}")

    switches_afc = argparse.ArgumentParser(add_help=False)
    switches_afc.add_argument(
        "--afc", metavar="[PROTOCOL://]HOST[:port][path][params]",
        help="AFC Server to send requests to. Unspecified parts are taken "
        "from 'rest_api.afc_req' of config file. By default determined by "
        "means of '--comp_proj'")
    switches_afc.add_argument(
        "--localhost", nargs="?", choices=["http", "https"], const="http",
        help="If --afc is not specified, the default is to send requests to "
        "the msghnd container (bypassing the Nginx container). This flag "
        "causes requests to be sent to the external http/https AFC port on "
        "localhost. If the protocol is not specified, http is assumed")

    switches_rat = argparse.ArgumentParser(add_help=False)
    switches_rat.add_argument(
        "--rat_server", metavar="[PROTOCOL://]HOST[:port][path][params]",
        help="Server to request config from. Unspecified parts are taken "
        "from 'rest_api.get_config' of config file. By default determined by "
        "means of '--comp_proj'")

    switches_compose = argparse.ArgumentParser(add_help=False)
    switches_compose.add_argument(
        "--comp_proj", metavar="PROJ_NAME",
        help="Docker compose project name. Used to determine hosts to send "
        "API calls to. If not specified, hostnames should be specified "
        "explicitly")

    switches_rcache = argparse.ArgumentParser(add_help=False)
    switches_rcache.add_argument(
        "--rcache", metavar="[PROTOCOL://]HOST[:port]",
        help="Rcache server. May also be determined by means of '--comp_proj'")

    # Top level parser
    argument_parser = argparse.ArgumentParser(
        description="AFC Load Test Tool")
    argument_parser.add_argument(
        "--config", metavar="[+]CONFIG_FILE", action="append", default=[],
        help=f"Config file. Default has the same name and directory as this "
        f"script, but with the "
        f"'{Config.YAML_EXT if has_yaml else Config.JSON_EXT}' extension "
        f"(i.e. {Config.DEFAULT_CONFIG}). May be specified several times (in "
        f"which case values are merged together). If prefixed with '+' it "
        "is joined to the default config. Note that this script is "
        f"accompanied by a default YAML config. On YAML-less Python it "
        f"should be converted to JSON (with the 'json_config' subcommand) "
        f"on a system that has YAML, then copied to the YAML-less one")

    subparsers = argument_parser.add_subparsers(dest="subcommand",
                                                metavar="SUBCOMMAND")

    parser_preload = subparsers.add_parser(
        "preload",
        parents=[switches_common, switches_rat, switches_req, switches_compose,
                 switches_rcache],
        help="Fill rcache with (fake) responses")
    parser_preload.add_argument(
        "--protect_cache", action="store_true",
        help="Protect Rcache from invalidation (e.g. by ULS downloader). "
        "Protection persists in rcache database, see the 'cache' subcommand "
        "on how to unprotect")
    parser_preload.set_defaults(func=do_preload)

    parser_load = subparsers.add_parser(
        "load",
        parents=[switches_common, switches_req, switches_count,
                 switches_compose, switches_afc],
        help="Do load test")
    parser_load.add_argument(
        "--no_cache", action="store_true",
        help="Don't use rcache, force each request to be computed")
    parser_load.add_argument(
        "--population", metavar="POPULATION_DB_FILE",
        help="Select AP positions proportionally to population density from "
        "given database (prepared with "
        "tools/geo_converters/make_population_db.py). Positions are random, "
        "so no rcache will help")
    parser_load.add_argument(
        "--err_dir", metavar="DIRECTORY",
        help="Directory for offending JSON AFC Requests")
    parser_load.add_argument(
        "--random", action="store_true",
        help="Choose AP positions randomly (makes sense only in noncached "
        "mode)")
    parser_load.add_argument(
        "--ramp_up", metavar="SECONDS", type=float, default=0,
        help="Ramp up streams for this number of seconds. Default is to start "
        "all at once")
    parser_load.set_defaults(func=do_load)

    parser_network = subparsers.add_parser(
        "netload",
        parents=[switches_common, switches_count, switches_compose,
                 switches_afc, switches_rcache],
        help="Network load test by repeatedly querying health endpoints")
    parser_network.add_argument(
        "--target", choices=["dispatcher", "msghnd", "rcache"],
        help="What to test. If omitted then a guess is attempted: "
        "'dispatcher' if --localhost is specified, 'msghnd' if --afc without "
        "--localhost is specified, 'rcache' if --rcache is specified")
    parser_network.set_defaults(func=do_netload)

    parser_cache = subparsers.add_parser(
        "cache", parents=[switches_compose, switches_rcache],
        help="Do something with response cache")
    parser_cache.add_argument(
        "--protect", action="store_true",
        help="Protect rcache from invalidation (e.g. by background ULS "
        "downloader). This action persists in the rcache database, it needs "
        "to be explicitly undone with --unprotect")
    parser_cache.add_argument(
        "--unprotect", action="store_true",
        help="Allow rcache invalidation")
    parser_cache.add_argument(
        "--invalidate", action="store_true",
        help="Invalidate cache (invalidation must be enabled)")
    parser_cache.set_defaults(func=do_cache)

    parser_afc_config = subparsers.add_parser(
        "afc_config", parents=[switches_rat], help="Modify AFC Config")
    parser_afc_config.add_argument(
        "--comp_proj", metavar="PROJ_NAME", required=True,
        help="Docker compose project name. This parameter is mandatory")
    parser_afc_config.add_argument(
        "--new", metavar="VALUE_TYPE",
        choices=["str", "int", "float", "bool", "list"],
        help="Allow creation of new AFC Config keys (requires respective "
        "changes in AFC Engine). Created keys will be of given type")
    parser_afc_config.add_argument(
        "FIELD_VALUE", nargs="+",
        help="One or more FIELD=VALUE clauses, where FIELD is a field name "
        "in AFC Config; a deep field may be specified in dot-separated form "
        "(e.g. 'freqBands.0.startFreqMHz'). VALUE is the new field value; if "
        "the field value is a list it should be surrounded by [] and "
        "formatted as in JSON")
    parser_afc_config.set_defaults(func=do_afc_config)

    parser_json_config = subparsers.add_parser(
        "json_config",
        help="Convert config file from YAML to JSON for use on YAML-less "
        "systems")
    parser_json_config.add_argument(
        "JSON_CONFIG", nargs="?",
        help=f"JSON file to create. By default - same as source YAML file, "
        f"but with {Config.JSON_EXT} extension")
    parser_json_config.set_defaults(func=do_json_config)

    parser_help = subparsers.add_parser(
        "help", add_help=False,
        help="Prints help on given subcommand")
    parser_help.add_argument(
        "subcommand", metavar="SUBCOMMAND", nargs="?",
        choices=subparsers.choices,
        help="Name of subcommand to print help about (use " +
        "\"%(prog)s --help\" to get list of all subcommands)")
    parser_help.set_defaults(func=do_help, subparsers=subparsers,
                             argument_parser=argument_parser,
                             supports_unknown_args=True)

    if not argv:
        argument_parser.print_help()
        sys.exit(1)
    # First parse leniently (the 'help' subcommand tolerates unknown
    # arguments), then strictly if the chosen subcommand requires it
    args = argument_parser.parse_known_args(argv)[0]
    if not getattr(args, "supports_unknown_args", False):
        args = argument_parser.parse_args(argv)

    # Set up logging
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter(
            f"{os.path.basename(__file__)}. %(levelname)s: %(message)s"))
    logging.getLogger().addHandler(console_handler)
    logging.getLogger().setLevel(logging.INFO)

    # Execute the selected subcommand
    try:
        args.func(cfg, args)
    except KeyboardInterrupt:
        print("^C")
        sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])