#!/usr/bin/env python3
""" ULS data download control service """

# Copyright (C) 2022 Broadcom. All rights reserved.
# The term "Broadcom" refers solely to the Broadcom Inc. corporate affiliate
# that owns the software below.
# This work is licensed under the OpenAFC Project License, a copy of which is
# included with this software program.

# pylint: disable=unused-wildcard-import, wrong-import-order, wildcard-import
# pylint: disable=too-many-statements, too-many-branches, unnecessary-pass
# pylint: disable=logging-fstring-interpolation, invalid-name, too-many-locals
# pylint: disable=too-few-public-methods, too-many-arguments
# pylint: disable=too-many-nested-blocks, too-many-lines
# pylint: disable=too-many-instance-attributes

import argparse
import datetime
import glob
import logging
import os
import prometheus_client
import pydantic
import re
import shlex
import shutil
import signal
import sqlalchemy as sa
import statsd
import subprocess
import sys
import tempfile
import threading
import time
from typing import cast, Dict, Iterable, List, NamedTuple, Optional, Tuple, \
    Union
import urllib.error
import urllib.request

from rcache_client import RcacheClient
from rcache_models import LatLonRect, RcacheClientSettings
from uls_service_common import *
from uls_service_state_db import CheckType, DownloaderMilestone, LogType, \
    safe_dsn, StateDb

# Filemask for ULS databases
ULS_FILEMASK = "*.sqlite3"

# ULS database identity table
DATA_IDS_TABLE = "data_ids"

# Identity table region column
DATA_IDS_REG_COLUMN = "region"

# Identity table region ID column
DATA_IDS_ID_COLUMN = "identity"

# Name of FSID tool script
FSID_TOOL = os.path.join(os.path.dirname(__file__), "fsid_tool.py")

# Name of FS DB Diff script
FS_DB_DIFF = os.path.join(os.path.dirname(__file__), "fs_db_diff.py")

# Name of FS AFC test script
FS_AFC = os.path.join(os.path.dirname(__file__), "fs_afc.py")

# Healthcheck script
HEALTHCHECK_SCRIPT = os.path.join(os.path.dirname(__file__),
                                  "uls_service_healthcheck.py")

# Default StatsD port
DEFAULT_STATSD_PORT = 8125


class Settings(pydantic.BaseSettings):
    """ Arguments from command lines - with their default values """
    download_script: str = \
        pydantic.Field(
            "/mnt/nfs/rat_transfer/daily_uls_parse/daily_uls_parse.py",
            env="ULS_DOWNLOAD_SCRIPT",
            description="FS download script")
    download_script_args: Optional[str] = \
        pydantic.Field(None, env="ULS_DOWNLOAD_SCRIPT_ARGS",
                       description="Additional download script parameters")
    region: Optional[str] = \
        pydantic.Field(None, env="ULS_DOWNLOAD_REGION",
                       description="Download regions", no="All")
    result_dir: str = \
        pydantic.Field(
            "/mnt/nfs/rat_transfer/ULS_Database/", env="ULS_RESULT_DIR",
            description="Directory where download script puts downloaded file")
    temp_dir: str = \
        pydantic.Field(
            "/mnt/nfs/rat_transfer/daily_uls_parse/temp/", env="ULS_TEMP_DIR",
            description="Temporary directory of ULS download script, cleaned "
            "before downloading")
    ext_db_dir: str = \
        pydantic.Field(
            ..., env="ULS_EXT_DB_DIR",
            description="Ultimate downloaded file destination directory")
    ext_db_symlink: str = \
        pydantic.Field(..., env="ULS_CURRENT_DB_SYMLINK",
                       description="Symlink pointing to current ULS file")
    fsid_file: str = \
        pydantic.Field(
            "/mnt/nfs/rat_transfer/daily_uls_parse/data_files/fsid_table.csv",
            env="ULS_FSID_FILE",
            description="FSID file location expected by ULS download script")
    ext_ras_database: str = \
        pydantic.Field(..., env="ULS_EXT_RAS_DATABASE",
                       description="RAS database")
    ras_database: str = \
        pydantic.Field(
            "/mnt/nfs/rat_transfer/daily_uls_parse/data_files/RASdatabase.dat",
            env="ULS_RAS_DATABASE",
            description="Where the ULS script reads the RAS database from")
    service_state_db_dsn: str = \
        pydantic.Field(
            ..., env="ULS_SERVICE_STATE_DB_DSN",
            description="Connection string to service state database",
            convert=safe_dsn)
    service_state_db_create_if_absent: bool = \
        pydantic.Field(
            True, env="ULS_SERVICE_STATE_DB_CREATE_IF_ABSENT",
            description="Create service state database if it is absent")
    service_state_db_recreate: bool = \
        pydantic.Field(
            False, env="ULS_SERVICE_STATE_DB_RECREATE",
            description="Recreate service state database if it exists")
    prometheus_port: Optional[int] = \
        pydantic.Field(None, env="ULS_PROMETHEUS_PORT",
                       description="Port to serve Prometheus metrics on")
    statsd_server: Optional[str] = \
        pydantic.Field(None, env="ULS_STATSD_SERVER",
                       description="StatsD server to send metrics to")
    check_ext_files: Optional[List[str]] = \
        pydantic.Field(
            "https://raw.githubusercontent.com/Wireless-Innovation-Forum/"
            "6-GHz-AFC/main/data/common_data"
            ":raw_wireless_innovation_forum_files"
            ":antenna_model_diameter_gain.csv,billboard_reflector.csv,"
            "category_b1_antennas.csv,high_performance_antennas.csv,"
            "fcc_fixed_service_channelization.csv,"
            "transmit_radio_unit_architecture.csv",
            env="ULS_CHECK_EXT_FILES",
            description="Verify that files are the same as on the Internet",
            no="None")
    max_change_percent: Optional[float] = \
        pydantic.Field(
            10., env="ULS_MAX_CHANGE_PERCENT",
            description="Limit on percentage of paths changed",
            convert=lambda v: f"{v}%" if v else "Don't check")
    afc_url: Optional[str] = \
        pydantic.Field(
            None, env="ULS_AFC_URL",
            description="AFC Service URL to use for database validity check",
            no="Don't check")
    afc_parallel: Optional[int] = \
        pydantic.Field(
            None, env="ULS_AFC_PARALLEL",
            description="Number of parallel AFC Requests to use when doing "
            "validity check", no="fs_afc.py's default")
    rcache_url: Optional[str] = \
        pydantic.Field(None, env="RCACHE_SERVICE_URL",
                       description="Rcache service url",
                       no="Don't do spatial invalidation")
    rcache_enabled: bool = \
        pydantic.Field(True, env="RCACHE_ENABLED",
                       description="Rcache spatial invalidation",
                       yes="Enabled", no="Disabled")
    delay_hr: float = \
        pydantic.Field(0., env="ULS_DELAY_HR",
                       description="Hours to delay first download by")
    interval_hr: float = \
        pydantic.Field(4, env="ULS_INTERVAL_HR",
                       description="Download interval in hours")
    timeout_hr: float = \
        pydantic.Field(1, env="ULS_TIMEOUT_HR",
                       description="Download maximum duration in hours")
    nice: bool = \
        pydantic.Field(False, env="ULS_NICE",
                       description="Run in lowered (nice) priority")
    verbose: bool = \
        pydantic.Field(False, description="Print debug info")
    run_once: bool = \
        pydantic.Field(False, env="ULS_RUN_ONCE",
                       description="Run", yes="Once", no="Indefinitely")
    force: bool = \
        pydantic.Field(False,
                       description="Force FS database update (even if not "
                       "changed or found invalid)")

    @pydantic.validator("check_ext_files", pre=True)
    @classmethod
    def check_ext_files_str_to_list(cls, v: Any) -> Any:
        """ Converts string value of 'check_ext_files' taken from environment
        to a list (as it is a list in argparse) """
        return [v] if v and isinstance(v, str) else v

    @pydantic.validator("statsd_server", pre=False)
    @classmethod
    def check_statsd_server(cls, v: Any) -> Any:
        """ Applies default StatsD port """
        if v:
            if ":" not in v:
                v = f"{v}:{DEFAULT_STATSD_PORT}"
            else:
                host, port = v.split(":", 1)
                int(port)
                assert host
        return v

    @pydantic.root_validator(pre=True)
    @classmethod
    def remove_empty(cls, v: Any) -> Any:
        """ Prevalidator that removes empty values (presumably from environment
        variables) to force use of defaults
        """
        if not isinstance(v, dict):
            return v
        for key, value in list(v.items()):
            if value in (None, "", []):
                del v[key]
        return v


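# Illustrative sketch (not executed here) of how a Settings value is resolved,
# assuming the merge_args() helper from uls_service_common gives command-line
# arguments precedence over environment variables, which in turn override the
# field defaults declared above (parsed_args is the argparse.Namespace built
# in main() below):
#
#   os.environ["ULS_INTERVAL_HR"] = "6"
#   settings = merge_args(settings_class=Settings, args=parsed_args)
#   assert settings.interval_hr == 6.0   # Environment wins over default of 4
#
# The remove_empty() prevalidator is what makes empty environment variables
# ("") fall back to the declared defaults instead of overriding them.
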
class ProcessingException(Exception):
    """ ULS processing exception """
    pass


def print_args(settings: Settings) -> None:
    """ Print invocation parameters to log """
    logging.info("FS downloader started with the following parameters")
    for name, model_field in settings.__fields__.items():
        value = getattr(settings, name)
        extra = getattr(model_field.field_info, "extra", {})
        value_repr: str
        if "convert" in extra:
            value_repr = extra["convert"](value)
        elif value and ("yes" in extra):
            value_repr = extra["yes"]
        elif (not value) and ("no" in extra):
            value_repr = extra["no"]
        elif model_field.type_ == bool:
            value_repr = "Yes" if value else "No"
        else:
            value_repr = str(value)
        logging.info(f"{model_field.field_info.description}: {value_repr}")


class LoggingExecutor:
    """ Program executor that collects output

    Private attributes:
    _lines -- Output lines
    """

    def __init__(self) -> None:
        self._lines: List[str] = []

    def execute(self, cmd: Union[str, List[str]], fail_on_error: bool,
                return_output: bool = False, cwd: Optional[str] = None,
                timeout_sec: Optional[float] = None) -> \
            Union[bool, Optional[str]]:
        """ Execute command

        Arguments:
        cmd           -- Command as string or list of strings
        fail_on_error -- True to raise exception on error, False to return
                         failure code on error
        return_output -- True to return output (None on failure), False to
                         return boolean success status
        cwd           -- Directory to execute in or None
        timeout_sec   -- Timeout in seconds or None
        Returns If 'return_output' - returns output/None on success/failure,
        otherwise returns boolean success status
        """
        ret_lines: Optional[List[str]] = [] if return_output else None
        success = True
        timed_out = False

        def killer_timer(pgid: int) -> None:
            """ Kills given process by process group id (os.kill() is
            reportedly only adequate if shell=False)
            """
            # Propagate the timeout indication to the enclosing execute() call
            nonlocal timed_out
            try:
                os.killpg(pgid, signal.SIGTERM)
                timed_out = True
            except OSError:
                pass

        self._lines.append(
            "> " +
            (cmd if isinstance(cmd, str)
             else " ".join(shlex.quote(arg) for arg in cmd)) +
            "\n")
        try:
            with subprocess.Popen(cmd, shell=isinstance(cmd, str), text=True,
                                  encoding="utf-8", stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT, bufsize=0,
                                  cwd=cwd) as p:
                timer: Optional[threading.Timer] = \
                    threading.Timer(timeout_sec, killer_timer,
                                    kwargs={"pgid": os.getpgid(p.pid)}) \
                    if timeout_sec is not None else None
                if timer is not None:
                    timer.start()
                assert p.stdout is not None
                for line in p.stdout:
                    print(line, end="", flush=True)
                    self._lines.append(line)
                    if ret_lines is not None:
                        ret_lines.append(line)
                p.wait()
                if timer is not None:
                    timer.cancel()
                if timed_out:
                    raise subprocess.TimeoutExpired(cmd,
                                                    cast(float, timeout_sec))
                if p.returncode:
                    raise subprocess.CalledProcessError(p.returncode, p.args)
        except (OSError, subprocess.SubprocessError) as ex:
            success = False
            self._lines.append(f"{ex}\n")
            if fail_on_error:
                raise
        if return_output:
            assert ret_lines is not None
            return "".join(ret_lines) if success else None
        return success

    def get_output(self) -> str:
        """ Returns and resets accumulated output """
        ret = "".join(self._lines)
        self._lines = []
        return ret


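# Illustrative usage sketch for LoggingExecutor (hypothetical, not part of the
# service flow):
#
#   executor = LoggingExecutor()
#   # Boolean success status; string command is run through the shell
#   ok = executor.execute("ls -l /tmp", fail_on_error=False)
#   # Captured stdout (None on failure); list command bypasses the shell
#   listing = executor.execute(["ls", "-l", "/tmp"], fail_on_error=False,
#                              return_output=True, timeout_sec=10)
#   log_text = executor.get_output()  # Everything echoed so far, then reset
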
class StatusUpdater:
    """ Updater of State DB and Prometheus/StatsD metrics

    Private attributes:
    _state_db                  -- StateDb object to update
    _prometheus_metrics        -- By-milestone dictionary of region-inspecific
                                  Prometheus metrics. Metrics are gauges,
                                  containing seconds since epoch of recent
                                  milestone occurrence. Empty if Prometheus
                                  metrics are not served
    _prometheus_region_metrics -- By-milestone dictionary of region-specific
                                  Prometheus metrics. Metrics are gauges,
                                  containing seconds since epoch of recent
                                  milestone occurrence. Each metric has
                                  'region' label. Empty if Prometheus metrics
                                  are not served
    _prometheus_check_metric   -- None or Prometheus gauge, with 'check_type'
                                  label (containing name of CheckType item),
                                  containing 1 if check passed, 0 if not
    _statsd_connection         -- statsd.Connection object or None
    _statsd_metrics            -- By-milestone dictionary of region-inspecific
                                  StatsD metrics. Metrics are gauges,
                                  containing seconds since epoch of recent
                                  milestone occurrence. Empty if StatsD
                                  metrics are not served
    _statsd_region_metrics     -- By-milestone dictionary of region-specific
                                  StatsD metric descriptors. Empty if StatsD
                                  metrics are not served
    _statsd_check_metrics      -- None or check result StatsD gauge descriptor
    """
    # StatsD metric descriptor (working around the absence of labels in StatsD
    # by putting them into name)
    _StatsdLabeledMetricInfo = \
        NamedTuple(
            "_StatsdLabeledMetricInfo",
            [
                # String.format()-compatible pattern, containing placeholder
                # for label value
                ("pattern", str),
                # Dictionary of StatsD metrics, indexed by label value name
                ("metrics", Dict[str, statsd.Gauge])])

    def __init__(self, state_db: StateDb, prometheus_port: Optional[int],
                 statsd_server: Optional[str]) -> None:
        """ Constructor

        Arguments:
        state_db        -- StateDb object
        prometheus_port -- Port to serve Prometheus metrics on or None
        statsd_server   -- Address of StatsD server to send metrics to or None
        """
        self._state_db = state_db
        self._prometheus_metrics: Dict[DownloaderMilestone, Any] = {}
        self._prometheus_region_metrics: Dict[DownloaderMilestone, Any] = {}
        self._prometheus_check_metric: Any = None
        self._statsd_connection: Optional[statsd.Connection] = None
        self._statsd_metrics: Dict[DownloaderMilestone, Any] = {}
        self._statsd_region_metrics: \
            Dict[DownloaderMilestone,
                 "StatusUpdater._StatsdLabeledMetricInfo"] = {}
        self._statsd_check_metrics: \
            Optional["StatusUpdater._StatsdLabeledMetricInfo"] = None
        if prometheus_port is not None:
            prometheus_client.start_http_server(prometheus_port)
            self._prometheus_metrics[DownloaderMilestone.DownloadStart] = \
                prometheus_client.Gauge(
                    "fs_download_started",
                    "Seconds since epoch of last downloader script run")
            self._prometheus_metrics[DownloaderMilestone.DownloadSuccess] = \
                prometheus_client.Gauge(
                    "fs_download_succeeded",
                    "Seconds since epoch of last downloader script success")
            self._prometheus_metrics[DownloaderMilestone.DbUpdated] = \
                prometheus_client.Gauge(
                    "fs_download_database_updated",
                    "Seconds since epoch of last FS database file update")
            self._prometheus_region_metrics[
                DownloaderMilestone.RegionChanged] = \
                prometheus_client.Gauge(
                    "fs_download_region_changed",
                    "Seconds since epoch of last region changed", ["region"])
            self._prometheus_check_metric = \
                prometheus_client.Gauge(
                    "fs_download_check_passed",
                    "Recent check state (1 - success, 0 - failure)",
                    ["check_type"])
        if statsd_server:
            host, port = statsd_server.split(":", 1)
            self._statsd_connection = \
                statsd.Connection(host=host, port=int(port))
            self._statsd_metrics[DownloaderMilestone.DownloadStart] = \
                statsd.Gauge("fs_download_started")
            self._statsd_metrics[DownloaderMilestone.DownloadSuccess] = \
                statsd.Gauge("fs_download_succeeded")
            self._statsd_metrics[DownloaderMilestone.DbUpdated] = \
                statsd.Gauge("fs_download_database_updated")
            self._statsd_region_metrics[DownloaderMilestone.RegionChanged] = \
                self._StatsdLabeledMetricInfo(
                    pattern="fs_download_region_changed_{region}", metrics={})
            self._statsd_check_metrics = \
                self._StatsdLabeledMetricInfo(
                    pattern="fs_download_check_passed_{check_type}",
                    metrics={})

    def milestone(self, milestone: DownloaderMilestone,
                  updated_regions: Optional[Iterable[str]] = None,
                  all_regions: Optional[List[str]] = None) -> None:
        """ Update milestone

        Arguments:
        milestone       -- Milestone to write
        updated_regions -- List of region strings of updated regions, None for
                           region-inspecific milestones
        all_regions     -- List of all region strings, None for
                           region-inspecific milestones
        """
        self._state_db.write_milestone(
            milestone=milestone, updated_regions=updated_regions,
            all_regions=all_regions)
        seconds_since_epoch = int(time.time())
        if milestone in self._prometheus_metrics:
            self._prometheus_metrics[milestone].set(seconds_since_epoch)
        if (milestone in self._prometheus_region_metrics) and updated_regions:
            for region in updated_regions:
                self._prometheus_region_metrics[milestone].\
                    labels(region=region).set(seconds_since_epoch)
        if milestone in self._statsd_metrics:
            self._statsd_metrics[milestone].send(seconds_since_epoch)
        if (milestone in self._statsd_region_metrics) and updated_regions:
            mi = self._statsd_region_metrics[milestone]
            for region in updated_regions:
                metric: Optional[statsd.Gauge] = mi.metrics.get(region)
                if not metric:
                    metric = statsd.Gauge(mi.pattern.format(region=region))
                    mi.metrics[region] = metric
                metric.send(seconds_since_epoch)

    def status_check(self, check_type: CheckType,
                     results: Optional[Dict[str, Optional[str]]] = None) \
            -> None:
        """ Update status check results

        Arguments:
        check_type -- Type of check
        results    -- Itemized results: dictionary containing error messages
                      for failed checks, None for succeeded ones
        """
        self._state_db.write_check_results(check_type=check_type,
                                           results=results)
        success = all(result is None for result in (results or {}).values())
        if self._prometheus_check_metric:
            self._prometheus_check_metric.labels(check_type=check_type.name).\
                set(1 if success else 0)
        if self._statsd_check_metrics:
            metric: Optional[statsd.Gauge] = \
                self._statsd_check_metrics.metrics.get(check_type.name)
            if not metric:
                metric = \
                    statsd.Gauge(
                        self._statsd_check_metrics.pattern.format(
                            check_type=check_type.name))
                self._statsd_check_metrics.metrics[check_type.name] = metric
            metric.send(1 if success else 0)


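# Illustrative sketch of the metric names produced by StatusUpdater (values
# are seconds since the epoch, or 0/1 for check results). Prometheus metrics
# carry labels, while for StatsD the label value is baked into the name; the
# "US" region below is just an example:
#
#   Prometheus: fs_download_started, fs_download_succeeded,
#               fs_download_database_updated,
#               fs_download_region_changed{region="US"},
#               fs_download_check_passed{check_type="FsDatabase"}
#   StatsD:     fs_download_region_changed_US,
#               fs_download_check_passed_FsDatabase
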
def extract_fsid_table(uls_file: str, fsid_file: str,
                       executor: LoggingExecutor) -> None:
    """ Try to extract FSID table from ULS database

    Arguments:
    uls_file  -- FS database filename
    fsid_file -- FSID table filename
    executor  -- LoggingExecutor object
    """
    # Clean previous FSID files...
    fsid_name_parts = os.path.splitext(fsid_file)
    logging.info("Extracting FSID table")
    if not executor.execute(f"rm -f {fsid_name_parts[0]}*{fsid_name_parts[1]}",
                            fail_on_error=False):
        logging.warning(f"Unexpectedly failed to remove "
                        f"{fsid_name_parts[0]}*{fsid_name_parts[1]}. "
                        f"Proceeding nevertheless")
    # ... and extracting latest one from previous FS database
    if os.path.isfile(uls_file):
        if executor.execute([FSID_TOOL, "check", uls_file],
                            fail_on_error=False):
            executor.execute([FSID_TOOL, "extract", uls_file, fsid_file],
                             fail_on_error=True)


def get_uls_identity(uls_file: str) -> Optional[Dict[str, str]]:
    """ Read regions' identity from ULS database

    Arguments:
    uls_file -- ULS database
    Returns dictionary of region identities indexed by region name
    """
    if not os.path.isfile(uls_file):
        return None
    engine = sa.create_engine("sqlite:///" + uls_file)
    conn = engine.connect()
    try:
        metadata = sa.MetaData()
        metadata.reflect(bind=engine)
        id_table = metadata.tables.get(DATA_IDS_TABLE)
        if id_table is None:
            return None
        if not all(col in id_table.c for col in (DATA_IDS_REG_COLUMN,
                                                 DATA_IDS_ID_COLUMN)):
            return None
        ret: Dict[str, str] = {}
        for row in conn.execute(sa.select(id_table)).fetchall():
            ret[row[DATA_IDS_REG_COLUMN]] = row[DATA_IDS_ID_COLUMN]
        return ret
    finally:
        conn.close()


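# Illustrative sketch (not part of the service) of the identity table that
# get_uls_identity() expects inside each downloaded SQLite database; the
# sample region names and identity values are assumptions:
#
#   CREATE TABLE data_ids (region TEXT, identity TEXT);
#   INSERT INTO data_ids VALUES ('US', '2023-09-18');
#   INSERT INTO data_ids VALUES ('CA', '2023-09-17');
#
# get_uls_identity() returns these rows as {'US': '2023-09-18', ...}; a region
# is later considered updated when its identity differs from the one stored in
# the previously published database.
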
def update_uls_file(uls_dir: str, uls_file: str, symlink: str,
                    executor: LoggingExecutor) -> None:
    """ Atomically retargets symlink to new ULS file

    Arguments:
    uls_dir  -- Directory containing ULS files and symlink
    uls_file -- Base name of new ULS file (already in ULS directory)
    symlink  -- Base name of symlink pointing to current ULS file
    executor -- LoggingExecutor object
    """
    # Getting random name for temporary symlink
    fd, temp_symlink_name = tempfile.mkstemp(dir=uls_dir, suffix="_" + symlink)
    os.close(fd)
    os.unlink(temp_symlink_name)
    # Name race condition is astronomically improbable, as the name is random
    # and there is just one downloader (except in development environments)

    assert uls_file == os.path.basename(uls_file)
    assert os.path.isfile(os.path.join(uls_dir, uls_file))
    os.symlink(uls_file, temp_symlink_name)

    executor.execute(
        ["mv", "-fT", temp_symlink_name, os.path.join(uls_dir, symlink)],
        fail_on_error=True)
    logging.info(f"FS database symlink '{os.path.join(uls_dir, symlink)}' "
                 f"now points to '{uls_file}'")


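# A minimal pure-Python sketch of the same atomic retarget performed by
# update_uls_file() above (assuming POSIX rename semantics), equivalent to the
# 'mv -fT' invocation:
#
#   def retarget_symlink(uls_dir: str, uls_file: str, symlink: str) -> None:
#       tmp = os.path.join(uls_dir, symlink + ".tmp")
#       os.symlink(uls_file, tmp)                        # New symlink
#       os.replace(tmp, os.path.join(uls_dir, symlink))  # Atomic swap
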
class DbDiff:
    """ Computes and holds difference between two FS (aka ULS) databases

    Public attributes:
    valid        -- True if difference is valid
    prev_len     -- Number of paths in previous database
    new_len      -- Number of paths in new database
    diff_len     -- Number of different paths
    ras_diff_len -- Number of different RAS entries
    diff_tiles   -- Tiles containing receivers of different paths
    """

    def __init__(self, prev_filename: str, new_filename: str,
                 executor: LoggingExecutor) -> None:
        """ Constructor

        Arguments:
        prev_filename -- Previous file name
        new_filename  -- New filename
        executor      -- LoggingExecutor object
        """
        self.valid = False
        self.prev_len = 0
        self.new_len = 0
        self.diff_len = 0
        self.ras_diff_len = 0
        self.diff_tiles: List[LatLonRect] = []
        logging.info("Getting differences with previous database")
        output = \
            executor.execute(
                [FS_DB_DIFF, "--report_tiles", prev_filename, new_filename],
                timeout_sec=10 * 60, return_output=True, fail_on_error=False)
        if output is None:
            logging.error("Database comparison failed")
            return
        m = re.search(r"Paths in DB1:\s+(?P<db1>\d+)(.|\n)+"
                      r"Paths in DB2:\s+(?P<db2>\d+)(.|\n)+"
                      r"Different paths:\s+(?P<diff>\d+)(.|\n)+"
                      r"Different RAS entries:\s+(?P<ras_diff>\d+)(.|\n)+",
                      cast(str, output))
        if m is None:
            logging.error(
                f"Output of '{FS_DB_DIFF}' has unrecognized structure")
            return
        self.prev_len = int(cast(str, m.group("db1")))
        self.new_len = int(cast(str, m.group("db2")))
        self.diff_len = int(cast(str, m.group("diff")))
        self.ras_diff_len = int(cast(str, m.group("ras_diff")))
        for m in re.finditer(
                r"Difference in tile "
                r"\[(?P<min_lat>[0-9.]+)-(?P<max_lat>[0-9.]+)\]"
                r"(?P<lat_sign>[NS]), "
                r"\[(?P<min_lon>[0-9.]+)-(?P<max_lon>[0-9.]+)\]"
                r"(?P<lon_sign>[EW])", cast(str, output)):
            lat_sign = 1 if m.group("lat_sign") == "N" else -1
            lon_sign = 1 if m.group("lon_sign") == "E" else -1
            self.diff_tiles.append(
                LatLonRect(
                    min_lat=float(m.group("min_lat")) * lat_sign,
                    max_lat=float(m.group("max_lat")) * lat_sign,
                    min_lon=float(m.group("min_lon")) * lon_sign,
                    max_lon=float(m.group("max_lon")) * lon_sign))
        logging.info(
            f"Database comparison succeeded: "
            f"{os.path.basename(prev_filename)} has {self.prev_len} paths, "
            f"{os.path.basename(new_filename)} has {self.new_len} paths, "
            f"difference is in {self.diff_len} paths, "
            f"{self.ras_diff_len} RAS entries, "
            f"{len(self.diff_tiles)} tiles")
        self.valid = True


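# The regular expressions in DbDiff.__init__() expect fs_db_diff.py to print a
# report of roughly this shape (numbers and tiles below are illustrative):
#
#   Paths in DB1:          123456
#   Paths in DB2:          123789
#   Different paths:       1500
#   Different RAS entries: 2
#   Difference in tile [35.0-36.0]N, [118.0-119.0]W
#   Difference in tile [45.0-46.0]N, [75.0-76.0]W
#
# Tile coordinates are unsigned in the report; the N/S and E/W suffixes are
# converted to signs when the LatLonRect list is built.
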
class UlsFileChecker:
    """ Checker of ULS database validity

    Private attributes:
    _max_change_percent -- Optional percent of maximum difference
    _afc_url            -- Optional AFC Service URL to test ULS database on
    _afc_parallel       -- Optional number of parallel AFC requests to make
                           during database verification
    _regions            -- List of regions to test database on. Empty if on
                           all regions
    _executor           -- LoggingExecutor object
    _status_updater     -- StatusUpdater object
    """

    def __init__(self, executor: LoggingExecutor,
                 status_updater: StatusUpdater,
                 max_change_percent: Optional[float] = None,
                 afc_url: Optional[str] = None,
                 afc_parallel: Optional[int] = None,
                 regions: Optional[List[str]] = None) -> None:
        """ Constructor

        Arguments:
        executor           -- LoggingExecutor object
        status_updater     -- StatusUpdater object
        max_change_percent -- Optional percent of maximum difference
        afc_url            -- Optional AFC Service URL to test ULS database on
        afc_parallel       -- Optional number of parallel AFC requests to make
                              during database verification
        regions            -- Optional list of regions to test database on. If
                              empty or None - test on all regions
        """
        self._executor = executor
        self._status_updater = status_updater
        self._max_change_percent = max_change_percent
        self._afc_url = afc_url
        self._afc_parallel = afc_parallel
        self._regions: List[str] = regions or []

    def valid(self, base_dir: str, new_filename: str,
              db_diff: Optional[DbDiff]) -> bool:
        """ Checks validity of given database

        Arguments:
        base_dir     -- Directory containing database being checked. This
                        argument is currently unused
        new_filename -- Database being checked. Should have exactly the same
                        path as required in AFC Config
        db_diff      -- None or difference from previous database
        Returns True if check passed
        """
        check_results: Dict[str, Optional[str]] = {}
        for item, (tested, errmsg) in \
                [("difference from previous", self._check_diff(db_diff)),
                 ("usable by AFC Service", self._check_afc(new_filename))]:
            if not tested:
                continue
            if errmsg is not None:
                logging.error(errmsg)
            check_results[item] = errmsg
        self._status_updater.status_check(CheckType.FsDatabase, check_results)
        return all(errmsg is None for errmsg in check_results.values())

    def _check_diff(self, db_diff: Optional[DbDiff]) \
            -> Tuple[bool, Optional[str]]:
        """ Checks amount of difference since previous database.
        Returns (check_performed, error_message) tuple """
        if db_diff is None:
            return (False, None)
        if not db_diff.valid:
            return (True, "Database difference can't be obtained")
        if ((db_diff.diff_len == 0) and (db_diff.ras_diff_len == 0)) != \
                (len(db_diff.diff_tiles) == 0):
            return \
                (True,
                 f"Inconsistent indication of database difference: difference "
                 f"is in {db_diff.diff_len} paths and in "
                 f"{db_diff.ras_diff_len} RAS entries, but in "
                 f"{len(db_diff.diff_tiles)} tiles")
        if self._max_change_percent is None:
            return (False, None)
        diff_percent = \
            round(
                100 if db_diff.new_len == 0 else
                ((db_diff.diff_len * 100) / db_diff.new_len),
                3)
        if diff_percent > self._max_change_percent:
            return \
                (True,
                 f"Database changed by {diff_percent}%, which exceeds the "
                 f"limit of {self._max_change_percent}%")
        return (True, None)

    def _check_afc(self, new_filename: str) -> Tuple[bool, Optional[str]]:
        """ Checks new database against AFC Service

        Arguments:
        new_filename -- Database being checked. Should have exactly the same
                        path as required in AFC Config
        Returns (check_performed, error_message) tuple
        """
        if self._afc_url is None:
            return (False, None)
        logging.info("Testing new FS database on AFC Service")
        args = [FS_AFC, "--server_url", self._afc_url] + \
            (["--parallel", str(self._afc_parallel)]
             if self._afc_parallel is not None else []) + \
            [f"--region={r}" for r in self._regions] + [new_filename]
        logging.debug(" ".join(shlex.quote(arg) for arg in args))
        if not self._executor.execute(args, fail_on_error=False,
                                      timeout_sec=30 * 60):
            return (True, "AFC Service test failed")
        return (True, None)


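# Worked example of the UlsFileChecker._check_diff() threshold (numbers are
# illustrative): with new_len=120000 paths and diff_len=1500 changed paths,
# diff_percent = round(1500 * 100 / 120000, 3) = 1.25, which passes the
# default 10% limit set by ULS_MAX_CHANGE_PERCENT.
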
class ExtParamFilesChecker:
    """ Checks that external parameter files match those in the image

    Private attributes:
    _epf_list       -- List of external parameter file set descriptors
                       (_ExtParamFiles objects) or None
    _script_dir     -- Downloader script directory or None
    _status_updater -- StatusUpdater object
    """

    # Descriptor of external parameter files
    _ExtParamFiles = \
        NamedTuple("_ExtParamFiles",
                   [
                       # Location in the internet
                       ("base_url", str),
                       # Downloader script subdirectory
                       ("subdir", str), ("files", List[str])])

    def __init__(self, status_updater: StatusUpdater,
                 ext_files_arg: Optional[List[str]] = None,
                 script_dir: Optional[str] = None) -> None:
        """ Constructor

        Arguments:
        status_updater -- StatusUpdater object
        ext_files_arg  -- List of 'BASE_URL:SUBDIR:FILES,FILE...' groups,
                          separated with semicolon: external parameter file
                          descriptors from command line
        script_dir     -- Downloader script directory
        """
        self._epf_list: \
            Optional[List["ExtParamFilesChecker._ExtParamFiles"]] = None
        if ext_files_arg is not None:
            self._epf_list = []
            for efg in ext_files_arg:
                for ef in efg.split(";"):
                    m = re.match(r"^(.*):(.+):(.+)$", ef)
                    error_if(m is None,
                             f"Invalid format of --check_ext_files parameter: "
                             f"'{ef}'")
                    assert m is not None
                    self._epf_list.append(
                        self._ExtParamFiles(base_url=m.group(1).rstrip("/"),
                                            subdir=m.group(2),
                                            files=m.group(3).split(",")))
        self._script_dir = script_dir
        self._status_updater = status_updater

    def check(self) -> None:
        """ Check that external files match those in the image """
        assert (self._epf_list is not None) and (self._script_dir is not None)
        temp_dir: Optional[str] = None
        check_results: Dict[str, Optional[str]] = {}
        try:
            temp_dir = tempfile.mkdtemp()
            for epf in self._epf_list:
                for filename in epf.files:
                    errmsg: Optional[str] = None
                    try:
                        internal_file_name = \
                            os.path.join(self._script_dir, epf.subdir,
                                         filename)
                        if not os.path.isfile(internal_file_name):
                            errmsg = "Absent in container"
                            continue
                        try:
                            url = f"{epf.base_url}/{filename}"
                            external_file_name = os.path.join(temp_dir,
                                                              filename)
                            urllib.request.urlretrieve(url, external_file_name)
                        except urllib.error.URLError as ex:
                            logging.warning(f"Error downloading '{url}': {ex}")
                            errmsg = "Download failed"
                            continue
                        contents: List[bytes] = []
                        try:
                            for fn in (internal_file_name, external_file_name):
                                with open(fn, "rb") as f:
                                    contents.append(f.read())
                        except OSError as ex:
                            logging.warning(f"Read failed: {ex}")
                            errmsg = "Read failed"
                            continue
                        if contents[0] != contents[1]:
                            errmsg = "External content changed"
                    finally:
                        check_results[os.path.join(epf.subdir, filename)] = \
                            errmsg
        finally:
            self._status_updater.status_check(check_type=CheckType.ExtParams,
                                              results=check_results)
            if temp_dir:
                shutil.rmtree(temp_dir, ignore_errors=True)


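# Illustrative example of a single --check_ext_files descriptor (see also the
# default in Settings). The last two ':' separators split off the subdirectory
# and the comma-separated file list, so the base URL may itself contain ':';
# several descriptors may be joined with ';'. A hypothetical value such as
#
#   https://example.com/data:common_data:antennas.csv,channels.csv
#
# parses into _ExtParamFiles(base_url="https://example.com/data",
#                            subdir="common_data",
#                            files=["antennas.csv", "channels.csv"])
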
def main(argv: List[str]) -> None:
    """Do the job.

    Arguments:
    argv -- Program arguments
    """
    argument_parser = argparse.ArgumentParser(
        description="ULS data download control service")
    argument_parser.add_argument(
        "--download_script", metavar="ULS_PARSER_SCRIPT",
        help=f"ULS download script{env_help(Settings, 'download_script')}")
    argument_parser.add_argument(
        "--download_script_args", metavar="ARGS",
        help=f"Optional additional arguments to pass to ULS download script"
        f"{env_help(Settings, 'download_script_args')}")
    argument_parser.add_argument(
        "--region", metavar="REG1[:REG2[:REG3...]]",
        help=f"Colon-separated list of regions to download. Default is all "
        f"regions{env_help(Settings, 'region')}")
    argument_parser.add_argument(
        "--result_dir", metavar="RESULT_DIR",
        help=f"Directory where ULS download script puts resulting database"
        f"{env_help(Settings, 'result_dir')}")
    argument_parser.add_argument(
        "--temp_dir", metavar="TEMP_DIR",
        help=f"Directory containing downloader's temporary files (cleared "
        f"before downloading){env_help(Settings, 'temp_dir')}")
    argument_parser.add_argument(
        "--ext_db_dir", metavar="EXTERNAL_DATABASE_DIR",
        help=f"Directory where new ULS databases should be copied. If "
        f"--ext_db_symlink contains path, this parameter is root directory "
        f"for this path{env_help(Settings, 'ext_db_dir')}")
    argument_parser.add_argument(
        "--ext_db_symlink", metavar="CURRENT_DATABASE_SYMLINK",
        help=f"Symlink in database directory (specified with --ext_db_dir) "
        f"that points to current database. May contain path - if so, this "
        f"path is used for AFC Config override during database validity check"
        f"{env_help(Settings, 'ext_db_symlink')}")
    argument_parser.add_argument(
        "--ext_ras_database", metavar="FILENAME",
        help=f"Externally maintained RAS 'database' file"
        f"{env_help(Settings, 'ext_ras_database')}")
    argument_parser.add_argument(
        "--ras_database", metavar="FILENAME",
        help=f"Location the downloader script reads the RAS 'database' from"
        f"{env_help(Settings, 'ras_database')}")
    argument_parser.add_argument(
        "--fsid_file", metavar="FSID_FILE",
        help=f"FSID file that the ULS downloader is expected to read/update"
        f"{env_help(Settings, 'fsid_file')}")
    argument_parser.add_argument(
        "--service_state_db_dsn", metavar="STATE_DB_DSN",
        help=f"Connection string to database containing FS service state "
        f"(that is used by healthcheck script)"
        f"{env_help(Settings, 'service_state_db_dsn')}")
    argument_parser.add_argument(
        "--service_state_db_create_if_absent", action="store_true",
        help=f"Create state database if absent"
        f"{env_help(Settings, 'service_state_db_create_if_absent')}")
    argument_parser.add_argument(
        "--service_state_db_recreate", action="store_true",
        help=f"Recreate state DB if it exists"
        f"{env_help(Settings, 'service_state_db_recreate')}")
    argument_parser.add_argument(
        "--prometheus_port", metavar="PORT_NUMBER",
        help=f"Port to serve Prometheus metrics on. Default is to not serve "
        f"Prometheus metrics. Ignored (as irrelevant) if specified with "
        f"'--run_once'{env_help(Settings, 'prometheus_port')}")
    argument_parser.add_argument(
        "--statsd_server", metavar="HOST[:PORT]",
        help=f"Send metrics to given StatsD host. Default is to not send them"
        f"{env_help(Settings, 'statsd_server')}")
    argument_parser.add_argument(
        "--check_ext_files", metavar="BASE_URL:SUBDIR:FILENAME[,...][;...]",
        action="append", default=[],
        help=f"Verify that the given files at the given location match the "
        f"files in the given subdirectory of the ULS downloader script; "
        f"several such semicolon-separated groups may be specified (e.g. in "
        f"an environment variable){env_help(Settings, 'check_ext_files')}")
    argument_parser.add_argument(
        "--max_change_percent", metavar="MAX_CHANGE_PERCENT",
        help=f"Maximum allowed change since previous database, in percent"
        f"{env_help(Settings, 'max_change_percent')}")
    argument_parser.add_argument(
        "--afc_url", metavar="URL",
        help=f"URL for making trial AFC Requests with new database"
        f"{env_help(Settings, 'afc_url')}")
    argument_parser.add_argument(
        "--afc_parallel", metavar="NUMBER",
        help=f"Number of parallel AFC Requests to make when verifying new "
        f"database against AFC Engine{env_help(Settings, 'afc_parallel')}")
    argument_parser.add_argument(
        "--rcache_url", metavar="URL",
        help=f"URL for spatial invalidation{env_help(Settings, 'rcache_url')}")
    argument_parser.add_argument(
        "--rcache_enabled", metavar="TRUE/FALSE",
        help=f"FALSE to disable spatial invalidation (even if URL specified)"
        f"{env_help(Settings, 'rcache_enabled')}")
    argument_parser.add_argument(
        "--delay_hr", metavar="DELAY_HR",
        help=f"Delay before invocation in hours"
        f"{env_help(Settings, 'delay_hr')}")
    argument_parser.add_argument(
        "--interval_hr", metavar="INTERVAL_HR",
        help=f"Download interval in hours{env_help(Settings, 'interval_hr')}")
    argument_parser.add_argument(
        "--timeout_hr", metavar="TIMEOUT_HR",
        help=f"Download script execution timeout in hours"
        f"{env_help(Settings, 'timeout_hr')}")
    argument_parser.add_argument(
        "--nice", action="store_true",
        help=f"Run download script on nice (low) priority"
        f"{env_help(Settings, 'nice')}")
    argument_parser.add_argument(
        "--verbose", action="store_true",
        help=f"Print detailed log information{env_help(Settings, 'verbose')}")
    argument_parser.add_argument(
        "--run_once", action="store_true",
        help=f"Run download once and exit{env_help(Settings, 'run_once')}")
    argument_parser.add_argument(
        "--force", action="store_true",
        help=f"Force database update even if it is not noticeably changed or "
        f"did not pass validity check{env_help(Settings, 'force')}")

    settings: Settings = \
        cast(Settings, merge_args(settings_class=Settings,
                                  args=argument_parser.parse_args(argv)))

    try:
        setup_logging(verbose=settings.verbose)

        if settings.run_once and (settings.prometheus_port is not None):
            logging.warning(
                "There is no point in using Prometheus in run-once mode. Use "
                "--statsd_server if metrics are necessary")
            settings.prometheus_port = None

        print_args(settings)

        state_db = StateDb(db_dsn=settings.service_state_db_dsn)
        state_db.check_server()
        if settings.service_state_db_create_if_absent:
            state_db.create_db(
                recreate_tables=settings.service_state_db_recreate)

        status_updater = \
            StatusUpdater(state_db=state_db,
                          prometheus_port=settings.prometheus_port,
                          statsd_server=settings.statsd_server)

        if not state_db.read_milestone(DownloaderMilestone.ServiceBirth):
            status_updater.milestone(DownloaderMilestone.ServiceBirth)

        error_if(not os.path.isfile(settings.download_script),
                 f"Download script '{settings.download_script}' not found")
        full_ext_db_dir = \
            os.path.join(settings.ext_db_dir,
                         os.path.dirname(settings.ext_db_symlink))
        error_if(
            not os.path.isdir(full_ext_db_dir),
            f"External database directory '{full_ext_db_dir}' not found")

        executor = LoggingExecutor()

        ext_params_file_checker = \
            ExtParamFilesChecker(
                ext_files_arg=settings.check_ext_files,
                script_dir=os.path.dirname(settings.download_script),
                status_updater=status_updater)

        current_uls_file = os.path.join(settings.ext_db_dir,
                                        settings.ext_db_symlink)

        uls_file_checker = \
            UlsFileChecker(
                max_change_percent=settings.max_change_percent,
                afc_url=settings.afc_url, afc_parallel=settings.afc_parallel,
                regions=None if settings.region is None
                else settings.region.split(":"),
                executor=executor, status_updater=status_updater)

        rcache_settings = \
            RcacheClientSettings(
                enabled=settings.rcache_enabled and bool(settings.rcache_url),
                service_url=settings.rcache_url, postgres_dsn=None,
                rmq_dsn=None)
        rcache_settings.validate_for(rcache=True)
        rcache: Optional[RcacheClient] = \
            RcacheClient(rcache_settings) if rcache_settings.enabled else None

        status_updater.milestone(DownloaderMilestone.ServiceStart)

        if settings.delay_hr and (not settings.run_once):
            logging.info(f"Delaying by {settings.delay_hr} hours")
            time.sleep(settings.delay_hr * 3600)

        if settings.run_once:
            logging.info("Running healthcheck script")
            executor.execute(
                [sys.executable, HEALTHCHECK_SCRIPT, "--force_success"],
                timeout_sec=200, fail_on_error=True)

        # Temporary name of new ULS database in external directory
        temp_uls_file_name: Optional[str] = None

        while True:
            err_msg: Optional[str] = None
            completed = False
            logging.info("Starting ULS download")
            download_start_time = datetime.datetime.now()
            status_updater.milestone(DownloaderMilestone.DownloadStart)
            try:
                has_previous = os.path.islink(current_uls_file)
                if has_previous:
                    logging.info(
                        f"Current database: '{os.readlink(current_uls_file)}'")

                extract_fsid_table(uls_file=current_uls_file,
                                   fsid_file=settings.fsid_file,
                                   executor=executor)

                # Clear some directories from stuff left from previous
                # downloads
                for dir_to_clean in [settings.result_dir, settings.temp_dir]:
                    if dir_to_clean and os.path.isdir(dir_to_clean):
                        executor.execute(f"rm -rf {dir_to_clean}/*",
                                         timeout_sec=100, fail_on_error=True)

                logging.info("Copying RAS database")
                shutil.copyfile(settings.ext_ras_database,
                                settings.ras_database)

                logging.info("Checking if external parameter files changed")
                ext_params_file_checker.check()

                # Issue download script
                cmdline_args: List[str] = []
                if settings.nice and (os.name == "posix"):
                    cmdline_args.append("nice")
                cmdline_args.append(settings.download_script)
                if settings.region:
                    cmdline_args += ["--region", settings.region]
                if settings.download_script_args:
                    cmdline_args.append(settings.download_script_args)
                logging.info(f"Starting {' '.join(cmdline_args)}")
                executor.execute(
                    " ".join(cmdline_args) if settings.download_script_args
                    else cmdline_args,
                    cwd=os.path.dirname(settings.download_script),
                    timeout_sec=settings.timeout_hr * 3600,
                    fail_on_error=True)

                # Find out name of new ULS file
                uls_files = glob.glob(os.path.join(settings.result_dir,
                                                   ULS_FILEMASK))
                if len(uls_files) < 1:
                    raise ProcessingException(
                        "ULS file not generated by ULS downloader")
                if len(uls_files) > 1:
                    raise ProcessingException(
                        f"More than one {ULS_FILEMASK} file generated by ULS "
                        f"downloader. What gives?")

                # Check what regions were updated
                new_uls_file = uls_files[0]
                logging.info(f"ULS file '{new_uls_file}' created. It will "
                             f"undergo some inspection")
                new_uls_identity = get_uls_identity(new_uls_file)
                if new_uls_identity is None:
                    raise ProcessingException(
                        "Generated ULS file does not contain identity "
                        "information")
                status_updater.milestone(DownloaderMilestone.DownloadSuccess)

                updated_regions = set(new_uls_identity.keys())
                if has_previous:
                    for current_region, current_identity in \
                            (get_uls_identity(current_uls_file) or {}).items():
                        if current_identity and \
                                (new_uls_identity.get(current_region) ==
                                 current_identity):
                            updated_regions.remove(current_region)
                if updated_regions:
                    logging.info(f"Updated regions: "
                                 f"{', '.join(sorted(updated_regions))}")

                # If anything was updated - do the update routine
                if updated_regions or settings.force:
                    # Embed updated FSID table to the new database
                    logging.info("Embedding FSID table")
                    executor.execute(
                        [FSID_TOOL, "embed", new_uls_file, settings.fsid_file],
                        fail_on_error=True)

                    temp_uls_file_name = \
                        os.path.join(full_ext_db_dir,
                                     "temp_" + os.path.basename(new_uls_file))
                    # Copy new ULS file to external directory
                    logging.debug(
                        f"Copying '{new_uls_file}' to '{temp_uls_file_name}'")
                    shutil.copy2(new_uls_file, temp_uls_file_name)

                    db_diff = DbDiff(prev_filename=current_uls_file,
                                     new_filename=temp_uls_file_name,
                                     executor=executor) \
                        if has_previous else None
                    if settings.force or \
                            uls_file_checker.valid(
                                base_dir=settings.ext_db_dir,
                                new_filename=os.path.join(
                                    os.path.dirname(settings.ext_db_symlink),
                                    os.path.basename(temp_uls_file_name)),
                                db_diff=db_diff):
                        if settings.force:
                            status_updater.status_check(CheckType.FsDatabase,
                                                        None)
                        # Renaming database
                        permanent_uls_file_name = \
                            os.path.join(full_ext_db_dir,
                                         os.path.basename(new_uls_file))
                        os.rename(temp_uls_file_name, permanent_uls_file_name)
                        # Retargeting symlink
                        update_uls_file(
                            uls_dir=full_ext_db_dir,
                            uls_file=os.path.basename(new_uls_file),
                            symlink=os.path.basename(settings.ext_db_symlink),
                            executor=executor)
                        if rcache and (db_diff is not None) and \
                                db_diff.diff_tiles:
                            tile_list = \
                                "<" + \
                                ">, <".join(tile.short_str() for tile in
                                            db_diff.diff_tiles[: 1000]) + \
                                ">"
                            logging.info(f"Requesting invalidation of the "
                                         f"following tiles: {tile_list}")
                            rcache.rcache_spatial_invalidate(
                                tiles=db_diff.diff_tiles)

                        # Update data change times (for health checker)
                        status_updater.milestone(
                            milestone=DownloaderMilestone.RegionChanged,
                            updated_regions=updated_regions,
                            all_regions=list(new_uls_identity.keys()))

                        status_updater.milestone(DownloaderMilestone.DbUpdated)
                        completed = True
                else:
                    logging.info("FS data is identical to previous. No update "
                                 "will be done")
            except (OSError, subprocess.SubprocessError, ProcessingException) \
                    as ex:
                err_msg = str(ex)
                logging.error(f"Download failed: {ex}")
            finally:
                exec_output = executor.get_output()
                if err_msg:
                    exec_output = f"{exec_output.rstrip()}\n{err_msg}\n"
                state_db.write_log(log_type=LogType.Last, log=exec_output)
                if completed:
                    state_db.write_log(log_type=LogType.LastCompleted,
                                       log=exec_output)
                if err_msg:
                    state_db.write_log(log_type=LogType.LastFailed,
                                       log=exec_output)
                try:
                    if temp_uls_file_name and \
                            os.path.isfile(temp_uls_file_name):
                        logging.debug(f"Removing '{temp_uls_file_name}'")
                        os.unlink(temp_uls_file_name)
                except OSError as ex:
                    logging.error(f"Attempt to remove temporary ULS database "
                                  f"'{temp_uls_file_name}' failed: {ex}")

            # Prepare to sleep
            download_duration_sec = \
                (datetime.datetime.now() - download_start_time).total_seconds()
            logging.info(
                f"Download took {download_duration_sec // 60} minutes")
            if settings.run_once:
                break
            remaining_seconds = \
                max(0, settings.interval_hr * 3600 - download_duration_sec)
            if remaining_seconds:
                next_attempt_at = \
                    (datetime.datetime.now() +
                     datetime.timedelta(seconds=remaining_seconds)).isoformat()
                logging.info(f"Next download at {next_attempt_at}")
                time.sleep(remaining_seconds)
    except KeyboardInterrupt:
        sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])