fix: linter issues in convert and service_sheet

Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com>
Change-Id: I5f7490ed61b53cea8d2923a8a54d38b3fbb5fa0a
EstherLerouzic
2024-04-25 11:05:28 +02:00
parent c840bb1a44
commit 310995045e
2 changed files with 256 additions and 170 deletions


@@ -20,7 +20,6 @@ In the "Links" sheet, only the first three columns ("Node A", "Node Z" and
the "east" information so that it is possible to input undirected data.
"""
from xlrd import open_workbook
from logging import getLogger
from argparse import ArgumentParser
from collections import namedtuple, Counter, defaultdict
@@ -28,6 +27,10 @@ from itertools import chain
from json import dumps
from pathlib import Path
from copy import copy
from typing import Dict, List, Tuple, DefaultDict
from xlrd import open_workbook
from xlrd.biffh import XLRDError
from networkx import DiGraph
from gnpy.core.utils import silent_remove, transform_data
from gnpy.core.exceptions import NetworkTopologyError
@@ -38,12 +41,16 @@ _logger = getLogger(__name__)
def all_rows(sh, start=0):
"""Returns all rows of the xls(x) sheet starting from start row
"""
return (sh.row(x) for x in range(start, sh.nrows))
class Node(object):
class Node:
"""Node data class
"""
def __init__(self, **kwargs):
super(Node, self).__init__()
super().__init__()
self.update_attr(kwargs)
def update_attr(self, kwargs):
@@ -65,13 +72,13 @@ class Node(object):
}
class Link(object):
class Link:
"""attribtes from west parse_ept_headers dict
+node_a, node_z, west_fiber_con_in, east_fiber_con_in
"""
def __init__(self, **kwargs):
super(Link, self).__init__()
super().__init__()
self.update_attr(kwargs)
self.distance_units = 'km'
@@ -80,7 +87,7 @@ class Link(object):
for k, v in self.default_values.items():
v = clean_kwargs.get(k, v)
setattr(self, k, v)
k = 'west' + k.split('east')[-1]
k = 'west' + k.rsplit('east', maxsplit=1)[-1]
v = clean_kwargs.get(k, v)
setattr(self, k, v)
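Note on the rsplit change: indexing with [-1] already selects the text after the last 'east', so behavior is unchanged; rsplit with maxsplit=1 simply stops scanning once that last occurrence is found, which is what the linter suggests. A minimal sketch with one of the real attribute names:

k = 'east_fiber_con_in'
# derive the mirrored west attribute name from its east counterpart
assert 'west' + k.rsplit('east', maxsplit=1)[-1] == 'west_fiber_con_in'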
@@ -101,9 +108,11 @@ class Link(object):
}
class Eqpt(object):
class Eqpt:
"""
"""
def __init__(self, **kwargs):
super(Eqpt, self).__init__()
super().__init__()
self.update_attr(kwargs)
def update_attr(self, kwargs):
@@ -111,7 +120,7 @@ class Eqpt(object):
for k, v in self.default_values.items():
v_east = clean_kwargs.get(k, v)
setattr(self, k, v_east)
k = 'west' + k.split('east')[-1]
k = 'west' + k.rsplit('east', maxsplit=1)[-1]
v_west = clean_kwargs.get(k, v)
setattr(self, k, v_west)
@@ -119,7 +128,6 @@ class Eqpt(object):
'from_city': '',
'to_city': '',
'east_amp_type': '',
'east_att_in': 0,
'east_amp_gain': None,
'east_amp_dp': None,
'east_tilt_vs_wavelength': 0,
@@ -127,9 +135,11 @@ class Eqpt(object):
}
class Roadm(object):
class Roadm:
"""
"""
def __init__(self, **kwargs):
super(Roadm, self).__init__()
super().__init__()
self.update_attr(kwargs)
def update_attr(self, kwargs):
@@ -156,7 +166,7 @@ def read_header(my_sheet, line, slice_):
try:
header = [x.value.strip() for x in my_sheet.row_slice(line, slice_[0], slice_[1])]
header_i = [Param_header(header, i + slice_[0]) for i, header in enumerate(header) if header != '']
except Exception:
except (AttributeError, IndexError):
header_i = []
if header_i != [] and header_i[-1].colindex != slice_[1]:
header_i.append(Param_header('', slice_[1]))
@@ -172,7 +182,7 @@ def read_slice(my_sheet, line, slice_, header):
try:
slice_range = next((h.colindex, header_i[i + 1].colindex)
for i, h in enumerate(header_i) if header in h.header)
except Exception:
except StopIteration:
pass
return slice_range
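Narrowing the handler to StopIteration matches the only exception this lookup is expected to raise: next() raises StopIteration when no header matches, and slice_range keeps its fallback value. The same pattern, sketched with a hypothetical header list:

header_i = [('east', 3), ('west', 7)]      # hypothetical (header, colindex) pairs
col = (0, 0)                               # fallback
try:
    col = next((i, i + 1) for h, i in header_i if h == 'north')
except StopIteration:
    pass                                   # no matching header: keep the fallback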
@@ -193,8 +203,7 @@ def parse_headers(my_sheet, input_headers_dict, headers, start_line, slice_in):
msg = f'missing header {h0}'
if h0 in ('east', 'Node A', 'Node Z', 'City'):
raise NetworkTopologyError(msg)
else:
_logger.warning(msg)
_logger.warning(msg)
elif not isinstance(input_headers_dict[h0], dict):
headers[slice_out[0]] = input_headers_dict[h0]
else:
@@ -206,22 +215,33 @@ def parse_headers(my_sheet, input_headers_dict, headers, start_line, slice_in):
def parse_row(row, headers):
"""
"""
return {f: r.value for f, r in
zip([label for label in headers.values()], [row[i] for i in headers])}
zip(list(headers.values()), [row[i] for i in headers])}
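Since headers maps a column index to a field name, the simplified zip pairs each field name with the cell found at its index. With hypothetical plain values standing in for xlrd cells:

headers = {0: 'from_city', 2: 'to_city'}   # column index -> field name
row = ['Lannion', '', 'Quimper']
assert dict(zip(headers.values(), (row[i] for i in headers))) == \
    {'from_city': 'Lannion', 'to_city': 'Quimper'}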
def parse_sheet(my_sheet, input_headers_dict, header_line, start_line, column):
"""
"""
headers = parse_headers(my_sheet, input_headers_dict, {}, header_line, (0, column))
for row in all_rows(my_sheet, start=start_line):
yield parse_row(row[0: column], headers)
def _format_items(items):
def _format_items(items: List[str]):
"""formating utils
"""
return '\n'.join(f' - {item}' for item in items)
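For reference, the helper renders a list of offending items as an indented bullet list for error messages, e.g. with hypothetical city names:

print(_format_items(['Lannion', 'Quimper']))
#  - Lannion
#  - Quimper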
def sanity_check(nodes, links, nodes_by_city, links_by_city, eqpts_by_city):
def sanity_check(nodes: List[Node], links: List[Link],
nodes_by_city: Dict[str, Node], links_by_city: DefaultDict[str, List[Link]],
eqpts_by_city: DefaultDict[str, List[Eqpt]]) -> Tuple[List[Node], List[Link]]:
"""Raise correct issues if xls(x) is not correct, Correct type to ROADM if more tha 2-degrees
checks duplicate links, unreferenced nodes in links, in eqpts, unreferenced link in eqpts,
duplicate items
"""
duplicate_links = []
for l1 in links:
for l2 in links:
@@ -230,6 +250,7 @@ def sanity_check(nodes, links, nodes_by_city, links_by_city, eqpts_by_city):
link {l1.from_city}-{l1.to_city} is duplicate \
\nthe 1st duplicate link will be removed but you should check Links sheet input')
duplicate_links.append(l1)
if duplicate_links:
msg = 'XLS error: ' \
+ f'links {_format_items([(d.from_city, d.to_city) for d in duplicate_links])} are duplicate'
@@ -318,7 +339,7 @@ def create_roadm_element(node, roadms_by_city):
'booster_variety_list': silent_remove(node.booster_restriction.split(' | '), '')}
}
if node.city in roadms_by_city.keys():
if 'params' not in roadm.keys():
if 'params' not in roadm:
roadm['params'] = {}
roadm['params']['per_degree_pch_out_db'] = {}
for elem in roadms_by_city[node.city]:
@@ -349,7 +370,7 @@ def create_roadm_element(node, roadms_by_city):
return roadm
def create_east_eqpt_element(node):
def create_east_eqpt_element(node: Node, nodes_by_city: Dict[str, Node]) -> dict:
""" create amplifiers json elements for the east direction.
this includes the case where a fused element is defined instead of an
ILA in the eqpt sheet
@@ -382,7 +403,7 @@ def create_east_eqpt_element(node):
return eqpt
def create_west_eqpt_element(node):
def create_west_eqpt_element(node: Node, nodes_by_city: Dict[str, Node]) -> dict:
""" create amplifiers json elements for the west direction.
this includes the case where a fused element is defined instead of an
ILA in the eqpt sheet
@@ -409,7 +430,13 @@ def create_west_eqpt_element(node):
eqpt['params'] = {'loss': 0}
return eqpt
def xls_to_json_data(input_filename, filter_region=[]):
def xls_to_json_data(input_filename: Path, filter_region: List[str] = None) -> Dict:
"""Read the excel sheets and produces the json dict in GNPy format (legacy)
returns json dict
"""
if filter_region is None:
filter_region = []
nodes, links, eqpts, roadms = parse_excel(input_filename)
if filter_region:
nodes = [n for n in nodes if n.region.lower() in filter_region]
@@ -419,16 +446,13 @@ def xls_to_json_data(input_filename, filter_region=[]):
cities = {lnk.from_city for lnk in links} | {lnk.to_city for lnk in links}
nodes = [n for n in nodes if n.city in cities]
global nodes_by_city
nodes_by_city = {n.city: n for n in nodes}
global links_by_city
links_by_city = defaultdict(list)
for link in links:
links_by_city[link.from_city].append(link)
links_by_city[link.to_city].append(link)
global eqpts_by_city
eqpts_by_city = defaultdict(list)
for eqpt in eqpts:
eqpts_by_city[eqpt.from_city].append(eqpt)
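The global statements are gone: the three lookup dicts are now built locally and passed explicitly to every helper that needs them (see the updated signatures of eqpt_connection_by_city, fiber_link and friends below). A minimal sketch of the pattern, with hypothetical names:

def degree(city_name: str, links_by_city: dict) -> int:
    # the lookup dict is an explicit argument, no longer a module-level global
    return len(links_by_city.get(city_name, []))

assert degree('Lannion', {'Lannion': ['link-1', 'link-2']}) == 2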
@@ -504,24 +528,25 @@ def xls_to_json_data(input_filename, filter_region=[]):
'type': 'Edfa',
'operational': {'gain_target': None,
'tilt_target': 0}
} for x in nodes_by_city.values() if x.node_type.lower() == 'ila' and x.city not in eqpts_by_city] +
[create_east_eqpt_element(e) for e in eqpts] +
[create_west_eqpt_element(e) for e in eqpts],
} for x in nodes_by_city.values() if x.node_type.lower() == 'ila' and x.city not in eqpts_by_city]
+ [create_east_eqpt_element(e, nodes_by_city) for e in eqpts]
+ [create_west_eqpt_element(e, nodes_by_city) for e in eqpts],
'connections':
list(chain.from_iterable([eqpt_connection_by_city(n.city)
list(chain.from_iterable([eqpt_connection_by_city(n.city, eqpts_by_city, links_by_city, nodes_by_city)
for n in nodes]))
+
list(chain.from_iterable(zip(
[{'from_node': f'trx {x.city}',
'to_node': f'roadm {x.city}'}
+ list(chain.from_iterable(zip(
[{'from_node': f'trx {x.city}', 'to_node': f'roadm {x.city}'}
for x in nodes_by_city.values() if x.node_type.lower() == 'roadm'],
[{'from_node': f'roadm {x.city}',
'to_node': f'trx {x.city}'}
[{'from_node': f'roadm {x.city}', 'to_node': f'trx {x.city}'}
for x in nodes_by_city.values() if x.node_type.lower() == 'roadm'])))
}
def convert_file(input_filename, filter_region=[], output_json_file_name=None):
def convert_file(input_filename: Path, filter_region: List[str] = None, output_json_file_name: Path = None):
"""Save the conversion into
"""
if filter_region is None:
filter_region = []
data = xls_to_json_data(input_filename, filter_region)
if output_json_file_name is None:
output_json_file_name = input_filename.with_suffix('.json')
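Replacing the filter_region=[] default with None fixes the classic mutable-default-argument pitfall flagged by pylint: a list literal in a signature is evaluated once at definition time, so any mutation would leak across calls. A minimal sketch of the hazard and of the pattern adopted here:

def buggy(items=[]):            # one shared list for every call
    items.append(1)
    return items

assert buggy() == [1]
assert buggy() == [1, 1]        # state leaked between calls

def fixed(items=None):
    if items is None:
        items = []
    items.append(1)
    return items

assert fixed() == [1] and fixed() == [1]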
@@ -531,12 +556,12 @@ def convert_file(input_filename, filter_region=[], output_json_file_name=None):
return output_json_file_name
def corresp_names(input_filename, network):
def corresp_names(input_filename: Path, network: DiGraph):
""" a function that builds the correspondance between names given in the excel,
and names used in the json, and created by the autodesign.
All names are listed
"""
nodes, links, eqpts, roadms = parse_excel(input_filename)
nodes, links, eqpts, _ = parse_excel(input_filename)
fused = [n.uid for n in network.nodes() if isinstance(n, Fused)]
ila = [n.uid for n in network.nodes() if isinstance(n, Edfa)]
@@ -598,7 +623,10 @@ def corresp_names(input_filename, network):
return corresp_roadm, corresp_fused, corresp_ila
def parse_excel(input_filename):
def parse_excel(input_filename: Path) -> Tuple[List[Node], List[Link], List[Eqpt], List[Roadm]]:
"""reads xls(x) sheets among Nodes, Eqpts, Links, Roadms and parse the data in the sheets
into internal data structure Node, Link, Eqpt, Roadm, classes
"""
link_headers = {
'Node A': 'from_city',
'Node Z': 'to_city',
@@ -637,7 +665,6 @@ def parse_excel(input_filename):
'Node Z': 'to_city',
'east': {
'amp type': 'east_amp_type',
'att_in': 'east_att_in',
'amp gain': 'east_amp_gain',
'delta p': 'east_amp_dp',
'tilt': 'east_tilt',
@@ -645,7 +672,6 @@ def parse_excel(input_filename):
},
'west': {
'amp type': 'west_amp_type',
'att_in': 'west_att_in',
'amp gain': 'west_amp_gain',
'delta p': 'west_amp_dp',
'tilt': 'west_tilt',
@@ -665,36 +691,32 @@ def parse_excel(input_filename):
links_sheet = wb.sheet_by_name('Links')
try:
eqpt_sheet = wb.sheet_by_name('Eqpt')
except Exception:
except XLRDError:
# eqpt_sheet is optional
eqpt_sheet = None
try:
roadm_sheet = wb.sheet_by_name('Roadms')
except Exception:
except XLRDError:
# roadm_sheet is optional
roadm_sheet = None
nodes = []
for node in parse_sheet(nodes_sheet, node_headers, NODES_LINE, NODES_LINE + 1, NODES_COLUMN):
nodes.append(Node(**node))
nodes = [Node(**node) for node in parse_sheet(nodes_sheet, node_headers,
NODES_LINE, NODES_LINE + 1, NODES_COLUMN)]
expected_node_types = {'ROADM', 'ILA', 'FUSED'}
for n in nodes:
if n.node_type not in expected_node_types:
n.node_type = 'ILA'
links = []
for link in parse_sheet(links_sheet, link_headers, LINKS_LINE, LINKS_LINE + 2, LINKS_COLUMN):
links.append(Link(**link))
links = [Link(**link) for link in parse_sheet(links_sheet, link_headers,
LINKS_LINE, LINKS_LINE + 2, LINKS_COLUMN)]
eqpts = []
if eqpt_sheet is not None:
for eqpt in parse_sheet(eqpt_sheet, eqpt_headers, EQPTS_LINE, EQPTS_LINE + 2, EQPTS_COLUMN):
eqpts.append(Eqpt(**eqpt))
eqpts = [Eqpt(**eqpt) for eqpt in parse_sheet(eqpt_sheet, eqpt_headers,
EQPTS_LINE, EQPTS_LINE + 2, EQPTS_COLUMN)]
roadms = []
if roadm_sheet is not None:
for roadm in parse_sheet(roadm_sheet, roadm_headers, ROADMS_LINE, ROADMS_LINE+2, ROADMS_COLUMN):
roadms.append(Roadm(**roadm))
roadms = [Roadm(**roadm) for roadm in parse_sheet(roadm_sheet, roadm_headers,
ROADMS_LINE, ROADMS_LINE + 2, ROADMS_COLUMN)]
# sanity check
all_cities = Counter(n.city for n in nodes)
@@ -716,32 +738,37 @@ def parse_excel(input_filename):
return nodes, links, eqpts, roadms
def eqpt_connection_by_city(city_name):
other_cities = fiber_dest_from_source(city_name)
def eqpt_connection_by_city(city_name: str, eqpts_by_city: DefaultDict[str, List[Eqpt]],
links_by_city: DefaultDict[str, List[Link]], nodes_by_city: Dict[str, Node]) -> list:
"""
"""
other_cities = fiber_dest_from_source(city_name, links_by_city)
subdata = []
if nodes_by_city[city_name].node_type.lower() in {'ila', 'fused'}:
# Then len(other_cities) == 2
direction = ['west', 'east']
for i in range(2):
from_ = fiber_link(other_cities[i], city_name)
in_ = eqpt_in_city_to_city(city_name, other_cities[0], direction[i])
to_ = fiber_link(city_name, other_cities[1 - i])
from_ = fiber_link(other_cities[i], city_name, links_by_city)
in_ = eqpt_in_city_to_city(city_name, other_cities[0], eqpts_by_city, nodes_by_city, direction[i])
to_ = fiber_link(city_name, other_cities[1 - i], links_by_city)
subdata += connect_eqpt(from_, in_, to_)
elif nodes_by_city[city_name].node_type.lower() == 'roadm':
for other_city in other_cities:
from_ = f'roadm {city_name}'
in_ = eqpt_in_city_to_city(city_name, other_city)
to_ = fiber_link(city_name, other_city)
in_ = eqpt_in_city_to_city(city_name, other_city, eqpts_by_city, nodes_by_city)
to_ = fiber_link(city_name, other_city, links_by_city)
subdata += connect_eqpt(from_, in_, to_)
from_ = fiber_link(other_city, city_name)
in_ = eqpt_in_city_to_city(city_name, other_city, "west")
from_ = fiber_link(other_city, city_name, links_by_city)
in_ = eqpt_in_city_to_city(city_name, other_city, eqpts_by_city, nodes_by_city, "west")
to_ = f'roadm {city_name}'
subdata += connect_eqpt(from_, in_, to_)
return subdata
def connect_eqpt(from_, in_, to_):
def connect_eqpt(from_: str, in_: str, to_: str) -> List[dict]:
"""Utils: create the topology connection json dict between in and to
"""
connections = []
if in_ != '':
connections = [{'from_node': from_, 'to_node': in_},
@@ -751,7 +778,11 @@ def connect_eqpt(from_, in_, to_):
return connections
def eqpt_in_city_to_city(in_city, to_city, direction='east'):
def eqpt_in_city_to_city(in_city: str, to_city: str,
eqpts_by_city: DefaultDict[str, List[Eqpt]], nodes_by_city: Dict[str, Node],
direction: str = 'east') -> str:
"""Utils: returns the formatted dtring corresponding to in_city types and direction
"""
rev_direction = 'west' if direction == 'east' else 'east'
return_eqpt = ''
if in_city in eqpts_by_city:
@@ -770,7 +801,7 @@ def eqpt_in_city_to_city(in_city, to_city, direction='east'):
return return_eqpt
def corresp_next_node(network, corresp_ila, corresp_roadm):
def corresp_next_node(network: DiGraph, corresp_ila: dict, corresp_roadm: dict) -> Tuple[dict, dict]:
""" for each name in corresp dictionnaries find the next node in network and its name
given by user in excel. for meshTopology_exampleV2.xls:
user ILA name Stbrieuc covers the two direction. convert.py creates 2 different ILA
@@ -817,7 +848,7 @@ def corresp_next_node(network, corresp_ila, corresp_roadm):
break
# if next_nd was not already added in the dict with the previous loop,
# add the first found correspondance in ila names
if correct_ila_name not in next_node.keys():
if correct_ila_name not in next_node:
for key, val in corresp_ila.items():
# in case of split fibers the ila name might not be an exact match
if [e for e in val if e in next_nd.uid]:
@@ -828,7 +859,9 @@ def corresp_next_node(network, corresp_ila, corresp_roadm):
return corresp_ila, next_node
def fiber_dest_from_source(city_name):
def fiber_dest_from_source(city_name: str, links_by_city: DefaultDict[str, List[Link]]) -> List[str]:
"""Returns the list of cities city_name is connected to
"""
destinations = []
links_from_city = links_by_city[city_name]
for l in links_from_city:
@@ -839,7 +872,9 @@ def fiber_dest_from_source(city_name):
return destinations
def fiber_link(from_city, to_city):
def fiber_link(from_city: str, to_city: str, links_by_city: DefaultDict[str, List[Link]]) -> str:
"""utils: returns formatted uid for fibers between from_city and to_city
"""
source_dest = (from_city, to_city)
links = links_by_city[from_city]
link = next(l for l in links if l.from_city in source_dest and l.to_city in source_dest)
@@ -850,7 +885,9 @@ def fiber_link(from_city, to_city):
return fiber
def midpoint(city_a, city_b):
def midpoint(city_a: Node, city_b: Node) -> dict:
"""Computes midpoint coordinates
"""
lats = city_a.latitude, city_b.latitude
longs = city_a.longitude, city_b.longitude
try:
@@ -879,6 +916,8 @@ ROADMS_COLUMN = 6
def _do_convert():
"""Main function for xls(x) topology conversion to JSON format
"""
parser = ArgumentParser()
parser.add_argument('workbook', type=Path)
parser.add_argument('-f', '--filter-region', action='append', default=[])


@@ -11,105 +11,150 @@ Yang model for requesting path computation.
See: draft-ietf-teas-yang-path-computation-01.txt
"""
from xlrd import open_workbook, XL_CELL_EMPTY
from collections import namedtuple
from logging import getLogger
from copy import deepcopy
from pathlib import Path
from typing import Dict, List
from networkx import DiGraph
from xlrd import open_workbook, XL_CELL_EMPTY
from gnpy.core.utils import db2lin
from gnpy.core.exceptions import ServiceError
from gnpy.core.elements import Transceiver, Roadm, Edfa, Fiber
from gnpy.tools.convert import corresp_names, corresp_next_node
from gnpy.tools.convert import corresp_names, corresp_next_node, all_rows
SERVICES_COLUMN = 12
def all_rows(sheet, start=0):
return (sheet.row(x) for x in range(start, sheet.nrows))
logger = getLogger(__name__)
class Request(namedtuple('Request', 'request_id source destination trx_type mode \
spacing power nb_channel disjoint_from nodes_list is_loose path_bandwidth')):
def __new__(cls, request_id, source, destination, trx_type, mode=None, spacing=None, power=None, nb_channel=None, disjoint_from='', nodes_list=None, is_loose='', path_bandwidth=None):
return super().__new__(cls, request_id, source, destination, trx_type, mode, spacing, power, nb_channel, disjoint_from, nodes_list, is_loose, path_bandwidth)
class Request(namedtuple('request_param', 'request_id source destination trx_type mode \
spacing power nb_channel disjoint_from nodes_list is_loose path_bandwidth')):
"""DATA class for a request.
:params request_id (int): The unique identifier for the request.
:params source (str): The source node for the communication.
:params destination (str): The destination node for the communication.
:params trx_type (str): The type of transmission for the communication.
:params mode (str, optional): The mode of transmission. Defaults to None.
:params spacing (float, optional): The spacing between channels. Defaults to None.
:params power (float, optional): The power level for the communication. Defaults to None.
:params nb_channel (int, optional): The number of channels required for the communication. Defaults to None.
:params disjoint_from (str, optional): The node to be disjoint from. Defaults to ''.
:params nodes_list (list, optional): The list of nodes involved in the communication. Defaults to None.
:params is_loose (str, optional): Indicates if the communication is loose. Defaults to ''.
:params path_bandwidth (float, optional): The bandwidth required for the communication. Defaults to None.
"""
def __new__(cls, request_id, source, destination, trx_type, mode=None, spacing=None, power=None, nb_channel=None,
disjoint_from='', nodes_list=None, is_loose='', path_bandwidth=None):
return super().__new__(cls, request_id, source, destination, trx_type, mode, spacing, power, nb_channel,
disjoint_from, nodes_list, is_loose, path_bandwidth)
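Only the first four fields are mandatory; __new__ fills in the documented defaults, so a service row with empty optional cells still yields a complete tuple. A hypothetical construction:

req = Request(request_id='1', source='Lannion', destination='Quimper',
              trx_type='Voyager')          # hypothetical values
assert req.mode is None and req.disjoint_from == ''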
class Element:
"""
"""
def __init__(self, uid):
self.uid = uid
def __eq__(self, other):
return type(self) == type(other) and self.uid == other.uid
return isinstance(other, type(self)) and self.uid == other.uid
def __hash__(self):
return hash((type(self), self.uid))
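Defining __hash__ next to __eq__ keeps Element instances usable as dict keys and set members, since Python discards the inherited hash once __eq__ is overridden; equal objects must hash equal. Sketch:

a, b = Element('uid-1'), Element('uid-1')
assert a == b and hash(a) == hash(b)
assert len({a, b}) == 1                    # the set collapses the duplicates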
class Request_element(Element):
def __init__(self, Request, equipment, bidir):
"""Class that generate the request in the json format
:param request_param (Request): The request object containing the information for the element.
:param equipment (dict): The equipment configuration for the communication.
:param bidir (bool): Indicates if the communication is bidirectional.
Attributes:
request_id (str): The unique identifier for the request.
source (str): The source node for the communication.
destination (str): The destination node for the communication.
srctpid (str): The source TP ID for the communication.
dsttpid (str): The destination TP ID for the communication.
bidir (bool): Indicates if the communication is bidirectional.
trx_type (str): The transceiver type used for the communication.
mode (str): The mode of transmission for the communication.
spacing (float): The spacing between channels for the communication.
power (float): The power level for the communication.
nb_channel (int): The number of channels required for the communication.
disjoint_from (list): The list of request ids to be disjoint from.
nodes_list (list): The list of nodes involved in the communication.
loose (str): Indicates if the communication is loose or strict.
path_bandwidth (float): The bandwidth required for the communication.
"""
def __init__(self, request_param: Request, equipment: Dict, bidir: bool):
"""
"""
super().__init__(uid=request_param.request_id)
# request_id is str
# excel has automatic number formatting that adds .0 on integer values
# the next lines recover the pure int value, assuming this .0 is unwanted
self.request_id = correct_xlrd_int_to_str_reading(Request.request_id)
self.source = f'trx {Request.source}'
self.destination = f'trx {Request.destination}'
self.request_id = correct_xlrd_int_to_str_reading(request_param.request_id)
self.source = f'trx {request_param.source}'
self.destination = f'trx {request_param.destination}'
# TODO: the automatic naming generated by excel parser requires that source and dest name
# be a string starting with 'trx' : this is manually added here.
self.srctpid = f'trx {Request.source}'
self.dsttpid = f'trx {Request.destination}'
self.srctpid = f'trx {request_param.source}'
self.dsttpid = f'trx {request_param.destination}'
self.bidir = bidir
# test that trx_type belongs to eqpt_config.json
# if not replace it with a default
try:
if equipment['Transceiver'][Request.trx_type]:
self.trx_type = correct_xlrd_int_to_str_reading(Request.trx_type)
if Request.mode is not None:
Requestmode = correct_xlrd_int_to_str_reading(Request.mode)
if [mode for mode in equipment['Transceiver'][Request.trx_type].mode if mode['format'] == Requestmode]:
self.mode = Requestmode
if equipment['Transceiver'][request_param.trx_type]:
self.trx_type = correct_xlrd_int_to_str_reading(request_param.trx_type)
if request_param.mode is not None:
request_mode = correct_xlrd_int_to_str_reading(request_param.mode)
if [mode for mode in equipment['Transceiver'][request_param.trx_type].mode
if mode['format'] == request_mode]:
self.mode = request_mode
else:
msg = f'Request Id: {self.request_id} - could not find tsp : \'{Request.trx_type}\' ' \
+ f'with mode: \'{Requestmode}\' in eqpt library \nComputation stopped.'
msg = f'Request Id: {self.request_id} - could not find tsp : \'{request_param.trx_type}\' ' \
+ f'with mode: \'{request_mode}\' in eqpt library \nComputation stopped.'
raise ServiceError(msg)
else:
Requestmode = None
self.mode = Request.mode
except KeyError:
msg = f'Request Id: {self.request_id} - could not find tsp : \'{Request.trx_type}\' ' \
+ f'with mode: \'{Request.mode}\' in eqpt library \nComputation stopped.'
raise ServiceError(msg)
request_mode = None
self.mode = request_param.mode
except KeyError as e:
msg = f'Request Id: {self.request_id} - could not find tsp : \'{request_param.trx_type}\' with mode: ' \
+ f'\'{request_param.mode}\' in eqpt library \nComputation stopped.'
raise ServiceError(msg) from e
# excel inputs are in GHz and dBm
if Request.spacing is not None:
self.spacing = Request.spacing * 1e9
if request_param.spacing is not None:
self.spacing = request_param.spacing * 1e9
else:
msg = f'Request {self.request_id} missing spacing: spacing is mandatory.\ncomputation stopped'
raise ServiceError(msg)
if Request.power is not None:
self.power = db2lin(Request.power) * 1e-3
else:
self.power = None
if Request.nb_channel is not None:
self.nb_channel = int(Request.nb_channel)
else:
self.nb_channel = None
self.power = None
if request_param.power is not None:
self.power = db2lin(request_param.power) * 1e-3
self.nb_channel = None
if request_param.nb_channel is not None:
self.nb_channel = int(request_param.nb_channel)
value = correct_xlrd_int_to_str_reading(Request.disjoint_from)
value = correct_xlrd_int_to_str_reading(request_param.disjoint_from)
self.disjoint_from = [n for n in value.split(' | ') if value]
self.nodes_list = []
if Request.nodes_list:
self.nodes_list = Request.nodes_list.split(' | ')
if request_param.nodes_list:
self.nodes_list = request_param.nodes_list.split(' | ')
self.loose = 'LOOSE'
if Request.is_loose.lower() == 'no':
if request_param.is_loose.lower() == 'no':
self.loose = 'STRICT'
self.path_bandwidth = None
if Request.path_bandwidth is not None:
self.path_bandwidth = Request.path_bandwidth * 1e9
else:
self.path_bandwidth = 0
uid = property(lambda self: repr(self))
self.path_bandwidth = 0
if request_param.path_bandwidth is not None:
self.path_bandwidth = request_param.path_bandwidth * 1e9
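Note the unit handling above: spacing and path_bandwidth arrive in GHz (hence the * 1e9), and power arrives in dBm, which db2lin converts to mW before the * 1e-3 rescale to W. A worked check, assuming gnpy.core.utils.db2lin is 10 ** (x / 10):

from gnpy.core.utils import db2lin
assert db2lin(0) * 1e-3 == 1e-3                # 0 dBm -> 1 mW -> 0.001 W
assert abs(db2lin(3) * 1e-3 - 2e-3) < 1e-4     # 3 dBm is roughly 2 mW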
@property
def pathrequest(self):
"""Creates json dictionnary for the request
"""
# Default assumption for bidir is False
req_dictionnary = {
'request-id': self.request_id,
@@ -152,29 +197,32 @@ class Request_element(Element):
@property
def pathsync(self):
"""Creates json dictionnary for disjunction list (synchronization vector)
"""
if self.disjoint_from:
return {'synchronization-id': self.request_id,
'svec': {
'relaxable': 'false',
'disjointness': 'node link',
'request-id-number': [self.request_id] + [n for n in self.disjoint_from]
'request-id-number': [self.request_id] + list(self.disjoint_from)
}
}
else:
return None
return None
# TO-DO: avoid multiple entries with same synchronisation vectors
@property
def json(self):
"""Returns the json dictionnary for requests and for synchronisation vector
"""
return self.pathrequest, self.pathsync
def read_service_sheet(
input_filename,
eqpt,
network,
network_filename=None,
bidir=False):
input_filename: Path,
eqpt: Dict,
network: DiGraph,
network_filename: Path = None,
bidir: bool = False) -> Dict:
""" converts a service sheet into a json structure
"""
if network_filename is None:
@@ -184,19 +232,16 @@ def read_service_sheet(
req = correct_xls_route_list(network_filename, network, req)
# if there is no sync vector, do not write any synchronization
synchro = [n.json[1] for n in req if n.json[1] is not None]
data = {'path-request': [n.json[0] for n in req]}
if synchro:
data = {
'path-request': [n.json[0] for n in req],
'synchronization': synchro
}
else:
data = {
'path-request': [n.json[0] for n in req]
}
data['synchronization'] = synchro
return data
def correct_xlrd_int_to_str_reading(v):
"""Utils: ensure that int values in id are read as strings containing the int and
do not use the automatic float conversion from xlrd
"""
if not isinstance(v, str):
value = str(int(v))
if value.endswith('.0'):
@@ -206,22 +251,27 @@ def correct_xlrd_int_to_str_reading(v):
return value
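xlrd reads every numeric cell as a float, so an id typed as 12 in the sheet comes back as 12.0; the helper recovers the intended string form. For example:

assert correct_xlrd_int_to_str_reading(12.0) == '12'
assert correct_xlrd_int_to_str_reading('abc') == 'abc'   # strings pass through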
def parse_row(row, fieldnames):
def parse_row(row: List, fieldnames: List[str]) -> Dict:
"""Reads each values in a row and creates a dict using field names
"""
return {f: r.value for f, r in zip(fieldnames, row[0:SERVICES_COLUMN])
if r.ctype != XL_CELL_EMPTY}
def parse_excel(input_filename):
def parse_excel(input_filename: Path) -> List[Request]:
"""Opens xls_file and reads 'Service' sheet
Returns the list of services data in Request class
"""
with open_workbook(input_filename) as wb:
service_sheet = wb.sheet_by_name('Service')
services = list(parse_service_sheet(service_sheet))
return services
def parse_service_sheet(service_sheet):
def parse_service_sheet(service_sheet) -> Request:
""" reads each column according to authorized fieldnames. order is not important.
"""
logger.debug(f'Validating headers on {service_sheet.name!r}')
logger.debug('Validating headers on %r', service_sheet.name)
# add a test on field to enable the '' field case that arises when columns on the
# right hand side are used as comments or drawing in the excel sheet
header = [x.value.strip() for x in service_sheet.row(4)[0:SERVICES_COLUMN]
@@ -239,14 +289,14 @@ def parse_service_sheet(service_sheet):
'routing: is loose?': 'is_loose', 'path bandwidth': 'path_bandwidth'}
try:
service_fieldnames = [authorized_fieldnames[e] for e in header]
except KeyError:
except KeyError as e:
msg = f'Malformed header on Service sheet: {header} field not in {authorized_fieldnames}'
raise ValueError(msg)
raise ValueError(msg) from e
for row in all_rows(service_sheet, start=5):
yield Request(**parse_row(row[0:SERVICES_COLUMN], service_fieldnames))
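The 'from e' added to the raise chains the original KeyError into the ValueError, so the traceback shows which header lookup actually failed instead of hiding it. The pattern, sketched with a hypothetical subset of the mapping:

authorized = {'route id': 'request_id'}
try:
    field = authorized['unknown header']
except KeyError as e:
    # the chained traceback marks the KeyError as the direct cause
    raise ValueError('Malformed header on Service sheet') from e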
def check_end_points(pathreq, network):
def check_end_points(pathreq: Request_element, network: DiGraph):
"""Raise error if end point is not correct
"""
transponders = [n.uid for n in network.nodes() if isinstance(n, Transceiver)]
@@ -270,21 +320,21 @@ def find_node_sugestion(n_id, corresp_roadm, corresp_fused, corresp_ila, network
# check that n_id is in the node list, if not find a correspondence name
if n_id in roadmtype + edfatype:
return [n_id]
else:
# checks first roadm, fused, and ila in this order, because ila automatic name
# contain roadm names. If it is a fused node, next ila names might be correct
# suggestions, especially if following fibers were splitted and ila names
# created with the name of the fused node
if n_id in corresp_roadm.keys():
return corresp_roadm[n_id]
if n_id in corresp_fused.keys():
return corresp_fused[n_id] + corresp_ila[n_id]
if n_id in corresp_ila.keys():
return corresp_ila[n_id]
return []
# checks first roadm, fused, and ila in this order, because ila automatic name
# contains roadm names. If it is a fused node, next ila names might be correct
# suggestions, especially if following fibers were split and ila names
# created with the name of the fused node
if n_id in corresp_roadm.keys():
return corresp_roadm[n_id]
if n_id in corresp_fused.keys():
return corresp_fused[n_id] + corresp_ila[n_id]
if n_id in corresp_ila.keys():
return corresp_ila[n_id]
return []
def correct_xls_route_list(network_filename, network, pathreqlist):
def correct_xls_route_list(network_filename: Path, network: DiGraph,
pathreqlist: List[Request_element]) -> List[Request_element]:
""" prepares the format of route list of nodes to be consistant with nodes names:
remove wrong names, find correct names for ila, roadm and fused if the entry was
xls.
@@ -324,7 +374,7 @@ def correct_xls_route_list(network_filename, network, pathreqlist):
# if there is more than one suggestion, we need to choose the direction
# we rely on the next node provided by the user for this purpose
new_n = next(n for n in nodes_suggestion
if n in next_node.keys()
if n in next_node
and next_node[n] in temp.nodes_list[i:] + [pathreq.destination]
and next_node[n] not in temp.nodes_list[:i])
elif len(nodes_suggestion) == 1:
@@ -340,11 +390,9 @@ def correct_xls_route_list(network_filename, network, pathreqlist):
logger.info(msg)
pathreq.nodes_list.remove(n_id)
continue
else:
msg = f'{pathreq.request_id}: Could not find node:\n\t\'{n_id}\' in network' \
+ ' topology. Strict constraint can not be applied.'
logger.critical(msg)
raise ServiceError(msg)
msg = f'{pathreq.request_id}: Could not find node:\n\t\'{n_id}\' in network' \
+ ' topology. Strict constraint can not be applied.'
raise ServiceError(msg)
if new_n != n_id:
# warns the user when the correct name is used only in verbose mode,
# eg 'a' is a roadm and correct name is 'roadm a' or when there was
@@ -365,9 +413,8 @@ def correct_xls_route_list(network_filename, network, pathreqlist):
else:
if temp.loose == 'LOOSE':
msg = f'{pathreq.request_id}: Invalid route node specified:\n\t\'{n_id}\'' \
+ ' type is not supported as constraint with xls network input,' \
+ ' skipped!'
logger.info(msg)
+ ' type is not supported as constraint with xls network input, skipped!'
logger.warning(msg)
pathreq.nodes_list.remove(n_id)
else:
msg = f'{pathreq.request_id}: Invalid route node specified \n\t\'{n_id}\'' \