Refactor Barman scripts and add a sub-command to switch Barman config (#3016)

We currently have a script named `patroni_barman_recover` in Patroni, which is intended to be used as a custom bootstrap method, or as a custom replica creation method.

We now need one more Barman-related script in Patroni, to handle switching of config models in Barman upon `on_role_change` events.

However, instead of creating another Patroni script, let's say `patroni_barman_config_switch`, and duplicating a lot of logic in the code, we decided to refactor the code as follows:

* Instead of two separate scripts (`patroni_barman_recover` and `patroni_barman_config_switch`), we have a single script (`patroni_barman`) with 2 sub-commands (`recover` and `config-switch`)
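
The single-script layout can be illustrated with a minimal `argparse` sketch. It assumes each sub-command registers its handler through `set_defaults(func=...)`. The handler names (`run_recover`, `run_config_switch`), the `dispatch` helper and the exit value `1` for a missing sub-command are hypothetical and exist only for this illustration:

```python
from argparse import ArgumentParser, Namespace
from typing import List, Optional


def run_recover(args: Namespace) -> int:
    """Hypothetical stand-in for the ``recover`` implementation."""
    print(f"recover: api={args.api_url} server={args.barman_server}")
    return 0


def run_config_switch(args: Namespace) -> int:
    """Hypothetical stand-in for the ``config-switch`` implementation."""
    print(f"config-switch: api={args.api_url} server={args.barman_server}")
    return 0


def build_parser() -> ArgumentParser:
    parser = ArgumentParser(prog="patroni_barman")
    # Options shared by every sub-command live on the top-level parser.
    parser.add_argument("--api-url", required=True, dest="api_url")

    subparsers = parser.add_subparsers(title="Sub-commands")

    recover = subparsers.add_parser("recover")
    recover.add_argument("--barman-server", required=True,
                         dest="barman_server")
    # Each sub-command stores its own handler in the parsed namespace.
    recover.set_defaults(func=run_recover)

    config_switch = subparsers.add_parser("config-switch")
    config_switch.add_argument("--barman-server", required=True,
                               dest="barman_server")
    config_switch.set_defaults(func=run_config_switch)
    return parser


def dispatch(argv: Optional[List[str]] = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
    if not hasattr(args, "func"):
        # No sub-command was given: show the help and fail.
        parser.print_help()
        return 1  # illustrative value only
    return args.func(args)
```

With this shape, adding a further sub-command only requires a new sub-parser and a handler; the shared options and the dispatch code stay untouched.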

This is the overview of changes that have been performed:

* File `patroni.scripts.barman_recover` has been removed, and its logic has been split into a few files:
  * `patroni.scripts.barman.cli`: handles the entrypoint of the new `patroni_barman` command, exposing the argument parser and calling the appropriate functions depending on the sub-command
  * `patroni.scripts.barman.utils`: implements utility enums, functions and classes which can be used by `cli` and by the sub-command implementations:
    * retry mechanism
    * logging set up
    * communication with pg-backup-api
  * `patroni.scripts.barman.recover`: implements the `recover` sub-command only
* File `patroni.tests.test_barman_recover` has been renamed to `patroni.tests.test_barman`
* File `patroni.scripts.barman.config_switch` was created to implement the `config-switch` sub-command only
* `setup.py` has been changed so it generates a `patroni_barman` application instead of `patroni_barman_recover`
* Docs and unit tests were updated accordingly
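
The retry mechanism listed above can be sketched as a method decorator. This is a simplified version modelled on the `retry` decorator in `patroni.scripts.barman.utils`; it assumes the decorated object exposes `max_retries` and `retry_wait`. The `FlakyClient` class is hypothetical, and the use of `functools.wraps` is an addition made for this sketch:

```python
import time
from functools import wraps
from typing import Any, Callable, Tuple, Type, Union


class RetriesExceeded(Exception):
    """Raised once every attempt has failed."""


def retry(
    exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]],
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Retry a method when one of *exceptions* is raised."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def inner(instance: Any, *args: Any, **kwargs: Any) -> Any:
            # The limits are read from the instance, not from the decorator.
            for _ in range(instance.max_retries):
                try:
                    return func(instance, *args, **kwargs)
                except exceptions:
                    time.sleep(instance.retry_wait)
            raise RetriesExceeded(
                f"Maximum number of retries exceeded for {func.__name__}."
            )
        return inner
    return decorator


class FlakyClient:
    """Hypothetical client whose first two calls fail with ``KeyError``."""

    def __init__(self) -> None:
        self.max_retries = 3
        self.retry_wait = 0  # no delay, to keep the example fast
        self.calls = 0

    @retry(KeyError)
    def fetch(self) -> str:
        self.calls += 1
        if self.calls < 3:
            raise KeyError("status")
        return "DONE"
```

Reading the limits from the instance lets the command-line options (`--max-retries`, `--retry-wait`) control every decorated method without passing them around explicitly.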

References: PAT-154.
Israel
2024-03-20 05:04:55 -03:00
committed by GitHub
parent a8cfd46801
commit 014777b20a
12 changed files with 1654 additions and 843 deletions

View File

@@ -34,6 +34,7 @@ Currently supported PostgreSQL versions: 9.3 to 16.
kubernetes
citus
existing_data
+tools_integration
security
ha_multi_dc
faq

View File

@@ -1,3 +1,5 @@
+.. _replica_imaging_and_bootstrap:
Replica imaging and bootstrap
=============================
@@ -79,14 +81,13 @@ As an example, you are able to bootstrap a fresh Patroni cluster from a Barman b
method: barman
barman:
keep_existing_recovery_conf: true
-command: patroni_barman_recover
-api-url: https://barman-host:7480
+command: patroni_barman --api-url https://barman-host:7480 recover
barman-server: my_server
ssh-command: ssh postgres@patroni-host
.. note::
-``patroni_barman_recover`` requires that you have both Barman and ``pg-backup-api`` configured in the Barman host, so it can execute a remote ``barman recover`` through the backup API.
-The above example uses a subset of the available parameters. You can get more information running ``patroni_barman_recover --help``.
+``patroni_barman recover`` requires that you have both Barman and ``pg-backup-api`` configured in the Barman host, so it can execute a remote ``barman recover`` through the backup API.
+The above example uses a subset of the available parameters. You can get more information running ``patroni_barman recover --help``.
.. _custom_replica_creation:
@@ -150,16 +151,15 @@ example: Barman
- barman
- basebackup
barman:
-command: patroni_barman_recover
-api-url: https://barman-host:7480
+command: patroni_barman --api-url https://barman-host:7480 recover
barman-server: my_server
ssh-command: ssh postgres@patroni-host
basebackup:
max-rate: '100M'
.. note::
-``patroni_barman_recover`` requires that you have both Barman and ``pg-backup-api`` configured in the Barman host, so it can execute a remote ``barman recover`` through the backup API.
-The above example uses a subset of the available parameters. You can get more information running ``patroni_barman_recover --help``.
+``patroni_barman recover`` requires that you have both Barman and ``pg-backup-api`` configured in the Barman host, so it can execute a remote ``barman recover`` through the backup API.
+The above example uses a subset of the available parameters. You can get more information running ``patroni_barman recover --help``.
The ``create_replica_methods`` defines available replica creation methods and the order of executing them. Patroni will
stop on the first one that returns 0. Each method should define a separate section in the configuration file, listing the command

View File

@@ -0,0 +1,62 @@
Integration with other tools
============================

Patroni is able to integrate with other tools in your stack. In this section you
will find a list of examples which, although not exhaustive, might
provide you with ideas on how Patroni can integrate with other tools.

Barman
------

Patroni delivers an application named ``patroni_barman`` which has logic to
communicate with ``pg-backup-api``, so you are able to perform Barman operations
remotely.

This application currently has a couple of sub-commands: ``recover`` and
``config-switch``.

patroni_barman recover
^^^^^^^^^^^^^^^^^^^^^^

The ``recover`` sub-command can be used as a custom bootstrap or custom replica
creation method. You can find more information about that in
:ref:`replica_imaging_and_bootstrap`.

patroni_barman config-switch
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``config-switch`` sub-command is designed to be used as an ``on_role_change``
callback in Patroni. As an example, assume you are streaming WALs from your
current primary to your Barman host. In the event of a failover in the cluster
you might want to start streaming WALs from the new primary. You can accomplish
this by using ``patroni_barman config-switch`` as the ``on_role_change`` callback.

.. note::
    That sub-command relies on the ``barman config-switch`` command, which is in
    charge of overriding the configuration of a Barman server by applying a
    pre-defined model on top of it. This command is available since Barman 3.10.
    Please consult the Barman documentation for more details.

This is an example of how you can configure Patroni to apply a configuration
model in case this Patroni node is promoted to primary:

.. code:: YAML

    postgresql:
        callbacks:
            on_role_change: >
                patroni_barman
                --api-url YOUR_API_URL
                config-switch
                --barman-server YOUR_BARMAN_SERVER_NAME
                --barman-model YOUR_BARMAN_MODEL_NAME
                --switch-when promoted

.. note::
    ``patroni_barman config-switch`` requires that you have both Barman and
    ``pg-backup-api`` configured in the Barman host, so it can execute a remote
    ``barman config-switch`` through the backup API. It also requires that you
    have pre-configured the Barman models to be applied. The above example uses
    a subset of the available parameters. You can get more information by
    running ``patroni_barman config-switch --help``, and by consulting the Barman
    documentation.

View File

@@ -0,0 +1 @@
"""Create :mod:`patroni.scripts.barman`."""

View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python

"""Perform operations on Barman through ``pg-backup-api``.

The actual operations are implemented by separate modules. This module only
builds the CLI that makes an interface with the actual commands.

.. note::
    See :class:`ExitCode` for possible exit codes of this main script.
"""
from argparse import ArgumentParser
from enum import IntEnum
import logging
import sys

from .config_switch import run_barman_config_switch
from .recover import run_barman_recover
from .utils import ApiNotOk, PgBackupApi, set_up_logging

class ExitCode(IntEnum):
    """Possible exit codes of this script.

    :cvar NO_COMMAND: if no sub-command of ``patroni_barman`` application has
        been selected by the user.
    :cvar API_NOT_OK: ``pg-backup-api`` status is not ``OK``.
    """

    NO_COMMAND = -1
    API_NOT_OK = -2

def main() -> None:
    """Entry point of ``patroni_barman`` application.

    Implements the parser for the application and for its sub-commands.

    The script exit code may be one of:

    * :attr:`ExitCode.NO_COMMAND`: if no sub-command was specified in the
      ``patroni_barman`` call;
    * :attr:`ExitCode.API_NOT_OK`: if ``pg-backup-api`` is not correctly up and
      running;
    * Value returned by :func:`~patroni.scripts.barman.config_switch.run_barman_config_switch`,
      if running ``patroni_barman config-switch``;
    * Value returned by :func:`~patroni.scripts.barman.recover.run_barman_recover`,
      if running ``patroni_barman recover``.

    The called sub-command is expected to exit execution once finished using
    its own set of exit codes.
    """
    parser = ArgumentParser(
        description=(
            "Wrapper application for pg-backup-api. Communicate with the API "
            "running at the given URL to perform remote Barman operations."
        ),
    )
    parser.add_argument(
        "--api-url",
        type=str,
        required=True,
        help="URL to reach the pg-backup-api, e.g. 'http://localhost:7480'",
        dest="api_url",
    )
    parser.add_argument(
        "--cert-file",
        type=str,
        required=False,
        help="Certificate to authenticate against the API, if required.",
        dest="cert_file",
    )
    parser.add_argument(
        "--key-file",
        type=str,
        required=False,
        help="Certificate key to authenticate against the API, if required.",
        dest="key_file",
    )
    parser.add_argument(
        "--retry-wait",
        type=int,
        required=False,
        default=2,
        help="How long in seconds to wait before retrying a failed "
             "pg-backup-api request (default: '%(default)s')",
        dest="retry_wait",
    )
    parser.add_argument(
        "--max-retries",
        type=int,
        required=False,
        default=5,
        help="Maximum number of retries when receiving malformed responses "
             "from the pg-backup-api (default: '%(default)s')",
        dest="max_retries",
    )
    parser.add_argument(
        "--log-file",
        type=str,
        required=False,
        help="File where to log messages produced by this application, if any.",
        dest="log_file",
    )

    subparsers = parser.add_subparsers(title="Sub-commands")

    recover_parser = subparsers.add_parser(
        "recover",
        help="Remote 'barman recover'",
        description="Restore a Barman backup of a given Barman server"
    )
    recover_parser.add_argument(
        "--barman-server",
        type=str,
        required=True,
        help="Name of the Barman server from which to restore the backup.",
        dest="barman_server",
    )
    recover_parser.add_argument(
        "--backup-id",
        type=str,
        required=False,
        default="latest",
        help="ID of the Barman backup to be restored. You can use any value "
             "supported by 'barman recover' command "
             "(default: '%(default)s')",
        dest="backup_id",
    )
    recover_parser.add_argument(
        "--ssh-command",
        type=str,
        required=True,
        help="Value to be passed as '--remote-ssh-command' to 'barman recover'.",
        dest="ssh_command",
    )
    recover_parser.add_argument(
        "--data-directory",
        "--datadir",
        type=str,
        required=True,
        help="Destination path where to restore the barman backup in the "
             "local host.",
        dest="data_directory",
    )
    recover_parser.add_argument(
        "--loop-wait",
        type=int,
        required=False,
        default=10,
        help="How long to wait before checking again the status of the "
             "recovery process, in seconds. Use higher values if your "
             "recovery is expected to take long (default: '%(default)s')",
        dest="loop_wait",
    )
    recover_parser.set_defaults(func=run_barman_recover)

    config_switch_parser = subparsers.add_parser(
        "config-switch",
        help="Remote 'barman config-switch'",
        description="Switch the configuration of a given Barman server. "
                    "Intended to be used as a 'on_role_change' callback."
    )
    config_switch_parser.add_argument(
        "action",
        type=str,
        choices=["on_role_change"],
        help="Name of the callback (automatically filled by Patroni)",
    )
    config_switch_parser.add_argument(
        "role",
        type=str,
        choices=["master", "primary", "promoted", "standby_leader", "replica",
                 "demoted"],
        help="Name of the new role of this node (automatically filled by "
             "Patroni)",
    )
    config_switch_parser.add_argument(
        "cluster",
        type=str,
        help="Name of the Patroni cluster involved in the callback "
             "(automatically filled by Patroni)",
    )
    config_switch_parser.add_argument(
        "--barman-server",
        type=str,
        required=True,
        help="Name of the Barman server which config is to be switched.",
        dest="barman_server",
    )
    group = config_switch_parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--barman-model",
        type=str,
        help="Name of the Barman config model to be applied to the server.",
        dest="barman_model",
    )
    group.add_argument(
        "--reset",
        action="store_true",
        help="Unapply the currently active model for the server, if any.",
        dest="reset",
    )
    config_switch_parser.add_argument(
        "--switch-when",
        type=str,
        required=True,
        default="promoted",
        choices=["promoted", "demoted", "always"],
        help="Controls under which circumstances the 'on_role_change' callback "
             "should actually switch config in Barman. 'promoted' means the "
             "'role' is either 'master', 'primary' or 'promoted'. 'demoted' "
             "means the 'role' is either 'replica' or 'demoted' "
             "(default: '%(default)s')",
        dest="switch_when",
    )
    config_switch_parser.set_defaults(func=run_barman_config_switch)

    args, _ = parser.parse_known_args()

    set_up_logging(args.log_file)

    if not hasattr(args, "func"):
        parser.print_help()
        sys.exit(ExitCode.NO_COMMAND)

    api = None

    try:
        api = PgBackupApi(args.api_url, args.cert_file, args.key_file,
                          args.retry_wait, args.max_retries)
    except ApiNotOk as exc:
        logging.error("pg-backup-api is not working: %r", exc)
        sys.exit(ExitCode.API_NOT_OK)

    sys.exit(args.func(api, args))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,146 @@
#!/usr/bin/env python

"""Implements ``patroni_barman config-switch`` sub-command.

Apply a Barman configuration model through ``pg-backup-api``.

This sub-command is especially useful as an ``on_role_change`` callback to change
Barman configuration in response to failovers and switchovers. Check the output
of ``--help`` to understand the parameters supported by the sub-command.

It requires that you have previously configured a Barman server and Barman
config models, and that you have ``pg-backup-api`` configured and running in
the same host as Barman.

Refer to :class:`ExitCode` for possible exit codes of this sub-command.
"""
from argparse import Namespace
from enum import IntEnum
import logging
import time
from typing import Optional, TYPE_CHECKING

from .utils import OperationStatus, RetriesExceeded

if TYPE_CHECKING:  # pragma: no cover
    from .utils import PgBackupApi

class ExitCode(IntEnum):
    """Possible exit codes of this script.

    :cvar CONFIG_SWITCH_DONE: config switch was successfully performed.
    :cvar CONFIG_SWITCH_SKIPPED: if the execution was skipped because of not
        matching user expectations.
    :cvar CONFIG_SWITCH_FAILED: config switch faced an issue.
    :cvar HTTP_ERROR: an error has occurred while communicating with
        ``pg-backup-api``
    :cvar INVALID_ARGS: an invalid set of arguments has been given to the
        operation.
    """

    CONFIG_SWITCH_DONE = 0
    CONFIG_SWITCH_SKIPPED = 1
    CONFIG_SWITCH_FAILED = 2
    HTTP_ERROR = 3
    INVALID_ARGS = 4

def _should_skip_switch(args: Namespace) -> bool:
    """Check if we should skip the config switch operation.

    :param args: arguments received from the command-line of
        ``patroni_barman config-switch`` command.

    :returns: if the operation should be skipped.
    """
    if args.switch_when == "promoted":
        return args.role not in {"master", "primary", "promoted"}
    if args.switch_when == "demoted":
        return args.role not in {"replica", "demoted"}
    return False

def _switch_config(api: "PgBackupApi", barman_server: str,
                   barman_model: Optional[str], reset: Optional[bool]) -> int:
    """Switch configuration of Barman server through ``pg-backup-api``.

    .. note::
        If requests to ``pg-backup-api`` fail recurrently or we face HTTP
        errors, then exit with :attr:`ExitCode.HTTP_ERROR`.

    :param api: a :class:`PgBackupApi` instance to handle communication with
        the API.
    :param barman_server: name of the Barman server which config is to be
        switched.
    :param barman_model: name of the Barman model to be applied to the server,
        if any.
    :param reset: ``True`` if you would like to unapply the currently active
        model for the server, if any.

    :returns: the return code to be used when exiting the ``patroni_barman``
        application. Refer to :class:`ExitCode`.
    """
    operation_id = None

    try:
        operation_id = api.create_config_switch_operation(
            barman_server,
            barman_model,
            reset,
        )
    except RetriesExceeded as exc:
        logging.error("An issue was faced while trying to create a config "
                      "switch operation: %r", exc)
        return ExitCode.HTTP_ERROR

    logging.info("Created the config switch operation with ID %s",
                 operation_id)

    status = None

    while True:
        try:
            status = api.get_operation_status(barman_server, operation_id)
        except RetriesExceeded:
            logging.error("Maximum number of retries exceeded, exiting.")
            return ExitCode.HTTP_ERROR

        if status != OperationStatus.IN_PROGRESS:
            break

        logging.info("Config switch operation %s is still in progress",
                     operation_id)
        time.sleep(5)

    if status == OperationStatus.DONE:
        logging.info("Config switch operation finished successfully.")
        return ExitCode.CONFIG_SWITCH_DONE
    else:
        logging.error("Config switch operation failed.")
        return ExitCode.CONFIG_SWITCH_FAILED

def run_barman_config_switch(api: "PgBackupApi", args: Namespace) -> int:
    """Run a remote ``barman config-switch`` through the ``pg-backup-api``.

    :param api: a :class:`PgBackupApi` instance to handle communication with
        the API.
    :param args: arguments received from the command-line of
        ``patroni_barman config-switch`` command.

    :returns: the return code to be used when exiting the ``patroni_barman``
        application. Refer to :class:`ExitCode`.
    """
    if _should_skip_switch(args):
        logging.info("Config switch operation was skipped (role=%s, "
                     "switch_when=%s).", args.role, args.switch_when)
        return ExitCode.CONFIG_SWITCH_SKIPPED

    if not bool(args.barman_model) ^ bool(args.reset):
        logging.error("One, and only one among 'barman_model' ('%s') and "
                      "'reset' ('%s') should be given", args.barman_model, args.reset)
        return ExitCode.INVALID_ARGS

    return _switch_config(api, args.barman_server, args.barman_model, args.reset)

View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python

"""Implements ``patroni_barman recover`` sub-command.

Restore a Barman backup to the local node through ``pg-backup-api``.

This sub-command can be used both as a custom bootstrap method, and as a custom
create replica method. Check the output of ``--help`` to understand the
parameters supported by the sub-command. ``--datadir`` is a special parameter
and it is automatically filled by Patroni in both cases.

It requires that you have previously configured a Barman server, and that you
have ``pg-backup-api`` configured and running in the same host as Barman.

Refer to :class:`ExitCode` for possible exit codes of this sub-command.
"""
from argparse import Namespace
from enum import IntEnum
import logging
import time
from typing import TYPE_CHECKING

from .utils import OperationStatus, RetriesExceeded

if TYPE_CHECKING:  # pragma: no cover
    from .utils import PgBackupApi

class ExitCode(IntEnum):
    """Possible exit codes of this script.

    :cvar RECOVERY_DONE: backup was successfully restored.
    :cvar RECOVERY_FAILED: recovery of the backup faced an issue.
    :cvar HTTP_ERROR: an error has occurred while communicating with
        ``pg-backup-api``
    """

    RECOVERY_DONE = 0
    RECOVERY_FAILED = 1
    HTTP_ERROR = 2

def _restore_backup(api: "PgBackupApi", barman_server: str, backup_id: str,
                    ssh_command: str, data_directory: str,
                    loop_wait: int) -> int:
    """Restore the configured Barman backup through ``pg-backup-api``.

    .. note::
        If requests to ``pg-backup-api`` fail recurrently or we face HTTP
        errors, then exit with :attr:`ExitCode.HTTP_ERROR`.

    :param api: a :class:`PgBackupApi` instance to handle communication with
        the API.
    :param barman_server: name of the Barman server which backup is to be
        restored.
    :param backup_id: ID of the backup from the Barman server.
    :param ssh_command: SSH command to connect from the Barman host to the
        target host.
    :param data_directory: path to the Postgres data directory where to restore
        the backup in.
    :param loop_wait: how long in seconds to wait before checking again the
        status of the recovery process. Higher values are useful for backups
        that are expected to take longer to restore.

    :returns: the return code to be used when exiting the ``patroni_barman``
        application. Refer to :class:`ExitCode`.
    """
    operation_id = None

    try:
        operation_id = api.create_recovery_operation(
            barman_server,
            backup_id,
            ssh_command,
            data_directory,
        )
    except RetriesExceeded as exc:
        logging.error("An issue was faced while trying to create a recovery "
                      "operation: %r", exc)
        return ExitCode.HTTP_ERROR

    logging.info("Created the recovery operation with ID %s", operation_id)

    status = None

    while True:
        try:
            status = api.get_operation_status(barman_server, operation_id)
        except RetriesExceeded:
            logging.error("Maximum number of retries exceeded, exiting.")
            return ExitCode.HTTP_ERROR

        if status != OperationStatus.IN_PROGRESS:
            break

        logging.info("Recovery operation %s is still in progress",
                     operation_id)
        time.sleep(loop_wait)

    if status == OperationStatus.DONE:
        logging.info("Recovery operation finished successfully.")
        return ExitCode.RECOVERY_DONE
    else:
        logging.error("Recovery operation failed.")
        return ExitCode.RECOVERY_FAILED

def run_barman_recover(api: "PgBackupApi", args: Namespace) -> int:
    """Run a remote ``barman recover`` through the ``pg-backup-api``.

    :param api: a :class:`PgBackupApi` instance to handle communication with
        the API.
    :param args: arguments received from the command-line of
        ``patroni_barman recover`` command.

    :returns: the return code to be used when exiting the ``patroni_barman``
        application. Refer to :class:`ExitCode`.
    """
    return _restore_backup(api, args.barman_server, args.backup_id,
                           args.ssh_command, args.data_directory,
                           args.loop_wait)

View File

@@ -0,0 +1,308 @@
#!/usr/bin/env python

"""Utilities to be used by Barman-related scripts."""
from enum import IntEnum
import json
import logging
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union
import time
from urllib.parse import urljoin

from urllib3 import PoolManager
from urllib3.exceptions import MaxRetryError
from urllib3.response import HTTPResponse

class RetriesExceeded(Exception):
    """Maximum number of retries exceeded."""


def retry(exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]) \
        -> Any:
    """Retry an operation n times if expected *exceptions* are faced.

    .. note::
        Should be used as a decorator of a class' method as it expects the
        first argument to be a class instance.

        The class which method is going to be decorated should contain a couple
        attributes:

        * ``max_retries``: maximum retry attempts before failing;
        * ``retry_wait``: how long in seconds to wait before retrying.

    :param exceptions: exceptions that could trigger a retry attempt.

    :raises:
        :exc:`RetriesExceeded`: if the maximum number of attempts has been
            exhausted.
    """
    def decorator(func: Callable[..., Any]) -> Any:
        def inner_func(instance: object, *args: Any, **kwargs: Any) -> Any:
            times: int = getattr(instance, "max_retries")
            retry_wait: int = getattr(instance, "retry_wait")
            method_name = f"{instance.__class__.__name__}.{func.__name__}"

            attempt = 1

            while attempt <= times:
                try:
                    return func(instance, *args, **kwargs)
                except exceptions as exc:
                    logging.warning("Attempt %d of %d on method %s failed "
                                    "with %r.",
                                    attempt, times, method_name, exc)
                    attempt += 1

                time.sleep(retry_wait)

            raise RetriesExceeded("Maximum number of retries exceeded for "
                                  f"method {method_name}.")
        return inner_func
    return decorator

def set_up_logging(log_file: Optional[str] = None) -> None:
    """Set up logging to file, if *log_file* is given, otherwise to console.

    :param log_file: file where to log messages, if any.
    """
    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format="%(asctime)s %(levelname)s: %(message)s")

class OperationStatus(IntEnum):
    """Possible status of ``pg-backup-api`` operations.

    :cvar IN_PROGRESS: the operation is still ongoing.
    :cvar FAILED: the operation failed.
    :cvar DONE: the operation finished successfully.
    """

    IN_PROGRESS = 0
    FAILED = 1
    DONE = 2


class ApiNotOk(Exception):
    """The ``pg-backup-api`` is not currently up and running."""

class PgBackupApi:
    """Facilities for communicating with the ``pg-backup-api``.

    :ivar api_url: base URL to reach the ``pg-backup-api``.
    :ivar cert_file: certificate to authenticate against the ``pg-backup-api``,
        if required.
    :ivar key_file: certificate key to authenticate against the
        ``pg-backup-api``, if required.
    :ivar retry_wait: how long in seconds to wait before retrying a failed
        request to the ``pg-backup-api``.
    :ivar max_retries: maximum number of retries when ``pg-backup-api`` returns
        malformed responses.
    :ivar _http: an HTTP pool manager for performing web requests.
    """

    def __init__(self, api_url: str, cert_file: Optional[str],
                 key_file: Optional[str], retry_wait: int,
                 max_retries: int) -> None:
        """Create a new instance of :class:`PgBackupApi`.

        Make sure the ``pg-backup-api`` is reachable and running fine.

        .. note::
            When using any method which sends requests to the API, be aware that
            they might raise :exc:`RetriesExceeded` upon HTTP request errors.

            Similarly, when instantiating this class you may face an
            :exc:`ApiNotOk`, if the API is down or returns a bogus status.

        :param api_url: base URL to reach the ``pg-backup-api``.
        :param cert_file: certificate to authenticate against the
            ``pg-backup-api``, if required.
        :param key_file: certificate key to authenticate against the
            ``pg-backup-api``, if required.
        :param retry_wait: how long in seconds to wait before retrying a failed
            request to the ``pg-backup-api``.
        :param max_retries: maximum number of retries when ``pg-backup-api``
            returns malformed responses.
        """
        self.api_url = api_url
        self.cert_file = cert_file
        self.key_file = key_file
        self.retry_wait = retry_wait
        self.max_retries = max_retries
        self._http = PoolManager(cert_file=cert_file, key_file=key_file)
        self._ensure_api_ok()

    def _build_full_url(self, url_path: str) -> str:
        """Build the full URL by concatenating *url_path* with the base URL.

        :param url_path: path to be accessed in the ``pg-backup-api``.

        :returns: the full URL after concatenating.
        """
        return urljoin(self.api_url, url_path)

    @staticmethod
    def _deserialize_response(response: HTTPResponse) -> Any:
        """Retrieve body from *response* as a deserialized JSON object.

        :param response: response from which JSON body will be deserialized.

        :returns: the deserialized JSON body.
        """
        return json.loads(response.data.decode("utf-8"))

    @staticmethod
    def _serialize_request(body: Any) -> Any:
        """Serialize a request body.

        :param body: content of the request body to be serialized.

        :returns: the serialized request body.
        """
        return json.dumps(body).encode("utf-8")

    def _get_request(self, url_path: str) -> Any:
        """Perform a ``GET`` request to *url_path*.

        :param url_path: URL to perform the ``GET`` request against.

        :returns: the deserialized response body.

        :raises:
            :exc:`RetriesExceeded`: raised from the corresponding :mod:`urllib3`
                exception.
        """
        url = self._build_full_url(url_path)
        response = None

        try:
            response = self._http.request("GET", url)
        except MaxRetryError as exc:
            msg = f"Failed to perform a GET request to {url}"
            raise RetriesExceeded(msg) from exc

        return self._deserialize_response(response)

    def _post_request(self, url_path: str, body: Any) -> Any:
        """Perform a ``POST`` request to *url_path* serializing *body* as JSON.

        :param url_path: URL to perform the ``POST`` request against.
        :param body: the body to be serialized as JSON and sent in the request.

        :returns: the deserialized response body.

        :raises:
            :exc:`RetriesExceeded`: raised from the corresponding :mod:`urllib3`
                exception.
        """
        body = self._serialize_request(body)

        url = self._build_full_url(url_path)
        response = None

        try:
            response = self._http.request("POST",
                                          url,
                                          body=body,
                                          headers={
                                              "Content-Type": "application/json"
                                          })
        except MaxRetryError as exc:
            msg = f"Failed to perform a POST request to {url} with {body}"
            raise RetriesExceeded(msg) from exc

        return self._deserialize_response(response)

    def _ensure_api_ok(self) -> None:
        """Ensure ``pg-backup-api`` is reachable and ``OK``.

        :raises:
            :exc:`ApiNotOk`: if ``pg-backup-api`` status is not ``OK``.
        """
        response = self._get_request("status")

        if response != "OK":
            msg = (
                "pg-backup-api is currently not up and running at "
                f"{self.api_url}: {response}"
            )

            raise ApiNotOk(msg)

    @retry(KeyError)
    def get_operation_status(self, barman_server: str,
                             operation_id: str) -> OperationStatus:
        """Get status of the operation which ID is *operation_id*.

        :param barman_server: name of the Barman server related with the
            operation.
        :param operation_id: ID of the operation to be checked.

        :returns: the status of the operation.
        """
        response = self._get_request(
            f"servers/{barman_server}/operations/{operation_id}",
        )

        status = response["status"]
        return OperationStatus[status]

    @retry(KeyError)
    def create_recovery_operation(self, barman_server: str, backup_id: str,
                                  ssh_command: str, data_directory: str) -> str:
        """Create a recovery operation on the ``pg-backup-api``.

        :param barman_server: name of the Barman server which backup is to be
            restored.
        :param backup_id: ID of the backup from the Barman server.
        :param ssh_command: SSH command to connect from the Barman host to the
            target host.
        :param data_directory: path to the Postgres data directory where to
            restore the backup at.

        :returns: the ID of the recovery operation that has been created.
        """
        response = self._post_request(
            f"servers/{barman_server}/operations",
            {
                "type": "recovery",
                "backup_id": backup_id,
                "remote_ssh_command": ssh_command,
                "destination_directory": data_directory,
            },
        )

        return response["operation_id"]

    @retry(KeyError)
    def create_config_switch_operation(self, barman_server: str,
                                       barman_model: Optional[str],
                                       reset: Optional[bool]) -> str:
        """Create a config switch operation on the ``pg-backup-api``.

        :param barman_server: name of the Barman server which config is to be
            switched.
        :param barman_model: name of the Barman model to be applied to the
            server, if any.
        :param reset: ``True`` if you would like to unapply the currently active
            model for the server, if any.

        :returns: the ID of the config switch operation that has been created.
        """
        body: Dict[str, Any] = {"type": "config_switch"}

        if barman_model:
            body["model_name"] = barman_model
        elif reset:
            body["reset"] = reset

        response = self._post_request(
            f"servers/{barman_server}/operations",
            body,
        )

        return response["operation_id"]

View File

@@ -1,468 +0,0 @@
#!/usr/bin/env python
"""Restore a Barman backup to the local node through ``pg-backup-api``.
This script can be used both as a custom bootstrap method, and as a custom
create replica method. Check the output of ``--help`` to understand the
parameters supported by the script. ``--datadir`` is a special parameter and it
is automatically filled by Patroni in both cases.
It requires that you have previously configured a Barman server, and that you
have ``pg-backup-api`` configured and running in the same host as Barman.
Refer to :class:`ExitCode` for possible exit codes of this script.
"""
from argparse import ArgumentParser
from enum import IntEnum
import json
import logging
import sys
import time
from typing import Any, Callable, Optional, Tuple, Type, Union
from urllib.parse import urljoin
from urllib3 import PoolManager
from urllib3.exceptions import MaxRetryError
from urllib3.response import HTTPResponse
class ExitCode(IntEnum):
    """Possible exit codes of this script.

    :cvar RECOVERY_DONE: backup was successfully restored.
    :cvar RECOVERY_FAILED: recovery of the backup faced an issue.
    :cvar API_NOT_OK: ``pg-backup-api`` status is not ``OK``.
    :cvar HTTP_REQUEST_ERROR: an error has occurred during a request to the
        ``pg-backup-api``.
    :cvar HTTP_RESPONSE_MALFORMED: ``pg-backup-api`` returned a bogus response.
    """

    RECOVERY_DONE = 0
    RECOVERY_FAILED = 1
    API_NOT_OK = 2
    HTTP_REQUEST_ERROR = 3
    HTTP_RESPONSE_MALFORMED = 4


class RetriesExceeded(Exception):
    """Maximum number of retries exceeded."""


def retry(exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]) \
        -> Any:
    """Retry an operation n times if expected *exceptions* are faced.

    .. note::
        Should be used as a decorator of a class' method as it expects the
        first argument to be a class instance.

        The class whose method is going to be decorated should contain a
        couple of attributes:

        * ``max_retries``: maximum retry attempts before failing;
        * ``retry_wait``: how long to wait before retrying.

    :param exceptions: exceptions that could trigger a retry attempt.

    :raises:
        :exc:`RetriesExceeded`: if the maximum number of attempts has been
            exhausted.
    """
    def decorator(func: Callable[..., Any]) -> Any:
        def inner_func(instance: object, *args: Any, **kwargs: Any) -> Any:
            times: int = getattr(instance, "max_retries")
            retry_wait: int = getattr(instance, "retry_wait")
            method_name = f"{instance.__class__.__name__}.{func.__name__}"

            attempt = 1

            while attempt <= times:
                try:
                    return func(instance, *args, **kwargs)
                except exceptions as exc:
                    logging.warning("Attempt %d of %d on method %s failed "
                                    "with %r.",
                                    attempt, times, method_name, exc)
                    attempt += 1

                time.sleep(retry_wait)

            raise RetriesExceeded("Maximum number of retries exceeded for "
                                  f"method {method_name}.")
        return inner_func
    return decorator
class BarmanRecover:
"""Facilities for performing a remote ``barman recover`` operation.
You should instantiate this class, which will take care of configuring the
operation accordingly. When you want to start the operation, you should
call :meth:`restore_backup`. At any point of interaction with this class,
you may face a :func:`sys.exit` call. Refer to :class:`ExitCode` for a view
on the possible exit codes.
:ivar api_url: base URL to reach the ``pg-backup-api``.
:ivar cert_file: certificate to authenticate against the
``pg-backup-api``, if required.
:ivar key_file: certificate key to authenticate against the
``pg-backup-api``, if required.
:ivar barman_server: name of the Barman server whose backup is to be
restored.
:ivar backup_id: ID of the backup from the Barman server.
:ivar ssh_command: SSH command to connect from the Barman host to the
local host.
:ivar data_directory: path to the Postgres data directory where the
backup is to be restored.
:ivar loop_wait: how long to wait before checking again the status of the
recovery process. Higher values are useful for backups that are
expected to take long to restore.
:ivar retry_wait: how long to wait before retrying a failed request to the
``pg-backup-api``.
:ivar max_retries: maximum number of retries when ``pg-backup-api`` returns
malformed responses.
:ivar http: an HTTP pool manager for performing web requests.
"""
def __init__(self, api_url: str, barman_server: str, backup_id: str,
ssh_command: str, data_directory: str, loop_wait: int,
retry_wait: int, max_retries: int,
cert_file: Optional[str] = None,
key_file: Optional[str] = None) -> None:
"""Create a new instance of :class:`BarmanRecover`.
Make sure the ``pg-backup-api`` is reachable and running fine.
:param api_url: base URL to reach the ``pg-backup-api``.
:param barman_server: name of the Barman server whose backup is to be
restored.
:param backup_id: ID of the backup from the Barman server.
:param ssh_command: SSH command to connect from the Barman host to the
local host.
:param data_directory: path to the Postgres data directory where the
backup is to be restored.
:param loop_wait: how long to wait before checking again the status of
the recovery process. Higher values are useful for backups that are
expected to take long to restore.
:param retry_wait: how long to wait before retrying a failed request to
the ``pg-backup-api``.
:param max_retries: maximum number of retries when ``pg-backup-api``
returns malformed responses.
:param cert_file: certificate to authenticate against the
``pg-backup-api``, if required.
:param key_file: certificate key to authenticate against the
``pg-backup-api``, if required.
"""
self.api_url = api_url
self.cert_file = cert_file
self.key_file = key_file
self.barman_server = barman_server
self.backup_id = backup_id
self.ssh_command = ssh_command
self.data_directory = data_directory
self.loop_wait = loop_wait
self.retry_wait = retry_wait
self.max_retries = max_retries
self.http = PoolManager(cert_file=cert_file, key_file=key_file)
self._ensure_api_ok()
def _build_full_url(self, url_path: str) -> str:
"""Build the full URL by concatenating *url_path* with the base URL.
:param url_path: path to be accessed in the ``pg-backup-api``.
:returns: the full URL after concatenating.
"""
return urljoin(self.api_url, url_path)
@staticmethod
def _deserialize_response(response: HTTPResponse) -> Any:
"""Retrieve body from *response* as a deserialized JSON object.
:param response: response from which JSON body will be deserialized.
:returns: the deserialized JSON body.
"""
return json.loads(response.data.decode("utf-8"))
@staticmethod
def _serialize_request(body: Any) -> Any:
"""Serialize a request body.
:param body: content of the request body to be serialized.
:returns: the serialized request body.
"""
return json.dumps(body).encode("utf-8")
def _get_request(self, url_path: str) -> Any:
"""Perform a ``GET`` request to *url_path*.
.. note::
If a :exc:`MaxRetryError` is faced while performing the request,
then exit with :attr:`ExitCode.HTTP_REQUEST_ERROR`.
:param url_path: URL to perform the ``GET`` request against.
:returns: the deserialized response body.
"""
response = None
try:
response = self.http.request("GET", self._build_full_url(url_path))
except MaxRetryError as exc:
logging.critical("An error occurred while performing an HTTP GET "
"request: %r", exc)
sys.exit(ExitCode.HTTP_REQUEST_ERROR)
return self._deserialize_response(response)
def _post_request(self, url_path: str, body: Any) -> Any:
"""Perform a ``POST`` request to *url_path* serializing *body* as JSON.
.. note::
If a :exc:`MaxRetryError` is faced while performing the request,
then exit with :attr:`ExitCode.HTTP_REQUEST_ERROR`.
:param url_path: URL to perform the ``POST`` request against.
:param body: the body to be serialized as JSON and sent in the request.
:returns: the deserialized response body.
"""
body = self._serialize_request(body)
response = None
try:
response = self.http.request("POST",
self._build_full_url(url_path),
body=body,
headers={
"Content-Type": "application/json"
})
except MaxRetryError as exc:
logging.critical("An error occurred while performing an HTTP POST "
"request: %r", exc)
sys.exit(ExitCode.HTTP_REQUEST_ERROR)
return self._deserialize_response(response)
def _ensure_api_ok(self) -> None:
"""Ensure ``pg-backup-api`` is reachable and ``OK``.
.. note::
If ``pg-backup-api`` status is not ``OK``, then exit with
:attr:`ExitCode.API_NOT_OK`.
"""
response = self._get_request("status")
if response != "OK":
logging.critical("pg-backup-api is not working: %s", response)
sys.exit(ExitCode.API_NOT_OK)
@retry(KeyError)
def _create_recovery_operation(self) -> str:
"""Create a recovery operation on the ``pg-backup-api``.
:returns: the ID of the recovery operation that has been created.
"""
response = self._post_request(
f"servers/{self.barman_server}/operations",
{
"type": "recovery",
"backup_id": self.backup_id,
"remote_ssh_command": self.ssh_command,
"destination_directory": self.data_directory,
},
)
return response["operation_id"]
@retry(KeyError)
def _get_recovery_operation_status(self, operation_id: str) -> str:
"""Get status of the recovery operation *operation_id*.
:param operation_id: ID of the recovery operation to be checked.
:returns: the status of the recovery operation.
"""
response = self._get_request(
f"servers/{self.barman_server}/operations/{operation_id}",
)
return response["status"]
    def restore_backup(self) -> bool:
        """Restore the configured Barman backup through ``pg-backup-api``.

        .. note::
            If recovery API request returns a malformed response, then exit
            with :attr:`ExitCode.HTTP_RESPONSE_MALFORMED`.

        :returns: ``True`` if it was successfully recovered, ``False``
            otherwise.
        """
        operation_id = None

        try:
            operation_id = self._create_recovery_operation()
        except RetriesExceeded:
            logging.critical("Maximum number of retries exceeded, exiting.")
            sys.exit(ExitCode.HTTP_RESPONSE_MALFORMED)

        logging.info("Created the recovery operation with ID %s", operation_id)

        status = None

        while True:
            try:
                status = self._get_recovery_operation_status(operation_id)
            except RetriesExceeded:
                logging.critical("Maximum number of retries exceeded, "
                                 "exiting.")
                sys.exit(ExitCode.HTTP_RESPONSE_MALFORMED)

            if status != "IN_PROGRESS":
                break

            logging.info("Recovery operation %s is still in progress",
                         operation_id)
            time.sleep(self.loop_wait)

        return status == "DONE"
def set_up_logging(log_file: Optional[str] = None) -> None:
"""Set up logging to file, if *log_file* is given, otherwise to console.
:param log_file: file where to log messages, if any.
"""
logging.basicConfig(filename=log_file, level=logging.INFO,
format="%(asctime)s %(levelname)s: %(message)s")
def main() -> None:
"""Entry point of this script.
Parse the command-line arguments and recover a Barman backup through
``pg-backup-api`` to the local host.
"""
parser = ArgumentParser(
epilog=(
"Wrapper script for ``pg-backup-api``. Communicate with the API "
"running at ``--api-url`` to restore a ``--backup-id`` Barman "
"backup of the server ``--barman-server``."
),
)
parser.add_argument(
"--api-url",
type=str,
required=True,
help="URL to reach the ``pg-backup-api``, e.g. "
"``http://localhost:7480``",
dest="api_url",
)
parser.add_argument(
"--cert-file",
type=str,
required=False,
help="Certificate to authenticate against the API, if required.",
dest="cert_file",
)
parser.add_argument(
"--key-file",
type=str,
required=False,
help="Certificate key to authenticate against the API, if required.",
dest="key_file",
)
parser.add_argument(
"--barman-server",
type=str,
required=True,
help="Name of the Barman server from which to restore the backup.",
dest="barman_server",
)
parser.add_argument(
"--backup-id",
type=str,
required=False,
default="latest",
help="ID of the Barman backup to be restored. You can use any value "
"supported by ``barman recover`` command "
"(default: ``%(default)s``)",
dest="backup_id",
)
parser.add_argument(
"--ssh-command",
type=str,
required=True,
help="Value to be passed as ``--remote-ssh-command`` to "
"``barman recover``.",
dest="ssh_command",
)
parser.add_argument(
"--data-directory",
"--datadir",
type=str,
required=True,
help="Destination path where to restore the barman backup in the "
"local host.",
dest="data_directory",
)
parser.add_argument(
"--log-file",
type=str,
required=False,
help="File where to log messages produced by this script, if any.",
dest="log_file",
)
parser.add_argument(
"--loop-wait",
type=int,
required=False,
default=10,
help="How long to wait before checking again the status of the "
"recovery process, in seconds. Use higher values if your "
"recovery is expected to take long (default: ``%(default)s``)",
dest="loop_wait",
)
parser.add_argument(
"--retry-wait",
type=int,
required=False,
default=2,
help="How long to wait before retrying a failed ``pg-backup-api`` "
"request (default: ``%(default)s``)",
dest="retry_wait",
)
parser.add_argument(
"--max-retries",
type=int,
required=False,
default=5,
help="Maximum number of retries when receiving malformed responses "
"from the ``pg-backup-api`` (default: ``%(default)s``)",
dest="max_retries",
)
args, _ = parser.parse_known_args()
set_up_logging(args.log_file)
barman_recover = BarmanRecover(args.api_url, args.barman_server,
args.backup_id, args.ssh_command,
args.data_directory, args.loop_wait,
args.retry_wait, args.max_retries,
args.cert_file, args.key_file)
successful = barman_recover.restore_backup()
if successful:
logging.info("Recovery operation finished successfully.")
sys.exit(ExitCode.RECOVERY_DONE)
else:
logging.critical("Recovery operation failed.")
sys.exit(ExitCode.RECOVERY_FAILED)
if __name__ == "__main__":
main()
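
For reviewers who want to see the retry behaviour in isolation, the following is a minimal sketch. It uses a condensed copy of the `retry` decorator from this module and a hypothetical `FlakyApi` class (not part of Patroni) whose first responses lack the expected key. `retry_wait` is set to `0` only to keep the example fast.

```python
import logging
import time
from typing import Any, Callable, Dict, List, Tuple, Type, Union


class RetriesExceeded(Exception):
    """Maximum number of retries exceeded."""


def retry(exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]) -> Any:
    """Condensed copy of the decorator from the module, for illustration."""
    def decorator(func: Callable[..., Any]) -> Any:
        def inner_func(instance: object, *args: Any, **kwargs: Any) -> Any:
            times: int = getattr(instance, "max_retries")
            retry_wait: int = getattr(instance, "retry_wait")

            for attempt in range(1, times + 1):
                try:
                    return func(instance, *args, **kwargs)
                except exceptions as exc:
                    logging.warning("Attempt %d of %d failed with %r.",
                                    attempt, times, exc)
                # Only reached after a failed attempt.
                time.sleep(retry_wait)

            raise RetriesExceeded("Maximum number of retries exceeded for "
                                  f"method {func.__name__}.")
        return inner_func
    return decorator


class FlakyApi:
    """Hypothetical client that pops canned responses, one per call."""

    def __init__(self, responses: List[Dict[str, str]],
                 max_retries: int = 5, retry_wait: int = 0) -> None:
        self.responses = list(responses)
        self.max_retries = max_retries
        self.retry_wait = retry_wait
        self.calls = 0

    @retry(KeyError)
    def get_operation_id(self) -> str:
        self.calls += 1
        # A malformed response raises KeyError, which triggers a retry.
        return self.responses.pop(0)["operation_id"]


# Two malformed responses followed by a well formed one.
api = FlakyApi([{}, {}, {"operation_id": "some_id"}])
operation_id = api.get_operation_id()
```

Here the third attempt succeeds, so `operation_id` is `"some_id"` after three calls. If every attempt raised `KeyError`, the decorated method would raise `RetriesExceeded` instead.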


@@ -55,7 +55,7 @@ CONSOLE_SCRIPTS = ['patroni = patroni.__main__:main',
                    'patroni_raft_controller = patroni.raft_controller:main',
                    "patroni_wale_restore = patroni.scripts.wale_restore:main",
                    "patroni_aws = patroni.scripts.aws:main",
-                   "patroni_barman_recover = patroni.scripts.barman_recover:main"]
+                   "patroni_barman = patroni.scripts.barman.cli:main"]
class _Command(Command):
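
The new `patroni_barman` entry point dispatches to the `recover` and `config-switch` sub-commands. Since `patroni/scripts/barman/cli.py` is not shown in this part of the diff, the following is only a rough sketch of how such a dispatch could be wired with `argparse`. The handler names, the `--barman-model` flag, and the placement of `--api-url` on the top-level parser are assumptions, not the actual implementation.

```python
from argparse import ArgumentParser, Namespace
from typing import List, Optional


def run_recover(args: Namespace) -> int:
    """Placeholder handler for the ``recover`` sub-command (hypothetical)."""
    print(f"would restore backup {args.backup_id} of {args.barman_server}")
    return 0


def run_config_switch(args: Namespace) -> int:
    """Placeholder handler for ``config-switch`` (hypothetical)."""
    print(f"would apply model {args.barman_model} to {args.barman_server}")
    return 0


def build_parser() -> ArgumentParser:
    """Build a parser with one sub-parser per sub-command."""
    parser = ArgumentParser(prog="patroni_barman")
    # Assumption: options shared by all sub-commands live on the top level.
    parser.add_argument("--api-url", required=True)

    subparsers = parser.add_subparsers(dest="command", required=True)

    recover = subparsers.add_parser("recover")
    recover.add_argument("--barman-server", required=True)
    recover.add_argument("--backup-id", default="latest")
    recover.set_defaults(func=run_recover)

    switch = subparsers.add_parser("config-switch")
    switch.add_argument("--barman-server", required=True)
    switch.add_argument("--barman-model")  # assumed flag name
    switch.set_defaults(func=run_config_switch)

    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv* and call the handler bound to the chosen sub-command."""
    args = build_parser().parse_args(argv)
    return args.func(args)


exit_code = main(["--api-url", "http://localhost:7480",
                  "recover", "--barman-server", "my_server"])
```

Binding each handler through `set_defaults(func=...)` keeps `main` free of per-command branching, so adding a further sub-command would only touch `build_parser`.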

tests/test_barman.py Normal file

@@ -0,0 +1,765 @@
import logging
import mock
from mock import MagicMock, Mock, patch
import unittest
from urllib3.exceptions import MaxRetryError
from patroni.scripts.barman.cli import main
from patroni.scripts.barman.config_switch import (ExitCode as BarmanConfigSwitchExitCode, _should_skip_switch,
_switch_config, run_barman_config_switch)
from patroni.scripts.barman.recover import ExitCode as BarmanRecoverExitCode, _restore_backup, run_barman_recover
from patroni.scripts.barman.utils import ApiNotOk, OperationStatus, PgBackupApi, RetriesExceeded, set_up_logging
API_URL = "http://localhost:7480"
BARMAN_SERVER = "my_server"
BARMAN_MODEL = "my_model"
BACKUP_ID = "backup_id"
SSH_COMMAND = "ssh postgres@localhost"
DATA_DIRECTORY = "/path/to/pgdata"
LOOP_WAIT = 10
RETRY_WAIT = 2
MAX_RETRIES = 5
# stuff from patroni.scripts.barman.utils
@patch("logging.basicConfig")
def test_set_up_logging(mock_log_config):
log_file = "/path/to/some/file.log"
set_up_logging(log_file)
mock_log_config.assert_called_once_with(filename=log_file, level=logging.INFO,
format="%(asctime)s %(levelname)s: %(message)s")
class TestPgBackupApi(unittest.TestCase):
@patch.object(PgBackupApi, "_ensure_api_ok", Mock())
@patch("patroni.scripts.barman.utils.PoolManager", MagicMock())
def setUp(self):
self.api = PgBackupApi(API_URL, None, None, RETRY_WAIT, MAX_RETRIES)
# Reset the mock as the same instance is used across tests
self.api._http.request.reset_mock()
self.api._http.request.side_effect = None
def test__build_full_url(self):
self.assertEqual(self.api._build_full_url("/some/path"), f"{API_URL}/some/path")
@patch("json.loads")
def test__deserialize_response(self, mock_json_loads):
mock_response = MagicMock()
self.assertIsNotNone(self.api._deserialize_response(mock_response))
mock_json_loads.assert_called_once_with(mock_response.data.decode("utf-8"))
@patch("json.dumps")
def test__serialize_request(self, mock_json_dumps):
body = "some_body"
ret = self.api._serialize_request(body)
self.assertIsNotNone(ret)
mock_json_dumps.assert_called_once_with(body)
mock_json_dumps.return_value.encode.assert_called_once_with("utf-8")
@patch.object(PgBackupApi, "_deserialize_response", Mock(return_value="test"))
def test__get_request(self):
mock_request = self.api._http.request
# with no error
self.assertEqual(self.api._get_request("/some/path"), "test")
mock_request.assert_called_once_with("GET", f"{API_URL}/some/path")
# with MaxRetryError
http_error = MaxRetryError(self.api._http, f"{API_URL}/some/path")
mock_request.side_effect = http_error
with self.assertRaises(RetriesExceeded) as exc:
self.assertIsNone(self.api._get_request("/some/path"))
self.assertEqual(
str(exc.exception),
"Failed to perform a GET request to http://localhost:7480/some/path"
)
@patch.object(PgBackupApi, "_deserialize_response", Mock(return_value="test"))
@patch.object(PgBackupApi, "_serialize_request")
def test__post_request(self, mock_serialize):
mock_request = self.api._http.request
# with no error
self.assertEqual(self.api._post_request("/some/path", "some body"), "test")
mock_serialize.assert_called_once_with("some body")
mock_request.assert_called_once_with("POST", f"{API_URL}/some/path", body=mock_serialize.return_value,
headers={"Content-Type": "application/json"})
# with MaxRetryError
http_error = MaxRetryError(self.api._http, f"{API_URL}/some/path")
mock_request.side_effect = http_error
with self.assertRaises(RetriesExceeded) as exc:
self.assertIsNone(self.api._post_request("/some/path", "some body"))
self.assertEqual(
str(exc.exception),
f"Failed to perform a POST request to http://localhost:7480/some/path with {mock_serialize.return_value}"
)
@patch.object(PgBackupApi, "_get_request")
def test__ensure_api_ok(self, mock_get_request):
# API ok
mock_get_request.return_value = "OK"
self.assertIsNone(self.api._ensure_api_ok())
# API not ok
mock_get_request.return_value = "random"
with self.assertRaises(ApiNotOk) as exc:
self.assertIsNone(self.api._ensure_api_ok())
self.assertEqual(
str(exc.exception),
"pg-backup-api is currently not up and running at http://localhost:7480: random",
)
@patch("patroni.scripts.barman.utils.OperationStatus")
@patch("logging.warning")
@patch("time.sleep")
@patch.object(PgBackupApi, "_get_request")
def test_get_operation_status(self, mock_get_request, mock_sleep, mock_logging, mock_op_status):
# well formed response
mock_get_request.return_value = {"status": "some status"}
mock_op_status.__getitem__.return_value = "SOME_STATUS"
self.assertEqual(self.api.get_operation_status(BARMAN_SERVER, "some_id"), "SOME_STATUS")
mock_get_request.assert_called_once_with(f"servers/{BARMAN_SERVER}/operations/some_id")
mock_sleep.assert_not_called()
mock_logging.assert_not_called()
mock_op_status.__getitem__.assert_called_once_with("some status")
# malformed response
mock_get_request.return_value = {"statuss": "some status"}
with self.assertRaises(RetriesExceeded) as exc:
self.api.get_operation_status(BARMAN_SERVER, "some_id")
self.assertEqual(str(exc.exception),
"Maximum number of retries exceeded for method PgBackupApi.get_operation_status.")
self.assertEqual(mock_sleep.call_count, self.api.max_retries)
mock_sleep.assert_has_calls([mock.call(self.api.retry_wait)] * self.api.max_retries)
self.assertEqual(mock_logging.call_count, self.api.max_retries)
for i in range(mock_logging.call_count):
call_args = mock_logging.call_args_list[i][0]
self.assertEqual(len(call_args), 5)
self.assertEqual(call_args[0], "Attempt %d of %d on method %s failed with %r.")
self.assertEqual(call_args[1], i + 1)
self.assertEqual(call_args[2], self.api.max_retries)
self.assertEqual(call_args[3], "PgBackupApi.get_operation_status")
self.assertIsInstance(call_args[4], KeyError)
self.assertEqual(call_args[4].args, ('status',))
@patch("logging.warning")
@patch("time.sleep")
@patch.object(PgBackupApi, "_post_request")
def test_create_recovery_operation(self, mock_post_request, mock_sleep, mock_logging):
# well formed response
mock_post_request.return_value = {"operation_id": "some_id"}
self.assertEqual(
self.api.create_recovery_operation(BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY),
"some_id",
)
mock_sleep.assert_not_called()
mock_logging.assert_not_called()
mock_post_request.assert_called_once_with(
f"servers/{BARMAN_SERVER}/operations",
{
"type": "recovery",
"backup_id": BACKUP_ID,
"remote_ssh_command": SSH_COMMAND,
"destination_directory": DATA_DIRECTORY,
}
)
# malformed response
mock_post_request.return_value = {"operation_idd": "some_id"}
with self.assertRaises(RetriesExceeded) as exc:
self.api.create_recovery_operation(BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY)
self.assertEqual(str(exc.exception),
"Maximum number of retries exceeded for method PgBackupApi.create_recovery_operation.")
self.assertEqual(mock_sleep.call_count, self.api.max_retries)
mock_sleep.assert_has_calls([mock.call(self.api.retry_wait)] * self.api.max_retries)
self.assertEqual(mock_logging.call_count, self.api.max_retries)
for i in range(mock_logging.call_count):
call_args = mock_logging.call_args_list[i][0]
self.assertEqual(len(call_args), 5)
self.assertEqual(call_args[0], "Attempt %d of %d on method %s failed with %r.")
self.assertEqual(call_args[1], i + 1)
self.assertEqual(call_args[2], self.api.max_retries)
self.assertEqual(call_args[3], "PgBackupApi.create_recovery_operation")
self.assertIsInstance(call_args[4], KeyError)
self.assertEqual(call_args[4].args, ('operation_id',))
@patch("logging.warning")
@patch("time.sleep")
@patch.object(PgBackupApi, "_post_request")
def test_create_config_switch_operation(self, mock_post_request, mock_sleep, mock_logging):
# well formed response -- sample 1
mock_post_request.return_value = {"operation_id": "some_id"}
self.assertEqual(
self.api.create_config_switch_operation(BARMAN_SERVER, BARMAN_MODEL, None),
"some_id",
)
mock_sleep.assert_not_called()
mock_logging.assert_not_called()
mock_post_request.assert_called_once_with(
f"servers/{BARMAN_SERVER}/operations",
{
"type": "config_switch",
"model_name": BARMAN_MODEL,
}
)
# well formed response -- sample 2
mock_post_request.reset_mock()
self.assertEqual(
self.api.create_config_switch_operation(BARMAN_SERVER, None, True),
"some_id",
)
mock_sleep.assert_not_called()
mock_logging.assert_not_called()
mock_post_request.assert_called_once_with(
f"servers/{BARMAN_SERVER}/operations",
{
"type": "config_switch",
"reset": True,
}
)
# malformed response
mock_post_request.return_value = {"operation_idd": "some_id"}
with self.assertRaises(RetriesExceeded) as exc:
self.api.create_config_switch_operation(BARMAN_SERVER, BARMAN_MODEL, None)
self.assertEqual(str(exc.exception),
"Maximum number of retries exceeded for method PgBackupApi.create_config_switch_operation.")
self.assertEqual(mock_sleep.call_count, self.api.max_retries)
mock_sleep.assert_has_calls([mock.call(self.api.retry_wait)] * self.api.max_retries)
self.assertEqual(mock_logging.call_count, self.api.max_retries)
for i in range(mock_logging.call_count):
call_args = mock_logging.call_args_list[i][0]
self.assertEqual(len(call_args), 5)
self.assertEqual(call_args[0], "Attempt %d of %d on method %s failed with %r.")
self.assertEqual(call_args[1], i + 1)
self.assertEqual(call_args[2], self.api.max_retries)
self.assertEqual(call_args[3], "PgBackupApi.create_config_switch_operation")
self.assertIsInstance(call_args[4], KeyError)
self.assertEqual(call_args[4].args, ('operation_id',))
# stuff from patroni.scripts.barman.recover
class TestBarmanRecover(unittest.TestCase):
def setUp(self):
self.api = MagicMock()
# Reset the mock as the same instance is used across tests
self.api._http.request.reset_mock()
self.api._http.request.side_effect = None
@patch("time.sleep")
@patch("logging.info")
@patch("logging.error")
def test__restore_backup(self, mock_log_error, mock_log_info, mock_sleep):
mock_create_op = self.api.create_recovery_operation
mock_get_status = self.api.get_operation_status
# successful fast restore
mock_create_op.return_value = "some_id"
mock_get_status.return_value = OperationStatus.DONE
self.assertEqual(
_restore_backup(self.api, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT),
BarmanRecoverExitCode.RECOVERY_DONE,
)
mock_create_op.assert_called_once_with(BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY)
mock_get_status.assert_called_once_with(BARMAN_SERVER, "some_id")
mock_log_info.assert_has_calls([
mock.call("Created the recovery operation with ID %s", "some_id"),
mock.call("Recovery operation finished successfully."),
])
mock_log_error.assert_not_called()
mock_sleep.assert_not_called()
# successful slow restore
mock_create_op.reset_mock()
mock_get_status.reset_mock()
mock_log_info.reset_mock()
mock_get_status.side_effect = [OperationStatus.IN_PROGRESS] * 20 + [OperationStatus.DONE]
self.assertEqual(
_restore_backup(self.api, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT),
BarmanRecoverExitCode.RECOVERY_DONE,
)
mock_create_op.assert_called_once()
self.assertEqual(mock_get_status.call_count, 21)
mock_get_status.assert_has_calls([mock.call(BARMAN_SERVER, "some_id")] * 21)
self.assertEqual(mock_log_info.call_count, 22)
mock_log_info.assert_has_calls([mock.call("Created the recovery operation with ID %s", "some_id")]
+ [mock.call("Recovery operation %s is still in progress", "some_id")] * 20
+ [mock.call("Recovery operation finished successfully.")])
mock_log_error.assert_not_called()
self.assertEqual(mock_sleep.call_count, 20)
mock_sleep.assert_has_calls([mock.call(LOOP_WAIT)] * 20)
# failed fast restore
mock_create_op.reset_mock()
mock_get_status.reset_mock()
mock_log_info.reset_mock()
mock_sleep.reset_mock()
mock_get_status.side_effect = None
mock_get_status.return_value = OperationStatus.FAILED
self.assertEqual(
_restore_backup(self.api, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT),
BarmanRecoverExitCode.RECOVERY_FAILED,
)
mock_create_op.assert_called_once()
mock_get_status.assert_called_once_with(BARMAN_SERVER, "some_id")
mock_log_info.assert_has_calls([
mock.call("Created the recovery operation with ID %s", "some_id"),
])
mock_log_error.assert_has_calls([
mock.call("Recovery operation failed."),
])
mock_sleep.assert_not_called()
# failed slow restore
mock_create_op.reset_mock()
mock_get_status.reset_mock()
mock_log_info.reset_mock()
mock_log_error.reset_mock()
mock_sleep.reset_mock()
mock_get_status.side_effect = [OperationStatus.IN_PROGRESS] * 20 + [OperationStatus.FAILED]
self.assertEqual(
_restore_backup(self.api, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT),
BarmanRecoverExitCode.RECOVERY_FAILED,
)
mock_create_op.assert_called_once()
self.assertEqual(mock_get_status.call_count, 21)
mock_get_status.assert_has_calls([mock.call(BARMAN_SERVER, "some_id")] * 21)
self.assertEqual(mock_log_info.call_count, 21)
mock_log_info.assert_has_calls([mock.call("Created the recovery operation with ID %s", "some_id")]
+ [mock.call("Recovery operation %s is still in progress", "some_id")] * 20)
mock_log_error.assert_has_calls([
mock.call("Recovery operation failed."),
])
self.assertEqual(mock_sleep.call_count, 20)
mock_sleep.assert_has_calls([mock.call(LOOP_WAIT)] * 20)
# create retries exceeded
mock_log_info.reset_mock()
mock_log_error.reset_mock()
mock_sleep.reset_mock()
mock_create_op.side_effect = RetriesExceeded()
mock_get_status.side_effect = None
self.assertEqual(
_restore_backup(self.api, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT),
BarmanRecoverExitCode.HTTP_ERROR,
)
mock_log_info.assert_not_called()
mock_log_error.assert_called_once_with("An issue was faced while trying to create a recovery operation: %r",
mock_create_op.side_effect)
mock_sleep.assert_not_called()
# get status retries exceeded
mock_create_op.reset_mock()
mock_create_op.side_effect = None
mock_log_error.reset_mock()
mock_log_info.reset_mock()
mock_get_status.side_effect = RetriesExceeded
self.assertEqual(
_restore_backup(self.api, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT),
BarmanRecoverExitCode.HTTP_ERROR,
)
mock_log_info.assert_called_once_with("Created the recovery operation with ID %s", "some_id")
mock_log_error.assert_called_once_with("Maximum number of retries exceeded, exiting.")
mock_sleep.assert_not_called()
class TestBarmanRecoverCli(unittest.TestCase):
@patch("patroni.scripts.barman.recover._restore_backup")
def test_run_barman_recover(self, mock_rb):
api = MagicMock()
args = MagicMock()
# successful execution
mock_rb.return_value = BarmanRecoverExitCode.RECOVERY_DONE
self.assertEqual(
run_barman_recover(api, args),
BarmanRecoverExitCode.RECOVERY_DONE,
)
mock_rb.assert_called_once_with(api, args.barman_server, args.backup_id,
args.ssh_command, args.data_directory,
args.loop_wait)
# failed execution
mock_rb.reset_mock()
mock_rb.return_value = BarmanRecoverExitCode.RECOVERY_FAILED
self.assertEqual(
run_barman_recover(api, args),
BarmanRecoverExitCode.RECOVERY_FAILED,
)
mock_rb.assert_called_once_with(api, args.barman_server, args.backup_id,
args.ssh_command, args.data_directory,
args.loop_wait)
# stuff from patroni.scripts.barman.config_switch
class TestBarmanConfigSwitch(unittest.TestCase):
def setUp(self):
self.api = MagicMock()
# Reset the mock as the same instance is used across tests
self.api._http.request.reset_mock()
self.api._http.request.side_effect = None
@patch("time.sleep")
@patch("logging.info")
@patch("logging.error")
def test__switch_config(self, mock_log_error, mock_log_info, mock_sleep):
mock_create_op = self.api.create_config_switch_operation
mock_get_status = self.api.get_operation_status
# successful fast config-switch
mock_create_op.return_value = "some_id"
mock_get_status.return_value = OperationStatus.DONE
self.assertEqual(
_switch_config(self.api, BARMAN_SERVER, BARMAN_MODEL, None),
BarmanConfigSwitchExitCode.CONFIG_SWITCH_DONE,
)
mock_create_op.assert_called_once_with(BARMAN_SERVER, BARMAN_MODEL, None)
mock_get_status.assert_called_once_with(BARMAN_SERVER, "some_id")
mock_log_info.assert_has_calls([
mock.call("Created the config switch operation with ID %s", "some_id"),
mock.call("Config switch operation finished successfully."),
])
mock_log_error.assert_not_called()
mock_sleep.assert_not_called()
# successful slow config-switch
mock_create_op.reset_mock()
mock_get_status.reset_mock()
mock_log_info.reset_mock()
mock_get_status.side_effect = [OperationStatus.IN_PROGRESS] * 20 + [OperationStatus.DONE]
self.assertEqual(
_switch_config(self.api, BARMAN_SERVER, BARMAN_MODEL, None),
BarmanConfigSwitchExitCode.CONFIG_SWITCH_DONE,
)
mock_create_op.assert_called_once_with(BARMAN_SERVER, BARMAN_MODEL, None)
self.assertEqual(mock_get_status.call_count, 21)
mock_get_status.assert_has_calls([mock.call(BARMAN_SERVER, "some_id")] * 21)
self.assertEqual(mock_log_info.call_count, 22)
mock_log_info.assert_has_calls([mock.call("Created the config switch operation with ID %s", "some_id")]
+ [mock.call("Config switch operation %s is still in progress", "some_id")] * 20
+ [mock.call("Config switch operation finished successfully.")])
mock_log_error.assert_not_called()
self.assertEqual(mock_sleep.call_count, 20)
mock_sleep.assert_has_calls([mock.call(5)] * 20)
# failed fast config-switch
mock_create_op.reset_mock()
mock_get_status.reset_mock()
mock_log_info.reset_mock()
mock_sleep.reset_mock()
mock_get_status.side_effect = None
mock_get_status.return_value = OperationStatus.FAILED
self.assertEqual(
_switch_config(self.api, BARMAN_SERVER, BARMAN_MODEL, None),
BarmanConfigSwitchExitCode.CONFIG_SWITCH_FAILED,
)
mock_create_op.assert_called_once()
mock_get_status.assert_called_once_with(BARMAN_SERVER, "some_id")
mock_log_info.assert_called_once_with("Created the config switch operation with ID %s", "some_id")
mock_log_error.assert_called_once_with("Config switch operation failed.")
mock_sleep.assert_not_called()
# failed slow config-switch
mock_create_op.reset_mock()
mock_get_status.reset_mock()
mock_log_info.reset_mock()
mock_log_error.reset_mock()
mock_sleep.reset_mock()
mock_get_status.side_effect = [OperationStatus.IN_PROGRESS] * 20 + [OperationStatus.FAILED]
self.assertEqual(
_switch_config(self.api, BARMAN_SERVER, BARMAN_MODEL, None),
BarmanConfigSwitchExitCode.CONFIG_SWITCH_FAILED,
)
mock_create_op.assert_called_once()
self.assertEqual(mock_get_status.call_count, 21)
mock_get_status.assert_has_calls([mock.call(BARMAN_SERVER, "some_id")] * 21)
self.assertEqual(mock_log_info.call_count, 21)
mock_log_info.assert_has_calls([mock.call("Created the config switch operation with ID %s", "some_id")]
+ [mock.call("Config switch operation %s is still in progress", "some_id")] * 20)
mock_log_error.assert_called_once_with("Config switch operation failed.")
self.assertEqual(mock_sleep.call_count, 20)
mock_sleep.assert_has_calls([mock.call(5)] * 20)
# create retries exceeded
mock_log_info.reset_mock()
mock_log_error.reset_mock()
mock_sleep.reset_mock()
mock_create_op.side_effect = RetriesExceeded()
mock_get_status.side_effect = None
self.assertEqual(
_switch_config(self.api, BARMAN_SERVER, BARMAN_MODEL, None),
BarmanConfigSwitchExitCode.HTTP_ERROR,
)
mock_log_info.assert_not_called()
mock_log_error.assert_called_once_with("An issue was faced while trying to create a config switch operation: "
"%r",
mock_create_op.side_effect)
mock_sleep.assert_not_called()
# get status retries exceeded
mock_create_op.reset_mock()
mock_create_op.side_effect = None
mock_log_error.reset_mock()
mock_get_status.side_effect = RetriesExceeded
self.assertEqual(
_switch_config(self.api, BARMAN_SERVER, BARMAN_MODEL, None),
BarmanConfigSwitchExitCode.HTTP_ERROR,
)
mock_log_info.assert_called_once_with("Created the config switch operation with ID %s", "some_id")
mock_log_error.assert_called_once_with("Maximum number of retries exceeded, exiting.")
mock_sleep.assert_not_called()


class TestBarmanConfigSwitchCli(unittest.TestCase):

    def test__should_skip_switch(self):
        args = MagicMock()

        for role, switch_when, expected in [
            ("master", "promoted", False),
            ("master", "demoted", True),
            ("master", "always", False),
            ("primary", "promoted", False),
            ("primary", "demoted", True),
            ("primary", "always", False),
            ("promoted", "promoted", False),
            ("promoted", "demoted", True),
            ("promoted", "always", False),
            ("standby_leader", "promoted", True),
            ("standby_leader", "demoted", True),
            ("standby_leader", "always", False),
            ("replica", "promoted", True),
            ("replica", "demoted", False),
            ("replica", "always", False),
            ("demoted", "promoted", True),
            ("demoted", "demoted", False),
            ("demoted", "always", False),
        ]:
            args.role = role
            args.switch_when = switch_when
            self.assertEqual(_should_skip_switch(args), expected)

    @patch("patroni.scripts.barman.config_switch._should_skip_switch")
    @patch("patroni.scripts.barman.config_switch._switch_config")
    @patch("logging.error")
    @patch("logging.info")
    def test_run_barman_config_switch(self, mock_log_info, mock_log_error, mock_sc, mock_skip):
        api = MagicMock()
        args = MagicMock()
        args.reset = None

        # successful execution
        mock_skip.return_value = False
        mock_sc.return_value = BarmanConfigSwitchExitCode.CONFIG_SWITCH_DONE
        self.assertEqual(
            run_barman_config_switch(api, args),
            BarmanConfigSwitchExitCode.CONFIG_SWITCH_DONE,
        )
        mock_sc.assert_called_once_with(api, args.barman_server, args.barman_model,
                                        args.reset)

        # failed execution
        mock_sc.reset_mock()
        mock_sc.return_value = BarmanConfigSwitchExitCode.CONFIG_SWITCH_FAILED
        self.assertEqual(
            run_barman_config_switch(api, args),
            BarmanConfigSwitchExitCode.CONFIG_SWITCH_FAILED,
        )
        mock_sc.assert_called_once_with(api, args.barman_server, args.barman_model,
                                        args.reset)

        # skipped execution
        mock_sc.reset_mock()
        mock_skip.return_value = True
        self.assertEqual(
            run_barman_config_switch(api, args),
            BarmanConfigSwitchExitCode.CONFIG_SWITCH_SKIPPED
        )
        mock_sc.assert_not_called()
        mock_log_info.assert_called_once_with("Config switch operation was skipped (role=%s, "
                                              "switch_when=%s).", args.role, args.switch_when)
        mock_log_error.assert_not_called()

        # invalid args -- sample 1
        mock_skip.return_value = False
        args = MagicMock()
        args.barman_server = BARMAN_SERVER
        args.barman_model = BARMAN_MODEL
        args.reset = True
        self.assertEqual(
            run_barman_config_switch(api, args),
            BarmanConfigSwitchExitCode.INVALID_ARGS,
        )
        mock_log_error.assert_called_once_with("One, and only one among 'barman_model' ('%s') and 'reset' "
                                               "('%s') should be given", BARMAN_MODEL, True)
        api.assert_not_called()

        # invalid args -- sample 2
        args = MagicMock()
        args.barman_server = BARMAN_SERVER
        args.barman_model = None
        args.reset = None
        mock_log_error.reset_mock()
        api.reset_mock()
        self.assertEqual(
            run_barman_config_switch(api, args),
            BarmanConfigSwitchExitCode.INVALID_ARGS,
        )
        mock_log_error.assert_called_once_with("One, and only one among 'barman_model' ('%s') and 'reset' "
                                               "('%s') should be given", None, None)
        api.assert_not_called()


# stuff from patroni.scripts.barman.cli

class TestMain(unittest.TestCase):

    @patch("patroni.scripts.barman.cli.PgBackupApi")
    @patch("patroni.scripts.barman.cli.set_up_logging")
    @patch("patroni.scripts.barman.cli.ArgumentParser")
    def test_main(self, mock_arg_parse, mock_set_up_log, mock_api):
        # sub-command specified
        args = MagicMock()
        args.func.return_value = 0
        mock_arg_parse.return_value.parse_known_args.return_value = (args, None)
        with self.assertRaises(SystemExit) as exc:
            main()
        mock_arg_parse.assert_called_once()
        mock_set_up_log.assert_called_once_with(args.log_file)
        mock_api.assert_called_once_with(args.api_url, args.cert_file,
                                         args.key_file, args.retry_wait,
                                         args.max_retries)
        mock_arg_parse.return_value.print_help.assert_not_called()
        args.func.assert_called_once_with(mock_api.return_value, args)
        self.assertEqual(exc.exception.code, 0)

        # Issue in the API
        mock_arg_parse.reset_mock()
        mock_set_up_log.reset_mock()
        mock_api.reset_mock()
        mock_api.side_effect = ApiNotOk()
        with self.assertRaises(SystemExit) as exc:
            main()
        mock_arg_parse.assert_called_once()
        mock_set_up_log.assert_called_once_with(args.log_file)
        mock_api.assert_called_once_with(args.api_url, args.cert_file,
                                         args.key_file, args.retry_wait,
                                         args.max_retries)
        mock_arg_parse.return_value.print_help.assert_not_called()
        self.assertEqual(exc.exception.code, -2)

        # sub-command not specified
        mock_arg_parse.reset_mock()
        mock_set_up_log.reset_mock()
        mock_api.reset_mock()
        delattr(args, "func")
        mock_api.side_effect = None
        with self.assertRaises(SystemExit) as exc:
            main()
        mock_arg_parse.assert_called_once()
        mock_set_up_log.assert_called_once_with(args.log_file)
        mock_api.assert_not_called()
        mock_arg_parse.return_value.print_help.assert_called_once_with()
        self.assertEqual(exc.exception.code, -1)
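
The expectation table in `test__should_skip_switch` above boils down to a small decision rule. The following is only a sketch inferred from that table, not the code in `patroni.scripts.barman.config_switch`: the function name `should_skip_switch_sketch` and the two role sets are hypothetical, and the real `_should_skip_switch` takes a parsed `args` object rather than two strings.

```python
# Hypothetical role groupings, inferred from the test table only.
PROMOTED_ROLES = {"master", "primary", "promoted"}
DEMOTED_ROLES = {"replica", "demoted"}


def should_skip_switch_sketch(role: str, switch_when: str) -> bool:
    """Return True when the config switch should be skipped.

    Mirrors the table in ``test__should_skip_switch``; it is an illustration,
    not Patroni's implementation.
    """
    if switch_when == "always":
        # "always" never skips, whatever the role is.
        return False
    if switch_when == "promoted":
        # Only act when the node has become a primary.
        return role not in PROMOTED_ROLES
    if switch_when == "demoted":
        # Only act when the node has become a replica.
        return role not in DEMOTED_ROLES
    raise ValueError(f"unexpected switch_when value: {switch_when!r}")
```

Note that `standby_leader` belongs to neither set, so under this reading it only triggers a switch when `switch_when` is `always`, which is what the table expects.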


@@ -1,366 +0,0 @@
import logging
import mock
from mock import MagicMock, Mock, patch
import unittest
from urllib3.exceptions import MaxRetryError
from patroni.scripts.barman_recover import BarmanRecover, ExitCode, RetriesExceeded, main, set_up_logging
API_URL = "http://localhost:7480"
BARMAN_SERVER = "my_server"
BACKUP_ID = "backup_id"
SSH_COMMAND = "ssh postgres@localhost"
DATA_DIRECTORY = "/path/to/pgdata"
LOOP_WAIT = 10
RETRY_WAIT = 2
MAX_RETRIES = 5


class TestBarmanRecover(unittest.TestCase):

    @patch.object(BarmanRecover, "_ensure_api_ok", Mock())
    @patch("patroni.scripts.barman_recover.PoolManager", MagicMock())
    def setUp(self):
        self.br = BarmanRecover(API_URL, BARMAN_SERVER, BACKUP_ID, SSH_COMMAND, DATA_DIRECTORY, LOOP_WAIT, RETRY_WAIT,
                                MAX_RETRIES)
        # Reset the mock as the same instance is used across tests
        self.br.http.request.reset_mock()
        self.br.http.request.side_effect = None

    def test__build_full_url(self):
        self.assertEqual(self.br._build_full_url("/some/path"), f"{API_URL}/some/path")

    @patch("json.loads")
    def test__deserialize_response(self, mock_json_loads):
        mock_response = MagicMock()
        self.assertIsNotNone(self.br._deserialize_response(mock_response))
        mock_json_loads.assert_called_once_with(mock_response.data.decode("utf-8"))

    @patch("json.dumps")
    def test__serialize_request(self, mock_json_dumps):
        body = "some_body"
        ret = self.br._serialize_request(body)
        self.assertIsNotNone(ret)
        mock_json_dumps.assert_called_once_with(body)
        mock_json_dumps.return_value.encode.assert_called_once_with("utf-8")

    @patch.object(BarmanRecover, "_deserialize_response", Mock(return_value="test"))
    @patch("logging.critical")
    def test__get_request(self, mock_logging):
        mock_request = self.br.http.request

        # with no error
        self.assertEqual(self.br._get_request("/some/path"), "test")
        mock_request.assert_called_once_with("GET", f"{API_URL}/some/path")

        # with MaxRetryError
        http_error = MaxRetryError(self.br.http, f"{API_URL}/some/path")
        mock_request.side_effect = http_error
        with self.assertRaises(SystemExit) as exc:
            self.assertIsNone(self.br._get_request("/some/path"))
        mock_logging.assert_called_once_with("An error occurred while performing an HTTP GET request: %r", http_error)
        self.assertEqual(exc.exception.code, ExitCode.HTTP_REQUEST_ERROR)

        # with Exception
        mock_logging.reset_mock()
        mock_request.side_effect = Exception("Some error.")
        with patch("sys.exit") as mock_sys:
            with self.assertRaises(Exception):
                self.assertIsNone(self.br._get_request("/some/path"))
            mock_logging.assert_not_called()
            mock_sys.assert_not_called()

    @patch.object(BarmanRecover, "_deserialize_response", Mock(return_value="test"))
    @patch("logging.critical")
    @patch.object(BarmanRecover, "_serialize_request")
    def test__post_request(self, mock_serialize, mock_logging):
        mock_request = self.br.http.request

        # with no error
        self.assertEqual(self.br._post_request("/some/path", "some body"), "test")
        mock_serialize.assert_called_once_with("some body")
        mock_request.assert_called_once_with("POST", f"{API_URL}/some/path", body=mock_serialize.return_value,
                                             headers={"Content-Type": "application/json"})

        # with HTTPError
        http_error = MaxRetryError(self.br.http, f"{API_URL}/some/path")
        mock_request.side_effect = http_error
        with self.assertRaises(SystemExit) as exc:
            self.assertIsNone(self.br._post_request("/some/path", "some body"))
        mock_logging.assert_called_once_with("An error occurred while performing an HTTP POST request: %r", http_error)
        self.assertEqual(exc.exception.code, ExitCode.HTTP_REQUEST_ERROR)

        # with Exception
        mock_logging.reset_mock()
        mock_request.side_effect = Exception("Some error.")
        with patch("sys.exit") as mock_sys:
            with self.assertRaises(Exception):
                self.br._post_request("/some/path", "some body")
            mock_logging.assert_not_called()
            mock_sys.assert_not_called()

    @patch("logging.critical")
    @patch.object(BarmanRecover, "_get_request")
    def test__ensure_api_ok(self, mock_get_request, mock_logging):
        # API ok
        mock_get_request.return_value = "OK"
        with patch("sys.exit") as mock_sys:
            self.assertIsNone(self.br._ensure_api_ok())
            mock_logging.assert_not_called()
            mock_sys.assert_not_called()

        # API not ok
        mock_get_request.return_value = "random"
        with self.assertRaises(SystemExit) as exc:
            self.assertIsNone(self.br._ensure_api_ok())
        mock_logging.assert_called_once_with("pg-backup-api is not working: %s", "random")
        self.assertEqual(exc.exception.code, ExitCode.API_NOT_OK)

    @patch("logging.warning")
    @patch("time.sleep")
    @patch.object(BarmanRecover, "_post_request")
    def test__create_recovery_operation(self, mock_post_request, mock_sleep, mock_logging):
        # well formed response
        mock_post_request.return_value = {"operation_id": "some_id"}
        self.assertEqual(self.br._create_recovery_operation(), "some_id")
        mock_sleep.assert_not_called()
        mock_logging.assert_not_called()
        mock_post_request.assert_called_once_with(
            f"servers/{BARMAN_SERVER}/operations",
            {
                "type": "recovery",
                "backup_id": BACKUP_ID,
                "remote_ssh_command": SSH_COMMAND,
                "destination_directory": DATA_DIRECTORY,
            }
        )

        # malformed response
        mock_post_request.return_value = {"operation_idd": "some_id"}
        with self.assertRaises(RetriesExceeded) as exc:
            self.br._create_recovery_operation()
        self.assertEqual(str(exc.exception),
                         "Maximum number of retries exceeded for method BarmanRecover._create_recovery_operation.")
        self.assertEqual(mock_sleep.call_count, self.br.max_retries)
        mock_sleep.assert_has_calls([mock.call(self.br.retry_wait)] * self.br.max_retries)
        self.assertEqual(mock_logging.call_count, self.br.max_retries)
        for i in range(mock_logging.call_count):
            call_args = mock_logging.call_args_list[i][0]
            self.assertEqual(len(call_args), 5)
            self.assertEqual(call_args[0], "Attempt %d of %d on method %s failed with %r.")
            self.assertEqual(call_args[1], i + 1)
            self.assertEqual(call_args[2], self.br.max_retries)
            self.assertEqual(call_args[3], "BarmanRecover._create_recovery_operation")
            self.assertIsInstance(call_args[4], KeyError)
            self.assertEqual(call_args[4].args, ('operation_id',))

    @patch("logging.warning")
    @patch("time.sleep")
    @patch.object(BarmanRecover, "_get_request")
    def test__get_recovery_operation_status(self, mock_get_request, mock_sleep, mock_logging):
        # well formed response
        mock_get_request.return_value = {"status": "some status"}
        self.assertEqual(self.br._get_recovery_operation_status("some_id"), "some status")
        mock_get_request.assert_called_once_with(f"servers/{BARMAN_SERVER}/operations/some_id")
        mock_sleep.assert_not_called()
        mock_logging.assert_not_called()

        # malformed response
        mock_get_request.return_value = {"statuss": "some status"}
        with self.assertRaises(RetriesExceeded) as exc:
            self.br._get_recovery_operation_status("some_id")
        self.assertEqual(str(exc.exception),
                         "Maximum number of retries exceeded for method BarmanRecover._get_recovery_operation_status.")
        self.assertEqual(mock_sleep.call_count, self.br.max_retries)
        mock_sleep.assert_has_calls([mock.call(self.br.retry_wait)] * self.br.max_retries)
        self.assertEqual(mock_logging.call_count, self.br.max_retries)
        for i in range(mock_logging.call_count):
            call_args = mock_logging.call_args_list[i][0]
            self.assertEqual(len(call_args), 5)
            self.assertEqual(call_args[0], "Attempt %d of %d on method %s failed with %r.")
            self.assertEqual(call_args[1], i + 1)
            self.assertEqual(call_args[2], self.br.max_retries)
            self.assertEqual(call_args[3], "BarmanRecover._get_recovery_operation_status")
            self.assertIsInstance(call_args[4], KeyError)
            self.assertEqual(call_args[4].args, ('status',))

    @patch.object(BarmanRecover, "_get_recovery_operation_status")
    @patch("time.sleep")
    @patch("logging.info")
    @patch("logging.critical")
    @patch.object(BarmanRecover, "_create_recovery_operation")
    def test_restore_backup(self, mock_create_op, mock_log_critical, mock_log_info, mock_sleep, mock_get_status):
        # successful fast restore
        mock_create_op.return_value = "some_id"
        mock_get_status.return_value = "DONE"
        self.assertTrue(self.br.restore_backup())
        mock_create_op.assert_called_once()
        mock_get_status.assert_called_once_with("some_id")
        mock_log_info.assert_called_once_with("Created the recovery operation with ID %s", "some_id")
        mock_log_critical.assert_not_called()
        mock_sleep.assert_not_called()

        # successful slow restore
        mock_create_op.reset_mock()
        mock_get_status.reset_mock()
        mock_log_info.reset_mock()
        mock_get_status.side_effect = ["IN_PROGRESS"] * 20 + ["DONE"]
        self.assertTrue(self.br.restore_backup())
        mock_create_op.assert_called_once()
        self.assertEqual(mock_get_status.call_count, 21)
        mock_get_status.assert_has_calls([mock.call("some_id")] * 21)
        self.assertEqual(mock_log_info.call_count, 21)
        mock_log_info.assert_has_calls([mock.call("Created the recovery operation with ID %s", "some_id")]
                                       + [mock.call("Recovery operation %s is still in progress", "some_id")] * 20)
        mock_log_critical.assert_not_called()
        self.assertEqual(mock_sleep.call_count, 20)
        mock_sleep.assert_has_calls([mock.call(LOOP_WAIT)] * 20)

        # failed fast restore
        mock_create_op.reset_mock()
        mock_get_status.reset_mock()
        mock_log_info.reset_mock()
        mock_sleep.reset_mock()
        mock_get_status.side_effect = None
        mock_get_status.return_value = "FAILED"
        self.assertFalse(self.br.restore_backup())
        mock_create_op.assert_called_once()
        mock_get_status.assert_called_once_with("some_id")
        mock_log_info.assert_called_once_with("Created the recovery operation with ID %s", "some_id")
        mock_log_critical.assert_not_called()
        mock_sleep.assert_not_called()

        # failed slow restore
        mock_create_op.reset_mock()
        mock_get_status.reset_mock()
        mock_log_info.reset_mock()
        mock_sleep.reset_mock()
        mock_get_status.side_effect = ["IN_PROGRESS"] * 20 + ["FAILED"]
        self.assertFalse(self.br.restore_backup())
        mock_create_op.assert_called_once()
        self.assertEqual(mock_get_status.call_count, 21)
        mock_get_status.assert_has_calls([mock.call("some_id")] * 21)
        self.assertEqual(mock_log_info.call_count, 21)
        mock_log_info.assert_has_calls([mock.call("Created the recovery operation with ID %s", "some_id")]
                                       + [mock.call("Recovery operation %s is still in progress", "some_id")] * 20)
        mock_log_critical.assert_not_called()
        self.assertEqual(mock_sleep.call_count, 20)
        mock_sleep.assert_has_calls([mock.call(LOOP_WAIT)] * 20)

        # create retries exceeded
        mock_log_info.reset_mock()
        mock_sleep.reset_mock()
        mock_create_op.side_effect = RetriesExceeded
        mock_get_status.side_effect = None
        with self.assertRaises(SystemExit) as exc:
            self.assertIsNone(self.br.restore_backup())
        self.assertEqual(exc.exception.code, ExitCode.HTTP_RESPONSE_MALFORMED)
        mock_log_info.assert_not_called()
        mock_log_critical.assert_called_once_with("Maximum number of retries exceeded, exiting.")
        mock_sleep.assert_not_called()

        # get status retries exceeded
        mock_create_op.reset_mock()
        mock_create_op.side_effect = None
        mock_log_critical.reset_mock()
        mock_log_info.reset_mock()
        mock_get_status.side_effect = RetriesExceeded
        with self.assertRaises(SystemExit) as exc:
            self.assertIsNone(self.br.restore_backup())
        self.assertEqual(exc.exception.code, ExitCode.HTTP_RESPONSE_MALFORMED)
        mock_log_info.assert_called_once_with("Created the recovery operation with ID %s", "some_id")
        mock_log_critical.assert_called_once_with("Maximum number of retries exceeded, exiting.")
        mock_sleep.assert_not_called()


class TestMain(unittest.TestCase):

    @patch("logging.basicConfig")
    def test_set_up_logging(self, mock_log_config):
        log_file = "/path/to/some/file.log"
        set_up_logging(log_file)
        mock_log_config.assert_called_once_with(filename=log_file, level=logging.INFO,
                                                format="%(asctime)s %(levelname)s: %(message)s")

    @patch("logging.critical")
    @patch("logging.info")
    @patch("patroni.scripts.barman_recover.set_up_logging")
    @patch("patroni.scripts.barman_recover.BarmanRecover")
    @patch("patroni.scripts.barman_recover.ArgumentParser")
    def test_main(self, mock_arg_parse, mock_br, mock_set_up_log, mock_log_info, mock_log_critical):
        # successful restore
        args = MagicMock()
        mock_arg_parse.return_value.parse_known_args.return_value = (args, None)
        mock_br.return_value.restore_backup.return_value = True
        with self.assertRaises(SystemExit) as exc:
            main()
        mock_arg_parse.assert_called_once()
        mock_set_up_log.assert_called_once_with(args.log_file)
        mock_br.assert_called_once_with(args.api_url, args.barman_server, args.backup_id, args.ssh_command,
                                        args.data_directory, args.loop_wait, args.retry_wait, args.max_retries,
                                        args.cert_file, args.key_file)
        mock_log_info.assert_called_once_with("Recovery operation finished successfully.")
        mock_log_critical.assert_not_called()
        self.assertEqual(exc.exception.code, ExitCode.RECOVERY_DONE)

        # failed restore
        mock_arg_parse.reset_mock()
        mock_set_up_log.reset_mock()
        mock_br.reset_mock()
        mock_log_info.reset_mock()
        mock_br.return_value.restore_backup.return_value = False
        with self.assertRaises(SystemExit) as exc:
            main()
        mock_arg_parse.assert_called_once()
        mock_set_up_log.assert_called_once_with(args.log_file)
        mock_br.assert_called_once_with(args.api_url, args.barman_server, args.backup_id, args.ssh_command,
                                        args.data_directory, args.loop_wait, args.retry_wait, args.max_retries,
                                        args.cert_file, args.key_file)
        mock_log_info.assert_not_called()
        mock_log_critical.assert_called_once_with("Recovery operation failed.")
        self.assertEqual(exc.exception.code, ExitCode.RECOVERY_FAILED)
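
The retry contract asserted in `test__create_recovery_operation` and `test__get_recovery_operation_status` above (one warning and one sleep per failed attempt, up to `max_retries`, then `RetriesExceeded`) can be sketched roughly as follows. This is an illustration reconstructed from the assertions, not the code shipped in Patroni: the `retry` decorator shape and the `FlakyClient` class are assumptions, and only the log format string and the exception message are taken from the tests.

```python
import logging
import time
from functools import wraps


class RetriesExceeded(Exception):
    """Raised once a method has failed more than ``max_retries`` times."""


def retry(exceptions):
    """Retry a method on ``exceptions`` (hypothetical decorator).

    The decorated method's instance is assumed to expose ``max_retries``
    and ``retry_wait`` attributes.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(instance, *args, **kwargs):
            method_name = f"{type(instance).__name__}.{func.__name__}"
            failures = 0
            while True:
                try:
                    return func(instance, *args, **kwargs)
                except exceptions as exc:
                    failures += 1
                    if failures > instance.max_retries:
                        raise RetriesExceeded(
                            f"Maximum number of retries exceeded for method {method_name}."
                        ) from exc
                    # Same format string the tests expect from logging.warning.
                    logging.warning("Attempt %d of %d on method %s failed with %r.",
                                    failures, instance.max_retries, method_name, exc)
                    time.sleep(instance.retry_wait)
        return wrapper
    return decorator


class FlakyClient:
    """Hypothetical client whose response never contains the expected key."""

    def __init__(self, max_retries, retry_wait):
        self.max_retries = max_retries
        self.retry_wait = retry_wait
        self.calls = 0

    @retry(KeyError)
    def fetch(self):
        self.calls += 1
        # Malformed response, as in the tests: "operation_idd" instead of "operation_id".
        return {"operation_idd": "some_id"}["operation_id"]
```

With `max_retries=3`, this sketch calls `fetch` four times, logs and sleeps three times, and then raises `RetriesExceeded`. That matches the tests' expectation that the warning and sleep counts equal `max_retries`.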