#!/usr/bin/env python3
# Embedding/extracting FSID table to/from FS SQLite database

# Copyright (C) 2022 Broadcom. All rights reserved.
# The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
# that owns the software below.
# This work is licensed under the OpenAFC Project License, a copy of which is
# included with this software program.
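
# Usage sketch (illustrative; the script's real file name may differ, but the
# subcommands and positional arguments match the argparse definitions in
# main()):
#   ./fsid_tool.py check fs_db.sqlite3
#   ./fsid_tool.py extract fs_db.sqlite3 fsid_table.csv
#   ./fsid_tool.py embed fs_db.sqlite3 fsid_table.csv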

# pylint: disable=wrong-import-order, invalid-name
# pylint: disable=too-many-instance-attributes

import argparse
from collections.abc import Iterator, Sequence
import csv
import os
import sqlalchemy as sa
import sys
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Union

# Name of the FSID table in the FS database
FSID_TABLE_NAME = "fsid_history"


def warning(warnmsg: str) -> None:
    """ Print given warning message """
    print(f"{os.path.basename(__file__)}: Warning: {warnmsg}", file=sys.stderr)


def error(errmsg: str) -> None:
    """ Print given error message and exit """
    print(f"{os.path.basename(__file__)}: Error: {errmsg}", file=sys.stderr)
    sys.exit(1)


def error_if(condition: Any, errmsg: str) -> None:
    """ If given condition is met - print given error message and exit """
    if condition:
        error(errmsg)


class Record:
    """ Stuff about and around records in the FSID table

    Private attributes:
    _fields -- FSID record as a dictionary keyed by database field names (i.e.
               a database row dictionary). Absent fields are not included
    """
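    # For illustration (hypothetical values): a CSV row such as
    # {"FSID": "1", "Callsign": "WQXX123", "Path Number": "2", ...}
    # is stored in _fields as
    # {"fsid": 1, "callsign": "WQXX123", "path_number": 2, ...}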
    class SchemaCheckResult(NamedTuple):
        """ Results of database or CSV schema check """
        # Fields in csv/database not known to script
        extra_fields: List[str]

        # Optional fields known to script not found in csv/database
        missing_optional_fields: List[str]

        # Required fields known to script not found in csv/database
        missing_required_fields: List[str]

        def is_ok(self) -> bool:
            """ True if fields in csv/database are the same as in script """
            return not (self.extra_fields or self.missing_optional_fields or
                        self.missing_required_fields)

        def is_fatal(self) -> bool:
            """ True if csv/database lacks some mandatory fields """
            return bool(self.missing_required_fields)

        def errmsg(self) -> str:
            """ Error message on field nomenclature mismatch. Empty if all OK
            """
            ret: List[str] = []
            for fields, who, what in \
                    [(self.missing_required_fields, "required", "missing"),
                     (self.missing_optional_fields, "optional", "missing"),
                     (self.extra_fields, "unknown", "present")]:
                if fields:
                    ret.append(f"Following {who} fields are {what}: "
                               f"{', '.join(fields)}")
            return ". ".join(ret)

    # Database dictionary row data type
    RecordDataType = Mapping[str, Optional[Union[str, int, float]]]

    # Data type of the dictionary stored in this class
    StoredDataType = Dict[str, Union[str, int, float]]

    # Field descriptor
    FieldDsc = NamedTuple("FieldDsc", [("csv_name", str),
                                       ("column", sa.Column)])

    # Fields of FSID table
    _FIELDS = [
        FieldDsc("FSID",
                 sa.Column("fsid", sa.Integer(), primary_key=True)),
        FieldDsc("Region",
                 sa.Column("region", sa.String(10), nullable=True)),
        FieldDsc("Callsign",
                 sa.Column("callsign", sa.String(16), nullable=False)),
        FieldDsc("Path Number",
                 sa.Column("path_number", sa.Integer(), nullable=False)),
        FieldDsc("Center Frequency (MHz)",
                 sa.Column("center_freq_mhz", sa.Float(), nullable=False)),
        FieldDsc("Bandwidth (MHz)",
                 sa.Column("bandwidth_mhz", sa.Float(), nullable=False))]
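
    # Roughly the SQLite DDL this schema produces (for illustration;
    # SQLAlchemy may emit the PRIMARY KEY as a table-level constraint):
    #   CREATE TABLE fsid_history (
    #       fsid INTEGER NOT NULL PRIMARY KEY,
    #       region VARCHAR(10),
    #       callsign VARCHAR(16) NOT NULL,
    #       path_number INTEGER NOT NULL,
    #       center_freq_mhz FLOAT NOT NULL,
    #       bandwidth_mhz FLOAT NOT NULL
    #   )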

    # Index of field descriptors by CSV names
    _BY_CSV_NAME = {fd.csv_name: fd for fd in _FIELDS}

    # Index of field descriptors by DB names
    _BY_COLUMN = {fd.column.name: fd for fd in _FIELDS}

    def __init__(self, data_dict: "Record.RecordDataType", is_csv: bool) \
            -> None:
        """ Constructor

        Arguments:
        data_dict -- Row data dictionary
        is_csv    -- True if data_dict is from CSV (field names from heading
                     row, values all strings), False if from DB (field names
                     are column names, values are of proper types)
        """
        self._fields: "Record.StoredDataType"
        if is_csv:
            self._fields = {}
            for csv_name, value in data_dict.items():
                fd = self._BY_CSV_NAME.get(csv_name)
                if (fd is None) or (value is None) or (value == ""):
                    continue
                field_name = fd.column.name
                try:
                    if isinstance(fd.column.type, sa.String):
                        self._fields[field_name] = value
                    elif isinstance(fd.column.type, sa.Integer):
                        self._fields[field_name] = int(value)
                    elif isinstance(fd.column.type, sa.Float):
                        self._fields[field_name] = float(value)
                    else:
                        raise AssertionError(
                            f"Internal error: data type {fd.column.type} "
                            f"not supported by script")
                except ValueError as ex:
                    error(f"Value '{value}' not valid for field of type "
                          f"'{fd.column.type}': {ex}")
        else:
            self._fields = \
                {name: value for name, value in data_dict.items()
                 if (name in self._BY_COLUMN) and (value is not None)}

    def db_dict(self) -> "Record.StoredDataType":
        """ Row value as dictionary for inserting to DB """
        return self._fields

    def csv_list(self) -> List[str]:
        """ Row value as list of strings for writing to CSV """
        ret: List[str] = []
        for fd in self._FIELDS:
            value = self._fields.get(fd.column.name)
            ret.append("" if value is None else str(value))
        return ret

    @classmethod
    def csv_heading(cls) -> List[str]:
        """ List of CSV headings """
        return [fd.csv_name for fd in cls._FIELDS]

    @classmethod
    def db_columns(cls) -> List[sa.Column]:
        """ List of DB column descriptors """
        return [fd.column for fd in cls._FIELDS]

    @classmethod
    def check_table(cls, table_fields: "Sequence[str]") \
            -> "Record.SchemaCheckResult":
        """ Check database table columns against schema in this class """
        return cls.SchemaCheckResult(
            extra_fields=[field for field in table_fields
                          if field not in cls._BY_COLUMN],
            missing_optional_fields=[fd.column.name for fd in cls._FIELDS
                                     if fd.column.nullable and
                                     (fd.column.name not in table_fields)],
            missing_required_fields=[fd.column.name for fd in cls._FIELDS
                                     if (not fd.column.nullable) and
                                     (fd.column.name not in table_fields)])

    @classmethod
    def check_csv(cls, csv_headings: List[str]) -> "Record.SchemaCheckResult":
        """ Check CSV headings against schema in this class """
        return cls.SchemaCheckResult(
            extra_fields=[cn for cn in csv_headings
                          if cn not in cls._BY_CSV_NAME],
            missing_optional_fields=[fd.csv_name for fd in cls._FIELDS
                                     if fd.column.nullable and
                                     (fd.csv_name not in csv_headings)],
            missing_required_fields=[fd.csv_name for fd in cls._FIELDS
                                     if (not fd.column.nullable) and
                                     (fd.csv_name not in csv_headings)])
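
    # Example (illustrative): check_csv(["FSID", "Callsign", "Junk"]) yields
    # extra_fields=["Junk"], missing_optional_fields=["Region"] and
    # missing_required_fields=["Path Number", "Center Frequency (MHz)",
    # "Bandwidth (MHz)"] - so is_fatal() is True and the operation aborts
    # even with --partial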

    @classmethod
    def transaction_length(cls, legacy: bool = False) -> int:
        """ Maximum number of rows per SqlAlchemy transaction.

        Frankly, I do not remember where from I got these two constants, and
        this is not that easy to google (they appear to match SQLite's
        bound-parameter limit, SQLITE_MAX_VARIABLE_NUMBER: 999 by default
        before SQLite 3.32.0, 32766 since) """
        return (999 if legacy else 32000) // len(cls._FIELDS)

    @classmethod
    def num_fields(cls) -> int:
        """ Number of fields in record """
        return len(cls._FIELDS)


class Db:
    """ Handles all database-related stuff

    Should be used as a context manager (with 'with')

    Private attributes:
    _database_file                -- SQLite database file name
    _engine                       -- SqlAlchemy engine
    _conn                         -- SqlAlchemy connection
    _metadata                     -- SqlAlchemy metadata
    _fsid_table                   -- SqlAlchemy table for FSID table, None if
                                     it is not in the database
    _bulk                         -- List of record dictionaries for subsequent
                                     bulk insert
    _use_legacy_transaction_limit -- True to use legacy transaction length,
                                     False to use the more recent one
    _current_transaction_length   -- Currently used maximum transaction length
    """
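
    # Typical usage (illustrative):
    #   with Db("fs_db.sqlite3") as db:                            # reading
    #       for record in db.fetchall():
    #           ...
    #   with Db("fs_db.sqlite3", recreate_fsid_table=True) as db:  # writing
    #       db.write_row(record)  # buffered; flushed in bulk on clean exit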

    def __init__(self, database_file: str,
                 recreate_fsid_table: bool = False) -> None:
        """ Constructor

        Arguments:
        database_file       -- SQLite database file
        recreate_fsid_table -- True to create FSID table anew
        """
        error_if(not os.path.isfile(database_file),
                 f"FS SQLite database '{database_file}' not found")
        self._database_file = database_file
        self._conn: Optional[sa.engine.Connection] = None
        try:
            self._engine = \
                sa.create_engine("sqlite:///" + self._database_file)
            self._metadata = sa.MetaData()
            self._metadata.reflect(bind=self._engine)
            self._conn = self._engine.connect()
            self._fsid_table: Optional[sa.Table] = \
                self._metadata.tables.get(FSID_TABLE_NAME)
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error opening FS database '{self._database_file}': {ex}")
        if recreate_fsid_table:
            try:
                if self._fsid_table is not None:
                    self._metadata.drop_all(self._conn,
                                            tables=[self._fsid_table])
                    # Somehow drop_all() leaves table in metadata
                    self._metadata.remove(self._fsid_table)
                self._fsid_table = sa.Table(FSID_TABLE_NAME, self._metadata,
                                            *Record.db_columns())
                self._metadata.create_all(self._engine)
            except sa.exc.SQLAlchemyError as ex:
                error(f"Error recreating FSID table in FS database "
                      f"'{self._database_file}': {ex}")
        self._bulk: List[Record.RecordDataType] = []
        self._use_legacy_transaction_limit = False
        self._current_transaction_length = self._get_transaction_length()

    def fetchall(self) -> "Iterator[Record]":
        """ Iterator that reads all records of FSID table """
        try:
            for db_row in self._conn.execute(sa.select(self._fsid_table)).\
                    fetchall():
                yield Record(data_dict=db_row._mapping, is_csv=False)
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading FSID table from FS database "
                  f"'{self._database_file}': {ex}")

    def write_row(self, row: Record) -> None:
        """ Writes one row to FSID table """
        self._bulk.append(row.db_dict())
        if len(self._bulk) >= self._current_transaction_length:
            self._bulk_write()

    def fsid_fields(self) -> Optional[List[str]]:
        """ List of FSID table fields, None if table is absent """
        return None if self._fsid_table is None \
            else list(self._fsid_table.c.keys())

    def _get_transaction_length(self) -> int:
        """ Maximum number of rows per SqlAlchemy transaction.

        Frankly, I do not remember where from I got these two constants, and
        this is not that easy to google (they appear to match SQLite's
        bound-parameter limit, SQLITE_MAX_VARIABLE_NUMBER: 999 by default
        before SQLite 3.32.0, 32766 since) """
        return (999 if self._use_legacy_transaction_limit else 32000) // \
            Record.num_fields()
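
    # With the six-field record above this is 32000 // 6 = 5333 rows per
    # transaction normally, falling back to 999 // 6 = 166 rows in legacy mode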

    def _bulk_write(self) -> None:
        """ Do bulk write to database """
        if not self._bulk:
            return
        transaction: Optional[sa.engine.Transaction] = None
        assert self._fsid_table is not None
        try:
            transaction = self._conn.begin()
            ins = \
                sa.insert(self._fsid_table).execution_options(autocommit=False)
            self._conn.execute(ins, self._bulk)
            transaction.commit()
            transaction = None
            self._bulk = []
            return
        except (sa.exc.CompileError, sa.exc.OperationalError) as ex:
            # Insert may have exceeded the bound-parameter limit of an older
            # SQLite. If already at the legacy limit - give up
            error_if(self._use_legacy_transaction_limit,
                     f"Error writing FSID table to FS database "
                     f"'{self._database_file}': {ex}")
        finally:
            if transaction is not None:
                transaction.rollback()
                transaction = None
        # Retry with the shorter legacy transaction length
        self._use_legacy_transaction_limit = True
        self._current_transaction_length = self._get_transaction_length()
        try:
            for offset in range(0, len(self._bulk),
                                self._current_transaction_length):
                transaction = self._conn.begin()
                ins = sa.insert(self._fsid_table).\
                    execution_options(autocommit=False)
                self._conn.execute(
                    ins,
                    self._bulk[offset:
                               offset + self._current_transaction_length])
                transaction.commit()
                transaction = None
            self._bulk = []
            return
        finally:
            if transaction is not None:
                transaction.rollback()
                transaction = None

    def __enter__(self) -> "Db":
        """ Context entry - returns self """
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None:
        """ Context exit - flushes pending rows (on clean exit only) and
        closes the connection """
        if self._conn is not None:
            if exc_value is None:
                self._bulk_write()
            self._conn.close()


def do_check(args: Any) -> None:
    """Execute "check" command.

    Arguments:
    args -- Parsed command line arguments
    """
    with Db(args.SQLITE_FILE) as db:
        if db.fsid_fields() is None:
            print(f"No FSID table in '{args.SQLITE_FILE}'")
            sys.exit(1)


def do_extract(args: Any) -> None:
    """Execute "extract" command.

    Arguments:
    args -- Parsed command line arguments
    """
    try:
        with Db(args.SQLITE_FILE) as db:
            fields = db.fsid_fields()
            error_if(fields is None,
                     f"FS database '{args.SQLITE_FILE}' does not contain FSID "
                     f"history table named '{FSID_TABLE_NAME}'")
            assert fields is not None
            scr = Record.check_table(fields)
            error_if(scr.is_fatal() if args.partial else (not scr.is_ok()),
                     scr.errmsg())
            if not scr.is_ok():
                warning(scr.errmsg())
            with open(args.FSID_FILE, mode="w", newline="", encoding="utf-8") \
                    as f:
                csv_writer = csv.writer(f, lineterminator="\n")
                csv_writer.writerow(Record.csv_heading())
                for record in db.fetchall():
                    csv_writer.writerow(record.csv_list())
    except OSError as ex:
        error(f"Error writing FSID table file '{args.FSID_FILE}': {ex}")


def do_embed(args: Any) -> None:
    """Execute "embed" command.

    Arguments:
    args -- Parsed command line arguments
    """
    error_if(not os.path.isfile(args.FSID_FILE),
             f"FSID table CSV file '{args.FSID_FILE}' not found")
    with open(args.FSID_FILE, mode="r", newline="", encoding="utf-8") as f:
        headings: List[str] = []
        for csv_row in csv.reader(f):
            headings = csv_row
            break
    scr = Record.check_csv(headings)
    error_if(scr.is_fatal() if args.partial else (not scr.is_ok()),
             scr.errmsg())
    if not scr.is_ok():
        warning(scr.errmsg())
    try:
        with Db(args.SQLITE_FILE, recreate_fsid_table=True) as db:
            with open(args.FSID_FILE, mode="r", newline="", encoding="utf-8") \
                    as f:
                for csv_dict in csv.DictReader(f):
                    db.write_row(Record(csv_dict, is_csv=True))
    except OSError as ex:
        error(f"Error reading FSID table file '{args.FSID_FILE}': {ex}")


def do_help(args: Any) -> None:
    """Execute "help" command.

    Arguments:
    args -- Parsed command line arguments (also contains 'argument_parser' and
            'subparsers' fields)
    """
    if args.subcommand is None:
        args.argument_parser.print_help()
    else:
        args.subparsers.choices[args.subcommand].print_help()


def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments
    """
    # Switches for database file
    switches_database = argparse.ArgumentParser(add_help=False)
    switches_database.add_argument(
        "SQLITE_FILE",
        help="SQLite file containing FS (aka ULS) database")

    # Switches for FSID CSV file
    switches_fsid = argparse.ArgumentParser(add_help=False)
    switches_fsid.add_argument(
        "FSID_FILE",
        help="CSV file containing FSID table")

    # Top level parser
    argument_parser = argparse.ArgumentParser(
        description="Embedding/extracting FSID table to/from FS (aka ULS) "
        "SQLite database")

    subparsers = argument_parser.add_subparsers(dest="subcommand",
                                                metavar="SUBCOMMAND")

    # Subparser for 'check' command
    parser_check = subparsers.add_parser(
        "check", parents=[switches_database],
        help="Exits normally if database contains FSID table (of whatever "
        "scheme), with error otherwise")
    parser_check.set_defaults(func=do_check)

    # Subparser for 'extract' command
    parser_extract = subparsers.add_parser(
        "extract", parents=[switches_database, switches_fsid],
        help="Extract FSID table from FS (aka ULS) database")
    parser_extract.add_argument(
        "--partial", action="store_true",
        help="Ignore unknown database column names, fill missing columns "
        "with empty strings. By default database columns must match the ones "
        "known to the script")
    parser_extract.set_defaults(func=do_extract)

    # Subparser for 'embed' command
    parser_embed = subparsers.add_parser(
        "embed", parents=[switches_database, switches_fsid],
        help="Embed FSID table into FS (aka ULS) database")
    parser_embed.add_argument(
        "--partial", action="store_true",
        help="Ignore unknown CSV column names, fill missing columns with "
        "NULL. By default CSV columns must match the ones known to the "
        "script")
    parser_embed.set_defaults(func=do_embed)

    # Subparser for 'help' command
    parser_help = subparsers.add_parser(
        "help", add_help=False,
        help="Prints help on given subcommand")
    parser_help.add_argument(
        "subcommand", metavar="SUBCOMMAND", nargs="?",
        choices=subparsers.choices,
        help="Name of subcommand to print help about (use " +
        "\"%(prog)s --help\" to get list of all subcommands)")
    parser_help.set_defaults(func=do_help, subparsers=subparsers,
                             argument_parser=argument_parser)

    if not argv:
        argument_parser.print_help()
        sys.exit(1)

    args = argument_parser.parse_args(argv)
    args.func(args)


if __name__ == "__main__":
    main(sys.argv[1:])