fix: cli_examples linter issues

Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com>
Change-Id: Id7e2f8b7282913b885062a01f1bd018bbc85e39c
This commit is contained in:
EstherLerouzic
2024-04-22 21:10:38 +02:00
parent de509139b3
commit a59db8fd12

View File

@@ -11,14 +11,13 @@ Common code for CLI examples
import argparse
import logging
import sys
from pathlib import Path
from math import ceil
from numpy import mean
from pathlib import Path
from copy import deepcopy
import gnpy.core.ansi_escapes as ansi_escapes
from gnpy.core.elements import Transceiver, Fiber, RamanFiber, Roadm
import gnpy.core.exceptions as exceptions
from gnpy.core import ansi_escapes
from gnpy.core.elements import Transceiver, Fiber, RamanFiber
from gnpy.core import exceptions
from gnpy.core.parameters import SimParams
from gnpy.core.utils import lin2db, pretty_summary_print, per_label_average, watt2dbm
from gnpy.topology.request import (ResultElement, jsontocsv, BLOCKING_NOPATH)
@@ -106,11 +105,14 @@ def _add_common_options(parser: argparse.ArgumentParser, network_default: Path):
def transmission_main_example(args=None):
"""Main script running a single simulation. It returns the detailed power across crossed elements and
average performance accross all channels.
"""
parser = argparse.ArgumentParser(
description='Send a full spectrum load through the network from point A to point B',
epilog=_help_footer,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
)
_add_common_options(parser, network_default=_examples_dir / 'edfa_example_network.json')
parser.add_argument('--show-channels', action='store_true', help='Show final per-channel OSNR and GSNR summary')
parser.add_argument('-pl', '--plot', action='store_true')
@@ -144,14 +146,14 @@ def transmission_main_example(args=None):
source = None
if args.source:
source = transceivers.pop(args.source, None)
valid_source = True if source else False
valid_source = bool(source)
destination = None
nodes_list = []
loose_list = []
if args.destination:
destination = transceivers.pop(args.destination, None)
valid_destination = True if destination else False
valid_destination = bool(destination)
# If no exact match try to find partial match
if args.source and not source:
@@ -209,8 +211,8 @@ def transmission_main_example(args=None):
except ValueError:
sys.exit(1)
# print or export results
spans = [s.params.length for s in path if isinstance(s, RamanFiber) or isinstance(s, Fiber)]
print(f'\nThere are {len(spans)} fiber spans over {sum(spans)/1000:.0f} km between {source.uid} '
spans = [s.params.length for s in path if isinstance(s, (Fiber, RamanFiber))]
print(f'\nThere are {len(spans)} fiber spans over {sum(spans) / 1000:.0f} km between {source.uid} '
f'and {destination.uid}')
print(f'\nNow propagating between {source.uid} and {destination.uid}:')
print(f'Reference used for design: (Input optical power reference in span = {watt2dbm(ref_req.power):.2f}dBm,\n'
@@ -235,7 +237,7 @@ def transmission_main_example(args=None):
if power_mode:
print(f'\nTransmission result for input optical power reference in span = {power_dbm:.2f} dBm:')
else:
print(f'\nTransmission results:')
print('\nTransmission results:')
print(f' Final GSNR (0.1 nm): {ansi_escapes.cyan}{mean(destination.snr_01nm):.02f} dB{ansi_escapes.reset}')
else:
print(path[-1])
@@ -286,11 +288,14 @@ def _path_result_json(pathresult):
def path_requests_run(args=None):
"""Main script running several services simulations. It returns a summary of the average performance
for each service.
"""
parser = argparse.ArgumentParser(
description='Compute performance for a list of services provided in a json file or an excel sheet',
epilog=_help_footer,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
)
_add_common_options(parser, network_default=_examples_dir / 'meshTopologyExampleV2.xls')
parser.add_argument('service_filename', nargs='?', type=Path, metavar='SERVICES-REQUESTS.(json|xls|xlsx)',
default=_examples_dir / 'meshTopologyExampleV2.xls',
@@ -302,7 +307,6 @@ def path_requests_run(args=None):
parser.add_argument('--redesign-per-request', action='store_true', help='Redesign the network at each request'
+ ' computation using the request as the reference channel')
args = parser.parse_args(args if args is not None else sys.argv[1:])
_setup_logging(args)
@@ -318,7 +322,7 @@ def path_requests_run(args=None):
data = load_requests(args.service_filename, equipment, bidir=args.bidir,
network=network, network_filename=args.topology)
_data = requests_from_json(data, equipment)
oms_list, propagatedpths, reversed_propagatedpths, rqs, dsjn, result = \
_, propagatedpths, reversed_propagatedpths, rqs, dsjn, result = \
planning(network, equipment, data, redesign=args.redesign_per_request)
except exceptions.NetworkTopologyError as e:
print(f'{ansi_escapes.red}Invalid network definition:{ansi_escapes.reset} {e}')
@@ -352,27 +356,27 @@ def path_requests_run(args=None):
for i, this_p in enumerate(propagatedpths):
rev_pth = reversed_propagatedpths[i]
if rev_pth and this_p:
psnrb = f'{round(mean(this_p[-1].snr),2)} ({round(mean(rev_pth[-1].snr),2)})'
psnrb = f'{round(mean(this_p[-1].snr), 2)} ({round(mean(rev_pth[-1].snr), 2)})'
psnr = f'{round(mean(this_p[-1].snr_01nm), 2)}' +\
f' ({round(mean(rev_pth[-1].snr_01nm),2)})'
f' ({round(mean(rev_pth[-1].snr_01nm), 2)})'
elif this_p:
psnrb = f'{round(mean(this_p[-1].snr),2)}'
psnr = f'{round(mean(this_p[-1].snr_01nm),2)}'
psnrb = f'{round(mean(this_p[-1].snr), 2)}'
psnr = f'{round(mean(this_p[-1].snr_01nm), 2)}'
try:
if rqs[i].blocking_reason in BLOCKING_NOPATH:
line = [f'{rqs[i].request_id}', f' {rqs[i].source} to {rqs[i].destination} :',
f'-', f'-', f'-', f'{rqs[i].tsp_mode}', f'{round(rqs[i].path_bandwidth * 1e-9,2)}',
f'-', f'{rqs[i].blocking_reason}']
'-', '-', '-', f'{rqs[i].tsp_mode}', f'{round(rqs[i].path_bandwidth * 1e-9, 2)}',
'-', '{rqs[i].blocking_reason}']
else:
line = [f'{rqs[i].request_id}', f' {rqs[i].source} to {rqs[i].destination} : ', psnrb,
psnr, f'-', f'{rqs[i].tsp_mode}', f'{round(rqs[i].path_bandwidth * 1e-9, 2)}',
f'-', f'{rqs[i].blocking_reason}']
psnr, '-', f'{rqs[i].tsp_mode}', f'{round(rqs[i].path_bandwidth * 1e-9, 2)}',
'-', f'{rqs[i].blocking_reason}']
except AttributeError:
line = [f'{rqs[i].request_id}', f' {rqs[i].source} to {rqs[i].destination} : ', psnrb,
psnr, f'{rqs[i].OSNR + equipment["SI"]["default"].sys_margins}',
f'{rqs[i].tsp_mode}', f'{round(rqs[i].path_bandwidth * 1e-9,2)}',
f'{ceil(rqs[i].path_bandwidth / rqs[i].bit_rate) }', f'({rqs[i].N},{rqs[i].M})']
f'{rqs[i].tsp_mode}', f'{round(rqs[i].path_bandwidth * 1e-9, 2)}',
f'{ceil(rqs[i].path_bandwidth / rqs[i].bit_rate)}', f'({rqs[i].N},{rqs[i].M})']
data.append(line)
col_width = max(len(word) for row in data for word in row[2:]) # padding