mirror of
https://github.com/Telecominfraproject/openafc_final.git
synced 2025-11-01 02:27:49 +00:00
748 lines
30 KiB
Python
748 lines
30 KiB
Python
#!/usr/bin/env python3
""" Computes path difference between two FS (ULS) Databases """

# Copyright (C) 2022 Broadcom. All rights reserved.
# The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
# that owns the software below.
# This work is licensed under the OpenAFC Project License, a copy of which is
# included with this software program.

# pylint: disable=wrong-import-order, invalid-name, too-many-statements
# pylint: disable=too-many-branches, too-many-nested-blocks
# pylint: disable=too-many-instance-attributes, too-many-arguments
# pylint: disable=too-few-public-methods, too-many-locals, protected-access
|
|
|
|
import argparse
|
|
from collections.abc import Iterator
|
|
import enum
|
|
import logging
|
|
import math
|
|
import os
|
|
import sqlalchemy as sa
|
|
import sys
|
|
from typing import Any, cast, Dict, List, NamedTuple, Optional, Set, Tuple
|
|
|
|
|
|
# START OF DATABASE SCHEMA-DEPENDENT STUFF

# Name of table with FS Paths. Database that lacks this table is rejected
# when it is opened
FS_TABLE_NAME = "uls"

# Name of table with Passive Repeaters. Database that lacks this table is
# rejected when it is opened
PR_TABLE_NAME = "pr"

# Name of table with Restricted Areas. This table is optional: it is looked up
# without raising an error if it is absent
RAS_TABLE_NAME = "ras"
|
|
|
|
|
|
class PathIdent(NamedTuple):
    """ Database-independent identity of FS Path

    Made of FS Path table columns that, taken together, identify the path
    globally. FSID is not among them, as it is not necessarily unique across
    different databases
    """
    region: str
    callsign: str
    path_number: int
    freq_assigned_start_mhz: float
    freq_assigned_end_mhz: float

    def __str__(self):
        """ Path designation to use in difference report """
        path_name = f"{self.region}:{self.callsign}/{self.path_number}"
        freq_range = \
            f"{self.freq_assigned_start_mhz}-{self.freq_assigned_end_mhz}"
        return f"{path_name}[{freq_range}]MHz"
|
|
|
|
|
|
# Columns of FS Path table that are left out of FS path hash (in addition to
# the columns that make up PathIdent)
FS_NO_HASH_FIELDS = ["fsid", "p_rp_num", "name"]

# Columns of Passive Repeater table that are left out of FS path hash
PR_NO_HASH_FIELDS = ["id", "fsid", "prSeq"]

# Columns of RAS table that are left out of RAS hash
RAS_NO_HASH_FIELDS = ["rasid"]


class RxCoordFields(NamedTuple):
    """ Names of coordinate columns of some receiver in some database record
    """
    lat: str
    lon: str


# Coordinate columns of FS Path table, describing path points that affect AFC
# Engine computations. Given in (lat, lon) order
FS_COORD_FIELDS = [RxCoordFields(lat="rx_lat_deg", lon="rx_long_deg")]

# Coordinate columns of Passive Repeater table, describing path points. Given
# in (lat, lon) order
PR_COORD_FIELDS = [RxCoordFields(lat="pr_lat_deg", lon="pr_lon_deg")]

# Primary key column of FS Path table
FS_FSID_FIELD = "fsid"

# Foreign key column of Passive Repeater table
PR_FSID_FIELD = "fsid"

# Column of FS Path table that tells if path has Passive Repeaters
FS_PR_FIELD = "p_rp_num"

# Column of Passive Repeater table that puts Repeaters of a path in due order
PR_ORDER_FIELD = "prSeq"

# END OF DATABASE SCHEMA-DEPENDENT STUFF
|
|
|
|
|
|
def error(msg: str) -> None:
    """ Logs given message at error level, then terminates the program with
    exit code 1 (never returns) """
    logging.error(msg)
    raise SystemExit(1)
|
|
|
|
|
|
def error_if(cond: Any, msg: str) -> None:
    """ Does nothing if given condition is falsy, otherwise reports given
    message as error and exits abnormally """
    if not cond:
        return
    error(msg)
|
|
|
|
|
|
class Tile(NamedTuple):
    """ One by one degree tile that contains some receiver """
    # Minimum latitude of tile, in north-positive degrees
    min_lat: int

    # Minimum longitude of tile, in east-positive degrees
    min_lon: int

    def __str__(self) -> str:
        """ Tile designation to use in difference report

        Each range is printed as absolute values of minimum and of
        minimum-plus-one, followed by hemisphere letter
        """
        lat_hemisphere = "N" if self.min_lat >= 0 else "S"
        lon_hemisphere = "E" if self.min_lon >= 0 else "W"
        lat_span = f"[{abs(self.min_lat)}-{abs(self.min_lat + 1)}]"
        lon_span = f"[{abs(self.min_lon)}-{abs(self.min_lon + 1)}]"
        return f"{lat_span}{lat_hemisphere}, {lon_span}{lon_hemisphere}"
|
|
|
|
|
|
class HashFields:
    """ Selection of table fields that take part in hash computation

    Private attributes:
    self._field_infos -- List of _FieldInfo objects, one per participating
                         field, each holding field name and field index
                         within database row
    """
    # Descriptor of a single field: its name and its index in DB row
    _FieldInfo = NamedTuple("_FieldInfo", [("name", str), ("idx", int)])

    def __init__(self, table: "sa.Table", exclude: List[str]) -> None:
        """ Constructor

        Exits with error message if some of excluded fields is not a column
        of given table

        Arguments:
        table   -- SqlAlchemy table object (source of information about all
                   columns)
        exclude -- Names of fields that should not participate in hash
        """
        absent = [name for name in exclude if name not in table.c]
        error_if(absent,
                 f"Table '{table.name}' missing these fields: "
                 f"{', '.join(absent)}")
        self._field_infos: List["HashFields._FieldInfo"] = [
            self._FieldInfo(name=column.name, idx=column_idx)
            for column_idx, column in enumerate(table.c)
            if column.name not in exclude]

    def get_hash_fields(self, row: "sa.engine.RowProxy") \
            -> "Iterator[Tuple[str, Any]]":
        """ Generator over participating fields of given row

        Yields (field_name, field_value) tuples, values being taken from row
        by field index
        """
        for name, idx in self._field_infos:
            yield name, row[idx]
|
|
|
|
|
|
class FsPath:
    """ Information about FS Path

    Public attributes:
    ident -- Globally unique FS Path identifier
    fsid  -- Path primary key in DB

    Private attributes:
    _row_fields -- Dictionary of FS Path table row fields used in Path Hash
                   computation. None if fields computation was not requested
    _prs        -- List of per-repeater dictionaries of fields used in Path
                   Hash computation. None if fields computation was not
                   requested
    _tiles      -- Set of tiles containing receivers. None if its computation
                   was not requested
    _path_hash  -- Path hash (hash of pertinent non-identification fields in
                   the path) if its computation was requested, None otherwise
    """
    class _CoordFieldMapping(NamedTuple):
        """ Latitude/longitude field names """
        lat: str
        lon: str

        def make_tile(self, row: "sa.engine.RowProxy") -> "Tile":
            """ Makes tile from latitude/longitude fields in given DB row """
            return Tile(
                min_lat=math.floor(FsPath._row_value(row, self.lat)),
                min_lon=math.floor(FsPath._row_value(row, self.lon)))

    def __init__(self, fs_row: "sa.engine.RowProxy",
                 conn: "sa.engine.Connection",
                 pr_table: "sa.Table", fs_hash_fields: "HashFields",
                 pr_hash_fields: "HashFields", compute_hash: bool = False,
                 compute_fields: bool = False, compute_tiles: bool = False) \
            -> None:
        """ Constructor

        Reads Passive Repeater rows of the path (if FS row says there are
        any) through given connection, ordered by repeater sequence field

        Arguments:
        fs_row         -- Row from FS table
        conn           -- Connection (to use for reading Passive Repeater
                          table)
        pr_table       -- Passive Repeater SqlAlchemy table
        fs_hash_fields -- Hash fields iterator for FS table
        pr_hash_fields -- Hash fields iterator for Passive Repeater table
        compute_hash   -- True to compute path hash
        compute_fields -- True to compute fields' dictionary
        compute_tiles  -- True to compute receiver tiles
        """
        self.ident = \
            PathIdent(**{f: self._row_value(fs_row, f)
                         for f in PathIdent._fields})
        self.fsid = self._row_value(fs_row, FS_FSID_FIELD)
        self._row_fields: Optional[Dict[str, Any]] = \
            {} if compute_fields else None
        self._prs: Optional[List[Dict[str, Any]]] = \
            [] if compute_fields else None
        self._tiles: Optional[Set[Tile]] = set() if compute_tiles else None
        # Values to hash: FS row fields followed by fields of all repeaters
        hash_fields: Optional[List[Any]] = [] if compute_hash else None
        for field_name, field_value in fs_hash_fields.get_hash_fields(fs_row):
            if hash_fields is not None:
                hash_fields.append(field_value)
            if self._row_fields is not None:
                self._row_fields[field_name] = field_value
        if self._tiles is not None:
            self._tiles |= self._tiles_from_record(fs_row, FS_COORD_FIELDS)
        if self._row_value(fs_row, FS_PR_FIELD):
            sel = sa.select(pr_table).\
                where(pr_table.c[PR_FSID_FIELD] == self.fsid).\
                order_by(pr_table.c[PR_ORDER_FIELD])
            for pr_row in conn.execute(sel):
                pr: Optional[Dict[str, Any]] = {} if self._prs is not None \
                    else None
                if self._prs is not None:
                    assert pr is not None
                    self._prs.append(pr)
                for field_name, field_value in \
                        pr_hash_fields.get_hash_fields(pr_row):
                    if hash_fields is not None:
                        hash_fields.append(field_value)
                    if pr is not None:
                        pr[field_name] = field_value
                if self._tiles is not None:
                    self._tiles |= \
                        self._tiles_from_record(pr_row, PR_COORD_FIELDS)
        self._path_hash: Optional[int] = \
            hash(tuple(hash_fields)) if hash_fields is not None else None

    def path_hash(self) -> int:
        """ Returns path hash (must have been requested at construction) """
        assert self._path_hash is not None
        return self._path_hash

    def tiles(self) -> "Set[Tile]":
        """ Returns receiver tiles (must have been requested at construction)
        """
        assert self._tiles is not None
        return self._tiles

    def diff_report(self, other: "FsPath") -> str:
        """ Returns report on difference with other FsPath object

        Both objects must have been constructed with fields computation
        requested. Fields present on one side only (databases of different
        schemas) are reported as '<absent>' on the other side

        Arguments:
        other -- FsPath object to compare with
        Returns comma-separated list of found differences (empty string if
        none were found)
        """
        assert self._row_fields is not None
        assert self._prs is not None
        assert other._row_fields is not None
        assert other._prs is not None
        diffs: List[str] = \
            self._field_diffs(self._row_fields, other._row_fields, prefix="")
        if len(self._prs) != len(other._prs):
            diffs.append(f"Different number of repeaters: "
                         f"{len(self._prs)} vs {len(other._prs)}")
        else:
            for pr_idx, (this_pr_dict, other_pr_dict) in \
                    enumerate(zip(self._prs, other._prs)):
                diffs += \
                    self._field_diffs(this_pr_dict, other_pr_dict,
                                      prefix=f"Repeater #{pr_idx + 1}, ")
        return ", ".join(diffs)

    @staticmethod
    def _field_diffs(this_fields: Dict[str, Any],
                     other_fields: Dict[str, Any], prefix: str) -> List[str]:
        """ Compares two field dictionaries

        Arguments:
        this_fields  -- Field dictionary of this path (or its repeater)
        other_fields -- Field dictionary of other path (or its repeater)
        prefix       -- String to put in front of each difference
        Returns list of difference descriptions
        """
        ret: List[str] = []
        for field, this_value in this_fields.items():
            if field not in other_fields:
                # Column exists in this database only
                ret.append(f"{prefix}'{field}': '{this_value}' vs <absent>")
                continue
            other_value = other_fields[field]
            if this_value != other_value:
                ret.append(f"{prefix}'{field}': "
                           f"'{this_value}' vs '{other_value}'")
        for field, other_value in other_fields.items():
            if field not in this_fields:
                # Column exists in other database only
                ret.append(f"{prefix}'{field}': <absent> vs '{other_value}'")
        return ret

    @staticmethod
    def _row_value(row: Any, name: str) -> Any:
        """ Returns value of named column of given database row

        Rows of SQLAlchemy 2.0 are not indexable by column name, named access
        goes through their '_mapping' attribute. It is used if present,
        otherwise the row itself is indexed by name

        Arguments:
        row  -- Database row
        name -- Column name
        Returns column value
        """
        return getattr(row, "_mapping", row)[name]

    def _tiles_from_record(self, row: "sa.engine.RowProxy",
                           rx_fields: "List[RxCoordFields]") -> "Set[Tile]":
        """ Make set of tiles according to given list of receiver coordinates

        Arguments:
        row       -- Database row to take coordinates from
        rx_fields -- List of latitude/longitude column name pairs
        Returns set of tiles, containing points with given coordinates
        """
        ret: Set[Tile] = set()
        for rf in rx_fields:
            ret.add(Tile(min_lat=math.floor(self._row_value(row, rf.lat)),
                         min_lon=math.floor(self._row_value(row, rf.lon))))
        return ret

    @classmethod
    def compute_hash_fields(cls, fs_table: "sa.Table",
                            pr_table: "sa.Table") \
            -> "Tuple[HashFields, HashFields]":
        """ Computes hash fields' iterators

        Exits with error message if FS table lacks some of path
        identification fields

        Arguments:
        fs_table -- SqlAlchemy table object for FS
        pr_table -- SqlAlchemy table object Passive Repeater
        Returns (FS_iterator, PR_Iterator) tuple
        """
        missing_ident_fields = \
            [f for f in PathIdent._fields if f not in fs_table.c]
        error_if(
            missing_ident_fields,
            f"Table '{fs_table.name}' does not contain following path "
            f"identification fields: {', '.join(missing_ident_fields)}")
        return \
            (HashFields(
                table=fs_table,
                exclude=list(PathIdent._fields) + FS_NO_HASH_FIELDS),
             HashFields(table=pr_table, exclude=PR_NO_HASH_FIELDS))
|
|
|
|
|
|
class Ras:
    """ Information about single restricted area:

    Public attributes:
    location -- Location name

    Private attributes:
    _figure      -- Restricted area figure (source of tile sequence)
    _hash_fields -- Hash fields (represent restricted area identity)
    """
    # Earth radius, same as cconst.cpp CConst::earthRadius
    EARTH_RADIUS_KM = 6378.137
    # Maximum imaginable SP AP height above ground in kilometers
    MAX_AP_AGL_KM = 1

    class RasType(enum.Enum):
        """ RAS definition types. Values are DB names of these types """
        Rect = "One Rectangle"
        TwoRect = "Two Rectangles"
        Circle = "Circle"
        Horizon = "Horizon Distance"

    @staticmethod
    def _is_real(value: Any) -> bool:
        """ True if given value is a real number (int or float, but not bool)

        Integers are accepted because integral values may come from database
        as int
        """
        return isinstance(value, (int, float)) and \
            not isinstance(value, bool)

    class Point:
        """ Geodetic point (latitude and longitude, stored as floats) """

        def __init__(self, lat: float, lon: float) -> None:
            """ Constructor

            Arguments:
            lat -- Latitude
            lon -- Longitude
            Raises ValueError if some coordinate is not a number
            """
            if not Ras._is_real(lat):
                raise ValueError(f"Latitude {lat} has incorrect type")
            if not Ras._is_real(lon):
                raise ValueError(f"Longitude {lon} has incorrect type")
            self.lat = float(lat)
            self.lon = float(lon)

    class FigureBase:
        """ Abstract base class for restricted area figure """

        def tiles(self) -> "List[Tile]":
            """ Returns list of 1 degree tiles covering figure """
            raise NotImplementedError(
                f"{self.__class__}.tiles() not implemented")

        @classmethod
        def rect_tiles(cls, min_pt: "Ras.Point", max_pt: "Ras.Point") \
                -> "List[Tile]":
            """ Returns list of 1 degree tiles covering given geodetic
            rectangle

            Arguments:
            min_pt -- Left bottom point
            max_pt -- Right top point
            Returns sequence of 1 degree tiles, longitudes being wrapped to
            [-180, 180) range
            """
            lat_begin = math.floor(min_pt.lat)
            lon_begin = math.floor(min_pt.lon)
            # Rectangle of zero height/width that lies exactly on integer
            # degree has floor equal to ceil. It still occupies one row/column
            # of tiles
            lat_end = max(math.ceil(max_pt.lat), lat_begin + 1)
            lon_end = max(math.ceil(max_pt.lon), lon_begin + 1)
            if (lon_end - lon_begin) > 360:
                # Rectangle wraps around the globe (e.g. circle near the
                # pole). Whole ring of tiles is used, which also bounds the
                # loop and keeps the single wrap adjustment below sufficient
                lon_begin, lon_end = -180, 180
            ret: List[Tile] = []
            for min_lat in range(lat_begin, lat_end):
                for min_lon in range(lon_begin, lon_end):
                    adjusted_min_lon = min_lon
                    if adjusted_min_lon < -180:
                        adjusted_min_lon += 360
                    elif adjusted_min_lon >= 180:
                        adjusted_min_lon -= 360
                    ret.append(Tile(min_lat=min_lat, min_lon=adjusted_min_lon))
            return ret

    class Rect(FigureBase):
        """ Rectangular RAS

        Private attributes:
        _min -- Left bottom point
        _max -- Right top point
        """

        def __init__(self, pt: "Ras.Point") -> None:
            """ Constructor, constructs from one of corners """
            self._min = pt
            self._max = pt

        def extend(self, pt: "Ras.Point") -> None:
            """ Extending rectangle with corner point """
            other_lon = pt.lon
            center_lon = (self._min.lon + self._max.lon) / 2
            # Shifting longitude of new point by 360 degrees, if it brings
            # the point closer to current center of rectangle
            if (other_lon - center_lon) > 180:
                other_lon -= 360
            elif (other_lon - center_lon) < -180:
                other_lon += 360
            self._min = Ras.Point(lat=min(pt.lat, self._min.lat),
                                  lon=min(other_lon, self._min.lon))
            self._max = Ras.Point(lat=max(pt.lat, self._max.lat),
                                  lon=max(other_lon, self._max.lon))

        def tiles(self) -> "List[Tile]":
            """ Returns sequence of covering 1 degree tiles """
            return self.rect_tiles(min_pt=self._min, max_pt=self._max)

    class Circle(FigureBase):
        """ Round restricted area

        Private attributes:
        _center    -- Center geodetic point
        _radius_km -- Radius in kilometers
        """

        def __init__(self, center: "Ras.Point") -> None:
            """ Construct by center (radius is zero until set) """
            self._center = center
            self._radius_km: float = 0

        def set_radius(self, radius_km: float) -> None:
            """ Sets Radius in kilometers

            Raises ValueError if radius is not a number
            """
            if not Ras._is_real(radius_km):
                raise ValueError(f"Invalid circle radius: {radius_km}")
            self._radius_km = float(radius_km)

        def set_horizon_radius(self, ant_agl_m: float) -> None:
            """ Sets horizon radius by specifying central antenna height above
            ground in meters

            Radius is computed from 4/3 of Earth radius, given antenna height
            and maximum AP height. Raises ValueError if height is not a number
            """
            if not Ras._is_real(ant_agl_m):
                raise ValueError(f"Invalid antenna elevation: {ant_agl_m}")
            self._radius_km = math.sqrt(2 * Ras.EARTH_RADIUS_KM * 4 / 3) * \
                (math.sqrt(ant_agl_m / 1000) + math.sqrt(Ras.MAX_AP_AGL_KM))

        def tiles(self) -> "List[Tile]":
            """ Returns sequence of covering 1 degree tiles

            These are tiles of latitude/longitude rectangle around the circle
            """
            radian_radius = self._radius_km / Ras.EARTH_RADIUS_KM
            lat_radius_deg = math.degrees(radian_radius)
            # Degree of longitude shrinks with cosine of latitude
            lon_radius_deg = \
                math.degrees(radian_radius /
                             math.cos(math.radians(self._center.lat)))
            return \
                self.rect_tiles(
                    min_pt=Ras.Point(lat=self._center.lat - lat_radius_deg,
                                     lon=self._center.lon - lon_radius_deg),
                    max_pt=Ras.Point(lat=self._center.lat + lat_radius_deg,
                                     lon=self._center.lon + lon_radius_deg))

    def __init__(self, fs_row: "sa.engine.RowProxy",
                 hash_fields: "HashFields") -> None:
        """ Constructor

        Exits with error message if row has unknown RAS type, lacks required
        columns or contains values of improper types

        Arguments:
        fs_row      -- Database row to construct from
        hash_fields -- Retriever of identity fields
        """
        try:
            self.location = \
                f"{fs_row.region}: {fs_row.name} @ {fs_row.location}"
            # Raises ValueError (naming the value) on unknown type name
            ras_type = self.RasType(fs_row.exclusionZone)
            self._figure: "Ras.FigureBase"
            if ras_type in (self.RasType.Rect, self.RasType.TwoRect):
                rect = self.Rect(self.Point(lat=fs_row.rect1lat1,
                                            lon=fs_row.rect1lon1))
                rect.extend(self.Point(lat=fs_row.rect1lat2,
                                       lon=fs_row.rect1lon2))
                if ras_type == self.RasType.TwoRect:
                    # Two rectangles are represented by rectangle that
                    # contains corners of both
                    rect.extend(self.Point(lat=fs_row.rect2lat1,
                                           lon=fs_row.rect2lon1))
                    rect.extend(self.Point(lat=fs_row.rect2lat2,
                                           lon=fs_row.rect2lon2))
                self._figure = rect
            elif ras_type in (self.RasType.Circle, self.RasType.Horizon):
                circle = self.Circle(self.Point(lat=fs_row.centerLat,
                                                lon=fs_row.centerLon))
                if ras_type == self.RasType.Circle:
                    circle.set_radius(fs_row.radiusKm)
                else:
                    assert ras_type == self.RasType.Horizon
                    circle.set_horizon_radius(ant_agl_m=fs_row.heightAGL)
                self._figure = circle
            else:
                raise ValueError(f"Unsupported RAS type: {ras_type.name}")
        except (AttributeError, LookupError, ValueError,
                sa.exc.SQLAlchemyError) as ex:
            # AttributeError is what attribute-style access to a column,
            # missing in the row, ends up with
            error(
                f"RAS record '{fs_row}' has unsupported structure: {repr(ex)}")
        self._hash_fields = \
            tuple(v for _, v in hash_fields.get_hash_fields(fs_row))

    def tiles(self) -> "List[Tile]":
        """ Returns sequence of covering 1 degree tiles """
        return self._figure.tiles()

    def __eq__(self, other: Any) -> bool:
        """ Equality comparator (compares hash fields) """
        return isinstance(other, self.__class__) and \
            (self._hash_fields == other._hash_fields)

    def __hash__(self) -> int:
        """ Hash computer (hashes hash fields) """
        return hash(self._hash_fields)
|
|
|
|
|
|
class Db:
    """ Accessor to FS SQLite database (covers most of database access)

    Public attributes
    filename        -- SQLite database filename
    conn            -- Database connection
    fs_table        -- SqlAlchemy table object for FS
    pr_table        -- SqlAlchemy table object for Passive Repeater
    ras_table       -- SqlAlchemy table object for RAS, None if database has
                       no such table
    fs_hash_fields  -- Iterator over hash fields in FS record
    pr_hash_fields  -- Iterator over hash fields in Passive Repeater record
    ras_hash_fields -- Iterator over hash fields in RAS record, None if
                       database has no RAS table
    """

    def __init__(self, filename: str) -> None:
        """ Constructor

        Connects to database and reflects its schema. Exits with error
        message if file does not exist, can't be opened or lacks FS or
        Passive Repeater table

        Arguments:
        filename -- SQLite database file
        """
        error_if(not os.path.isfile(filename),
                 f"FS SQLite database '{filename}' not found")
        self.filename = filename
        try:
            self._engine = sa.create_engine(f"sqlite:///{self.filename}")
            self._metadata = sa.MetaData()
            self._metadata.reflect(bind=self._engine)
            self.conn = self._engine.connect()
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error opening FS database '{self.filename}': {ex}")
        tables = self._metadata.tables
        # FS and Passive Repeater tables are mandatory, RAS table is not
        for required_table in (FS_TABLE_NAME, PR_TABLE_NAME):
            error_if(required_table not in tables,
                     f"'{self.filename}' does not contain table "
                     f"'{required_table}'")
        self.fs_table = tables[FS_TABLE_NAME]
        self.pr_table = tables[PR_TABLE_NAME]
        self.ras_table = tables.get(RAS_TABLE_NAME)
        self.fs_hash_fields, self.pr_hash_fields = \
            FsPath.compute_hash_fields(fs_table=self.fs_table,
                                       pr_table=self.pr_table)
        if self.ras_table is None:
            self.ras_hash_fields = None
        else:
            self.ras_hash_fields = \
                HashFields(table=self.ras_table, exclude=RAS_NO_HASH_FIELDS)

    def close(self):
        """ Closes database connection """
        self.conn.close()

    def all_fs_rows(self) -> "Iterator[sa.engine.RowProxy]":
        """ Generator over all rows of FS Path table

        Rows are fetched all at once before the first one is yielded. Exits
        with error message on database error
        """
        try:
            yield from self.conn.execute(sa.select(self.fs_table)).fetchall()
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading '{self.fs_table.name}' table from FS "
                  f"database '{self.filename}': {ex}")

    def fs_row_by_fsid(self, fsid: int) -> "sa.engine.RowProxy":
        """ Returns FS Path table row that has given FSID

        Exits with error message if there is no such row or on database
        error
        """
        try:
            query = \
                sa.select(self.fs_table).\
                where(self.fs_table.c[FS_FSID_FIELD] == fsid)
            found_row = self.conn.execute(query).first()
            error_if(
                found_row is None,
                f"Record with FSID of {fsid} not found in table "
                f"'{self.fs_table.name}' of FS database '{self.filename}'")
            assert found_row is not None
            return found_row
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading '{self.fs_table.name}' table from FS "
                  f"database '{self.filename}': {ex}")
            raise   # Will never happen, just to pacify pylint

    def all_ras_rows(self) -> "Iterator[sa.engine.RowProxy]":
        """ Generator over all rows of RAS table

        Yields nothing if database has no RAS table. Rows are fetched all at
        once before the first one is yielded. Exits with error message on
        database error
        """
        if self.ras_table is None:
            return
        try:
            yield from self.conn.execute(sa.select(self.ras_table)).fetchall()
        except sa.exc.SQLAlchemyError as ex:
            error(f"Error reading '{self.ras_table.name}' table from FS "
                  f"database '{self.filename}': {ex}")
|
|
|
|
|
|
def main(argv: List[str]) -> None:
    """Do the job.

    Compares FS Paths and RAS entries of two databases, prints brief summary
    and, if requested, lists of different paths/RAS entries and of affected
    1x1 degree tiles. Prints help and exits with code 1 if no arguments are
    given

    Arguments:
    argv -- Program arguments
    """
    argument_parser = \
        argparse.ArgumentParser(
            description="Computes path difference between two FS (aka ULS) "
            "Databases")
    argument_parser.add_argument(
        "--report_tiles", action="store_true",
        help="Print 1x1 degree tiles affected by difference")
    argument_parser.add_argument(
        "--report_paths", action="store_true",
        help="Print paths that constitute difference")
    argument_parser.add_argument(
        "DB1",
        help="First FS (aka ULS) sqlite3 database file to compare")
    argument_parser.add_argument(
        "DB2",
        help="Second FS (aka ULS) sqlite3 database file to compare")

    if not argv:
        argument_parser.print_help()
        sys.exit(1)

    args = argument_parser.parse_args(argv)

    # Logging goes to console, messages are prefixed with script name
    log_format = \
        f"{os.path.basename(__file__)}. " \
        f"%(levelname)s: %(asctime)s %(message)s"
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format))
    root_logger = logging.getLogger()
    root_logger.addHandler(console_handler)
    root_logger.setLevel(logging.INFO)

    dbs: List[Db] = []
    try:
        for db_filename in (args.DB1, args.DB2):
            dbs.append(Db(db_filename))

        IdentHash = NamedTuple("IdentHash", [("ident", PathIdent),
                                             ("hash", int)])
        IdentHashToFsid = Dict[IdentHash, int]

        # Building per-database dictionaries that map (FS Ident, FS Hash)
        # pairs to FSIDs
        ident_hash_to_fsid_by_db: List[IdentHashToFsid] = []
        for db in dbs:
            fsid_by_ident_hash: IdentHashToFsid = {}
            ident_hash_to_fsid_by_db.append(fsid_by_ident_hash)
            try:
                for fs_row in db.all_fs_rows():
                    hashed_path = \
                        FsPath(
                            fs_row=fs_row, conn=db.conn, pr_table=db.pr_table,
                            fs_hash_fields=db.fs_hash_fields,
                            pr_hash_fields=db.pr_hash_fields,
                            compute_hash=True)
                    key = IdentHash(ident=hashed_path.ident,
                                    hash=hashed_path.path_hash())
                    fsid_by_ident_hash[key] = hashed_path.fsid
            except sa.exc.SQLAlchemyError as ex:
                error(f"Error reading FS database '{db.filename}': {ex}")

        # Building dictionary of FS Paths with differences: path idents
        # mapped to dictionaries that map DB indices (0 or 1) to FSIDs this
        # path has in correspondent databases
        ident_to_fsids_diff: Dict[PathIdent, Dict[int, int]] = {}
        # (FS Ident, FS Hash) pairs that are present in one database only
        unique_ident_hashes = \
            set(ident_hash_to_fsid_by_db[0]) ^ set(ident_hash_to_fsid_by_db[1])
        for unique_ident_hash in unique_ident_hashes:
            if unique_ident_hash in ident_hash_to_fsid_by_db[0]:
                db_idx = 0
            else:
                db_idx = 1
            fsids_of_ident = \
                ident_to_fsids_diff.setdefault(unique_ident_hash.ident, {})
            fsids_of_ident[db_idx] = \
                ident_hash_to_fsid_by_db[db_idx][unique_ident_hash]

        # Reading RAS tables from databases
        ras_sets: List[Set[Ras]] = []
        for db in dbs:
            db_ras_set: Set[Ras] = set()
            ras_sets.append(db_ras_set)
            for ras_row in db.all_ras_rows():
                assert db.ras_hash_fields is not None
                db_ras_set.add(Ras(fs_row=ras_row,
                                   hash_fields=db.ras_hash_fields))
        ras_difference = ras_sets[0] ^ ras_sets[1]

        # Brief report
        print(f"Paths in DB1: {len(ident_hash_to_fsid_by_db[0])}")
        print(f"Paths in DB2: {len(ident_hash_to_fsid_by_db[1])}")
        print(f"Different paths: {len(ident_to_fsids_diff)}")
        print(f"Different RAS entries: {len(ras_difference)}")

        # Detailed reports
        if args.report_paths or args.report_tiles:
            # RX tiles belonging to unique paths
            tiles_of_difference: Set[Tile] = set()
            # Loop by unique FS identifiers
            for path_ident in sorted(ident_to_fsids_diff):
                # Per-database FsPath objects (None for database that does
                # not have this version of path)
                paths: List[Optional[FsPath]] = []
                # Filling it (and adding tiles)
                for db_idx, db in enumerate(dbs):
                    fsid = ident_to_fsids_diff[path_ident].get(db_idx)
                    if fsid is None:
                        paths.append(None)
                        continue
                    try:
                        detailed_path = \
                            FsPath(
                                fs_row=db.fs_row_by_fsid(fsid), conn=db.conn,
                                pr_table=db.pr_table,
                                fs_hash_fields=db.fs_hash_fields,
                                pr_hash_fields=db.pr_hash_fields,
                                compute_fields=args.report_paths,
                                compute_tiles=args.report_tiles)
                    except sa.exc.SQLAlchemyError as ex:
                        error(
                            f"Error reading FS database '{db.filename}': {ex}")
                    if args.report_tiles:
                        tiles_of_difference |= detailed_path.tiles()
                    paths.append(detailed_path)
                if not args.report_paths:
                    continue
                path_in_db1, path_in_db2 = paths
                if path_in_db1 is None:
                    assert path_in_db2 is not None
                    diff_report = "Only present in DB2"
                elif path_in_db2 is None:
                    diff_report = "Only present in DB1"
                else:
                    diff_report = path_in_db1.diff_report(path_in_db2)
                print(f"Difference in path {path_ident}: {diff_report}")
            if args.report_paths:
                for db_idx, ras_set in enumerate(ras_sets):
                    other_db_idx = 1 - db_idx
                    for ras in (ras_set - ras_sets[other_db_idx]):
                        print(f"RAS '{ras.location}' from DB{db_idx + 1} not "
                              f"present in or different from "
                              f"DB{other_db_idx + 1}")
            if args.report_tiles:
                for ras in ras_difference:
                    tiles_of_difference |= set(ras.tiles())
                for tile in sorted(tiles_of_difference):
                    print(f"Difference in tile {tile}")
    except KeyboardInterrupt:
        pass
    finally:
        # Closing whatever databases were opened
        for db in dbs:
            if db:
                db.close()
|
|
|
|
|
|
# Script entry point: runs main() with command line arguments (program name
# excluded) when file is executed rather than imported
if __name__ == "__main__":
    main(sys.argv[1:])
|