patronictl add ability to print ASCII topology (#1576)

Example:
```bash
$ patronictl topology
+ Cluster: batman (6834835313225022118) -----+---------+----+-----------+------------------------------------------------+
| Member          |      Host      |   Role  |  State  | TL | Lag in MB | Tags                                           |
+-----------------+----------------+---------+---------+----+-----------+------------------------------------------------+
| postgresql0     | localhost:5432 |  Leader | running |  2 |           |                                                |
| + postgresql1   | localhost:5433 | Replica | running |  2 |       0.0 |                                                |
|   + postgresql2 | localhost:5434 | Replica | running |  2 |       0.0 | {nofailover: true, replicatefrom: postgresql1} |
+-----------------+----------------+---------+---------+----+-----------+------------------------------------------------+
```
This commit is contained in:
Maxim Fedotov
2020-06-12 16:23:42 +03:00
committed by GitHub
parent 2d5c8e0067
commit 623b594539
2 changed files with 77 additions and 8 deletions

View File

@@ -23,6 +23,7 @@ import time
import yaml
from click import ClickException
from collections import defaultdict
from contextlib import contextmanager
from patroni.dcs import get_dcs as _get_dcs
from patroni.exceptions import PatroniException
@@ -170,14 +171,14 @@ def print_output(columns, rows, alignment=None, fmt='pretty', header=None, delim
elements = [{k: v for k, v in zip(columns, r) if not header or str(v)} for r in rows]
func = json.dumps if fmt == 'json' else format_config_for_editing
click.echo(func(elements))
elif fmt in {'pretty', 'tsv'}:
elif fmt in {'pretty', 'tsv', 'topology'}:
list_cluster = bool(header and columns and columns[0] == 'Cluster')
if list_cluster and 'Tags' in columns: # we want to format member tags as YAML
i = columns.index('Tags')
for row in rows:
if row[i]:
row[i] = format_config_for_editing(row[i], fmt == 'tsv').strip()
if list_cluster and fmt == 'pretty': # skip cluster name if pretty-printing
row[i] = format_config_for_editing(row[i], fmt != 'pretty').strip()
if list_cluster and fmt != 'tsv': # skip cluster name if pretty-printing
columns = columns[1:] if columns else []
rows = [row[1:] for row in rows]
@@ -746,6 +747,33 @@ def switchover(obj, cluster_name, master, candidate, force, scheduled):
_do_failover_or_switchover(obj, 'switchover', cluster_name, master, candidate, force, scheduled)
def generate_topology(level, member, topology):
members = topology.get(member['name'], [])
if level > 0:
member['name'] = '{0}+ {1}'.format((' ' * (level - 1) * 2), member['name'])
if member['name']:
yield member
for member in members:
for member in generate_topology(level + 1, member, topology):
yield member
def topology_sort(members):
topology = defaultdict(list)
leader = next((m for m in members if m['role'].endswith('leader')), {'name': None})
replicas = set(member['name'] for member in members if not member['role'].endswith('leader'))
for member in members:
if not member['role'].endswith('leader'):
parent = member.get('tags', {}).get('replicatefrom')
parent = parent if parent and parent != member['name'] and parent in replicas else leader['name']
topology[parent].append(member)
for member in generate_topology(0, leader, topology):
yield member
def output_members(cluster, name, extended=False, fmt='pretty'):
rows = []
logging.debug(cluster)
@@ -762,12 +790,13 @@ def output_members(cluster, name, extended=False, fmt='pretty'):
append_port = any('port' in m and m['port'] != 5432 for m in members) or\
len(set(m['host'] for m in members)) < len(members)
for m in cluster['members']:
sort = topology_sort if fmt == 'topology' else iter
for m in sort(cluster['members']):
logging.debug(m)
lag = m.get('lag', '')
m.update(cluster=name, member=m['name'], host=m.get('host', ''), tl=m.get('timeline', ''),
role='' if m['role'] == 'replica' else m['role'].replace('_', ' ').title(),
role=m['role'].replace('_', ' ').title(),
lag_in_mb=round(lag/1024/1024) if isinstance(lag, six.integer_types) else lag,
pending_restart='*' if m.get('pending_restart') else '')
@@ -782,10 +811,10 @@ def output_members(cluster, name, extended=False, fmt='pretty'):
rows.append([m.get(n.lower().replace(' ', '_'), '') for n in columns])
print_output(columns, rows, {'Lag in MB': 'r', 'TL': 'r', 'Tags': 'l'},
print_output(columns, rows, {'Member': 'l', 'Lag in MB': 'r', 'TL': 'r', 'Tags': 'l'},
fmt, ' Cluster: {0} ({1}) '.format(name, initialize))
if fmt != 'pretty': # Omit service info when using machine-readable formats
if fmt not in ('pretty', 'topology'): # Omit service info when using machine-readable formats
return
service_info = []
@@ -829,6 +858,16 @@ def members(obj, cluster_names, fmt, watch, w, extended, ts):
output_members(cluster, cluster_name, extended, fmt)
@ctl.command('topology', help='Prints ASCII topology for given cluster')
@click.argument('cluster_names', nargs=-1)
@option_watch
@option_watchrefresh
@click.pass_obj
@click.pass_context
def topology(ctx, obj, cluster_names, watch, w):
ctx.forward(members, fmt='topology')
def timestamp(precision=6):
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:precision - 7]

View File

@@ -73,7 +73,7 @@ class TestCtl(unittest.TestCase):
scheduled_at = datetime.now(tzutc) + timedelta(seconds=600)
cluster = get_cluster_initialized_with_leader(Failover(1, 'foo', 'bar', scheduled_at))
del cluster.members[1].data['conn_url']
for fmt in ('pretty', 'json', 'yaml', 'tsv'):
for fmt in ('pretty', 'json', 'yaml', 'tsv', 'topology'):
self.assertIsNone(output_members(cluster, name='abc', fmt=fmt))
@patch('patroni.ctl.get_dcs')
@@ -417,6 +417,36 @@ class TestCtl(unittest.TestCase):
assert '2100' in result.output
assert 'Scheduled restart' in result.output
@patch('patroni.ctl.get_dcs')
def test_topology(self, mock_get_dcs):
mock_get_dcs.return_value = self.e
cluster = get_cluster_initialized_with_leader()
cascade_member = Member(0, 'cascade', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5437/postgres',
'api_url': 'http://127.0.0.1:8012/patroni',
'state': 'running',
'tags': {'replicatefrom': 'other'},
})
cascade_member_wrong_tags = Member(0, 'wrong_cascade', 28,
{'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5438/postgres',
'api_url': 'http://127.0.0.1:8013/patroni',
'state': 'running',
'tags': {'replicatefrom': 'nonexistinghost'},
})
cluster.members.append(cascade_member)
cluster.members.append(cascade_member_wrong_tags)
mock_get_dcs.return_value.get_cluster = Mock(return_value=cluster)
result = self.runner.invoke(ctl, ['topology', 'dummy'])
assert '+\n| leader | 127.0.0.1:5435 | Leader |' in result.output
assert '|\n| + other | 127.0.0.1:5436 | Replica |' in result.output
assert '|\n| + cascade | 127.0.0.1:5437 | Replica |' in result.output
assert '|\n| + wrong_cascade | 127.0.0.1:5438 | Replica |' in result.output
cluster = get_cluster_initialized_without_leader()
mock_get_dcs.return_value.get_cluster = Mock(return_value=cluster)
result = self.runner.invoke(ctl, ['topology', 'dummy'])
assert '+\n| + leader | 127.0.0.1:5435 | Replica |' in result.output
assert '|\n| + other | 127.0.0.1:5436 | Replica |' in result.output
@patch('patroni.ctl.get_dcs')
@patch.object(PoolManager, 'request', Mock(return_value=MockResponse()))
def test_flush(self, mock_get_dcs):