#!/usr/bin/env python3 """ Makes AFC Requests with custom FS database """ # Copyright (C) 2022 Broadcom. All rights reserved. # The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate # that owns the software below. # This work is licensed under the OpenAFC Project License, a copy of which is # included with this software program. # pylint: disable=wrong-import-order, too-many-statements, too-many-branches # pylint: disable=invalid-name, too-many-locals, logging-fstring-interpolation # pylint: disable=too-many-nested-blocks import argparse import concurrent.futures import copy import datetime import http import json import logging import os import random import re import requests import string import sys from typing import Any, Dict, List, NamedTuple, Optional, Union import yaml # Name of the default config file DEAFULT_CONFIG = os.path.realpath(os.path.splitext(__file__)[0] + ".yaml") # Pydantic wouldn't be out of place here... # Config key for default number of parallel requests DEFAULT_PARALLEL_KEY = "default_parallel" # Config key for default request timeout in minutes DEFAULT_TIMEOUT_MIN_KEY = "default_timeout_min" # Config key for request format REQ_ID_FORMAT_KEY = "req_id_format" # Config key for request pattern REQ_PATTERN_KEY = "req_pattern" # Config key for region definition patterns REGION_PATTERNS_KEY = "region_patterns" # Config key for path definitions PATHS_KEY = "paths" # Config subkey for path to Request ID in AFC Request JSON REQ_ID_PATH_KEY = "request_id" # Config subkey for path to region definition in AFC Request JSON REGION_PATH_KEY = "region" # Config subkey for path to coordinates in AFC Request JSON COORD_PATH_KEY = "coordinates" # Config subkey for path to FS Database in AFC Request JSON FS_DATABASE_PATH_KEY = "fs_database" # Config subkey for path to response code in AFC Response JSON RESPONSE_CODE_PATH_KEY = "response_code" # Config subkey in response description in AFC Response JSON RESPONSE_DESC_PATH_KEY = "response_desc" # Config subkey in response description in AFC Response JSON RESPONSE_SUPP_PATH_KEY = "response_supp" # Config subkey for descrition substring of statuses to ignore SUCCESS_STATUSES_KEY = "success_statuses" # Config subkey for code of ignored status STATUS_CODE_KEY = "code" # Config subkey for description of ignore dstatus STATUS_DESC_KEY = "desc_substr" # Config key for point definition patterns POINT_PATTERNS_KEY = "point_patterns" # Config subkey for point name POINT_NAME_KEY = "name" # Config subkey for point coordinates POINT_COORD_DICT_KEY = "coordinates" # Config subkey for point latitude POINT_COORD_LAT_KEY = "latitude" # Config subkey to point longitude POINT_COORD_LON_KEY = "longitude" def error(msg: str) -> None: """ Prints given msg as error message and exits abnormally """ logging.error(msg) sys.exit(1) def error_if(cond: Any, msg: str) -> None: """ If condition evaluates to true prints given msg as error message and exits abnormally """ if cond: error(msg) def json_substitute(json_obj: Union[List[Any], Dict[str, Any]], path: List[Union[int, str]], value: Any) -> None: """ Substitute JSON element at given path with given value Element at given path must exist and its current value should be 'null' Arguments: json_obj -- JSON Dictionary. Updated inplace path -- Path (list of indices) leading to desired element value -- New value """ error_if(not isinstance(json_obj, dict), "Attempt to JSON-substitute in non-JSON object") error_if(len(path) == 0, "JSON substitution path may not be empty") container: Optional[Union[List[Any], Dict[str, Any]]] = None original_obj = json_obj for idx, path_elem in enumerate(path): if isinstance(path_elem, int): error_if(not (isinstance(json_obj, list) and (0 <= path_elem < len(json_obj))), f"Path {path[:idx + 1]} is invalid for '{original_obj}'") elif isinstance(path_elem, str): error_if(not (isinstance(json_obj, dict) and (path_elem in json_obj)), f"Path {path[:idx + 1]} is invalid for '{original_obj}'") else: error(f"Path index '{path_elem}' has invalid type. Must be string " f"or integer") container = json_obj json_obj = json_obj[path_elem] error_if(json_obj is not None, f"Path {path} is invalid for '{original_obj}' - must end on " f"'null' element, instead ends on '{json_obj}'") container[path[-1]] = value def json_retrieve(json_obj: Optional[Union[List[Any], Dict[str, Any]]], path=List[Union[int, str]]) -> Any: """ Try to read value at given path in given JSON Arguments: json_obj -- JSON object to read from path -- Path (sequence of indices) leading to desired element Returns element value or None """ original_obj = json_obj for idx, path_elem in enumerate(path): if json_obj is None: return None if isinstance(path_elem, int): error_if(not isinstance(json_obj, list), f"Path {path[:idx + 1]} is invalid for '{original_obj}'") if not (0 <= path_elem < len(json_obj)): return None elif isinstance(path_elem, str): error_if(not isinstance(json_obj, dict), f"Path {path[:idx + 1]} is invalid for '{original_obj}'") if path_elem not in json_obj: return None else: error(f"Path index '{path_elem}' has invalid type. Must be string " f"or integer") json_obj = json_obj[path_elem] return json_obj # Response from request worker function ResponseInfo = \ NamedTuple( "ResponseInfo", [ # AFC Response object or None ("afc_response", Optional[Dict[str, Any]]), # True if there was a timeout ("timeout", bool), # HTTP Status code ("status_code", Optional[int]), # Request duration in seconds ("duration_sec", int)]) def do_request(req: Dict[str, Any], url: str, timeout_sec: float) \ -> ResponseInfo: """ Thread worker function that performs AFC Request Arguments req -- AFC Request JSON dictionary url -- Request URL timeout_sec -- Timeout in seconds Returns ResponseInfo object """ timeout = False start_time = datetime.datetime.now() result: Optional[requests.Response] = None try: result = requests.post(url=url, json=req, timeout=timeout_sec) except requests.Timeout: timeout = True return \ ResponseInfo( afc_response=result.json() if (result is not None) and (result.status_code == http.HTTPStatus.OK) else None, timeout=timeout, status_code=result.status_code if result is not None else None, duration_sec=int((datetime.datetime.now() - start_time).total_seconds())) class ReqInfo(NamedTuple): """ Request information - request itself and stuff for its prettyprinting """ # Point name (City) name: str # Point region (Country Code) region: str # AFC Request for point in form of JSON dictionary req: Dict[str, Any] # Point latitude in north-positive degrees lat: float # Point longitude in east-positive degrees lon: float def point_info(self) -> str: """ Point information for messages """ return f"{self.name} ({self.region} @ " \ f"{abs(self.lat)}{'N' if self.lat >= 0 else 'S'}, " \ f"{abs(self.lon)}{'E' if self.lon >= 0 else 'W'})" def main(argv: List[str]) -> None: """Do the job. Arguments: argv -- Program arguments """ argument_parser = \ argparse.ArgumentParser( description="Makes AFC Requests with custom FS database") argument_parser.add_argument( "--config", metavar="CONGIG", default=DEAFULT_CONFIG, help=f"Config file name. Default is '{DEAFULT_CONFIG}'") argument_parser.add_argument( "--server_url", metavar="URL", required=True, help="URL of AFC Service to use (should accept " "openAfc.overrideAfcConfig Vendor Extension)") argument_parser.add_argument( "--region", metavar="REGION", action="append", default=[], help="Region to run tests for. This parameter may be specified " "several times. Default is to run for all regions") argument_parser.add_argument( "--parallel", metavar="NUM_PARALLEL_REQS", type=int, help="Number of parallel requests. Default see in the config file") argument_parser.add_argument( "--timeout_min", metavar="MINUTES", type=float, help="Request timeout in minutes. Default see in the config file") argument_parser.add_argument( "--point", metavar="LAT_DEG,LON_DEG OR NAME", action="append", help="Run request for given point. Point specified either by " "coordinates (north/east positive degrees) or by name in config file. " "In former case exactly one --region should be specified, in latter " "case - only if it is required for disambiguation. This parameter may " "be specified several times") argument_parser.add_argument( "--failed_json", metavar="FILENAME", help="Write json that caused fail to file") argument_parser.add_argument( "FS_DATABASE", help="FS (aka ULS) database (.sqlite3 file) to use. " "Should have exactly same path as AFC Config requires") if not argv: argument_parser.print_help() sys.exit(1) args = argument_parser.parse_args(argv) console_handler = logging.StreamHandler() console_handler.setFormatter( logging.Formatter( f"{os.path.basename(__file__)}. " f"%(levelname)s: %(asctime)s %(message)s")) logging.getLogger().addHandler(console_handler) logging.getLogger().setLevel(logging.INFO) error_if(not os.path.isfile(args.config), f"Config file '{args.config}' not found") with open(args.config, mode="r", encoding="utf-8") as f: config = \ yaml.load( f.read(), Loader=yaml.CLoader if hasattr(yaml, "CLoader") else yaml.Loader) if args.region: error_if( not all(r in config[REGION_PATTERNS_KEY] for r in args.region), f"Unknown region code '{args.region[0]}'. Known regions are: " f"{', '.join(sorted(config[REGION_PATTERNS_KEY].keys()))}") start_time = datetime.datetime.now() req_pattern_s = config[REQ_PATTERN_KEY] try: req_pattern = json.loads(req_pattern_s) except json.JSONDecodeError as ex: error(f"Syntax error in config's request pattern: {ex}") # Building per-region lists of points to send requests for region_points: Dict[str, List[Dict[str, Any]]] = {} if args.point: for point in args.point: m = re.match(r"^([0-9+-.]+),([0-9+-.]+)$", point) point_dict: Dict[str, Any] if m: error_if(len(args.region) != 1, "One '--region' parameter must be specified along " "with coordinate-based '--point' parameter") point_dict = {POINT_NAME_KEY: point} for field, value_s in [(POINT_COORD_LAT_KEY, m.group(1)), (POINT_COORD_LON_KEY, m.group(2))]: try: point_dict.setdefault(POINT_COORD_DICT_KEY, {})[field] = float(value_s) except ValueError: error( f"Wrong structure of point coordinates '{point}'") region_points.setdefault(args.region[0], []).append(point_dict) else: found = False for region in config[POINT_PATTERNS_KEY]: if args.region and region not in args.region: continue for point_dict in config[POINT_PATTERNS_KEY][region]: if point_dict[POINT_NAME_KEY] == point: region_points.setdefault(region, []).append(point_dict) found = True error_if(not found, f"Point '{point}' not found in config file") else: for region in config[POINT_PATTERNS_KEY]: error_if(region not in config[REGION_PATTERNS_KEY], f"Config file's '{POINT_PATTERNS_KEY}' section contains " f"region code of '{region}', not found in config's " f"'{REGION_PATTERNS_KEY}' section") if args.region and (region not in args.region): continue region_points[region] = config[POINT_PATTERNS_KEY][region] request_timeout_sec = \ 60 * (args.timeout_min if args.timeout_min is not None else config[DEFAULT_TIMEOUT_MIN_KEY]) max_workers = args.parallel if args.parallel is not None \ else config[DEFAULT_PARALLEL_KEY] success = False with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) \ as executor: try: paths: Dict[str, List[Union[str, int]]] = config[PATHS_KEY] future_to_req_info: Dict[concurrent.futures.Future, ReqInfo] = {} # Starting requests for region, point_list in region_points.items(): for point in point_list: req = copy.deepcopy(req_pattern) json_substitute( json_obj=req, path=paths[REQ_ID_PATH_KEY], value=config[REQ_ID_FORMAT_KEY].format( req_idx=len(future_to_req_info), region=region, random_str="".join( random.choices( string.ascii_uppercase + string.digits, k=10)))) json_substitute(json_obj=req, path=paths[REGION_PATH_KEY], value=config[REGION_PATTERNS_KEY][region]) json_substitute(json_obj=req, path=paths[COORD_PATH_KEY], value=point[POINT_COORD_DICT_KEY]) json_substitute(json_obj=req, path=paths[FS_DATABASE_PATH_KEY], value=args.FS_DATABASE) future_to_req_info[ executor.submit( do_request, req=req, url=args.server_url, timeout_sec=request_timeout_sec )] = ReqInfo(name=point[POINT_NAME_KEY], region=region, req=req, lat=point[POINT_COORD_DICT_KEY][ POINT_COORD_LAT_KEY], lon=point[POINT_COORD_DICT_KEY][ POINT_COORD_LON_KEY]) # Processing finished requests for future in concurrent.futures.as_completed(future_to_req_info): req_info = future_to_req_info[future] try: exception = future.exception() error_if(exception is not None, f"Request '{req_info.point_info()}' ended with " f"exception: {exception}") response_info: Optional[ResponseInfo] = future.result() assert response_info is not None error_if(response_info.timeout, f"Request '{req_info.point_info()}' timed out") error_if(response_info.status_code != http.HTTPStatus.OK, f"Request '{req_info.point_info()}' ended with " f"status code {response_info.status_code}") response_code = \ json_retrieve(json_obj=response_info.afc_response, path=paths[RESPONSE_CODE_PATH_KEY]) response_desc = \ json_retrieve(json_obj=response_info.afc_response, path=paths[RESPONSE_DESC_PATH_KEY]) response_supp = \ json_retrieve(json_obj=response_info.afc_response, path=paths[RESPONSE_SUPP_PATH_KEY]) for ss in config[SUCCESS_STATUSES_KEY]: if (STATUS_CODE_KEY in ss) and \ (ss[STATUS_CODE_KEY] != response_code): continue if (STATUS_DESC_KEY in ss) and \ (ss[STATUS_DESC_KEY] not in (response_desc or "")): continue break else: response_desc_msg = "" if response_desc or response_supp: response_desc_msg = \ " (" + \ ", ".join(str(v) for v in [response_desc, response_supp] if v) + \ ")" error(f"Request '{req_info.point_info()}' ended with " f"AFC response code " f"{response_code}{response_desc_msg}") success_verdict = "successfully completed" \ if response_code == 0 else "failed without prejudice" logging.info( f"Request '{req_info.point_info()}' {success_verdict} " f"in {response_info.duration_sec} seconds") except KeyboardInterrupt: break except: # noqa if args.failed_json: with open(args.failed_json, mode="w", encoding="utf-8") as f: json.dump(req_info.req, f, indent=4, sort_keys=False) logging.info( f"Offending request JSON written to " f"'{os.path.realpath(args.failed_json)}'. Happy " f"curling") raise success = True except KeyboardInterrupt: pass finally: if not success: logging.info("Waiting for completion of in-progress requests") executor.shutdown(wait=True, cancel_futures=True) elapsed_seconds = \ int((datetime.datetime.now() - start_time).total_seconds()) logging.info(f"{len(future_to_req_info)} requests were processed in " f"{elapsed_seconds // 60} min {elapsed_seconds % 60} sec") if __name__ == "__main__": main(sys.argv[1:])