From bfbc4860d5140fbbedee01687000131e6bef8cb1 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Wed, 29 Jul 2020 15:34:44 +0200 Subject: [PATCH] PoC: Patroni on pure RAFT (#375) * a new node can join the cluster dynamically and become a part of the consensus * it is also possible to join only the Patroni cluster (without adding the node to the Raft consensus); just comment out or remove `raft.self_addr` for that * when a node joins the cluster, it uses the values from `raft.partner_addrs` only for initial discovery * it is possible to run Patroni and Postgres on two nodes plus one node with `patroni_raft_controller` (without Patroni and Postgres). In such a setup one can temporarily lose one node without affecting the primary. --- .travis.yml | 6 + docs/ENVIRONMENT.rst | 7 + docs/SETTINGS.rst | 29 +++ features/environment.py | 47 ++++ patroni/__init__.py | 109 ++------- patroni/config.py | 4 + patroni/daemon.py | 104 +++++++++ patroni/dcs/raft.py | 409 ++++++++++++++++++++++++++++++++++ patroni/raft_controller.py | 34 +++ patroni_raft_controller.py | 6 + postgres0.yml | 7 + postgres1.yml | 7 + postgres2.yml | 7 + requirements.txt | 1 + setup.py | 3 +- tests/test_config.py | 1 + tests/test_patroni.py | 2 +- tests/test_raft.py | 144 ++++++++++++ tests/test_raft_controller.py | 46 ++++ 19 files changed, 884 insertions(+), 89 deletions(-) create mode 100644 patroni/daemon.py create mode 100644 patroni/dcs/raft.py create mode 100644 patroni/raft_controller.py create mode 100755 patroni_raft_controller.py create mode 100644 tests/test_raft.py create mode 100644 tests/test_raft_controller.py diff --git a/.travis.yml b/.travis.yml index 0a4855a6..e7111e8b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,8 @@ matrix: env: DCS="exhibitor" TEST_SUITE="behave" - python: "3.6" env: DCS="consul" TEST_SUITE="behave" + - python: "3.6" + env: DCS="raft" TEST_SUITE="behave" - python: "3.6" env: DCS="kubernetes" TEST_SUITE="behave" branches: @@ -102,6 +104,10 @@ install: done& } + function get_raft() { + return 0 + } + attempt_num=1 until get_${DCS}; do [[ $attempt_num -ge 3 ]] && exit 1 diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 4ab39d49..c86a35f8 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -87,6 +87,13 @@ Kubernetes - **PATRONI\_KUBERNETES\_POD\_IP**: (optional) IP address of the pod Patroni is running in. This value is required when `PATRONI_KUBERNETES_USE_ENDPOINTS` is enabled and is used to populate the leader endpoint subsets when the pod's PostgreSQL is promoted. - **PATRONI\_KUBERNETES\_PORTS**: (optional) if the Service object has the name for the port, the same name must appear in the Endpoint object, otherwise service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``PATRONI_KUBERNETES_PORTS='[{"name": "postgresql", "port": 5432}]'`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `PATRONI_KUBERNETES_USE_ENDPOINTS` is set. +Raft +---- + +- **PATRONI\_RAFT\_SELF\_ADDR**: ``ip:port`` to listen on for Raft connections. If not set, the node will not participate in the consensus. +- **PATRONI\_RAFT\_PARTNER\_ADDRS**: list of the other Patroni nodes in the cluster, in the format ``"'ip1:port1','ip2:port2'"``. It is important to quote every single entry! +- **PATRONI\_RAFT\_DATA\_DIR**: directory where the Raft log and snapshot files are stored. If not specified, the current working directory is used.
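To make the mapping concrete, here is a minimal sketch (with hypothetical addresses and paths) of how the three variables above end up in the ``raft`` section of the effective configuration, per the parsing this patch adds to ``patroni/config.py``::

    import os

    # Hypothetical example values; note that every PARTNER_ADDRS entry is quoted.
    os.environ['PATRONI_RAFT_SELF_ADDR'] = '127.0.0.1:2222'
    os.environ['PATRONI_RAFT_PARTNER_ADDRS'] = "'127.0.0.1:2223','127.0.0.1:2224'"
    os.environ['PATRONI_RAFT_DATA_DIR'] = '/var/lib/patroni/raft'

    # After Patroni parses the environment (_set_section_values + _parse_list,
    # see the patroni/config.py hunk below), the resulting section is equivalent to:
    raft = {
        'self_addr': '127.0.0.1:2222',
        'partner_addrs': ['127.0.0.1:2223', '127.0.0.1:2224'],
        'data_dir': '/var/lib/patroni/raft',
    }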
+ PostgreSQL ---------- - **PATRONI\_POSTGRESQL\_LISTEN**: IP address + port that Postgres listens to. Multiple comma-separated addresses are permitted, as long as the port component is appended to the last one with a colon, i.e. ``listen: 127.0.0.1,127.0.0.2:5432``. Patroni will use the first address from this list to establish local connections to the PostgreSQL node. diff --git a/docs/SETTINGS.rst b/docs/SETTINGS.rst index 88d5a53b..abc308e8 100644 --- a/docs/SETTINGS.rst +++ b/docs/SETTINGS.rst @@ -159,6 +159,35 @@ Kubernetes - **pod\_ip**: (optional) IP address of the pod Patroni is running in. This value is required when `use_endpoints` is enabled and is used to populate the leader endpoint subsets when the pod's PostgreSQL is promoted. - **ports**: (optional) if the Service object has the name for the port, the same name must appear in the Endpoint object, otherwise service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``kubernetes.ports: [{"name": "postgresql", "port": 5432}]`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `kubernetes.use_endpoints` is set. +Raft +---- +- **self\_addr**: ``ip:port`` to listen on for Raft connections. If not set, the node will not participate in the consensus. +- **partner\_addrs**: list of the other Patroni nodes in the cluster, in the format: ['ip1:port1', 'ip2:port2', ...] +- **data\_dir**: directory where the Raft log and snapshot files are stored. If not specified, the current working directory is used. + + Short FAQ about the Raft implementation + + - Q: How do I list all the nodes providing consensus? + + A: ``syncobj_admin -conn host:port -status``, where ``host:port`` is the address of one of the cluster nodes. + + - Q: A node that was a part of the consensus is gone, and I can't reuse the same IP for another node. How do I remove this node from the consensus? + + A: ``syncobj_admin -conn host:port -remove host2:port2``, where ``host2:port2`` is the address of the node you want to remove from the consensus. + + - Q: Where do I get the ``syncobj_admin`` utility? + + A: It is installed together with the ``pysyncobj`` module (a Python Raft implementation), which is a Patroni dependency. + + - Q: Is it possible to run a Patroni node without adding it to the consensus? + + A: Yes, just comment out or remove ``raft.self_addr`` from the Patroni configuration. + + - Q: Is it possible to run Patroni and PostgreSQL on only two nodes? + + A: Yes, on a third node you can run ``patroni_raft_controller`` (without Patroni and PostgreSQL). In such a setup one can temporarily lose one node without affecting the primary. + + ..
_postgresql_settings: PostgreSQL diff --git a/features/environment.py b/features/environment.py index ab65768d..563ea0c6 100644 --- a/features/environment.py +++ b/features/environment.py @@ -165,6 +165,11 @@ class PatroniController(AbstractController): config = yaml.safe_load(f) config.pop('etcd', None) + raft_port = os.environ.get('RAFT_PORT') + if raft_port: + os.environ['RAFT_PORT'] = str(int(raft_port) + 1) + config['raft'] = {'data_dir': self._output_dir, 'self_addr': 'localhost:' + os.environ['RAFT_PORT']} + host = config['postgresql']['listen'].split(':')[0] config['postgresql']['listen'] = config['postgresql']['connect_address'] = '{0}:{1}'.format(host, self.__PORT) @@ -531,6 +536,48 @@ class ExhibitorController(ZooKeeperController): os.environ.update({'PATRONI_EXHIBITOR_HOSTS': 'localhost', 'PATRONI_EXHIBITOR_PORT': '8181'}) +class RaftController(AbstractDcsController): + + CONTROLLER_ADDR = 'localhost:1234' + + def __init__(self, context): + super(RaftController, self).__init__(context) + os.environ.update(PATRONI_RAFT_PARTNER_ADDRS="'" + self.CONTROLLER_ADDR + "'", RAFT_PORT='1234') + self._raft = None + + def _start(self): + env = os.environ.copy() + del env['PATRONI_RAFT_PARTNER_ADDRS'] + env['PATRONI_RAFT_SELF_ADDR'] = self.CONTROLLER_ADDR + env['PATRONI_RAFT_DATA_DIR'] = self._work_directory + return subprocess.Popen([sys.executable, '-m', 'coverage', 'run', + '--source=patroni', '-p', 'patroni_raft_controller.py'], + stdout=self._log, stderr=subprocess.STDOUT, env=env) + + def query(self, key, scope='batman'): + ret = self._raft.get(self.path(key, scope)) + return ret and ret['value'] + + def set(self, key, value): + self._raft.set(self.path(key), value) + + def cleanup_service_tree(self): + from patroni.dcs.raft import KVStoreTTL + from pysyncobj import SyncObjConf + + if self._raft: + self._raft.destroy() + self._raft._SyncObj__thread.join() + self.stop() + os.makedirs(self._work_directory) + self.start() + + ready_event = threading.Event() + conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, onReady=ready_event.set) + self._raft = KVStoreTTL(None, [self.CONTROLLER_ADDR], conf) + ready_event.wait() + + class PatroniPoolController(object): BACKUP_SCRIPT = [sys.executable, 'features/backup_create.py'] diff --git a/patroni/__init__.py b/patroni/__init__.py index c079d61b..4d3463a5 100644 --- a/patroni/__init__.py +++ b/patroni/__init__.py @@ -4,7 +4,8 @@ import signal import sys import time -from patroni.version import __version__ +from .daemon import AbstractPatroniDaemon, abstract_main +from .version import __version__ logger = logging.getLogger(__name__) @@ -12,23 +13,19 @@ PATRONI_ENV_PREFIX = 'PATRONI_' KUBERNETES_ENV_PREFIX = 'KUBERNETES_' -class Patroni(object): +class Patroni(AbstractPatroniDaemon): - def __init__(self, conf): + def __init__(self, config): from patroni.api import RestApiServer from patroni.dcs import get_dcs from patroni.ha import Ha - from patroni.log import PatroniLogger from patroni.postgresql import Postgresql from patroni.request import PatroniRequest from patroni.watchdog import Watchdog - self.setup_signal_handlers() + super(Patroni, self).__init__(config) self.version = __version__ - self.logger = PatroniLogger() - self.config = conf - self.logger.reload_config(self.config.get('log', {})) self.dcs = get_dcs(self.config) self.watchdog = Watchdog(self.config) self.load_dynamic_configuration() @@ -71,14 +68,14 @@ class Patroni(object): def nosync(self): return bool(self.tags.get('nosync', False)) - def 
reload_config(self, sighup=False): + def reload_config(self, sighup=False, local=False): try: - self.tags = self.get_tags() - self.logger.reload_config(self.config.get('log', {})) - self.watchdog.reload_config(self.config) - if sighup: + super(Patroni, self).reload_config(sighup, local) + if local: + self.tags = self.get_tags() self.request.reload_config(self.config) - self.api.reload_config(self.config['restapi']) + self.api.reload_config(self.config['restapi']) + self.watchdog.reload_config(self.config) self.postgresql.reload_config(self.config['postgresql'], sighup) self.dcs.reload_config(self.config) except Exception: @@ -88,15 +85,6 @@ class Patroni(object): def replicatefrom(self): return self.tags.get('replicatefrom') - def sighup_handler(self, *args): - self._received_sighup = True - - def sigterm_handler(self, *args): - with self._sigterm_lock: - if not self._received_sigterm: - self._received_sigterm = True - sys.exit() - @property def noloadbalance(self): return bool(self.tags.get('noloadbalance', False)) @@ -114,48 +102,24 @@ class Patroni(object): elif self.ha.watch(nap_time): self.next_run = time.time() - @property - def received_sigterm(self): - with self._sigterm_lock: - return self._received_sigterm - def run(self): self.api.start() - self.logger.start() self.next_run = time.time() + super(Patroni, self).run() - while not self.received_sigterm: - if self._received_sighup: - self._received_sighup = False - if self.config.reload_local_configuration(): - self.reload_config(True) - else: - self.postgresql.config.reload_config(self.config['postgresql'], True) + def _run_cycle(self): + logger.info(self.ha.run_cycle()) - logger.info(self.ha.run_cycle()) + if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \ + and self.config.set_dynamic_configuration(self.dcs.cluster.config): + self.reload_config() - if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \ - and self.config.set_dynamic_configuration(self.dcs.cluster.config): - self.reload_config() + if self.postgresql.role != 'uninitialized': + self.config.save_cache() - if self.postgresql.role != 'uninitialized': - self.config.save_cache() + self.schedule_next_run() - self.schedule_next_run() - - def setup_signal_handlers(self): - from threading import Lock - - self._received_sighup = False - self._sigterm_lock = Lock() - self._received_sigterm = False - if os.name != 'nt': - signal.signal(signal.SIGHUP, self.sighup_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) - - def shutdown(self): - with self._sigterm_lock: - self._received_sigterm = True + def _shutdown(self): try: self.api.shutdown() except Exception: @@ -164,43 +128,14 @@ class Patroni(object): self.ha.shutdown() except Exception: logger.exception('Exception during Ha.shutdown') - self.logger.shutdown() def patroni_main(): - import argparse - from multiprocessing import freeze_support - from patroni.config import Config, ConfigParseError from patroni.validator import schema freeze_support() - - parser = argparse.ArgumentParser() - parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__)) - parser.add_argument('--validate-config', action='store_true', help='Run config validator and exit') - parser.add_argument('configfile', nargs='?', default='', - help='Patroni may also read the configuration from the {0} environment variable' - .format(Config.PATRONI_CONFIG_VARIABLE)) - args = parser.parse_args() - try: - if args.validate_config: - conf = Config(args.configfile, 
validator=schema) - sys.exit() - else: - conf = Config(args.configfile) - except ConfigParseError as e: - if e.value: - print(e.value) - parser.print_help() - sys.exit(1) - patroni = Patroni(conf) - try: - patroni.run() - except KeyboardInterrupt: - pass - finally: - patroni.shutdown() + abstract_main(Patroni, schema) def fatal(string, *args): diff --git a/patroni/config.py b/patroni/config.py index f6fa4854..7644a8e7 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -293,6 +293,10 @@ class Config(object): logger.exception('Exception when parsing list %s', value) return None + _set_section_values('raft', ['data_dir', 'self_addr', 'partner_addrs']) + if 'raft' in ret and 'partner_addrs' in ret['raft']: + ret['raft']['partner_addrs'] = _parse_list(ret['raft']['partner_addrs']) + for param in list(os.environ.keys()): if param.startswith(PATRONI_ENV_PREFIX): # PATRONI_(ETCD|CONSUL|ZOOKEEPER|EXHIBITOR|...)_(HOSTS?|PORT|..) diff --git a/patroni/daemon.py b/patroni/daemon.py new file mode 100644 index 00000000..5bb99990 --- /dev/null +++ b/patroni/daemon.py @@ -0,0 +1,104 @@ +import abc +import os +import signal +import six +import sys + +from threading import Lock + + +@six.add_metaclass(abc.ABCMeta) +class AbstractPatroniDaemon(object): + + def __init__(self, config): + from patroni.log import PatroniLogger + + self.setup_signal_handlers() + + self.logger = PatroniLogger() + self.config = config + AbstractPatroniDaemon.reload_config(self, local=True) + + def sighup_handler(self, *args): + self._received_sighup = True + + def sigterm_handler(self, *args): + with self._sigterm_lock: + if not self._received_sigterm: + self._received_sigterm = True + sys.exit() + + def setup_signal_handlers(self): + self._received_sighup = False + self._sigterm_lock = Lock() + self._received_sigterm = False + if os.name != 'nt': + signal.signal(signal.SIGHUP, self.sighup_handler) + signal.signal(signal.SIGTERM, self.sigterm_handler) + + @property + def received_sigterm(self): + with self._sigterm_lock: + return self._received_sigterm + + def reload_config(self, sighup=False, local=False): + if local: + self.logger.reload_config(self.config.get('log', {})) + + @abc.abstractmethod + def _run_cycle(self): + """_run_cycle""" + + def run(self): + self.logger.start() + while not self.received_sigterm: + if self._received_sighup: + self._received_sighup = False + self.reload_config(True, self.config.reload_local_configuration()) + + self._run_cycle() + + @abc.abstractmethod + def _shutdown(self): + """_shutdown""" + + def shutdown(self): + with self._sigterm_lock: + self._received_sigterm = True + self._shutdown() + self.logger.shutdown() + + +def abstract_main(cls, validator=None): + import argparse + + from .config import Config, ConfigParseError + from .version import __version__ + + parser = argparse.ArgumentParser() + parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__)) + if validator: + parser.add_argument('--validate-config', action='store_true', help='Run config validator and exit') + parser.add_argument('configfile', nargs='?', default='', + help='Patroni may also read the configuration from the {0} environment variable' + .format(Config.PATRONI_CONFIG_VARIABLE)) + args = parser.parse_args() + try: + if validator and args.validate_config: + Config(args.configfile, validator=validator) + sys.exit() + + config = Config(args.configfile) + except ConfigParseError as e: + if e.value: + print(e.value) + parser.print_help() + sys.exit(1) + + controller = cls(config) + 
try: controller.run() except KeyboardInterrupt: pass finally: controller.shutdown() diff --git a/patroni/dcs/raft.py b/patroni/dcs/raft.py new file mode 100644 index 00000000..66359d14 --- /dev/null +++ b/patroni/dcs/raft.py @@ -0,0 +1,409 @@ +import json +import logging +import os +import threading +import time + +from patroni.dcs import AbstractDCS, ClusterConfig, Cluster, Failover, Leader, Member, SyncState, TimelineHistory +from pysyncobj import SyncObj, SyncObjConf, replicated, FAIL_REASON +from pysyncobj.transport import Node, TCPTransport, CONNECTION_STATE + +logger = logging.getLogger(__name__) + + +class MessageNode(Node): + + def __init__(self, address): + self.address = address + + +class UtilityTransport(TCPTransport): + + def __init__(self, syncObj, selfNode, otherNodes): + super(UtilityTransport, self).__init__(syncObj, selfNode, otherNodes) + self._selfIsReadonlyNode = False + + def _connectIfNecessarySingle(self, node): + pass + + def connectionState(self, node): + return self._connections[node].state + + def isDisconnected(self, node): + return self.connectionState(node) == CONNECTION_STATE.DISCONNECTED + + def connectIfRequiredSingle(self, node): + if self.isDisconnected(node): + return self._connections[node].connect(node.ip, node.port) + + def disconnectSingle(self, node): + self._connections[node].disconnect() + + +class SyncObjUtility(SyncObj): + + def __init__(self, otherNodes, conf): + autoTick = conf.autoTick + conf.autoTick = False + super(SyncObjUtility, self).__init__(None, otherNodes, conf, transportClass=UtilityTransport) + conf.autoTick = autoTick + self._SyncObj__transport.setOnMessageReceivedCallback(self._onMessageReceived) + self.__result = None + + def setPartnerNode(self, partner): + self.__node = partner + + def sendMessage(self, message): + # Abuse the fact that the node address is sent as the first message + self._SyncObj__transport._selfNode = MessageNode(message) + self._SyncObj__transport.connectIfRequiredSingle(self.__node) + while not self._SyncObj__transport.isDisconnected(self.__node): + self._poller.poll(0.5) + return self.__result + + def _onMessageReceived(self, _, message): + self.__result = message + self._SyncObj__transport.disconnectSingle(self.__node) + + +class MyTCPTransport(TCPTransport): + + def _onIncomingMessageReceived(self, conn, message): + if self._syncObj.encryptor and not conn.sendRandKey: + conn.sendRandKey = message + conn.recvRandKey = os.urandom(32) + conn.send(conn.recvRandKey) + return + + # Utility messages + if isinstance(message, list) and message[0] == 'members': + conn.send(self._syncObj._get_members()) + return True + + return super(MyTCPTransport, self)._onIncomingMessageReceived(conn, message) + + +class DynMemberSyncObj(SyncObj): + + def __init__(self, selfAddress, partnerAddrs, conf): + add_self = False + if selfAddress: + utility = SyncObjUtility(partnerAddrs, conf) + for node in utility._SyncObj__otherNodes: + utility.setPartnerNode(node) + response = utility.sendMessage(['members']) + if response: + partnerAddrs = [member['addr'] for member in response if member['addr'] != selfAddress] + add_self = len(partnerAddrs) == len(response) + break + + super(DynMemberSyncObj, self).__init__(selfAddress, partnerAddrs, conf, transportClass=MyTCPTransport) + if add_self: + threading.Thread(target=utility.sendMessage, args=(['add', selfAddress],)).start() + + def _get_members(self): + ret = [{'addr': node.id, 'leader': node == self._getLeader(), + 'status': CONNECTION_STATE.CONNECTED if node in
self._SyncObj__connectedNodes + else CONNECTION_STATE.DISCONNECTED} for node in self._SyncObj__otherNodes] + ret.append({'addr': self._SyncObj__selfNode.id, 'leader': self._isLeader(), + 'status': CONNECTION_STATE.CONNECTED}) + return ret + + def _SyncObj__doChangeCluster(self, request, reverse=False): + ret = False + if not self._SyncObj__selfNode or request[0] != 'add' or reverse or request[1] != self._SyncObj__selfNode.id: + ret = super(DynMemberSyncObj, self)._SyncObj__doChangeCluster(request, reverse) + if ret: + self.forceLogCompaction() + return ret + + +class KVStoreTTL(DynMemberSyncObj): + + def __init__(self, selfAddress, partnerAddrs, conf, on_set=None, on_delete=None): + self.__on_set = on_set + self.__on_delete = on_delete + self.__limb = {} + self.__retry_timeout = None + self.__early_apply_local_log = selfAddress is not None + self.applied_local_log = False + super(KVStoreTTL, self).__init__(selfAddress, partnerAddrs, conf) + self.__data = {} + + @staticmethod + def __check_requirements(old_value, **kwargs): + return ('prevExist' not in kwargs or bool(kwargs['prevExist']) == bool(old_value)) and \ + ('prevValue' not in kwargs or old_value and old_value['value'] == kwargs['prevValue']) and \ + (not kwargs.get('prevIndex') or old_value and old_value['index'] == kwargs['prevIndex']) + + def set_retry_timeout(self, retry_timeout): + self.__retry_timeout = retry_timeout + + def retry(self, func, *args, **kwargs): + event = threading.Event() + ret = {'result': None, 'error': -1} + + def callback(result, error): + ret.update(result=result, error=error) + event.set() + + kwargs['callback'] = callback + timeout = kwargs.pop('timeout', None) or self.__retry_timeout + deadline = timeout and time.time() + timeout + + while True: + event.clear() + func(*args, **kwargs) + event.wait(timeout) + if ret['error'] == FAIL_REASON.SUCCESS: + return ret['result'] + elif ret['error'] == FAIL_REASON.REQUEST_DENIED: + break + elif deadline: + timeout = deadline - time.time() + if timeout <= 0: + break + time.sleep(1) + return False + + @replicated + def _set(self, key, value, **kwargs): + old_value = self.__data.get(key, {}) + if not self.__check_requirements(old_value, **kwargs): + return False + + if old_value and old_value['created'] != value['created']: + value['created'] = value['updated'] + value['index'] = self._SyncObj__raftLastApplied + 1 + + self.__data[key] = value + if self.__on_set: + self.__on_set(key, value) + return True + + def set(self, key, value, ttl=None, **kwargs): + old_value = self.__data.get(key, {}) + if not self.__check_requirements(old_value, **kwargs): + return False + + value = {'value': value, 'updated': time.time()} + value['created'] = old_value.get('created', value['updated']) + if ttl: + value['expire'] = value['updated'] + ttl + return self.retry(self._set, key, value, **kwargs) + + def __pop(self, key): + self.__data.pop(key) + if self.__on_delete: + self.__on_delete(key) + + @replicated + def _delete(self, key, recursive=False, **kwargs): + if recursive: + for k in list(self.__data.keys()): + if k.startswith(key): + self.__pop(k) + elif not self.__check_requirements(self.__data.get(key, {}), **kwargs): + return False + else: + self.__pop(key) + return True + + def delete(self, key, recursive=False, **kwargs): + if not recursive and not self.__check_requirements(self.__data.get(key, {}), **kwargs): + return False + return self.retry(self._delete, key, recursive=recursive, **kwargs) + + @staticmethod + def __values_match(old, new): + return all(old.get(n) == 
new.get(n) for n in ('created', 'updated', 'expire', 'value')) + + @replicated + def _expire(self, key, value, callback=None): + current = self.__data.get(key) + if current and self.__values_match(current, value): + self.__pop(key) + + def __expire_keys(self): + for key, value in self.__data.items(): + if value and 'expire' in value and value['expire'] <= time.time() and \ + not (key in self.__limb and self.__values_match(self.__limb[key], value)): + self.__limb[key] = value + + def callback(*args): + if key in self.__limb and self.__values_match(self.__limb[key], value): + self.__limb.pop(key) + self._expire(key, value, callback=callback) + + def get(self, key, recursive=False): + if not recursive: + return self.__data.get(key) + return {k: v for k, v in self.__data.items() if k.startswith(key)} + + def _onTick(self, timeToWait=0.0): + # The SyncObj starts applying the local log only when there is at least one node connected. + # We want to change this behavior and apply the local log even when there is nobody except us. + # It gives us at least some picture about the last known cluster state. + if self.__early_apply_local_log and not self.applied_local_log and self._SyncObj__needLoadDumpFile: + self._SyncObj__raftCommitIndex = self._SyncObj__getCurrentLogIndex() + self._SyncObj__raftCurrentTerm = self._SyncObj__getCurrentLogTerm() + + super(KVStoreTTL, self)._onTick(timeToWait) + + # The SyncObj calls onReady callback only when cluster got the leader and is ready for writes. + # In some cases for us it is safe to "signal" the Raft object when the local log is fully applied. + # We are using the `applied_local_log` property for that, but not calling the callback function. + if self.__early_apply_local_log and not self.applied_local_log and self._SyncObj__raftCommitIndex != 1 and \ + self._SyncObj__raftLastApplied == self._SyncObj__raftCommitIndex: + self.applied_local_log = True + + if self._isLeader(): + self.__expire_keys() + else: + self.__limb.clear() + + +class Raft(AbstractDCS): + + def __init__(self, config): + super(Raft, self).__init__(config) + self._ttl = int(config.get('ttl') or 30) + + self_addr = None if self._ctl else config.get('self_addr') + template = os.path.join(config.get('data_dir', ''), self_addr or '') + files = {'journalFile': template + '.journal', 'fullDumpFile': template + '.dump'} if self_addr else {} + + ready_event = threading.Event() + conf = SyncObjConf(commandsWaitLeader=False, appendEntriesUseBatch=False, onReady=ready_event.set, + dynamicMembershipChange=True, **files) + self._sync_obj = KVStoreTTL(self_addr, config.get('partner_addrs', []), conf, self._on_set, self._on_delete) + while True: + ready_event.wait(5) + if ready_event.isSet() or self._sync_obj.applied_local_log: + break + else: + logger.info('waiting on raft') + self._sync_obj.forceLogCompaction() + self.set_retry_timeout(int(config.get('retry_timeout') or 10)) + + def _on_set(self, key, value): + leader = (self._sync_obj.get(self.leader_path) or {}).get('value') + if key == value['created'] == value['updated'] and \ + (key.startswith(self.members_path) or key == self.leader_path and leader != self._name) or \ + key == self.leader_optime_path and leader != self._name or key in (self.config_path, self.sync_path): + self.event.set() + + def _on_delete(self, key): + if key == self.leader_path: + self.event.set() + + def set_ttl(self, ttl): + self._ttl = ttl + + @property + def ttl(self): + return self._ttl + + def set_retry_timeout(self, retry_timeout): + 
self._sync_obj.set_retry_timeout(retry_timeout) + + @staticmethod + def member(key, value): + return Member.from_node(value['index'], os.path.basename(key), None, value['value']) + + def _load_cluster(self): + prefix = self.client_path('') + response = self._sync_obj.get(prefix, recursive=True) + if not response: + return Cluster(None, None, None, None, [], None, None, None) + nodes = {os.path.relpath(key, prefix).replace('\\', '/'): value for key, value in response.items()} + + # get initialize flag + initialize = nodes.get(self._INITIALIZE) + initialize = initialize and initialize['value'] + + # get global dynamic configuration + config = nodes.get(self._CONFIG) + config = config and ClusterConfig.from_node(config['index'], config['value']) + + # get timeline history + history = nodes.get(self._HISTORY) + history = history and TimelineHistory.from_node(history['index'], history['value']) + + # get last leader operation + last_leader_operation = nodes.get(self._LEADER_OPTIME) + last_leader_operation = 0 if last_leader_operation is None else int(last_leader_operation['value']) + + # get list of members + members = [self.member(k, n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1] + + # get leader + leader = nodes.get(self._LEADER) + if leader: + member = Member(-1, leader['value'], None, {}) + member = ([m for m in members if m.name == leader['value']] or [member])[0] + leader = Leader(leader['index'], None, member) + + # failover key + failover = nodes.get(self._FAILOVER) + if failover: + failover = Failover.from_node(failover['index'], failover['value']) + + # get synchronization state + sync = nodes.get(self._SYNC) + sync = SyncState.from_node(sync and sync['index'], sync and sync['value']) + + return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history) + + def _write_leader_optime(self, last_operation): + return self._sync_obj.set(self.leader_optime_path, last_operation, timeout=1) + + def _update_leader(self): + ret = self._sync_obj.set(self.leader_path, self._name, ttl=self._ttl, prevValue=self._name) + if not ret and self._sync_obj.get(self.leader_path) is None: + ret = self.attempt_to_acquire_leader() + return ret + + def attempt_to_acquire_leader(self, permanent=False): + return self._sync_obj.set(self.leader_path, self._name, prevExist=False, + ttl=None if permanent else self._ttl) + + def set_failover_value(self, value, index=None): + return self._sync_obj.set(self.failover_path, value, prevIndex=index) + + def set_config_value(self, value, index=None): + return self._sync_obj.set(self.config_path, value, prevIndex=index) + + def touch_member(self, data, permanent=False): + data = json.dumps(data, separators=(',', ':')) + return self._sync_obj.set(self.member_path, data, None if permanent else self._ttl, timeout=2) + + def take_leader(self): + return self._sync_obj.set(self.leader_path, self._name, ttl=self._ttl) + + def initialize(self, create_new=True, sysid=''): + return self._sync_obj.set(self.initialize_path, sysid, prevExist=(not create_new)) + + def _delete_leader(self): + return self._sync_obj.delete(self.leader_path, prevValue=self._name, timeout=1) + + def cancel_initialization(self): + return self._sync_obj.delete(self.initialize_path) + + def delete_cluster(self): + return self._sync_obj.delete(self.client_path(''), recursive=True) + + def set_history_value(self, value): + return self._sync_obj.set(self.history_path, value) + + def set_sync_state_value(self, value, index=None): + return 
self._sync_obj.set(self.sync_path, value, prevIndex=index) + + def delete_sync_state(self, index=None): + return self._sync_obj.delete(self.sync_path, prevIndex=index) + + def watch(self, leader_index, timeout): + try: + return super(Raft, self).watch(leader_index, timeout) + finally: + self.event.clear() diff --git a/patroni/raft_controller.py b/patroni/raft_controller.py new file mode 100644 index 00000000..5a9ae704 --- /dev/null +++ b/patroni/raft_controller.py @@ -0,0 +1,34 @@ +import logging +import os + +from patroni.daemon import AbstractPatroniDaemon, abstract_main +from patroni.dcs.raft import KVStoreTTL +from pysyncobj import SyncObjConf + +logger = logging.getLogger(__name__) + + +class RaftController(AbstractPatroniDaemon): + + def __init__(self, config): + super(RaftController, self).__init__(config) + + raft_config = self.config.get('raft') + self_addr = raft_config['self_addr'] + template = os.path.join(raft_config.get('data_dir', ''), self_addr) + self._syncobj_config = SyncObjConf(autoTick=False, appendEntriesUseBatch=False, dynamicMembershipChange=True, + journalFile=template + '.journal', fullDumpFile=template + '.dump') + self._raft = KVStoreTTL(self_addr, raft_config.get('partner_addrs', []), self._syncobj_config) + + def _run_cycle(self): + try: + self._raft.doTick(self._syncobj_config.autoTickPeriod) + except Exception: + logger.exception('doTick') + + def _shutdown(self): + self._raft.destroy() + + +def main(): + abstract_main(RaftController) diff --git a/patroni_raft_controller.py b/patroni_raft_controller.py new file mode 100755 index 00000000..f0eaabde --- /dev/null +++ b/patroni_raft_controller.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python +from patroni.raft_controller import main + + +if __name__ == '__main__': + main() diff --git a/postgres0.yml b/postgres0.yml index bac56b1b..610bab88 100644 --- a/postgres0.yml +++ b/postgres0.yml @@ -30,6 +30,13 @@ etcd: #It is possible to change this behavior through by setting: #use_proxies: true +#raft: +# data_dir: . +# self_addr: 127.0.0.1:2222 +# partner_addrs: +# - 127.0.0.1:2223 +# - 127.0.0.1:2224 + bootstrap: # this section will be written into Etcd:///config after initializing new cluster # and all other cluster members will use it as a `global configuration` diff --git a/postgres1.yml b/postgres1.yml index 2bc12f1a..3e379cd8 100644 --- a/postgres1.yml +++ b/postgres1.yml @@ -30,6 +30,13 @@ etcd: #It is possible to change this behavior through by setting: #use_proxies: true +#raft: +# data_dir: . +# self_addr: 127.0.0.1:2223 +# partner_addrs: +# - 127.0.0.1:2222 +# - 127.0.0.1:2224 + bootstrap: # this section will be written into Etcd:///config after initializing new cluster # and all other cluster members will use it as a `global configuration` diff --git a/postgres2.yml b/postgres2.yml index e1f29b97..b5bb863f 100644 --- a/postgres2.yml +++ b/postgres2.yml @@ -30,6 +30,13 @@ etcd: #It is possible to change this behavior through by setting: #use_proxies: true +#raft: +# data_dir: . 
+# self_addr: 127.0.0.1:2224 +# partner_addrs: +# - 127.0.0.1:2222 +# - 127.0.0.1:2223 + bootstrap: # this section will be written into Etcd:///config after initializing new cluster # and all other cluster members will use it as a `global configuration` diff --git a/requirements.txt b/requirements.txt index 79298af9..5c30d3a0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,6 @@ python-consul>=0.7.1 click>=4.1 prettytable>=0.7 python-dateutil +pysyncobj>=0.3.5 psutil>=2.0.0 cdiff diff --git a/setup.py b/setup.py index f536fa49..9a0858a0 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ KEYWORDS = 'etcd governor patroni postgresql postgres ha haproxy confd' +\ ' zookeeper exhibitor consul streaming replication kubernetes k8s' EXTRAS_REQUIRE = {'aws': ['boto'], 'etcd': ['python-etcd'], 'consul': ['python-consul'], - 'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'], 'kubernetes': []} + 'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'], 'kubernetes': [], 'raft': ['pysyncobj']} COVERAGE_XML = True COVERAGE_HTML = False @@ -51,6 +51,7 @@ CLASSIFIERS = [ CONSOLE_SCRIPTS = ['patroni = patroni:main', 'patronictl = patroni.ctl:ctl', + 'patroni_raft_controller = patroni.raft_controller:main', "patroni_wale_restore = patroni.scripts.wale_restore:main", "patroni_aws = patroni.scripts.aws:main"] diff --git a/tests/test_config.py b/tests/test_config.py index b7bf9aba..702026df 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -55,6 +55,7 @@ class TestConfig(unittest.TestCase): 'PATRONI_ZOOKEEPER_HOSTS': "'host1:2181','host2:2181'", 'PATRONI_EXHIBITOR_HOSTS': 'host1,host2', 'PATRONI_EXHIBITOR_PORT': '8181', + 'PATRONI_RAFT_PARTNER_ADDRS': "'host1:1234','host2:1234'", 'PATRONI_foo_HOSTS': '[host1,host2', # Exception in parse_list 'PATRONI_SUPERUSER_USERNAME': 'postgres', 'PATRONI_SUPERUSER_PASSWORD': 'zalando', diff --git a/tests/test_patroni.py b/tests/test_patroni.py index 42370253..db1f69fa 100644 --- a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -167,7 +167,7 @@ class TestPatroni(unittest.TestCase): def test_reload_config(self): self.p.reload_config() self.p.get_tags = Mock(side_effect=Exception) - self.p.reload_config() + self.p.reload_config(local=True) def test_nosync(self): self.p.tags['nosync'] = True diff --git a/tests/test_raft.py b/tests/test_raft.py new file mode 100644 index 00000000..eceef8b6 --- /dev/null +++ b/tests/test_raft.py @@ -0,0 +1,144 @@ +import os +import unittest +import time + +from mock import Mock, patch +from patroni.dcs.raft import DynMemberSyncObj, KVStoreTTL, Raft, SyncObjUtility +from pysyncobj import SyncObjConf, FAIL_REASON + + +@patch('pysyncobj.tcp_server.TcpServer.bind', Mock()) +class TestDynMemberSyncObj(unittest.TestCase): + + @patch('pysyncobj.tcp_server.TcpServer.bind', Mock()) + def setUp(self): + self.conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, autoTick=False) + self.so = DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf) + + @patch.object(SyncObjUtility, 'sendMessage') + def test_add_member(self, mock_send_message): + mock_send_message.return_value = [{'addr': '127.0.0.1:1235'}, {'addr': '127.0.0.1:1236'}] + mock_send_message.ver = 0 + DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf) + self.conf.dynamicMembershipChange = False + DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf) + + def test___onUtilityMessage(self): + self.so._SyncObj__encryptor = Mock() + mock_conn = Mock() + mock_conn.sendRandKey = None + 
self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, 'randkey') + self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['members']) + self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['status']) + + def test__SyncObj__doChangeCluster(self): + self.so._SyncObj__doChangeCluster(['add', '127.0.0.1:1236']) + + def test_utility(self): + utility = SyncObjUtility(['127.0.0.1:1235'], self.conf) + utility.setPartnerNode(list(utility._SyncObj__otherNodes)[0]) + utility.sendMessage(['members']) + utility._onMessageReceived(0, '') + + +class TestKVStoreTTL(unittest.TestCase): + + def setUp(self): + self.conf = SyncObjConf(appendEntriesUseBatch=False, appendEntriesPeriod=0.001, journalFile='foo.journal', + raftMinTimeout=0.004, raftMaxTimeout=0.005, autoTickPeriod=0.001) + callback = Mock() + callback.replicated = False + self.so = KVStoreTTL('127.0.0.1:1234', [], self.conf, on_set=callback, on_delete=callback) + self.so.set_retry_timeout(10) + + @staticmethod + def destroy(so): + so.destroy() + so._SyncObj__thread.join() + + def tearDown(self): + if self.so: + self.destroy(self.so) + if os.path.exists('foo.journal'): + os.unlink('foo.journal') + + def test_set(self): + self.assertTrue(self.so.set('foo', 'bar', prevExist=False, ttl=30)) + self.assertFalse(self.so.set('foo', 'bar', prevExist=False, ttl=30)) + self.assertFalse(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}, prevValue='')) + self.assertTrue(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1})) + + def test_delete(self): + self.so.set('foo', 'bar') + self.so.set('fooo', 'bar') + self.assertFalse(self.so.delete('foo', prevValue='buz')) + self.assertFalse(self.so.delete('foo', prevValue='bar', timeout=0.00001)) + self.assertFalse(self.so.delete('foo', prevValue='bar')) + self.assertTrue(self.so.delete('foo', recursive=True)) + self.assertFalse(self.so.retry(self.so._delete, 'foo', prevValue='')) + + def test_expire(self): + self.so.set('foo', 'bar', ttl=0.001) + time.sleep(1) + self.assertIsNone(self.so.get('foo')) + self.assertEqual(self.so.get('foo', recursive=True), {}) + + @patch('time.sleep', Mock()) + def test_retry(self): + return_values = [FAIL_REASON.QUEUE_FULL, FAIL_REASON.SUCCESS, FAIL_REASON.REQUEST_DENIED] + + def test(callback): + callback(True, return_values.pop(0)) + self.assertTrue(self.so.retry(test)) + self.assertFalse(self.so.retry(test)) + + def test_on_ready_override(self): + self.assertTrue(self.so.set('foo', 'bar')) + self.destroy(self.so) + self.so = None + self.conf.onReady = Mock() + self.conf.autoTick = False + so = KVStoreTTL('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf) + so.doTick(0) + so.destroy() + + +class TestRaft(unittest.TestCase): + + def test_raft(self): + raft = Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'self_addr': '127.0.0.1:1234', 'retry_timeout': 10}) + raft.set_retry_timeout(20) + raft.set_ttl(60) + self.assertTrue(raft.touch_member('')) + self.assertTrue(raft.initialize()) + self.assertTrue(raft.cancel_initialization()) + self.assertTrue(raft.set_config_value('{}')) + self.assertTrue(raft.write_sync_state('foo', 'bar')) + self.assertTrue(raft.update_leader('1')) + self.assertTrue(raft.manual_failover('foo', 'bar')) + raft.get_cluster() + self.assertTrue(raft.delete_sync_state()) + self.assertTrue(raft.delete_leader()) + self.assertTrue(raft.set_history_value('')) + self.assertTrue(raft.delete_cluster()) + raft.get_cluster() + self.assertTrue(raft.take_leader()) + raft.watch(None, 
0.001) + raft._sync_obj.destroy() + raft._sync_obj._SyncObj__thread.join() + + def tearDown(self): + for f in ('journal', 'dump'): + f = '127.0.0.1:1234.' + f + if os.path.exists(f): + os.unlink(f) + + def setUp(self): + self.tearDown() + + @patch('patroni.dcs.raft.KVStoreTTL') + @patch('threading.Event') + def test_init(self, mock_event, mock_kvstore): + mock_kvstore.return_value.applied_local_log = False + mock_event.return_value.isSet.side_effect = [False, True] + self.assertIsNotNone(Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True})) diff --git a/tests/test_raft_controller.py b/tests/test_raft_controller.py new file mode 100644 index 00000000..92c4d1d4 --- /dev/null +++ b/tests/test_raft_controller.py @@ -0,0 +1,46 @@ +import logging +import os +import unittest + +from mock import Mock, patch +from pysyncobj import SyncObj +from patroni.config import Config +from patroni.raft_controller import RaftController, main as _main + +from . import SleepException + + +class TestPatroniRaftController(unittest.TestCase): + + SELF_ADDR = '127.0.0.1:5360' + + def remove_files(self): + for f in ('journal', 'dump'): + f = self.SELF_ADDR + '.' + f + if os.path.exists(f): + os.unlink(f) + + @patch('pysyncobj.tcp_server.TcpServer.bind', Mock()) + def setUp(self): + self._handlers = logging.getLogger().handlers[:] + self.remove_files() + os.environ['PATRONI_RAFT_SELF_ADDR'] = self.SELF_ADDR + config = Config('postgres0.yml', validator=None) + self.rc = RaftController(config) + + def tearDown(self): + logging.getLogger().handlers[:] = self._handlers + self.remove_files() + + def test_reload_config(self): + self.rc.reload_config() + + @patch('logging.Logger.error', Mock(side_effect=SleepException)) + @patch.object(SyncObj, 'doTick', Mock(side_effect=Exception)) + def test_run(self): + self.assertRaises(SleepException, self.rc.run) + self.rc.shutdown() + + @patch('sys.argv', ['patroni']) + def test_patroni_raft_controller_main(self): + self.assertRaises(SystemExit, _main)
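For readers who want to experiment with the new DCS primitive outside of Patroni, here is a minimal single-node sketch of ``KVStoreTTL``, modelled on how ``features/environment.py`` and ``tests/test_raft.py`` above drive it; the address, key, and value are hypothetical::

    import threading

    from patroni.dcs.raft import KVStoreTTL
    from pysyncobj import SyncObjConf

    ready = threading.Event()
    conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True,
                       onReady=ready.set)

    # No partner_addrs: a single-node cluster elects itself leader almost immediately.
    store = KVStoreTTL('127.0.0.1:1234', [], conf)
    store.set_retry_timeout(10)
    ready.wait()

    store.set('/service/batman/leader', 'postgresql0', ttl=30)  # expires after 30 seconds
    print(store.get('/service/batman/leader')['value'])         # -> postgresql0
    store.delete('/service/batman/', recursive=True)

    # Tear down the background SyncObj thread, as the in-repo tests do.
    store.destroy()
    store._SyncObj__thread.join()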