""" ULS service state database """ # Copyright (C) 2022 Broadcom. All rights reserved. # The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate # that owns the software below. # This work is licensed under the OpenAFC Project License, a copy of which is # included with this software program. # pylint: disable=invalid-name, broad-exception-caught, wildcard-import # pylint: disable=wrong-import-order, unused-wildcard-import import datetime import enum import sqlalchemy as sa import sqlalchemy.dialects.postgresql as sa_pg from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Set import urllib.parse from uls_service_common import * # FS database downloader milestone. Names used in database and Prometheus # metrics. Please change judiciously (if at all) DownloaderMilestone = \ enum.Enum("DownloaderMilestone", [ # Service birth (first write to milestone database) "ServiceBirth", # Service start "ServiceStart", # Download start "DownloadStart", # Download success "DownloadSuccess", # FS region changed "RegionChanged", # FS database file updated "DbUpdated", # External parameters checked "ExtParamsChecked", # Healthcheck performed "Healthcheck", # Beacon sent "BeaconSent", # Alarm sent "AlarmSent"]) # Type of log LogType = \ enum.Enum("LogType", [ # Log of last download attempt "Last", # Log of last failed attempt "LastFailed", # Log of last completed update "LastCompleted"]) # Information about retrieved log record LogInfo = \ NamedTuple( "LogInfo", [("text", str), ("timestamp", datetime.datetime), ("log_type", LogType)]) # Check type CheckType = enum.Enum("CheckType", ["ExtParams", "FsDatabase"]) # Information about check CheckInfo = \ NamedTuple( "CheckInfo", [ # Check type ("check_type", CheckType), # Item checked ("check_item", str), # Error message (None if check passed) ("errmsg", Optional[str]), # Check timestamp ("timestamp", datetime.datetime)]) # Alarm types AlarmType = enum.Enum("AlarmType", ["MissingMilestone", "FailedCheck"]) # Information about alarm AlarmInfo = \ NamedTuple( "AlarmInfo", [ # Type of alarm ("alarm_type", AlarmType), # Specific reason for alarm (name of missing milestone, # name of offending external file, etc.) ("alarm_reason", str), # Alarm timestramp ("timestamp", datetime.datetime)]) def safe_dsn(dsn: Optional[str]) -> Optional[str]: """ Returns DSN without password (if there was any) """ if not dsn: return dsn try: parsed = urllib.parse.urlparse(dsn) if not parsed.password: return dsn return \ urllib.parse.urlunparse( parsed._replace( netloc=parsed.netloc.replace(":" + parsed.password, ":"))) except Exception: return dsn class StateDb: """ Status database access wrapper Public attributes: db_dsn -- Connection string (might be None in Alembic environment) db_name -- Database name from connection string (might be None in Alembic environment) metadata -- Metadata. Originally as predefined, after connection - actual Private attributes:: _engine -- SqlAlchemy engine (set on connection, reset to None on disconnection or error) """ # Name of table with milestones MILESTONE_TABLE_NAME = "milestones" # Name of table with alarming milestones ALARM_TABLE_NAME = "alarms" # Name of table with recent successful and unsuccessful logs LOG_TABLE_NAME = "logs" # Name of table with check results CHECKS_TABLE = "checks" # All known table names (not including Alembic, etc.) ALL_TABLE_NAMES = [MILESTONE_TABLE_NAME, ALARM_TABLE_NAME, LOG_TABLE_NAME, CHECKS_TABLE] class _RootDb: """ Context wrapper to work with everpresent root database Public attributes: dsn -- Connection string to root database conn -- Sqlalchemy connection object to root database Private attributes: _engine -- SqlAlchemy connection object """ # Name of Postgres root database ROOT_DB_NAME = "postgres" def __init__(self, dsn: str) -> None: """ Constructor Arguments: dsn -- Connection string to (nonroot) database of interest """ self.dsn = \ urllib.parse.urlunsplit( urllib.parse.urlsplit(dsn). _replace(path=f"/{self.ROOT_DB_NAME}")) self._engine: Any = None self.conn: Any = None try: self._engine = sa.create_engine(self.dsn) self.conn = self._engine.connect() except sa.exc.SQLAlchemyError as ex: error( f"Can't connect to root database '{safe_dsn(self.dsn)}': " f"{ex}") finally: if (self.conn is None) and (self._engine is not None): # Connection failed self._engine.dispose() def __enter__(self) -> "StateDb._RootDb": """ Context entry """ return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """ Context exit """ if self.conn is not None: self.conn.close() if self._engine is not None: self._engine.dispose() def __init__(self, db_dsn: Optional[str]) -> None: """ Constructor Arguments: db_dsn -- Connection string to state database. None in Alembic environment """ self.db_dsn = db_dsn self.db_name: Optional[str] = \ urllib.parse.urlsplit(self.db_dsn).path.strip("/") \ if self.db_dsn else None self.metadata = sa.MetaData() sa.Table( self.MILESTONE_TABLE_NAME, self.metadata, sa.Column("milestone", sa.Enum(DownloaderMilestone), index=True, primary_key=True), sa.Column("region", sa.String(), index=True, primary_key=True), sa.Column("timestamp", sa.DateTime(), nullable=False)) sa.Table( self.ALARM_TABLE_NAME, self.metadata, sa.Column("alarm_type", sa.Enum(AlarmType), primary_key=True, index=True), sa.Column("alarm_reason", sa.String(), primary_key=True, index=True), sa.Column("timestamp", sa.DateTime(), nullable=False)) sa.Table( self.LOG_TABLE_NAME, self.metadata, sa.Column("log_type", sa.Enum(LogType), index=True, nullable=False, primary_key=True), sa.Column("text", sa.String(), nullable=False), sa.Column("timestamp", sa.DateTime(), nullable=False, index=True)) sa.Table( self.CHECKS_TABLE, self.metadata, sa.Column("check_type", sa.Enum(CheckType), primary_key=True, index=True), sa.Column("check_item", sa.String(), primary_key=True, index=True), sa.Column("errmsg", sa.String(), nullable=True), sa.Column("timestamp", sa.DateTime(), nullable=False)) self._engine: Any = None def check_server(self) -> bool: """ True if database server can be connected """ error_if(not self.db_dsn, "FS downloader status database DSN was not specified") assert self.db_dsn is not None with self._RootDb(self.db_dsn) as rdb: rdb.conn.execute("SELECT 1") return True return False def create_db(self, recreate_db=False, recreate_tables=False) -> bool: """ Creates database if absent, optionally adjust if present Arguments: recreate_db -- Recreate database if it exists recreate_tables -- Recreate known database tables if database exists Returns True on success, Fail on failure (if fail_on_error is False) """ engine: Any = None try: if self._engine: self._engine.dispose() self._engine = None error_if(not self.db_dsn, "FS downloader status database DSN was not specified") assert self.db_dsn is not None engine = self._create_engine(self.db_dsn) if recreate_db: with self._RootDb(self.db_dsn) as rdb: try: rdb.conn.execute("COMMIT") rdb.conn.execute( f'DROP DATABASE IF EXISTS "{self.db_name}"') except sa.exc.SQLAlchemyError as ex: error(f"Unable to drop database '{self.db_name}': " f"{repr(ex)}") try: with engine.connect(): pass except sa.exc.SQLAlchemyError: with self._RootDb(self.db_dsn) as rdb: try: rdb.conn.execute("COMMIT") rdb.conn.execute( f'CREATE DATABASE "{self.db_name}"') with engine.connect(): pass except sa.exc.SQLAlchemyError as ex1: error(f"Unable to create target database: {ex1}") try: if recreate_tables: with engine.connect() as conn: conn.execute("COMMIT") for table_name in self.ALL_TABLE_NAMES: conn.execute( f'DROP TABLE IF EXISTS "{table_name}"') self.metadata.create_all(engine, checkfirst=True) self._read_metadata() except sa.exc.SQLAlchemyError as ex: error(f"Unable to (re)create tables in the database " f"'{self.db_name}': {repr(ex)}") self._engine = engine engine = None return True finally: if engine: engine.dispose() def connect(self) -> None: """ Connect to database, that is assumed to be existing """ if self._engine: return engine: Any = None try: error_if(not self.db_dsn, "FS downloader status database DSN was not specified") engine = self._create_engine(self.db_dsn) self._read_metadata() with engine.connect(): pass self._engine = engine engine = None finally: if engine: engine.dispose() def disconnect(self) -> None: """ Disconnect database """ if self._engine: self._engine.dispose() self._engine = None def _read_metadata(self) -> None: """ Reads-in metadata from an existing database """ engine: Any = None try: engine = self._create_engine(self.db_dsn) with engine.connect(): pass metadata = sa.MetaData() metadata.reflect(bind=engine) for table_name in self.ALL_TABLE_NAMES: error_if( table_name not in metadata.tables, f"Table '{table_name}' not present in the database " f"'{safe_dsn(self.db_dsn)}'") self.metadata = metadata except sa.exc.SQLAlchemyError as ex: error(f"Can't connect to database " f"'{safe_dsn(self.db_dsn)}': {repr(ex)}") finally: if engine is not None: engine.dispose() def _execute(self, ops: List[Any]) -> Any: # sa.CursorResult """ Execute database operations Arguments: ops -- List of SqlAlchemy operations to execute Returns resulting cursor """ retry = False assert ops while True: if self._engine is None: self.connect() assert self._engine is not None try: with self._engine.connect() as conn: for op in ops: ret = conn.execute(op) return ret except sa.exc.SQLAlchemyError as ex: if retry: error(f"FS downloader status database operation " f"failed: {repr(ex)}") retry = True try: self.disconnect() except sa.exc.SQLAlchemyError: self._engine = None assert self._engine is None def _create_engine(self, dsn) -> Any: """ Creates synchronous SqlAlchemy engine Overloaded in RcacheDbAsync to create asynchronous engine Returns Engine object """ try: return sa.create_engine(dsn) except sa.exc.SQLAlchemyError as ex: error(f"Invalid database DSN: '{safe_dsn(dsn)}': {ex}") return None # Will never happen, appeasing pylint def write_milestone(self, milestone: DownloaderMilestone, updated_regions: Optional[Iterable[str]] = None, all_regions: Optional[List[str]] = None) -> None: """ Write milestone to state database Arguments: milestone -- Milestone to write updated_regions -- List of region strings of updated regions, None for region-inspecific milestones all_regions -- List of all region strings, None for region-inspecific milestones """ table = self.metadata.tables[self.MILESTONE_TABLE_NAME] ops: List[Any] = [] if all_regions is not None: ops.append( sa.delete(table).where(table.c.region.notin_(all_regions)). where(table.c.milestone == milestone.name)) timestamp = datetime.datetime.now() ins = sa_pg.insert(table).\ values( [{"milestone": milestone.name, "region": region or "", "timestamp": timestamp} for region in (updated_regions or [None])]) ins = ins.on_conflict_do_update( index_elements=[c.name for c in table.c if c.primary_key], set_={c.name: ins.excluded[c.name] for c in table.c if not c.primary_key}) ops.append(ins) self._execute(ops) def read_milestone(self, milestone: DownloaderMilestone) \ -> Dict[Optional[str], datetime.datetime]: """ Read milestone information from state database Arguments: milestone -- Milestone to read Returns by-region dictionary of milestone timetags """ table = self.metadata.tables[self.MILESTONE_TABLE_NAME] sel = sa.select([table.c.region, table.c.timestamp]).\ where(table.c.milestone == milestone.name) rp = self._execute([sel]) return {rec.region or None: rec.timestamp for rec in rp} def write_alarm_reasons( self, reasons: Optional[Dict[AlarmType, Set[str]]] = None) \ -> None: """ Write alarm reason to state database Arguments: reasons -- Map of alarm types to set of reasons """ table = self.metadata.tables[self.ALARM_TABLE_NAME] ops: List[Any] = [sa.delete(table)] if reasons: timestamp = datetime.datetime.now() values: List[Dict[str, Any]] = [] for alarm_type, alarm_reasons in reasons.items(): values += [{"alarm_type": alarm_type.name, "alarm_reason": alarm_reason, "timestamp": timestamp} for alarm_reason in alarm_reasons] ops.append(sa.insert(table).values(values)) self._execute(ops) def read_alarm_reasons(self) -> List[AlarmInfo]: """ Read alarms information from database Returns set of missed milestones """ table = self.metadata.tables[self.ALARM_TABLE_NAME] rp = self._execute( [sa.select( [table.c.alarm_type, table.c.alarm_reason, table.c.timestamp])]) return [AlarmInfo(alarm_type=AlarmType[rec.alarm_type], alarm_reason=rec.alarm_reason, timestamp=rec.timestamp) for rec in rp] def write_log(self, log_type: LogType, log: str) -> None: """ Write downloader log Arguments: log_type -- Type of log being written log -- Log text """ table = self.metadata.tables[self.LOG_TABLE_NAME] ins = sa_pg.insert(table).\ values(log_type=log_type.name, text=log, timestamp=datetime.datetime.now()) ins = \ ins.on_conflict_do_update( index_elements=["log_type"], set_={c.name: ins.excluded[c.name] for c in table.c if not c.primary_key}) self._execute([ins]) def read_last_log(self, log_type: LogType) -> Optional[LogInfo]: """ Read log from database Arguments: log_type -- Type of log to retrieve Returns LogInfo object """ table = self.metadata.tables[self.LOG_TABLE_NAME] sel = sa.select([table.c.log_type, table.c.text, table.c.timestamp]).\ where(table.c.log_type == log_type.name) rp = self._execute([sel]) row = rp.first() return \ LogInfo(text=row.text, timestamp=row.timestamp, log_type=LogType[row.log_type]) if row else None def write_check_results( self, check_type: CheckType, results: Optional[Dict[str, Optional[str]]] = None) -> None: """ Write check results Arguments: check_type -- Type of check results -- Itemized results: dicxtionary contained error message for failed checks, None for succeeded ones """ table = self.metadata.tables[self.CHECKS_TABLE] ops: List[Any] = \ [sa.delete(table).where(table.c.check_type == check_type.name)] if results: timestamp = datetime.datetime.now() ops.append( sa.insert(table).values( [{"check_type": check_type.name, "check_item": key, "errmsg": value, "timestamp": timestamp} for key, value in results.items()])) self._execute(ops) def read_check_results(self) -> Dict[CheckType, List[CheckInfo]]: """ Read check_results Returns dictionary of CheckInfo items lists, indexed by check type """ table = self.metadata.tables[self.CHECKS_TABLE] sel = sa.select([table.c.check_type, table.c.check_item, table.c.errmsg, table.c.timestamp]) rp = self._execute([sel]) ret: Dict[CheckType, List[CheckInfo]] = {} for rec in rp: check_info = \ CheckInfo( check_type=CheckType[rec.check_type], check_item=rec.check_item, errmsg=rec.errmsg, timestamp=rec.timestamp) ret.setdefault(check_info.check_type, []).append(check_info) return ret