#!/usr/bin/env python3 """ AFC Load Test Tool """ # # Copyright (C) 2023 Broadcom. All rights reserved. The term "Broadcom" # refers solely to the Broadcom Inc. corporate affiliate that owns # the software below. This work is licensed under the OpenAFC Project License, # a copy of which is included with this software program # # pylint: disable=too-many-lines, too-many-arguments, invalid-name # pylint: disable=consider-using-f-string, wrong-import-order, too-many-locals # pylint: disable=too-few-public-methods, logging-fstring-interpolation # pylint: disable=too-many-instance-attributes, broad-exception-caught # pylint: disable=too-many-branches, too-many-nested-blocks # pylint: disable=too-many-statements, eval-used import argparse from collections.abc import Iterable, Iterator import copy import csv import datetime import hashlib import http import inspect import json import logging import multiprocessing import os import random try: import requests except ImportError: pass import re import signal import sqlite3 import subprocess import sys import time import traceback from typing import Any, Callable, cast, List, Dict, NamedTuple, Optional, \ Tuple, Union import urllib.error import urllib.parse import urllib.request has_yaml = True try: import yaml except ImportError: has_yaml = False def dp(*args, **kwargs) -> None: # pylint: disable=invalid-name """Print debug message Arguments: args -- Format and positional arguments. If latter present - formatted with % kwargs -- Keyword arguments. If present formatted with format() """ msg = args[0] if args else "" if len(args) > 1: msg = msg % args[1:] if args and kwargs: msg = msg.format(**kwargs) cur_frame = inspect.currentframe() assert (cur_frame is not None) and (cur_frame.f_back is not None) frameinfo = inspect.getframeinfo(cur_frame.f_back) print(f"DP {frameinfo.function}()@{frameinfo.lineno}: {msg}") def error(msg: str) -> None: """ Prints given msg as error message and exit abnormally """ logging.error(msg) sys.exit(1) def error_if(cond: Any, msg: str) -> None: """ If condition evaluates to true prints given msg as error message and exits abnormally """ if cond: error(msg) def exc_if(predicate: Any, exc: Exception) -> None: """ Raise given exception if predicate evaluates to true """ if predicate: raise exc def expandpath(path: Optional[str]) -> Optional[str]: """ Expand ~ and {} in given path. 
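    (Illustrative only: with HOME=/home/user and FOO=bar set in the
    environment, expandpath("~/cfg/${FOO}.yaml") returns
    "/home/user/cfg/bar.yaml".)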
For None/empty returns None/empty """ return os.path.expandvars(os.path.expanduser(path)) if path else path def unused_argument(arg: Any) -> None: # pylint: disable=unused-argument """ Sink for all unused arguments """ return def yaml_load(yaml_s: str) -> Any: """ YAML/JSON dictionary for given YAML/JSON content """ kwargs: Dict[str, Any] = {} if has_yaml: if hasattr(yaml, "CLoader"): kwargs["Loader"] = yaml.CLoader elif hasattr(yaml, "FullLoader"): kwargs["Loader"] = yaml.FullLoader return yaml.load(yaml_s, **kwargs) return json.loads(yaml_s) class Config(Iterable): """ Node of config structure that provides dot-based access Private attributes: _data -- Data that corresponds to node (list or string-indexed dictionary) _path -- Path to node ("foo.bar.42.baz") for use in error messages """ JSON_EXT = ".json" YAML_EXT = ".yaml" # Default config file name DEFAULT_CONFIG = \ os.path.splitext(__file__)[0] + (YAML_EXT if has_yaml else JSON_EXT) def __init__( self, argv: Optional[List[str]] = None, arg_name: Optional[str] = None, config_env: Optional[str] = None, data: Optional[Union[List[Any], Dict[str, Any]]] = None, path: Optional[str] = None) -> None: """ Constructor (for both root and nonroot nodes) Arguments: argv -- Argument for root node: Command line arguments arg_name -- Argument for root node - name of command line parameter with config env_config -- Optional argument for root node - name of environment variable for main config data -- Argument for nonroot node: data that corresponds to node (list or string-indexed dictionary) path -- Argument for nonroot node: path to node ("foo.bar[42].baz") for use in error messages """ self._data: Union[List[Any], Dict[str, Any]] self._path: str # Nonroot node if data is not None: assert path is not None assert (argv or arg_name or config_env) is None self._path = path self._data = data return # Root node assert path is None assert argv is not None assert arg_name is not None self._path = "" argument_parser = argparse.ArgumentParser() argument_parser.add_argument( "--" + arg_name, action="append", default=[]) configs = getattr(argument_parser.parse_known_args(argv)[0], arg_name) # Finding root config if (not configs) or any(c.startswith("+") for c in configs): for config_file in \ ([expandpath(os.environ.get(config_env))] if config_env else []) + [self.DEFAULT_CONFIG]: assert config_file is not None if os.path.isfile(config_file): configs = [config_file] + configs break else: error("This script's config file not found") self._data = {} # Merging all configs for config in configs: if config.startswith("+"): config = config[1:] error_if(not os.path.isfile(config), f"Config file '{config}' not found") with open(config, encoding="utf-8") as f: yaml_s = f.read() try: config_yaml_dict = yaml_load(yaml_s) except (yaml.YAMLError if has_yaml else json.JSONDecodeError) \ as ex: error(f"Error reading '{config}': {repr(ex)}") error_if(not isinstance(config_yaml_dict, dict), f"Content of config file '{config}' is not a dictionary") assert isinstance(config_yaml_dict, dict) self._data = {**self._data, **config_yaml_dict} def __getattr__(self, attr: str) -> Any: """ Returns given attribute of node """ exc_if(not (isinstance(self._data, dict) and (attr in self._data)), AttributeError(f"Config item '{self._path}' does not have " f"attribute '{attr}'")) assert isinstance(self._data, dict) return self._subitem(value=self._data[attr], attr=attr) def __getitem__(self, key: Union[int, str]) -> Any: """ Access by index - integer or string """ if isinstance(self._data, 
list): exc_if(not isinstance(key, (int, slice)), IndexError(f"Config item '{self._path}' is a list, it " f"can't be subscribed with '{key}'")) assert isinstance(key, (int, slice)) exc_if(isinstance(key, int) and (key >= len(self._data)), IndexError(f"Index {key} is out of range for config item " f"'{self._path}'")) else: assert isinstance(self._data, dict) exc_if(not isinstance(key, str), KeyError(f"Config item '{self._path}' is a dictionary, it " f"can't be subscribed with '{key}'")) assert isinstance(key, str) exc_if(key not in self._data, KeyError(f"Config item '{self._path}' does not have " f"attribute '{key}'")) return self._subitem(value=self._data[key], attr=key) def get(self, attr: str, default: Any = None) -> Any: """ Keys of dictionary-type node """ exc_if(not isinstance(self._data, dict), AttributeError(f"Config item '{self._path}' is not a " f"dictionary, can't be queried for '{attr}'")) assert isinstance(self._data, dict) exc_if(not isinstance(attr, str), AttributeError(f"Non-string attribute '{attr}' can't be looked " f"up in config item '{self._path}'")) return self._subitem(value=self._data.get(attr, default), attr=attr) def __len__(self) -> int: """ Number of subitems in node """ return len(self._data) def keys(self) -> Iterable: """ Keys of dictionary-type node """ exc_if(not isinstance(self._data, dict), AttributeError(f"Config item '{self._path}' is not a " f"dictionary, it doesn't have keys")) assert isinstance(self._data, dict) return self._data.keys() def values(self) -> Iterable: """ Values of dictionary-type node """ exc_if(not isinstance(self._data, dict), AttributeError(f"Config item '{self._path}' is not a " f"dictionary, it doesn't have values")) assert isinstance(self._data, dict) for key, value in self._data.items(): yield self._subitem(value=value, attr=key) def items(self) -> Iterable: """ Items of dictionary-type node """ exc_if(not isinstance(self._data, dict), AttributeError(f"Config item '{self._path}' is not a " f"dictionary, it doesn't have items")) assert isinstance(self._data, dict) for key, value in self._data.items(): yield (key, self._subitem(value=value, attr=key)) def data(self) -> Union[List[Any], Dict[str, Any]]: """ Returns underlying data structure """ return self._data def __iter__(self) -> Iterator: """ Iterator over node subitems """ if isinstance(self._data, list): for idx, value in enumerate(self._data): yield self._subitem(value=value, attr=idx) else: assert isinstance(self._data, dict) for key in self._data.keys(): yield key def __in__(self, attr: Any) -> bool: """ True if dictionary node contains given attribute """ exc_if(not isinstance(self._data, dict), AttributeError(f"Config item '{self._path}' is not a " f"dictionary, 'in' check can't be performed")) exc_if(not isinstance(attr, str), AttributeError(f"Non-string attribute '{attr}' can't be looked " f"up in config item '{self._path}'")) return attr in self._data def _subitem(self, value: Any, attr: Union[int, str]) -> Any: """ Returns given subitem as attribute of given name """ if not isinstance(value, (list, dict)): return value return \ self.__class__( data=value, path=f"{self._path}{'.' if self._path else ''}{attr}" if isinstance(attr, str) else f"{self._path}[{attr}]") def run_docker(args: List[str]) -> str: """ Runs docker with given parameters, returns stdout content """ try: return \ subprocess.check_output(["docker"] + args, universal_newlines=True, encoding="utf-8") except subprocess.CalledProcessError as ex: error(f"Failed to run 'docker {' '.join(args)}': {repr(ex)}. 
Please " "specify all hosts explicitly") return "" # Unreachable code to make pylint happy class ServiceDiscovery: """ Container and IP discovery for services Private attributes: _compose_project -- Compose project name _containers -- Dictionary of _ContainerInfo object, indexed by service name. None before first access """ class _ContainerInfo: """ Information about single service Public attributes: name -- Container name ip -- Container IP (if known) or None """ def __init__(self, name: str) -> None: self.name = name self.ip: Optional[str] = None def __init__(self, compose_project: str) -> None: """ Constructor Arguments: compose_project -- Docker compose project name """ self._compose_project = compose_project self._containers: \ Optional[Dict[str, "ServiceDiscovery._ContainerInfo"]] = None def get_container(self, service: str) -> str: """ Returns container name for given service name """ ci = self._get_cont_info(service) return ci.name def get_ip(self, service: str) -> str: """ Returns container IP for given service name """ ci = self._get_cont_info(service) if ci.ip: return ci.ip try: inspect_dict = json.loads(run_docker(["inspect", ci.name])) except json.JSONDecodeError as ex: error(f"Error parsing 'docker inspect {ci.name}' output: " f"{repr(ex)}") try: for net_name, net_info in \ inspect_dict[0]["NetworkSettings"]["Networks"].items(): if net_name.endswith("_default"): ci.ip = net_info["IPAddress"] break else: error(f"Default network not found in container '{ci.name}'") except (AttributeError, LookupError) as ex: error(f"Unsupported structure of 'docker inspect {ci.name}' " f"output: {repr(ex)}") assert ci.ip is not None return ci.ip def _get_cont_info(self, service: str) \ -> "ServiceDiscovery._ContainerInfo": """ Returns _ContainerInfo object for given service name """ if self._containers is None: self._containers = {} name_offset: Optional[int] = None for line in run_docker(["ps"]).splitlines(): if name_offset is None: name_offset = line.find("NAMES") error_if(name_offset < 0, "Unsupported structure of 'docker ps' output") continue assert name_offset is not None for names in re.split(r",\s*", line[name_offset:]): m = re.search(r"(%s_(.+)_\d+)" % re.escape(self._compose_project), names) if m: self._containers[m.group(2)] = \ self._ContainerInfo(name=m.group(1)) error_if(not self._containers, f"No running containers found for compose project " f"'{self._compose_project}'") ret = self._containers.get(service) error_if(ret is None, f"Service name '{service}' not found among containers of " f"compose project '{self._compose_project}'") assert ret is not None return ret def get_url(base_url: str, param_host: Optional[str], service_discovery: Optional[ServiceDiscovery]) -> str: """ Construct URL from base URL in config and command line host or compose project name Arguments: base_url -- Base URL from config param_host -- Optional host name from command line parameter service_discovery -- Optional ServiceDiscovery object. 
None means that service discovery is not possible (--comp_proj was not specified) and hostname must be provided explicitly Returns actionable URL with host either specified or retrieved from compose container inspection and tail from base URL """ base_parts = urllib.parse.urlparse(base_url) if param_host: if "://" not in param_host: param_host = f"{base_parts.scheme}://{param_host}" param_parts = urllib.parse.urlparse(param_host) replacements = {field: getattr(param_parts, field) for field in base_parts._fields if getattr(param_parts, field)} return urllib.parse.urlunparse(base_parts._replace(**replacements)) if service_discovery is None: return base_url service = base_parts.netloc.split(":")[0] return \ urllib.parse.urlunparse( base_parts._replace( netloc=service_discovery.get_ip(service) + base_parts.netloc[len(service):])) def ratdb(cfg: Config, command: str, service_discovery: ServiceDiscovery) \ -> Union[int, List[Dict[str, Any]]]: """ Executes SQL statement in ratdb Arguments: cfg -- Config object command -- SQL command to execute service_discovery -- ServiceDiscovery object Returns number of affected records for INSERT/UPDATE/DELETE, list of row dictionaries on SELECT """ cont = service_discovery.get_container(cfg.ratdb.service) result = run_docker(["exec", cont, "psql", "-U", cfg.ratdb.username, "-d", cfg.ratdb.dbname, "--csv", "-c", command]) if command.upper().startswith("SELECT "): try: return list(csv.DictReader(result.splitlines())) except csv.Error as ex: error(f"CSV parse error on output of '{command}': {repr(ex)}") for begin, pattern in [("INSERT ", r"INSERT\s+\d+\s+(\d+)"), ("UPDATE ", r"UPDATE\s+(\d+)"), ("DELETE ", r"DELETE\s+(\d+)")]: if command.upper().startswith(begin): m = re.search(pattern, result) error_if(not m, f"Can't fetch result of '{command}'") assert m is not None return int(m.group(1)) error(f"Unknown SQL command '{command}'") return 0 # Unreachable code, appeasing pylint def path_get(obj: Any, path: List[Union[str, int]]) -> Any: """ Read value from multilevel dictionary/list structure Arguments: obj -- Multilevel dictionary/list structure path -- Sequence of indices Returns value at the end of sequence """ for idx in path: obj = obj[idx] return obj def path_set(obj: Any, path: List[Union[str, int]], value: Any) -> None: """ Write value to multilevel dictionary/list structure Arguments: obj -- Multilevel dictionary/list structure path -- Sequence of indices value -- Value to insert to point at the end of sequence """ for idx in path[: -1]: obj = obj[idx] obj[path[-1]] = value def path_del(obj: Any, path: List[Union[str, int]]) -> Any: """ Delete value from multilevel dictionary/list structure Arguments: obj -- Multilevel dictionary/list structure path -- Sequence of indices, value at end of it is deleted """ ret = obj for idx in path[: -1]: obj = obj[idx] if isinstance(path[-1], int): obj.pop(path[-1]) else: del obj[path[-1]] return ret class AfcReqRespGenerator: """ Generator of AFC Request/Response messages for given request indices Private attributes _paths -- Copy of cfg.paths _req_msg_pattern -- AFC Request message pattern as JSON dictionary _resp_msg_pattern -- AFC Response message pattern as JSON dictionary _grid_size -- Copy of cfg.region.grid_size _min_lat -- Copy of cfg.region.min_lat _max_lat -- Copy of cfg.region.max_lat _min_lon -- Copy of cfg.region.min_lon _max_lon -- Copy of cfg.region.max_lon _default_height -- AP height to use when there is no randomization _channels_20mhz -- Copy of cfg._channels_20mhz _randomize -- True to choose AP 
positions randomly (uniformly or according to population density). False to use request index (to make caching possible) _random_height -- Formula for random height in string form. Evaluated with 'r' local value, randomly distributed in [0, 1] _population_db -- Population density database name. None for uniform _db_conn -- None or SQLite3 connection _db_cur -- None or SQLite3 cursor """ def __init__(self, cfg: Config, randomize: bool, population_db: Optional[str], req_msg_pattern: Optional[Dict[str, Any]]) -> None: """ Constructor Arguments: cfg -- Config object randomize -- True to choose AP positions randomly (uniformly or according to population density). False to use request index (to make caching possible) population_db -- Population density database name. None for uniform req_msg_pattern -- Optional Request message pattern to use instead of default """ self._paths = cfg.paths self._req_msg_pattern = \ req_msg_pattern or json.loads(cfg.req_msg_pattern) self._resp_msg_pattern = json.loads(cfg.resp_msg_pattern) path_set(path_get(self._req_msg_pattern, self._paths.reqs_in_msg)[0], self._paths.ruleset_in_req, cfg.region.rulesest_id) path_set(path_get(self._req_msg_pattern, self._paths.reqs_in_msg)[0], self._paths.cert_in_req, cfg.region.cert_id) path_set(path_get(self._resp_msg_pattern, self._paths.resps_in_msg)[0], self._paths.ruleset_in_resp, cfg.region.rulesest_id) self._grid_size = cfg.region.grid_size self._min_lat = cfg.region.min_lat self._max_lat = cfg.region.max_lat self._min_lon = cfg.region.min_lon self._max_lon = cfg.region.max_lon self._default_height = \ path_get( path_get(self._req_msg_pattern, self._paths.reqs_in_msg)[0], self._paths.height_in_req) self._channels_20mhz = cfg.channels_20mhz self._randomize = randomize self._random_height = cfg.region.random_height self._population_db = population_db self._db_conn: Optional[sqlite3.Connection] = None self._db_cur: Optional[sqlite3.Cursor] = None def request_msg(self, req_indices=Union[int, List[int]]) -> Dict[str, Any]: """ Generate AFC Request message for given request index range Arguments: req_indices -- Request index or list of request indices Returns Request message """ if isinstance(req_indices, int): req_indices = [req_indices] assert not isinstance(req_indices, int) # Whole message to be msg = copy.deepcopy(self._req_msg_pattern) pattern_req = path_get(msg, self._paths.reqs_in_msg)[0] # List of requests reqs = [] for idx_in_msg, req_idx in enumerate(req_indices): # Individual request req = copy.deepcopy(pattern_req) if len(req_indices) > 1 \ else pattern_req path_set(req, self._paths.id_in_req, self._req_id(req_idx, idx_in_msg)) path_set(req, self._paths.serial_in_req, self._serial(req_idx)) lat, lon, height = self._get_position(req_idx=req_idx) path_set(req, self._paths.lat_in_req, lat) path_set(req, self._paths.lon_in_req, lon) path_set(req, self._paths.height_in_req, height) reqs.append(req) path_set(msg, self._paths.reqs_in_msg, reqs) return msg def response_msg(self, req_idx: int) -> Dict[str, Any]: """ Generate (fake) AFC Response message (for rcache - hence single-item) Arguments: req_idx -- Request index Returns Fake AFC Response message """ # AFC Response message to be msg = copy.deepcopy(self._resp_msg_pattern) # Response inside it resp = path_get(msg, self._paths.resps_in_msg)[0] path_set(resp, self._paths.id_in_resp, self._req_id(req_idx, 0)) # To add uniqueness to message - set 20MHz channels according to bits # in request index binary representation channels = [] powers = [] chan_idx = 0 while req_idx: 
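            # LSB first: bit k of req_idx controls presence of
            # self._channels_20mhz[k]; the power value is a fixed placeholder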
if req_idx & 1: channels.append(self._channels_20mhz[chan_idx]) powers.append(30.) chan_idx += 1 req_idx //= 2 path_set(resp, self._paths.var_chans_in_resp, channels) path_set(resp, self._paths.var_pwrs_in_resp, powers) return msg def _req_id(self, req_idx: int, idx_in_msg: int) -> str: """ Request ID in message for given request index Arguments: req_idx -- Request index idx_in_msg -- Index of request in AFC message Returns Request ID to use """ unused_argument(idx_in_msg) return str(req_idx) def _serial(self, req_idx: int) -> str: """ AP Serial Number for given request index """ return f"AFC_LOAD_{req_idx:08}" def _get_position(self, req_idx: int) -> Tuple[float, float, float]: """ Returns (lat_deg, lon_deg, height_m) position for given request index """ height = \ eval(self._random_height, None, {"r": random.uniform(0, 1)}) \ if self._randomize else self._default_height if self._population_db is None: if self._randomize: return (random.uniform(self._min_lat, self._max_lat), random.uniform(self._min_lon, self._max_lon), height) return (self._min_lat + (req_idx // self._grid_size) * (self._max_lat - self._min_lat) / self._grid_size, (req_idx % self._grid_size) * (self._max_lon - self._min_lon) / self._grid_size, height) if self._db_conn is None: self._db_conn = \ sqlite3.connect(f"file:{self._population_db}?mode=ro", uri=True) self._db_cur = self._db_conn.cursor() assert self._db_cur is not None cumulative_density = random.uniform(0, 1) if self._randomize \ else req_idx / (self._grid_size * self._grid_size) rows = \ self._db_cur.execute( f"SELECT min_lat, max_lat, min_lon, max_lon " f"FROM population_density " f"WHERE cumulative_density >= {cumulative_density} " f"ORDER BY cumulative_density " f"LIMIT 1") min_lat, max_lat, min_lon, max_lon = rows.fetchall()[0] if self._randomize: return (random.uniform(min_lat, max_lat), random.uniform(min_lon, max_lon), height) return ((min_lat + max_lat) / 2, (min_lon + max_lon) / 2, height) class RestDataHandlerBase: """ Base class for generate/process REST API request/response data payloads """ def __init__(self, cfg: Config, randomize: bool = False, population_db: Optional[str] = None, req_msg_pattern: Optional[Dict[str, Any]] = None) -> None: """ Constructor Arguments: cfg -- Config object randomize -- True to choose AP positions randomly (uniformly or according to population density). False to use request index (to make caching possible) population_db -- Population density database name. None for uniform req_msg_pattern -- Optional Request message pattern to use instead of default """ self.cfg = cfg self.afc_req_resp_gen = \ AfcReqRespGenerator( cfg=cfg, randomize=randomize, population_db=population_db, req_msg_pattern=req_msg_pattern) def make_req_data(self, req_indices: List[int]) -> bytes: """ Abstract method that generates REST API POST data payload Arguments: req_indices -- List of request indices to generate payload for Returns POST payload as byte string """ unused_argument(req_indices) raise \ NotImplementedError( f"{self.__class__}.make_req_data() must be implemented") def make_error_map(self, result_data: Optional[bytes]) -> Dict[int, str]: """ Virtual method for error map generation Arguments: result_data -- Response in form of optional bytes string Returns error map - dictionary of error messages, indexed by request indices. 
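        (illustrative shape only: {3: "GENERAL_FAILURE"} would mean that the
        request with index 3 failed with that message).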
This default implementation returns empty dictionary """ unused_argument(result_data) return {} def dry_result_data(self, batch: Optional[int]) -> bytes: """ Virtual method returning bytes string to pass to make_error_map on dry run. This default implementation returns empty string """ unused_argument(batch) return b"" class PreloadRestDataHandler(RestDataHandlerBase): """ REST API data handler for 'preload' operation - i.e. Rcache update messages (no response) Private attributes: _hash_base -- MD5 hash computed over AFC Config, awaiting AFC Request tail """ def __init__(self, cfg: Config, afc_config: Dict[str, Any], req_msg_pattern: Optional[Dict[str, Any]] = None) -> None: """ Constructor Arguments: cfg -- Config object afc_config -- AFC Config that will be used in request hash computation req_msg_pattern -- Optional Request message pattern to use instead of default """ self._hash_base = hashlib.md5() self._hash_base.update(json.dumps(afc_config, sort_keys=True).encode("utf-8")) super().__init__(cfg=cfg, req_msg_pattern=req_msg_pattern) def make_req_data(self, req_indices: List[int]) -> bytes: """ Generates REST API POST payload (RcacheUpdateReq - see rcache_models.py, not included here) Arguments: req_indices -- List of request indices to generate payload for Returns POST payload as byte string """ rrks = [] for req_idx in req_indices: req_msg = self.afc_req_resp_gen.request_msg(req_indices=[req_idx]) resp_msg = self.afc_req_resp_gen.response_msg(req_idx=req_idx) hash_req = \ copy.deepcopy(path_get(req_msg, self.cfg.paths.reqs_in_msg)[0]) path_del(hash_req, self.cfg.paths.id_in_req) h = self._hash_base.copy() h.update(json.dumps(hash_req, sort_keys=True).encode("utf-8")) rrks.append({"afc_req": json.dumps(req_msg), "afc_resp": json.dumps(resp_msg), "req_cfg_digest": h.hexdigest()}) return json.dumps({"req_resp_keys": rrks}).encode("utf-8") class LoadRestDataHandler(RestDataHandlerBase): """ REST API data handler for 'load' operation - i.e. AFC Request/Response messages """ def __init__(self, cfg: Config, randomize: bool, population_db: Optional[str], req_msg_pattern: Optional[Dict[str, Any]] = None) -> None: """ Constructor Arguments: cfg -- Config object randomize -- True to choose AP positions randomly (uniformly or according to population density). False to use request index (to make caching possible) population_db -- Population density database name. 
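                          (an SQLite file is assumed, with a
                          population_density table holding min_lat, max_lat,
                          min_lon, max_lon and cumulative_density columns -
                          see AfcReqRespGenerator._get_position).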
None for uniform req_msg_pattern -- Optional Request message pattern to use instead of default """ super().__init__(cfg=cfg, randomize=randomize, population_db=population_db, req_msg_pattern=req_msg_pattern) def make_req_data(self, req_indices: List[int]) -> bytes: """ Generates REST API POST payload (AFC Request message) Arguments: req_indices -- Request indices to generate payload for Returns POST payload as byte string """ return \ json.dumps( self.afc_req_resp_gen.request_msg(req_indices=req_indices)).\ encode("utf-8") def make_error_map(self, result_data: Optional[bytes]) -> Dict[int, str]: """ Generate error map for given AFC Response message Arguments: result_data -- AFC Response as byte string Returns error map - dictionary of error messages, indexed by request indices """ paths = self.cfg.paths ret: Dict[int, str] = {} for resp in path_get(json.loads(result_data or b""), paths.resps_in_msg): if path_get(resp, paths.code_in_resp): ret[int(path_get(resp, paths.id_in_resp))] = \ str(path_get(resp, paths.response_in_resp)) return ret def dry_result_data(self, batch: Optional[int]) -> bytes: """ Returns byte string, containing AFC Response to parse on dry run """ assert batch is not None resp_msg = json.loads(self.cfg.resp_msg_pattern) path_set(resp_msg, self.cfg.paths.resps_in_msg, path_get(resp_msg, self.cfg.paths.resps_in_msg) * batch) return json.dumps(resp_msg).encode("utf-8") # POST Worker request data and supplementary information PostWorkerReqInfo = \ NamedTuple("PostWorkerReqInfo", [ # Request indices, contained in REST API request data ("req_indices", List[int]), # REST API Request data ("req_data", bytes)]) # GET Worker request data and supplementary information GetWorkerReqInfo = \ NamedTuple("GetWorkerReqInfo", [ # Number of GET requests to send ("num_gets", int)]) # REST API request results class WorkerResultInfo(NamedTuple): """ Data, returned by REST API workers in result queue """ # Retries made (0 - from first attempt) retries: int # CPU time consumed by worker in nanoseconds. Negative if not available worker_cpu_consumed_ns: int # Time consumed in processing of current request in second req_time_spent_sec: float # Request indices. None for netload req_indices: Optional[List[int]] = None # REST API Response data (None if error or N/A) result_data: Optional[bytes] = None # Error message for failed requests, None for succeeded error_msg: Optional[str] = None # Optional request data req_data: Optional[bytes] = None # Message from Tick worker for EMA rate computation TickInfo = NamedTuple("TickInfo", [("tick", int)]) # Type for result queue items ResultQueueDataType = Optional[Union[WorkerResultInfo, TickInfo]] class Ticker: """ Second ticker (used for EMA computations). 
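    Consumers feed the once-per-second ticks into RateEma.on_tick(), which
    maintains rate_ema += weight * (increment - rate_ema) with
    weight = 2 / (win_size_sec + 1), the usual EMA smoothing factor.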
Puts TickInfo to result queue Private attributes: _worker -- Tick worker process (generates TickInfo once per second) """ def __init__(self, result_queue: "multiprocessing.Queue[ResultQueueDataType]") \ -> None: """ Constructor Arguments: result_queue -- Queue for tick worker to put TickInfo objects """ self._worker = \ multiprocessing.Process(target=Ticker._tick_worker, kwargs={"result_queue": result_queue}) self._worker.start() @classmethod def _tick_worker( cls, result_queue: "multiprocessing.Queue[ResultQueueDataType]") \ -> None: """ Tick worker process Arguments: result_queue -- Queue to put TickInfo to """ count = 0 while True: time.sleep(1) count += 1 result_queue.put(TickInfo(tick=count)) def stop(self) -> None: """ Stops tick worker """ self._worker.terminate() class RateEma: """ Exponential Moving Average for rate of change Public attributes: rate_ema -- Rate Average computed on last tick Private attributes: _weight -- Weight for EMA computation _prev_value -- Value on previous tick """ def __init__(self, win_size_sec: float = 20) -> None: """ Constructor Arguments: result_queue -- Queue for tick worker to put TickInfo objects win_size_sec -- Averaging window size in seconds """ self._weight = 2 / (win_size_sec + 1) self.rate_ema: float = 0 self._prev_value: float = 0 def on_tick(self, new_value: float) -> None: """ Call on arrival of TickInfo message Arguments: new_value - Measured data value on this tick """ increment = new_value - self._prev_value self._prev_value = new_value self.rate_ema += self._weight * (increment - self.rate_ema) class StatusPrinter: """ Prints status on a single line: Private attributes: _prev_len -- Length of previously printed line """ def __init__(self) -> None: """ Constructor Arguments: enabled -- True if status print is enabled """ self._prev_len = 0 def pr(self, s: Optional[str] = None) -> None: """ Print string (if given) or cleans up after previous print """ if s is None: if self._prev_len: print(" " * self._prev_len, flush=True) self._prev_len = 0 else: print(s + " " * (max(0, self._prev_len - len(s))), end="\r", flush=True) self._prev_len = len(s) class ResultsProcessor: """ Prints important summary of request execution results Private attributes: _netload -- True for netload testing, False for preload/load testing _total_requests -- Total number of individual requests (not batches) that will be executed _result_queue -- Queue with execution results, second ticks, Nones for completed workers _status_period -- Period of status printing (in terms of request count), e.g. 1000 for once in 1000 requests, 0 to not print status (except in the end) _rest_data_handler -- Generator/interpreter of REST request/response data. None for netload test _num_workers -- Number of worker processes _err_dir -- None or directory for failed requests _status_printer -- StatusPrinter """ def __init__( self, netload: bool, total_requests: int, result_queue: "multiprocessing.Queue[ResultQueueDataType]", num_workers: int, status_period: int, rest_data_handler: Optional[RestDataHandlerBase], err_dir: Optional[str]) -> None: """ Constructor Arguments: netload -- True for netload testing, False for preload/load testing total_requests -- Total number of individual requests (not batches) that will be executed result_queue -- Queue with execution results, Nones for completed workers num_workers -- Number of worker processes status_period -- Period of status printing (in terms of request count), e.g. 
1000 for once in 1000 requests, 0 to not print status (except in the end) rest_data_handler -- Generator/interpreter of REST request/response data err_dir -- None or directory for failed requests """ self._netload = netload self._total_requests = total_requests self._result_queue = result_queue self._status_period = status_period self._num_workers = num_workers self._rest_data_handler = rest_data_handler self._err_dir = err_dir self._status_printer = StatusPrinter() def process(self) -> None: """ Keep processing results until all worker will stop """ start_time = datetime.datetime.now() requests_sent: int = 0 requests_failed: int = 0 retries: int = 0 cpu_consumed_ns: int = 0 time_spent_sec: float = 0 req_rate_ema = RateEma() cpu_consumption_ema = RateEma() def status_message(intermediate: bool) -> str: """ Returns status message (intermediate or final) """ now = datetime.datetime.now() elapsed = now - start_time elapsed_sec = elapsed.total_seconds() elapsed_str = re.sub(r"\.\d*$", "", str(elapsed)) global_req_rate = requests_sent / elapsed_sec if elapsed_sec else 0 cpu_consumption: float if cpu_consumed_ns < 0: cpu_consumption = -1 elif intermediate: cpu_consumption = cpu_consumption_ema.rate_ema elif elapsed_sec: cpu_consumption = cpu_consumed_ns * 1e-9 / elapsed_sec else: cpu_consumption = 0 if requests_sent: req_duration = time_spent_sec / requests_sent req_duration_str = \ f"{req_duration:.3g} sec" if req_duration > 0.1 \ else f"{req_duration * 1000:.3g} ms" else: req_duration_str = "unknown" ret = \ f"{'Progress: ' if intermediate else ''}" \ f"{requests_sent} requests completed " \ f"({requests_sent * 100 / self._total_requests:.1f}%), " \ f"{requests_failed} failed " \ f"({requests_failed * 100 / (requests_sent or 1):.3f}%), " \ f"{retries} retries made. " \ f"{elapsed_str} elapsed, " \ f"rate is {global_req_rate:.3f} req/sec " \ f"(avg req proc time {req_duration_str})" if cpu_consumption >= 0: ret += f", CPU consumption is {cpu_consumption:.3f}" if intermediate and elapsed_sec and requests_sent: total_duration = \ datetime.timedelta( seconds=self._total_requests * elapsed_sec / requests_sent) eta_dt = start_time + total_duration tta_sec = int((eta_dt - now).total_seconds()) tta: str if tta_sec < 60: tta = f"{tta_sec} seconds" elif tta_sec < 3600: tta = f"{tta_sec // 60} minutes" else: tta_minutes = tta_sec // 60 tta = \ f"{tta_minutes // 60} hours {tta_minutes % 60} minutes" ret += f", current rate is " \ f"{req_rate_ema.rate_ema:.3f} req/sec. 
" \ f"ETA: {eta_dt.strftime('%X')} (in {tta})" return ret try: while True: result_info = self._result_queue.get() if result_info is None: self._num_workers -= 1 if self._num_workers == 0: break continue if isinstance(result_info, TickInfo): req_rate_ema.on_tick(requests_sent) cpu_consumption_ema.on_tick(cpu_consumed_ns * 1e-9) continue error_msg = result_info.error_msg if error_msg: self._status_printer.pr() if self._netload: indices_clause = "" else: assert result_info.req_indices is not None indices = \ ", ".join(str(i) for i in result_info.req_indices) indices_clause = f" with indices ({indices})" logging.error( f"Request{indices_clause} failed: {error_msg}") error_map: Dict[int, str] = {} if self._rest_data_handler is not None: if error_msg is None: try: error_map = \ self._rest_data_handler.make_error_map( result_data=result_info.result_data) except Exception as ex: error_msg = f"Error decoding message " \ f"{result_info.result_data!r}: {repr(ex)}" for idx, error_msg in error_map.items(): self._status_printer.pr() logging.error(f"Request {idx} failed: {error_msg}") prev_sent = requests_sent num_requests = \ 1 if self._netload \ else len(cast(List[int], result_info.req_indices)) requests_sent += num_requests retries += result_info.retries if result_info.error_msg: requests_failed += num_requests else: requests_failed += len(error_map) if self._err_dir and result_info.req_data and \ (result_info.error_msg or error_map): try: filename = \ os.path.join( self._err_dir, datetime.datetime.now().strftime( "err_req_%y%m%d_%H%M%S_%f.json")) with open(filename, "wb") as f: f.write(result_info.req_data) except OSError as ex: error(f"Failed to write failed request file " f"'{filename}': {repr(ex)}") cpu_consumed_ns += result_info.worker_cpu_consumed_ns time_spent_sec += result_info.req_time_spent_sec if self._status_period and \ ((prev_sent // self._status_period) != (requests_sent // self._status_period)): self._status_printer.pr(status_message(intermediate=True)) finally: self._status_printer.pr() logging.info(status_message(intermediate=False)) def _print(self, s: str, newline: bool, is_error: bool) -> None: """ Print message Arguments: s -- Message to print newline -- True to go to new line, False to remain on same line """ if newline: self._status_printer.pr() (logging.error if is_error else logging.info)(s) else: self._status_printer.pr(s) def post_req_worker( url: str, retries: int, backoff: float, dry: bool, post_req_queue: multiprocessing.Queue, result_queue: "multiprocessing.Queue[ResultQueueDataType]", dry_result_data: Optional[bytes], use_requests: bool, return_requests: bool = False, delay_sec: float = 0) -> None: """ REST API POST worker Arguments: url -- REST API URL to send POSTs to retries -- Number of retries backoff -- Initial backoff window in seconds dry -- True to dry run post_req_queue -- Request queue. Elements are PostWorkerReqInfo objects corresponding to single REST API post or None to stop operation result_queue -- Result queue. 
Elements added are WorkerResultInfo for operation results, None to signal that worker finished use_requests -- True to use requests, False to use urllib.request return_requests -- True to return requests in WorkerResultInfo delay_sec -- Delay start by this number of seconds """ try: time.sleep(delay_sec) session: Optional[requests.Session] = \ requests.Session() if use_requests and (not dry) else None has_proc_time = hasattr(time, "process_time_ns") prev_proc_time_ns = time.process_time_ns() if has_proc_time else 0 while True: req_info: PostWorkerReqInfo = post_req_queue.get() if req_info is None: result_queue.put(None) return start_time = datetime.datetime.now() error_msg = None if dry: result_data = dry_result_data attempts = 1 else: result_data = None last_error: Optional[str] = None for attempt in range(retries + 1): if use_requests: assert session is not None try: resp = \ session.post( url=url, data=req_info.req_data, headers={ "Content-Type": "application/json; charset=utf-8", "Content-Length": str(len(req_info.req_data))}, timeout=180) if not resp.ok: last_error = \ f"{resp.status_code}: {resp.reason}" continue result_data = resp.content break except requests.RequestException as ex: last_error = repr(ex) else: req = urllib.request.Request(url) req.add_header("Content-Type", "application/json; charset=utf-8") req.add_header("Content-Length", str(len(req_info.req_data))) try: with urllib.request.urlopen( req, req_info.req_data, timeout=180) as f: result_data = f.read() break except (urllib.error.HTTPError, urllib.error.URLError, urllib.error.ContentTooShortError, OSError) \ as ex: last_error = repr(ex) time.sleep( random.uniform(0, (1 << attempt)) * backoff) else: error_msg = last_error attempts = attempt + 1 new_proc_time_ns = time.process_time_ns() if has_proc_time else 0 result_queue.put( WorkerResultInfo( req_indices=req_info.req_indices, retries=attempts - 1, result_data=result_data, error_msg=error_msg, worker_cpu_consumed_ns=(new_proc_time_ns - prev_proc_time_ns) if has_proc_time else -1, req_time_spent_sec=(datetime.datetime.now() - start_time). total_seconds(), req_data=req_info.req_data if return_requests and (not dry) else None)) prev_proc_time_ns = new_proc_time_ns except Exception as ex: logging.error(f"Worker failed: {repr(ex)}\n" f"{traceback.format_exc()}") result_queue.put(None) def get_req_worker( url: str, expected_code: Optional[int], retries: int, backoff: float, dry: bool, get_req_queue: multiprocessing.Queue, result_queue: "multiprocessing.Queue[ResultQueueDataType]", use_requests: bool) -> None: """ REST API GET worker Arguments: url -- REST API URL to send POSTs to expected_code -- None or expected non-200 status code retries -- Number of retries backoff -- Initial backoff window in seconds dry -- True to dry run get_req_queue -- Request queue. Elements are GetWorkerReqInfo objects corresponding to bunch of single REST API GETs or None to stop operation result_queue -- Result queue. 
Elements added are WorkerResultInfo for operation results, None to signal that worker finished use_requests -- True to use requests, False to use urllib.request """ try: if expected_code is None: expected_code = http.HTTPStatus.OK.value has_proc_time = hasattr(time, "process_time_ns") prev_proc_time_ns = time.process_time_ns() if has_proc_time else 0 session: Optional[requests.Session] = \ requests.Session() if use_requests and (not dry) else None while True: req_info: GetWorkerReqInfo = get_req_queue.get() if req_info is None: result_queue.put(None) return start_time = datetime.datetime.now() for _ in range(req_info.num_gets): error_msg = None if dry: attempts = 1 else: last_error: Optional[str] = None for attempt in range(retries + 1): if use_requests: assert session is not None try: resp = session.get(url=url, timeout=30) if resp.status_code != expected_code: last_error = \ f"{resp.status_code}: {resp.reason}" continue break except requests.RequestException as ex: last_error = repr(ex) else: req = urllib.request.Request(url) status_code: Optional[int] = None status_reason: str = "" try: with urllib.request.urlopen(req, timeout=30): pass status_code = http.HTTPStatus.OK.value status_reason = http.HTTPStatus.OK.name except urllib.error.HTTPError as http_ex: status_code = http_ex.code status_reason = http_ex.reason except (urllib.error.URLError, urllib.error.ContentTooShortError) as ex: last_error = repr(ex) if status_code is not None: if status_code == expected_code: break last_error = f"{status_code}: {status_reason}" time.sleep( random.uniform(0, (1 << attempt)) * backoff) else: error_msg = last_error attempts = attempt + 1 new_proc_time_ns = \ time.process_time_ns() if has_proc_time else 0 result_queue.put( WorkerResultInfo( retries=attempts - 1, error_msg=error_msg, worker_cpu_consumed_ns=(new_proc_time_ns - prev_proc_time_ns) if has_proc_time else -1, req_time_spent_sec=(datetime.datetime.now() - start_time). total_seconds(),)) prev_proc_time_ns = new_proc_time_ns except Exception as ex: logging.error(f"Worker failed: {repr(ex)}\n" f"{traceback.format_exc()}") result_queue.put(None) def get_idx_range(idx_range_arg: str) -> Tuple[int, int]: """ Parses --idx_range command line parameter to index range tuple """ parts = idx_range_arg.split("-", maxsplit=1) try: ret = (0, int(parts[0])) if len(parts) == 1 \ else (int(parts[0]), int(parts[1])) error_if(ret[0] >= ret[1], f"Invalid index range: {idx_range_arg}") return ret except ValueError as ex: error(f"Invalid index range syntax: {repr(ex)}") return (0, 0) # Appeasing pylint, will never happen def producer_worker( count: Optional[int], batch: int, parallel: int, req_queue: multiprocessing.Queue, netload: bool = False, min_idx: Optional[int] = None, max_idx: Optional[int] = None, rest_data_handler: Optional[RestDataHandlerBase] = None) -> None: """ POST Producer (request queue filler) Arguments: batch -- Batch size (number of requests per queue element) min_idx -- Minimum request index. None for netload max_idx -- Aftremaximum request index. None for netload count -- Optional count of requests to send. None means that requests will be sent sequentially according to range. 
If specified - request indices will be randomized parallel -- Number of worker processes to use dry -- Dry run rest_data_handler -- Generator/interpreter of REST request/response data req_queue -- Requests queue to fill """ try: if netload: assert count is not None for min_count in range(0, count, batch): req_queue.put( GetWorkerReqInfo(num_gets=min(count - min_count, batch))) elif count is not None: assert (min_idx is not None) and (max_idx is not None) and \ (rest_data_handler is not None) for min_count in range(0, count, batch): req_indices = \ [random.randrange(min_idx, max_idx) for _ in range(min(count - min_count, batch))] req_queue.put( PostWorkerReqInfo( req_indices=req_indices, req_data=rest_data_handler.make_req_data(req_indices))) else: assert (min_idx is not None) and (max_idx is not None) and \ (rest_data_handler is not None) for min_req_idx in range(min_idx, max_idx, batch): req_indices = list(range(min_req_idx, min(min_req_idx + batch, max_idx))) req_queue.put( PostWorkerReqInfo( req_indices=req_indices, req_data=rest_data_handler.make_req_data(req_indices))) except Exception as ex: error(f"Producer terminated: {repr(ex)}") finally: for _ in range(parallel): req_queue.put(None) def run(url: str, parallel: int, backoff: int, retries: int, dry: bool, status_period: int, batch: int, rest_data_handler: Optional[RestDataHandlerBase] = None, netload_target: Optional[str] = None, expected_code: Optional[int] = None, min_idx: Optional[int] = None, max_idx: Optional[int] = None, count: Optional[int] = None, use_requests: bool = False, err_dir: Optional[str] = None, ramp_up: Optional[float] = None, randomize: Optional[bool] = None, population_db: Optional[str] = None) -> None: """ Run the POST operation Arguments: rest_data_handler -- REST API payload data generator/interpreter. None for netload url -- REST API URL to send POSTs to (GET in case of netload) parallel -- Number of worker processes to use backoff -- Initial size of backoff windows in seconds retries -- Number of retries dry -- True to dry run status_period -- Period (in terms of count) of status message prints (0 to not at all) batch -- Batch size (number of requests per element of request queue) netload_target -- None for POST test, tested destination for netload test expected_code -- None or non-200 netload test HTTP status code min_idx -- Minimum request index. None for netload test max_idx -- Aftermaximum request index. None for netload test count -- Optional count of requests to send. None means that requests will be sent sequentially according to range. If specified - request indices will be randomized use_requests -- Use requests to send requests (default is to use urllib.request) err_dir -- None or directory for failed requests ramp_up -- Ramp up parallel streams for this number of seconds randomize -- True to random points, False for predefined points, None if irrelevant. Only used in banner printing population_db -- Population database file name or None. 
Only used for banner printing """ error_if(use_requests and ("requests" not in sys.modules), "'requests' Python3 module have to be installed to use " "'--no_reconnect' option") total_requests = count if count is not None \ else (cast(int, max_idx) - cast(int, min_idx)) if netload_target: logging.info(f"Netload test of {netload_target}") logging.info(f"URL: {'N/A' if dry else url}") logging.info(f"Streams: {parallel}") logging.info(f"Backoff: {backoff} sec, {retries} retries") if not netload_target: logging.info(f"Index range: {min_idx:_} - {max_idx:_}") logging.info(f"Batch size: {batch}") logging.info(f"Requests to send: {total_requests:_}") logging.info(f"Nonreconnect mode: {use_requests}") if ramp_up is not None: logging.info(f"Ramp up for: {ramp_up} seconds") if status_period: logging.info(f"Intermediate status is printed every " f"{status_period} requests completed") else: logging.info("Intermediate status is not printed") if err_dir: logging.info(f"Directory for failed requests: {err_dir}") if randomize is not None: logging.info( f"Point selection is {'random' if randomize else 'predefined'}") if population_db: logging.info(f"Point density chosen according to population database: " f"{population_db}") if dry: logging.info("Dry mode") req_queue: multiprocessing.Queue = multiprocessing.Queue() result_queue: "multiprocessing.Queue[ResultQueueDataType]" = \ multiprocessing.Queue() workers: List[multiprocessing.Process] = [] original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) ticker: Optional[Ticker] = None if err_dir: try: os.makedirs(err_dir, exist_ok=True) except OSError as ex: error(f"Failed to create directory '{err_dir}': {repr(ex)}") try: req_worker_kwargs: Dict[str, Any] = { "url": url, "backoff": backoff, "retries": retries, "dry": dry, "result_queue": result_queue, "use_requests": use_requests} req_worker: Callable if netload_target: req_worker = get_req_worker req_worker_kwargs["get_req_queue"] = req_queue req_worker_kwargs["expected_code"] = expected_code else: assert rest_data_handler is not None req_worker = post_req_worker req_worker_kwargs["post_req_queue"] = req_queue req_worker_kwargs["dry_result_data"] = \ rest_data_handler.dry_result_data(batch) req_worker_kwargs["return_requests"] = err_dir is not None for idx in range(parallel): kwargs = dict(req_worker_kwargs) if ramp_up is not None: kwargs["delay_sec"] = \ (idx * ramp_up / (parallel - 1)) if idx else 0 workers.append( multiprocessing.Process(target=req_worker, kwargs=kwargs)) workers[-1].start() workers.append( multiprocessing.Process( target=producer_worker, kwargs={"netload": netload_target is not None, "min_idx": min_idx, "max_idx": max_idx, "count": count, "batch": batch, "parallel": parallel, "rest_data_handler": rest_data_handler, "req_queue": req_queue})) workers[-1].start() ticker = Ticker(result_queue) signal.signal(signal.SIGINT, original_sigint_handler) results_processor = \ ResultsProcessor( netload=netload_target is not None, total_requests=total_requests, result_queue=result_queue, num_workers=parallel, status_period=status_period, rest_data_handler=rest_data_handler, err_dir=err_dir) results_processor.process() for worker in workers: worker.join() finally: for worker in workers: if worker.is_alive(): worker.terminate() if ticker: ticker.stop() def wait_rcache_flush(cfg: Config, args: Any, service_discovery: Optional[ServiceDiscovery]) -> None: """ Waiting for rcache preload stuff flushing to DB Arguments: cfg -- Config object args -- Parsed command line arguments service_discovery 
-- Optional ServiceDiscovery object
    """
    logging.info("Waiting for updates to flush to DB")
    start_time = datetime.datetime.now()
    start_queue_len: Optional[int] = None
    prev_queue_len: Optional[int] = None
    status_printer = StatusPrinter()
    rcache_status_url = \
        get_url(base_url=cfg.rest_api.rcache_status.url,
                param_host=args.rcache,
                service_discovery=service_discovery)
    while True:
        try:
            with urllib.request.urlopen(rcache_status_url, timeout=30) as f:
                rcache_status = json.loads(f.read())
        except (urllib.error.HTTPError, urllib.error.URLError) as ex:
            error(f"Error retrieving Rcache status using URL "
                  f"rest_api.rcache_status.url: {repr(ex)}")
        queue_len: int = rcache_status["update_queue_len"]
        if not queue_len:
            status_printer.pr()
            break
        if start_queue_len is None:
            start_queue_len = prev_queue_len = queue_len
        elif (args.status_period is not None) and \
                ((cast(int, prev_queue_len) - queue_len) >=
                 args.status_period):
            now = datetime.datetime.now()
            elapsed_sec = (now - start_time).total_seconds()
            written = start_queue_len - queue_len
            total_duration = \
                datetime.timedelta(
                    seconds=start_queue_len * elapsed_sec / written)
            eta_dt = start_time + total_duration
            status_printer.pr(
                f"{queue_len} records not yet written. "
                f"Write rate {written / elapsed_sec:.2f} rec/sec. "
                f"ETA {eta_dt.strftime('%X')} (in "
                f"{int((eta_dt - now).total_seconds()) // 60} minutes)")
        time.sleep(1)
    status_printer.pr()
    now = datetime.datetime.now()
    elapsed_sec = (now - start_time).total_seconds()
    elapsed_str = re.sub(r"\.\d*$", "", str(now - start_time))
    msg = f"Flushing took {elapsed_str}"
    if start_queue_len is not None:
        msg += f". Flush rate {start_queue_len / elapsed_sec:.2f} rec/sec"
    logging.info(msg)


def get_afc_config(cfg: Config, args: Any,
                   service_discovery: Optional[ServiceDiscovery]) \
        -> Dict[str, Any]:
    """ Retrieve AFC Config for configured Ruleset ID

    Arguments:
    cfg               -- Config object
    args              -- Parsed command line arguments
    service_discovery -- Optional ServiceDiscovery object
    Returns AFC Config as dictionary
    """
    get_config_url = \
        get_url(base_url=cfg.rest_api.get_config.url,
                param_host=getattr(args, "rat_server", None),
                service_discovery=service_discovery)
    try:
        get_config_url += cfg.region.rulesest_id
        with urllib.request.urlopen(get_config_url, timeout=30) as f:
            afc_config_str = f.read().decode("utf-8")
    except (urllib.error.HTTPError, urllib.error.URLError) as ex:
        error(f"Error retrieving AFC Config for Ruleset ID "
              f"'{cfg.region.rulesest_id}' using URL "
              f"rest_api.get_config.url: {repr(ex)}")
    try:
        return json.loads(afc_config_str)
    except json.JSONDecodeError as ex:
        error(f"Error decoding AFC Config JSON: {repr(ex)}")
    return {}  # Unreachable code to appease pylint


def patch_json(patch_arg: Optional[List[str]], json_dict: Dict[str, Any],
               data_type: str, new_type: Optional[str] = None) \
        -> Dict[str, Any]:
    """ Modify JSON object with patches from command line

    Arguments:
    patch_arg -- Optional list of FIELD1=VALUE1[;FIELD2=VALUE2...] patches
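                 (illustrative value only - e.g.
                 "location.ellipse.majorAxis=150;minDesiredPower=20",
                 where the field names are just examples)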
    json_dict -- JSON dictionary to modify
    data_type -- Patch of what - to be used in error messages
    new_type  -- None or type of new values for previously nonexistent keys
    Returns modified dictionary
    """
    if not patch_arg:
        return json_dict
    ret = copy.deepcopy(json_dict)
    for patches in patch_arg:
        for patch in patches.split(";"):
            error_if("=" not in patch,
                     f"Invalid syntax: {data_type} setting '{patch}' doesn't "
                     f"have '='")
            field, value = patch.split("=", 1)
            new_key = False
            super_obj: Any = None
            obj: Any = ret
            last_idx: Any = None
            idx: Any
            for idx in field.split("."):
                super_obj = obj
                if re.match(r"^\d+$", idx):
                    int_idx = int(idx)
                    error_if(not isinstance(obj, list),
                             f"Integer index '{idx}' in {data_type} setting "
                             f"'{patch}' is applied to nonlist entity")
                    error_if(not (0 <= int_idx < len(obj)),
                             f"Integer index '{idx}' in {data_type} setting "
                             f"'{patch}' is outside of [0, {len(obj)}[ valid "
                             f"range")
                    idx = int_idx
                else:
                    error_if(not isinstance(obj, dict),
                             f"Key '{idx}' in {data_type} setting '{patch}' "
                             f"can't be applied to nondictionary entity")
                    if idx not in obj:
                        error_if(not new_type,
                                 f"Key '{idx}' of setting '{patch}' not found "
                                 f"in {data_type}")
                        obj[idx] = {}
                        new_key = True
                obj = obj[idx]
                last_idx = idx
            error_if(
                (isinstance(obj, dict) and (not new_key)) or
                (isinstance(obj, list) and (not value.startswith("["))),
                f"'{field}' of {data_type} setting '{patch}' does not address "
                f"scalar value")
            try:
                # Note: bool is tested before int, as bool is a subclass of
                # int and would otherwise be swallowed by the int branch
                if isinstance(obj, bool) or \
                        (new_key and (new_type == "bool")):
                    if value.lower() in ("1", "y", "t", "yes", "true", "+"):
                        super_obj[last_idx] = True
                    elif value.lower() in ("0", "n", "f", "no", "false", "-"):
                        super_obj[last_idx] = False
                    else:
                        raise TypeError(f"'{value}' is not a valid boolean "
                                        "representation")
                elif isinstance(obj, int) or \
                        (new_key and (new_type == "int")):
                    try:
                        super_obj[last_idx] = int(value)
                    except ValueError:
                        if new_key and (new_type == "int"):
                            raise
                        super_obj[last_idx] = float(value)
                elif isinstance(obj, float) or \
                        (new_key and (new_type == "float")):
                    super_obj[last_idx] = float(value)
                elif isinstance(obj, list) or \
                        (new_key and (new_type == "list")):
                    super_obj[last_idx] = json.loads(value)
                else:
                    super_obj[last_idx] = value
            except (TypeError, ValueError, json.JSONDecodeError) as ex:
                error(f"'{value}' of {data_type} setting '{patch}' has "
                      f"invalid type/formatting: {repr(ex)}")
    return ret


def patch_req(cfg: Config, args_req: Optional[List[str]]) -> Dict[str, Any]:
    """ Get AFC Request pattern, patched according to --req switch

    Arguments:
    cfg      -- Config object
    args_req -- Optional --req switch value
    Returns patched AFC Request pattern
    """
    req_msg_dict = json.loads(cfg.req_msg_pattern)
    req_dict = path_get(obj=req_msg_dict, path=cfg.paths.reqs_in_msg)[0]
    req_dict = patch_json(patch_arg=args_req, json_dict=req_dict,
                          data_type="AFC Request")
    path_set(obj=req_msg_dict, path=cfg.paths.reqs_in_msg, value=[req_dict])
    return req_msg_dict


def do_preload(cfg: Config, args: Any) -> None:
    """ Execute "preload" command.
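    Illustrative invocation (script/subcommand names are assumed from this
    file's structure, switch names from main() below; adjust to the actual
    deployment):
        $ afc_load_tool.py preload --comp_proj my_compose_project \
              --idx_range 500000 --batch 100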
Arguments: cfg -- Config object args -- Parsed command line arguments """ if args.dry: afc_config = {} worker_url = "" else: service_discovery = None if args.comp_proj is None \ else ServiceDiscovery(compose_project=args.comp_proj) if args.protect_cache: try: with urllib.request.urlopen( urllib.request.Request( get_url(base_url=cfg.rest_api.protect_rcache.url, param_host=args.rcache, service_discovery=service_discovery), method="POST"), timeout=30): pass except (urllib.error.HTTPError, urllib.error.URLError) as ex: error(f"Error attempting to protect Rcache from invalidation " f"using protect_rcache.url: {repr(ex)}") worker_url = \ get_url(base_url=cfg.rest_api.update_rcache.url, param_host=args.rcache, service_discovery=service_discovery) afc_config = get_afc_config(cfg=cfg, args=args, service_discovery=service_discovery) min_idx, max_idx = get_idx_range(args.idx_range) run(rest_data_handler=PreloadRestDataHandler( cfg=cfg, afc_config=afc_config, req_msg_pattern=patch_req(cfg=cfg, args_req=args.req)), url=worker_url, parallel=args.parallel, backoff=args.backoff, retries=args.retries, dry=args.dry, batch=args.batch, min_idx=min_idx, max_idx=max_idx, status_period=args.status_period, use_requests=args.no_reconnect) if not args.dry: wait_rcache_flush(cfg=cfg, args=args, service_discovery=service_discovery) def get_afc_worker_url(args: Any, base_url: str, service_discovery: Optional[ServiceDiscovery]) -> str: """ REST API URL to AFC server Arguments: cfg -- Config object base_url -- Base URL from config service_discovery -- Optional ServiceDiscovery object Returns REST API URL """ if args.localhost: error_if(not args.comp_proj, "--comp_proj parameter must be specified") m = re.search(r"0\.0\.0\.0:(\d+)->%d/tcp.*%s_dispatcher_\d+" % (80 if args.localhost == "http" else 443, re.escape(args.comp_proj)), run_docker(["ps"])) error_if(not m, "AFC port not found. Please specify AFC server address " "explicitly and remove --localhost switch") assert m is not None ret = get_url(base_url=base_url, param_host=f"localhost:{m.group(1)}", service_discovery=service_discovery) else: ret = get_url(base_url=base_url, param_host=args.afc, service_discovery=service_discovery) return ret def do_load(cfg: Config, args: Any) -> None: """ Execute "load" command Arguments: cfg -- Config object args -- Parsed command line arguments """ if args.dry: worker_url = "" else: service_discovery = None if args.comp_proj is None \ else ServiceDiscovery(compose_project=args.comp_proj) worker_url = \ get_afc_worker_url(args=args, base_url=cfg.rest_api.afc_req.url, service_discovery=service_discovery) if args.no_cache: parsed_worker_url = urllib.parse.urlparse(worker_url) worker_url = \ parsed_worker_url._replace( query="&".join( p for p in [parsed_worker_url.query, "nocache=True"] if p)).geturl() min_idx, max_idx = get_idx_range(args.idx_range) run(rest_data_handler=LoadRestDataHandler( cfg=cfg, randomize=args.random, population_db=args.population, req_msg_pattern=patch_req(cfg=cfg, args_req=args.req)), url=worker_url, parallel=args.parallel, backoff=args.backoff, retries=args.retries, dry=args.dry, batch=args.batch, min_idx=min_idx, max_idx=max_idx, status_period=args.status_period, count=args.count, use_requests=args.no_reconnect, err_dir=args.err_dir, ramp_up=args.ramp_up, randomize=args.random, population_db=args.population) def do_netload(cfg: Config, args: Any) -> None: """ Execute "netload" command. 
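    Illustrative invocation (subcommand/switch names assumed from the code
    below, values hypothetical):
        $ afc_load_tool.py netload --comp_proj my_compose_project \
              --target msghnd --count 100000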


def do_netload(cfg: Config, args: Any) -> None:
    """ Execute "netload" command.

    Arguments:
    cfg  -- Config object
    args -- Parsed command line arguments """
    expected_code: Optional[int] = None
    worker_url = ""
    if not args.dry:
        service_discovery = None if args.comp_proj is None \
            else ServiceDiscovery(compose_project=args.comp_proj)
        if args.target is None:
            if args.localhost and (not args.rcache):
                args.target = "dispatcher"
            elif args.afc and (not (args.localhost or args.rcache)):
                args.target = "msghnd"
            elif args.rcache and (not (args.localhost or args.afc)):
                args.target = "rcache"
            else:
                error("'--target' argument must be explicitly specified")
        if args.target == "rcache":
            worker_url = \
                get_url(base_url=cfg.rest_api.rcache_get.url,
                        param_host=args.rcache,
                        service_discovery=service_discovery)
            expected_code = cfg.rest_api.rcache_get.get("code")
        elif args.target == "msghnd":
            worker_url = \
                get_url(base_url=cfg.rest_api.msghnd_get.url,
                        param_host=args.afc,
                        service_discovery=service_discovery)
            expected_code = cfg.rest_api.msghnd_get.get("code")
        else:
            assert args.target == "dispatcher"
            worker_url = \
                get_afc_worker_url(
                    args=args, base_url=cfg.rest_api.dispatcher_get.url,
                    service_discovery=service_discovery)
            expected_code = cfg.rest_api.dispatcher_get.get("code")
    run(netload_target=args.target, url=worker_url, parallel=args.parallel,
        backoff=args.backoff, retries=args.retries, dry=args.dry, batch=1000,
        expected_code=expected_code, status_period=args.status_period,
        count=args.count, use_requests=args.no_reconnect)


def do_cache(cfg: Config, args: Any) -> None:
    """ Execute "cache" command.

    Arguments:
    cfg  -- Config object
    args -- Parsed command line arguments """
    error_if(args.protect and args.unprotect,
             "--protect and --unprotect are mutually exclusive")
    error_if(not (args.protect or args.unprotect or args.invalidate),
             "Nothing to do")
    service_discovery = None if args.comp_proj is None \
        else ServiceDiscovery(compose_project=args.comp_proj)
    json_data: Any
    for arg, attr, json_data in \
            [("protect", "protect_rcache", None),
             ("invalidate", "invalidate_rcache", {}),
             ("unprotect", "unprotect_rcache", None)]:
        try:
            if not getattr(args, arg):
                continue
            data: Optional[bytes] = None
            url = get_url(base_url=getattr(cfg.rest_api, attr).url,
                          param_host=args.rcache,
                          service_discovery=service_discovery)
            req = urllib.request.Request(url, method="POST")
            if json_data is not None:
                data = json.dumps(json_data).encode(encoding="ascii")
                req.add_header("Content-Type", "application/json")
            urllib.request.urlopen(req, data, timeout=30)
        except (urllib.error.HTTPError, urllib.error.URLError) as ex:
            error(f"Error attempting to perform cache {arg} using "
                  f"rest_api.{attr}.url: {repr(ex)}")


def do_afc_config(cfg: Config, args: Any) -> None:
    """ Execute "afc_config" command.

    Arguments:
    cfg  -- Config object
    args -- Parsed command line arguments """
    service_discovery = ServiceDiscovery(compose_project=args.comp_proj)
    afc_config = get_afc_config(cfg=cfg, args=args,
                                service_discovery=service_discovery)
    afc_config_str = \
        json.dumps(
            patch_json(patch_arg=args.FIELD_VALUE, json_dict=afc_config,
                       new_type=args.new, data_type="AFC Config"))
    result = \
        ratdb(
            cfg=cfg,
            command=cfg.ratdb.update_config_by_id.format(
                afc_config=afc_config_str,
                region_str=afc_config["regionStr"]),
            service_discovery=service_discovery)
    error_if(not (isinstance(result, int) and result > 0),
             "AFC Config update failed")
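

# Illustrative "afc_config" invocations (the script name, project name and
# the first config field name below are placeholders, not taken from a real
# deployment; 'freqBands.0.startFreqMHz' matches the FIELD_VALUE help text):
#   ./afc_load_tool.py afc_config --comp_proj openafc maxLinkDistance=200
#   ./afc_load_tool.py afc_config --comp_proj openafc \
#       freqBands.0.startFreqMHz=5925
# Deep fields use the same dot-separated FIELD=VALUE syntax as --req (see
# patch_json() above); list values must be JSON-formatted and enclosed in []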


def do_json_config(cfg: Config, args: Any) -> None:
    """ Execute "json_config" command.

    Arguments:
    cfg  -- Config object
    args -- Parsed command line arguments """
    s = json.dumps(cfg.data(), indent=2)
    filename = args.JSON_CONFIG if args.JSON_CONFIG else \
        (os.path.splitext(Config.DEFAULT_CONFIG)[0] + Config.JSON_EXT)
    with open(filename, "w", encoding="utf-8") as f:
        f.write(s)


def do_help(cfg: Config, args: Any) -> None:
    """ Execute "help" command.

    Arguments:
    cfg  -- Config object (not used)
    args -- Parsed command line arguments (also contains 'argument_parser'
            and 'subparsers' fields) """
    unused_argument(cfg)
    if args.subcommand is None:
        args.argument_parser.print_help()
    else:
        args.subparsers.choices[args.subcommand].print_help()


def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments """
    cfg = Config(argv=argv, arg_name="config")

    switches_common = argparse.ArgumentParser(add_help=False)
    switches_common.add_argument(
        "--parallel", metavar="N", type=int, default=cfg.defaults.parallel,
        help=f"Number of requests to execute in parallel. Default is "
        f"{cfg.defaults.parallel}")
    switches_common.add_argument(
        "--backoff", metavar="SECONDS", type=float,
        default=cfg.defaults.backoff,
        help=f"Initial backoff window (in seconds) to use on request "
        f"failure. It is doubled on each retry. Default is "
        f"{cfg.defaults.backoff} seconds")
    switches_common.add_argument(
        "--retries", metavar="N", type=int, default=cfg.defaults.retries,
        help=f"Maximum number of retries before giving up. Default is "
        f"{cfg.defaults.retries}")
    switches_common.add_argument(
        "--dry", action="store_true",
        help="Dry run (no communication with server) to estimate overhead")
    switches_common.add_argument(
        "--status_period", metavar="N", type=int,
        default=cfg.defaults.status_period,
        help=f"How often to print status information. Default is once per "
        f"{cfg.defaults.status_period} requests. 0 disables status printing")
    switches_common.add_argument(
        "--no_reconnect", action="store_true",
        help="Do not reconnect on each request (requires the 'requests' "
        "Python3 library to be installed: 'pip install requests')")

    switches_req = argparse.ArgumentParser(add_help=False)
    switches_req.add_argument(
        "--idx_range", metavar="[FROM-]TO", default=cfg.defaults.idx_range,
        help=f"Range of AP indices. FROM is the initial index (0 if "
        f"omitted), TO is the 'after-last' (exclusive) index. Default is "
        f"'{cfg.defaults.idx_range}'")
    switches_req.add_argument(
        "--batch", metavar="N", type=int, default=cfg.defaults.batch,
        help=f"Number of requests in one REST API call. Default is "
        f"{cfg.defaults.batch}")
    switches_req.add_argument(
        "--req", metavar="FIELD1=VALUE1[;FIELD2=VALUE2...]", action="append",
        default=[],
        help="Change field(s) in the request body (compared to "
        "req_msg_pattern in the config file). FIELD is a dot-separated path "
        "to a field inside the request (e.g. 'location.ellipse.majorAxis'), "
        "VALUE is the field value; if the field value is a list, it should "
        "be surrounded by [] and formatted as in JSON. Several "
        "semicolon-separated settings may be specified, and this parameter "
        "may be specified several times")

    switches_count = argparse.ArgumentParser(add_help=False)
    switches_count.add_argument(
        "--count", metavar="NUM_REQS", type=int, default=cfg.defaults.count,
        help=f"Number of requests to send. Default is {cfg.defaults.count}")

    switches_afc = argparse.ArgumentParser(add_help=False)
    switches_afc.add_argument(
        "--afc", metavar="[PROTOCOL://]HOST[:port][path][params]",
        help="AFC Server to send requests to. Unspecified parts are taken "
        "from 'rest_api.afc_req' of the config file. By default determined "
        "by means of '--comp_proj'")
    switches_afc.add_argument(
        "--localhost", nargs="?", choices=["http", "https"], const="http",
        help="If --afc is not specified, the default is to send requests to "
        "the msghnd container (bypassing the Nginx container). This flag "
        "causes requests to be sent to the external http/https AFC port on "
        "localhost. If the protocol is not specified, http is assumed")
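
    # Illustrative ways of addressing the AFC server (hostnames and ports
    # here are placeholders): an explicit server may be given as, e.g.,
    # '--afc http://afc-host.example.org:8080', whereas
    # '--localhost https --comp_proj <PROJECT>' targets the https dispatcher
    # port published on localhost by the given compose project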

    switches_rat = argparse.ArgumentParser(add_help=False)
    switches_rat.add_argument(
        "--rat_server", metavar="[PROTOCOL://]HOST[:port][path][params]",
        help="Server to request config from. Unspecified parts are taken "
        "from 'rest_api.get_config' of the config file. By default "
        "determined by means of '--comp_proj'")

    switches_compose = argparse.ArgumentParser(add_help=False)
    switches_compose.add_argument(
        "--comp_proj", metavar="PROJ_NAME",
        help="Docker compose project name. Used to determine hosts to send "
        "API calls to. If not specified, hostnames must be specified "
        "explicitly")

    switches_rcache = argparse.ArgumentParser(add_help=False)
    switches_rcache.add_argument(
        "--rcache", metavar="[PROTOCOL://]HOST[:port]",
        help="Rcache server. May also be determined by means of "
        "'--comp_proj'")

    # Top level parser
    argument_parser = argparse.ArgumentParser(
        description="AFC Load Test Tool")
    argument_parser.add_argument(
        "--config", metavar="[+]CONFIG_FILE", action="append", default=[],
        help=f"Config file. Default has the same name and directory as this "
        f"script, but with the "
        f"'{Config.YAML_EXT if has_yaml else Config.JSON_EXT}' extension "
        f"(i.e. {Config.DEFAULT_CONFIG}). May be specified several times (in "
        f"which case values are merged together). If prefixed with '+' - "
        f"joined to the default config. Note that this script ships with a "
        f"default YAML config; on a Python installation without YAML support "
        f"it should first be converted to JSON (with the 'json_config' "
        f"subcommand) on a YAML-capable system and then copied over")
    subparsers = argument_parser.add_subparsers(dest="subcommand",
                                                metavar="SUBCOMMAND")

    parser_preload = subparsers.add_parser(
        "preload",
        parents=[switches_common, switches_rat, switches_req,
                 switches_compose, switches_rcache],
        help="Fill rcache with (fake) responses")
    parser_preload.add_argument(
        "--protect_cache", action="store_true",
        help="Protect Rcache from invalidation (e.g. by the ULS downloader). "
        "Protection persists in the rcache database, see the 'cache' "
        "subcommand for how to unprotect")
    parser_preload.set_defaults(func=do_preload)

    parser_load = subparsers.add_parser(
        "load",
        parents=[switches_common, switches_req, switches_count,
                 switches_compose, switches_afc],
        help="Do load test")
    parser_load.add_argument(
        "--no_cache", action="store_true",
        help="Don't use rcache, force each request to be computed")
    parser_load.add_argument(
        "--population", metavar="POPULATION_DB_FILE",
        help="Select AP positions proportionally to population density from "
        "the given database (prepared with "
        "tools/geo_converters/make_population_db.py). Positions are random, "
        "so no rcache will help")
    parser_load.add_argument(
        "--err_dir", metavar="DIRECTORY",
        help="Directory for offending JSON AFC Requests")
    parser_load.add_argument(
        "--random", action="store_true",
        help="Choose AP positions randomly (makes sense only in noncached "
        "mode)")
    parser_load.add_argument(
        "--ramp_up", metavar="SECONDS", type=float, default=0,
        help="Ramp up streams for this number of seconds. Default is to "
        "start all at once")
    parser_load.set_defaults(func=do_load)
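
    # An illustrative cached-load workflow built from the subcommands defined
    # in this function (script and project names are placeholders):
    #   ./afc_load_tool.py preload --comp_proj openafc --protect_cache
    #   ./afc_load_tool.py load --comp_proj openafc --count 100000
    #   ./afc_load_tool.py cache --comp_proj openafc --unprotect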
    parser_network = subparsers.add_parser(
        "netload",
        parents=[switches_common, switches_count, switches_compose,
                 switches_afc, switches_rcache],
        help="Network load test by repeatedly querying health endpoints")
    parser_network.add_argument(
        "--target", choices=["dispatcher", "msghnd", "rcache"],
        help="What to test. If omitted then a guess is attempted: "
        "'dispatcher' if --localhost is specified, 'msghnd' if --afc without "
        "--localhost is specified, 'rcache' if --rcache is specified")
    parser_network.set_defaults(func=do_netload)

    parser_cache = subparsers.add_parser(
        "cache", parents=[switches_compose, switches_rcache],
        help="Do something with response cache")
    parser_cache.add_argument(
        "--protect", action="store_true",
        help="Protect rcache from invalidation (e.g. by the background ULS "
        "downloader). This action persists in the rcache database and needs "
        "to be explicitly undone with --unprotect")
    parser_cache.add_argument(
        "--unprotect", action="store_true",
        help="Allow rcache invalidation")
    parser_cache.add_argument(
        "--invalidate", action="store_true",
        help="Invalidate cache (invalidation must be enabled)")
    parser_cache.set_defaults(func=do_cache)

    parser_afc_config = subparsers.add_parser(
        "afc_config", parents=[switches_rat], help="Modify AFC Config")
    parser_afc_config.add_argument(
        "--comp_proj", metavar="PROJ_NAME", required=True,
        help="Docker compose project name. This parameter is mandatory")
    parser_afc_config.add_argument(
        "--new", metavar="VALUE_TYPE",
        choices=["str", "int", "float", "bool", "list"],
        help="Allow creation of new AFC Config keys (requires respective "
        "changes in AFC Engine). Created keys will be of the given type")
    parser_afc_config.add_argument(
        "FIELD_VALUE", nargs="+",
        help="One or more FIELD=VALUE clauses, where FIELD is a field name "
        "in AFC Config; a deep field may be specified in dot-separated form "
        "(e.g. 'freqBands.0.startFreqMHz'). VALUE is the new field value; if "
        "the field value is a list, it should be surrounded by [] and "
        "formatted as in JSON")
    parser_afc_config.set_defaults(func=do_afc_config)

    parser_json_config = subparsers.add_parser(
        "json_config",
        help="Convert config file from YAML to JSON for use on YAML-less "
        "systems")
    parser_json_config.add_argument(
        "JSON_CONFIG", nargs="?",
        help=f"JSON file to create. By default - same as the source YAML "
        f"file, but with the {Config.JSON_EXT} extension")
    parser_json_config.set_defaults(func=do_json_config)

    parser_help = subparsers.add_parser(
        "help", add_help=False, help="Prints help on given subcommand")
    parser_help.add_argument(
        "subcommand", metavar="SUBCOMMAND", nargs="?",
        choices=subparsers.choices,
        help="Name of subcommand to print help about (use "
        "\"%(prog)s --help\" to get the list of all subcommands)")
    parser_help.set_defaults(func=do_help, subparsers=subparsers,
                             argument_parser=argument_parser,
                             supports_unknown_args=True)

    if not argv:
        argument_parser.print_help()
        sys.exit(1)

    args = argument_parser.parse_known_args(argv)[0]
    if not getattr(args, "supports_unknown_args", False):
        args = argument_parser.parse_args(argv)

    # Set up logging
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter(
            f"{os.path.basename(__file__)}. %(levelname)s: %(message)s"))
    logging.getLogger().addHandler(console_handler)
    logging.getLogger().setLevel(logging.INFO)

    # Execute the selected subcommand
    try:
        args.func(cfg, args)
    except KeyboardInterrupt:
        print("^C")
        sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])