Files
openafc_final/als/als_query.py
2024-03-25 10:11:24 -04:00

325 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# Tool for querying ALS database
# Copyright (C) 2022 Broadcom. All rights reserved.
# The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
# that owns the software below.
# This work is licensed under the OpenAFC Project License, a copy of which is
# included with this software program.
import argparse
import csv
import datetime
import json
import logging
import os
import psycopg2
import re
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as sa_pg
import subprocess
import sys
from typing import Any, List, NamedTuple, Optional, Set
try:
import geoalchemy2 as ga # type: ignore
except ImportError:
pass
VERSION = "0.1"
DEFAULT_USER = "postgres"
DEFAULT_PORT = 5432
ALS_DB = "ALS"
LOG_DB = "AFC_LOGS"
# Environment variables holding parts of database connection string
DbEnv = \
NamedTuple(
"DbEnv",
[
# Host name
("host", str),
# Port
("port", str),
# Username
("user", str),
# Password
("password", str),
# Options
("options", str)])
# Environment variable names for ALS and JSON log databases' connection strings
DB_ENVS = {ALS_DB:
DbEnv(host="POSTGRES_HOST", port="POSTGRES_PORT",
user="POSTGRES_ALS_USER", password="POSTGRES_ALS_PASSWORD",
options="POSTGRES_ALS_OPTIONS"),
LOG_DB:
DbEnv(host="POSTGRES_HOST", port="POSTGRES_PORT",
user="POSTGRES_LOG_USER", password="POSTGRES_LOG_PASSWORD",
options="POSTGRES_LOG_OPTIONS")}
def error(msg: str) -> None:
""" Prints given msg as error message and exit abnormally """
logging.error(msg)
sys.exit(1)
def error_if(cond: Any, msg: str) -> None:
""" If condition evaluates to true prints given msg as error message and
exits abnormally """
if cond:
error(msg)
class DbConn:
""" Database connection encapsulation
Attributes:
db_name -- Database name
engine -- Database engine
metadata -- Database metadata
conn -- Database connection
"""
def __init__(self, conn_str: Optional[str], password: Optional[str],
db_name: str) -> None:
""" Constructor
Arguments:
conn_str -- Abbreviated conneftion string, as specified in command
line. None means take from environment variable
password -- Optional password
db_name -- Database name
"""
self.db_name = db_name
if conn_str:
m = re.match(
r"^(?P<user>[^ :\?]+@)?"
r"(?P<cont>\^)?(?P<host>[^ :?]+)"
r"(:(?P<port>\d+))?"
r"(?P<options>\?.+)?$",
conn_str)
error_if(not m, f"Server string '{conn_str}' has invalid format")
assert m is not None
user = m.group("user") or DEFAULT_USER
host = m.group("host")
port = m.group("port") or str(DEFAULT_PORT)
options = m.group("options") or ""
if m.group("cont"):
try:
insp_str = \
subprocess.check_output(["docker", "inspect", host])
except (OSError, subprocess.CalledProcessError) as ex:
error(f"Failed to inspect container '{host}': {ex}")
insp = json.loads(insp_str)
try:
networks = insp[0]["NetworkSettings"]["Networks"]
host = networks[list(networks.keys())[0]]["IPAddress"]
except (LookupError, TypeError, ValueError) as ex:
error(f"Failed to find server IP address in container "
f"inspection: {ex}")
else:
db_env = DB_ENVS[db_name]
error_if(db_env.host not in os.environ,
f"PostgreSQL server neither specified explicitly (via "
f"--server parameter) nor via environment (via "
f"'{db_env.host}' variable and related ones)")
host = os.environ[db_env.host]
port = os.environ.get(db_env.port, str(DEFAULT_PORT))
user = os.environ.get(db_env.user, str(DEFAULT_USER))
options = os.environ.get(db_env.options, "")
password = password or os.environ.get(db_env.password)
try:
full_conn_str = \
f"postgresql+psycopg2://{user}" \
f"{(':' + password) if password else ''}@{host}:{port}/" \
f"{db_name}{options}"
self.engine = sa.create_engine(full_conn_str)
self.metadata = sa.MetaData()
self.metadata.reflect(bind=self.engine)
self.conn = self.engine.connect()
except sa.exc.SQLAlchemyError as ex:
error(f"Failed to connect to '{db_name}' at '{conn_str}' "
f"('{full_conn_str}'): {ex}")
class JsonEncoder(json.JSONEncoder):
""" JSON encoder that handles unusual types """
def default(self, o: Any) -> Any:
""" Handles unusual data types """
if isinstance(o, datetime.datetime):
return o.isoformat()
return super().default(o)
def do_log(args: Any) -> None:
"""Execute "log" command.
Arguments:
args -- Parsed command line arguments
"""
db_conn = \
DbConn(conn_str=args.server, password=args.password, db_name=LOG_DB)
work_done = False
if args.topics:
work_done = True
for topic in sorted(db_conn.metadata.tables.keys()):
print(topic)
if args.sources is not None:
work_done = True
sources: Set[str] = set()
error_if(
args.sources and (args.sources not in db_conn.metadata.tables),
f"Topic '{args.sources}' not found")
for topic in db_conn.metadata.tables.keys():
if "source" not in db_conn.metadata.tables[topic].c:
continue
if args.sources and (args.sources != topic):
continue
table_sources = \
db_conn.conn.execute(
sa.text(f'SELECT DISTINCT source FROM "{topic}"')).\
fetchall()
sources |= {s[0] for s in table_sources}
for source in sorted(sources):
print(source)
if args.SELECT:
work_done = True
try:
rp = db_conn.conn.execute(
sa.text("SELECT " + " ".join(args.SELECT)))
if args.format == "bare":
for record in rp:
error_if(
len(record) != 1,
f"Bare format assumes one field per result row "
f"(this query has {len(record)} fields per record)")
print(record[0])
elif args.format == "json":
print("[")
for record in rp:
print(" " + json.dumps(record._asdict(),
cls=JsonEncoder))
print("]")
elif args.format == "csv":
csv_writer = csv.writer(sys.stdout)
csv_writer.writerow(rp.keys())
for record in rp:
csv_writer.writerow(record)
else:
error(f"Internal error: unsupported output format "
f"'{args.format}'")
except sa.exc.SQLAlchemyError as ex:
error(f"Database acces error: {ex}")
error_if(not work_done, "Nothing to do!")
def do_help(args: Any) -> None:
"""Execute "help" command.
Arguments:
args -- Parsed command line arguments (also contains 'argument_parser' and
'subparsers' fields)
"""
if args.subcommand is None:
args.argument_parser.print_help()
else:
args.subparsers.choices[args.subcommand].print_help()
def main(argv: List[str]) -> None:
"""Do the job.
Arguments:
argv -- Program arguments
"""
# Database connection switches
switches_server = argparse.ArgumentParser(add_help=False)
switches_server.add_argument(
"--server", "-s", metavar="[user@]{host|^container}[:port][?options]",
help=f"PostgreSQL server connection information. Host part may be a "
f"hostname, IP address, or container name or ID, preceded by '^' "
f"(specifying container name would not work if script runs inside the "
f"container). Default username is '{DEFAULT_USER}', default port is "
f"{DEFAULT_PORT}. Options may specify various (e.g. SSL-related) "
f"parameters (see "
f"https://www.postgresql.org/docs/current/libpq-connect.html"
f"#LIBPQ-CONNSTRING for details). If omitted, script tries to use "
f"data from POSTGRES_HOST, POSTGRES_PORT, POSTGRES_LOG_USER, "
f"POSTGRES_LOG_OPTIONS, POSTGRES_ALS_USER, POSTGRES_ALS_OPTIONS "
f"environment variables")
switches_server.add_argument(
"--password", metavar="PASSWORD",
help="Postgres connection password (if required). If omitted and "
"--server not specified then values from POSTGRES_LOG_PASSWORD and "
"POSTGRES_ALS_PASSWORD environment variables are used")
# Top level parser
argument_parser = argparse.ArgumentParser(
description=f"Tool for querying ALS database. V{VERSION}")
subparsers = argument_parser.add_subparsers(dest="subcommand",
metavar="SUBCOMMAND")
# Subparser for "log" command
parser_log = subparsers.add_parser(
"log", parents=[switches_server],
help="Read JSON log messages")
parser_log.add_argument(
"--topics", action="store_true",
help="Print list of topics, stored in database")
parser_log.add_argument(
"--sources", metavar="[TOPIC]", nargs="?", const="",
help="Print list of log sources - for all topics or for specific "
"topic")
parser_log.add_argument(
"--format", "-f", choices=["bare", "json", "csv"], default="csv",
help="Output format for 'SELECT' result. 'bare' is unadorned "
"value-per-line output (must be just one field per result row), "
"'csv' - CSV format (default), 'json' - JSON format")
parser_log.add_argument(
"SELECT", nargs="*",
help="SELECT command body (without 'SELECT'). 'FROM' clause should "
"use topic name, column names are 'time' (timetag), 'source' (AFC or "
"whatever server ID) and 'log' (JSON log record). Surrounding quotes "
"are optional")
parser_log.set_defaults(func=do_log)
# Subparser for 'help' command
parser_help = subparsers.add_parser(
"help", add_help=False, usage="%(prog)s subcommand",
help="Prints help on given subcommand")
parser_help.add_argument(
"subcommand", metavar="SUBCOMMAND", nargs="?",
choices=subparsers.choices,
help="Name of subcommand to print help about (use " +
"\"%(prog)s --help\" to get list of all subcommands)")
parser_help.set_defaults(func=do_help, subparsers=subparsers,
argument_parser=argument_parser)
if not argv:
argument_parser.print_help()
sys.exit(1)
args = argument_parser.parse_args(argv)
# Set up logging
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter(
f"{os.path.basename(__file__)}. %(levelname)s: %(message)s"))
logging.getLogger().addHandler(console_handler)
logging.getLogger().setLevel(logging.INFO)
# Do the needful
args.func(args)
if __name__ == "__main__":
main(sys.argv[1:])