diff --git a/patroni/ctl.py b/patroni/ctl.py
index 81ced93c..efdda0fe 100644
--- a/patroni/ctl.py
+++ b/patroni/ctl.py
@@ -23,6 +23,7 @@ import time
 import yaml
 
 from click import ClickException
+from collections import defaultdict
 from contextlib import contextmanager
 from patroni.dcs import get_dcs as _get_dcs
 from patroni.exceptions import PatroniException
@@ -170,14 +171,14 @@ def print_output(columns, rows, alignment=None, fmt='pretty', header=None, delim
         elements = [{k: v for k, v in zip(columns, r) if not header or str(v)} for r in rows]
         func = json.dumps if fmt == 'json' else format_config_for_editing
         click.echo(func(elements))
-    elif fmt in {'pretty', 'tsv'}:
+    elif fmt in {'pretty', 'tsv', 'topology'}:
         list_cluster = bool(header and columns and columns[0] == 'Cluster')
         if list_cluster and 'Tags' in columns:  # we want to format member tags as YAML
             i = columns.index('Tags')
             for row in rows:
                 if row[i]:
-                    row[i] = format_config_for_editing(row[i], fmt == 'tsv').strip()
-        if list_cluster and fmt == 'pretty':  # skip cluster name if pretty-printing
+                    row[i] = format_config_for_editing(row[i], fmt != 'pretty').strip()
+        if list_cluster and fmt != 'tsv':  # skip cluster name if pretty-printing
             columns = columns[1:] if columns else []
             rows = [row[1:] for row in rows]
 
@@ -746,6 +747,33 @@ def switchover(obj, cluster_name, master, candidate, force, scheduled):
     _do_failover_or_switchover(obj, 'switchover', cluster_name, master, candidate, force, scheduled)
 
 
+def generate_topology(level, member, topology):
+    members = topology.get(member['name'], [])
+
+    if level > 0:
+        member['name'] = '{0}+ {1}'.format((' ' * (level - 1) * 2), member['name'])
+
+    if member['name']:
+        yield member
+
+    for member in members:
+        for member in generate_topology(level + 1, member, topology):
+            yield member
+
+
+def topology_sort(members):
+    topology = defaultdict(list)
+    leader = next((m for m in members if m['role'].endswith('leader')), {'name': None})
+    replicas = set(member['name'] for member in members if not member['role'].endswith('leader'))
+    for member in members:
+        if not member['role'].endswith('leader'):
+            parent = member.get('tags', {}).get('replicatefrom')
+            parent = parent if parent and parent != member['name'] and parent in replicas else leader['name']
+            topology[parent].append(member)
+    for member in generate_topology(0, leader, topology):
+        yield member
+
+
 def output_members(cluster, name, extended=False, fmt='pretty'):
     rows = []
     logging.debug(cluster)
@@ -762,12 +790,13 @@ def output_members(cluster, name, extended=False, fmt='pretty'):
     append_port = any('port' in m and m['port'] != 5432 for m in members) or\
         len(set(m['host'] for m in members)) < len(members)
 
-    for m in cluster['members']:
+    sort = topology_sort if fmt == 'topology' else iter
+    for m in sort(cluster['members']):
         logging.debug(m)
         lag = m.get('lag', '')
         m.update(cluster=name, member=m['name'], host=m.get('host', ''), tl=m.get('timeline', ''),
-                 role='' if m['role'] == 'replica' else m['role'].replace('_', ' ').title(),
+                 role=m['role'].replace('_', ' ').title(),
                  lag_in_mb=round(lag/1024/1024) if isinstance(lag, six.integer_types) else lag,
                  pending_restart='*' if m.get('pending_restart') else '')
 
@@ -782,10 +811,10 @@ def output_members(cluster, name, extended=False, fmt='pretty'):
 
         rows.append([m.get(n.lower().replace(' ', '_'), '') for n in columns])
 
-    print_output(columns, rows, {'Lag in MB': 'r', 'TL': 'r', 'Tags': 'l'},
+    print_output(columns, rows, {'Member': 'l', 'Lag in MB': 'r', 'TL': 'r', 'Tags': 'l'},
                  fmt, ' Cluster: {0} ({1}) '.format(name, initialize))
 
-    if fmt != 'pretty':  # Omit service info when using machine-readable formats
+    if fmt not in ('pretty', 'topology'):  # Omit service info when using machine-readable formats
         return
 
     service_info = []
@@ -829,6 +858,16 @@ def members(obj, cluster_names, fmt, watch, w, extended, ts):
             output_members(cluster, cluster_name, extended, fmt)
 
 
+@ctl.command('topology', help='Prints ASCII topology for given cluster')
+@click.argument('cluster_names', nargs=-1)
+@option_watch
+@option_watchrefresh
+@click.pass_obj
+@click.pass_context
+def topology(ctx, obj, cluster_names, watch, w):
+    ctx.forward(members, fmt='topology')
+
+
 def timestamp(precision=6):
     return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:precision - 7]
 
diff --git a/tests/test_ctl.py b/tests/test_ctl.py
index cc97c85b..e2cb11de 100644
--- a/tests/test_ctl.py
+++ b/tests/test_ctl.py
@@ -73,7 +73,7 @@ class TestCtl(unittest.TestCase):
         scheduled_at = datetime.now(tzutc) + timedelta(seconds=600)
         cluster = get_cluster_initialized_with_leader(Failover(1, 'foo', 'bar', scheduled_at))
         del cluster.members[1].data['conn_url']
-        for fmt in ('pretty', 'json', 'yaml', 'tsv'):
+        for fmt in ('pretty', 'json', 'yaml', 'tsv', 'topology'):
             self.assertIsNone(output_members(cluster, name='abc', fmt=fmt))
 
     @patch('patroni.ctl.get_dcs')
@@ -417,6 +417,36 @@ class TestCtl(unittest.TestCase):
         assert '2100' in result.output
         assert 'Scheduled restart' in result.output
 
+    @patch('patroni.ctl.get_dcs')
+    def test_topology(self, mock_get_dcs):
+        mock_get_dcs.return_value = self.e
+        cluster = get_cluster_initialized_with_leader()
+        cascade_member = Member(0, 'cascade', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5437/postgres',
+                                                   'api_url': 'http://127.0.0.1:8012/patroni',
+                                                   'state': 'running',
+                                                   'tags': {'replicatefrom': 'other'},
+                                                   })
+        cascade_member_wrong_tags = Member(0, 'wrong_cascade', 28,
+                                           {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5438/postgres',
+                                            'api_url': 'http://127.0.0.1:8013/patroni',
+                                            'state': 'running',
+                                            'tags': {'replicatefrom': 'nonexistinghost'},
+                                            })
+        cluster.members.append(cascade_member)
+        cluster.members.append(cascade_member_wrong_tags)
+        mock_get_dcs.return_value.get_cluster = Mock(return_value=cluster)
+        result = self.runner.invoke(ctl, ['topology', 'dummy'])
+        assert '+\n| leader          | 127.0.0.1:5435 |  Leader |' in result.output
+        assert '|\n| + other         | 127.0.0.1:5436 | Replica |' in result.output
+        assert '|\n|   + cascade     | 127.0.0.1:5437 | Replica |' in result.output
+        assert '|\n| + wrong_cascade | 127.0.0.1:5438 | Replica |' in result.output
+
+        cluster = get_cluster_initialized_without_leader()
+        mock_get_dcs.return_value.get_cluster = Mock(return_value=cluster)
+        result = self.runner.invoke(ctl, ['topology', 'dummy'])
+        assert '+\n| + leader | 127.0.0.1:5435 | Replica |' in result.output
+        assert '|\n| + other  | 127.0.0.1:5436 | Replica |' in result.output
+
     @patch('patroni.ctl.get_dcs')
     @patch.object(PoolManager, 'request', Mock(return_value=MockResponse()))
     def test_flush(self, mock_get_dcs):
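
For reference, the ordering logic this patch introduces can be exercised on its own. The sketch below copies generate_topology and topology_sort from the diff and feeds them hand-written member dicts; the sample members are hypothetical (their names only mirror the test fixtures) and merely mimic the shape of the dicts that output_members passes in.

# Standalone sketch of the patch's topology ordering; the sample `members`
# list below is illustrative, not part of the patch.
from collections import defaultdict


def generate_topology(level, member, topology):
    # Depth-first walk over the replication tree; rewrites member['name']
    # in place, prepending '+ ' and two spaces per nesting level.
    members = topology.get(member['name'], [])

    if level > 0:
        member['name'] = '{0}+ {1}'.format((' ' * (level - 1) * 2), member['name'])

    if member['name']:  # the synthetic root {'name': None} is never emitted
        yield member

    for member in members:
        for member in generate_topology(level + 1, member, topology):
            yield member


def topology_sort(members):
    # Group each replica under its 'replicatefrom' tag; a missing, unknown,
    # or self-referencing parent falls back to the leader (or to None when
    # the cluster currently has no leader).
    topology = defaultdict(list)
    leader = next((m for m in members if m['role'].endswith('leader')), {'name': None})
    replicas = set(member['name'] for member in members if not member['role'].endswith('leader'))
    for member in members:
        if not member['role'].endswith('leader'):
            parent = member.get('tags', {}).get('replicatefrom')
            parent = parent if parent and parent != member['name'] and parent in replicas else leader['name']
            topology[parent].append(member)
    for member in generate_topology(0, leader, topology):
        yield member


members = [
    {'name': 'leader', 'role': 'leader'},
    {'name': 'other', 'role': 'replica'},
    {'name': 'cascade', 'role': 'replica', 'tags': {'replicatefrom': 'other'}},
    {'name': 'wrong_cascade', 'role': 'replica', 'tags': {'replicatefrom': 'nonexistinghost'}},
]

for m in topology_sort(members):
    print(m['name'])
# leader
# + other
#   + cascade
# + wrong_cascade

In output_members the same generator is plugged in via sort = topology_sort if fmt == 'topology' else iter, so the indented names land in the left-aligned Member column of the pretty table, which is exactly what the test_topology assertions check.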