mirror of
				https://github.com/Telecominfraproject/openafc_final.git
				synced 2025-10-31 01:57:46 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			325 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			325 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| # Tool for querying ALS database
 | |
| 
 | |
| # Copyright (C) 2022 Broadcom. All rights reserved.
 | |
| # The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
 | |
| # that owns the software below.
 | |
| # This work is licensed under the OpenAFC Project License, a copy of which is
 | |
| # included with this software program.
 | |
| 
 | |
| import argparse
 | |
| import csv
 | |
| import datetime
 | |
| import json
 | |
| import logging
 | |
| import os
 | |
| import psycopg2
 | |
| import re
 | |
| import sqlalchemy as sa
 | |
| import sqlalchemy.dialects.postgresql as sa_pg
 | |
| import subprocess
 | |
| import sys
 | |
| from typing import Any, List, NamedTuple, Optional, Set
 | |
| 
 | |
| try:
 | |
|     import geoalchemy2 as ga                        # type: ignore
 | |
| except ImportError:
 | |
|     pass
 | |
| 
 | |
# Script version (printed in top level help)
VERSION = "0.1"

# Connection defaults, used when neither command line nor environment
# specifies them
DEFAULT_USER = "postgres"
DEFAULT_PORT = 5432

# Names of databases this script works with
ALS_DB = "ALS"
LOG_DB = "AFC_LOGS"


# Names of environment variables holding parts of database connection string
class DbEnv(NamedTuple):
    # Name of variable with host name
    host: str
    # Name of variable with port
    port: str
    # Name of variable with username
    user: str
    # Name of variable with password
    password: str
    # Name of variable with connection options
    options: str


# Environment variable names for ALS and JSON log databases' connection
# strings. Host and port variables are shared, the rest are per-database
DB_ENVS = {
    ALS_DB: DbEnv(
        host="POSTGRES_HOST",
        port="POSTGRES_PORT",
        user="POSTGRES_ALS_USER",
        password="POSTGRES_ALS_PASSWORD",
        options="POSTGRES_ALS_OPTIONS"),
    LOG_DB: DbEnv(
        host="POSTGRES_HOST",
        port="POSTGRES_PORT",
        user="POSTGRES_LOG_USER",
        password="POSTGRES_LOG_PASSWORD",
        options="POSTGRES_LOG_OPTIONS"),
}
 | |
| 
 | |
| 
 | |
def error(msg: str) -> None:
    """ Reports a fatal error and terminates the script

    Arguments:
    msg -- Message to log at ERROR level

    Never returns normally: raises SystemExit with exit code 1
    """
    logging.error(msg)
    raise SystemExit(1)
 | |
| 
 | |
| 
 | |
def error_if(cond: Any, msg: str) -> None:
    """ Reports a fatal error and terminates the script if condition holds

    Arguments:
    cond -- Condition to check (any value, evaluated for truthiness)
    msg  -- Error message to report if condition is true
    """
    if not cond:
        return
    error(msg)
 | |
| 
 | |
| 
 | |
class DbConn:
    """ Database connection encapsulation

    Attributes:
    db_name  -- Database name
    engine   -- Database engine
    metadata -- Database metadata
    conn     -- Database connection
    """

    def __init__(self, conn_str: Optional[str], password: Optional[str],
                 db_name: str) -> None:
        """ Constructor

        Arguments:
        conn_str -- Abbreviated connection string, as specified in command
                    line: [user@]{host|^container}[:port][?options]. None (or
                    empty) means take connection parameters from environment
                    variables
        password -- Optional password. If connection parameters are taken
                    from environment and password is not specified, it is
                    taken from environment too
        db_name  -- Database name

        Terminates the script (via error()) on malformed connection string,
        on missing environment, on container inspection failure and on
        database connection failure
        """
        self.db_name = db_name

        if conn_str:
            parts = self._parse_conn_str(conn_str)
            error_if(parts is None,
                     f"Server string '{conn_str}' has invalid format")
            assert parts is not None

            user = parts["user"] or DEFAULT_USER
            host = parts["host"]
            port = parts["port"] or str(DEFAULT_PORT)
            options = parts["options"] or ""
            if parts["cont"]:
                # Host part is a container name/ID - resolving it to IP
                host = self._container_ip(host)
        else:
            db_env = DB_ENVS[db_name]
            error_if(db_env.host not in os.environ,
                     f"PostgreSQL server neither specified explicitly (via "
                     f"--server parameter) nor via environment (via "
                     f"'{db_env.host}' variable and related ones)")
            host = os.environ[db_env.host]
            port = os.environ.get(db_env.port, str(DEFAULT_PORT))
            user = os.environ.get(db_env.user, str(DEFAULT_USER))
            options = os.environ.get(db_env.options, "")
            password = password or os.environ.get(db_env.password)

        # NOTE(review): user and password are put into URL verbatim, so
        # special characters (like '@' or '/') must be percent-encoded by
        # whoever supplies them
        location = f"{host}:{port}/{db_name}{options}"
        full_conn_str = \
            f"postgresql+psycopg2://{user}" \
            f"{(':' + password) if password else ''}@{location}"
        # Same as above, but safe to print to log
        masked_conn_str = \
            f"postgresql+psycopg2://{user}" \
            f"{':***' if password else ''}@{location}"
        try:
            self.engine = sa.create_engine(full_conn_str)
            self.metadata = sa.MetaData()
            self.metadata.reflect(bind=self.engine)
            self.conn = self.engine.connect()
        except sa.exc.SQLAlchemyError as ex:
            error(f"Failed to connect to '{db_name}' at '{conn_str}' "
                  f"('{masked_conn_str}'): {ex}")

    @staticmethod
    def _parse_conn_str(conn_str: str) -> Optional[dict]:
        """ Splits abbreviated connection string into parts

        Arguments:
        conn_str -- Connection string in
                    [user@]{host|^container}[:port][?options] form
        Returns None if string is malformed. Otherwise dictionary with
        following keys (absent optional parts have None values):
        user    -- Username (without trailing '@')
        cont    -- '^' if host part is a container name, None otherwise
        host    -- Hostname, IP address or container name/ID
        port    -- Port number (as string)
        options -- Connection options (with leading '?')
        """
        # '@' is deliberately kept outside of 'user' group - otherwise it
        # becomes a part of username in connection URL
        m = re.match(
            r"^((?P<user>[^ :\?]+)@)?"
            r"(?P<cont>\^)?(?P<host>[^ :?]+)"
            r"(:(?P<port>\d+))?"
            r"(?P<options>\?.+)?$",
            conn_str)
        return m.groupdict() if m else None

    @staticmethod
    def _container_ip(container: str) -> str:
        """ Returns IP address of given Docker container

        Arguments:
        container -- Container name or ID
        Returns IP address container has in the first of its networks.
        Terminates the script (via error()) if address can't be obtained
        """
        try:
            insp_str = \
                subprocess.check_output(["docker", "inspect", container])
        except (OSError, subprocess.CalledProcessError) as ex:
            error(f"Failed to inspect container '{container}': {ex}")
        try:
            # Parsing is inside 'try' to report malformed inspection output
            # as error message, not as traceback
            insp = json.loads(insp_str)
            networks = insp[0]["NetworkSettings"]["Networks"]
            ip_address = networks[list(networks.keys())[0]]["IPAddress"]
        except (LookupError, TypeError, ValueError, AttributeError) as ex:
            error(f"Failed to find server IP address in container "
                  f"inspection: {ex}")
        error_if(not ip_address,
                 f"Container '{container}' has no IP address (is it "
                 f"running?)")
        return ip_address
 | |
| 
 | |
| 
 | |
class JsonEncoder(json.JSONEncoder):
    """ JSON encoder that handles unusual types

    Timestamps, dates and times (that may be found in database query results)
    are rendered as ISO 8601 strings. Everything else is left to standard
    encoder
    """

    def default(self, o: Any) -> Any:
        """ Handles data types, not supported by standard encoder

        Arguments:
        o -- Object that standard encoder failed to serialize
        Returns JSON-serializable representation of object. Raises TypeError
        (from base class) for unsupported types
        """
        # datetime.datetime is a subclass of datetime.date, hence it is
        # covered too
        if isinstance(o, (datetime.date, datetime.time)):
            return o.isoformat()
        return super().default(o)
 | |
| 
 | |
| 
 | |
def do_log(args: Any) -> None:
    """Execute "log" command.

    Depending on command line arguments prints list of topics (--topics),
    list of log sources (--sources) and/or result of SELECT query (in format,
    specified by --format). Terminates the script with error if none of
    these was requested

    Arguments:
    args -- Parsed command line arguments
    """
    db_conn = \
        DbConn(conn_str=args.server, password=args.password, db_name=LOG_DB)
    tables = db_conn.metadata.tables
    work_done = False

    if args.topics:
        work_done = True
        # Each topic is stored in a table of the same name
        for topic in sorted(tables.keys()):
            print(topic)

    # None means --sources was not specified, empty string - specified
    # without topic (i.e. sources of all topics requested)
    if args.sources is not None:
        work_done = True
        sources: Set[str] = set()
        error_if(
            args.sources and (args.sources not in tables),
            f"Topic '{args.sources}' not found")
        for topic, table in tables.items():
            if "source" not in table.c:
                continue
            if args.sources and (args.sources != topic):
                continue
            # Double quotes inside quoted identifier are escaped by doubling
            quoted_topic = '"' + topic.replace('"', '""') + '"'
            table_sources = \
                db_conn.conn.execute(
                    sa.text(f"SELECT DISTINCT source FROM {quoted_topic}")).\
                fetchall()
            # NULLs are dropped - they are not sources and can't be sorted
            # along with strings
            sources |= {s[0] for s in table_sources if s[0] is not None}
        for source in sorted(sources):
            print(source)

    if args.SELECT:
        work_done = True
        try:
            # Query comes from command line verbatim - this is the purpose of
            # this command
            rp = db_conn.conn.execute(
                sa.text("SELECT " + " ".join(args.SELECT)))
            if args.format == "bare":
                for record in rp:
                    error_if(
                        len(record) != 1,
                        f"Bare format assumes one field per result row "
                        f"(this query has {len(record)} fields per record)")
                    print(record[0])
            elif args.format == "json":
                print("[")
                # Printing of each record is delayed until it becomes known
                # if it is the last one (all records but last are followed
                # by comma - otherwise output is not a valid JSON)
                pending: Optional[str] = None
                for record in rp:
                    if pending is not None:
                        print(pending + ",")
                    pending = "    " + json.dumps(record._asdict(),
                                                  cls=JsonEncoder)
                if pending is not None:
                    print(pending)
                print("]")
            elif args.format == "csv":
                csv_writer = csv.writer(sys.stdout)
                csv_writer.writerow(rp.keys())
                for record in rp:
                    csv_writer.writerow(record)
            else:
                error(f"Internal error: unsupported output format "
                      f"'{args.format}'")
        except sa.exc.SQLAlchemyError as ex:
            error(f"Database access error: {ex}")

    error_if(not work_done, "Nothing to do!")
 | |
| 
 | |
| 
 | |
def do_help(args: Any) -> None:
    """Execute "help" command.

    Prints help on subcommand, named in command line, or top level help if
    no subcommand was named

    Arguments:
    args -- Parsed command line arguments (also contains 'argument_parser' and
            'subparsers' fields)
    """
    parser = \
        args.argument_parser if args.subcommand is None \
        else args.subparsers.choices[args.subcommand]
    parser.print_help()
 | |
| 
 | |
| 
 | |
def main(argv: List[str]) -> None:
    """Do the job.

    Builds command line parser, parses arguments, sets up logging and
    executes handler of subcommand, specified in command line. Prints help
    and exits with code 1 if there are no arguments

    Arguments:
    argv -- Program arguments
    """
    # Help texts of database connection switches
    server_help = (
        "PostgreSQL server connection information. Host part may be a "
        "hostname, IP address, or container name or ID, preceded by '^' "
        "(specifying container name would not work if script runs inside the "
        f"container). Default username is '{DEFAULT_USER}', default port is "
        f"{DEFAULT_PORT}. Options may specify various (e.g. SSL-related) "
        "parameters (see "
        "https://www.postgresql.org/docs/current/libpq-connect.html"
        "#LIBPQ-CONNSTRING for details). If omitted, script tries to use "
        "data from POSTGRES_HOST, POSTGRES_PORT, POSTGRES_LOG_USER, "
        "POSTGRES_LOG_OPTIONS, POSTGRES_ALS_USER, POSTGRES_ALS_OPTIONS "
        "environment variables")
    password_help = (
        "Postgres connection password (if required). If omitted and "
        "--server not specified then values from POSTGRES_LOG_PASSWORD and "
        "POSTGRES_ALS_PASSWORD environment variables are used")

    # Database connection switches (parent parser for subcommands that
    # connect to database)
    db_switches = argparse.ArgumentParser(add_help=False)
    db_switches.add_argument(
        "--server", "-s", metavar="[user@]{host|^container}[:port][?options]",
        help=server_help)
    db_switches.add_argument(
        "--password", metavar="PASSWORD", help=password_help)

    # Top level parser
    top_parser = argparse.ArgumentParser(
        description=f"Tool for querying ALS database. V{VERSION}")
    commands = top_parser.add_subparsers(dest="subcommand",
                                         metavar="SUBCOMMAND")

    # Subparser for "log" command
    log_parser = commands.add_parser(
        "log", parents=[db_switches], help="Read JSON log messages")
    log_parser.add_argument(
        "--topics", action="store_true",
        help="Print list of topics, stored in database")
    log_parser.add_argument(
        "--sources", metavar="[TOPIC]", nargs="?", const="",
        help="Print list of log sources - for all topics or for specific "
        "topic")
    log_parser.add_argument(
        "--format", "-f", choices=["bare", "json", "csv"], default="csv",
        help="Output format for 'SELECT' result. 'bare' is unadorned "
        "value-per-line output (must be just one field per result row), "
        "'csv' - CSV format (default), 'json' - JSON format")
    log_parser.add_argument(
        "SELECT", nargs="*",
        help="SELECT command body (without 'SELECT'). 'FROM' clause should "
        "use topic name, column names are 'time' (timetag), 'source' (AFC or "
        "whatever server ID) and 'log' (JSON log record). Surrounding quotes "
        "are optional")
    log_parser.set_defaults(func=do_log)

    # Subparser for 'help' command. It is added last, so its list of choices
    # contains all subcommands
    help_parser = commands.add_parser(
        "help", add_help=False, usage="%(prog)s subcommand",
        help="Prints help on given subcommand")
    help_parser.add_argument(
        "subcommand", metavar="SUBCOMMAND", nargs="?",
        choices=commands.choices,
        help='Name of subcommand to print help about (use '
        '"%(prog)s --help" to get list of all subcommands)')
    help_parser.set_defaults(func=do_help, subparsers=commands,
                             argument_parser=top_parser)

    # No arguments - nothing to parse, just printing help
    if not argv:
        top_parser.print_help()
        sys.exit(1)
    args = top_parser.parse_args(argv)

    # Set up logging: messages of INFO level and above go to console,
    # prefixed with script name
    log_format = f"{os.path.basename(__file__)}. %(levelname)s: %(message)s"
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter(log_format))
    root_logger = logging.getLogger()
    root_logger.addHandler(log_handler)
    root_logger.setLevel(logging.INFO)

    # Execute handler of chosen subcommand
    args.func(args)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Executed as a script: pass command line arguments (without script name)
    main(sys.argv[1:])
 | 
