#!/usr/bin/env python3
""" Tool for testing Rcache """
#
# Copyright (C) 2023 Broadcom. All rights reserved. The term "Broadcom"
# refers solely to the Broadcom Inc. corporate affiliate that owns
# the software below. This work is licensed under the OpenAFC Project License,
# a copy of which is included with this software program
#
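
# Illustrative invocations (the script name "rcache_tool.py" and all values
# below are examples, not taken from this file):
#   rcache_tool.py mass_fill --max_idx 100000 --rcache http://localhost:8000
#   rcache_tool.py mass_lookup --max_idx 100000 \
#       --postgres postgresql://postgres@localhost/rcache
#   rcache_tool.py invalidate --tile 39,-106,40,-105 \
#       --rcache http://localhost:8000
#   rcache_tool.py status --interval 5 --rcache http://localhost:8000
# Run "<script> help SUBCOMMAND" for details on each subcommand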

# pylint: disable=wrong-import-order, invalid-name, too-many-arguments
# pylint: disable=too-many-instance-attributes, too-many-statements
# pylint: disable=too-many-locals, too-many-branches

import aiohttp
import asyncio
import argparse
import copy
import datetime
import hashlib
import json
import os
import random
import re
import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_async
import sys
import tabulate
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import urllib.parse

from rcache_common import dp, error, error_if
from rcache_models import AfcReqRespKey, ApDbRecord, ApDbRespState, \
    LatLonRect, RcacheInvalidateReq, RcacheSpatialInvalidateReq, \
    RcacheStatus, RcacheUpdateReq

# Environment variable with connection string to Postgres DB
POSTGRES_DSN_ENV = "RCACHE_POSTGRES_DSN"

# Environment variable with URL to Rcache service
RCACHE_URL_ENV = "RCACHE_SERVICE_URL"

# Environment variable for localhost's port of Rcache service
RCACHE_PORT_ENV = "RCACHE_CLIENT_PORT"

# Default number of simultaneous streams in mass operations
DEFAULT_MASS_THREADS = 10

# Default number of requests in one AFC message
DEFAULT_BATCH_SIZE = 1

# Default number of lookups
DEFAULT_LOOKUP_COUNT = 1000000

# Default periodicity of status reports
DEFAULT_PERIODICITY = 1000

# SqlAlchemy asynchronous driver name
ASYNC_DRIVER_NAME = "asyncpg"

# Name of table in Rcache database
TABLE_NAME = "aps"

# Number of retries
RETRIES = 6


class RrkGen:
    """ Generator of request/response/key triplets """

    # Minimum latitude for generated coordinates
    MIN_LAT = 33

    # Maximum latitude for generated coordinates
    MAX_LAT = 48

    # Minimum longitude for generated coordinates
    MIN_LON = -116

    # Maximum longitude for generated coordinates
    MAX_LON = -95

    # Grid (number of points in one direction) of generated coordinates
    GRID_SIZE = 100000

    # AFC Request template
    REQUEST_TEMPLATE = json.loads("""{
        "availableSpectrumInquiryRequests": [
            {
                "inquiredChannels": [
                    { "globalOperatingClass": 131 },
                    { "globalOperatingClass": 132 },
                    { "globalOperatingClass": 133 },
                    { "globalOperatingClass": 134 },
                    { "globalOperatingClass": 136 }
                ],
                "deviceDescriptor": {
                    "serialNumber": "FSP43",
                    "certificationId": [
                        {"rulesetId": "US_47_CFR_PART_15_SUBPART_E",
                         "id": "FCCID-FSP43"}
                    ]
                },
                "inquiredFrequencyRange": [
                    {"lowFrequency": 5925, "highFrequency": 6425},
                    {"lowFrequency": 6525, "highFrequency": 6875}
                ],
                "location": {
                    "indoorDeployment": 2,
                    "elevation": {
                        "verticalUncertainty": 10,
                        "heightType": "AGL",
                        "height": 83
                    },
                    "ellipse": {
                        "center": {"latitude": 39.792935,
                                   "longitude": -105.018517},
                        "orientation": 45,
                        "minorAxis": 50,
                        "majorAxis": 50
                    }
                },
                "requestId": "0"
            }
        ],
        "version": "1.4"
    }""")

    # AFC Response template
    RESPONSE_TEMPLATE = json.loads("""{
        "availableSpectrumInquiryResponses": [
            {
                "availabilityExpireTime": "2023-08-11T16:45:44Z",
                "availableChannelInfo": [
                    {"channelCfi": [],
                     "globalOperatingClass": 131,
                     "maxEirp": []},
                    {"channelCfi": [3, 11, 19, 27, 35, 43, 51, 59, 67],
                     "globalOperatingClass": 132,
                     "maxEirp": [-2.3, -2.3, 19.7, 34.8, 34.4, 34.5, 17.1,
                                 31.2, 25.8]},
                    {"channelCfi": [7, 23, 39, 55, 71, 87, 135, 151, 167],
                     "globalOperatingClass": 133,
                     "maxEirp": [0.7, 21.8, 36, 20.2, 28.9, 36, 27.3, 32.5,
                                 20.1]},
                    {"channelCfi": [15, 47, 79, 143],
                     "globalOperatingClass": 134,
                     "maxEirp": [3.8, 23.1, 32, 30.4]},
                    {"channelCfi": [2],
                     "globalOperatingClass": 136,
                     "maxEirp": [22.4]}
                ],
                "availableFrequencyInfo": [
                    {"frequencyRange":
                     {"highFrequency": 5945, "lowFrequency": 5925},
                     "maxPsd": 9.4},
                    {"frequencyRange":
                     {"highFrequency": 5965, "lowFrequency": 5945},
                     "maxPsd": -18.4},
                    {"frequencyRange":
                     {"highFrequency": 6025, "lowFrequency": 5965},
                     "maxPsd": -18.3},
                    {"frequencyRange":
                     {"highFrequency": 6045, "lowFrequency": 6025},
                     "maxPsd": 7.9},
                    {"frequencyRange":
                     {"highFrequency": 6065, "lowFrequency": 6045},
                     "maxPsd": 8},
                    {"frequencyRange":
                     {"highFrequency": 6085, "lowFrequency": 6065},
                     "maxPsd": 18.8},
                    {"frequencyRange":
                     {"highFrequency": 6105, "lowFrequency": 6085},
                     "maxPsd": 22.9},
                    {"frequencyRange":
                     {"highFrequency": 6125, "lowFrequency": 6105},
                     "maxPsd": 18.3},
                    {"frequencyRange":
                     {"highFrequency": 6165, "lowFrequency": 6125},
                     "maxPsd": 18.4},
                    {"frequencyRange":
                     {"highFrequency": 6185, "lowFrequency": 6165},
                     "maxPsd": 18.5},
                    {"frequencyRange":
                     {"highFrequency": 6205, "lowFrequency": 6185},
                     "maxPsd": 1.1},
                    {"frequencyRange":
                     {"highFrequency": 6245, "lowFrequency": 6205},
                     "maxPsd": 15.1},
                    {"frequencyRange":
                     {"highFrequency": 6265, "lowFrequency": 6245},
                     "maxPsd": 15.2},
                    {"frequencyRange":
                     {"highFrequency": 6305, "lowFrequency": 6265},
                     "maxPsd": 9.8},
                    {"frequencyRange":
                     {"highFrequency": 6325, "lowFrequency": 6305},
                     "maxPsd": 20.9},
                    {"frequencyRange":
                     {"highFrequency": 6345, "lowFrequency": 6325},
                     "maxPsd": 21},
                    {"frequencyRange":
                     {"highFrequency": 6425, "lowFrequency": 6345},
                     "maxPsd": 22.9},
                    {"frequencyRange":
                     {"highFrequency": 6565, "lowFrequency": 6525},
                     "maxPsd": 22.9},
                    {"frequencyRange":
                     {"highFrequency": 6585, "lowFrequency": 6565},
                     "maxPsd": 21.4},
                    {"frequencyRange":
                     {"highFrequency": 6605, "lowFrequency": 6585},
                     "maxPsd": 21.5},
                    {"frequencyRange":
                     {"highFrequency": 6625, "lowFrequency": 6605},
                     "maxPsd": 8.2},
                    {"frequencyRange":
                     {"highFrequency": 6645, "lowFrequency": 6625},
                     "maxPsd": 8.3},
                    {"frequencyRange":
                     {"highFrequency": 6665, "lowFrequency": 6645},
                     "maxPsd": 11.2},
                    {"frequencyRange":
                     {"highFrequency": 6685, "lowFrequency": 6665},
                     "maxPsd": 13.4},
                    {"frequencyRange":
                     {"highFrequency": 6705, "lowFrequency": 6685},
                     "maxPsd": 22.9},
                    {"frequencyRange":
                     {"highFrequency": 6725, "lowFrequency": 6705},
                     "maxPsd": 19.3},
                    {"frequencyRange":
                     {"highFrequency": 6765, "lowFrequency": 6725},
                     "maxPsd": 15.6},
                    {"frequencyRange":
                     {"highFrequency": 6805, "lowFrequency": 6765},
                     "maxPsd": 12.5},
                    {"frequencyRange":
                     {"highFrequency": 6845, "lowFrequency": 6805},
                     "maxPsd": 1.2},
                    {"frequencyRange":
                     {"highFrequency": 6865, "lowFrequency": 6845},
                     "maxPsd": 22.9}
                ],
                "requestId": "0",
                "response": {"responseCode": 0, "shortDescription": "Success"},
                "rulesetId": "US_47_CFR_PART_15_SUBPART_E"
            }
        ],
        "version": "1.4"
    }""")

    # Response template stripped of variable fields (filled on first use)
    _STRIPPED_RESPONSE: Optional[Dict[str, Any]] = None

    # 20MHz channels to choose from
    CHANNELS = [1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57, 61,
                65, 69, 73, 77, 81, 85, 89, 93, 117, 121, 125, 129, 133, 137,
                141, 145, 149, 153, 157, 161, 165, 169, 173, 177, 181]

    @classmethod
    def rrk(cls, idx: int) -> AfcReqRespKey:
        """ For given request index generates the corresponding AfcReqRespKey
        object """
        req_msg = copy.deepcopy(cls.REQUEST_TEMPLATE)
        req = req_msg["availableSpectrumInquiryRequests"][0]
        req["deviceDescriptor"]["serialNumber"] = "RCACHE_TOOL" + str(idx)
        req["location"]["ellipse"]["center"] = \
            {"latitude":
             (cls.MIN_LAT +
              (idx // cls.GRID_SIZE) * (cls.MAX_LAT - cls.MIN_LAT) /
              cls.GRID_SIZE),
             "longitude":
             (cls.MIN_LON +
              (idx % cls.GRID_SIZE) * (cls.MAX_LON - cls.MIN_LON) /
              cls.GRID_SIZE)}
        resp_msg = copy.deepcopy(cls.RESPONSE_TEMPLATE)
        resp = resp_msg["availableSpectrumInquiryResponses"][0]
        resp["availableChannelInfo"][0]["channelCfi"], \
            resp["availableChannelInfo"][0]["maxEirp"] = \
            cls._resp_channels(idx)
        return AfcReqRespKey(afc_req=json.dumps(req_msg),
                             afc_resp=json.dumps(resp_msg),
                             req_cfg_digest=cls.lookup_key(idx))
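
    # Note: rrk() places indices on a GRID_SIZE x GRID_SIZE lat/lon grid:
    # row (latitude) is idx // GRID_SIZE, column (longitude) is
    # idx % GRID_SIZE, so consecutive indices sweep longitude first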

    @classmethod
    def lookup_key(cls, idx: int) -> str:
        """ For given request index generates search key """
        md5 = hashlib.md5()
        md5.update(str(idx).encode("ascii"))
        return md5.hexdigest()
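
    # E.g. lookup_key(0) is the MD5 hex digest of the string "0",
    # i.e. "cfcd208495d565ef66e7dff9f98764da"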

    @classmethod
    def validate_response(cls, idx: int, resp: str) -> bool:
        """ True if given response matches given request index """
        resp_dict = json.loads(resp)
        # Channels of first (and only) response in the message
        resp_info = \
            resp_dict["availableSpectrumInquiryResponses"][0][
                "availableChannelInfo"][0]
        if (resp_info["channelCfi"], resp_info["maxEirp"]) != \
                cls._resp_channels(idx):
            return False
        if cls._STRIPPED_RESPONSE is None:
            cls._STRIPPED_RESPONSE = copy.deepcopy(cls.RESPONSE_TEMPLATE)
            cls._resp_strip(cls._STRIPPED_RESPONSE)
        if cls._resp_strip(resp_dict) != cls._STRIPPED_RESPONSE:
            return False
        return True

    @classmethod
    def _resp_channels(cls, idx: int) -> Tuple[List[int], List[float]]:
        """ Response channels for given request index """
        channels: List[int] = []
        bit_idx = 0
        while idx:
            if idx & 1:
                channels.append(cls.CHANNELS[bit_idx])
            idx //= 2
            bit_idx += 1
        return (channels, [32.] * len(channels))
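
    # E.g. idx=5 (binary 101) selects CHANNELS[0] and CHANNELS[2],
    # i.e. returns ([1, 9], [32.0, 32.0])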

    @classmethod
    def _resp_strip(cls, resp_msg_dict: Dict[str, Any]) -> Dict[str, Any]:
        """ Strips variable fields of response object """
        resp_dict = resp_msg_dict["availableSpectrumInquiryResponses"][0]
        # channelCfi/maxEirp of first channel info are filled per-index by
        # rrk(), hence both are variable and stripped here
        del resp_dict["availableChannelInfo"][0]["channelCfi"]
        del resp_dict["availableChannelInfo"][0]["maxEirp"]
        del resp_dict["requestId"]
        return resp_msg_dict


class Reporter:
    """ Progress reporter

    Public attributes:
    start_time    -- Start datetime.datetime
    success_count -- Number of successful requests
    fail_count    -- Number of failed requests

    Private attributes:
    _total_count    -- Total count of requests that will be performed
    _periodicity    -- Report periodicity (e.g. 1000 - once in 1000 bumps)
    _last_print_len -- Length of last single-line print
    """

    def __init__(self, total_count: Optional[int] = None,
                 periodicity: int = DEFAULT_PERIODICITY) -> None:
        """ Constructor

        total_count -- Total count of requests that will be performed
        periodicity -- Report periodicity (e.g. 1000 - once in 1000 bumps)
        """
        self.start_time = datetime.datetime.now()
        self.success_count = 0
        self.fail_count = 0
        self._total_count = total_count
        self._periodicity = periodicity
        self._last_print_len = 0

    def bump(self, success: bool = True) -> None:
        """ Increment success or fail count """
        if success:
            self.success_count += 1
        else:
            self.fail_count += 1
        if ((self.success_count + self.fail_count) % self._periodicity) == 0:
            self.report(newline=False)

    def report(self, newline: bool = True) -> None:
        """ Make a report print

        Arguments:
        newline -- True to go to next line, False to print same line as before
        """
        completed = self.success_count + self.fail_count
        msg = f"{completed} completed "
        if self.fail_count:
            msg += f"(of them {self.fail_count} " \
                f"({self.fail_count * 100 / completed:0.2f}%) failed). "
        if self._total_count is not None:
            msg += f"{completed * 100 / self._total_count:.2f}%. "
        ts: Union[int, float] = \
            (datetime.datetime.now() - self.start_time).total_seconds()
        rate = completed / ts
        seconds = ts % 60
        ts = int(ts) // 60
        minutes = ts % 60
        hours = ts // 60
        if hours:
            msg += f"{hours}:"
        if hours or minutes:
            msg += f"{minutes:02}:"
        msg += f"{seconds:05.2f} elapsed ({rate:.2f} requests per second)"
        # Pad with spaces to fully overwrite a longer previous same-line print
        print(msg + (" " * max(0, self._last_print_len - len(msg))),
              end="\n" if newline else "\r", flush=True)
        self._last_print_len = 0 if newline else len(msg)
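
    # Illustrative usage (the 'results' sequence below is hypothetical):
    #     reporter = Reporter(total_count=100)
    #     for ok in results:          # one boolean per completed request
    #         reporter.bump(success=ok)
    #     reporter.report()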


def rcache_url(args: Any, path: str) -> str:
    """ Returns Rcache REST URL

    Arguments:
    args -- Parsed command line arguments
    path -- Path inside the service
    Returns URL
    """
    return urllib.parse.urljoin(args.rcache, path)
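
# E.g. with args.rcache == "http://rcache:8000" (host/port illustrative),
# rcache_url(args, "update") yields "http://rcache:8000/update". Note that
# urljoin() resolves against the last "/" of the base path, so a base of
# "http://host/a/b" would yield "http://host/a/update"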


async def fill_worker(args: Any, reporter: Reporter,
                      req_queue: asyncio.Queue[Optional[int]]) -> None:
    """ Database fill worker

    Arguments:
    args      -- Parsed command line arguments
    reporter  -- Progress reporter object
    req_queue -- Queue with requests
    """
    end = False
    batch: List[AfcReqRespKey] = []
    while not end:
        item = await req_queue.get()
        if item is None:
            end = True
        else:
            batch.append(RrkGen.rrk(item))
        if len(batch) < (1 if end else args.batch):
            continue
        update_req = RcacheUpdateReq(req_resp_keys=batch).dict()
        errmsg: Optional[str] = None
        if (not args.dry) or args.dry_remote:
            backoff_window = 1
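            # Retry with jittered exponential backoff: sleep a random
            # fraction of backoff_window, doubling the window on each failure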
            for _ in range(RETRIES):
                errmsg = None
                try:
                    async with aiohttp.ClientSession(trust_env=True) \
                            as session:
                        if args.dry_remote:
                            async with session.get(
                                    rcache_url(args, "healthcheck")) as resp:
                                if resp.ok:
                                    break
                                errmsg = f"{await resp.text()}. " \
                                    f"Status={resp.status}. " \
                                    f"Reason={resp.reason}"
                        else:
                            async with session.post(
                                    rcache_url(args, "update"),
                                    json=update_req) as resp:
                                if resp.ok:
                                    break
                                errmsg = f"{await resp.text()}. " \
                                    f"Status={resp.status}. " \
                                    f"Reason={resp.reason}"
                except aiohttp.ClientError as ex:
                    errmsg = str(ex)
                await asyncio.sleep(random.random() * backoff_window)
                backoff_window *= 2
        for _ in range(len(batch)):
            reporter.bump(success=errmsg is None)
        batch = []
        if errmsg:
            print(errmsg)


async def do_mass_fill(args: Any) -> None:
    """ Execute "mass_fill" command.

    Arguments:
    args -- Parsed command line arguments
    """
    reporter = Reporter(total_count=args.max_idx - args.min_idx)
    req_queue: asyncio.Queue[Optional[int]] = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        for _ in range(args.threads):
            tg.create_task(
                fill_worker(args=args, reporter=reporter,
                            req_queue=req_queue))
        for idx in range(args.min_idx, args.max_idx):
            await req_queue.put(idx)
        for _ in range(args.threads):
            await req_queue.put(None)
    reporter.report()


async def lookup_worker(postgres_dsn: str, metadata: Optional[sa.MetaData],
                        min_idx: int, max_idx: int, batch_size: int,
                        count: int, dry: bool, reporter: Reporter) -> None:
    """ Lookup worker

    Arguments:
    postgres_dsn -- Postgres connection string
    metadata     -- Reflected database metadata (None in dry mode)
    min_idx      -- Minimum request index
    max_idx      -- One past the maximum request index
    batch_size   -- Number of requests per message
    count        -- Number of lookups to perform
    dry          -- Don't do actual database operations
    reporter     -- Progress reporter object
    """
    dsn_parts = urllib.parse.urlsplit(postgres_dsn)
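    # Inject the async driver into the DSN scheme, e.g. (illustrative DSN)
    # "postgresql://user@host/db" -> "postgresql+asyncpg://user@host/db"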
    async_dsn = \
        urllib.parse.urlunsplit(
            dsn_parts._replace(
                scheme=f"{dsn_parts.scheme}+{ASYNC_DRIVER_NAME}"))
    async_engine = None if dry else sa_async.create_async_engine(async_dsn)
    dry_result = \
        [ApDbRecord.from_req_resp_key(RrkGen.rrk(idx)).dict()
         for idx in range(batch_size)] if dry else None
    done = 0
    while done < count:
        bs = min(batch_size, count - done)
        batch: Set[str] = set()
        while len(batch) < bs:
            batch.add(RrkGen.lookup_key(random.randrange(min_idx, max_idx)))
        errmsg: Optional[str] = None
        if dry:
            assert dry_result is not None
            result = \
                [ApDbRecord.parse_obj(dry_result[idx]).get_patched_response()
                 for idx in range(len(batch))]
        else:
            assert async_engine is not None
            assert metadata is not None
            table = metadata.tables[TABLE_NAME]
            backoff_window = 1
            for _ in range(RETRIES):
                errmsg = None
                try:
                    s = sa.select(table).\
                        where((table.c.req_cfg_digest.in_(list(batch))) &
                              (table.c.state == ApDbRespState.Valid.name))
                    async with async_engine.connect() as conn:
                        rp = await conn.execute(s)
                        result = \
                            [ApDbRecord.parse_obj(rec).get_patched_response()
                             for rec in rp]
                    break
                except sa.exc.SQLAlchemyError as ex:
                    errmsg = str(ex)
                    result = []
                    await asyncio.sleep(random.random() * backoff_window)
                    backoff_window *= 2
        for _ in range(len(result)):
            reporter.bump()
        for _ in range(len(batch) - len(result)):
            reporter.bump(success=False)
        if errmsg:
            print(errmsg)
        done += len(batch)


async def do_mass_lookup(args: Any) -> None:
    """ Execute "mass_lookup" command.

    Arguments:
    args -- Parsed command line arguments
    """
    per_worker_count = args.count // args.threads
    reporter = Reporter(total_count=per_worker_count * args.threads)
    metadata: Optional[sa.MetaData] = None
    if not args.dry:
        engine = sa.create_engine(args.postgres)
        metadata = sa.MetaData()
        metadata.reflect(bind=engine)
        engine.dispose()
    async with asyncio.TaskGroup() as tg:
        for _ in range(args.threads):
            tg.create_task(
                lookup_worker(
                    postgres_dsn=args.postgres, metadata=metadata,
                    dry=args.dry, min_idx=args.min_idx, max_idx=args.max_idx,
                    batch_size=args.batch, count=per_worker_count,
                    reporter=reporter))
    reporter.report()


async def do_invalidate(args: Any) -> None:
    """ Execute "invalidate" command.

    Arguments:
    args -- Parsed command line arguments
    """
    invalidate_req: Dict[str, Any] = {}
    path: str = ""
    if args.enable or args.disable:
        error_if(args.all or args.tile or args.ruleset or
                 (args.enable and args.disable),
                 "Incompatible parameters")
        path = f"invalidation_state/{json.dumps(bool(args.enable))}"
    elif args.all:
        error_if(args.tile or args.ruleset, "Incompatible parameters")
        invalidate_req = RcacheInvalidateReq().dict()
        path = "invalidate"
    elif args.ruleset:
        error_if(args.tile, "Incompatible parameters")
        invalidate_req = RcacheInvalidateReq(ruleset_ids=args.ruleset).dict()
        path = "invalidate"
    elif args.tile:
        tiles: List[LatLonRect] = []
        for s in args.tile:
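            # Accepted forms, e.g. "30,-100" or "30,-100,31,-99"; omitted
            # maximums default to the minimum plus one degree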
            m = \
                re.search(
                    r"^(?P<min_lat>[0-9.+-]+),(?P<min_lon>[0-9.+-]+)"
                    r"(,(?P<max_lat>[0-9.+-]+)(,(?P<max_lon>[0-9.+-]+))?)?$",
                    s)
            error_if(not m, f"Tile specification '{s}' has invalid format")
            assert m is not None
            try:
                min_lat = float(m.group("min_lat"))
                min_lon = float(m.group("min_lon"))
                max_lat = float(m.group("max_lat")) if m.group("max_lat") \
                    else (min_lat + 1)
                max_lon = float(m.group("max_lon")) if m.group("max_lon") \
                    else (min_lon + 1)
            except ValueError:
                error(f"Tile specification '{s}' has invalid format")
            tiles.append(
                LatLonRect(min_lat=min_lat, min_lon=min_lon, max_lat=max_lat,
                           max_lon=max_lon))
        invalidate_req = RcacheSpatialInvalidateReq(tiles=tiles).dict()
        path = "spatial_invalidate"
    else:
        error("No invalidation type parameters specified")
    async with aiohttp.ClientSession() as session:
        kwargs = {}
        if invalidate_req:
            kwargs["json"] = invalidate_req
        async with session.post(rcache_url(args, path), **kwargs) as resp:
            error_if(not resp.ok,
                     f"Operation failed: {await resp.text()}")


async def do_precompute(args: Any) -> None:
    """ Execute "precompute" command.

    Arguments:
    args -- Parsed command line arguments
    """
    if args.enable or args.disable:
        error_if((args.quota is not None) or (args.enable and args.disable),
                 "Only one parameter should be specified")
        path = "precomputation_state"
        value = bool(args.enable)
    elif args.quota is not None:
        path = "precomputation_quota"
        value = args.quota
    else:
        error("At least one parameter should be specified")
    async with aiohttp.ClientSession() as session:
        async with session.post(rcache_url(args, f"{path}/{value}")) as resp:
            error_if(not resp.ok, f"Operation failed: {await resp.text()}")


async def do_update(args: Any) -> None:
    """ Execute "update" command.

    Arguments:
    args -- Parsed command line arguments
    """
    error_if(args.enable == args.disable,
             "Exactly one parameter should be specified")
    async with aiohttp.ClientSession() as session:
        async with session.post(
                rcache_url(args,
                           f"update_state/{bool(args.enable)}")) as resp:
            error_if(not resp.ok, f"Operation failed: {await resp.text()}")


async def do_status(args: Any) -> None:
    """ Execute "status" command.

    Arguments:
    args -- Parsed command line arguments
    """
    while True:
        async with aiohttp.ClientSession() as session:
            async with session.get(rcache_url(args, "status")) as resp:
                status = RcacheStatus.parse_obj(await resp.json())
        print(tabulate.tabulate(status.dict().items(),
                                tablefmt="plain", colalign=("left", "right")))
        if args.interval is None:
            break
        await asyncio.sleep(args.interval)


def do_help(args: Any) -> None:
    """ Execute "help" command.

    Arguments:
    args -- Parsed command line arguments (also contains 'argument_parser' and
            'subparsers' fields)
    """
    if args.subcommand is None:
        args.argument_parser.print_help()
    else:
        args.subparsers.choices[args.subcommand].print_help()


def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments
    """
    default_postgres = os.environ.get(POSTGRES_DSN_ENV)

    default_rcache = os.environ.get(RCACHE_URL_ENV)
    if (default_rcache is None) and (RCACHE_PORT_ENV in os.environ):
        default_rcache = f"http://localhost:{os.environ[RCACHE_PORT_ENV]}"

    switches_mass = argparse.ArgumentParser(add_help=False)
    switches_mass.add_argument(
        "--min_idx", metavar="MIN_IDX", default=0, type=int,
        help="Minimum request index for mass fill/lookup operation. Default "
        "is 0")
    switches_mass.add_argument(
        "--max_idx", metavar="MAX_IDX", required=True, type=int,
        help="One past the maximum request index for mass fill/lookup "
        "operation. This parameter is mandatory")
    switches_mass.add_argument(
        "--threads", metavar="NUM_THREADS", default=DEFAULT_MASS_THREADS,
        type=int,
        help=f"Number of parallel threads during mass operation. Default is "
        f"{DEFAULT_MASS_THREADS}")
    switches_mass.add_argument(
        "--batch", metavar="BATCH_SIZE", default=DEFAULT_BATCH_SIZE, type=int,
        help=f"Batch length (corresponds to number of requests per message). "
        f"Default is {DEFAULT_BATCH_SIZE}")
    switches_mass.add_argument(
        "--dry", action="store_true",
        help="Dry run (don't do any database/service requests) to estimate "
        "client-side overhead")

    switches_postgres = argparse.ArgumentParser(add_help=False)
    switches_postgres.add_argument(
        "--postgres", metavar="CONN_STR", required=default_postgres is None,
        default=default_postgres,
        help="Connection string to Rcache Postgres database. " +
        (f"Default is {default_postgres}" if default_postgres is not None
         else "This parameter is mandatory"))

    switches_rcache = argparse.ArgumentParser(add_help=False)
    switches_rcache.add_argument(
        "--rcache", metavar="URL", required=default_rcache is None,
        default=default_rcache,
        help="URL to rcache service. " +
        (f"Default is {default_rcache}" if default_rcache is not None
         else "This parameter is mandatory"))

    argument_parser = argparse.ArgumentParser(
        description="Response cache test and manipulation tool")

    subparsers = argument_parser.add_subparsers(dest="subcommand",
                                                metavar="SUBCOMMAND")

    parser_mass_fill = subparsers.add_parser(
        "mass_fill", parents=[switches_mass, switches_rcache],
        help="Mass cache fill")
    parser_mass_fill.add_argument(
        "--dry_remote", action="store_true",
        help="Similar to --dry, but also makes a trivial 'get'. Useful for "
        "network performance estimation")
    parser_mass_fill.set_defaults(func=do_mass_fill)
    parser_mass_fill.set_defaults(is_async=True)

    parser_mass_lookup = subparsers.add_parser(
        "mass_lookup", parents=[switches_mass, switches_postgres],
        help="Mass cache lookup")
    parser_mass_lookup.add_argument(
        "--count", metavar="NUMBER_OF_LOOKUPS", default=DEFAULT_LOOKUP_COUNT,
        type=int,
        help=f"How many lookups to perform. Default is "
        f"{DEFAULT_LOOKUP_COUNT}")
    parser_mass_lookup.add_argument(
        "--check_response", action="store_true",
        help="Check response content. Note that response content may change "
        "because of invalidation/precomputation, so those should be disabled "
        "for this check to be meaningful")
    parser_mass_lookup.set_defaults(func=do_mass_lookup)
    parser_mass_lookup.set_defaults(is_async=True)

    parser_invalidate = subparsers.add_parser(
        "invalidate", parents=[switches_rcache],
        help="Cache invalidation (all parameters are mutually exclusive)")
    parser_invalidate.add_argument(
        "--enable", action="store_true",
        help="Signal rcache service to enable cache invalidation (after it "
        "was previously disabled). All invalidation requests accumulated "
        "while disabled are fulfilled")
    parser_invalidate.add_argument(
        "--disable", action="store_true",
        help="Signal rcache service to disable cache invalidation")
    parser_invalidate.add_argument(
        "--all", action="store_true",
        help="Invalidate the entire cache")
    parser_invalidate.add_argument(
        "--tile", metavar="MIN_LAT,MIN_LON[,MAX_LAT,MAX_LON]",
        action="append",
        help="Tile to invalidate. Latitude/longitude are north/east positive "
        "degrees. Maximums, if not specified, are 'plus one degree' of "
        "minimums. This parameter may be specified several times")
    parser_invalidate.add_argument(
        "--ruleset", metavar="RULESET_ID", action="append",
        help="Config ruleset ID for entries to invalidate. This parameter "
        "may be specified several times")
    parser_invalidate.set_defaults(func=do_invalidate)
    parser_invalidate.set_defaults(is_async=True)

    parser_precompute = subparsers.add_parser(
        "precompute", parents=[switches_rcache],
        help="Set precomputation parameters")
    parser_precompute.add_argument(
        "--enable", action="store_true",
        help="Enable precomputation after it was previously disabled")
    parser_precompute.add_argument(
        "--disable", action="store_true",
        help="Disable precomputation (e.g. for development purposes)")
    parser_precompute.add_argument(
        "--quota", metavar="N", type=int,
        help="Set precompute quota - maximum number of simultaneous "
        "precomputation requests")
    parser_precompute.set_defaults(func=do_precompute)
    parser_precompute.set_defaults(is_async=True)

    parser_update = subparsers.add_parser(
        "update", parents=[switches_rcache],
        help="Enables/disables cache update")
    parser_update.add_argument(
        "--enable", action="store_true",
        help="Enable update after it was previously disabled")
    parser_update.add_argument(
        "--disable", action="store_true",
        help="Disable update. All update requests are dropped until update "
        "is enabled again")
    parser_update.set_defaults(func=do_update)
    parser_update.set_defaults(is_async=True)

    parser_status = subparsers.add_parser(
        "status", parents=[switches_rcache],
        help="Print service status")
    parser_status.add_argument(
        "--interval", metavar="SECONDS", type=float,
        help="Report status periodically with given interval (in seconds). "
        "Default is to report status once")
    parser_status.set_defaults(func=do_status)
    parser_status.set_defaults(is_async=True)

    # Subparser for 'help' command
    parser_help = subparsers.add_parser(
        "help", add_help=False,
        help="Prints help on given subcommand")
    parser_help.add_argument(
        "subcommand", metavar="SUBCOMMAND", nargs="?",
        choices=subparsers.choices,
        help="Name of subcommand to print help about (use "
        "\"%(prog)s --help\" to get list of all subcommands)")
    parser_help.set_defaults(func=do_help, subparsers=subparsers,
                             argument_parser=argument_parser)

    if not argv:
        argument_parser.print_help()
        sys.exit(1)
    args = argument_parser.parse_args(argv)

    for predicate, argument in [("require_rcache", "rcache"),
                                ("require_postgres", "postgres")]:
        error_if(
            getattr(args, predicate, False) and (not getattr(args, argument)),
            f"--{argument} must be specified")
    try:
        if getattr(args, "is_async", False):
            asyncio.run(args.func(args))
        else:
            args.func(args)
    except SystemExit as ex:
        sys.exit(1 if isinstance(ex.code, str) else ex.code)
    except KeyboardInterrupt:
        print("^C")
        sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])