#!/usr/bin/env python3
"""Tool for moving data from Kafka to PostgreSQL/PostGIS database """
# Copyright (C) 2022 Broadcom. All rights reserved.
# The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
# that owns the software below.
# This work is licensed under the OpenAFC Project License, a copy of which is
# included with this software program.
# pylint: disable=raise-missing-from, logging-fstring-interpolation
# pylint: disable=too-many-lines, invalid-name, consider-using-f-string
# pylint: disable=unnecessary-pass, unnecessary-ellipsis, too-many-arguments
# pylint: disable=too-many-instance-attributes, too-few-public-methods
# pylint: disable=wrong-import-order, too-many-locals, too-many-branches
from abc import ABC, abstractmethod
import argparse
from collections.abc import Iterable
import confluent_kafka
import enum
import datetime
import dateutil.tz # type: ignore
import geoalchemy2 as ga # type: ignore
import hashlib
import heapq
import inspect
import json
import logging
import lz4.frame # type: ignore
import math
import os
import prometheus_client # type: ignore
import random
import re
import sqlalchemy as sa # type: ignore
import sqlalchemy.dialects.postgresql as sa_pg # type: ignore
import string
import sys
from typing import Any, Callable, Dict, Generic, List, NamedTuple, Optional, \
Set, Tuple, Type, TypeVar, Union
import uuid
# This script version
VERSION = "0.1"
# Kafka topic for ALS logs
ALS_KAFKA_TOPIC = "ALS"
# Type for JSON objects
JSON_DATA_TYPE = Union[Dict[str, Any], List[Any]]
# Type for database column
COLUMN_DATA_TYPE = Optional[Union[int, float, str, bytes, bool,
datetime.datetime, uuid.UUID, Dict, List]]
# Type for database row dictionary
ROW_DATA_TYPE = Dict[str, COLUMN_DATA_TYPE]
# Type for database operation result row
RESULT_ROW_DATA_TYPE = Dict[int, ROW_DATA_TYPE]
# Default port for Kafka servers
KAFKA_PORT = 9092
# Default Kafka server
DEFAULT_KAFKA_SERVER = f"localhost:{KAFKA_PORT}"
# Default Kafka client ID
DEFAULT_KAFKA_CLIENT_ID = "siphon_@"
def dp(*args, **kwargs):
"""Print debug message
Arguments:
args -- Format string and positional arguments. If the latter are
present, the message is formatted with %
kwargs -- Keyword arguments. If present, the message is formatted with
format()
"""
msg = args[0] if args else ""
if len(args) > 1:
msg = msg % args[1:]
if args and kwargs:
msg = msg.format(**kwargs)
fi = inspect.getframeinfo(inspect.currentframe().f_back)
timetag = datetime.datetime.now()
print(
f"DP {timetag.hour:02}:{timetag.minute:02}:{timetag.second:02}."
f"{timetag.microsecond // 1000:02} {fi.function}()@{fi.lineno}: {msg}",
flush=True)
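# Illustrative usage sketch for dp() (values are hypothetical, not part of
# the module's control flow):
#   dp("queue size=%d", 42)         # %-formatting via positional arguments
#   dp("topic={name}", name="ALS")  # format()-formatting via keyword args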
def error(msg: str) -> None:
""" Prints given msg as error message and exit abnormally """
logging.error(msg)
sys.exit(1)
def error_if(cond: Any, msg: str) -> None:
""" If condition evaluates to true prints given msg as error message and
exits abnormally """
if cond:
error(msg)
class LineNumber:
""" Utility functions around line numbers """
# Function names to ignore by LineNumber.exc()
_EXC_STACK_IGNORE: Set[str] = set()
@classmethod
def exc(cls) -> Optional[int]:
""" Line number of last exception """
def is_ignored_tb(t: Any) -> bool:
""" True if given frame should be ignored, as it is not in this
module or marked to be ignored """
f_code = t.tb_frame.f_code
return (os.path.basename(f_code.co_filename) !=
os.path.basename(__file__)) or \
(f_code.co_name in cls._EXC_STACK_IGNORE)
last_local_line: Optional[int] = None
tb = sys.exc_info()[2]
while tb is not None:
if not is_ignored_tb(tb):
last_local_line = tb.tb_lineno
tb = tb.tb_next
return last_local_line
@classmethod
def current(cls) -> int:
""" Current caller's line number """
f = inspect.currentframe()
assert (f is not None) and (f.f_back is not None)
return inspect.getframeinfo(f.f_back).lineno
@classmethod
def stack_trace_ignore(cls, func):
""" Decorator to mark functions for ignore in exc() """
cls._EXC_STACK_IGNORE.add(func.__name__)
return func
class ErrorBase(Exception):
""" Base class for exceptions in this module
Attributes
msg -- Diagnostics message
code_line -- Source code line number
data -- Optional pertinent data - string or JSON dictionary
"""
def __init__(self, msg: str, code_line: Optional[int],
data: Optional[Union[str, bytes, JSON_DATA_TYPE]] = None) \
-> None:
""" Constructor
Arguments:
msg -- Diagnostics message
code_line -- Source code line number
data -- Optional pertinent data - string or JSON dictionary
"""
super().__init__(msg)
self.msg = msg
self.code_line = code_line
self.data = data
class AlsProtocolError(ErrorBase):
""" Exception class for errors in ALS protocol """
pass
class JsonFormatError(ErrorBase):
""" Exception class for errors in AFC response/request/config JSON data """
pass
class DbFormatError(ErrorBase):
""" Exception class for DB format inconsistencies error """
def __init__(self, msg: str, code_line: Optional[int]) -> None:
""" Constructor
Arguments:
msg -- Diagnostics message
code_line -- Source code line number
"""
super().__init__(msg, code_line=code_line)
# MYPY APPEASEMENT STUFF
@LineNumber.stack_trace_ignore
def ms(s: Any) -> str:
""" Makes sure that given value is a string.
Asserts otherwise"""
assert isinstance(s, str)
return s
@LineNumber.stack_trace_ignore
def js(s: Any) -> str:
""" Makes sure that given value is a string.
raises TypeError otherwise """
if not isinstance(s, str):
raise TypeError(f"Unexpected '{s}'. Should be string")
return s
@LineNumber.stack_trace_ignore
def jb(b: Any) -> bytes:
""" Makes sure that given value is a bytestring.
raises TypeError otherwise """
if not isinstance(b, bytes):
raise TypeError(f"Unexpected '{b}'. Should be bytes")
return b
@LineNumber.stack_trace_ignore
def ji(i: Any) -> int:
""" Makes sure that given value is an integer.
raises TypeError otherwise """
if not isinstance(i, int):
raise TypeError(f"Unexpected '{i}'. Should be integer")
return i
@LineNumber.stack_trace_ignore
def jd(d: Any) -> dict:
""" Makes sure that given value is a dictionary.
raises TypeError otherwise """
if not isinstance(d, dict):
raise TypeError(f"Unexpected '{d}'. Should be dictionary")
return d
@LineNumber.stack_trace_ignore
def jod(d: Any) -> Optional[dict]:
""" Makes sure that given value is a dictionary or None.
raises TypeError otherwise """
if not ((d is None) or isinstance(d, dict)):
raise TypeError(f"Unexpected '{d}'. Should be optional dictionary")
return d
@LineNumber.stack_trace_ignore
def jl(ll: Any) -> list:
""" Makes sure that given value is a list.
raises TypeError otherwise """
if not isinstance(ll, list):
raise TypeError(f"Unexpected '{ll}'. Should be list")
return ll
@LineNumber.stack_trace_ignore
def ju(u: Any) -> uuid.UUID:
""" Makes sure that given value is an UUID.
raises TypeError otherwise """
if not isinstance(u, uuid.UUID):
raise TypeError(f"Unexpected '{u}'. Should be UUID")
return u
@LineNumber.stack_trace_ignore
def jdt(dt: Any) -> datetime.datetime:
""" Makes sure that given value is a datetime.datetime object.
raises TypeError otherwise """
if not isinstance(dt, datetime.datetime):
raise TypeError(f"Unexpected '{dt}'. Should be datetime")
return dt
def get_month_idx() -> int:
""" Computes month index """
d = datetime.datetime.now()
return (d.year - 2022) * 12 + (d.month - 1)
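# For example, March 2024 yields (2024 - 2022) * 12 + (3 - 1) == 26: month
# indexes count months elapsed since January 2022 (which is index 0)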
class Metrics:
""" Wrapper around collection of Prometheus metrics
Private attributes:
_metrics -- Dictionary of _metric objects, indexed by metric name
"""
# Value for 'id' label of all metrics
INSTANCE_ID = \
"".join(random.choices(string.ascii_uppercase + string.digits, k=10))
class MetricDef(NamedTuple):
""" Metric definition """
# Metric type ("Counter", "Summary", "Histogram" or "Info"
metric_type: str
# Metric name
name: str
# Metric description
dsc: str
# Optional list of additional labels (besides 'id')
labels: Optional[List[str]] = None
# Optional bucket list for histogram
buckets: Optional[List[float]] = None
class _Metric:
""" Metric wrapper.
Holds a metric object; its call operator returns whatever 'labels()'
of the wrapped metric returns
Private attributes:
_metric -- Wrapped metric object
_labels -- List of additional (besides 'id') label names
"""
def __init__(self, metric_def: "Metrics.MetricDef") -> None:
""" Constructor
Arguments:
metric_def -- Metric definition
"""
metric_class = getattr(prometheus_client, metric_def.metric_type)
assert metric_class is not None
self._labels: List[str] = metric_def.labels or []
kwargs: Dict[str, Any] = {}
if metric_def.buckets is not None:
kwargs["buckets"] = metric_def.buckets
self._metric = \
metric_class(metric_def.name, metric_def.dsc,
["id"] + self._labels, **kwargs)
def __call__(self, *args, **kwargs) -> Any:
""" Returns output value of 'labels()' of wrapped metric.
Arguments are values of additional labels
"""
assert (len(args) + len(kwargs)) == len(self._labels)
return self._metric.labels(
Metrics.INSTANCE_ID,
*[self._arg_to_str(arg) for arg in args],
**{arg_name: self._arg_to_str(arg_value)
for arg_name, arg_value in kwargs.items()})
def _arg_to_str(self, arg: Any) -> str:
""" Somehow onvert argument value to string """
if isinstance(arg, bytes):
return arg.decode(encoding="utf-8", errors="backslashreplace")
return str(arg)
def __init__(self,
metric_defs: List[Union["Metrics.MetricDef", Tuple]]) -> None:
""" Constructor
Arguments:
metric_defs -- List of metric definitions
"""
self._metrics: Dict[str, "Metrics._Metric"] = {}
for md in metric_defs:
if not isinstance(md, self.MetricDef):
md = self.MetricDef(*md)
assert md.name not in self._metrics
self._metrics[md.name] = self._Metric(md)
def __getattr__(self, name: str) -> Any:
""" Metric lookup by name as attribute """
metric = self._metrics.get(name)
assert metric is not None
return metric
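# Illustrative usage sketch of Metrics (metric name and label below are
# hypothetical):
#   metrics = Metrics([("Counter", "siphon_received_total",
#                       "ALS messages received from Kafka", ["topic"])])
#   metrics.siphon_received_total("ALS").inc()
# Every sample carries the per-process random INSTANCE_ID label plus the
# additional labels declared in its MetricDef; label values are passed
# positionally to the metric wrapper's call operator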
class BytesUtils:
""" Various bytestring-related conversions """
@classmethod
def json_to_bytes(cls, j: JSON_DATA_TYPE) -> bytes:
""" Converts JSON dictionary to bytes.
Representation is the most compact: no whitespaces
Arguments:
j -- Dictionary or list
Returns UTF8 bytes string
"""
return json.dumps(j, separators=(',', ':')).encode("utf-8")
@classmethod
def text_to_uuid(cls, text: str) -> uuid.UUID:
""" Computes UUID, generated from text's MD5 digest
Arguments:
text -- Text to compute UUID of
Returns UUID, made from MD5, computed from UTF8-encoded text
"""
return uuid.UUID(bytes=hashlib.md5(text.encode("utf-8")).digest())
@classmethod
def json_to_uuid(cls, j: JSON_DATA_TYPE) -> uuid.UUID:
""" Computes UUID, generated from JSON MD5 digest.
JSON first converted to compact (no spaces) string representation,
then to UTF-8 encoded bytes, then MD5 is computed
Arguments:
j -- Dictionary or list
Returns UUID
"""
return uuid.UUID(bytes=hashlib.md5(cls.json_to_bytes(j)).digest())
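# Minimal sketch of the digest behavior: serialization is deterministic
# (compact separators, no key sorting), so structurally identical JSON
# always hashes to the same UUID:
#   BytesUtils.json_to_bytes({"a": 1})   # b'{"a":1}'
#   BytesUtils.json_to_uuid({"a": 1})    # same UUID on every invocation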
class DatabaseBase(ABC):
""" Base class of database handlers
Attributes:
engine -- Database engine
metadata -- Database metadata
conn -- Default database connection
db_name -- Database name
_disposed -- True if disposed
"""
# Driver part to use in Postgres database connection strings
DB_DRIVER = "postgresql+psycopg2://"
# Parts of connection string
ConnStrParts = \
NamedTuple(
"ConnStrParts",
# Driver part with trailing '://'
[("driver", str),
# Username
("user", str),
# Host name
("host", str),
# Port number
("port", str),
# Database name
("database", str),
# Parameters with leading '?' or empty string
("params", str)])
def __init__(self, arg_conn_str: Optional[str],
arg_password: Optional[str]) -> None:
""" Constructor
Arguments:
arg_conn_str -- Connection string from command line or None
arg_password -- Password from command line or None
"""
self._disposed = True
try:
self.engine = \
self.create_engine(arg_conn_str=arg_conn_str,
arg_password=arg_password)
self.db_name = \
self.parse_conn_str(arg_conn_str=arg_conn_str).database
self.metadata = sa.MetaData()
self.metadata.reflect(bind=self.engine)
self._disposed = False
self.conn = self.engine.connect()
logging.info(f"{self.name_for_logs()} database connected")
except sa.exc.SQLAlchemyError as ex:
error(f"Can't open {self.name_for_logs()} database: {ex}")
def dispose(self) -> None:
""" Explicit cleanup """
if not self._disposed:
self._disposed = True
self.engine.dispose()
@classmethod
def parse_conn_str(cls, arg_conn_str: Optional[str]) \
-> "DatabaseBase.ConnStrParts":
""" Parse argument/default connection string
Arguments:
arg_conn_str -- Connection string from command line or None
Returns ConnStrParts
"""
cs_re = \
re.compile(
r"^((?P<driver>[^/ ]+)://)?"
r"(?P<user>[^@/ ]+?)?"
r"(@(?P<host>[^:/ ]+)(:(?P<port>\d+))?)?"
r"(/(?P<database>[^?]+))?"
r"(?P<params>\?\S.+)?$")
m_arg = cs_re.match((arg_conn_str or "").strip())
error_if(not m_arg,
f"{cls.name_for_logs()} database connection string "
f"'{arg_conn_str or ''}' has invalid format")
assert m_arg is not None
m_def = cs_re.match(cls.default_conn_str())
assert m_def is not None
error_if(m_arg.group("driver") and
(m_arg.group("driver") != m_def.group("driver")),
f"{cls.name_for_logs()} database connection string has "
f"invalid database driver name '{m_arg.group('driver')}'. "
f"If specified it must be '{m_def.group('driver')}'")
return \
cls.ConnStrParts(
driver=m_def.group("driver"),
user=m_arg.group("user") or m_def.group("user"),
host=m_arg.group("host") or m_def.group("host"),
port=m_arg.group("port") or m_def.group("port"),
database=m_arg.group("database") or m_def.group("database"),
params=(m_arg.group("params") or m_def.group("params")) or "")
@classmethod
def create_engine(cls, arg_conn_str: Optional[str],
arg_password: Optional[str]) -> sa.engine.base.Engine:
""" Creates SqlAlchemy engine
Arguments:
arg_conn_str -- Connection string from command line or None
arg_password -- Password from command line or None
Returns SqlAlchemy engine
"""
conn_str_parts = cls.parse_conn_str(arg_conn_str=arg_conn_str)
return \
sa.create_engine(
f"{cls.DB_DRIVER}"
f"{conn_str_parts.user}"
f"{(':' + arg_password) if arg_password else ''}"
f"@{conn_str_parts.host}:{conn_str_parts.port}/"
f"{conn_str_parts.database}{conn_str_parts.params}")
@classmethod
@abstractmethod
def default_conn_str(cls) -> str:
""" Default connection string """
...
@classmethod
@abstractmethod
def name_for_logs(cls) -> str:
""" Database alias name to use for logs and error messages """
...
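# Illustrative sketch of connection string resolution (host name below is
# hypothetical): parse_conn_str() fills omitted parts from the subclass
# default, so for AlsDatabase ("postgresql://postgres@localhost:5432/ALS")
# a command-line string of "@db_host/ALS" resolves to user "postgres",
# host "db_host", port "5432", database "ALS". create_engine() then
# prepends DB_DRIVER ("postgresql+psycopg2://") and splices in the
# password, if one was given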
class AlsDatabase(DatabaseBase):
""" ALS Database handler """
@classmethod
def default_conn_str(cls) -> str:
""" Default connection string """
return "postgresql://postgres@localhost:5432/ALS"
@classmethod
def name_for_logs(cls) -> str:
""" Database alias name to use for logs and error messages """
return "ALS"
class LogsDatabase(DatabaseBase):
""" Log database handler """
# Log record with data to write to database
Record = NamedTuple("Record", [("source", str),
("time", datetime.datetime),
("log", JSON_DATA_TYPE)])
@classmethod
def default_conn_str(cls) -> str:
""" Default connection string """
return "postgresql://postgres@localhost:5432/AFC_LOGS"
@classmethod
def name_for_logs(cls) -> str:
""" Database alias name to use for logs and error messages """
return "Logs"
def write_log(self, topic: str, records: List["LogsDatabase.Record"]) \
-> None:
""" Write a bunch of log records to database
Arguments:
topic -- Kafka topic - serves a database table name
records -- List of records to write
"""
if not records:
return
try:
if topic not in self.metadata.tables:
sa.Table(
topic, self.metadata,
sa.Column("source", sa.Text(), index=True),
sa.Column("time", sa.DateTime(timezone=True), index=True),
sa.Column("log", sa_pg.JSON()),
keep_existing=True)
self.metadata.create_all(self.conn, checkfirst=True)
ins = sa.insert(self.metadata.tables[topic]).\
values([{"source": r.source,
"time": r.time,
"log": r.log}
for r in records])
self.conn.execute(ins)
except sa.exc.SQLAlchemyError as ex:
logging.error(f"Error writing {topic} log table: {ex}")
class InitialDatabase(DatabaseBase):
""" Initial Postgres database (context fro creation of other databases)
handler """
class IfExists(enum.Enum):
""" What to do if database being created already exists """
Skip = "skip"
Drop = "drop"
Exc = "exc"
@classmethod
def default_conn_str(cls) -> str:
""" Default connection string """
return "postgresql://postgres@localhost:5432/postgres"
@classmethod
def name_for_logs(cls) -> str:
""" Database alias name to use for logs and error messages """
return "Initial"
def create_db(self, db_name: str,
if_exists: "InitialDatabase.IfExists",
conn_str: str, password: Optional[str] = None,
template: Optional[str] = None) -> bool:
""" Create database
Arguments:
db_name -- Name of database to create
if_exists -- What to do if database already exists
conn_str -- Connection string to database being created
password -- Password for connection string to database being created
template -- Name of template database to use
Returns True if database was created, False if it already exists
"""
with self.engine.connect() as conn:
try:
if if_exists == self.IfExists.Drop:
conn.execute(sa.text("commit"))
conn.execute(
sa.text(f'drop database if exists "{db_name}"'))
conn.execute(sa.text("commit"))
template_clause = f' template "{template}"' if template else ""
conn.execute(
sa.text(f'create database "{db_name}"{template_clause}'))
logging.info(f"Database '{db_name}' successfully created")
return True
except sa.exc.ProgrammingError:
if if_exists != self.IfExists.Skip:
raise
engine = self.create_engine(arg_conn_str=conn_str,
arg_password=password)
engine.dispose()
logging.info(
f"Already existing database '{db_name}' will be used")
return False
def drop_db(self, db_name: str) -> None:
""" Drop given database """
with self.engine.connect() as conn:
conn.execute(sa.text("commit"))
conn.execute(sa.text(f'drop database "{db_name}"'))
# Fully qualified position in Kafka queue on certain Kafka cluster
KafkaPosition = \
NamedTuple("KafkaPosition",
[("topic", str), ("partition", int), ("offset", int)])
class KafkaPositions:
""" Collection of partially-processed Kafka messages' offsets
Private attributes:
_topics -- By-topic collection of by-partition collection of offset status
information
"""
class OffsetInfo:
""" Information about single offset
Attributes:
kafka_offset -- Offset in partition
processed -- Processed status
"""
def __init__(self, kafka_offset: int) -> None:
self.kafka_offset = kafka_offset
self.processed = False
def __lt__(self, other: "KafkaPositions.OffsetInfo") -> bool:
""" Offset-based comparison for heap queue use """
return self.kafka_offset < other.kafka_offset
def __eq__(self, other: Any) -> bool:
""" Equality comparison in vase heap queue will need it """
return isinstance(other, self.__class__) and \
(self.kafka_offset == other.kafka_offset)
def __repr__(self) -> str:
""" Debug print representation """
return f"<{self.kafka_offset}, {'T' if self.processed else 'F'}>"
class PartitionOffsets:
""" Collection of offset information objects within partition
Private attributes:
_queue -- Heap queue of offset information objects
_catalog -- Catalog of offset information objects by offset
"""
def __init__(self) -> None:
""" Constructor """
self._queue: List["KafkaPositions.OffsetInfo"] = []
self._catalog: Dict[int, "KafkaPositions.OffsetInfo"] = {}
def is_empty(self) -> bool:
""" True if collection is empty (hence might be safely deleted) """
return not self._queue
def add(self, offset: int) -> None:
""" Add information about (not processed) offset to collection """
if offset in self._catalog:
return
oi = KafkaPositions.OffsetInfo(offset)
heapq.heappush(self._queue, oi)
self._catalog[offset] = oi
def mark_processed(self, offset: Optional[int]) -> None:
""" Mark given offset or all topic offsets as processed """
if offset is not None:
if offset in self._catalog:
self._catalog[offset].processed = True
else:
for offset_info in self._catalog.values():
offset_info.processed = True
def get_processed_offset(self) -> Optional[int]:
""" Computes partition commit level
Returns maximum offset at and below which all offsets are marked as
processed. None if there is no such offset. Offsets at and below
returned offset are removed from collection """
ret: Optional[int] = None
while self._queue and self._queue[0].processed:
ret = heapq.heappop(self._queue).kafka_offset
assert ret is not None
del self._catalog[ret]
return ret
def __repr__(self) -> str:
""" Debug print representation """
return f"<{self._catalog}>"
def __init__(self) -> None:
""" Constructor """
self._topics: Dict[str, Dict[int,
"KafkaPositions.PartitionOffsets"]] = {}
def add(self, kafka_position: KafkaPosition) -> None:
""" Add given position (topic/partition/offset) as nonprocessed """
partition_offsets = \
self._topics.setdefault(kafka_position.topic, {}).\
get(kafka_position.partition)
if partition_offsets is None:
partition_offsets = self.PartitionOffsets()
self._topics[kafka_position.topic][kafka_position.partition] = \
partition_offsets
partition_offsets.add(kafka_position.offset)
def mark_processed(self, kafka_position: Optional[KafkaPosition] = None,
topic: Optional[str] = None) -> None:
""" Mark given position or all positions in a topic as processed
Arguments:
kafka_position -- Position to mark as processed or None
topic -- Topic to mark as processed or None """
if kafka_position is not None:
partition_offsets = \
self._topics.setdefault(kafka_position.topic, {}).\
get(kafka_position.partition)
if partition_offsets is not None:
partition_offsets.mark_processed(kafka_position.offset)
if topic is not None:
for partition_offsets in self._topics.get(topic, {}).values():
partition_offsets.mark_processed(None)
def get_processed_offsets(self) -> Dict[str, Dict[int, int]]:
""" Computes commit levels for all offsets in collection
Returns by-topic/partition commit levels (offsets at or below which are
all marked processed). Offsets at or below returned levels are removed
from collection """
ret: Dict[str, Dict[int, int]] = {}
for topic, partitions in self._topics.items():
for partition, partition_offsets in partitions.items():
processed_offset = partition_offsets.get_processed_offset()
if processed_offset is not None:
ret.setdefault(topic, {})[partition] = processed_offset
for partition in ret.get(topic, {}):
if partitions[partition].is_empty():
del partitions[partition]
return ret
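# Illustrative sketch of the commit-level bookkeeping (offsets hypothetical):
#   positions = KafkaPositions()
#   for off in (5, 6, 7):
#       positions.add(KafkaPosition("ALS", 0, off))
#   positions.mark_processed(KafkaPosition("ALS", 0, 5))
#   positions.mark_processed(KafkaPosition("ALS", 0, 7))
#   positions.get_processed_offsets()   # {"ALS": {0: 5}}; 6 and 7 remain
# Once offset 6 is also marked processed, the next call reports level 7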
# Type for Kafka keys of ALS messages
AlsMessageKeyType = bytes
class AlsMessage:
""" Single ALS message (AFC Request, Response or Config)
Attributes:
raw_msg -- Message in raw form (as received from Kafka)
version -- Message format version
afc_server -- AFC Server ID
time_tag -- Time tag
msg_type -- Message type (one of AlsMessage.MsgType)
json_str -- Content of AFC Request/Response/Config as string
customer -- Customer (for Config) or None
geo_data_id -- Geodetic data ID (if Config) or None
uls_id -- ULS ID (if Config) or None
request_indexes -- Indexes of requests to which config is related (if
Config) or None
"""
# ALS message format version
FORMAT_VERSION = "1.0"
class MsgType(enum.Enum):
""" ALS message type string """
Request = "AFC_REQUEST"
Response = "AFC_RESPONSE"
Config = "AFC_CONFIG"
# Maps values to MsgType enum instances
value_to_type = {t.value: t for t in MsgType}
def __init__(self, raw_msg: bytes) -> None:
""" Constructor
Arguments:
raw_msg -- Message value as retrieved from Kafka """
self.raw_msg = raw_msg
try:
msg_dict = json.loads(raw_msg)
except json.JSONDecodeError as ex:
raise AlsProtocolError(f"Malforemed JSON of ALS message: {ex}",
code_line=LineNumber.exc())
try:
self.version: str = msg_dict["version"]
self.afc_server: str = msg_dict["afcServer"]
self.time_tag = datetime.datetime.fromisoformat(msg_dict["time"])
self.msg_type: "AlsMessage.MsgType" = \
self.value_to_type[msg_dict["dataType"]]
self.json_str: str = msg_dict["jsonData"]
is_config = self.msg_type == self.MsgType.Config
self.customer: Optional[str] \
= msg_dict["customer"] if is_config else None
self.geo_data_id: Optional[str] = \
msg_dict["geoDataVersion"] if is_config else None
self.uls_id: Optional[str] = \
msg_dict["ulsId"] if is_config else None
self.request_indexes: Optional[Set[int]] = \
set(int(i) for i in msg_dict.get("requestIndexes", [])) \
if is_config else None
except (LookupError, TypeError, ValueError) as ex:
raise AlsProtocolError(f"Invalid content of ALS message: {ex}",
code_line=LineNumber.exc(), data=msg_dict)
if self.version != self.FORMAT_VERSION:
raise AlsProtocolError(
f"Unsupported format version: '{self.version}'",
code_line=LineNumber.exc(), data=msg_dict)
if not isinstance(self.json_str, str):
raise AlsProtocolError("'jsonData' missing",
code_line=LineNumber.exc(), data=msg_dict)
if is_config and not \
all(isinstance(x, str) for x in
(self.customer, self.geo_data_id, self.uls_id)):
raise AlsProtocolError(
"Missing config fields",
code_line=LineNumber.current(), data=msg_dict)
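# Illustrative sketch of the raw Kafka message value that AlsMessage expects
# (field values are hypothetical):
#   {"version": "1.0", "afcServer": "srv-1",
#    "time": "2023-01-01T00:00:00+00:00", "dataType": "AFC_CONFIG",
#    "jsonData": "<AFC Config as JSON string>", "customer": "acme",
#    "geoDataVersion": "v1", "ulsId": "uls_2023", "requestIndexes": [0]}
# AFC_REQUEST/AFC_RESPONSE messages use the same envelope but omit the
# config-only fields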
class AlsMessageBundle:
""" Request/Response/Config(s) bundle
Private attributes:
_message_key -- Kafka message key
_kafka_positions -- KafkaPositions (registry of completed/incomplete
offsets)
_afc_server -- AFC Server ID
_last_update -- Time of last ALS message (from local clock)
_request_msg -- Request message as JSON dictionary (None if not yet
arrived)
_request_timetag -- Timetag of request message (None if not yet arrived)
_response_msg -- Response message as JSON dictionary (None if not yet
arrived)
_response_timetag -- Timetag of response message (None if not yet arrived)
_configs -- Dictionary of AfcConfigInfo objects, ordered by
individual request sequential indexes (or None if for
all requests)
_assembled -- True if bundle has all necessary parts
_store_parts -- Bundle in StoreParts representation. None if not yet
computed
_als_positions -- Set of positions of ALS messages used in this bundle
"""
# Top-level JSON keys in 'invariant_json' dictionary
JRR_REQUEST_KEY = "request"
JRR_RESPONSE_KEY = "response"
JRR_CONFIG_TEXT_KEY = "afc_config_text"
JRR_CUSTOMER_KEY = "customer"
JRR_ULS_KEY = "uls_data_id"
JRR_GEO_KEY = "geo_data_id"
# Information about single AFC request/response
RequestResponse = \
NamedTuple(
"RequestResponse",
# Dictionary indexed by 'JRR_...' keys. 'response' has empty (or
# nonexistent) 'availabilityExpireTime' field. Both 'request' and
# 'response' have 'requestId' field removed
[("invariant_json", JSON_DATA_TYPE),
# 'availabilityExpireTime', retrieved from 'response'
("expire_time", Optional[datetime.datetime])])
# AFC Configuration information
AfcConfigInfo = \
NamedTuple(
"AfcConfigInfo", [("config_str", str), ("customer", str),
("geo_data_id", str), ("uls_id", str)])
# Messages rearranged into a form convenient for storing in the DB
StoreParts = \
NamedTuple(
"StoreParts",
# AFC Server ID
[("afc_server", str),
# AFC Request message with empty
# 'availableSpectrumInquiryRequests' list
("rx_envelope", JSON_DATA_TYPE),
# AFC Response message with empty
# 'availableSpectrumInquiryResponses' list
("tx_envelope", JSON_DATA_TYPE),
# AFC Request message timetag
("rx_timetag", datetime.datetime),
# AFC Response message timetag
("tx_timetag", datetime.datetime),
# Dictionary of RequestResponse objects, indexed by 'requestId'
# field values
("request_responses", Dict[str, RequestResponse]),
# List of requests with no responses
("orphan_requests", List[JSON_DATA_TYPE]),
# List of responses with no requests
("orphan_responses", List[JSON_DATA_TYPE])])
def __init__(self, message_key: AlsMessageKeyType,
kafka_positions: KafkaPositions) -> None:
""" Constructor
message_key -- Raw key from Kafka message
kafka_positions -- KafkaPositions (registry of completed/incomplete
offsets)
"""
self._message_key = message_key
self._kafka_positions = kafka_positions
self._afc_server = ""
self._last_update = datetime.datetime.now()
self._request_msg: Optional[Dict[str, Any]] = None
self._request_timetag: Optional[datetime.datetime] = None
self._response_msg: Optional[JSON_DATA_TYPE] = None
self._response_timetag: Optional[datetime.datetime] = None
self._configs: Dict[Optional[int],
"AlsMessageBundle.AfcConfigInfo"] = {}
self._assembled = False
self._store_parts: Optional["AlsMessageBundle.StoreParts"] = None
self._als_positions: Set[KafkaPosition] = set()
def message_key(self) -> AlsMessageKeyType:
""" Kafka message key """
return self._message_key
def assembled(self) -> bool:
""" True if bundle fully assembled (ave got all pertinent ALS messages)
"""
return self._assembled
def last_update(self) -> datetime.datetime:
""" Local time of last update """
return self._last_update
def dump(self) -> JSON_DATA_TYPE:
""" Dump for debug purposes """
if self._store_parts is not None:
return self._store_parts._asdict()
return \
{"key": self._message_key.decode("latin-1"),
"afc_server": self._afc_server,
"last_update": self._last_update.isoformat(),
"request_msg": self._request_msg,
"request_timetag":
self._request_timetag.isoformat() if self._request_timetag
else None,
"response_msg": self._response_msg,
"response_timetag":
self._response_timetag.isoformat() if self._response_timetag
else None,
"configs": {k: cfg._asdict() for k, cfg in self._configs.items()}
if self._configs else None}
def request_count(self) -> int:
""" Number of contained requests """
assert self._request_msg is not None
try:
return \
len(jl(self._request_msg["availableSpectrumInquiryRequests"]))
except (LookupError, TypeError, ValueError) as ex:
raise JsonFormatError(f"Requests not found: {ex}",
code_line=LineNumber.exc(),
data=self._request_msg)
def update(self, message: AlsMessage, position: KafkaPosition) -> None:
""" Adds arrived ALS message
Arguments:
message -- Kafka raw message value
position -- Kafka message position
"""
self._last_update = datetime.datetime.now()
if self._assembled:
return
try:
self._afc_server = message.afc_server
if message.msg_type == AlsMessage.MsgType.Request:
if self._request_msg is not None:
return
try:
self._request_msg = jd(json.loads(message.json_str))
except json.JSONDecodeError:
raise JsonFormatError(
"Malformed JSON in AFC Request message",
code_line=LineNumber.exc(), data=message.json_str)
self._request_timetag = message.time_tag
elif message.msg_type == AlsMessage.MsgType.Response:
if self._response_msg is not None:
return
try:
self._response_msg = json.loads(message.json_str)
except json.JSONDecodeError:
raise JsonFormatError(
"Malformed JSON in AFC Response message",
code_line=LineNumber.exc(), data=message.json_str)
self._response_timetag = message.time_tag
else:
if message.msg_type != AlsMessage.MsgType.Config:
raise ValueError(
f"Unexpected ALS message type: {message.msg_type}")
assert message.msg_type == AlsMessage.MsgType.Config
config_info = \
self.AfcConfigInfo(
config_str=js(message.json_str),
customer=js(message.customer),
geo_data_id=js(message.geo_data_id),
uls_id=js(message.uls_id))
if message.request_indexes:
for i in message.request_indexes:
self._configs[i] = config_info
else:
self._configs[None] = config_info
self._check_config_indexes(message)
self._assembled = (self._request_msg is not None) and \
(self._response_msg is not None) and bool(self._configs) and \
((None in self._configs) or
(len(self._configs) == self.request_count()))
self._als_positions.add(position)
except (LookupError, TypeError, ValueError) as ex:
raise JsonFormatError(f"ALS message decoding problem: {ex}",
code_line=LineNumber.exc(),
data=message.raw_msg)
def take_apart(self) -> "AlsMessageBundle.StoreParts":
""" Return (assembled) message contents in StoreParts form """
assert self._assembled
if self._store_parts:
return self._store_parts
self._store_parts = \
self.StoreParts(
afc_server=self._afc_server,
rx_envelope=jd(self._request_msg),
tx_envelope=jd(self._response_msg),
rx_timetag=jdt(self._request_timetag),
tx_timetag=jdt(self._response_timetag),
request_responses={}, orphan_requests=[], orphan_responses=[])
requests: List[JSON_DATA_TYPE] = \
jl(jd(self._request_msg)["availableSpectrumInquiryRequests"])
responses: List[JSON_DATA_TYPE] = \
jl(jd(self._response_msg)["availableSpectrumInquiryResponses"])
jd(self._store_parts.rx_envelope)[
"availableSpectrumInquiryRequests"] = []
jd(self._store_parts.tx_envelope)[
"availableSpectrumInquiryResponses"] = []
response_map = {jd(r)["requestId"]: r for r in responses}
for req_idx, request in enumerate(requests):
req_id = js(jd(request)["requestId"])
response = jod(response_map.get(req_id))
if response is None:
self._store_parts.orphan_requests.append(request)
continue
del response_map[req_id]
config_info = \
self._configs[req_idx if req_idx in self._configs else None]
expire_time_str: Optional[str] = \
response.get("availabilityExpireTime")
if expire_time_str is not None:
expire_time_str = expire_time_str.replace("Z", "+00:00")
if "+" not in expire_time_str:
expire_time_str += "+00:00"
expire_time = datetime.datetime.fromisoformat(expire_time_str)
response["availabilityExpireTime"] = ""
else:
expire_time = None
jd(request)["requestId"] = ""
response["requestId"] = ""
self._store_parts.request_responses[req_id] = \
self.RequestResponse(
invariant_json={
self.JRR_REQUEST_KEY: request,
self.JRR_RESPONSE_KEY: response,
self.JRR_CONFIG_TEXT_KEY: config_info.config_str,
self.JRR_CUSTOMER_KEY: config_info.customer,
self.JRR_ULS_KEY: config_info.uls_id,
self.JRR_GEO_KEY: config_info.geo_data_id},
expire_time=expire_time)
for orphan in response_map.values():
self._store_parts.orphan_responses.append(orphan)
return self._store_parts
def _check_config_indexes(self, message: AlsMessage) -> None:
""" Ensure that config indexes in Config ALS message are valid """
if (self._request_msg is None) or (not self._configs):
return
rc = self.request_count()
if not all(0 <= i < rc for i in self._configs if i is not None):
raise AlsProtocolError(
f"Out of range config indexes found while processing ALS "
f"message with key '{self._message_key!r}'",
code_line=LineNumber.current(), data=message.raw_msg)
def __lt__(self, other) -> bool:
""" Comparison for by-time heap queue """
assert isinstance(other, self.__class__)
return self._last_update < other._last_update
def __eq__(self, other) -> bool:
""" Equality comparison for by-time heap queue """
return isinstance(other, self.__class__) and \
(self._message_key == other._message_key)
def __del__(self) -> None:
""" Destructor (marks ALS messages as processed """
for als_position in self._als_positions:
self._kafka_positions.mark_processed(als_position)
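# A bundle counts as assembled once the request, the response and either a
# bundle-wide config (key None) or one config per individual request have
# been seen. take_apart() then pairs requests with responses by 'requestId'
# and blanks 'requestId'/'availabilityExpireTime', keeping the stored
# 'invariant_json' independent of per-request identifiers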
class CertificationList:
""" List of AP Certifications
Private attributes:
_certifications -- Dictionary of certifications, indexed by 0-based indices
in list from 'certificationId' field
"""
# Single certification
Certification = \
NamedTuple("Certification", [("ruleset_id", str),
("certification_id", str)])
def __init__(self, json_data: Optional[List[JSON_DATA_TYPE]] = None) \
-> None:
""" Constructor
Arguments:
json_data -- Optional JSON dictionary - value of 'certificationId'
field to read self from
"""
self._certifications: Dict[int, "CertificationList.Certification"] = {}
if json_data is not None:
try:
for c in json_data:
cert: Dict[str, Any] = jd(c)
ruleset_id = cert["rulesetId"]
certification_id = cert["id"]
if not (isinstance(ruleset_id, str) and
isinstance(certification_id, str)):
raise TypeError()
self._certifications[len(self._certifications)] = \
self.Certification(
ruleset_id=js(ruleset_id),
certification_id=js(certification_id))
except (LookupError, TypeError, ValueError):
raise JsonFormatError(
"Invalid DeviceDescriptor.certificationId format",
code_line=LineNumber.exc(), data=json_data)
def add_certification(
self, index: int,
certification: "CertificationList.Certification") -> None:
""" Adds single certification
Arguments:
index -- 0-based certification index in certification list
certification -- Certification to add
"""
self._certifications[index] = certification
def get_uuid(self) -> uuid.UUID:
""" UUID of certification list (computed over JSON list of
certifications) """
return \
BytesUtils.json_to_uuid(
[{"rulesetId": self._certifications[idx].ruleset_id,
"id": self._certifications[idx].certification_id}
for idx in sorted(self._certifications.keys())])
def certifications(self) -> List["CertificationList.Certification"]:
""" List of Certification objects """
return \
[self._certifications[idx] for idx in sorted(self._certifications)]
def __eq__(self, other: Any) -> bool:
""" Eqquality comparison """
return isinstance(other, self.__class__) and \
(self._certifications == other._certifications)
def __hash__(self) -> int:
""" Hash over certifications """
return \
sum(idx + hash(cert) for idx, cert in self._certifications.items())
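# Illustrative sketch (identifiers hypothetical): a DeviceDescriptor
# 'certificationId' value of [{"rulesetId": "US_47_CFR_PART_15_SUBPART_E",
# "id": "FCCID1"}] produces a single Certification entry, and get_uuid() is
# the MD5-based UUID of that list re-serialized in index order, so equal
# certification lists always share the same digest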
class RegRuleList:
""" List of regulatory rules
Private attributes:
_reg_rules -- Dictionary of regulatory rule names, indexed by position
in list """
def __init__(self, json_data: Optional[List[Any]] = None) -> None:
""" Constructor
Arguments:
json_data -- Optional content of 'rulesetIds' field to read self from
"""
self._reg_rules: Dict[int, str] = {}
if json_data is not None:
try:
for reg_rule in json_data:
if not isinstance(reg_rule, str):
raise TypeError()
self._reg_rules[len(self._reg_rules)] = reg_rule
except (LookupError, TypeError, ValueError):
raise JsonFormatError("Invalid regulatory rule format",
code_line=LineNumber.exc(),
data=json_data)
def add_rule(self, index: int, reg_rule: str) -> None:
""" Add regulatory rule:
Arguments:
index -- 0-based rule index in rule list
reg_rule -- Rule name
"""
self._reg_rules[index] = reg_rule
def get_uuid(self) -> uuid.UUID:
""" Computes digest of self """
return \
BytesUtils.json_to_uuid(
[self._reg_rules[idx]
for idx in sorted(self._reg_rules.keys())])
def reg_rules(self) -> List[str]:
""" List of rule names """
return [self._reg_rules[idx] for idx in sorted(self._reg_rules.keys())]
def __eq__(self, other: Any) -> bool:
""" Equality comparison """
return isinstance(other, self.__class__) and \
(self._reg_rules == other._reg_rules)
def __hash__(self) -> int:
""" Hash value """
return sum(idx + hash(rr) for idx, rr in self._reg_rules.items())
class AlsTableBase:
""" Common part of ALS database table initialization
Protected attributes:
_adb -- AlsDatabase object
_table_name -- Table name
_table -- SQLAlchemy Table object
"""
# Name of month index column
MONTH_IDX_COL_NAME = "month_idx"
def __init__(self, adb: AlsDatabase, table_name: str) -> None:
""" Constructor
adb -- AlsDatabase object
table_name -- Table name
"""
self._adb = adb
self._table_name = table_name
if self._table_name not in self._adb.metadata.tables:
raise \
DbFormatError(
f"'{self._table_name}' table not found in ALS database",
code_line=LineNumber.current())
self._table: sa.Table = self._adb.metadata.tables[self._table_name]
def get_column(self, name: str, expected_type: Optional[Type] = None) \
-> sa.Column:
""" Returns given column object
Arguments:
name -- Column name
expected_type -- Expected column type (None to not check)
Returns correspondent sa.Column object
"""
ret = self._table.c.get(name)
if ret is None:
raise DbFormatError(f"Column '{name}' not found in table "
f"'{self._table_name}' of ALS database",
code_line=LineNumber.current())
if (expected_type is not None) and \
(not isinstance(ret.type, expected_type)):
raise DbFormatError(f"Column '{name}' of '{self._table_name}' "
f"table of ALS database has unexpected type",
code_line=LineNumber.current())
return ret
def get_month_idx_col(self) -> sa.Column:
""" Returns an instance of 'month_idx' column """
return self.get_column(self.MONTH_IDX_COL_NAME, sa.SmallInteger)
class Lookups:
""" Collection of lookups
Private attributes:
_lookups -- List of registered LookupBase objects
"""
def __init__(self) -> None:
""" Constructor """
self._lookups: List["LookupBase"] = []
def register(self, lookup: "LookupBase") -> None:
""" Register newly-created lookup """
self._lookups.append(lookup)
def reread(self) -> None:
""" Signal all lookups to reread self (e.g. after transsaction failure)
"""
for lookup in self._lookups:
lookup.reread()
# Generic type name for lookup key value (usually int or UUID)
LookupKey = TypeVar("LookupKey")
# Generic type name for lookup table value
LookupValue = TypeVar("LookupValue")
class LookupBase(AlsTableBase, Generic[LookupKey, LookupValue], ABC):
""" Generic base class for lookup tables (database tables, also contained
in memory for speed of access)
Private attributes:
_by_value -- Dictionary of lookup keys, ordered by (value, month_index)
keys
_value_column -- SQLAlchemy column for lookup tables where value contained
in some column of a single row. None for other cases (e.g.
when value should be constructed from several rows)
_need_reread -- True if dictionary should be reread from database on next
update_db()
"""
def __init__(self, adb: AlsDatabase, table_name: str, lookups: Lookups,
value_column_name: Optional[str] = None) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
table_name -- Database table name
lookups -- Lookup collection to register self in
value_column_name -- Optional name of column containing lookup value.
None for lookups that contain value in more than
one row/column
"""
AlsTableBase.__init__(self, adb=adb, table_name=table_name)
lookups.register(self)
self._value_column: Optional[sa.Column] = \
None if value_column_name is None \
else self.get_column(value_column_name)
self._by_value: Dict[Tuple[LookupValue, int], LookupKey] = {}
self._need_reread = True
def reread(self) -> None:
""" Request reread on next update """
self._need_reread = True
def update_db(self, values: Iterable[LookupValue], month_idx: int) -> None:
""" Update lookup table with new lookup values (if any)
Arguments:
values -- Sequence of lookup values (some of which may, other may not
already be in the table)
month_idx -- Month index to use in new records
"""
self._reread_if_needed()
new_value_months: Set[Tuple[LookupValue, int]] = \
{(value, month_idx) for value in values} - \
set(self._by_value.keys())
if not new_value_months:
return
rows: List[ROW_DATA_TYPE] = []
for value_month in new_value_months:
if self._value_column is None:
self._by_value[value_month] = \
self._key_from_value(value_month[0])
rows += self._rows_from_value(*value_month)
try:
ins = sa_pg.insert(self._table).values(rows).\
on_conflict_do_nothing()
if self._value_column is not None:
ins = ins.returning(self._table)
result = self._adb.conn.execute(ins)
if self._value_column is not None:
for row in result:
key = self._key_from_row(row)
value = self._value_from_row_create(row)
self._by_value[(value, month_idx)] = key
assert isinstance(key, int)
new_value_months.remove((value, month_idx))
for value_month in new_value_months:
s = sa.select([self._table]).\
where(self._value_column == value_month[0])
result = self._adb.conn.execute(s)
self._by_value[value_month] = \
self._key_from_row(list(result)[0])
except (sa.exc.SQLAlchemyError, TypeError, ValueError) as ex:
raise DbFormatError(
f"Error updating '{self._table_name}': {ex}",
code_line=LineNumber.exc())
def key_for_value(self, value: LookupValue, month_idx: int) \
-> LookupKey:
""" Returns lookup key for given value """
return self._by_value[(value, month_idx)]
@abstractmethod
def _key_from_row(self, row: ROW_DATA_TYPE) -> LookupKey:
""" Required 'virtual' function. Returns key contained in given table
row dictionary """
...
@abstractmethod
def _value_from_row_create(self, row: ROW_DATA_TYPE) -> LookupValue:
""" Required 'virtual' function. Creates (possibly incomplete) lookup
value contained in given table row dictionary """
...
def _reread_if_needed(self) -> None:
""" Reread lookup from database if requested """
if not self._need_reread:
return
by_key: Dict[Tuple[LookupKey, int], LookupValue] = {}
try:
for row in self._adb.conn.execute(sa.select(self._table)):
key = (self._key_from_row(row),
row[AlsTableBase.MONTH_IDX_COL_NAME])
value = by_key.get(key)
if value is None:
by_key[key] = \
self._value_from_row_create(row)
else:
self._value_from_row_update(row, value)
except (sa.exc.SQLAlchemyError, TypeError, ValueError) as ex:
raise DbFormatError(f"Error reading '{self._table_name}': {ex}",
code_line=LineNumber.exc())
self._by_value = {(by_key[(key, month_idx)], month_idx): key
for key, month_idx in by_key}
self._need_reread = False
def _value_from_row_update(self, row: ROW_DATA_TYPE,
value: LookupValue) -> None:
""" Optional 'virtual' function. Updates incomplete value with data
from given row dictionary. Call of this function only happens for
lookup tables, whose values contained in a single row """
raise NotImplementedError(f"_value_from_row_update() not implemented "
f"for '{self._table_name}' table")
def _key_from_value(self, value: LookupValue) -> LookupKey:
""" Optional 'virtual' function. Computes lookup key from lookup value.
Only called for tables without 'value_column' """
raise NotImplementedError(f"_key_from_value() not implemented for "
f"'{self._table_name}' table")
@abstractmethod
def _rows_from_value(self, value: LookupValue, month_idx: int) \
-> List[ROW_DATA_TYPE]:
""" Required 'virtual' function. List of database rows from given
lookup value and month index """
...
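# LookupBase subclasses thus act as a write-through cache: update_db()
# INSERTs only the (value, month_idx) pairs absent from the in-memory map,
# relies on Postgres ON CONFLICT DO NOTHING for rows that may already exist
# in the database, and (for single-column lookups) uses RETURNING plus a
# follow-up SELECT to learn the database keys of both new and existing rows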
class CertificationsLookup(LookupBase[uuid.UUID, CertificationList]):
""" Certifications' lookup
Private attributes:
_col_digest -- Certifications' digest column
_col_index -- Index in Certifications' list column
_col_month_idx -- Month index column
_col_ruleset_id -- National Registration Authority name column
_col_id -- Certificate ID column
"""
# Table name
TABLE_NAME = "certification"
def __init__(self, adb: AlsDatabase, lookups: Lookups) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
lookups -- Lookup collection to register self in
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME, lookups=lookups)
self._col_digest = self.get_column("certifications_digest", sa_pg.UUID)
self._col_index = self.get_column("certification_index",
sa.SmallInteger)
self._col_month_idx = self.get_month_idx_col()
self._col_ruleset_id = self.get_column("ruleset_id", sa.Text)
self._col_id = self.get_column("certification_id", sa.Text)
def _key_from_row(self, row: ROW_DATA_TYPE) -> uuid.UUID:
""" Certifications' digest for given row dictionary """
return uuid.UUID(js(row[ms(self._col_digest.name)]))
def _value_from_row_create(self, row: ROW_DATA_TYPE) -> CertificationList:
""" Returns partial certification list from given row dictionary """
ret = CertificationList()
self._value_from_row_update(row, ret)
return ret
def _value_from_row_update(self, row: ROW_DATA_TYPE,
value: CertificationList) -> None:
""" Updates given partial certification list from given row dictionary
"""
value.add_certification(
index=ji(row[ms(self._col_index.name)]),
certification=CertificationList.Certification(
ruleset_id=js(row[ms(self._col_ruleset_id.name)]),
certification_id=js(row[ms(self._col_id.name)])))
def _key_from_value(self, value: CertificationList) -> uuid.UUID:
""" Table (semi) key from Certifications object """
return value.get_uuid()
def _rows_from_value(self, value: CertificationList, month_idx: int) \
-> List[ROW_DATA_TYPE]:
""" List of rows dictionaries, representing given Certifications object
"""
ret: List[ROW_DATA_TYPE] = []
for cert_idx, certification in enumerate(value.certifications()):
ret.append(
{ms(self._col_digest.name): value.get_uuid().urn,
ms(self._col_index.name): cert_idx,
ms(self._col_month_idx.name): month_idx,
ms(self._col_ruleset_id.name): certification.ruleset_id,
ms(self._col_id.name): certification.certification_id})
return ret
class AfcConfigLookup(LookupBase[uuid.UUID, str]):
""" AFC Configs lookup table
Private attributes:
_col_digest -- Digest computed over AFC Config string column
_col_month_idx -- Month index column
_col_text -- AFC Config text representation column
_col_json -- AFC Config JSON representation column
"""
# Table name
TABLE_NAME = "afc_config"
def __init__(self, adb: AlsDatabase, lookups: Lookups) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
lookups -- Lookup collection to register self in
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME, lookups=lookups)
self._col_digest = self.get_column("afc_config_text_digest",
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_text = self.get_column("afc_config_text", sa.Text)
self._col_json = self.get_column("afc_config_json", sa_pg.JSON)
def _key_from_row(self, row: ROW_DATA_TYPE) -> uuid.UUID:
""" Returns AFC Config digest stored in a row dictionary """
return uuid.UUID(js(row[ms(self._col_digest.name)]))
def _value_from_row_create(self, row: ROW_DATA_TYPE) -> str:
""" Returns AFC Config string stored in row dictionary """
return js(row[ms(self._col_text.name)])
def _key_from_value(self, value: str) -> uuid.UUID:
""" Computes AFC config digest from AFC Config string """
return BytesUtils.text_to_uuid(value)
def _rows_from_value(self, value: str, month_idx: int) \
-> List[ROW_DATA_TYPE]:
""" Returns row dictionary from AFC Config string """
try:
config_json = json.loads(value)
except json.JSONDecodeError as ex:
raise JsonFormatError(f"Malformed AFC Config JSON: {ex}",
code_line=LineNumber.exc(), data=value)
return [{ms(self._col_digest.name):
BytesUtils.text_to_uuid(value).urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_text.name): value,
ms(self._col_json.name): config_json}]
class StringLookup(LookupBase[int, str]):
""" Lookup table with string values and sequential integer keys
Private attributes:
_col_id -- Sequential index column
_col_month_idx -- Month index column
_col_value -- String value column
"""
# Lookup parameters
Params = NamedTuple(
"Params",
# Table name
[("table_name", str),
# Sequential index column name
("id_col_name", str),
# String value column name
("value_col_name", str)])
# Parameter for AFC Server name lookup
AFC_SERVER_PARAMS = Params(table_name="afc_server",
id_col_name="afc_server_id",
value_col_name="afc_server_name")
# Parameters for Customer name lookup
CUSTOMER_PARAMS = Params(table_name="customer",
id_col_name="customer_id",
value_col_name="customer_name")
# Parameters for ULS ID lookup
ULS_PARAMS_PARAMS = Params(table_name="uls_data_version",
id_col_name="uls_data_version_id",
value_col_name="uls_data_version")
# Parameters for Geodetic data ID lookup
GEO_DATA_PARAMS = Params(table_name="geo_data_version",
id_col_name="geo_data_version_id",
value_col_name="geo_data_version")
def __init__(self, adb: AlsDatabase, params: "StringLookup.Params",
lookups: Lookups) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
params -- Lookup parameters
lookups -- Lookup collection to register self in
"""
super().__init__(adb=adb, table_name=params.table_name,
value_column_name=params.value_col_name,
lookups=lookups)
self._col_id = self.get_column(params.id_col_name, sa.Integer)
self._col_month_idx = self.get_month_idx_col()
self._col_value = self.get_column(params.value_col_name, sa.Text)
def _key_from_row(self, row: ROW_DATA_TYPE) -> int:
""" Key from row dictionary """
return ji(row[ms(self._col_id.name)])
def _value_from_row_create(self, row: ROW_DATA_TYPE) -> str:
""" Value from row dictionary """
return js(row[ms(self._col_value.name)])
def _rows_from_value(self, value: str, month_idx: int) \
-> List[ROW_DATA_TYPE]:
""" Lookup table row dictionary for a value """
return [{ms(self._col_month_idx.name): month_idx,
ms(self._col_value.name): value}]
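# Illustrative usage sketch (variable names are hypothetical):
#   lookups = Lookups()
#   afc_servers = StringLookup(adb=als_db,
#                              params=StringLookup.AFC_SERVER_PARAMS,
#                              lookups=lookups)
#   afc_servers.update_db(["srv-1"], month_idx=get_month_idx())
#   server_key = afc_servers.key_for_value("srv-1", get_month_idx())
# key_for_value() returns the integer stored in the 'afc_server_id' column
# for that server name and month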
# Generic type parameter for data key, used in data dictionaries, passed to
# update_db().
# Nature of this key might be different - it can be primary data key (usually
# data digest), foreign key or even key for consistency (to not create
# list-based version of update_db()).
# Digest primary keys (the most typical option) are expensive to compute, so
# their values should be reused, not recomputed
TableUpdaterDataKey = TypeVar("TableUpdaterDataKey")
# Generic type parameter for data value stored in table - type for values of
# data dictionary passed to update_db(). Most often a JSON object (e.g. from
# AFC message) to be written to table - maybe along with to dependent tables
TableUpdaterData = TypeVar("TableUpdaterData")
class TableUpdaterBase(AlsTableBase,
Generic[TableUpdaterDataKey, TableUpdaterData], ABC):
""" Base class for tables being updated (no in-memory data copy)
Private attributes:
_json_obj_name -- Name of JSON object that corresponds to data in
table (or something descriptive) for error
reporting purposes
_data_key_columns -- List of columns that correspond to data key
(usually - primary table key without 'month_idx'
column). Used to collect information for
_update_foreign_sources(), empty means not to call
_update_foreign_sources()
"""
def __init__(self, adb: AlsDatabase, table_name: str, json_obj_name: str,
data_key_column_names: Optional[List[str]] = None) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
table_name -- Table name
json_obj_name -- Name of JSON object that corresponds to data
in table (or something descriptive) for error
reporting purposes
data_key_column_names -- List of names of columns that correspond to
data key (usually - primary table key without
'month_idx' column). Used to collect
information for _update_foreign_sources(),
None means not to call
_update_foreign_sources()
"""
AlsTableBase.__init__(self, adb=adb, table_name=table_name)
self._json_obj_name = json_obj_name
self._data_key_columns = \
[self.get_column(col_name, None)
for col_name in (data_key_column_names or [])]
def update_db(self,
data_dict: Dict[TableUpdaterDataKey, TableUpdaterData],
month_idx: int) -> None:
""" Write data to table (unless they are already there)
Arguments:
data_dict -- Data dictionary (one item per record)
month_idx -- Value for 'month_idx' column
"""
try:
if not data_dict:
return
rows: List[ROW_DATA_TYPE] = []
self._update_lookups(data_objects=data_dict.values(),
month_idx=month_idx)
row_infos: \
Dict[TableUpdaterDataKey,
Tuple[TableUpdaterData, List[ROW_DATA_TYPE]]] = {}
for data_key, data_object in data_dict.items():
rows_for_data = self._make_rows(data_key=data_key,
data_object=data_object,
month_idx=month_idx)
rows += rows_for_data
row_infos[data_key] = (data_object, rows_for_data)
self._update_foreign_targets(row_infos=row_infos,
month_idx=month_idx)
except (LookupError, TypeError, ValueError) as ex:
raise JsonFormatError(
f"Invalid {self._json_obj_name} object format: {ex}",
code_line=LineNumber.exc())
ins = sa_pg.insert(self._table).values(rows).on_conflict_do_nothing()
if self._data_key_columns:
ins = ins.returning(*self._data_key_columns)
try:
result = self._adb.conn.execute(ins)
except (sa.exc.SQLAlchemyError) as ex:
raise DbFormatError(f"Error updating '{self._table_name}': {ex}",
code_line=LineNumber.exc())
if not self._data_key_columns:
return
inserted_rows: \
Dict[TableUpdaterDataKey,
Tuple[ROW_DATA_TYPE, TableUpdaterData,
RESULT_ROW_DATA_TYPE]] = {}
for result_row_idx, result_row in enumerate(result):
data_key = \
self._data_key_from_result_row(
result_row=result_row, result_row_idx=result_row_idx)
inserted_rows[data_key] = (row_infos[data_key][1][0],
data_dict[data_key], result_row)
self._update_foreign_sources(inserted_rows=inserted_rows,
month_idx=month_idx)
def _update_lookups(self, data_objects: Iterable[TableUpdaterData],
month_idx: int) -> None:
""" Updates lookups, references by current table.
Optional 'virtual' function
Arguments:
data_objects -- Sequence of data objects being inserted
month_idx -- Value for 'month_idx' column
"""
pass
@abstractmethod
def _make_rows(self, data_key: TableUpdaterDataKey,
data_object: TableUpdaterData,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Generates list of table row dictionaries for given object.
Mandatory 'virtual' function
Argument:
data_key -- Data key for a data object
data_object -- Data object
month_idx -- Value for 'month_idx' column
Returns list of row dictionaries
"""
...
def _update_foreign_targets(
self,
row_infos: Dict[TableUpdaterDataKey,
Tuple[TableUpdaterData, List[ROW_DATA_TYPE]]],
month_idx: int) -> None:
""" Updates tables pointed to by foreign keys of this table.
Optional 'virtual' function
Arguments:
row_infos -- Dictionary of data objects and row lists generated from
them, indexed by data keys
month_idx -- Value for 'month_idx' column
"""
pass
def _data_key_from_result_row(
self, result_row: Tuple[Any, ...], result_row_idx: int) \
-> TableUpdaterDataKey:
""" Returns data key from given result row (comprised of columns,
passed to constructor as 'data_key_column_names' argument). Called only
if _update_foreign_sources() should be called. Default implementation
(presented here) returns value from first and only column
Arguments:
result_row -- Result row (list of values of columns, passed as
'data_key_columns' parameter
result_row_idx -- 0-based index of result row
Returns data key, computed from result row and row index
"""
assert len(result_row) == 1
return result_row[0]
def _update_foreign_sources(
self,
inserted_rows: Dict[TableUpdaterDataKey,
Tuple[ROW_DATA_TYPE, TableUpdaterData,
RESULT_ROW_DATA_TYPE]],
month_idx: int) -> None:
""" Updates tables whose foreign keys point to this table.
Optional 'virtual' function that is only called if 'data_key_column_names'
was passed to constructor
Arguments:
        inserted_rows -- Information about newly-inserted rows (dictionary of
                         (row_dictionary, data_object, result_row) tuples,
                         indexed by data keys)
month_idx -- Month index
"""
raise NotImplementedError(f"_update_foreign_sources() not implemented "
f"for table '{self._table_name}'")
class DeviceDescriptorTableUpdater(TableUpdaterBase[uuid.UUID,
JSON_DATA_TYPE]):
""" Updater of device descriptor table.
Data key is digest, computed over device descriptor JSON string
Private data:
_cert_lookup -- Certificates' lookup
_col_digest -- Digest column
_col_month_idx -- Month index column
_col_serial -- AP Serial Number column
_col_cert_digest -- Certificates' digest column
"""
TABLE_NAME = "device_descriptor"
def __init__(self, adb: AlsDatabase, cert_lookup: CertificationsLookup) \
-> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
cert_lookup -- Certificates' lookup
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="DeviceDescriptor")
self._cert_lookup = cert_lookup
self._col_digest = self.get_column("device_descriptor_digest",
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_serial = self.get_column("serial_number", sa.Text)
self._col_cert_digest = self.get_column("certifications_digest",
sa_pg.UUID)
def _update_lookups(self, data_objects: Iterable[JSON_DATA_TYPE],
month_idx: int) -> None:
""" Update used lookups
Arguments:
data_objects -- Sequence of JSON dictionaries with device descriptors
month_idx -- Month index
"""
cert_lists: List[CertificationList] = []
for d in data_objects:
j = jd(d)
try:
cert_lists.append(CertificationList(jl(j["certificationId"])))
except LookupError as ex:
raise JsonFormatError(
f"Certifications not found: {ex}",
code_line=LineNumber.exc(), data=j)
self._cert_lookup.update_db(cert_lists, month_idx=month_idx)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Generates table row dictionary for given device descriptor JSON
Arguments:
data_key -- Data key (DeviceDescriptor JSON digest)
data_object -- DeviceDescriptor JSON
month_idx -- Month index
Returns list of single row dictionary
"""
try:
json_object = jd(data_object)
return [{ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_serial.name): json_object["serialNumber"],
ms(self._col_cert_digest.name):
self._cert_lookup.key_for_value(
CertificationList(json_object["certificationId"]),
month_idx=month_idx).urn}]
except (LookupError, TypeError, ValueError) as ex:
raise \
JsonFormatError(
f"Invalid device DeviceDescriptor format: '{ex}'",
code_line=LineNumber.exc(), data=data_object)
class LocationTableUpdater(TableUpdaterBase[uuid.UUID, JSON_DATA_TYPE]):
""" Locations table updater.
Data key is digest over JSON Location object, data value is JSON Location
object
Private attributes:
    _col_digest -- Digest over JSON Location object column
_col_month_idx -- Month index column
_col_location -- Geodetic location column
_col_loc_uncertainty -- Location uncertainty in meters column
_col_loc_type -- Location type
(ellipse/radialPolygon/linearPolygon) column
_col_deployment_type -- Location deployment (indoor/outdoor) column
_col_height -- Height in meters column
_col_height_uncertainty -- Height uncertainty in meters column
_col_height_type -- Height type (AGL/AMSL) column
"""
# Table name
TABLE_NAME = "location"
# Point geodetic coordinates - in North/East positive degrees
Point = NamedTuple("Point", [("lat", float), ("lon", float)])
# Length of one degree in meters in latitudinal direction
DEGREE_M = 6_371_000 * math.pi / 180
def __init__(self, adb: AlsDatabase) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="Location")
self._col_digest = self.get_column("location_digest", sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_location = self.get_column("location_wgs84", ga.Geography)
self._col_loc_uncertainty = self.get_column("location_uncertainty_m",
sa.Float)
self._col_loc_type = self.get_column("location_type", sa.Text)
self._col_deployment_type = self.get_column("deployment_type",
sa.Integer)
self._col_height = self.get_column("height_m", sa.Float)
self._col_height_uncertainty = \
self.get_column("height_uncertainty_m", sa.Float)
self._col_height_type = self.get_column("height_type", sa.Text)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Makes table row dictionary from Location JSON data object
Arguments:
data_key -- Data key (Location JSON digest)
data_object -- Data object (Location JSON)
month_idx -- Month index
Returns single-element row dictionary list
"""
try:
json_object = jd(data_object)
j_elev = json_object["elevation"]
ret: ROW_DATA_TYPE = \
{ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_deployment_type.name):
ji(json_object.get("indoorDeployment", 0)),
ms(self._col_height.name): float(j_elev["height"]),
ms(self._col_height_type.name): str(j_elev["heightType"]),
ms(self._col_height_uncertainty.name):
float(j_elev["verticalUncertainty"])}
loc_uncertainty: float
if "ellipse" in json_object:
ret[ms(self._col_loc_type.name)] = "ellipse"
j_ellipse = jd(json_object["ellipse"])
center = self._get_point(jd(j_ellipse["center"]))
loc_uncertainty = float(j_ellipse["majorAxis"])
elif "radialPolygon" in json_object:
ret[ms(self._col_loc_type.name)] = "radialPolygon"
j_r_poly = jd(json_object["radialPolygon"])
loc_uncertainty = 0
center = self._get_point(jd(j_r_poly["center"]))
for j_v in j_r_poly["outerBoundary"]:
loc_uncertainty = max(loc_uncertainty,
float(j_v["length"]))
else:
j_l_poly = jd(json_object["linearPolygon"])
ret[ms(self._col_loc_type.name)] = "linearPolygon"
center_lat: float = 0
center_lon: float = 0
lon0: Optional[float] = None
for j_p in j_l_poly["outerBoundary"]:
p = self._get_point(jd(j_p))
center_lat += p.lat
if lon0 is None:
lon0 = p.lon
center_lon += self._same_hemisphere(p.lon, lon0)
center_lat /= len(j_l_poly["outerBoundary"])
center_lon /= len(j_l_poly["outerBoundary"])
if center_lon <= -180:
center_lon += 360
elif center_lon > 180:
center_lon -= 360
center = self.Point(lat=center_lat, lon=center_lon)
loc_uncertainty = 0
for j_p in jl(j_l_poly["outerBoundary"]):
p = self._get_point(jd(j_p))
loc_uncertainty = max(loc_uncertainty,
self._dist(center, p))
ret[ms(self._col_loc_uncertainty.name)] = loc_uncertainty
ret[ms(self._col_location.name)] = \
f"POINT({center.lon} {center.lat})"
return [ret]
except (LookupError, TypeError, ValueError) as ex:
raise JsonFormatError(f"Invalid Location format: '{ex}'",
code_line=LineNumber.exc(),
data=data_object)
def _get_point(self, j_p: dict) -> "LocationTableUpdater.Point":
""" Point object from JSON
Arguments:
j_p -- JSON Point object
Returns Point object
"""
return self.Point(lat=j_p["latitude"], lon=j_p["longitude"])
def _same_hemisphere(self, lon: float, root_lon: float) -> float:
""" Makes actually close longitudes numerically close
Arguments:
lon -- Longitude in question
root_lon -- Longitude in hemisphere in question
        Returns the first longitude: corrected (possibly going beyond
        [-180, 180]) if the two longitudes are on opposite sides of the 180
        meridian, unchanged otherwise
"""
if lon < (root_lon - 180):
lon += 360
elif lon > (root_lon + 180):
lon -= 360
return lon
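    # Illustrative example of _same_hemisphere() (hypothetical values): for
    # lon=-179 and root_lon=179 the two longitudes are 2 degrees apart across
    # the antimeridian, so the method returns -179 + 360 = 181, which is
    # numerically close to 179.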
def _dist(self, p1: "LocationTableUpdater.Point",
p2: "LocationTableUpdater.Point") -> float:
""" Approximate distance in meters beteen two geodetic points """
lat_dist = (p1.lat - p2.lat) * self.DEGREE_M
lon_dist = \
(p1.lon - self._same_hemisphere(p2.lon, p1.lon)) * \
math.cos((p1.lat + p2.lat) / 2 * math.pi / 180) * \
self.DEGREE_M
return math.sqrt(lat_dist * lat_dist + lon_dist * lon_dist)
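    # _dist() uses an equirectangular approximation: one degree of latitude
    # is taken as DEGREE_M (~111195 m) and the longitudinal difference is
    # scaled by the cosine of the mean latitude. Worked example (hypothetical
    # points): for p1=(45.000, 10.000) and p2=(45.001, 10.000) the result is
    # 0.001 * DEGREE_M, i.e. roughly 111.2 m - sufficient accuracy for
    # picking a location uncertainty radius.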
class CompressedJsonTableUpdater(TableUpdaterBase[uuid.UUID, JSON_DATA_TYPE]):
""" Compressed JSONs table
Private attributes:
_col_digest -- Digest over uncompressed JSON column
_col_month_idx -- Month index column
_col_data -- Compressed JSON column
"""
# Table name
TABLE_NAME = "compressed_json"
def __init__(self, adb: AlsDatabase) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="Request/Response")
self._col_digest = self.get_column("compressed_json_digest",
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_data = self.get_column("compressed_json_data",
sa.LargeBinary)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Makes row dictionary
Arguments:
data_key -- Digest over JSON
data_object -- JSON itself
month_idx -- Month index
"""
return [{ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_data.name):
lz4.frame.compress(BytesUtils.json_to_bytes(data_object))}]
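    # The stored blob is the LZ4-framed output of BytesUtils.json_to_bytes()
    # (serialized JSON). A reader can recover the original JSON with the
    # standard lz4/json APIs, e.g. (sketch, 'blob' standing for a value of
    # the compressed_json_data column):
    #
    #   original = json.loads(lz4.frame.decompress(blob))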
class MaxEirpTableUpdater(TableUpdaterBase[uuid.UUID, JSON_DATA_TYPE]):
""" Updater for Max EIRP table.
Data key is digest, used in request_response table (i.e. digest computed
over AlsMessageBundle.RequestResponse.invariant_json).
Data value is this object itself
(AlsMessageBundle.RequestResponse.invariant_json)
Private attributes:
_col_digest -- Digest over invariant_json column
_col_month_idx -- Month index column
_col_op_class -- Channel operating class column
_col_channel -- Channel number column
_col_eirp -- Maximum EIRP in dBm column
"""
# Table name
TABLE_NAME = "max_eirp"
def __init__(self, adb: AlsDatabase) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="AvailableChannelInfo")
self._col_digest = self.get_column("request_response_digest",
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
        self._col_op_class = self.get_column("op_class", sa.SmallInteger)
self._col_channel = self.get_column("channel", sa.SmallInteger)
self._col_eirp = self.get_column("max_eirp_dbm", sa.Float)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Prepares list of row dictionaries
Arguments:
data_key -- Digest over 'invariant_json'
data_object -- 'invariant_json' itself
month_idx -- Month index
        Returns list of row dictionaries
"""
ret: List[ROW_DATA_TYPE] = []
try:
for av_chan_info in data_object:
av_chan_info_j = jd(av_chan_info)
op_class: int = av_chan_info_j["globalOperatingClass"]
for channel, eirp in zip(av_chan_info_j["channelCfi"],
av_chan_info_j["maxEirp"]):
ret.append({ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_op_class.name): op_class,
ms(self._col_channel.name): channel,
ms(self._col_eirp.name): eirp})
except (LookupError, TypeError, ValueError) as ex:
raise \
JsonFormatError(f"Invalid AvailableChannelInfo format: '{ex}'",
code_line=LineNumber.exc(), data=data_object)
return ret
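    # Illustrative expansion (hypothetical values): an AvailableChannelInfo
    # entry {"globalOperatingClass": 133, "channelCfi": [7, 23],
    # "maxEirp": [30.0, 27.5]} produces two rows - channel 7 with 30.0 dBm
    # and channel 23 with 27.5 dBm - both sharing the same
    # request_response_digest and month_idx.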
class MaxPsdTableUpdater(TableUpdaterBase[uuid.UUID, JSON_DATA_TYPE]):
""" Updater for Max PSD table.
Data key is digest, used in request_response table (i.e. digest computed
over AlsMessageBundle.RequestResponse.invariant_json).
    Data value is this object itself
(AlsMessageBundle.RequestResponse.invariant_json)
Private attributes:
_col_digest -- Digest over invariant_json column
_col_month_idx -- Month index column
_col_low -- Lower frequency range bound in MHz column
_col_high -- High frequency range bound in MHz column
_col_psd -- Maximum PSD in dBm/MHz column
"""
TABLE_NAME = "max_psd"
def __init__(self, adb: AlsDatabase) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="AvailableFrequencyInfo")
self._col_digest = self.get_column("request_response_digest",
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_low = self.get_column("low_frequency_mhz", sa.SmallInteger)
self._col_high = self.get_column("high_frequency_mhz", sa.SmallInteger)
self._col_psd = self.get_column("max_psd_dbm_mhz", sa.Float)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Prepares list of row dictionaries
Arguments:
data_key -- Digest over 'invariant_json'
data_object -- 'invariant_json' itself
month_idx -- Month index
        Returns list of row dictionaries
"""
ret: List[ROW_DATA_TYPE] = []
try:
for av_freq_info in data_object:
av_freq_info_j = jd(av_freq_info)
freq_range = jd(av_freq_info_j["frequencyRange"])
ret.append(
{ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_low.name): freq_range["lowFrequency"],
ms(self._col_high.name): freq_range["highFrequency"],
ms(self._col_psd.name): av_freq_info_j["maxPsd"]})
except (LookupError, TypeError, ValueError) as ex:
raise \
JsonFormatError(
f"Invalid AvailableFrequencyInfo format: '{ex}'",
code_line=LineNumber.exc(), data=data_object)
return ret
class RequestResponseTableUpdater(TableUpdaterBase[uuid.UUID, JSON_DATA_TYPE]):
""" Updater for request/response table
Data key is digest, used in request_response table (i.e. digest computed
over AlsMessageBundle.RequestResponse.invariant_json).
    Data value is this object itself
(AlsMessageBundle.RequestResponse.invariant_json)
Private attributes:
_afc_config_lookup -- AFC Config lookup
_customer_lookup -- Customer name lookup
_uls_lookup -- ULS ID lookup
_geo_data_lookup -- Geodetic data lookup
_compressed_json_updater -- Compressed JSON table updater
_dev_desc_updater -- Device Descriptor table updater
_location_updater -- Location table updater
_max_eirp_updater -- Maximum EIRP table updater
_max_psd_updater -- Maximum PSD table updater
_col_digest -- Digest over 'invariant_json' column
_col_month_idx -- Month index column
_col_afc_config_digest -- AFC Config digest column
_col_customer_id -- Customer ID column
_col_uls_data_id -- ULS Data ID column
_col_geo_data_id -- Geodetic data ID column
_col_req_digest -- Request digest column
_col_resp_digest -- Response digest column
    _col_dev_desc_digest -- Device Descriptor digest column
_col_loc_digest -- Location digest column
_col_response_code -- Response code column
_col_response_description -- Response description column
_col_response_data -- Response data column
"""
TABLE_NAME = "request_response"
def __init__(self, adb: AlsDatabase, afc_config_lookup: AfcConfigLookup,
customer_lookup: StringLookup, uls_lookup: StringLookup,
geo_data_lookup: StringLookup,
compressed_json_updater: CompressedJsonTableUpdater,
dev_desc_updater: DeviceDescriptorTableUpdater,
location_updater: LocationTableUpdater,
max_eirp_updater: MaxEirpTableUpdater,
max_psd_updater: MaxPsdTableUpdater) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
afc_config_lookup -- AFC Config lookup
customer_lookup -- Customer name lookup
uls_lookup -- ULS ID lookup
geo_data_lookup -- Geodetic data lookup
compressed_json_updater -- Compressed JSON table updater
dev_desc_updater -- Device Descriptor table updater
location_updater -- Location table updater
max_eirp_updater -- Maximum EIRP table updater
max_psd_updater -- Maximum PSD table updater
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="RequestResponse")
self._afc_config_lookup = afc_config_lookup
self._customer_lookup = customer_lookup
self._uls_lookup = uls_lookup
self._geo_data_lookup = geo_data_lookup
self._compressed_json_updater = compressed_json_updater
self._dev_desc_updater = dev_desc_updater
self._location_updater = location_updater
self._max_eirp_updater = max_eirp_updater
self._max_psd_updater = max_psd_updater
self._col_digest = self.get_column("request_response_digest",
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_afc_config_digest = self.get_column("afc_config_text_digest",
sa_pg.UUID)
self._col_customer_id = self.get_column("customer_id", sa.Integer)
self._col_uls_data_id = self.get_column("uls_data_version_id",
sa.Integer)
self._col_geo_data_id = self.get_column("geo_data_version_id",
sa.Integer)
self._col_req_digest = self.get_column("request_json_digest",
sa_pg.UUID)
self._col_resp_digest = self.get_column("response_json_digest",
sa_pg.UUID)
self._col_dev_desc_digest = self.get_column("device_descriptor_digest",
sa_pg.UUID)
self._col_loc_digest = self.get_column("location_digest", sa_pg.UUID)
self._col_response_code = self.get_column("response_code", sa.Integer)
self._col_response_description = \
self.get_column("response_description", sa.Text)
self._col_response_data = self.get_column("response_data", sa.Text)
def _update_lookups(self, data_objects: Iterable[JSON_DATA_TYPE],
month_idx: int) -> None:
""" Update used lookups
Arguments:
data_objects -- Sequence of 'invariant_json' objects
month_idx -- Month index
"""
configs: List[str] = []
customers: List[str] = []
ulss: List[str] = []
geos: List[str] = []
for json_obj in data_objects:
json_object = jd(json_obj)
configs.append(
js(json_object[AlsMessageBundle.JRR_CONFIG_TEXT_KEY]))
customers.append(
js(json_object[AlsMessageBundle.JRR_CUSTOMER_KEY]))
ulss.append(js(json_object[AlsMessageBundle.JRR_ULS_KEY]))
geos.append(js(json_object[AlsMessageBundle.JRR_GEO_KEY]))
self._afc_config_lookup.update_db(values=configs, month_idx=month_idx)
self._customer_lookup.update_db(values=customers, month_idx=month_idx)
self._uls_lookup.update_db(values=ulss, month_idx=month_idx)
self._geo_data_lookup.update_db(values=geos, month_idx=month_idx)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Makes database row dictionary
Arguments:
data_key -- Digest of 'invariant_json' object
data_object -- 'invariant_json' object itself
month_idx -- Month index
Returns single-element row dictionary
"""
try:
json_dict = jd(data_object)
resp_status: Dict[str, Any] = \
jd(json_dict[AlsMessageBundle.JRR_RESPONSE_KEY]["response"])
success = resp_status["responseCode"] == 0
resp_data: Optional[str] = None
if not success:
resp_data = ""
for field_name, field_value in \
(resp_status.get("supplementalInfo", {}) or
{}).items():
if field_value and \
(field_name in ("missingParams", "invalidParams",
"unexpectedParams")):
resp_data += \
("," if resp_data else "") + ",".join(field_value)
return [{
ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_afc_config_digest.name):
self._afc_config_lookup.key_for_value(
json_dict[AlsMessageBundle.JRR_CONFIG_TEXT_KEY],
month_idx=month_idx).urn,
ms(self._col_customer_id.name):
self._customer_lookup.key_for_value(
json_dict[AlsMessageBundle.JRR_CUSTOMER_KEY],
month_idx=month_idx),
ms(self._col_uls_data_id.name):
self._uls_lookup.key_for_value(
json_dict[AlsMessageBundle.JRR_ULS_KEY],
month_idx=month_idx),
ms(self._col_geo_data_id.name):
self._geo_data_lookup.key_for_value(
json_dict[AlsMessageBundle.JRR_GEO_KEY],
month_idx=month_idx),
ms(self._col_req_digest.name):
BytesUtils.json_to_uuid(
json_dict[AlsMessageBundle.JRR_REQUEST_KEY]).urn,
ms(self._col_resp_digest.name):
BytesUtils.json_to_uuid(
json_dict[AlsMessageBundle.JRR_RESPONSE_KEY]).urn,
ms(self._col_dev_desc_digest.name):
BytesUtils.json_to_uuid(
json_dict[AlsMessageBundle.JRR_REQUEST_KEY][
"deviceDescriptor"]).urn,
ms(self._col_loc_digest.name):
BytesUtils.json_to_uuid(
json_dict[AlsMessageBundle.JRR_REQUEST_KEY][
"location"]).urn,
ms(self._col_response_code.name):
json_dict[AlsMessageBundle.JRR_RESPONSE_KEY][
"response"]["responseCode"],
ms(self._col_response_description.name):
resp_status.get("shortDescription"),
ms(self._col_response_data.name): resp_data}]
except (LookupError, TypeError, ValueError) as ex:
raise JsonFormatError(
f"Invalid Request or Response format: '{ex}'",
code_line=LineNumber.exc(), data=data_object)
def _update_foreign_targets(
self,
row_infos: Dict[uuid.UUID,
Tuple[JSON_DATA_TYPE, List[ROW_DATA_TYPE]]],
month_idx: int) -> None:
""" Updates tables this one references
Arguments:
row_infos -- Dictionary of data objects and row lists generated from
them, indexed by data keys
month_idx -- Month index
"""
updated_jsons: Dict[uuid.UUID, JSON_DATA_TYPE] = {}
updated_dev_desc: Dict[uuid.UUID, JSON_DATA_TYPE] = {}
updated_locations: Dict[uuid.UUID, JSON_DATA_TYPE] = {}
for json_obj, rows in row_infos.values():
json_object = jd(json_obj)
row = rows[0]
updated_jsons[
uuid.UUID(js(row[ms(self._col_req_digest.name)]))] = \
json_object[AlsMessageBundle.JRR_REQUEST_KEY]
updated_jsons[
uuid.UUID(js(row[ms(self._col_resp_digest.name)]))] = \
json_object[AlsMessageBundle.JRR_RESPONSE_KEY]
updated_dev_desc[uuid.UUID(
js(row[ms(self._col_dev_desc_digest.name)]))] = \
jd(json_object[AlsMessageBundle.JRR_REQUEST_KEY])[
"deviceDescriptor"]
updated_locations[
uuid.UUID(js(row[ms(self._col_loc_digest.name)]))] = \
jd(jd(json_object[AlsMessageBundle.JRR_REQUEST_KEY])[
"location"])
self._compressed_json_updater.update_db(data_dict=updated_jsons,
month_idx=month_idx)
self._dev_desc_updater.update_db(data_dict=updated_dev_desc,
month_idx=month_idx)
self._location_updater.update_db(data_dict=updated_locations,
month_idx=month_idx)
def _update_foreign_sources(
self,
inserted_rows: Dict[uuid.UUID, Tuple[ROW_DATA_TYPE,
JSON_DATA_TYPE,
RESULT_ROW_DATA_TYPE]],
month_idx: int) -> None:
""" Updates compressed JSONs, device descriptors, locations, EIRPs, PSD
tables for those table rows that were inserted
Arguments:
        inserted_rows -- Dictionary of (row dictionary, data object,
                         result row) tuples for newly-inserted rows, indexed
                         by request/response digests
month_idx -- Month index
"""
updated_eirps: Dict[uuid.UUID, JSON_DATA_TYPE] = {}
updated_psds: Dict[uuid.UUID, JSON_DATA_TYPE] = {}
for digest, (_, json_obj, _) in inserted_rows.items():
json_object = jd(json_obj)
updated_eirps[digest] = \
json_object[AlsMessageBundle.JRR_RESPONSE_KEY].get(
"availableChannelInfo") or []
updated_psds[digest] = \
json_object[AlsMessageBundle.JRR_RESPONSE_KEY].get(
"availableFrequencyInfo") or []
self._max_eirp_updater.update_db(data_dict=updated_eirps,
month_idx=month_idx)
self._max_psd_updater.update_db(data_dict=updated_psds,
month_idx=month_idx)
class EnvelopeTableUpdater(TableUpdaterBase[uuid.UUID, JSON_DATA_TYPE]):
""" Request/response envelope tables.
Keys are digests over envelope JSON
Values are envelope JSONs themselves
Private attributes
_col_digest -- Digest column
_col_month_idx -- Month index column
_col_data -- Envelope JSON column
"""
# Parameters
Params = NamedTuple(
"Params",
# Table name
[("table_name", str),
# Name of digest column
("digest_col_name", str),
# Name of envelope JSON column
("value_col_name", str)])
# Parameters for AFC Request envelope table
RX_ENVELOPE_PARAMS = Params(table_name="rx_envelope",
digest_col_name="rx_envelope_digest",
value_col_name="envelope_json")
# Parameters for AFC Response envelope table
TX_ENVELOPE_PARAMS = Params(table_name="tx_envelope",
digest_col_name="tx_envelope_digest",
value_col_name="envelope_json")
def __init__(self, adb: AlsDatabase,
params: "EnvelopeTableUpdater.Params") -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
params -- Table parameters
"""
super().__init__(adb=adb, table_name=params.table_name,
json_obj_name="RequestResponseMessageEnvelope")
self._col_digest = self.get_column(params.digest_col_name,
sa_pg.UUID)
self._col_month_idx = self.get_month_idx_col()
self._col_data = self.get_column(params.value_col_name, sa_pg.JSON)
def _make_rows(self, data_key: uuid.UUID, data_object: JSON_DATA_TYPE,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Makes row dictionary
Arguments:
data_key -- Digest of envelope JSON
data_object -- Envelope JSON
month_idx -- Month index
"""
return \
[{ms(self._col_digest.name): data_key.urn,
ms(self._col_month_idx.name): month_idx,
ms(self._col_data.name): data_object}]
# Type for key of request/response association key data
RequestResponseAssociationTableDataKey = \
NamedTuple(
"RequestResponseAssociationTableDataKey",
# Serial ID of AFC message
[("message_id", int),
# Id of request/response within message
("request_id", str)])
class RequestResponseAssociationTableUpdater(
TableUpdaterBase[RequestResponseAssociationTableDataKey,
AlsMessageBundle.RequestResponse]):
""" Updater of request/response association table (intermediary between
message table and request/response table)
Private attributes:
_col_message_id -- Message serial ID column
_col_req_id -- Request ID column
_col_month_idx -- Month index column
_col_rr_digest -- Request/response (invariant_json) digest
column
_col_expire_time -- Response expiration time column
_request_response_updater -- Request/response table updater
"""
TABLE_NAME = "request_response_in_message"
def __init__(
self, adb: AlsDatabase,
request_response_updater: RequestResponseTableUpdater) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
request_response_updater -- Request/response table updater
"""
super().__init__(
adb=adb, table_name=self.TABLE_NAME,
json_obj_name="RequestResponseAssociation")
self._col_message_id = self.get_column("message_id", sa.BigInteger)
self._col_req_id = self.get_column("request_id", sa.Text)
self._col_month_idx = self.get_month_idx_col()
self._col_rr_digest = self.get_column("request_response_digest",
sa_pg.UUID)
self._col_expire_time = self.get_column("expire_time",
sa.DateTime)
self._request_response_updater = request_response_updater
def _make_rows(self,
data_key: RequestResponseAssociationTableDataKey,
data_object: AlsMessageBundle.RequestResponse,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Make row dictionary
        Arguments:
data_key -- Row in message table and request index
data_object -- AlsMessageBundle.RequestResponse object
month_idx -- Month index
Returns single-element row dictionary list
"""
return [{ms(self._col_message_id.name): data_key.message_id,
ms(self._col_req_id.name): data_key.request_id,
ms(self._col_month_idx.name): month_idx,
ms(self._col_rr_digest.name):
BytesUtils.json_to_uuid(data_object.invariant_json).urn,
ms(self._col_expire_time.name): data_object.expire_time}]
def _update_foreign_targets(
self,
row_infos: Dict[RequestResponseAssociationTableDataKey,
Tuple[AlsMessageBundle.RequestResponse,
List[ROW_DATA_TYPE]]],
month_idx: int) -> None:
""" Updates tables pointed to by foreign keys of this table.
Arguments:
row_infos -- Dictionary of data objects and row lists generated from
them, indexed by data keys
month_idx -- Value for 'month_idx' column
"""
self._request_response_updater.update_db(
data_dict={
uuid.UUID(js(rows[0][ms(self._col_rr_digest.name)])):
rr.invariant_json
for rr, rows in row_infos.values()},
month_idx=month_idx)
class DecodeErrorTableWriter(AlsTableBase):
""" Writer of decode error table
Private attributes:
    _col_id -- Record ID column
    _col_msg -- Error message column
    _col_line -- Code line number column
    _col_data -- Supplementary data column
    _col_time -- Timetag column
    _col_month_idx -- Month index column
    _conn -- Database connection
"""
TABLE_NAME = "decode_error"
def __init__(self, adb: AlsDatabase) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME)
self._col_id = self.get_column("id", sa.BigInteger)
self._col_time = self.get_column("time", sa.DateTime)
self._col_msg = self.get_column("msg", sa.Text)
self._col_line = self.get_column("code_line", sa.Integer)
self._col_data = self.get_column("data", sa.Text)
self._col_month_idx = self.get_month_idx_col()
self._conn = self._adb.engine.connect()
def write_decode_error(
self, msg: str, line: Optional[int],
data: Optional[Union[bytes, str, JSON_DATA_TYPE]] = None) -> None:
""" Writes decode error to table
Arguments:
msg -- Error message
line -- Script line number
data -- Supplementary data
"""
if isinstance(data, bytes):
data = data.decode("latin-1")
elif isinstance(data, (list, dict)):
data = json.dumps(data)
ins = sa.insert(self._table).values(
{ms(self._col_month_idx.name): get_month_idx(),
ms(self._col_msg.name): msg,
ms(self._col_line.name): line,
ms(self._col_data.name): data,
ms(self._col_time.name):
datetime.datetime.now(dateutil.tz.tzlocal())})
self._conn.execute(ins)
class AfcMessageTableUpdater(TableUpdaterBase[int, AlsMessageBundle]):
""" AFC Message table
Keys are, for no better alternatives, 0-based indices of messages, passed
to update_db()
Data objects are AlsMessageBundle objects
Private attributes:
_col_message_id -- Message serial ID column
_col_month_idx -- Month index column
_col_afc_server -- AFC Server ID column
_col_rx_time -- AFC Request timetag column
_col_tx_time -- AFC Response timetag column
_rx_envelope_digest -- AFC Request envelope digest column
_tx_envelope_digest -- AFC Response envelope digest column
    _afc_server_lookup -- Lookup for AFC Server names
_rr_assoc_updater -- Updater of message to request/response association
table
_rx_envelope_updater -- Updater for AFC Request envelope table
_tx_envelope_updater -- Updater for AFC Response envelope table
_decode_error_writer -- Decode error table writer
"""
# Table name
TABLE_NAME = "afc_message"
def __init__(self, adb: AlsDatabase, afc_server_lookup: StringLookup,
rr_assoc_updater: RequestResponseAssociationTableUpdater,
rx_envelope_updater: EnvelopeTableUpdater,
tx_envelope_updater: EnvelopeTableUpdater,
decode_error_writer: DecodeErrorTableWriter) -> None:
""" Constructor
Arguments:
        adb -- AlsDatabase object
        afc_server_lookup -- Lookup for AFC Server names
rr_assoc_updater -- Updater of message to request/response
association table
rx_envelope_updater -- Updater for AFC Request envelope table
tx_envelope_updater -- Updater for AFC Response envelope table
decode_error_writer -- Decode error table writer
"""
super().__init__(adb=adb, table_name=self.TABLE_NAME,
json_obj_name="AlsMessageBundle",
data_key_column_names=["message_id"])
self._col_message_id = self.get_column("message_id", sa.BigInteger)
self._col_month_idx = self.get_month_idx_col()
self._col_afc_server = self.get_column("afc_server", sa.Integer)
self._col_rx_time = self.get_column("rx_time", sa.DateTime)
self._col_tx_time = self.get_column("tx_time", sa.DateTime)
self._rx_envelope_digest = self.get_column("rx_envelope_digest",
sa_pg.UUID)
self._tx_envelope_digest = self.get_column("tx_envelope_digest",
sa_pg.UUID)
self._afc_server_lookup = afc_server_lookup
self._rr_assoc_updater = rr_assoc_updater
self._rx_envelope_updater = rx_envelope_updater
self._tx_envelope_updater = tx_envelope_updater
self._decode_error_writer = decode_error_writer
def _update_lookups(self, data_objects: Iterable[AlsMessageBundle],
month_idx: int) -> None:
""" Update used lookups
Arguments:
data_objects -- Sequence of AlsMessageBundle being added to database
month_idx -- Month index
"""
self._afc_server_lookup.update_db(
values=[b.take_apart().afc_server for b in data_objects],
month_idx=month_idx)
def _make_rows(self, data_key: int, data_object: AlsMessageBundle,
month_idx: int) -> List[ROW_DATA_TYPE]:
""" Makes table row dictionary from data object
Arguments:
data_key -- Data key (sequential index in dictionary, passed to
update_db())
data_object -- AlsMessageBundle to make row from
month_idx -- Month index
        Returns single-element list of row dictionaries
"""
parts = data_object.take_apart()
for orphans, name in \
[(parts.orphan_requests, "Requests without responses"),
(parts.orphan_responses, "Responses without requests")]:
for orphan in orphans:
self._decode_error_writer.write_decode_error(
msg=f"{name} received from {parts.afc_server}",
line=LineNumber.current(),
data=orphan)
return [{ms(self._col_month_idx.name): month_idx,
ms(self._col_afc_server.name):
self._afc_server_lookup.key_for_value(parts.afc_server,
month_idx),
ms(self._col_rx_time.name): parts.rx_timetag,
ms(self._col_tx_time.name): parts.tx_timetag,
ms(self._rx_envelope_digest.name):
BytesUtils.json_to_uuid(parts.rx_envelope).urn,
ms(self._tx_envelope_digest.name):
BytesUtils.json_to_uuid(parts.tx_envelope).urn}]
def _update_foreign_targets(
self,
row_infos: Dict[int, Tuple[AlsMessageBundle, List[ROW_DATA_TYPE]]],
month_idx: int) -> None:
""" Update RX/TX envelopes that are not yet in database
Arguments:
row_infos -- Dictionary of data objects and row lists generated from
them, indexed by data keys
month_idx -- Month index
"""
self._rx_envelope_updater.update_db(
data_dict={uuid.UUID(js(rows[0]
[ms(self._rx_envelope_digest.name)])):
bundle.take_apart().rx_envelope
for bundle, rows in row_infos.values()},
month_idx=month_idx)
self._tx_envelope_updater.update_db(
data_dict={uuid.UUID(js(rows[0]
[ms(self._tx_envelope_digest.name)])):
bundle.take_apart().tx_envelope
for bundle, rows in row_infos.values()},
month_idx=month_idx)
def _update_foreign_sources(
self,
inserted_rows: Dict[int, Tuple[ROW_DATA_TYPE,
AlsMessageBundle,
RESULT_ROW_DATA_TYPE]],
month_idx: int) -> None:
""" Updates request/response association tables and (for unique
        requests/responses) dependent tables
        Arguments:
inserted_rows -- Information about inserted rows - row dictionaries,
data objects, result rows. Ordered by 0-based
indices of inserted rows
month_idx -- Month index
"""
rr_dict: Dict[RequestResponseAssociationTableDataKey,
AlsMessageBundle.RequestResponse] = {}
for _, message_bundle, inserted_row in inserted_rows.values():
parts = message_bundle.take_apart()
for req_id, request_response in parts.request_responses.items():
rr_dict[
RequestResponseAssociationTableDataKey(
message_id=ji(inserted_row[0]), request_id=req_id)] = \
request_response
self._rr_assoc_updater.update_db(data_dict=rr_dict,
month_idx=month_idx)
def _data_key_from_result_row(self, result_row: Tuple[Any, ...],
result_row_idx: int) -> int:
""" Data key from rows written to database
Arguments:
result_row -- Insert result tuple
        result_row_idx -- 0-based index in insert results
Returns the latter
"""
return result_row_idx
class IncompleteAlsBundles:
""" Collection of ALS bundles, for which not all messages arrived yet
Private attributes:
_kafka_positions -- Collection of uncommitted Kafka positions
_bundle_queue -- Heap queue of ALS bundles, arranged by last update
_bundle_map -- Maps Kafka message keys to bundles
"""
def __init__(self, kafka_positions: KafkaPositions) -> None:
""" Constructor
Arguments:
kafka_positions -- Collection of uncommitted Kafka positions
"""
self._kafka_positions = kafka_positions
self._bundle_queue: List[AlsMessageBundle] = []
self._bundle_map: Dict[AlsMessageKeyType, AlsMessageBundle] = {}
def add_message(self, message_key: AlsMessageKeyType, message: AlsMessage,
kafka_position: KafkaPosition) -> bool:
""" Adds ALS message
Arguments:
message_key -- Kafka message key
message -- AlsMessage
kafka_position -- Message's position in Kafka queue
Returns True if new bundle was created
"""
ret = False
bundle = self._bundle_map.get(message_key)
if bundle is None:
ret = True
bundle = AlsMessageBundle(message_key=message_key,
kafka_positions=self._kafka_positions)
heapq.heappush(self._bundle_queue, bundle)
self._bundle_map[message_key] = bundle
bundle.update(message, kafka_position)
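        # The bundle's ordering key (its last-update time) may have changed,
        # so the heap invariant has to be restored before the queue is used
        # again.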
heapq.heapify(self._bundle_queue)
return ret
def get_oldest_bundle(self) -> Optional[AlsMessageBundle]:
""" Get the oldest bundle (None if collection is empty) """
return self._bundle_queue[0] if self._bundle_queue else None
def get_incomplete_count(self) -> int:
""" Number of incomplete (not yet assembled) bundles """
return sum((0 if b.assembled() else 1) for b in self._bundle_queue)
def remove_oldest_bundle(self) -> None:
""" Removes oldest bundle from collection """
assert self._bundle_queue
ret = heapq.heappop(self._bundle_queue)
del self._bundle_map[ret.message_key()]
def fetch_assembled(
self, max_bundles: Optional[int] = None,
max_requests: Optional[int] = None) -> List[AlsMessageBundle]:
""" Fetch and remove from collection assembled bundles (all or some)
Arguments:
max_bundles -- Maximum number of bundles or None
max_requests -- Maximum total number of requests or None
Returns list of bundles
"""
ret: List[AlsMessageBundle] = []
idx = 0
num_requests = 0
while (idx < len(self._bundle_queue)) and \
((max_bundles is None) or (len(ret) < max_bundles)):
bundle = self._bundle_queue[idx]
if not bundle.assembled():
idx += 1
continue
if (max_requests is not None) and \
((num_requests + bundle.request_count()) > max_requests):
break
ret.append(bundle)
del self._bundle_map[bundle.message_key()]
self._bundle_queue.pop(idx)
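            # pop(idx) removes an element from the middle of the list and
            # breaks the heap invariant, hence the explicit re-heapify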
heapq.heapify(self._bundle_queue)
return ret
class KafkaClient:
""" Wrapper over confluent_kafka.Consumer object
Private attributes:
_consumer -- confluent_kafka.Consumer object
_subscribed_topics -- Set of currently subscribed topics
_resubscribe_interval -- Minimum time interval before subscription
checks
    _last_subscription_check -- Moment when subscription was last checked
_subscribe_als -- True if ALS topic should be subscribed
_subscribe_log -- True if log topics should be subscribed
_metrics -- Metric collection
"""
# Kafka message data
MessageInfo = \
NamedTuple(
"MessageInfo",
# Message position (topic/partition/offset)
[("position", KafkaPosition),
# Message raw key
("key", Optional[bytes]),
# Message raw value
("value", bytes)])
class _ArgDsc(NamedTuple):
""" confluent_kafka.Consumer() config argument descriptor """
# confluent_kafka.Consumer() parameter
config: str
# Correspondent command line parameter (if any)
cmdline: Optional[str] = None
# Default value
default: Any = None
def get_value(self, args: Any) -> Any:
""" Returns value for parameter (from command line or default)
Arguments:
args -- Parsed command line object
Returns None or parameter value
"""
assert (self.cmdline is not None) or (self.default is not None)
ret: Any = None
if self.cmdline is not None:
assert hasattr(args, self.cmdline)
ret = getattr(args, self.cmdline)
return ret if ret is not None else self.default
# Supported confluent_kafka.Consumer() config arguments
_ARG_DSCS = [
_ArgDsc(config="bootstrap.servers", cmdline="kafka_servers"),
_ArgDsc(config="client.id", cmdline="kafka_client_id"),
_ArgDsc(config="security.protocol", cmdline="kafka_security_protocol"),
_ArgDsc(config="ssl.keystore.location", cmdline="kafka_ssl_keyfile"),
_ArgDsc(config="ssl.truststore.location", cmdline="kafka_ssl_cafile"),
_ArgDsc(config="ssl.cipher.suites", cmdline="kafka_ssl_ciphers"),
_ArgDsc(config="max.partition.fetch.bytes",
cmdline="kafka_max_partition_fetch_bytes"),
_ArgDsc(config="enable.auto.commit", default=True),
_ArgDsc(config="group.id", default="ALS"),
_ArgDsc(config="auto.offset.reset", default="earliest")]
def __init__(self, args: Any, subscribe_als: bool, subscribe_log: bool,
resubscribe_interval_s: int) -> None:
""" Constructor
Arguments:
args -- Parsed command line parameters
subscribe_als -- True if ALS topic should be subscribed
subscribe_log -- True if log topics should be subscribed
resubscribe_interval_s -- How often (interval in seconds)
subscription_check() will actually check
subscription. 0 means - on each call.
"""
config: Dict[str, Any] = {}
for ad in self._ARG_DSCS:
v = ad.get_value(args)
if v is not None:
config[ad.config] = v
if config.get("client.id", "").endswith("@"):
config["client.id"] = config["client.id"][:-1] + \
"".join(f"{b:02X}" for b in os.urandom(10))
try:
self._consumer = confluent_kafka.Consumer(config)
except confluent_kafka.KafkaException as ex:
logging.error(f"Error creating Kafka Consumer: {ex.args[0].str}")
raise
self._subscribe_als = subscribe_als
self._subscribe_log = subscribe_log
self._resubscribe_interval = \
datetime.timedelta(seconds=resubscribe_interval_s)
self._subscribed_topics: Set[str] = set()
self._last_subscription_check = \
datetime.datetime.now() - self._resubscribe_interval
self._metrics = \
Metrics([("Gauge", "siphon_fetched_offsets",
"Fetched Kafka offsets", ["topic", "partition"]),
("Counter", "siphon_kafka_errors",
"Messages delivered with errors", ["topic", "code"]),
("Gauge", "siphon_comitted_offsets",
"Comitted Kafka offsets", ["topic", "partition"])])
def subscription_check(self) -> None:
""" If it's time - check if new matching topics arrived and resubscribe
if so """
if (datetime.datetime.now() - self._last_subscription_check) < \
self._resubscribe_interval:
return
try:
current_topics: Set[str] = set()
for topic in self._consumer.list_topics().topics.keys():
if (self._subscribe_als and (topic == ALS_KAFKA_TOPIC)) or \
(self._subscribe_log and
(not topic.startswith("__"))):
current_topics.add(topic)
if current_topics <= self._subscribed_topics:
return
self._consumer.subscribe(list(current_topics))
self._subscribed_topics = current_topics
self._last_subscription_check = datetime.datetime.now()
except confluent_kafka.KafkaException as ex:
logging.error(f"Topic subscription error: {ex.args[0].str}")
raise
def poll(self, timeout_ms: int, max_records: int) \
-> Dict[str, List["KafkaClient.MessageInfo"]]:
""" Poll for new messages
Arguments:
timeout_ms -- Poll timeout in milliseconds. 0 to return immediately
max_records -- Maximum number of records to poll
Returns by-topic dictionary of MessageInfo objects
"""
timeout_s = timeout_ms / 1000
try:
fetched_offsets: Dict[Tuple[str, int], int] = {}
ret: Dict[str, List["KafkaClient.MessageInfo"]] = {}
start_time = datetime.datetime.now()
for _ in range(max_records):
message = self._consumer.poll(timeout_s)
if (message is None) or \
((datetime.datetime.now() -
start_time).total_seconds() > timeout_s):
break
kafka_error = message.error()
topic = message.topic()
if kafka_error is not None:
self._metrics.siphon_kafka_errors(
message.topic() or "None",
str(kafka_error.code())).inc()
else:
partition = message.partition()
offset = message.offset()
ret.setdefault(message.topic(), []).\
append(
self.MessageInfo(
position=KafkaPosition(
topic=topic, partition=partition,
offset=offset),
key=message.key(),
value=message.value()))
previous_offset = \
fetched_offsets.setdefault((topic, partition), -1)
fetched_offsets[(topic, partition)] = \
max(previous_offset, offset)
except confluent_kafka.KafkaException as ex:
logging.error(f"Message fetch error: {ex.args[0].str}")
raise
for (topic, partition), offset in fetched_offsets.items():
self._metrics.siphon_fetched_offsets(str(topic),
str(partition)).set(offset)
return ret
def commit(self, positions: Dict[str, Dict[int, int]]) -> None:
""" Commit given message positions
Arguments:
positions -- By-topic then by-partition maximum committed offsets
"""
offsets: List[confluent_kafka.TopicPartition] = []
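        # Kafka treats a committed offset as the next offset to consume,
        # hence "offset + 1" below when converting the highest processed
        # offset into a commit position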
for topic, offset_dict in positions.items():
for partition, offset in offset_dict.items():
offsets.append(
confluent_kafka.TopicPartition(
topic=topic, partition=partition, offset=offset + 1))
self._metrics.siphon_comitted_offsets(topic,
partition).set(offset)
try:
self._consumer.commit(offsets=offsets)
except confluent_kafka.KafkaException as ex:
logging.error(f"Offset commit error: {ex.args[0].str}")
raise
class Siphon:
""" Siphon (Kafka reader / DB updater
Private attributes:
_adb -- AlsDatabase object or None
_ldb -- LogsDatabase object or None
_kafka_client -- KafkaClient consumer wrapper object
_decode_error_writer -- Decode error table writer
_lookups -- Lookup collection
_cert_lookup -- Certificates' lookup
_afc_config_lookup -- AFC Configs lookup
_afc_server_lookup -- AFC Server name lookup
_customer_lookup -- Customer name lookup
_uls_lookup -- ULS ID lookup
_geo_data_lookup -- Geodetic Data ID lookup
_dev_desc_updater -- DeviceDescriptor table updater
_location_updater -- Location table updater
_compressed_json_updater -- Compressed JSON table updater
_max_eirp_updater -- Maximum EIRP table updater
_max_psd_updater -- Maximum PSD table updater
_req_resp_updater -- Request/Response table updater
_rx_envelope_updater -- AFC Request envelope table updater
    _tx_envelope_updater -- AFC Response envelope table updater
_req_resp_assoc_updater -- Request/Response to Message association
table updater
_afc_message_updater -- Message table updater
    _kafka_positions -- Collection of uncommitted Kafka positions
    _als_bundles -- Collection of incomplete ALS bundles
_metrics -- Collection of Prometheus metrics
"""
# Number of messages fetched from Kafka in single access
KAFKA_MAX_RECORDS = 1000
# Kafka server access timeout if system is idle
KAFKA_IDLE_TIMEOUT_MS = 1000
# Maximum age (time since last update) of ALS Bundle
ALS_MAX_AGE_SEC = 1000
# Maximum number of requests in bundles to write to database
ALS_MAX_REQ_UPDATE = 5000
def __init__(self, adb: Optional[AlsDatabase], ldb: Optional[LogsDatabase],
kafka_client: KafkaClient) -> None:
""" Constructor
Arguments:
adb -- AlsDatabase object or None
ldb -- LogsDatabase object or None
kafka_client -- KafkaClient
"""
error_if(not (adb or ldb),
"Neither ALS nor Logs database specified. Nothing to do")
self._metrics = \
Metrics([("Counter", "siphon_kafka_polls",
"Number of Kafka polls"),
("Counter", "siphon_als_received",
"Number of ALS records received from Kafka"),
("Counter", "siphon_als_malformed",
"Number of malformed ALS records received from Kafka"),
("Counter", "siphon_log_received",
"Number of LOG records received from Kafka", ["topic"]),
("Counter", "siphon_log_malformed",
"Number of malformed LOG records received from Kafka",
["topic"]),
("Counter", "siphon_afc_msg_received",
"Number of AFC Request messages received"),
("Counter", "siphon_afc_msg_completed",
"Number of completed AFC Request messages"),
("Counter", "siphon_afc_req_completed",
"Number of completed AFC Requests"),
("Counter", "siphon_afc_msg_dropped",
"Number of incomplete AFC Request messages"),
("Gauge", "siphon_afc_msg_in_progress",
"Number of AFC Request messages awaiting completion")])
self._adb = adb
self._ldb = ldb
self._kafka_client = kafka_client
if self._adb:
self._decode_error_writer = DecodeErrorTableWriter(adb=self._adb)
self._lookups = Lookups()
self._cert_lookup = CertificationsLookup(adb=self._adb,
lookups=self._lookups)
self._afc_config_lookup = AfcConfigLookup(adb=self._adb,
lookups=self._lookups)
self._afc_server_lookup = \
StringLookup(adb=self._adb,
params=StringLookup.AFC_SERVER_PARAMS,
lookups=self._lookups)
self._customer_lookup = \
StringLookup(adb=self._adb,
params=StringLookup.CUSTOMER_PARAMS,
lookups=self._lookups)
self._uls_lookup = \
StringLookup(adb=self._adb,
params=StringLookup.ULS_PARAMS_PARAMS,
lookups=self._lookups)
self._geo_data_lookup = \
StringLookup(adb=self._adb,
params=StringLookup.GEO_DATA_PARAMS,
lookups=self._lookups)
self._dev_desc_updater = \
DeviceDescriptorTableUpdater(
adb=self._adb, cert_lookup=self._cert_lookup)
self._location_updater = LocationTableUpdater(adb=self._adb)
self._compressed_json_updater = \
CompressedJsonTableUpdater(adb=self._adb)
self._max_eirp_updater = MaxEirpTableUpdater(adb=self._adb)
self._max_psd_updater = MaxPsdTableUpdater(adb=self._adb)
self._req_resp_updater = \
RequestResponseTableUpdater(
adb=self._adb, afc_config_lookup=self._afc_config_lookup,
customer_lookup=self._customer_lookup,
uls_lookup=self._uls_lookup,
geo_data_lookup=self._geo_data_lookup,
compressed_json_updater=self._compressed_json_updater,
dev_desc_updater=self._dev_desc_updater,
location_updater=self._location_updater,
max_eirp_updater=self._max_eirp_updater,
max_psd_updater=self._max_psd_updater)
self._rx_envelope_updater = \
EnvelopeTableUpdater(
adb=self._adb,
params=EnvelopeTableUpdater.RX_ENVELOPE_PARAMS)
self._tx_envelope_updater = \
EnvelopeTableUpdater(
adb=self._adb,
params=EnvelopeTableUpdater.TX_ENVELOPE_PARAMS)
self._req_resp_assoc_updater = \
RequestResponseAssociationTableUpdater(
adb=self._adb,
request_response_updater=self._req_resp_updater)
self._afc_message_updater = \
AfcMessageTableUpdater(
adb=self._adb, afc_server_lookup=self._afc_server_lookup,
rr_assoc_updater=self._req_resp_assoc_updater,
rx_envelope_updater=self._rx_envelope_updater,
tx_envelope_updater=self._tx_envelope_updater,
decode_error_writer=self._decode_error_writer)
self._kafka_positions = KafkaPositions()
self._als_bundles = \
IncompleteAlsBundles(kafka_positions=self._kafka_positions)
def main_loop(self) -> None:
""" Read/write loop """
busy = True
while True:
kafka_messages_by_topic = \
self._kafka_client.poll(
timeout_ms=0 if busy else self.KAFKA_IDLE_TIMEOUT_MS,
max_records=self.KAFKA_MAX_RECORDS)
self._metrics.siphon_kafka_polls().inc()
busy = bool(kafka_messages_by_topic)
for topic, kafka_messages in kafka_messages_by_topic.items():
if topic == ALS_KAFKA_TOPIC:
self._read_als_kafka_messages(kafka_messages)
else:
self._process_log_kafka_messages(topic, kafka_messages)
if self._adb:
busy |= self._write_als_messages()
busy |= self._timeout_als_messages()
busy |= self._commit_kafka_offsets()
self._kafka_client.subscription_check()
def _read_als_kafka_messages(
self, kafka_messages: List[KafkaClient.MessageInfo]) -> None:
""" Put fetched ALS Kafka messages to store of incomplete bundles
Arguments:
topic -- ALS Topic name
kafka_messages -- List of raw Kafka messages
"""
for kafka_message in kafka_messages:
self._kafka_positions.add(kafka_message.position)
self._metrics.siphon_als_received().inc()
try:
assert kafka_message.key is not None
if self._als_bundles.add_message(
message_key=kafka_message.key,
message=AlsMessage(raw_msg=kafka_message.value),
kafka_position=kafka_message.position):
self._metrics.siphon_afc_msg_received().inc()
self._metrics.siphon_afc_msg_in_progress().set(
self._als_bundles.get_incomplete_count())
except (AlsProtocolError, JsonFormatError) as ex:
self._metrics.siphon_als_malformed().inc()
self._decode_error_writer.write_decode_error(
msg=ex.msg, line=ex.code_line, data=ex.data)
self._kafka_positions.mark_processed(
kafka_position=kafka_message.position)
def _process_log_kafka_messages(
self, topic: str, kafka_messages: List[KafkaClient.MessageInfo]) \
-> None:
""" Process non-ALS (i.e. JSON Log) messages for one topic
Arguments:
        topic -- Log topic name
kafka_messages -- List of Kafka messages
"""
records: List[LogsDatabase.Record] = []
for kafka_message in kafka_messages:
self._metrics.siphon_log_received(topic).inc()
self._kafka_positions.add(kafka_message.position)
try:
log_message = json.loads(kafka_message.value)
records.append(
LogsDatabase.Record(
source=log_message["source"],
time=datetime.datetime.fromisoformat(
log_message["time"]),
log=json.loads(log_message["jsonData"])))
except (json.JSONDecodeError, LookupError, TypeError, ValueError) \
as ex:
self._metrics.siphon_log_malformed(topic).inc()
logging.error(
f"Can't decode log message '{kafka_message.value!r}': "
f"{repr(ex)}")
if records and (self._ldb is not None):
transaction: Optional[Any] = None
try:
transaction = self._ldb.conn.begin()
self._ldb.write_log(topic=topic, records=records)
transaction.commit()
transaction = None
finally:
if transaction is not None:
transaction.rollback()
self._kafka_positions.mark_processed(topic=topic)
def _write_als_messages(self) -> bool:
""" Write complete ALS Bundles to ALS database.
Returns True if any work was done """
assert self._adb is not None
month_idx = get_month_idx()
transaction: Optional[Any] = None
try:
data_dict = \
dict(
enumerate(self._als_bundles.fetch_assembled(
max_requests=self.ALS_MAX_REQ_UPDATE)))
if not data_dict:
return False
req_count = \
sum(bundle.request_count() for bundle in data_dict.values())
transaction = self._adb.conn.begin()
self._afc_message_updater.update_db(data_dict, month_idx=month_idx)
transaction.commit()
self._metrics.siphon_afc_msg_completed().inc(len(data_dict))
self._metrics.siphon_afc_req_completed().inc(req_count)
self._metrics.siphon_afc_msg_in_progress().set(
self._als_bundles.get_incomplete_count())
transaction = None
except JsonFormatError as ex:
if transaction is not None:
transaction.rollback()
transaction = None
self._lookups.reread()
self._decode_error_writer.write_decode_error(
ex.msg, line=ex.code_line, data=ex.data)
finally:
if transaction is not None:
transaction.rollback()
return True
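    # The 'transaction = None' assignments above implement a simple
    # commit-or-rollback guard: the variable is cleared once the transaction
    # has been committed (or explicitly rolled back), so the finally block
    # rolls back only a transaction that is still open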
def _timeout_als_messages(self) -> bool:
""" Throw away old incomplete ALS messages.
Returns True if any work was done """
boundary = datetime.datetime.now() - \
datetime.timedelta(seconds=self.ALS_MAX_AGE_SEC)
ret = False
while True:
oldest_bundle = self._als_bundles.get_oldest_bundle()
if (oldest_bundle is None) or \
(oldest_bundle.last_update() > boundary):
break
ret = True
self._als_bundles.remove_oldest_bundle()
self._decode_error_writer.write_decode_error(
"Incomplete message bundle removed",
line=LineNumber.current(),
data=oldest_bundle.dump())
self._metrics.siphon_afc_msg_dropped().inc()
return ret
def _commit_kafka_offsets(self) -> bool:
""" Commit completed Kafka offsets.
Returns True if any work was done """
completed_offsets = self._kafka_positions.get_processed_offsets()
if not completed_offsets:
return False
self._kafka_client.commit(completed_offsets)
return True
def read_sql_file(sql_file: str) -> str:
""" Returns content of SQL file properly cleaned """
with open(sql_file, encoding="ascii", newline=None) as f:
content = f.read()
# Removing -- and /* */ comments. Courtesy of stackoverflow :)
def replacer(match: re.Match) -> str:
""" Replacement callback """
s = match.group(0)
return " " if s.startswith('/') else s # /* */ comment is separator
return re.sub(
r'--.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
replacer, content, flags=re.DOTALL | re.MULTILINE)
ALS_PATCH = ["ALTER TABLE afc_server DROP CONSTRAINT IF EXISTS "
"afc_server_afc_server_name_key"]
def do_init_db(args: Any) -> None:
"""Execute "init" command.
Arguments:
args -- Parsed command line arguments
"""
databases: Set[DatabaseBase] = set()
try:
try:
init_db = InitialDatabase(arg_conn_str=args.init_postgres,
arg_password=args.init_postgres_password)
databases.add(init_db)
except sa.exc.SQLAlchemyError as ex:
error(f"Connection to {InitialDatabase.name_for_logs()} database "
f"failed: {ex}")
nothing_done = True
patch: List[str]
for conn_str, password, sql_file, template, db_class, sql_required, \
patch in \
[(args.als_postgres, args.als_postgres_password, args.als_sql,
args.als_template, AlsDatabase, True, ALS_PATCH),
(args.log_postgres, args.log_postgres_password, args.log_sql,
args.log_template, LogsDatabase, False, [])]:
if not (conn_str or sql_file or template):
continue
nothing_done = False
error_if(sql_file and (not os.path.isfile(sql_file)),
f"SQL file '{sql_file}' not found")
error_if(
sql_required and not sql_file,
f"SQL file is required for {db_class.name_for_logs()} "
f"database")
created = False
try:
database = db_class.parse_conn_str(conn_str).database
created = \
init_db.create_db(
db_name=database,
if_exists=InitialDatabase.IfExists(args.if_exists),
template=template, conn_str=conn_str,
password=password)
db = db_class(arg_conn_str=conn_str, arg_password=password)
databases.add(db)
with db.engine.connect() as conn:
if created and sql_file:
conn.execute(sa.text(read_sql_file(sql_file)))
if not created:
for cmd in patch:
conn.execute(sa.text(cmd))
except sa.exc.SQLAlchemyError as ex:
error(f"{db_class.name_for_logs()} database initialization "
f"failed: {ex}")
if created:
try:
init_db.drop_db(database)
except sa.exc.SQLAlchemyError:
pass
error_if(nothing_done, "Nothing to do")
finally:
for db in databases:
db.dispose()
def do_siphon(args: Any) -> None:
"""Execute "siphon" command.
Arguments:
args -- Parsed command line arguments
"""
if args.prometheus_port is not None:
prometheus_client.start_http_server(args.prometheus_port)
adb = AlsDatabase(arg_conn_str=args.als_postgres,
arg_password=args.als_postgres_password) \
if args.als_postgres else None
ldb = LogsDatabase(arg_conn_str=args.log_postgres,
arg_password=args.log_postgres_password) \
if args.log_postgres else None
try:
kafka_client = \
KafkaClient(args=args, subscribe_als=adb is not None,
subscribe_log=ldb is not None,
resubscribe_interval_s=5)
siphon = Siphon(adb=adb, ldb=ldb, kafka_client=kafka_client)
siphon.main_loop()
finally:
if adb is not None:
adb.dispose()
if ldb is not None:
ldb.dispose()
def do_init_siphon(args: Any) -> None:
"""Execute "init_siphon" command.
Arguments:
args -- Parsed command line arguments
"""
do_init_db(args)
do_siphon(args)
def do_help(args: Any) -> None:
"""Execute "help" command.
Arguments:
args -- Parsed command line arguments (also contains 'argument_parser' and
'subparsers' fields)
"""
if args.subcommand is None:
args.argument_parser.print_help()
else:
args.subparsers.choices[args.subcommand].print_help()
def docker_arg_type(final_type: Callable[[Any], Any], default: Any = None,
required: bool = False) -> Callable[[str], Any]:
""" Generator of argument converter for Docker environment
Empty argument value passed from environment-variable-initialized argument
(e.g. from Docker) should be treated as nonspecified. Boolean values
passed from environment-variable-initialized argument should also be
treated specially
Arguments:
final_type -- Type converter for nonempty argument
default -- Default value for empty argument
required -- True if argument is required (can't be empty)
Returns argument converter function
"""
assert (not required) or (default is None)
def arg_converter(arg: str) -> Any:
""" Type conversion function that will be used by argparse """
try:
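            # Empty value (e.g. from an unset environment variable) means
            # 'not specified'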
if arg in ("", None):
if required:
raise ValueError("Parameter is required")
return default
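            # Boolean values arrive from environment variables as strings -
            # accept the usual spellings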
if final_type == bool:
if arg.lower() in ("yes", "true", "+", "1"):
return True
if arg.lower() in ("no", "false", "-", "0"):
return False
raise \
argparse.ArgumentTypeError(
"Wrong representation for boolean argument")
return final_type(arg)
except Exception as ex:
raise \
argparse.ArgumentTypeError(
f"Command line argument '{arg}' has invalid format: "
f"{repr(ex)}")
return arg_converter


def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments
    """
# Kafka server connection switches
switches_kafka = argparse.ArgumentParser(add_help=False)
switches_kafka.add_argument(
"--kafka_servers", "-k", metavar="SERVER[:PORT][,SERVER2[:PORT2]...]",
type=docker_arg_type(str, default=DEFAULT_KAFKA_SERVER),
help=f"Comma-separated Kafka bootstrap server(s). Default is "
f"'{DEFAULT_KAFKA_SERVER}'")
switches_kafka.add_argument(
"--kafka_client_id", metavar="CLIENT_ID[@]",
type=docker_arg_type(str, default=DEFAULT_KAFKA_CLIENT_ID),
help=f"ID of this instance to be used in Kafka logs. If ends with "
f"'@' - supplemented with random string (to achieve uniqueness). "
f"Default is '{DEFAULT_KAFKA_CLIENT_ID}'")
switches_kafka.add_argument(
"--kafka_security_protocol", choices=["", "PLAINTEXT", "SSL"],
type=docker_arg_type(str, default="PLAINTEXT"),
help="Security protocol to use. Default is 'PLAINTEXT'")
switches_kafka.add_argument(
"--kafka_ssl_keyfile", metavar="FILENAME", type=docker_arg_type(str),
help="Client private key file for SSL authentication")
switches_kafka.add_argument(
"--kafka_ssl_cafile", metavar="FILENAME", type=docker_arg_type(str),
help="CA file for certificate verification")
switches_kafka.add_argument(
"--kafka_ssl_ciphers", metavar="CIPHERS", type=docker_arg_type(str),
help="Available ciphers in OpenSSL cipher list format")
switches_kafka.add_argument(
"--kafka_max_partition_fetch_bytes", metavar="SIZE_IN_BYTES",
type=docker_arg_type(int),
help="Maximum size of Kafka message (default is 1MB)")
switches_als_db = argparse.ArgumentParser(add_help=False)
switches_als_db.add_argument(
"--als_postgres",
metavar="[driver://][user][@host][:port][/database][?...]",
type=docker_arg_type(str),
help=f"ALS Database connection string. If some part (driver, user, "
f"host port database) is missing - it is taken from the default "
f"connection string (which is '{AlsDatabase.default_conn_str()}'. "
f"Connection parameters may be specified after '?' - see "
f"https://www.postgresql.org/docs/current/libpq-connect.html"
f"#LIBPQ-CONNSTRING for details")
switches_als_db.add_argument(
"--als_postgres_password", metavar="PASSWORD",
type=docker_arg_type(str),
help="Password to use for ALS Database connection")
switches_log_db = argparse.ArgumentParser(add_help=False)
switches_log_db.add_argument(
"--log_postgres",
metavar="[driver://][user][@host][:port][/database][?...]",
type=docker_arg_type(str),
help=f"Log Database connection string. If some part (driver, user, "
f"host port database) is missing - it is taken from the default "
f"connection string (which is '{LogsDatabase.default_conn_str()}'. "
f"Connection parameters may be specified after '?' - see "
f"https://www.postgresql.org/docs/current/libpq-connect.html"
f"#LIBPQ-CONNSTRING for details. Default is not use log database")
switches_log_db.add_argument(
"--log_postgres_password", metavar="PASSWORD",
type=docker_arg_type(str),
help="Password to use for Log Database connection")
switches_init = argparse.ArgumentParser(add_help=False)
switches_init.add_argument(
"--init_postgres",
metavar="[driver://][user][@host][:port][/database][?...]",
type=docker_arg_type(str),
help=f"Connection string to initial database used as a context for "
"other databases' creation. If some part (driver, user, host port "
f"database) is missing - it is taken from the default connection "
f"string (which is '{InitialDatabase.default_conn_str()}'. Connection "
f"parameters may be specified after '?' - see "
f"https://www.postgresql.org/docs/current/libpq-connect.html"
f"#LIBPQ-CONNSTRING for details")
switches_init.add_argument(
"--init_postgres_password", metavar="PASSWORD",
type=docker_arg_type(str),
help="Password to use for initial database connection")
switches_init.add_argument(
"--if_exists", choices=["skip", "drop"],
type=docker_arg_type(str, default="exc"),
help="What to do if database already exist: nothing (skip) or "
"recreate (drop). Default is to fail")
switches_init.add_argument(
"--als_template", metavar="DB_NAME", type=docker_arg_type(str),
help="Template database (e.g. bearer of required extensions) to use "
"for ALS database creation. E.g. postgis/postgis image strangely "
"assigns Postgis extension on 'template_postgis' database instead of "
"on default 'template0/1'")
switches_init.add_argument(
"--log_template", metavar="DB_NAME", type=docker_arg_type(str),
help="Template database to use for JSON Logs database creation")
switches_init.add_argument(
"--als_sql", metavar="SQL_FILE", type=docker_arg_type(str),
help="SQL command file that creates tables, relations, etc. in ALS "
"database. If neither this parameter nor --als_postgres is specified "
"ALS database is not being created")
switches_init.add_argument(
"--log_sql", metavar="SQL_FILE", type=docker_arg_type(str),
help="SQL command file that creates tables, relations, etc. in JSON "
"log database. By default database created (if --log_postgres is "
"specified) empty")
switches_siphon = argparse.ArgumentParser(add_help=False)
switches_siphon.add_argument(
"--prometheus_port", metavar="PORT", type=docker_arg_type(int),
help="Port to serve Prometheus metrics on")
# Top level parser
argument_parser = argparse.ArgumentParser(
description=f"Tool for moving data from Kafka to PostgreSQL/PostGIS "
f"database. V{VERSION}")
subparsers = argument_parser.add_subparsers(dest="subcommand",
metavar="SUBCOMMAND")
parser_init_db = subparsers.add_parser(
"init_db", parents=[switches_init, switches_als_db, switches_log_db],
help="Initialize ALS and/or JSON Log database")
parser_init_db.set_defaults(func=do_init_db)
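    # Subparser for 'siphon' command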
parser_siphon = subparsers.add_parser(
"siphon",
parents=[switches_kafka, switches_als_db, switches_log_db,
switches_siphon],
help="Siphon data from Kafka queue to ALS database")
parser_siphon.set_defaults(func=do_siphon)
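    # Subparser for 'init_siphon' command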
parser_init_siphon = subparsers.add_parser(
"init_siphon",
parents=[switches_init, switches_kafka, switches_als_db,
switches_log_db, switches_siphon],
help="Combination of 'db_init' and 'siphon' for Docker convenience")
parser_init_siphon.set_defaults(func=do_init_siphon)
# Subparser for 'help' command
parser_help = subparsers.add_parser(
"help", add_help=False, usage="%(prog)s subcommand",
help="Prints help on given subcommand")
parser_help.add_argument(
"subcommand", metavar="SUBCOMMAND", nargs="?",
choices=subparsers.choices,
help="Name of subcommand to print help about (use " +
"\"%(prog)s --help\" to get list of all subcommands)")
parser_help.set_defaults(func=do_help, subparsers=subparsers,
argument_parser=argument_parser)
if not argv:
argument_parser.print_help()
sys.exit(1)
args = argument_parser.parse_args(argv)
# Set up logging
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter(
f"{os.path.basename(__file__)}. %(levelname)s: %(message)s"))
logging.getLogger().addHandler(console_handler)
logging.getLogger().setLevel(logging.INFO)
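    # Log the effective arguments (skipped for the 'help' subcommand)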
if args.func != do_help:
logging.info("Arguments:")
for arg, value in \
sorted(args.__dict__.items(), key=lambda kvp: kvp[0]):
if (arg != "func") and (value is not None):
logging.info(f" {arg}: {value}")
# Do the needful
args.func(args)


if __name__ == "__main__":
main(sys.argv[1:])