#!/usr/bin/env python3
""" Computes path difference between two FS (ULS) Databases """

# Copyright (C) 2022 Broadcom. All rights reserved.
# The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
# that owns the software below.
# This work is licensed under the OpenAFC Project License, a copy of which is
# included with this software program.

# pylint: disable=wrong-import-order, invalid-name, too-many-statements
# pylint: disable=too-many-branches, too-many-nested-blocks
# pylint: disable=too-many-instance-attributes, too-many-arguments
# pylint: disable=too-few-public-methods, too-many-locals, protected-access

import argparse
from collections.abc import Iterator
import enum
import hashlib
import logging
import math
import os
import sqlalchemy as sa
import sys
from typing import Any, cast, Dict, List, NamedTuple, NoReturn, Optional, \
    Set, Tuple

# START OF DATABASE SCHEMA-DEPENDENT STUFF

# Name of table with FS Paths
FS_TABLE_NAME = "uls"

# Name of table with Passive Repeaters
PR_TABLE_NAME = "pr"

# Name of table with Restricted Areas
RAS_TABLE_NAME = "ras"


class PathIdent(NamedTuple):
    """ FS Path table columns that globally uniquely identify the path

    (FSID is not used, as it is not necessarily unique across different
    databases)

    Field names must match column names of FS Path table - they are used to
    fetch values from database rows
    """
    region: str
    callsign: str
    path_number: int
    freq_assigned_start_mhz: float
    freq_assigned_end_mhz: float

    def __str__(self):
        """ String representation for difference report """
        return f"{self.region}:{self.callsign}/{self.path_number}" \
            f"[{self.freq_assigned_start_mhz}-" \
            f"{self.freq_assigned_end_mhz}]MHz"


# Fields of FS Path table to NOT include into FS path hash (besides those that
# included into PathIdent)
FS_NO_HASH_FIELDS = ["fsid", "p_rp_num", "name"]

# Fields of Passive Repeater table to NOT include into FS path hash
PR_NO_HASH_FIELDS = ["id", "fsid", "prSeq"]

# RAS table fields to NOT include into hash
RAS_NO_HASH_FIELDS = ["rasid"]

# Coordinate field names of some receiver in some database record
RxCoordFields = NamedTuple("RxCoordFields", [("lat", str), ("lon", str)])

# Fields in FS Path table that comprise coordinates of path points that affect
# AFC Engine computations. Specified in (lat, lon) order
FS_COORD_FIELDS = [RxCoordFields("rx_lat_deg", "rx_long_deg")]

# Fields in Passive Repeater table that comprise coordinates of path points.
# Specified in (lat, lon) order
PR_COORD_FIELDS = [RxCoordFields("pr_lat_deg", "pr_lon_deg")]

# Primary key field in FS Path table
FS_FSID_FIELD = "fsid"

# Foreign key field in Passive Repeater table
PR_FSID_FIELD = "fsid"

# Field in FS Path table that denotes the presence of Passive Repeater in the
# path
FS_PR_FIELD = "p_rp_num"

# Field in Passive Repeater table that sorts Repeaters in due order
PR_ORDER_FIELD = "prSeq"

# END OF DATABASE SCHEMA-DEPENDENT STUFF


def error(msg: str) -> NoReturn:
    """ Prints given msg as error message and exits abnormally

    Never returns: terminates the process via sys.exit(1)
    """
    logging.error(msg)
    sys.exit(1)


def error_if(cond: Any, msg: str) -> None:
    """ If condition evaluates to true prints given msg as error message and
    exits abnormally """
    if cond:
        error(msg)


class Tile(NamedTuple):
    """ 1x1 degree tile containing some receiver """
    # Minimum latitude in north-positive degrees
    min_lat: int
    # Minimum longitude in east-positive degrees
    min_lon: int

    def __str__(self) -> str:
        """ String representation for difference report """
        return f"[{abs(self.min_lat)}-{abs(self.min_lat + 1)}]" \
            f"{'N' if self.min_lat >= 0 else 'S'}, " \
            f"[{abs(self.min_lon)}-{abs(self.min_lon + 1)}]" \
            f"{'E' if self.min_lon >= 0 else 'W'}"


class HashFields:
    """ Fields of some table that participate in hash computation

    Private attributes:
    self._field_infos -- List of _FieldInfo object that specify indices (within
                         row) and names of fields
    """
    # Field descriptor (index in DB row and name)
    _FieldInfo = NamedTuple("_FieldInfo", [("name", str), ("idx", int)])

    def __init__(self, table: sa.Table, exclude: List[str]) -> None:
        """ Constructor

        Exits with error message if some of excluded fields are absent in the
        table

        Arguments:
        table   -- SqlAlchemy table object (contains information about all
                   columns)
        exclude -- List of names of fields not to include
        """
        error_if(not all(ef in table.c for ef in exclude),
                 f"Table '{table.name}' missing these fields: "
                 f"{', '.join([ef for ef in exclude if ef not in table.c])}")
        self._field_infos: List["HashFields._FieldInfo"] = []
        for idx, col in enumerate(table.c):
            if col.name not in exclude:
                self._field_infos.append(
                    self._FieldInfo(name=col.name, idx=idx))

    def get_hash_fields(self, row: "sa.engine.RowProxy") \
            -> "Iterator[Tuple[str, Any]]":
        """ Iterator over fields of given row

        Yields (field_name, field_value) tuples, in table column order
        """
        for fi in self._field_infos:
            yield (fi.name, row[fi.idx])


class FsPath:
    """ Information about FS Path

    Public attributes:
    ident -- Globally unique FS Path identifier
    fsid  -- Path primary key in DB

    Private attributes:
    _row_fields -- Dictionary of FS Path table row fields used in Path Hash
                   computation. None if fields computation was not requested
    _prs        -- List of per-repeater dictionaries of fields used in Path
                   Hash computation. None if fields computation was not
                   requested
    _tiles      -- Set of tiles containing receivers. None if its computation
                   was not requested
    _path_hash  -- Path hash (hash of pertinent non-identification fields in
                   the path) if its computation was requested, None otherwise
    """
    class _CoordFieldMapping(NamedTuple):
        """ Latitude/longitude field names """
        lat: str
        lon: str

        def make_tile(self, row: "sa.engine.RowProxy") -> Tile:
            """ Makes tile from latitude/longitude fields in given DB row """
            return Tile(min_lat=math.floor(row[self.lat]),
                        min_lon=math.floor(row[self.lon]))

    def __init__(self, fs_row: "sa.engine.RowProxy",
                 conn: sa.engine.Connection, pr_table: sa.Table,
                 fs_hash_fields: HashFields, pr_hash_fields: HashFields,
                 compute_hash: bool = False, compute_fields: bool = False,
                 compute_tiles: bool = False) \
            -> None:
        """ Constructor

        Arguments:
        fs_row         -- Row from FS table
        conn           -- Connection (to use for reading Passive Repeater
                          table)
        pr_table       -- Passive Repeater SqlAlchemy table
        fs_hash_fields -- Hash fields iterator for FS table
        pr_hash_fields -- Hash fields iterator for Passive Repeater table
        compute_hash   -- True to compute path hash
        compute_fields -- True to compute fields' dictionary
        compute_tiles  -- True to compute receiver tiles
        """
        self.ident = PathIdent(**{f: fs_row[f] for f in PathIdent._fields})
        self.fsid = fs_row[FS_FSID_FIELD]
        self._row_fields: Optional[Dict[str, Any]] = \
            {} if compute_fields else None
        self._prs: Optional[List[Dict[str, Any]]] = \
            [] if compute_fields else None
        self._tiles: Optional[Set[Tile]] = set() if compute_tiles else None
        # Flat list of hashed values: FS row fields, then fields of each
        # repeater in repeater order
        hash_fields: Optional[List[Any]] = [] if compute_hash else None
        for field_name, field_value in fs_hash_fields.get_hash_fields(fs_row):
            if hash_fields is not None:
                hash_fields.append(field_value)
            if self._row_fields is not None:
                self._row_fields[field_name] = field_value
        if self._tiles is not None:
            self._tiles |= self._tiles_from_record(fs_row, FS_COORD_FIELDS)
        # Passive Repeater table is only queried if FS row claims to have
        # repeaters
        if fs_row[FS_PR_FIELD]:
            sel = sa.select(pr_table).\
                where(pr_table.c[PR_FSID_FIELD] == self.fsid).\
                order_by(pr_table.c[PR_ORDER_FIELD])
            for pr_row in conn.execute(sel):
                pr: Optional[Dict[str, Any]] = {} if self._prs is not None \
                    else None
                if self._prs is not None:
                    assert pr is not None
                    self._prs.append(pr)
                for field_name, field_value in \
                        pr_hash_fields.get_hash_fields(pr_row):
                    if hash_fields is not None:
                        hash_fields.append(field_value)
                    if pr is not None:
                        pr[field_name] = field_value
                if self._tiles is not None:
                    self._tiles |= \
                        self._tiles_from_record(pr_row, PR_COORD_FIELDS)
        self._path_hash: Optional[int] = \
            self._compute_path_hash(hash_fields) \
            if hash_fields is not None else None

    @staticmethod
    def _compute_path_hash(values: List[Any]) -> int:
        """ Computes collision-resistant integer hash of given field values

        Builtin hash() is not used because in CPython hash(-1) == hash(-2)
        (also for -1.0 and -2.0), so change of some field from -1 to -2 would
        be silently missed. Also since Python 3.10 NaN hashes by identity,
        making paths with NaN fields always look different

        Values that compare equal as numbers (True, 1, 1.0) are normalized to
        the same representation, so they hash equally - as they did with
        builtin hash()

        Arguments:
        values -- List of field values
        Returns nonnegative integer made of the first 16 bytes of SHA-256
        digest of values' representation
        """
        normalized: List[Any] = []
        for value in values:
            if isinstance(value, bool):
                value = int(value)
            elif isinstance(value, float) and value.is_integer():
                # is_integer() is False for NaN and infinities
                value = int(value)
            normalized.append(value)
        digest = hashlib.sha256(repr(normalized).encode("utf-8")).digest()
        return int.from_bytes(digest[:16], "big")

    def path_hash(self) -> int:
        """ Returns path hash (must be requested at construction) """
        assert self._path_hash is not None
        return self._path_hash

    def tiles(self) -> Set[Tile]:
        """ Returns receiver tiles (must be requested at construction) """
        assert self._tiles is not None
        return self._tiles

    @staticmethod
    def _field_diffs(this_fields: Dict[str, Any],
                     other_fields: Dict[str, Any], prefix: str = "") \
            -> List[str]:
        """ Returns list of differences between two field dictionaries

        Arguments:
        this_fields  -- Field dictionary of this path (or repeater)
        other_fields -- Field dictionary of other path (or repeater)
        prefix       -- Prefix to put before each difference description
        Returns list of difference descriptions. Fields present in only one
        dictionary (databases with different schemas) are reported as absent
        in the other one
        """
        ret: List[str] = []
        for field, this_value in this_fields.items():
            if field not in other_fields:
                ret.append(f"{prefix}'{field}': '{this_value}' vs <absent>")
                continue
            other_value = other_fields[field]
            if this_value != other_value:
                ret.append(
                    f"{prefix}'{field}': '{this_value}' vs '{other_value}'")
        for field, other_value in other_fields.items():
            if field not in this_fields:
                ret.append(f"{prefix}'{field}': <absent> vs '{other_value}'")
        return ret

    def diff_report(self, other: "FsPath") -> str:
        """ Returns report on difference with other FsPath object

        Both objects must be constructed with compute_fields=True. Repeaters'
        fields are compared only if both paths have the same number of
        repeaters
        """
        assert self._row_fields is not None
        assert self._prs is not None
        assert other._row_fields is not None
        assert other._prs is not None
        diffs: List[str] = \
            self._field_diffs(self._row_fields, other._row_fields)
        if len(self._prs) != len(other._prs):
            diffs.append(f"Different number of repeaters: "
                         f"{len(self._prs)} vs {len(other._prs)}")
        else:
            for pr_idx, this_pr_dict in enumerate(self._prs):
                diffs += \
                    self._field_diffs(
                        this_pr_dict, other._prs[pr_idx],
                        prefix=f"Repeater #{pr_idx + 1}, ")
        return ", ".join(diffs)

    def _tiles_from_record(self, row: "sa.engine.RowProxy",
                           rx_fields: List[RxCoordFields]) -> Set[Tile]:
        """ Make set of tiles according to given list of receiver coordinates

        Arguments:
        row       -- Database row to read coordinates from
        rx_fields -- List of latitude/longitude field name pairs
        Returns set of tiles containing points, specified by given fields
        """
        ret: Set[Tile] = set()
        for rf in rx_fields:
            ret.add(Tile(min_lat=math.floor(row[rf.lat]),
                         min_lon=math.floor(row[rf.lon])))
        return ret

    @classmethod
    def compute_hash_fields(cls, fs_table: sa.Table, pr_table: sa.Table) \
            -> Tuple[HashFields, HashFields]:
        """ Computes hash fields' iterators

        Exits with error message if FS table lacks some path identification
        fields

        Arguments:
        fs_table -- SqlAlchemy table object for FS
        pr_table -- SqlAlchemy table object Passive Repeater
        Returns (FS_iterator, PR_Iterator) tuple
        """
        missing_ident_fields = \
            [f for f in PathIdent._fields if f not in fs_table.c]
        error_if(
            missing_ident_fields,
            f"Table '{fs_table.name}' does not contain following path "
            f"identification fields: {', '.join(missing_ident_fields)}")
        return \
            (HashFields(
                table=fs_table,
                exclude=list(PathIdent._fields) + FS_NO_HASH_FIELDS),
             HashFields(table=pr_table, exclude=PR_NO_HASH_FIELDS))


class Ras:
    """ Information about single restricted area:

    Public attributes:
    location -- Location name

    Private attributes:
    _figure      -- Restricted area figure (source of tile sequence)
    _hash_fields -- Hash fields (represent restricted area identity)
    """
    # Earth radius, same as cconst.cpp CConst::earthRadius
    EARTH_RADIUS_KM = 6378.137
    # Maximum imaginable SP AP height above ground in kilometers
    MAX_AP_AGL_KM = 1

    class RasType(enum.Enum):
        """ RAS definition types. Values are DB names of these types """
        Rect = "One Rectangle"
        TwoRect = "Two Rectangles"
        Circle = "Circle"
        Horizon = "Horizon Distance"

    class Point:
        """ Geodetic point (latitude and longitude) """

        def __init__(self, lat: float, lon: float) -> None:
            """ Constructor

            Raises ValueError if any coordinate is not float (e.g. is None)
            """
            if not isinstance(lat, float):
                raise ValueError(f"Latitude {lat} has incorrect type")
            if not isinstance(lon, float):
                raise ValueError(f"Longitude {lon} has incorrect type")
            self.lat = lat
            self.lon = lon

    class FigureBase:
        """ Abstract base class for restricted area figure """

        def tiles(self) -> List[Tile]:
            """ Returns list of 1 degree tiles covering figure """
            raise NotImplementedError(
                f"{self.__class__}.tiles() not implemented")

        @classmethod
        def rect_tiles(cls, min_pt: "Ras.Point", max_pt: "Ras.Point") \
                -> List[Tile]:
            """ Returns list of 1 degree tiles covering given geodetic
            rectangle

            Arguments:
            min_pt -- Left bottom point
            max_pt -- Right top point
            Returns sequence of 1 degree tiles
            """
            ret: List[Tile] = []
            for min_lat in range(math.floor(min_pt.lat),
                                 math.ceil(max_pt.lat)):
                for min_lon in range(math.floor(min_pt.lon),
                                     math.ceil(max_pt.lon)):
                    # Bringing longitude into [-180, 180) range
                    adjusted_min_lon = min_lon
                    if adjusted_min_lon < -180:
                        adjusted_min_lon += 360
                    elif adjusted_min_lon >= 180:
                        adjusted_min_lon -= 360
                    ret.append(Tile(min_lat=min_lat,
                                    min_lon=adjusted_min_lon))
            return ret

    class Rect(FigureBase):
        """ Rectangular RAS

        Private attributes:
        _min -- Left bottom point
        _max -- Right top point
        """

        def __init__(self, pt: "Ras.Point") -> None:
            """ Constructor, constructs from one of corners """
            self._min = pt
            self._max = pt

        def extend(self, pt: "Ras.Point") -> None:
            """ Extending rectangle with corner point """
            # Shifting new longitude by 360 if it is more than 180 degrees
            # away from current rectangle center
            other_lon = pt.lon
            center_lon = (self._min.lon + self._max.lon) / 2
            if (other_lon - center_lon) > 180:
                other_lon -= 360
            elif (other_lon - center_lon) < -180:
                other_lon += 360
            self._min = Ras.Point(lat=min(pt.lat, self._min.lat),
                                  lon=min(other_lon, self._min.lon))
            self._max = Ras.Point(lat=max(pt.lat, self._max.lat),
                                  lon=max(other_lon, self._max.lon))

        def tiles(self) -> List[Tile]:
            """ Returns sequence of covering 1 degree tiles """
            return self.rect_tiles(min_pt=self._min, max_pt=self._max)

    class Circle(FigureBase):
        """ Round restricted area

        Private attributes:
        _center    -- Center geodetic point
        _radius_km -- Radius in kilometers
        """

        def __init__(self, center: "Ras.Point") -> None:
            """ Construct by center """
            self._center = center
            self._radius_km: float = 0

        def set_radius(self, radius_km: float) -> None:
            """ Sets Radius in kilometers

            Raises ValueError if radius is not float (e.g. is None)
            """
            if not isinstance(radius_km, float):
                raise ValueError(f"Invalid circle radius: {radius_km}")
            self._radius_km = radius_km

        def set_horizon_radius(self, ant_agl_m: float) -> None:
            """ Sets horizon radius by specifying central antenna height above
            ground in meters

            Raises ValueError if height is not float (e.g. is None)
            """
            if not isinstance(ant_agl_m, float):
                raise ValueError(f"Invalid antenna elevation: {ant_agl_m}")
            # Sum of horizon distances of central antenna and of AP at
            # maximum height, Earth radius is scaled by 4/3
            self._radius_km = math.sqrt(2 * Ras.EARTH_RADIUS_KM * 4 / 3) * \
                (math.sqrt(ant_agl_m / 1000) + math.sqrt(Ras.MAX_AP_AGL_KM))

        def tiles(self) -> List[Tile]:
            """ Returns sequence of covering 1 degree tiles

            Tiles cover rectangle circumscribed around the circle
            """
            radian_radius = self._radius_km / Ras.EARTH_RADIUS_KM
            lat_radius_deg = math.degrees(radian_radius)
            lon_radius_deg = \
                math.degrees(radian_radius /
                             math.cos(math.radians(self._center.lat)))
            return \
                self.rect_tiles(
                    min_pt=Ras.Point(lat=self._center.lat - lat_radius_deg,
                                     lon=self._center.lon - lon_radius_deg),
                    max_pt=Ras.Point(lat=self._center.lat + lat_radius_deg,
                                     lon=self._center.lon + lon_radius_deg))

    def __init__(self, fs_row: "sa.engine.RowProxy", hash_fields: HashFields) \
            -> None:
        """ Constructor

        Exits with error message if row has unsupported structure (unknown
        RAS type, missing or mistyped field)

        Arguments:
        fs_row      -- Database row to construct from
        hash_fields -- Retriever of identity fields
        """
        try:
            self.location = \
                f"{fs_row.region}: {fs_row.name} @ {fs_row.location}"
            # IndexError (a LookupError) if type name is unknown
            ras_type = [rt for rt in self.RasType
                        if rt.value == fs_row.exclusionZone][0]
            self._figure: "Ras.FigureBase"
            if ras_type in (self.RasType.Rect, self.RasType.TwoRect):
                rect = self.Rect(self.Point(lat=fs_row.rect1lat1,
                                            lon=fs_row.rect1lon1))
                rect.extend(self.Point(lat=fs_row.rect1lat2,
                                       lon=fs_row.rect1lon2))
                if ras_type == self.RasType.TwoRect:
                    rect.extend(self.Point(lat=fs_row.rect2lat1,
                                           lon=fs_row.rect2lon1))
                    rect.extend(self.Point(lat=fs_row.rect2lat2,
                                           lon=fs_row.rect2lon2))
                self._figure = rect
            elif ras_type in (self.RasType.Circle, self.RasType.Horizon):
                circle = self.Circle(self.Point(lat=fs_row.centerLat,
                                                lon=fs_row.centerLon))
                if ras_type == self.RasType.Circle:
                    circle.set_radius(fs_row.radiusKm)
                else:
                    assert ras_type == self.RasType.Horizon
                    circle.set_horizon_radius(ant_agl_m=fs_row.heightAGL)
                self._figure = circle
            else:
                raise ValueError(f"Unsupported RAS type: {ras_type.name}")
        except (LookupError, ValueError, AttributeError,
                sa.exc.SQLAlchemyError) as ex:
            # AttributeError is raised on attribute access to column, absent
            # in the row
            error(
                f"RAS record '{fs_row}' has unsupported structure: {repr(ex)}")
        self._hash_fields = \
            tuple(v for _, v in hash_fields.get_hash_fields(fs_row))

    def tiles(self) -> List[Tile]:
        """ Returns sequence of covering 1 degree tiles """
        return self._figure.tiles()

    def __eq__(self, other: Any) -> bool:
        """ Equality comparator (compares identity fields) """
        return isinstance(other, self.__class__) and \
            (self._hash_fields == other._hash_fields)

    def __hash__(self) -> int:
        """ Hash computer (hashes identity fields) """
        return hash(self._hash_fields)


class Db:
    """ Database access (most part of it)

    Public attributes
    filename        -- SQLite database filename
    conn            -- Database connection
    fs_table        -- SqlAlchemy table object for FS
    pr_table        -- SqlAlchemy table object for Passive Repeater
    ras_table       -- Optional SqlAlchemy table object for RAS
    fs_hash_fields  -- Iterator over hash fields in FS record
    pr_hash_fields  -- Iterator over hash fields in Passive Repeater record
    ras_hash_fields -- Optional iterator over hash fields in RAS record
    """

    def __init__(self, filename: str) -> None:
        """ Constructor

        Exits with error message if file is absent, can't be opened or lacks
        FS or Passive Repeater table. RAS table is optional

        Arguments:
        filename -- SQLite database file
        """
        error_if(not os.path.isfile(filename),
                 f"FS SQLite database '{filename}' not found")
        self.filename = filename
        try:
            self._engine = sa.create_engine("sqlite:///" + self.filename)
            self._metadata = sa.MetaData()
            self._metadata.reflect(bind=self._engine)
            self.conn = self._engine.connect()
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error opening FS database '{self.filename}': {ex}")
        error_if(FS_TABLE_NAME not in self._metadata.tables,
                 f"'{self.filename}' does not contain table '{FS_TABLE_NAME}'")
        self.fs_table = self._metadata.tables[FS_TABLE_NAME]
        error_if(PR_TABLE_NAME not in self._metadata.tables,
                 f"'{self.filename}' does not contain table '{PR_TABLE_NAME}'")
        self.pr_table = self._metadata.tables[PR_TABLE_NAME]
        self.ras_table = self._metadata.tables.get(RAS_TABLE_NAME)
        self.fs_hash_fields, self.pr_hash_fields = \
            FsPath.compute_hash_fields(fs_table=self.fs_table,
                                       pr_table=self.pr_table)
        self.ras_hash_fields = \
            None if self.ras_table is None \
            else HashFields(table=self.ras_table, exclude=RAS_NO_HASH_FIELDS)

    def close(self):
        """ Close connection """
        self.conn.close()

    def all_fs_rows(self) -> "Iterator[sa.engine.RowProxy]":
        """ Iterator that reads all FS Path rows

        All rows are fetched before the first one is yielded
        """
        try:
            for fs_row in self.conn.execute(sa.select(self.fs_table)).\
                    fetchall():
                yield fs_row
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading '{self.fs_table.name}' table from FS "
                  f"database '{self.filename}': {ex}")

    def fs_row_by_fsid(self, fsid: int) -> "sa.engine.RowProxy":
        """ Fetch FS row by FSID

        Exits with error message if row not found or on database error
        """
        try:
            ret = \
                self.conn.execute(
                    sa.select(self.fs_table).
                    where(self.fs_table.c[FS_FSID_FIELD] == fsid)).first()
            error_if(
                ret is None,
                f"Record with FSID of {fsid} not found in table "
                f"'{self.fs_table.name}' of FS database '{self.filename}'")
            assert ret is not None
            return ret
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading '{self.fs_table.name}' table from FS "
                  f"database '{self.filename}': {ex}")
            raise  # Will never happen, just to pacify pylint

    def all_ras_rows(self) -> "Iterator[sa.engine.RowProxy]":
        """ Iterator that reads all RAS rows

        Yields nothing if database has no RAS table
        """
        if self.ras_table is None:
            return
        try:
            for fs_row in self.conn.execute(sa.select(self.ras_table)).\
                    fetchall():
                yield fs_row
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading '{self.ras_table.name}' table from FS "
                  f"database '{self.filename}': {ex}")


def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments
    """
    argument_parser = \
        argparse.ArgumentParser(
            description="Computes path difference between two FS (aka ULS) "
            "Databases")
    argument_parser.add_argument(
        "--report_tiles", action="store_true",
        help="Print 1x1 degree tiles affected by difference")
    argument_parser.add_argument(
        "--report_paths", action="store_true",
        help="Print paths that constitute difference")
    argument_parser.add_argument(
        "DB1",
        help="First FS (aka ULS) sqlite3 database file to compare")
    argument_parser.add_argument(
        "DB2",
        help="Second FS (aka ULS) sqlite3 database file to compare")

    if not argv:
        argument_parser.print_help()
        sys.exit(1)
    args = argument_parser.parse_args(argv)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter(
            f"{os.path.basename(__file__)}. "
            f"%(levelname)s: %(asctime)s %(message)s"))
    logging.getLogger().addHandler(console_handler)
    logging.getLogger().setLevel(logging.INFO)

    dbs: List[Db] = []
    try:
        dbs.append(Db(args.DB1))
        dbs.append(Db(args.DB2))

        IdentHash = NamedTuple("IdentHash",
                               [("ident", PathIdent), ("hash", int)])
        IdentHashToFsid = Dict[IdentHash, int]

        # Per-database dictionaries that map (FS Ident, FS Hash) pairs to FSIDs
        # Building them...
        ident_hash_to_fsid_by_db: List[IdentHashToFsid] = []
        for db in dbs:
            ident_hash_to_fsid: IdentHashToFsid = {}
            ident_hash_to_fsid_by_db.append(ident_hash_to_fsid)
            try:
                for fs_row in db.all_fs_rows():
                    fs_path = \
                        FsPath(
                            fs_row=fs_row, conn=db.conn, pr_table=db.pr_table,
                            fs_hash_fields=db.fs_hash_fields,
                            pr_hash_fields=db.pr_hash_fields,
                            compute_hash=True)
                    ident_hash_to_fsid[
                        IdentHash(ident=fs_path.ident,
                                  hash=cast(int, fs_path.path_hash()))] = \
                        fs_path.fsid
            except sa.exc.SQLAlchemyError as ex:
                error(f"Error reading FS database '{db.filename}': {ex}")

        # Idents of FS Paths with differences mapped to FSID dictionaries
        # (dictionaries indexed by DB index (0 or 1) to correspondent FSIDs).
        # Building it
        ident_to_fsids_diff: Dict[PathIdent, Dict[int, int]] = {}
        # Loop by unique (FS Ident, FS Hash) pairs
        for unique_ident_hash in (set(ident_hash_to_fsid_by_db[0].keys()) ^
                                  set(ident_hash_to_fsid_by_db[1].keys())):
            db_idx = 0 if unique_ident_hash in ident_hash_to_fsid_by_db[0] \
                else 1
            ident_to_fsids_diff.setdefault(unique_ident_hash.ident,
                                           {})[db_idx] = \
                ident_hash_to_fsid_by_db[db_idx][unique_ident_hash]

        # Reading RAS tables from databases
        ras_sets: List[Set[Ras]] = []
        for db in dbs:
            ras_sets.append(set())
            for fs_row in db.all_ras_rows():
                assert db.ras_hash_fields is not None
                ras_sets[-1].add(Ras(fs_row=fs_row,
                                     hash_fields=db.ras_hash_fields))
        ras_difference = ras_sets[0] ^ ras_sets[1]

        # Making brief report
        print(f"Paths in DB1: {len(ident_hash_to_fsid_by_db[0])}")
        print(f"Paths in DB2: {len(ident_hash_to_fsid_by_db[1])}")
        print(f"Different paths: {len(ident_to_fsids_diff)}")
        print(f"Different RAS entries: {len(ras_difference)}")

        # Detailed reports
        if args.report_paths or args.report_tiles:
            # RX tiles belonging to unique paths
            tiles_of_difference: Set[Tile] = set()
            # Loop by unique FS identifiers
            for path_ident in sorted(ident_to_fsids_diff.keys()):
                # Per-database FsPath objects
                paths: List[Optional[FsPath]] = []
                # Filling it (and adding tiles)
                for db_idx, db in enumerate(dbs):
                    fsid = ident_to_fsids_diff[path_ident].get(db_idx)
                    if fsid is None:
                        paths.append(None)
                        continue
                    try:
                        fs_row = db.fs_row_by_fsid(fsid)
                        fs_path = \
                            FsPath(
                                fs_row=fs_row, conn=db.conn,
                                pr_table=db.pr_table,
                                fs_hash_fields=db.fs_hash_fields,
                                pr_hash_fields=db.pr_hash_fields,
                                compute_fields=args.report_paths,
                                compute_tiles=args.report_tiles)
                    except sa.exc.SQLAlchemyError as ex:
                        error(
                            f"Error reading FS database '{db.filename}': {ex}")
                    if args.report_tiles:
                        tiles_of_difference |= fs_path.tiles()
                    paths.append(fs_path)
                if args.report_paths:
                    if paths[0] is not None:
                        if paths[1] is not None:
                            diff_report = paths[0].diff_report(paths[1])
                        else:
                            diff_report = "Only present in DB1"
                    else:
                        assert paths[1] is not None
                        diff_report = "Only present in DB2"
                    print(f"Difference in path {path_ident}: {diff_report}")
            if args.report_paths:
                for db_idx, ras_set in enumerate(ras_sets):
                    for ras in (ras_set - ras_sets[1 - db_idx]):
                        print(f"RAS '{ras.location}' from DB{db_idx + 1} not "
                              f"present in or different from "
                              f"DB{1 - db_idx + 1}")
            if args.report_tiles:
                for ras in ras_difference:
                    tiles_of_difference |= set(ras.tiles())
                for tile in sorted(tiles_of_difference):
                    print(f"Difference in tile {tile}")
    except KeyboardInterrupt:
        pass
    finally:
        for db in dbs:
            if db:
                db.close()


if __name__ == "__main__":
    main(sys.argv[1:])