PoC: Patroni on pure RAFT (#375)

* A new node can join the cluster dynamically and become part of the consensus.
 * It is also possible to join only the Patroni cluster (without adding the node to the Raft consensus); just comment out or remove `raft.self_addr` for that.
* When a node joins the cluster, it uses the values from `raft.partner_addrs` only for initial discovery.
* It is possible to run Patroni and Postgres on two nodes plus one node with `patroni_raft_controller` (without Patroni and Postgres). In such a setup one can temporarily lose one node without affecting the primary (see the configuration sketch below).
Author: Alexander Kukushkin
Committed: 2020-07-29 15:34:44 +02:00 (via GitHub)
Parent: c42d507b82
Commit: bfbc4860d5
19 changed files with 884 additions and 89 deletions
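The changes below add a ``raft`` configuration section (``self_addr``, ``partner_addrs``, ``data_dir``) and matching ``PATRONI_RAFT_*`` environment variables. As a rough sketch of the two-plus-one topology described above (addresses and the data directory are hypothetical, not taken from this commit):

import os

# Nodes A and B run Patroni + PostgreSQL and take part in the Raft consensus.
# Shown for node A; node B mirrors it with its own SELF_ADDR.
os.environ['PATRONI_RAFT_SELF_ADDR'] = '10.0.1.11:2222'
os.environ['PATRONI_RAFT_PARTNER_ADDRS'] = "'10.0.1.12:2222','10.0.1.13:2222'"
os.environ['PATRONI_RAFT_DATA_DIR'] = '/var/lib/patroni/raft'

# Node C runs only patroni_raft_controller with the same three variables, pointing
# SELF_ADDR at itself and PARTNER_ADDRS at nodes A and B; it needs no PostgreSQL settings.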


@@ -21,6 +21,8 @@ matrix:
env: DCS="exhibitor" TEST_SUITE="behave"
- python: "3.6"
env: DCS="consul" TEST_SUITE="behave"
- python: "3.6"
env: DCS="raft" TEST_SUITE="behave"
- python: "3.6"
env: DCS="kubernetes" TEST_SUITE="behave"
branches:
@@ -102,6 +104,10 @@ install:
done&
}
function get_raft() {
return 0
}
attempt_num=1
until get_${DCS}; do
[[ $attempt_num -ge 3 ]] && exit 1


@@ -87,6 +87,13 @@ Kubernetes
- **PATRONI\_KUBERNETES\_POD\_IP**: (optional) IP address of the pod Patroni is running in. This value is required when `PATRONI_KUBERNETES_USE_ENDPOINTS` is enabled and is used to populate the leader endpoint subsets when the pod's PostgreSQL is promoted.
- **PATRONI\_KUBERNETES\_PORTS**: (optional) if the Service object has a name for the port, the same name must appear in the Endpoint object, otherwise the service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``PATRONI_KUBERNETES_PORTS='[{"name": "postgresql", "port": 5432}]'`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `PATRONI_KUBERNETES_USE_ENDPOINTS` is set.
Raft
----
- **PATRONI\_RAFT\_SELF\_ADDR**: ``ip:port`` to listen on for Raft connections. If not set, the node will not participate in consensus.
- **PATRONI\_RAFT\_PARTNER\_ADDRS**: list of the other Patroni nodes in the cluster, in the format ``"'ip1:port1','ip2:port2'"``. It is important to quote every single entity individually (see the parsing sketch after this list)!
- **PATRONI\_RAFT\_DATA\_DIR**: directory where the Raft log and snapshot files are stored. If not specified, the current working directory is used.
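One way to see the effect of that quoting, assuming the same YAML-style list parsing Patroni applies to similar environment variables (the actual parser lives in ``patroni/config.py``, touched further down in this diff, and may differ in detail):

import yaml

# Value of PATRONI_RAFT_PARTNER_ADDRS with every address quoted individually:
raw = "'host1:1234','host2:1234'"
# Wrapping the scalar in brackets and handing it to a YAML parser yields a list of address strings:
partner_addrs = yaml.safe_load('[' + raw + ']')
assert partner_addrs == ['host1:1234', 'host2:1234']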
PostgreSQL
----------
- **PATRONI\_POSTGRESQL\_LISTEN**: IP address + port that Postgres listens to. Multiple comma-separated addresses are permitted, as long as the port component is appended to the last one with a colon, i.e. ``listen: 127.0.0.1,127.0.0.2:5432``. Patroni will use the first address from this list to establish local connections to the PostgreSQL node.


@@ -159,6 +159,35 @@ Kubernetes
- **pod\_ip**: (optional) IP address of the pod Patroni is running in. This value is required when `use_endpoints` is enabled and is used to populate the leader endpoint subsets when the pod's PostgreSQL is promoted.
- **ports**: (optional) if the Service object has a name for the port, the same name must appear in the Endpoint object, otherwise the service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``kubernetes.ports: [{"name": "postgresql", "port": 5432}]`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `kubernetes.use_endpoints` is set.
Raft
----
- **self\_addr**: ``ip:port`` to listen on for Raft connections. If not set, the node will not participate in consensus.
- **partner\_addrs**: list of the other Patroni nodes in the cluster, in the format ``['ip1:port', 'ip2:port', 'etc...']``
- **data\_dir**: directory where the Raft log and snapshot files are stored. If not specified, the current working directory is used.
Short FAQ about the Raft implementation
- Q: How do I list all the nodes providing consensus?
  A: ``syncobj_admin -conn host:port -status``, where ``host:port`` is the address of one of the cluster nodes.
- Q: A node that was part of the consensus is gone and I can't reuse its IP for another node. How do I remove this node from the consensus?
  A: ``syncobj_admin -conn host:port -remove host2:port2``, where ``host2:port2`` is the address of the node you want to remove from the consensus.
- Q: Where do I get the ``syncobj_admin`` utility?
  A: It is installed together with the ``pysyncobj`` module (a Python Raft implementation), which is a Patroni dependency. A minimal usage sketch follows this FAQ.
- Q: Is it possible to run a Patroni node without adding it to the consensus?
  A: Yes, just comment out or remove ``raft.self_addr`` from the Patroni configuration.
- Q: Is it possible to run Patroni and PostgreSQL on only two nodes?
  A: Yes, on the third node you can run ``patroni_raft_controller`` (without Patroni and PostgreSQL). In such a setup one can temporarily lose one node without affecting the primary.
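For context on what ``pysyncobj`` provides underneath Patroni's Raft DCS, here is a minimal sketch of a replicated object built directly on the library (the class, addresses and counter logic are made up for illustration and mirror the standard ``pysyncobj`` usage pattern; they are not code from this commit):

from pysyncobj import SyncObj, SyncObjConf, replicated

class Counter(SyncObj):
    def __init__(self, self_addr, partner_addrs):
        # dynamicMembershipChange allows nodes to be added or removed at runtime,
        # which is what syncobj_admin -add / -remove rely on.
        conf = SyncObjConf(dynamicMembershipChange=True)
        super(Counter, self).__init__(self_addr, partner_addrs, conf)
        self.__value = 0

    @replicated
    def incr(self):
        # Applied on every node once the corresponding Raft log entry is committed.
        self.__value += 1

    def get(self):
        return self.__value

# counter = Counter('127.0.0.1:4321', ['127.0.0.1:4322', '127.0.0.1:4323'])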
.. _postgresql_settings:
PostgreSQL


@@ -165,6 +165,11 @@ class PatroniController(AbstractController):
config = yaml.safe_load(f)
config.pop('etcd', None)
raft_port = os.environ.get('RAFT_PORT')
if raft_port:
os.environ['RAFT_PORT'] = str(int(raft_port) + 1)
config['raft'] = {'data_dir': self._output_dir, 'self_addr': 'localhost:' + os.environ['RAFT_PORT']}
host = config['postgresql']['listen'].split(':')[0]
config['postgresql']['listen'] = config['postgresql']['connect_address'] = '{0}:{1}'.format(host, self.__PORT)
@@ -531,6 +536,48 @@ class ExhibitorController(ZooKeeperController):
os.environ.update({'PATRONI_EXHIBITOR_HOSTS': 'localhost', 'PATRONI_EXHIBITOR_PORT': '8181'})
class RaftController(AbstractDcsController):
CONTROLLER_ADDR = 'localhost:1234'
def __init__(self, context):
super(RaftController, self).__init__(context)
os.environ.update(PATRONI_RAFT_PARTNER_ADDRS="'" + self.CONTROLLER_ADDR + "'", RAFT_PORT='1234')
self._raft = None
def _start(self):
env = os.environ.copy()
del env['PATRONI_RAFT_PARTNER_ADDRS']
env['PATRONI_RAFT_SELF_ADDR'] = self.CONTROLLER_ADDR
env['PATRONI_RAFT_DATA_DIR'] = self._work_directory
return subprocess.Popen([sys.executable, '-m', 'coverage', 'run',
'--source=patroni', '-p', 'patroni_raft_controller.py'],
stdout=self._log, stderr=subprocess.STDOUT, env=env)
def query(self, key, scope='batman'):
ret = self._raft.get(self.path(key, scope))
return ret and ret['value']
def set(self, key, value):
self._raft.set(self.path(key), value)
def cleanup_service_tree(self):
from patroni.dcs.raft import KVStoreTTL
from pysyncobj import SyncObjConf
if self._raft:
self._raft.destroy()
self._raft._SyncObj__thread.join()
self.stop()
os.makedirs(self._work_directory)
self.start()
ready_event = threading.Event()
conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, onReady=ready_event.set)
self._raft = KVStoreTTL(None, [self.CONTROLLER_ADDR], conf)
ready_event.wait()
class PatroniPoolController(object):
BACKUP_SCRIPT = [sys.executable, 'features/backup_create.py']


@@ -4,7 +4,8 @@ import signal
import sys
import time
from patroni.version import __version__
from .daemon import AbstractPatroniDaemon, abstract_main
from .version import __version__
logger = logging.getLogger(__name__)
@@ -12,23 +13,19 @@ PATRONI_ENV_PREFIX = 'PATRONI_'
KUBERNETES_ENV_PREFIX = 'KUBERNETES_'
class Patroni(object):
class Patroni(AbstractPatroniDaemon):
def __init__(self, conf):
def __init__(self, config):
from patroni.api import RestApiServer
from patroni.dcs import get_dcs
from patroni.ha import Ha
from patroni.log import PatroniLogger
from patroni.postgresql import Postgresql
from patroni.request import PatroniRequest
from patroni.watchdog import Watchdog
self.setup_signal_handlers()
super(Patroni, self).__init__(config)
self.version = __version__
self.logger = PatroniLogger()
self.config = conf
self.logger.reload_config(self.config.get('log', {}))
self.dcs = get_dcs(self.config)
self.watchdog = Watchdog(self.config)
self.load_dynamic_configuration()
@@ -71,14 +68,14 @@ class Patroni(object):
def nosync(self):
return bool(self.tags.get('nosync', False))
def reload_config(self, sighup=False):
def reload_config(self, sighup=False, local=False):
try:
self.tags = self.get_tags()
self.logger.reload_config(self.config.get('log', {}))
self.watchdog.reload_config(self.config)
if sighup:
super(Patroni, self).reload_config(sighup, local)
if local:
self.tags = self.get_tags()
self.request.reload_config(self.config)
self.api.reload_config(self.config['restapi'])
self.api.reload_config(self.config['restapi'])
self.watchdog.reload_config(self.config)
self.postgresql.reload_config(self.config['postgresql'], sighup)
self.dcs.reload_config(self.config)
except Exception:
@@ -88,15 +85,6 @@ class Patroni(object):
def replicatefrom(self):
return self.tags.get('replicatefrom')
def sighup_handler(self, *args):
self._received_sighup = True
def sigterm_handler(self, *args):
with self._sigterm_lock:
if not self._received_sigterm:
self._received_sigterm = True
sys.exit()
@property
def noloadbalance(self):
return bool(self.tags.get('noloadbalance', False))
@@ -114,48 +102,24 @@ class Patroni(object):
elif self.ha.watch(nap_time):
self.next_run = time.time()
@property
def received_sigterm(self):
with self._sigterm_lock:
return self._received_sigterm
def run(self):
self.api.start()
self.logger.start()
self.next_run = time.time()
super(Patroni, self).run()
while not self.received_sigterm:
if self._received_sighup:
self._received_sighup = False
if self.config.reload_local_configuration():
self.reload_config(True)
else:
self.postgresql.config.reload_config(self.config['postgresql'], True)
def _run_cycle(self):
logger.info(self.ha.run_cycle())
logger.info(self.ha.run_cycle())
if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \
and self.config.set_dynamic_configuration(self.dcs.cluster.config):
self.reload_config()
if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \
and self.config.set_dynamic_configuration(self.dcs.cluster.config):
self.reload_config()
if self.postgresql.role != 'uninitialized':
self.config.save_cache()
if self.postgresql.role != 'uninitialized':
self.config.save_cache()
self.schedule_next_run()
self.schedule_next_run()
def setup_signal_handlers(self):
from threading import Lock
self._received_sighup = False
self._sigterm_lock = Lock()
self._received_sigterm = False
if os.name != 'nt':
signal.signal(signal.SIGHUP, self.sighup_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
def shutdown(self):
with self._sigterm_lock:
self._received_sigterm = True
def _shutdown(self):
try:
self.api.shutdown()
except Exception:
@@ -164,43 +128,14 @@ class Patroni(object):
self.ha.shutdown()
except Exception:
logger.exception('Exception during Ha.shutdown')
self.logger.shutdown()
def patroni_main():
import argparse
from multiprocessing import freeze_support
from patroni.config import Config, ConfigParseError
from patroni.validator import schema
freeze_support()
parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__))
parser.add_argument('--validate-config', action='store_true', help='Run config validator and exit')
parser.add_argument('configfile', nargs='?', default='',
help='Patroni may also read the configuration from the {0} environment variable'
.format(Config.PATRONI_CONFIG_VARIABLE))
args = parser.parse_args()
try:
if args.validate_config:
conf = Config(args.configfile, validator=schema)
sys.exit()
else:
conf = Config(args.configfile)
except ConfigParseError as e:
if e.value:
print(e.value)
parser.print_help()
sys.exit(1)
patroni = Patroni(conf)
try:
patroni.run()
except KeyboardInterrupt:
pass
finally:
patroni.shutdown()
abstract_main(Patroni, schema)
def fatal(string, *args):


@@ -293,6 +293,10 @@ class Config(object):
logger.exception('Exception when parsing list %s', value)
return None
_set_section_values('raft', ['data_dir', 'self_addr', 'partner_addrs'])
if 'raft' in ret and 'partner_addrs' in ret['raft']:
ret['raft']['partner_addrs'] = _parse_list(ret['raft']['partner_addrs'])
for param in list(os.environ.keys()):
if param.startswith(PATRONI_ENV_PREFIX):
# PATRONI_(ETCD|CONSUL|ZOOKEEPER|EXHIBITOR|...)_(HOSTS?|PORT|..)

patroni/daemon.py (new file, 104 lines)

@@ -0,0 +1,104 @@
import abc
import os
import signal
import six
import sys
from threading import Lock
@six.add_metaclass(abc.ABCMeta)
class AbstractPatroniDaemon(object):
def __init__(self, config):
from patroni.log import PatroniLogger
self.setup_signal_handlers()
self.logger = PatroniLogger()
self.config = config
AbstractPatroniDaemon.reload_config(self, local=True)
def sighup_handler(self, *args):
self._received_sighup = True
def sigterm_handler(self, *args):
with self._sigterm_lock:
if not self._received_sigterm:
self._received_sigterm = True
sys.exit()
def setup_signal_handlers(self):
self._received_sighup = False
self._sigterm_lock = Lock()
self._received_sigterm = False
if os.name != 'nt':
signal.signal(signal.SIGHUP, self.sighup_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
@property
def received_sigterm(self):
with self._sigterm_lock:
return self._received_sigterm
def reload_config(self, sighup=False, local=False):
if local:
self.logger.reload_config(self.config.get('log', {}))
@abc.abstractmethod
def _run_cycle(self):
"""_run_cycle"""
def run(self):
self.logger.start()
while not self.received_sigterm:
if self._received_sighup:
self._received_sighup = False
self.reload_config(True, self.config.reload_local_configuration())
self._run_cycle()
@abc.abstractmethod
def _shutdown(self):
"""_shutdown"""
def shutdown(self):
with self._sigterm_lock:
self._received_sigterm = True
self._shutdown()
self.logger.shutdown()
def abstract_main(cls, validator=None):
import argparse
from .config import Config, ConfigParseError
from .version import __version__
parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__))
if validator:
parser.add_argument('--validate-config', action='store_true', help='Run config validator and exit')
parser.add_argument('configfile', nargs='?', default='',
help='Patroni may also read the configuration from the {0} environment variable'
.format(Config.PATRONI_CONFIG_VARIABLE))
args = parser.parse_args()
try:
if validator and args.validate_config:
Config(args.configfile, validator=validator)
sys.exit()
config = Config(args.configfile)
except ConfigParseError as e:
if e.value:
print(e.value)
parser.print_help()
sys.exit(1)
controller = cls(config)
try:
controller.run()
except KeyboardInterrupt:
pass
finally:
controller.shutdown()

patroni/dcs/raft.py (new file, 409 lines)

@@ -0,0 +1,409 @@
import json
import logging
import os
import threading
import time
from patroni.dcs import AbstractDCS, ClusterConfig, Cluster, Failover, Leader, Member, SyncState, TimelineHistory
from pysyncobj import SyncObj, SyncObjConf, replicated, FAIL_REASON
from pysyncobj.transport import Node, TCPTransport, CONNECTION_STATE
logger = logging.getLogger(__name__)
class MessageNode(Node):
def __init__(self, address):
self.address = address
class UtilityTransport(TCPTransport):
def __init__(self, syncObj, selfNode, otherNodes):
super(UtilityTransport, self).__init__(syncObj, selfNode, otherNodes)
self._selfIsReadonlyNode = False
def _connectIfNecessarySingle(self, node):
pass
def connectionState(self, node):
return self._connections[node].state
def isDisconnected(self, node):
return self.connectionState(node) == CONNECTION_STATE.DISCONNECTED
def connectIfRequiredSingle(self, node):
if self.isDisconnected(node):
return self._connections[node].connect(node.ip, node.port)
def disconnectSingle(self, node):
self._connections[node].disconnect()
class SyncObjUtility(SyncObj):
def __init__(self, otherNodes, conf):
autoTick = conf.autoTick
conf.autoTick = False
super(SyncObjUtility, self).__init__(None, otherNodes, conf, transportClass=UtilityTransport)
conf.autoTick = autoTick
self._SyncObj__transport.setOnMessageReceivedCallback(self._onMessageReceived)
self.__result = None
def setPartnerNode(self, partner):
self.__node = partner
def sendMessage(self, message):
# Abuse the fact that the node address is sent as the first message
self._SyncObj__transport._selfNode = MessageNode(message)
self._SyncObj__transport.connectIfRequiredSingle(self.__node)
while not self._SyncObj__transport.isDisconnected(self.__node):
self._poller.poll(0.5)
return self.__result
def _onMessageReceived(self, _, message):
self.__result = message
self._SyncObj__transport.disconnectSingle(self.__node)
class MyTCPTransport(TCPTransport):
def _onIncomingMessageReceived(self, conn, message):
if self._syncObj.encryptor and not conn.sendRandKey:
conn.sendRandKey = message
conn.recvRandKey = os.urandom(32)
conn.send(conn.recvRandKey)
return
# Utility messages
if isinstance(message, list) and message[0] == 'members':
conn.send(self._syncObj._get_members())
return True
return super(MyTCPTransport, self)._onIncomingMessageReceived(conn, message)
class DynMemberSyncObj(SyncObj):
def __init__(self, selfAddress, partnerAddrs, conf):
add_self = False
if selfAddress:
utility = SyncObjUtility(partnerAddrs, conf)
for node in utility._SyncObj__otherNodes:
utility.setPartnerNode(node)
response = utility.sendMessage(['members'])
if response:
partnerAddrs = [member['addr'] for member in response if member['addr'] != selfAddress]
add_self = len(partnerAddrs) == len(response)
break
super(DynMemberSyncObj, self).__init__(selfAddress, partnerAddrs, conf, transportClass=MyTCPTransport)
if add_self:
threading.Thread(target=utility.sendMessage, args=(['add', selfAddress],)).start()
def _get_members(self):
ret = [{'addr': node.id, 'leader': node == self._getLeader(),
'status': CONNECTION_STATE.CONNECTED if node in self._SyncObj__connectedNodes
else CONNECTION_STATE.DISCONNECTED} for node in self._SyncObj__otherNodes]
ret.append({'addr': self._SyncObj__selfNode.id, 'leader': self._isLeader(),
'status': CONNECTION_STATE.CONNECTED})
return ret
def _SyncObj__doChangeCluster(self, request, reverse=False):
ret = False
if not self._SyncObj__selfNode or request[0] != 'add' or reverse or request[1] != self._SyncObj__selfNode.id:
ret = super(DynMemberSyncObj, self)._SyncObj__doChangeCluster(request, reverse)
if ret:
self.forceLogCompaction()
return ret
class KVStoreTTL(DynMemberSyncObj):
def __init__(self, selfAddress, partnerAddrs, conf, on_set=None, on_delete=None):
self.__on_set = on_set
self.__on_delete = on_delete
self.__limb = {}
self.__retry_timeout = None
self.__early_apply_local_log = selfAddress is not None
self.applied_local_log = False
super(KVStoreTTL, self).__init__(selfAddress, partnerAddrs, conf)
self.__data = {}
@staticmethod
def __check_requirements(old_value, **kwargs):
return ('prevExist' not in kwargs or bool(kwargs['prevExist']) == bool(old_value)) and \
('prevValue' not in kwargs or old_value and old_value['value'] == kwargs['prevValue']) and \
(not kwargs.get('prevIndex') or old_value and old_value['index'] == kwargs['prevIndex'])
def set_retry_timeout(self, retry_timeout):
self.__retry_timeout = retry_timeout
def retry(self, func, *args, **kwargs):
event = threading.Event()
ret = {'result': None, 'error': -1}
def callback(result, error):
ret.update(result=result, error=error)
event.set()
kwargs['callback'] = callback
timeout = kwargs.pop('timeout', None) or self.__retry_timeout
deadline = timeout and time.time() + timeout
while True:
event.clear()
func(*args, **kwargs)
event.wait(timeout)
if ret['error'] == FAIL_REASON.SUCCESS:
return ret['result']
elif ret['error'] == FAIL_REASON.REQUEST_DENIED:
break
elif deadline:
timeout = deadline - time.time()
if timeout <= 0:
break
time.sleep(1)
return False
@replicated
def _set(self, key, value, **kwargs):
old_value = self.__data.get(key, {})
if not self.__check_requirements(old_value, **kwargs):
return False
if old_value and old_value['created'] != value['created']:
value['created'] = value['updated']
value['index'] = self._SyncObj__raftLastApplied + 1
self.__data[key] = value
if self.__on_set:
self.__on_set(key, value)
return True
def set(self, key, value, ttl=None, **kwargs):
old_value = self.__data.get(key, {})
if not self.__check_requirements(old_value, **kwargs):
return False
value = {'value': value, 'updated': time.time()}
value['created'] = old_value.get('created', value['updated'])
if ttl:
value['expire'] = value['updated'] + ttl
return self.retry(self._set, key, value, **kwargs)
def __pop(self, key):
self.__data.pop(key)
if self.__on_delete:
self.__on_delete(key)
@replicated
def _delete(self, key, recursive=False, **kwargs):
if recursive:
for k in list(self.__data.keys()):
if k.startswith(key):
self.__pop(k)
elif not self.__check_requirements(self.__data.get(key, {}), **kwargs):
return False
else:
self.__pop(key)
return True
def delete(self, key, recursive=False, **kwargs):
if not recursive and not self.__check_requirements(self.__data.get(key, {}), **kwargs):
return False
return self.retry(self._delete, key, recursive=recursive, **kwargs)
@staticmethod
def __values_match(old, new):
return all(old.get(n) == new.get(n) for n in ('created', 'updated', 'expire', 'value'))
@replicated
def _expire(self, key, value, callback=None):
current = self.__data.get(key)
if current and self.__values_match(current, value):
self.__pop(key)
def __expire_keys(self):
for key, value in self.__data.items():
if value and 'expire' in value and value['expire'] <= time.time() and \
not (key in self.__limb and self.__values_match(self.__limb[key], value)):
self.__limb[key] = value
def callback(*args):
if key in self.__limb and self.__values_match(self.__limb[key], value):
self.__limb.pop(key)
self._expire(key, value, callback=callback)
def get(self, key, recursive=False):
if not recursive:
return self.__data.get(key)
return {k: v for k, v in self.__data.items() if k.startswith(key)}
def _onTick(self, timeToWait=0.0):
# The SyncObj starts applying the local log only when there is at least one node connected.
# We want to change this behavior and apply the local log even when there is nobody except us.
# It gives us at least some picture of the last known cluster state.
if self.__early_apply_local_log and not self.applied_local_log and self._SyncObj__needLoadDumpFile:
self._SyncObj__raftCommitIndex = self._SyncObj__getCurrentLogIndex()
self._SyncObj__raftCurrentTerm = self._SyncObj__getCurrentLogTerm()
super(KVStoreTTL, self)._onTick(timeToWait)
# The SyncObj calls the onReady callback only when the cluster has a leader and is ready for writes.
# In some cases it is safe for us to "signal" the Raft object once the local log is fully applied.
# We use the `applied_local_log` property for that instead of calling the callback function.
if self.__early_apply_local_log and not self.applied_local_log and self._SyncObj__raftCommitIndex != 1 and \
self._SyncObj__raftLastApplied == self._SyncObj__raftCommitIndex:
self.applied_local_log = True
if self._isLeader():
self.__expire_keys()
else:
self.__limb.clear()
class Raft(AbstractDCS):
def __init__(self, config):
super(Raft, self).__init__(config)
self._ttl = int(config.get('ttl') or 30)
self_addr = None if self._ctl else config.get('self_addr')
template = os.path.join(config.get('data_dir', ''), self_addr or '')
files = {'journalFile': template + '.journal', 'fullDumpFile': template + '.dump'} if self_addr else {}
ready_event = threading.Event()
conf = SyncObjConf(commandsWaitLeader=False, appendEntriesUseBatch=False, onReady=ready_event.set,
dynamicMembershipChange=True, **files)
self._sync_obj = KVStoreTTL(self_addr, config.get('partner_addrs', []), conf, self._on_set, self._on_delete)
while True:
ready_event.wait(5)
if ready_event.isSet() or self._sync_obj.applied_local_log:
break
else:
logger.info('waiting on raft')
self._sync_obj.forceLogCompaction()
self.set_retry_timeout(int(config.get('retry_timeout') or 10))
def _on_set(self, key, value):
leader = (self._sync_obj.get(self.leader_path) or {}).get('value')
if key == value['created'] == value['updated'] and \
(key.startswith(self.members_path) or key == self.leader_path and leader != self._name) or \
key == self.leader_optime_path and leader != self._name or key in (self.config_path, self.sync_path):
self.event.set()
def _on_delete(self, key):
if key == self.leader_path:
self.event.set()
def set_ttl(self, ttl):
self._ttl = ttl
@property
def ttl(self):
return self._ttl
def set_retry_timeout(self, retry_timeout):
self._sync_obj.set_retry_timeout(retry_timeout)
@staticmethod
def member(key, value):
return Member.from_node(value['index'], os.path.basename(key), None, value['value'])
def _load_cluster(self):
prefix = self.client_path('')
response = self._sync_obj.get(prefix, recursive=True)
if not response:
return Cluster(None, None, None, None, [], None, None, None)
nodes = {os.path.relpath(key, prefix).replace('\\', '/'): value for key, value in response.items()}
# get initialize flag
initialize = nodes.get(self._INITIALIZE)
initialize = initialize and initialize['value']
# get global dynamic configuration
config = nodes.get(self._CONFIG)
config = config and ClusterConfig.from_node(config['index'], config['value'])
# get timeline history
history = nodes.get(self._HISTORY)
history = history and TimelineHistory.from_node(history['index'], history['value'])
# get last leader operation
last_leader_operation = nodes.get(self._LEADER_OPTIME)
last_leader_operation = 0 if last_leader_operation is None else int(last_leader_operation['value'])
# get list of members
members = [self.member(k, n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1]
# get leader
leader = nodes.get(self._LEADER)
if leader:
member = Member(-1, leader['value'], None, {})
member = ([m for m in members if m.name == leader['value']] or [member])[0]
leader = Leader(leader['index'], None, member)
# failover key
failover = nodes.get(self._FAILOVER)
if failover:
failover = Failover.from_node(failover['index'], failover['value'])
# get synchronization state
sync = nodes.get(self._SYNC)
sync = SyncState.from_node(sync and sync['index'], sync and sync['value'])
return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
def _write_leader_optime(self, last_operation):
return self._sync_obj.set(self.leader_optime_path, last_operation, timeout=1)
def _update_leader(self):
ret = self._sync_obj.set(self.leader_path, self._name, ttl=self._ttl, prevValue=self._name)
if not ret and self._sync_obj.get(self.leader_path) is None:
ret = self.attempt_to_acquire_leader()
return ret
def attempt_to_acquire_leader(self, permanent=False):
return self._sync_obj.set(self.leader_path, self._name, prevExist=False,
ttl=None if permanent else self._ttl)
def set_failover_value(self, value, index=None):
return self._sync_obj.set(self.failover_path, value, prevIndex=index)
def set_config_value(self, value, index=None):
return self._sync_obj.set(self.config_path, value, prevIndex=index)
def touch_member(self, data, permanent=False):
data = json.dumps(data, separators=(',', ':'))
return self._sync_obj.set(self.member_path, data, None if permanent else self._ttl, timeout=2)
def take_leader(self):
return self._sync_obj.set(self.leader_path, self._name, ttl=self._ttl)
def initialize(self, create_new=True, sysid=''):
return self._sync_obj.set(self.initialize_path, sysid, prevExist=(not create_new))
def _delete_leader(self):
return self._sync_obj.delete(self.leader_path, prevValue=self._name, timeout=1)
def cancel_initialization(self):
return self._sync_obj.delete(self.initialize_path)
def delete_cluster(self):
return self._sync_obj.delete(self.client_path(''), recursive=True)
def set_history_value(self, value):
return self._sync_obj.set(self.history_path, value)
def set_sync_state_value(self, value, index=None):
return self._sync_obj.set(self.sync_path, value, prevIndex=index)
def delete_sync_state(self, index=None):
return self._sync_obj.delete(self.sync_path, prevIndex=index)
def watch(self, leader_index, timeout):
try:
return super(Raft, self).watch(leader_index, timeout)
finally:
self.event.clear()


@@ -0,0 +1,34 @@
import logging
import os
from patroni.daemon import AbstractPatroniDaemon, abstract_main
from patroni.dcs.raft import KVStoreTTL
from pysyncobj import SyncObjConf
logger = logging.getLogger(__name__)
class RaftController(AbstractPatroniDaemon):
def __init__(self, config):
super(RaftController, self).__init__(config)
raft_config = self.config.get('raft')
self_addr = raft_config['self_addr']
template = os.path.join(raft_config.get('data_dir', ''), self_addr)
self._syncobj_config = SyncObjConf(autoTick=False, appendEntriesUseBatch=False, dynamicMembershipChange=True,
journalFile=template + '.journal', fullDumpFile=template + '.dump')
self._raft = KVStoreTTL(self_addr, raft_config.get('partner_addrs', []), self._syncobj_config)
def _run_cycle(self):
try:
self._raft.doTick(self._syncobj_config.autoTickPeriod)
except Exception:
logger.exception('doTick')
def _shutdown(self):
self._raft.destroy()
def main():
abstract_main(RaftController)

patroni_raft_controller.py (new executable file, 6 lines)

@@ -0,0 +1,6 @@
#!/usr/bin/env python
from patroni.raft_controller import main
if __name__ == '__main__':
main()


@@ -30,6 +30,13 @@ etcd:
#It is possible to change this behavior by setting:
#use_proxies: true
#raft:
# data_dir: .
# self_addr: 127.0.0.1:2222
# partner_addrs:
# - 127.0.0.1:2223
# - 127.0.0.1:2224
bootstrap:
# this section will be written into Etcd:/<namespace>/<scope>/config after initializing new cluster
# and all other cluster members will use it as a `global configuration`


@@ -30,6 +30,13 @@ etcd:
#It is possible to change this behavior by setting:
#use_proxies: true
#raft:
# data_dir: .
# self_addr: 127.0.0.1:2223
# partner_addrs:
# - 127.0.0.1:2222
# - 127.0.0.1:2224
bootstrap:
# this section will be written into Etcd:/<namespace>/<scope>/config after initializing new cluster
# and all other cluster members will use it as a `global configuration`


@@ -30,6 +30,13 @@ etcd:
#It is possible to change this behavior by setting:
#use_proxies: true
#raft:
# data_dir: .
# self_addr: 127.0.0.1:2224
# partner_addrs:
# - 127.0.0.1:2222
# - 127.0.0.1:2223
bootstrap:
# this section will be written into Etcd:/<namespace>/<scope>/config after initializing new cluster
# and all other cluster members will use it as a `global configuration`


@@ -8,5 +8,6 @@ python-consul>=0.7.1
click>=4.1
prettytable>=0.7
python-dateutil
pysyncobj>=0.3.5
psutil>=2.0.0
cdiff


@@ -23,7 +23,7 @@ KEYWORDS = 'etcd governor patroni postgresql postgres ha haproxy confd' +\
' zookeeper exhibitor consul streaming replication kubernetes k8s'
EXTRAS_REQUIRE = {'aws': ['boto'], 'etcd': ['python-etcd'], 'consul': ['python-consul'],
'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'], 'kubernetes': []}
'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'], 'kubernetes': [], 'raft': ['pysyncobj']}
COVERAGE_XML = True
COVERAGE_HTML = False
@@ -51,6 +51,7 @@ CLASSIFIERS = [
CONSOLE_SCRIPTS = ['patroni = patroni:main',
'patronictl = patroni.ctl:ctl',
'patroni_raft_controller = patroni.raft_controller:main',
"patroni_wale_restore = patroni.scripts.wale_restore:main",
"patroni_aws = patroni.scripts.aws:main"]


@@ -55,6 +55,7 @@ class TestConfig(unittest.TestCase):
'PATRONI_ZOOKEEPER_HOSTS': "'host1:2181','host2:2181'",
'PATRONI_EXHIBITOR_HOSTS': 'host1,host2',
'PATRONI_EXHIBITOR_PORT': '8181',
'PATRONI_RAFT_PARTNER_ADDRS': "'host1:1234','host2:1234'",
'PATRONI_foo_HOSTS': '[host1,host2', # Exception in parse_list
'PATRONI_SUPERUSER_USERNAME': 'postgres',
'PATRONI_SUPERUSER_PASSWORD': 'zalando',


@@ -167,7 +167,7 @@ class TestPatroni(unittest.TestCase):
def test_reload_config(self):
self.p.reload_config()
self.p.get_tags = Mock(side_effect=Exception)
self.p.reload_config()
self.p.reload_config(local=True)
def test_nosync(self):
self.p.tags['nosync'] = True

tests/test_raft.py (new file, 144 lines)

@@ -0,0 +1,144 @@
import os
import unittest
import time
from mock import Mock, patch
from patroni.dcs.raft import DynMemberSyncObj, KVStoreTTL, Raft, SyncObjUtility
from pysyncobj import SyncObjConf, FAIL_REASON
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
class TestDynMemberSyncObj(unittest.TestCase):
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
def setUp(self):
self.conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, autoTick=False)
self.so = DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
@patch.object(SyncObjUtility, 'sendMessage')
def test_add_member(self, mock_send_message):
mock_send_message.return_value = [{'addr': '127.0.0.1:1235'}, {'addr': '127.0.0.1:1236'}]
mock_send_message.ver = 0
DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
self.conf.dynamicMembershipChange = False
DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
def test___onUtilityMessage(self):
self.so._SyncObj__encryptor = Mock()
mock_conn = Mock()
mock_conn.sendRandKey = None
self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, 'randkey')
self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['members'])
self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['status'])
def test__SyncObj__doChangeCluster(self):
self.so._SyncObj__doChangeCluster(['add', '127.0.0.1:1236'])
def test_utility(self):
utility = SyncObjUtility(['127.0.0.1:1235'], self.conf)
utility.setPartnerNode(list(utility._SyncObj__otherNodes)[0])
utility.sendMessage(['members'])
utility._onMessageReceived(0, '')
class TestKVStoreTTL(unittest.TestCase):
def setUp(self):
self.conf = SyncObjConf(appendEntriesUseBatch=False, appendEntriesPeriod=0.001, journalFile='foo.journal',
raftMinTimeout=0.004, raftMaxTimeout=0.005, autoTickPeriod=0.001)
callback = Mock()
callback.replicated = False
self.so = KVStoreTTL('127.0.0.1:1234', [], self.conf, on_set=callback, on_delete=callback)
self.so.set_retry_timeout(10)
@staticmethod
def destroy(so):
so.destroy()
so._SyncObj__thread.join()
def tearDown(self):
if self.so:
self.destroy(self.so)
if os.path.exists('foo.journal'):
os.unlink('foo.journal')
def test_set(self):
self.assertTrue(self.so.set('foo', 'bar', prevExist=False, ttl=30))
self.assertFalse(self.so.set('foo', 'bar', prevExist=False, ttl=30))
self.assertFalse(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}, prevValue=''))
self.assertTrue(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}))
def test_delete(self):
self.so.set('foo', 'bar')
self.so.set('fooo', 'bar')
self.assertFalse(self.so.delete('foo', prevValue='buz'))
self.assertFalse(self.so.delete('foo', prevValue='bar', timeout=0.00001))
self.assertFalse(self.so.delete('foo', prevValue='bar'))
self.assertTrue(self.so.delete('foo', recursive=True))
self.assertFalse(self.so.retry(self.so._delete, 'foo', prevValue=''))
def test_expire(self):
self.so.set('foo', 'bar', ttl=0.001)
time.sleep(1)
self.assertIsNone(self.so.get('foo'))
self.assertEqual(self.so.get('foo', recursive=True), {})
@patch('time.sleep', Mock())
def test_retry(self):
return_values = [FAIL_REASON.QUEUE_FULL, FAIL_REASON.SUCCESS, FAIL_REASON.REQUEST_DENIED]
def test(callback):
callback(True, return_values.pop(0))
self.assertTrue(self.so.retry(test))
self.assertFalse(self.so.retry(test))
def test_on_ready_override(self):
self.assertTrue(self.so.set('foo', 'bar'))
self.destroy(self.so)
self.so = None
self.conf.onReady = Mock()
self.conf.autoTick = False
so = KVStoreTTL('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
so.doTick(0)
so.destroy()
class TestRaft(unittest.TestCase):
def test_raft(self):
raft = Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'self_addr': '127.0.0.1:1234', 'retry_timeout': 10})
raft.set_retry_timeout(20)
raft.set_ttl(60)
self.assertTrue(raft.touch_member(''))
self.assertTrue(raft.initialize())
self.assertTrue(raft.cancel_initialization())
self.assertTrue(raft.set_config_value('{}'))
self.assertTrue(raft.write_sync_state('foo', 'bar'))
self.assertTrue(raft.update_leader('1'))
self.assertTrue(raft.manual_failover('foo', 'bar'))
raft.get_cluster()
self.assertTrue(raft.delete_sync_state())
self.assertTrue(raft.delete_leader())
self.assertTrue(raft.set_history_value(''))
self.assertTrue(raft.delete_cluster())
raft.get_cluster()
self.assertTrue(raft.take_leader())
raft.watch(None, 0.001)
raft._sync_obj.destroy()
raft._sync_obj._SyncObj__thread.join()
def tearDown(self):
for f in ('journal', 'dump'):
f = '127.0.0.1:1234.' + f
if os.path.exists(f):
os.unlink(f)
def setUp(self):
self.tearDown()
@patch('patroni.dcs.raft.KVStoreTTL')
@patch('threading.Event')
def test_init(self, mock_event, mock_kvstore):
mock_kvstore.return_value.applied_local_log = False
mock_event.return_value.isSet.side_effect = [False, True]
self.assertIsNotNone(Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True}))


@@ -0,0 +1,46 @@
import logging
import os
import unittest
from mock import Mock, patch
from pysyncobj import SyncObj
from patroni.config import Config
from patroni.raft_controller import RaftController, main as _main
from . import SleepException
class TestPatroniRaftController(unittest.TestCase):
SELF_ADDR = '127.0.0.1:5360'
def remove_files(self):
for f in ('journal', 'dump'):
f = self.SELF_ADDR + '.' + f
if os.path.exists(f):
os.unlink(f)
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
def setUp(self):
self._handlers = logging.getLogger().handlers[:]
self.remove_files()
os.environ['PATRONI_RAFT_SELF_ADDR'] = self.SELF_ADDR
config = Config('postgres0.yml', validator=None)
self.rc = RaftController(config)
def tearDown(self):
logging.getLogger().handlers[:] = self._handlers
self.remove_files()
def test_reload_config(self):
self.rc.reload_config()
@patch('logging.Logger.error', Mock(side_effect=SleepException))
@patch.object(SyncObj, 'doTick', Mock(side_effect=Exception))
def test_run(self):
self.assertRaises(SleepException, self.rc.run)
self.rc.shutdown()
@patch('sys.argv', ['patroni'])
def test_patroni_raft_controller_main(self):
self.assertRaises(SystemExit, _main)