mirror of
https://github.com/Telecominfraproject/oopt-gnpy.git
synced 2025-10-30 17:47:50 +00:00
can pick up the best NF amplifier among several amplifiers sharing the same gain and/or power requirements when these requirements are not satisfied (used to take the highest gain or the highest power) Signed-off-by: Jean-Luc Auge <jeanluc.auge@orange.com>
378 lines
14 KiB
Python
378 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
'''
|
|
gnpy.core.network
|
|
=================
|
|
|
|
This module contains functions for constructing networks of network elements.
|
|
'''
|
|
|
|
from gnpy.core.convert import convert_file
|
|
from networkx import DiGraph
|
|
from numpy import arange
|
|
from logging import getLogger
|
|
from operator import itemgetter
|
|
from gnpy.core import elements
|
|
from gnpy.core.elements import Fiber, Edfa, Transceiver, Roadm, Fused
|
|
from gnpy.core.equipment import edfa_nf
|
|
from gnpy.core.units import UNITS
|
|
from gnpy.core.utils import load_json
|
|
from gnpy.core.utils import round2float
|
|
from gnpy.core.utils import db2lin, lin2db
|
|
from sys import exit
|
|
from collections import namedtuple
|
|
|
|
logger = getLogger(__name__)
|
|
|
|
def load_network(filename, equipment):
    """Load a topology file and build the corresponding network graph.

    :param filename: path-like object exposing ``.suffix`` (e.g. a
        ``pathlib.Path``). ``.xls`` files are first converted to JSON with
        ``convert_file``; ``.json`` files are read as is.
    :param equipment: equipment library, passed to ``network_from_json``.
    :return: the graph returned by ``network_from_json``.
    :raises ValueError: if the extension is neither ``.xls`` nor ``.json``.
    """
    suffix = filename.suffix.lower()
    if suffix == '.xls':
        logger.info('Automatically generating topology JSON file')
        json_filename = convert_file(filename)
    elif suffix == '.json':
        json_filename = filename
    else:
        raise ValueError(f'unsupported topology filename extension {suffix}')
    json_data = load_json(json_filename)
    return network_from_json(json_data, equipment)
|
|
|
|
def network_from_json(json_data, equipment):
    """Build a directed graph of network elements from a topology description.

    :param json_data: dict with an ``'elements'`` list (each entry holds at
        least ``'type'``, optionally ``'type_variety'`` and ``'params'``, plus
        the keyword arguments of the element class) and a ``'connections'``
        list of ``{'from_node': uid, 'to_node': uid}`` entries.
    :param equipment: equipment library indexed as ``equipment[type][variety]``;
        when an entry matches, its fields are merged into the element params.
    :return: a ``DiGraph`` whose nodes are the instantiated elements.

    The process exits if an ``Edfa`` or ``Fiber`` refers to a variety that is
    not in ``equipment``. ``json_data`` itself is left unmodified.
    """
    # NOTE|dutc: we could use the following, but it would tie our data format
    #            too closely to the graph library
    # from networkx import node_link_graph
    g = DiGraph()
    for el_json in json_data['elements']:
        # Work on a copy: popping 'type' from the caller's dict made a second
        # call on the same json_data fail with KeyError.
        el_config = dict(el_json)
        typ = el_config.pop('type')
        variety = el_config.pop('type_variety', 'default')
        if typ in equipment and variety in equipment[typ]:
            extra_params = equipment[typ][variety]
            # equipment values take precedence over the element's own params
            params = dict(el_config.get('params', {}))
            params.update(extra_params._asdict())
            el_config['params'] = params
        elif typ in ['Edfa', 'Fiber']:  # catch it now because the code will crash later!
            print(f'The {typ} of variety type {variety} was not recognized:'
                  '\nplease check it is properly defined in the eqpt_config json file')
            exit()
        cls = getattr(elements, typ)
        el = cls(**el_config)
        g.add_node(el)

    nodes = {k.uid: k for k in g.nodes()}

    for cx in json_data['connections']:
        from_node, to_node = cx['from_node'], cx['to_node']
        g.add_edge(nodes[from_node], nodes[to_node])

    return g
|
|
|
|
def select_edfa(gain_target, power_target, equipment):
    """Amplifier selection algorithm (@Orange Jean-Luc Augé).

    Every amplifier of ``equipment['Edfa']`` flagged ``allowed_for_design`` is
    scored against the requested operating point, then the candidates are
    narrowed down in three steps:

    1. gain: keep the amplifiers whose ``gain_flatmax`` is less than
       ``TARGET_EXTENDED_GAIN`` dB short of ``gain_target``; if there is none,
       keep those within 0.1 dB of the best gain margin available;
    2. power: among them, keep the amplifiers with a non-negative power
       margin; if there is none, keep those within 0.1 dB of the best one;
    3. noise figure: pick the lowest NF (as returned by ``edfa_nf``).

    :return: the variety name (key of ``equipment['Edfa']``) of the selection.
    """
    Candidate = namedtuple('Edfa_list', 'variety power gain nf')
    TARGET_EXTENDED_GAIN = 2.1
    # MAX_EXTENDED_GAIN = 5
    pin = power_target - gain_target

    candidates = []
    for variety, edfa in equipment['Edfa'].items():
        if not edfa.allowed_for_design:
            continue
        # output power reachable at extended gain, capped by the amplifier p_max
        reachable_power = min(pin + edfa.gain_flatmax + TARGET_EXTENDED_GAIN,
                              edfa.p_max)
        candidates.append(Candidate(
            variety=variety,
            power=reachable_power - power_target,
            gain=edfa.gain_flatmax - gain_target,
            nf=edfa_nf(gain_target, variety, equipment)))

    gain_ok = [c for c in candidates if c.gain > -TARGET_EXTENDED_GAIN]
    if not gain_ok:
        # no amplifier satisfies the required gain: fall back on all the
        # amplifiers sharing the highest gain
        best_gain = max(c.gain for c in candidates)
        gain_ok = [c for c in candidates if c.gain - best_gain > -0.1]

    power_ok = [c for c in gain_ok if c.power >= 0]
    if not power_ok:
        # no amplifier satisfies the required power: fall back on all the
        # amplifiers sharing the highest power
        best_power = max(c.power for c in gain_ok)
        power_ok = [c for c in gain_ok if c.power - best_power > -0.1]

    # gain and power requirements are resolved,
    # => choose the amp with the best NF among the acceptable ones
    return min(power_ok, key=lambda c: c.nf).variety
|
|
|
|
def set_roadm_loss(network, equipment, pref_ch_db):
    """Set the ``loss`` attribute of every Roadm found in ``network``.

    In power mode (``equipment['Spans']['default'].power_mode``) every Roadm
    loss is overwritten with ``pref_ch_db - power_mode_pref``. Otherwise only
    the Roadms whose loss is ``None`` receive ``gain_mode_default_loss``.
    Both settings are read from ``equipment['Roadms']['default']``.
    """
    roadms = [n for n in network if isinstance(n, Roadm)]
    power_mode = equipment['Spans']['default'].power_mode
    roadm_defaults = equipment['Roadms']['default']
    default_roadm_loss = roadm_defaults.gain_mode_default_loss
    pref_roadm_db = roadm_defaults.power_mode_pref
    roadm_loss = pref_ch_db - pref_roadm_db

    for roadm in roadms:
        if power_mode:
            roadm.loss = roadm_loss
        elif roadm.loss is None:  # identity test: do not rely on __eq__
            roadm.loss = default_roadm_loss
|
|
|
|
def target_power(network, node, equipment):  # get_fiber_dp
    """Return the delta power (dB) to apply at the input of ``node``.

    The value grows with the span loss of ``node`` (as given by ``span_loss``):
    ``(loss - SPAN_LOSS_REF) * POWER_SLOPE``, rounded with ``round2float`` to
    the step of ``delta_power_range_db`` and clamped to its lower and upper
    bounds. It is forced to 0 for a Roadm or when power mode is off.

    ``equipment['Spans']['default'].delta_power_range_db`` must hold
    ``[lower_bound, upper_bound, step]``; the process exits otherwise.
    The result is also printed.
    """
    SPAN_LOSS_REF = 20
    POWER_SLOPE = 0.3
    power_mode = equipment['Spans']['default'].power_mode
    dp_range = list(equipment['Spans']['default'].delta_power_range_db)
    node_loss = span_loss(network, node)
    try:
        dp = round2float((node_loss - SPAN_LOSS_REF) * POWER_SLOPE, dp_range[2])
        dp = max(dp_range[0], dp)
        dp = min(dp_range[1], dp)
    except (KeyError, IndexError):
        # dp_range is a list, so a definition with fewer than 3 items raises
        # IndexError (KeyError is kept for backward compatibility)
        print(f'invalid delta_power_range_db definition in eqpt_config[Spans]\n'
              f'delta_power_range_db: [lower_bound, upper_bound, step]')
        exit()
    if isinstance(node, Roadm) or not power_mode:
        dp = 0
    print(f'{repr(node)} delta power in:\n{dp}dB')
    return dp
|
|
|
|
|
|
def prev_node_generator(network, node):
    """fused spans interest:
    iterate upstream over the predecessors of ``node`` that belong to the
    same fused span.

    Only the first predecessor of each node is followed. The walk goes on as
    long as either the predecessor or the current node is a Fused element,
    and ends silently when a node has no predecessor.
    """
    # next() with a default: a StopIteration escaping a generator body is
    # turned into RuntimeError since PEP 479 (default from Python 3.7)
    prev_node = next(iter(network.predecessors(node)), None)
    if prev_node is None:
        return
    # yield and re-iterate
    if isinstance(prev_node, Fused) or isinstance(node, Fused):
        yield prev_node
        yield from prev_node_generator(network, prev_node)
|
|
|
|
def next_node_generator(network, node):
    """fused spans interest:
    iterate downstream over the successors of ``node`` that belong to the
    same fused span.

    Only the first successor of each node is followed. The walk goes on as
    long as either the successor or the current node is a Fused element,
    and ends silently when a node has no successor.
    """
    # next() with a default: a StopIteration escaping a generator body is
    # turned into RuntimeError since PEP 479 (default from Python 3.7)
    next_node = next(iter(network.successors(node)), None)
    if next_node is None:
        return
    # yield and re-iterate
    if isinstance(next_node, Fused) or isinstance(node, Fused):
        yield next_node
        yield from next_node_generator(network, next_node)
|
|
|
|
def span_loss(network, node):
    """Fused span interest:
    return the loss of ``node`` plus the loss of every element spliced to it
    through Fused nodes, upstream and downstream.

    The own loss of ``node`` only counts when ``node.passive`` is true. A
    neighbouring chain is added only when the first predecessor (resp.
    successor) of ``node`` is a Fused element.
    """
    total = node.loss if node.passive else 0

    # only the first predecessor matters, hence the unconditional break
    for upstream in network.predecessors(node):
        if isinstance(upstream, Fused):
            total += sum(n.loss for n in prev_node_generator(network, node))
        break

    # same on the downstream side
    for downstream in network.successors(node):
        if isinstance(downstream, Fused):
            total += sum(n.loss for n in next_node_generator(network, node))
        break

    return total
|
|
|
|
def find_first_node(network, node):
    """Fused node interest:
    return the node at the origin of the succession of fused nodes that
    ``node`` belongs to (aka no amp in between).

    ``node`` itself is returned when ``prev_node_generator`` yields nothing.
    """
    origin = node
    for upstream in prev_node_generator(network, node):
        # the generator walks away from node: the last item is the origin
        origin = upstream
    return origin
|
|
|
|
def find_last_node(network, node):
    """Fused node interest:
    return the last node of the succession of fused nodes that ``node``
    belongs to (aka no amp in between).

    ``node`` itself is returned when ``next_node_generator`` yields nothing.
    """
    last = node
    for downstream in next_node_generator(network, node):
        # the generator walks away from node: the last item is the end
        last = downstream
    return last
|
|
|
|
def set_amplifier_voa(amp, pref_total_db):
    """Placeholder for the amplifier VOA setting (not implemented yet).

    For now it only reads the amplifier gain target, ``p_max`` and
    ``gain_flatmax`` and computes ``pref_total_db + amp.dp_db``; nothing is
    stored on ``amp`` and nothing is returned.
    """
    # TODO!!
    target_gain = amp.operational.gain_target
    max_power = amp.params.p_max
    flat_gain_max = amp.params.gain_flatmax
    pin_db = pref_total_db + amp.dp_db
|
|
|
|
def set_egress_amplifier(network, roadm, equipment, pref_total_db):
    """Set gain, delta power and type of the amplifiers downstream of ``roadm``.

    For every successor of ``roadm`` that is not a Transceiver, the chain of
    first successors is walked until the next Roadm or Transceiver. Each Edfa
    met on the way gets:

    - ``operational.gain_target``: kept if already > 0, otherwise computed
      from the loss of the previous node (``span_loss``) and the delta power
      returned by ``target_power``;
    - ``dp_db``: only when power mode is on;
    - its parameters from ``select_edfa`` when ``params.type_variety`` is ''.

    ``roadm`` is only used as the starting node of the walk.
    """
    power_mode = equipment['Spans']['default'].power_mode
    next_oms = (n for n in network.successors(roadm) if not isinstance(n, Transceiver))
    for oms in next_oms:
        #go through all the OMS departing from the Roadm
        node = roadm
        prev_node = roadm
        next_node = oms
        # if isinstance(next_node, Fused): #support ROADM wo egress amp for metro applications
        #     node = find_last_node(next_node)
        #     next_node = next(n for n in network.successors(node))
        #     next_node = find_last_node(next_node)
        # delta power at the output of the previous / current amplifier
        prev_dp = 0
        dp = 0
        while True:
            #go through all nodes in the OMS (loop until next Roadm instance)
            if isinstance(node, Edfa):
                node_loss = span_loss(network, prev_node)
                dp = target_power(network, next_node, equipment)
                if node.operational.gain_target > 0:
                    # gain imposed by the user: the delta power follows from it
                    gain_target = node.operational.gain_target
                    dp = prev_dp + gain_target - node_loss
                else :
                    # no gain set: compensate the loss and reach the target dp
                    gain_target = node_loss + dp - prev_dp
                if power_mode:
                    node.dp_db = dp
                node.operational.gain_target = gain_target
                if node.params.type_variety == '':
                    # amplifier type left blank: pick one for this gain/power
                    power_target = pref_total_db + dp
                    edfa_variety = select_edfa(gain_target, power_target, equipment)
                    extra_params = equipment['Edfa'][edfa_variety]
                    node.params.update_params(extra_params._asdict())
            if isinstance(next_node, Roadm) or isinstance(next_node, Transceiver):
                break
            prev_dp = dp
            prev_node = node
            node = next_node
            # only the first successor is followed
            # NOTE(review): raises StopIteration if node has no successor
            next_node = next(n for n in network.successors(node))
|
|
|
|
|
|
def add_egress_amplifier(network, node):
    """Insert a blank Edfa between ``node`` and each of its successors.

    Successors that are a Transceiver, a Fused or already an Edfa are left
    untouched. The inserted amplifiers are named ``Edfa<i>_<node.uid>`` and
    created with empty params and zero gain and tilt targets.
    """
    # no amplification for fused spans or TRX
    targets = [succ for succ in network.successors(node)
               if not isinstance(succ, (Transceiver, Fused, Edfa))]
    for i, succ in enumerate(targets):
        network.remove_edge(node, succ)
        amp = Edfa(uid=f'Edfa{i}_{node.uid}',
                   params={},
                   operational={'gain_target': 0, 'tilt_target': 0})
        network.add_node(amp)
        # node -> amp -> former successor
        network.add_edge(node, amp)
        network.add_edge(amp, succ)
|
|
|
|
|
|
def calculate_new_length(fiber_length, bounds, target_length):
    """Compute how to split a fiber into equal spans close to ``target_length``.

    :param fiber_length: length of the fiber to split.
    :param bounds: object with ``start`` and ``stop`` attributes (e.g. a
        ``range``) giving the acceptable span length interval.
    :param target_length: preferred span length.
    :return: ``(span_length, number_of_spans)``.

    A fiber shorter than ``bounds.stop`` is returned unchanged as one span.
    Otherwise the two candidates are ``n`` and ``n + 1`` spans, with
    ``n = fiber_length // target_length``: the one that alone fits in
    ``bounds`` wins, else the one whose span length is closest to the target.
    """
    if fiber_length < bounds.stop:
        return fiber_length, 1

    n_spans = int(fiber_length // target_length)

    length1 = fiber_length / (n_spans + 1)
    result1 = (length1, n_spans + 1)
    if n_spans == 0:
        # fiber beyond bounds.stop but shorter than target_length: splitting
        # in 0 spans is not an option (it used to raise ZeroDivisionError)
        return result1
    delta1 = target_length - length1

    length2 = fiber_length / n_spans
    delta2 = length2 - target_length
    result2 = (length2, n_spans)

    fits1 = bounds.start <= length1 <= bounds.stop
    fits2 = bounds.start <= length2 <= bounds.stop
    if fits1 and not fits2:
        return result1
    if fits2 and not fits1:
        return result2
    return result1 if delta1 < delta2 else result2
|
|
|
|
|
|
def split_fiber(network, fiber, bounds, target_length, equipment):
    """Replace ``fiber`` by a chain of equal-length Fiber spans when needed.

    The span length and count come from ``calculate_new_length``; nothing is
    done when a single span is enough. Otherwise ``fiber`` is removed from
    ``network`` and replaced, between its first predecessor and its first
    successor, by spans named ``<fiber.uid>_(<index>/<count>)`` that reuse
    the metadata, params and connector losses of ``fiber``.

    The process exits if ``fiber`` has no predecessor or no successor.
    ``equipment`` is not used.
    """
    new_length, n_spans = calculate_new_length(fiber.length, bounds, target_length)
    if n_spans == 1:
        return

    try:
        downstream = next(network.successors(fiber))
        upstream = next(network.predecessors(fiber))
    except StopIteration:
        print(f'{repr(fiber)} is not properly connected, please check network topology')
        exit()

    # detach the original fiber from the graph
    network.remove_edge(fiber, downstream)
    network.remove_edge(upstream, fiber)
    network.remove_node(fiber)

    # update connector loss parameter with default values
    fiber_params = fiber.params._asdict()
    fiber_params['con_in'] = fiber.con_in
    fiber_params['con_out'] = fiber.con_out

    pieces = [Fiber(uid=f'{fiber.uid}_({span}/{n_spans})',
                    metadata=fiber.metadata,
                    params=fiber_params)
              for span in range(n_spans)]

    # chain: upstream -> piece 0 -> ... -> last piece -> downstream
    tail = upstream
    for piece in pieces:
        piece.length = new_length
        network.add_node(piece)
        network.add_edge(tail, piece)
        tail = piece
    network.add_edge(tail, downstream)
|
|
|
|
def add_connector_loss(fibers, con_in, con_out):
    """Give default connector losses to the fibers that have none.

    For each fiber, ``con_in`` / ``con_out`` are only assigned when the
    corresponding attribute is ``None``; existing values are kept.
    """
    defaults = (('con_in', con_in), ('con_out', con_out))
    for fiber in fibers:
        for attr, default in defaults:
            if getattr(fiber, attr) is None:
                setattr(fiber, attr, default)
|
|
|
|
def add_fiber_padding(network, fibers, padding):
    """Pad the spans whose loss is below ``padding``.

    For each fiber whose span loss (``span_loss``) is lower than ``padding``
    and whose first successor is not a Fused element, the missing loss
    ``padding - span loss`` is stored in the ``att_in`` attribute of the
    first node of the span (``find_first_node``).
    """
    for fiber in fibers:
        fiber_loss = span_loss(network, fiber)
        next_node = next(network.successors(fiber))
        # skip spans that are lossy enough, and fibers followed by a splice
        if not (fiber_loss < padding) or isinstance(next_node, Fused):
            continue
        # add a padding att_in at the input of the 1st fiber:
        # address the case when several fibers are spliced together
        first_fiber = find_first_node(network, fiber)
        first_fiber.att_in = padding - fiber_loss
|
|
|
|
def build_network(network, equipment, pref_ch_db, pref_total_db):
    """Complete ``network`` in place: losses, fiber splitting and amplifiers.

    Steps, in this order:

    1. set the Roadm losses (``set_roadm_loss``);
    2. give default connector losses to the fibers and pad short spans;
    3. split the fibers that are too long (``split_fiber``);
    4. add an egress amplifier after the Fiber and Roadm nodes;
    5. set the amplifiers of every OMS leaving a Roadm, or leaving a
       Transceiver when the topology has no Roadm.

    All span settings are read from ``equipment['Spans']['default']``.
    """
    span_defaults = equipment['Spans']['default']
    max_length = int(span_defaults.max_length * UNITS[span_defaults.length_units])
    # presumably the padding loss converted to metres at 0.2 dB/km - TODO confirm
    min_length = max(int(span_defaults.padding/0.2*1e3), 50_000)
    bounds = range(min_length, max_length)
    target_length = max(min_length, 90_000)
    con_in = span_defaults.con_in
    con_out = span_defaults.con_out + span_defaults.EOL
    padding = span_defaults.padding

    # set roadm loss for gain_mode before building the network
    set_roadm_loss(network, equipment, pref_ch_db)
    fibers = [n for n in network.nodes() if isinstance(n, Fiber)]
    add_connector_loss(fibers, con_in, con_out)
    add_fiber_padding(network, fibers, padding)

    # don't group split fiber and add amp in the same loop
    # => for code clarity (at the expense of speed):
    for fiber in fibers:
        split_fiber(network, fiber, bounds, target_length, equipment)

    # snapshot the nodes first: the graph is modified while amplifiers are added
    amplified_nodes = [n for n in network.nodes() if isinstance(n, (Fiber, Roadm))]
    for amplified in amplified_nodes:
        add_egress_amplifier(network, amplified)

    roadms = [n for n in network.nodes() if isinstance(n, Roadm)]
    for roadm in roadms:
        set_egress_amplifier(network, roadm, equipment, pref_total_db)

    # support older json input topology wo Roadms:
    if not roadms:
        transceivers = [n for n in network.nodes() if isinstance(n, Transceiver)]
        for transceiver in transceivers:
            set_egress_amplifier(network, transceiver, equipment, pref_total_db)
|
|
|