Files
oopt-gnpy/tests/compare.py
Jan Kundrát 80f63d32ed use load_json instead of open coding
Change-Id: I43cfbb7272bfdd834fad63e6715932ff45aeac0b
2020-06-08 20:06:11 +02:00

136 lines
4.6 KiB
Python

#!/usr/bin/env python3
from json import dump
from pathlib import Path
from argparse import ArgumentParser
from collections import namedtuple
from gnpy.tools.json_io import load_json
class Results(namedtuple('Results', 'missing extra different expected actual')):
    """Outcome of comparing two keyed collections.

    Fields (as populated by ``compare`` in this module):
        missing: keys found in ``expected`` but not in ``actual``.
        extra: keys found in ``actual`` but not in ``expected``.
        different: ``(expected_element, actual_element)`` pairs for keys
            present on both sides whose elements are not equal.
        expected: mapping of key -> element for the expected side.
        actual: mapping of key -> element for the actual side.
    """

    def _asdict(self):
        """Return only the difference fields as a plain dict.

        The full ``expected`` and ``actual`` mappings are deliberately left
        out, so the result holds just ``missing``, ``extra`` and ``different``.
        """
        return {'missing': self.missing,
                'extra': self.extra,
                'different': self.different}

    def __str__(self):
        """Return a multi-line, human-readable summary of the differences.

        Each non-empty category contributes a header line with its count
        relative to the number of expected entries, followed by one
        tab-indented line per entry. ``'All match!'`` is returned when
        there is nothing to report.
        """
        rv = []
        if self.missing:
            # These headers must be f-strings; as plain literals the
            # placeholders were printed verbatim instead of the counts.
            rv.append(f'Missing: {len(self.missing)}/{len(self.expected)}')
            rv.extend(f'\t{x}' for x in sorted(self.missing))
        if self.extra:
            rv.append(f'Extra: {len(self.extra)}/{len(self.expected)}')
            rv.extend(f'\t{x}' for x in sorted(self.extra))
        if self.different:
            rv.append(f'Different: {len(self.different)}/{len(self.expected)}')
            rv.extend(f'\tExpected: {x}\n\tActual: {y}' for x, y in self.different)
        if not rv:
            # Nothing was appended, i.e. all three categories are empty.
            rv.append('All match!')
        return '\n'.join(rv)
class NetworksResults(namedtuple('NetworksResult', 'elements connections')):
    """Comparison results for a network, split into elements and connections."""

    def _asdict(self):
        """Return a dict holding each field's own ``_asdict()`` output."""
        as_dict = {}
        as_dict['elements'] = self.elements._asdict()
        as_dict['connections'] = self.connections._asdict()
        return as_dict

    def __str__(self):
        """Return both sections as text, each under a 40-column ``=`` banner."""
        sections = (
            ('Elements', self.elements),
            ('Connections', self.connections),
        )
        lines = []
        for title, section in sections:
            lines.append(title.center(40, '='))
            lines.append(str(section))
        return '\n'.join(lines)
class ServicesResults(namedtuple('ServicesResult', 'requests synchronizations')):
    """Comparison results for services, split into requests and synchronizations."""

    def _asdict(self):
        """Return a dict holding each field's own ``_asdict()`` output.

        The nested results expose ``_asdict`` (namedtuple convention), not
        ``asdict``; calling the latter raised AttributeError.
        """
        return {'requests': self.requests._asdict(),
                'synchronizations': self.synchronizations._asdict()}

    def __str__(self):
        """Return both sections as text, each under a 40-column ``=`` banner."""
        return '\n'.join([
            'Requests'.center(40, '='),
            str(self.requests),
            'Synchronizations'.center(40, '='),
            str(self.synchronizations),
        ])
class PathsResults(namedtuple('PathsResults', 'paths')):
    """Comparison results for computed paths."""

    def _asdict(self):
        """Return a dict holding the ``paths`` field's own ``_asdict()`` output.

        The nested result exposes ``_asdict`` (namedtuple convention), not
        ``asdict``; calling the latter raised AttributeError.
        """
        return {'paths': self.paths._asdict()}

    def __str__(self):
        """Return the paths section as text under a 40-column ``=`` banner."""
        return '\n'.join([
            'Paths'.center(40, '='),
            str(self.paths),
        ])
def compare(expected, actual, key=lambda x: x):
    """Compare two iterables of elements after indexing them by ``key``.

    Args:
        expected: iterable of reference elements.
        actual: iterable of elements to check against the reference.
        key: callable returning a hashable identity for an element; the
            default uses the element itself. If several elements share a
            key, the last one wins.

    Returns:
        A ``Results`` tuple whose ``missing`` and ``extra`` fields are sets
        of keys, whose ``different`` field is a list of
        ``(expected_element, actual_element)`` pairs for shared keys with
        unequal elements, and whose ``expected``/``actual`` fields are the
        key -> element mappings built here.
    """
    expected_by_key = {key(el): el for el in expected}
    actual_by_key = {key(el): el for el in actual}
    # Set operations on dict key views return plain ``set`` objects.
    missing = expected_by_key.keys() - actual_by_key.keys()
    extra = actual_by_key.keys() - expected_by_key.keys()
    # Walk the expected mapping (insertion order) rather than a set
    # intersection, so the order of ``different`` is reproducible.
    different = [(exp_el, actual_by_key[k])
                 for k, exp_el in expected_by_key.items()
                 if k in actual_by_key and exp_el != actual_by_key[k]]
    return Results(missing, extra, different, expected_by_key, actual_by_key)
def compare_networks(expected, actual):
    """Compare the ``elements`` and ``connections`` lists of two network dicts.

    Elements are matched by their ``uid``; connections are matched by the
    ``(from_node, to_node)`` pair. Returns a ``NetworksResults``.
    """
    def element_key(element):
        return element['uid']

    def connection_key(connection):
        return (connection['from_node'], connection['to_node'])

    return NetworksResults(
        elements=compare(expected['elements'], actual['elements'],
                         key=element_key),
        connections=compare(expected['connections'], actual['connections'],
                            key=connection_key),
    )
def compare_services(expected, actual):
    """Compare the path requests and synchronizations of two service dicts.

    Requests (``path-request``) are matched by ``request-id`` and
    synchronizations (``synchronization``) by ``synchronization-id``.
    The ``synchronization`` entry is treated as optional on both sides: a
    side that lacks it is compared as an empty list, so entries present on
    only one side are reported as missing/extra instead of raising
    ``KeyError`` or being silently ignored.

    Returns:
        A ``ServicesResults`` holding the two comparison results.
    """
    requests = compare(expected['path-request'], actual['path-request'],
                       key=lambda el: el['request-id'])
    synchronizations = compare(expected.get('synchronization', []),
                               actual.get('synchronization', []),
                               key=lambda el: el['synchronization-id'])
    return ServicesResults(requests, synchronizations)
def compare_paths(expected_output, actual_output):
    """Compare the ``path`` lists of two path-computation result dicts.

    Paths are matched by ``path-id``.

    Args:
        expected_output: dict with a ``path`` list holding the reference paths.
        actual_output: dict with a ``path`` list holding the paths to check.

    Returns:
        A ``PathsResults`` wrapping the comparison result.
    """
    # Use the function's own arguments; the body previously read the
    # unrelated names ``expected``/``actual`` and therefore only worked when
    # same-named module globals happened to exist.
    paths = compare(expected_output['path'], actual_output['path'],
                    key=lambda el: el['path-id'])
    return PathsResults(paths)
# Maps each value accepted by the --comparison option to the function
# that performs that comparison.
COMPARISONS = dict(
    networks=compare_networks,
    services=compare_services,
    paths=compare_paths,
)
# Command-line interface: two positional input files, an optional output
# file, and the kind of comparison to run (defaults to 'networks').
parser = ArgumentParser()
parser.add_argument('expected_output', metavar='FILE', type=Path)
parser.add_argument('actual_output', metavar='FILE', type=Path)
parser.add_argument('-o', '--output', default=None)
parser.add_argument('-c', '--comparison', default='networks', choices=COMPARISONS)
def encode_sets(obj):
    """``default`` hook for ``json.dump`` that serializes sets as lists.

    Members are emitted in sorted order when they can be ordered, so the
    resulting JSON is reproducible between runs; sets whose members cannot
    be compared with each other fall back to plain iteration order.

    Args:
        obj: the object the JSON encoder could not serialize by itself.

    Returns:
        A list with the members of ``obj`` when it is a ``set`` or
        ``frozenset``.

    Raises:
        TypeError: if ``obj`` is not a set, as required of a ``default`` hook.
    """
    if isinstance(obj, (set, frozenset)):
        try:
            return sorted(obj)
        except TypeError:
            # Members are not mutually orderable (e.g. mixed int/str).
            return list(obj)
    raise TypeError(f'{obj!r} is not JSON serializable!')
if __name__ == '__main__':
    # Load both JSON documents, run the selected comparison, then either
    # write the differences as JSON (--output) or print a text summary.
    args = parser.parse_args()
    expected = load_json(args.expected_output)
    actual = load_json(args.actual_output)
    result = COMPARISONS[args.comparison](expected, actual)
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            # Serialize the dict form explicitly: the result objects are
            # tuples, which json would otherwise write as bare arrays
            # (including the full expected/actual mappings, whose tuple keys
            # are not valid JSON object keys) without ever calling _asdict().
            dump(result._asdict(), f, default=encode_sets, indent=2, ensure_ascii=False)
    else:
        print(result)