mirror of
				https://github.com/Telecominfraproject/oopt-gnpy.git
				synced 2025-10-31 01:57:54 +00:00 
			
		
		
		
	 c384af8062
			
		
	
	c384af8062
	
	
	
		
			
			Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com> Change-Id: I00018c38b0cc0ceefcd21d50dd0cdc639019cc70
		
			
				
	
	
		
			1101 lines
		
	
	
		
			52 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1101 lines
		
	
	
		
			52 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| """
 | |
| gnpy.core.network
 | |
| =================
 | |
| 
 | |
| Working with networks which consist of network elements
 | |
| """
 | |
| 
 | |
| from operator import attrgetter
 | |
| from collections import namedtuple
 | |
| from logging import getLogger
 | |
| from typing import Tuple, List, Optional, Union
 | |
| from networkx import DiGraph
 | |
| 
 | |
| from gnpy.core import elements
 | |
| from gnpy.core.exceptions import ConfigurationError, NetworkTopologyError
 | |
| from gnpy.core.utils import round2float, convert_length, psd2powerdbm, lin2db, watt2dbm, dbm2watt, automatic_nch, \
 | |
|     find_common_range
 | |
| from gnpy.core.info import ReferenceCarrier, create_input_spectral_information
 | |
| from gnpy.core.parameters import SimParams, EdfaParams
 | |
| from gnpy.core.science_utils import RamanSolver
 | |
| 
 | |
| 
 | |
# Module-level logger, named after this module, used for all design warnings below.
logger = getLogger(__name__)
 | |
| 
 | |
| 
 | |
def edfa_nf(gain_target: float, amp_params) -> float:
    """Evaluate the noise figure of an amplifier type at a given gain target.

    A temporary ``elements.Edfa`` is instantiated from ``amp_params`` with the
    requested gain target and a zero tilt target, it is given a fixed set of
    reference input conditions, and its internal NF computation is called.

    Parameters:
    -----------
    gain_target : float
        The target gain for which the noise figure is to be calculated.
    amp_params : object
        An object whose ``__dict__`` holds the amplifier parameters.

    Returns:
    --------
    float
        The value returned by the amplifier's ``_calc_nf(True)``.
    """
    operating_point = {'gain_target': gain_target, 'tilt_target': 0}
    probe = elements.Edfa(uid='calc_NF', params=amp_params.__dict__, operational=operating_point)
    # fixed reference conditions under which the NF is evaluated
    probe.pin_db = 0
    probe.nch = 88
    probe.slot_width = 50e9
    return probe._calc_nf(True)
 | |
| 
 | |
| 
 | |
def select_edfa(raman_allowed: bool, gain_target: float, power_target: float, edfa_eqpt: dict, uid: str,
                target_extended_gain: float, restrictions: Optional[List[str]] = None, verbose: Optional[bool] = True) \
        -> Tuple[str, float]:
    """Selects an amplifier within a library based on specified parameters.

    The candidates are filtered successively on minimum gain, then on
    gain/power capability, and the candidate with the lowest noise figure
    among the survivors is returned.

    Parameters:
    -----------
    raman_allowed : bool
        Indicates whether Raman amplifiers are permitted in the selection.
    gain_target : float
        The target gain that the selected amplifier should achieve.
    power_target : float
        The target power level for the amplifier.
    edfa_eqpt : dict
        A dictionary containing available EDFA equipment, where keys are
        amplifier names and values are amplifier objects.
    uid : str
        Identifier of the node where the amplifier will be used (only used
        in messages).
    target_extended_gain : float
        Extended gain allowance added on top of each amplifier's
        ``gain_flatmax``.
    restrictions : Optional[List[str]], default=None
        When given, only the amplifier names present in this list are
        candidates, and the ``allowed_for_design`` flag is ignored for
        non-Raman amplifiers. When None, the whole library is considered.
    verbose : Optional[bool], default=True
        If True, warnings are logged.

    Returns:
    --------
    Tuple[str, float]
        The selected amplifier's variety and the power reduction applied
        (0.0 or a negative value).

    Raises:
    -------
    ConfigurationError
        If no candidate satisfies the minimum gain requirement and there
        is no non-Raman candidate to fall back on.

    Notes:
    ------
    - The ``power`` attribute of a candidate is its margin with respect to
      ``power_target``; it accounts for both the gain and the power limits.
    - If every candidate is below its minimum gain, the non-Raman candidates
      are kept anyway (input padding is assumed) and a warning is logged.

    Author:
    -------
    Jean-Luc Augé
    """
    Edfa_list = namedtuple('Edfa_list', 'variety power gain_min nf')

    # for roadm restriction only: create a dict including not allowed for design amps
    # because main use case is to have specific roadm amp which are not allowed for ILA
    # with the auto design
    edfa_dict = {name: amp for (name, amp) in edfa_eqpt.items()
                 if restrictions is None or name in restrictions}

    pin = power_target - gain_target

    # create 2 lists of available amplifiers with relevant attributes for their selection

    # edfa list with:
    # extended gain min allowance of 3dB: could be parametrized, but a bit complex
    # extended gain max allowance target_extended_gain is coming from eqpt_config.json
    # power attribute includes power AND gain limitations
    edfa_list = [Edfa_list(
        variety=edfa_variety,
        power=min(pin + edfa.gain_flatmax + target_extended_gain, edfa.p_max) - power_target,
        gain_min=gain_target + 3 - edfa.gain_min,
        nf=edfa_nf(gain_target, edfa_eqpt[edfa_variety]))
        for edfa_variety, edfa in edfa_dict.items()
        if ((edfa.allowed_for_design or restrictions is not None) and not edfa.raman)]

    # consider a Raman list because of different gain_min requirement:
    # do not allow extended gain min for Raman
    raman_list = [Edfa_list(
        variety=edfa_variety,
        power=min(pin + edfa.gain_flatmax + target_extended_gain, edfa.p_max) - power_target,
        gain_min=gain_target - edfa.gain_min,
        nf=edfa_nf(gain_target, edfa_eqpt[edfa_variety]))
        for edfa_variety, edfa in edfa_dict.items()
        if (edfa.allowed_for_design and edfa.raman)] \
        if raman_allowed else []

    # merge raman and edfa lists
    amp_list = edfa_list + raman_list

    # filter on min gain limitation:
    acceptable_gain_min_list = [x for x in amp_list if x.gain_min > 0]

    if not acceptable_gain_min_list:
        # do not take this empty list into account for the rest of the code
        # but issue a warning to the user and do not consider Raman
        # Raman below min gain should not be allowed because it is meant to be a design requirement
        # and raman padding at the amplifier input is impossible!
        if not edfa_list:
            # message built from adjacent literals so that no source indentation leaks into it
            raise ConfigurationError('auto_design could not find any amplifier '
                                     f'to satisfy min gain requirement in node {uid} '
                                     'please increase span fiber padding')
        if verbose:
            logger.warning(f'\n\tWARNING: target gain in node {uid} is below all available amplifiers min gain: '
                           + '\n\tamplifier input padding will be assumed, consider increase span fiber padding '
                           + 'instead.\n')
        acceptable_gain_min_list = edfa_list

    # filter on gain+power limitation:
    # this list checks both the gain and the power requirement
    # because of the way .power is calculated in the list
    acceptable_power_list = [x for x in acceptable_gain_min_list if x.power > 0]
    if not acceptable_power_list:
        # no amplifier satisfies the required power, so pick the highest power(s):
        power_max = max(acceptable_gain_min_list, key=attrgetter('power')).power
        # check and pick if other amplifiers may have a similar gain/power
        # allow a 0.3dB power range
        # this allows to choose an amplifier with a better NF subsequently
        acceptable_power_list = [x for x in acceptable_gain_min_list
                                 if x.power - power_max > -0.3]

    # gain and power requirements are resolved,
    #       => choose the amp with the best NF among the acceptable ones:
    selected_edfa = min(acceptable_power_list, key=attrgetter('nf'))  # filter on NF
    # check what are the gain and power limitations of this amp
    power_reduction = min(selected_edfa.power, 0.0)
    if power_reduction < -0.5 and verbose:
        logger.warning(f'\n\tWARNING: target gain and power in node {uid}\n'
                       + '\tis beyond all available amplifiers capabilities and/or extended_gain_range:\n'
                       + f'\ta power reduction of {round(power_reduction, 2)} is applied\n')
    return selected_edfa.variety, power_reduction
 | |
| 
 | |
| 
 | |
def target_power(network, node, equipment):  # get_fiber_dp
    """Computes target power using J. -L. Auge, V. Curri and E. Le Rouzic,
    Open Design for Multi-Vendor Optical Networks, OFC 2019.
    equation 4

    The delta power is a linear function of the span loss seen from ``node``
    (slope 0.3 around a 20 reference loss), rounded to the configured step and
    clipped to the configured [lower_bound, upper_bound] range.
    Returns 0 when ``node`` is a Roadm.

    Raises:
        ConfigurationError: if ``delta_power_range_db`` of the default Span
            has fewer than three items.
    """
    if isinstance(node, elements.Roadm):
        return 0

    SPAN_LOSS_REF = 20
    POWER_SLOPE = 0.3
    dp_range = list(equipment['Span']['default'].delta_power_range_db)
    node_loss = span_loss(network, node, equipment)
    # only the unpacking of the range can legitimately raise IndexError: keep the try minimal
    # so that an unrelated IndexError is not reported as a configuration problem
    try:
        lower_bound, upper_bound, step = dp_range[0], dp_range[1], dp_range[2]
    except IndexError as exc:
        raise ConfigurationError('invalid delta_power_range_db definition in eqpt_config[Span]'
                                 'delta_power_range_db: [lower_bound, upper_bound, step]') from exc

    dp = round2float((node_loss - SPAN_LOSS_REF) * POWER_SLOPE, step)
    return min(upper_bound, max(lower_bound, dp))
 | |
| 
 | |
| 
 | |
# Element classes tested together (via isinstance) by the span walkers
# prev_node_generator / next_node_generator below.
_fiber_fused_types = (elements.Fused, elements.Fiber)
 | |
| 
 | |
| 
 | |
def prev_node_generator(network, node):
    """Fused spans interest:
    yield the successive upstream neighbours of ``node`` for as long as each hop
    is either a Fused element feeding a Fiber/Fused, or a Fiber/Fused feeding a Fused.

    Only the first predecessor of each node is followed. A node without any
    predecessor ends the walk silently if it is a Transceiver, otherwise a
    NetworkTopologyError is raised.
    """
    try:
        upstream = next(network.predecessors(node))
    except StopIteration:
        if not isinstance(node, elements.Transceiver):
            raise NetworkTopologyError(f'Node {node.uid} is not properly connected, please check network topology')
        return

    fused_feeds_span = isinstance(upstream, elements.Fused) and isinstance(node, _fiber_fused_types)
    span_feeds_fused = isinstance(upstream, _fiber_fused_types) and isinstance(node, elements.Fused)
    if not (fused_feeds_span or span_feeds_fused):
        return
    yield upstream
    yield from prev_node_generator(network, upstream)
 | |
| 
 | |
| 
 | |
def next_node_generator(network, node):
    """Fused spans interest:
    yield the successive downstream neighbours of ``node`` for as long as each hop
    is either a Fiber/Fused followed by a Fused, or a Fused followed by a Fiber/Fused.

    Only the first successor of each node is followed. A node without any
    successor ends the walk silently if it is a Transceiver, otherwise a
    NetworkTopologyError is raised.
    """
    try:
        downstream = next(network.successors(node))
    except StopIteration:
        if not isinstance(node, elements.Transceiver):
            raise NetworkTopologyError(f'Node {node.uid} is not properly connected, please check network topology')
        return

    span_then_fused = isinstance(downstream, elements.Fused) and isinstance(node, _fiber_fused_types)
    fused_then_span = isinstance(downstream, _fiber_fused_types) and isinstance(node, elements.Fused)
    if not (span_then_fused or fused_then_span):
        return
    yield downstream
    yield from next_node_generator(network, downstream)
 | |
| 
 | |
| 
 | |
def estimate_raman_gain(node, equipment, power_dbm):
    """If node is RamanFiber, then estimate the possible Raman gain if any
    for this purpose computes stimulated_raman_scattering loss_profile. This may be time consuming.

    The estimate is stored on the node as ``estimated_gain`` and reused on later calls.
    Any other node type yields 0.0.

    The global simulation parameters (SimParams) are temporarily replaced for the
    computation and are restored afterwards, including when the computation fails.

    Args:
        node: the network element to evaluate.
        equipment (dict): equipment library; ``equipment['SI']['default']`` provides the spectrum definition.
        power_dbm: per channel input power passed to ``dbm2watt`` to build the test spectrum.

    Returns:
        The estimated gain: rounded to 2 decimals when it has just been computed,
        the stored (unrounded) ``node.estimated_gain`` when it was already available.
    """
    if not isinstance(node, elements.RamanFiber):
        return 0.0
    if hasattr(node, "estimated_gain"):
        # do not compute twice to save on time
        return node.estimated_gain

    si = equipment['SI']['default']
    # reduce the nb of channels to speed up: 3 times wider spacing, 3 times more power per channel
    spacing = si.spacing * 3
    power = dbm2watt(power_dbm) * 3

    sim_params = {
        "raman_params": {
            "flag": True,
            "result_spatial_resolution": 50e3,
            "solver_spatial_resolution": 100
        }
    }

    # in order to take into account gain generated in RamanFiber, propagate a test spectrum in the RamanFiber
    spectral_info = create_input_spectral_information(f_min=si.f_min, f_max=si.f_max, roll_off=si.roll_off,
                                                      baud_rate=si.baud_rate, tx_power=power, spacing=spacing,
                                                      tx_osnr=si.tx_osnr)
    pin = watt2dbm(sum(spectral_info.signal))
    attenuation_in_db = node.params.con_in + node.params.att_in
    spectral_info.apply_attenuation_db(attenuation_in_db)
    # snapshot of the current global simulation parameters, to be put back once done
    save_sim_params = {"raman_params": SimParams._shared_dict['raman_params'].to_json(),
                       "nli_params": SimParams._shared_dict['nli_params'].to_json()}
    try:
        SimParams.set_params(sim_params)
        stimulated_raman_scattering = RamanSolver.calculate_stimulated_raman_scattering(spectral_info, node)
        attenuation_fiber = stimulated_raman_scattering.loss_profile[:spectral_info.number_of_channels, -1]
        spectral_info.apply_attenuation_lin(attenuation_fiber)
        attenuation_out_db = node.params.con_out
        spectral_info.apply_attenuation_db(attenuation_out_db)
        pout = watt2dbm(sum(spectral_info.signal))
        estimated_loss = pin - pout
        estimated_gain = node.loss - estimated_loss
        node.estimated_gain = estimated_gain
    finally:
        # never leave the shared simulation parameters in the temporary state
        SimParams.set_params(save_sim_params)
    return round(estimated_gain, 2)
 | |
| 
 | |
| 
 | |
def span_loss(network, node, equipment, input_power=None):
    """Total loss of a span (Fiber and Fused nodes) which contains the given node,
    minus the estimated Raman gain of the elements of the span.

    If the node already carries a ``design_span_loss`` attribute, that value is
    returned as is and nothing is recomputed (this function only reads the
    attribute, it does not set it).
    """
    if hasattr(node, "design_span_loss"):
        return node.design_span_loss

    # losses: the node itself (only if passive), then its upstream and downstream span neighbours
    own_loss = node.loss if node.passive else 0
    upstream_loss = sum(elem.loss for elem in prev_node_generator(network, node))
    downstream_loss = sum(elem.loss for elem in next_node_generator(network, node))

    # possible Raman gain, collected in the same order
    raman_gain = estimate_raman_gain(node, equipment, input_power)
    raman_gain += sum(estimate_raman_gain(elem, equipment, input_power)
                      for elem in prev_node_generator(network, node))
    raman_gain += sum(estimate_raman_gain(elem, equipment, input_power)
                      for elem in next_node_generator(network, node))
    return own_loss + upstream_loss + downstream_loss - raman_gain
 | |
| 
 | |
| 
 | |
def find_first_node(network, node):
    """Fused node interest:
    returns the 1st node at the origin of a succession of fused nodes
    (aka no amp in between); returns ``node`` itself when there is no such
    upstream succession."""
    upstream_chain = list(prev_node_generator(network, node))
    return upstream_chain[-1] if upstream_chain else node
 | |
| 
 | |
| 
 | |
def find_last_node(network, node):
    """Fused node interest:
    returns the last node in a succession of fused nodes
    (aka no amp in between); returns ``node`` itself when there is no such
    downstream succession."""
    downstream_chain = list(next_node_generator(network, node))
    return downstream_chain[-1] if downstream_chain else node
 | |
| 
 | |
| 
 | |
def set_amplifier_voa(amp, power_target, power_mode):
    """Set the output VOA of ``amp`` when it is not already set.

    Nothing is changed if ``amp.out_voa`` already has a value. Otherwise:
    - in power mode with ``amp.params.out_voa_auto`` enabled, the VOA is derived from the
      smaller of the power headroom and the gain headroom, rounded to a 0.5 step and reduced
      by a margin; ``amp.delta_p`` and ``amp.effective_gain`` are raised by the same amount;
    - in any other case the VOA is set to 0.
    """
    VOA_MARGIN = 1  # do not maximize the VOA optimization
    if amp.out_voa is not None:
        return

    if not (power_mode and amp.params.out_voa_auto):
        # no output voa optimization in gain mode
        amp.out_voa = 0
        return

    headroom = min(amp.params.p_max - power_target,
                   amp.params.gain_flatmax - amp.effective_gain)
    voa = max(round2float(headroom, 0.5) - VOA_MARGIN, 0)
    amp.delta_p = amp.delta_p + voa
    amp.effective_gain = amp.effective_gain + voa
    amp.out_voa = voa
 | |
| 
 | |
| 
 | |
def get_oms_edge_list(oms_ingress_node: Union[elements.Roadm, elements.Transceiver], network: DiGraph) \
        -> List[Tuple]:
    """Collect the OMS edges as (node, next node) pairs, walking downstream from the given node.

    The walk stops as soon as the current node is a ROADM or a Transceiver; if the node
    passed in is itself a ROADM or a Transceiver the returned list is therefore empty.

    Raises:
        NetworkTopologyError: if the walk comes back to an already visited uid.
    """
    oms_edges = []
    seen_uids = []
    current = oms_ingress_node
    # collect the OMS element list (ROADM to ROADM, or Transceiver to ROADM)
    while not isinstance(current, (elements.Roadm, elements.Transceiver)):
        following = get_next_node(current, network)
        seen_uids.append(current.uid)
        if following.uid in seen_uids:
            raise NetworkTopologyError(f'Loop detected for {type(current).__name__} {current.uid}, '
                                       + 'please check network topology')
        oms_edges.append((current, following))
        current = following
    return oms_edges
 | |
| 
 | |
| 
 | |
def check_oms_single_type(oms_edges: List[Tuple]) -> List[str]:
    """Verifies that the OMS only contains all single band amplifiers or all multi band amplifiers
    No mixed OMS are permitted for the time being.

    Returns the list of amplifier types found in the OMS (empty, ['Edfa'] or ['Multiband_amplifier']).

    Raises:
        NetworkTopologyError: if both types are present; the message lists the uids of each type.
    """
    # uid -> amplifier type, for the first element of each edge that is an amplifier
    oms_types = {}
    for node, _ in oms_edges:
        if isinstance(node, elements.Edfa):
            oms_types[node.uid] = 'Edfa'
        elif isinstance(node, elements.Multiband_amplifier):
            oms_types[node.uid] = 'Multiband_amplifier'

    types = set(oms_types.values())
    if len(types) <= 1:
        return list(types)

    # the elements in the OMS are not consistent (multi-band mixed with single band)
    multiband_uids = [uid for uid, amp_type in oms_types.items() if amp_type == "Multiband_amplifier"]
    single_band_uids = [uid for uid, amp_type in oms_types.items() if amp_type == "Edfa"]
    msg = 'type_variety Multiband ("Multiband_amplifier") and single band ("Edfa") cannot be mixed;\n' \
        + f'Multiband amps: {multiband_uids}\n' \
        + f'single band amps: {single_band_uids}'
    raise NetworkTopologyError(msg)
 | |
| 
 | |
| 
 | |
def set_one_amplifier(node: elements.Edfa, prev_node, next_node, power_mode: bool, prev_voa: float, prev_dp: float,
                      pref_ch_db: float, pref_total_db: float, network: DiGraph, equipment: dict, verbose: bool) \
        -> Tuple[float, float]:
    """Set the EDFA amplifier configuration based on power targets:

    This function computes the gain target and delta power of the amplifier from the loss of
    the preceding span and from the previous amplifier settings, then either selects an
    amplifier type (when ``node.params.type_variety`` is empty) or checks the user imposed
    type against its maximum power and gain. It finally sets ``delta_p``, ``effective_gain``,
    ``out_voa``, ``_delta_p`` and ``target_pch_out_dbm`` on the node.

    Args:
        node (elements.Edfa): The EDFA amplifier node to configure.
        prev_node (elements.Node): The previous node in the network.
        next_node (elements.Node): The next node in the network.
        power_mode (bool): Indicates if the amplifier is in power mode.
        prev_voa (float): The previous amplifier variable optical attenuator value.
        prev_dp (float): The previous amplifier delta power.
        pref_ch_db (float): reference per channel power in dB.
        pref_total_db (float): reference total power in dB.
        network (DiGraph): The network graph.
        equipment (dict): Equipment library.
        verbose (bool): Flag for verbose logging.

    Returns:
        tuple[float, float]: the delta power computed for this amplifier (including any power
        reduction, but not the VOA optimization applied by set_amplifier_voa) and the VOA value
        read from the node at entry (0 if it was not set).
    """
    node_loss = span_loss(network, prev_node, equipment)
    voa = node.out_voa if node.out_voa else 0
    if node.operational.delta_p is None:
        dp = target_power(network, next_node, equipment) + voa
    else:
        dp = node.operational.delta_p
    if node.effective_gain is None or power_mode:
        gain_target = node_loss + dp - prev_dp + prev_voa
    else:  # gain mode with effective_gain
        gain_target = node.effective_gain
        dp = prev_dp - node_loss - prev_voa + gain_target

    power_target = pref_total_db + dp

    if isinstance(prev_node, elements.Fiber):
        max_fiber_lineic_loss_for_raman = \
            equipment['Span']['default'].max_fiber_lineic_loss_for_raman * 1e-3  # dB/m
        raman_allowed = (prev_node.params.loss_coef < max_fiber_lineic_loss_for_raman).all()
    else:
        raman_allowed = False

    if node.params.type_variety == '':
        # no type imposed by the user: pick one, possibly within a restricted list
        if node.variety_list and isinstance(node.variety_list, list):
            restrictions = node.variety_list
        elif isinstance(prev_node, elements.Roadm) and prev_node.restrictions['booster_variety_list']:
            # implementation of restrictions on roadm boosters
            restrictions = prev_node.restrictions['booster_variety_list']
        elif isinstance(next_node, elements.Roadm) and next_node.restrictions['preamp_variety_list']:
            # implementation of restrictions on roadm preamp
            restrictions = next_node.restrictions['preamp_variety_list']
        else:
            restrictions = None
        edfa_eqpt = {n: a for n, a in equipment['Edfa'].items() if a.type_def != 'multi_band'}
        edfa_variety, power_reduction = \
            select_edfa(raman_allowed, gain_target, power_target, edfa_eqpt,
                        node.uid,
                        target_extended_gain=equipment['Span']['default'].target_extended_gain,
                        restrictions=restrictions, verbose=verbose)
        extra_params = equipment['Edfa'][edfa_variety]
        node.params.update_params(extra_params.__dict__)
        dp += power_reduction
        gain_target += power_reduction
    else:
        # Check power saturation also in this case
        p_max = equipment['Edfa'][node.params.type_variety].p_max
        if power_mode:
            power_reduction = min(0, p_max - (pref_total_db + dp))
        else:
            pout = pref_total_db + prev_dp - node_loss - prev_voa + gain_target
            power_reduction = min(0, p_max - pout)
        dp += power_reduction
        gain_target += power_reduction
        if node.params.raman and not raman_allowed:
            if isinstance(prev_node, elements.Fiber):
                # the fiber case must never fall through to the "not a fiber" message,
                # whatever the verbosity
                if verbose:
                    logger.warning(f'\n\tWARNING: raman is used in node {node.uid}\n '
                                   + '\tbut fiber lineic loss is above threshold\n')
            else:
                # reported regardless of verbose
                logger.warning(f'\n\tWARNING: raman is used in node {node.uid}\n '
                               + '\tbut previous node is not a fiber\n')
        # if variety is imposed by user, and if the gain_target (computed or imposed) is also above
        # variety max gain + extended range, then warn that gain > max_gain + extended range
        if gain_target - equipment['Edfa'][node.params.type_variety].gain_flatmax - \
                equipment['Span']['default'].target_extended_gain > 1e-2 and verbose:
            # 1e-2 to allow a small margin according to round2float min step
            logger.warning(f'\n\tWARNING: effective gain in Node {node.uid}\n'
                           + f'\tis above user specified amplifier {node.params.type_variety}\n'
                           + '\tmax flat gain: '
                           + f'{equipment["Edfa"][node.params.type_variety].gain_flatmax}dB ; '
                           + f'required gain: {round(gain_target, 2)}dB. Please check amplifier type.\n')

    node.delta_p = dp if power_mode else None
    node.effective_gain = gain_target
    # if voa is not set, then set it and possibly optimize it with gain and update delta_p and
    # effective_gain values
    set_amplifier_voa(node, power_target, power_mode)
    # set_amplifier_voa may change delta_p in power_mode
    node._delta_p = node.delta_p if power_mode else dp

    # target_pch_out_dbm records target power for design: If user defines one, then this is displayed,
    # else display the one computed during design
    if node.delta_p is None:
        node.target_pch_out_dbm = None
    elif node.operational.delta_p is not None:
        # use the user defined target
        node.target_pch_out_dbm = round(node.operational.delta_p + pref_ch_db, 2)
    else:
        # use the design target if no target were set
        node.target_pch_out_dbm = round(node.delta_p + pref_ch_db, 2)
    return dp, voa
 | |
| 
 | |
| 
 | |
def set_egress_amplifier(network: DiGraph, this_node: Union[elements.Roadm, elements.Transceiver], equipment: dict,
                         pref_ch_db: float, pref_total_db: float, verbose: bool):
    """Configure the amplifiers of every OMS leaving ``this_node`` (a transceiver or a ROADM).

    Each OMS is walked from ``this_node`` down to the next Roadm or Transceiver, and
    the amplifiers met on the way (Edfa and multiband) are set with set_one_amplifier.
    For Raman fibers, span_loss is called with the current power so that the expected
    gain gets evaluated.
    power_mode = True, set amplifiers delta_p and effective_gain
    power_mode = False, set amplifiers effective_gain and ignore delta_p config: set it to None.
    The delta power and VOA of each amplifier are carried over to the next one.

    Args:
        network (DiGraph): The network graph containing nodes and links.
        this_node (Union[elements.Roadm, elements.Transceiver]): The starting node for OMS link configuration.
        equipment (dict): Equipment specifications.
        pref_ch_db (float): Reference channel power in dB.
        pref_total_db (float): Reference total power in dB.
        verbose (bool): Flag for verbose logging.

    Raises:
        NetworkTopologyError: If a loop is detected in the network topology.
    """
    power_mode = equipment['Span']['default'].power_mode
    oms_end_types = (elements.Roadm, elements.Transceiver)

    for oms in network.successors(this_node):
        # go through all the OMS departing from the ROADM; direct links to a transceiver are not OMS
        if isinstance(oms, elements.Transceiver):
            continue
        prev_node, node = this_node, oms

        if isinstance(this_node, elements.Transceiver):
            # todo change pref to a ref channel
            si = equipment['SI']['default']
            this_node_out_power = si.tx_power_dbm if si.tx_power_dbm is not None else pref_ch_db
        if isinstance(this_node, elements.Roadm):
            # get target power out from ROADM for the reference carrier based on equalization settings
            this_node_out_power = this_node.get_per_degree_ref_power(degree=node.uid)

        # use the target power on this degree
        prev_dp = this_node_out_power - pref_ch_db
        dp = prev_dp
        prev_voa = 0
        voa = 0
        visited_nodes = []
        while not isinstance(node, oms_end_types):
            # go through all nodes in the OMS (loop until next Roadm instance)
            next_node = get_next_node(node, network)
            visited_nodes.append(node)
            if next_node in visited_nodes:
                raise NetworkTopologyError(f'Loop detected for {type(node).__name__} {node.uid}, '
                                           + 'please check network topology')

            if isinstance(node, elements.Edfa):
                dp, voa = set_one_amplifier(node, prev_node, next_node, power_mode, prev_voa, prev_dp,
                                            pref_ch_db, pref_total_db, network, equipment, verbose)
            elif isinstance(node, elements.RamanFiber):
                # this is to record the expected gain in Raman fiber in its .estimated_gain attribute.
                span_loss(network, node, equipment, input_power=pref_ch_db + dp)

            if isinstance(node, elements.Multiband_amplifier):
                # each band amplifier is set from the same previous values; the last one gives dp and voa
                for amp in node.amplifiers.values():
                    dp, voa = set_one_amplifier(amp, prev_node, next_node, power_mode, prev_voa, prev_dp,
                                                pref_ch_db, pref_total_db, network, equipment, verbose)

            # move one element down the OMS
            prev_dp, prev_voa = dp, voa
            prev_node, node = node, next_node
 | |
| 
 | |
| 
 | |
def set_roadm_ref_carrier(roadm, equipment):
    """Record on the roadm the reference carrier used for design and useful for equalization:
    its baud rate and slot width are taken from the default SI (baud_rate and spacing).
    """
    default_si = equipment['SI']['default']
    roadm.ref_carrier = ReferenceCarrier(baud_rate=default_si.baud_rate, slot_width=default_si.spacing)
 | |
| 
 | |
| 
 | |
def set_roadm_per_degree_targets(roadm, network):
    """Set target powers/PSD on all degrees
    This is needed to populate per_degree_pch_out_dbm or per_degree_pch_psd or per_degree_pch_psw dicts when
    they are not initialized by users.

    For each successor of the roadm that is not a Transceiver and that has no entry in any
    of the three per degree dicts, the global target of the roadm is copied, looking in
    turn at target_pch_out_db, target_psd_out_mWperGHz and target_out_mWperSlotWidth.

    Raises:
        ConfigurationError: if a degree has no target and the roadm has no global target.
    """
    for node in network.successors(roadm):
        # go through all the OMS departing from the ROADM
        if isinstance(node, elements.Transceiver):
            continue
        degree = node.uid
        if degree in roadm.per_degree_pch_out_dbm or degree in roadm.per_degree_pch_psd \
                or degree in roadm.per_degree_pch_psw:
            # a target is already defined on this degree
            continue
        # no per degree target power is given: use the global one
        if roadm.params.target_pch_out_db is not None:
            # a 0 dB target is a legitimate value, so test against None instead of truthiness
            # NOTE(review): assumes an unset target is represented by None - confirm in RoadmParams
            roadm.per_degree_pch_out_dbm[degree] = roadm.params.target_pch_out_db
        elif roadm.params.target_psd_out_mWperGHz:
            roadm.per_degree_pch_psd[degree] = roadm.params.target_psd_out_mWperGHz
        elif roadm.params.target_out_mWperSlotWidth:
            roadm.per_degree_pch_psw[degree] = roadm.params.target_out_mWperSlotWidth
        else:
            raise ConfigurationError(roadm.uid, 'needs an equalization target')
 | |
| 
 | |
| 
 | |
def set_per_degree_design_band(node: Union[elements.Roadm, elements.Transceiver], network: DiGraph, equipment: dict):
    """Compute the design bands of every egress degree of a ROADM or Transceiver.

    node.params.design_bands / node.params.per_degree_design_bands hold what the user defined, while
    node.design_bands / node.per_degree_design_bands hold the values used by autodesign.
    When node.design_bands is empty, it defaults to one band per entry of equipment['SI'].

    For each successor (degree) of the node, the amplifiers found on the OMS are inspected:

    - the bands already defined on these amplifiers are collected,
    - node.design_bands is added to them when its single band / multiband nature matches the OMS one,
    - the common range of all these bands is used for degrees that have no entry yet in
      node.per_degree_design_bands; if there is no common range the default node.design_bands is used,
      provided it is consistent with the amplifier types of the OMS.

    All band lists are finally ordered by increasing f_min.

    Parameters:
        node: the ROADM or Transceiver whose degrees are configured.
        network: the graph containing the node and its connections.
        equipment: equipment library; only its 'SI' entry is read here.

    Raises:
        NetworkTopologyError: if the default design bands are not consistent with the OMS amplifier types
            (single band versus multiband), or if a key of node.params.per_degree_design_bands is not the
            uid of a successor of the node.
    """
    def lowest_frequency(band):
        return band['f_min']

    degrees = network.successors(node)
    if len(node.design_bands) == 0:
        node.design_bands = [{'f_min': si.f_min, 'f_max': si.f_max} for si in equipment['SI'].values()]
    default_is_single_band = len(node.design_bands) == 1

    for degree in degrees:
        # collect the amplifier types and the amplifier bands of the OMS starting on this degree
        oms_edges = get_oms_edge_list(degree, network)
        amps_type = check_oms_single_type(oms_edges)
        # True: single band OMS, False: multiband OMS, None: undefined (autodesign will decide)
        if len(amps_type) == 1:
            oms_is_single_band = "Edfa" in amps_type
        else:
            oms_is_single_band = None
        consistent_with_default = oms_is_single_band == default_is_single_band
        amp_bands = [element.params.bands for element, _ in oms_edges
                     if isinstance(element, (elements.Edfa, elements.Multiband_amplifier))
                     and element.params.bands]
        if consistent_with_default:
            # the node default bands constrain the design only if they match the amps type
            amp_bands.append(node.design_bands)
        common_range = find_common_range(amp_bands, None, None)

        # node.per_degree_design_bands already contains the user defined degrees (loaded from json):
        # only complete it for the degrees that were not defined
        if degree.uid not in node.per_degree_design_bands:
            if common_range:
                selected_bands = common_range
            elif oms_is_single_band is None or consistent_with_default:
                # no band defined on the amplifiers: default bands are usable if consistent with amps type
                selected_bands = node.design_bands
            else:
                # unsupported case: single band OMS with default multiband design band
                raise NetworkTopologyError(f"in {node.uid} degree {degree.uid}: inconsistent design multiband/"
                                           + " single band definition on a single band/ multiband OMS")
            node.per_degree_design_bands[degree.uid] = selected_bands

        if degree.uid in node.params.per_degree_design_bands:
            # user defined degrees are ordered too
            node.params.per_degree_design_bands[degree.uid] = \
                sorted(node.params.per_degree_design_bands[degree.uid], key=lowest_frequency)
        # at this point every degree exists in node.per_degree_design_bands
        node.per_degree_design_bands[degree.uid] = \
            sorted(node.per_degree_design_bands[degree.uid], key=lowest_frequency)

    # every user defined degree must be an actual successor of the node
    if node.params.per_degree_design_bands:
        next_oms_uid = [n.uid for n in network.successors(node)]
        for degree in node.params.per_degree_design_bands.keys():
            if degree not in next_oms_uid:
                raise NetworkTopologyError(f"in {node.uid} degree {degree} does not match any degree"
                                           + f"{list(node.per_degree_design_bands.keys())}")
 | |
| 
 | |
| 
 | |
def set_roadm_input_powers(network, roadm, equipment, pref_ch_db):
    """Set reference powers at ROADM input for a reference channel and based on the adjacent OMS.

    This supposes that there is no dependency on path. For example, the succession:

        node                             power out of element
        roadm A (target power -10dBm)   -10dBm
        fiber A (16 dB loss)            -26dBm
        roadm B (target power -12dBm)   -26dBm
        fiber B (10 dB loss)            -36dBm
        roadm C (target power -14dBm)   -36dBm

    is not consistent because target powers in roadm B and roadm C can not be met.
    Input power for the reference channel will be set -26 dBm in roadm B and -22dBm in roadm C,
    because at design time we can not know about path.

    A warning is logged if target powers can not be met with the design: the user should be aware that
    the design was not successful and that power reduction was applied.
    Note that this value is only used for visualisation purpose (to compute ROADM loss in elements).

    Parameters:
        network: graph containing the ROADM.
        roadm: the ROADM whose ``ref_pch_in_dbm`` dict (keyed by the uid of each direct predecessor)
            is rebuilt.
        equipment: not used by this function; kept for interface compatibility.
        pref_ch_db: reference channel power in dBm, used as the base for amplifier and transceiver outputs.

    Raises:
        ConfigurationError: if no target power/PSD/PSW is found in the ROADM (neither per degree nor global).
    """
    previous_elements = list(network.predecessors(roadm))
    roadm.ref_pch_in_dbm = {}
    for element in previous_elements:
        node = element
        loss = 0.0
        # previous_node tracks the element that follows `node` on the way to this ROADM: its uid names the
        # egress degree of an upstream ROADM. It must be reset for each ingress direction, otherwise a ROADM
        # that is a direct predecessor would reuse the value of a previous direction (or hit an unbound name).
        previous_node = roadm
        while isinstance(node, (elements.Fiber, elements.Fused, elements.RamanFiber)):
            # go through all predecessors until a power target is found either in an amplifier, a ROADM or a
            # transceiver, then deduce power at ROADM input from this degree based on this target and
            # crossed losses
            loss += node.loss
            previous_node = node
            node = next(network.predecessors(node))
        if isinstance(node, elements.Edfa):
            roadm.ref_pch_in_dbm[element.uid] = pref_ch_db + node._delta_p - node.out_voa - loss
        elif isinstance(node, elements.Roadm):
            roadm.ref_pch_in_dbm[element.uid] = \
                node.get_per_degree_ref_power(degree=previous_node.uid) - loss
        elif isinstance(node, elements.Transceiver):
            roadm.ref_pch_in_dbm[element.uid] = pref_ch_db - loss
        elif isinstance(node, elements.Multiband_amplifier):
            # use the worst (min) value among amps
            roadm.ref_pch_in_dbm[element.uid] = min(pref_ch_db + amp._delta_p - amp.out_voa - loss
                                                    for amp in node.amplifiers.values())

    # check if target power can be met: gather every target (per degree and global) expressed in dBm
    ref_carrier = roadm.ref_carrier
    temp = []
    if roadm.per_degree_pch_out_dbm:
        temp.append(max(roadm.per_degree_pch_out_dbm.values()))
    if roadm.per_degree_pch_psd:
        temp.append(max(psd2powerdbm(p, ref_carrier.baud_rate) for p in roadm.per_degree_pch_psd.values()))
    if roadm.per_degree_pch_psw:
        temp.append(max(psd2powerdbm(p, ref_carrier.slot_width) for p in roadm.per_degree_pch_psw.values()))
    if roadm.params.target_pch_out_db:
        temp.append(roadm.params.target_pch_out_db)
    if roadm.params.target_psd_out_mWperGHz:
        temp.append(psd2powerdbm(roadm.params.target_psd_out_mWperGHz, ref_carrier.baud_rate))
    if roadm.params.target_out_mWperSlotWidth:
        temp.append(psd2powerdbm(roadm.params.target_out_mWperSlotWidth, ref_carrier.slot_width))
    if not temp:
        raise ConfigurationError(f'Could not find target power/PSD/PSW in ROADM "{roadm.uid}"')
    target_to_be_supported = max(temp)
    for from_degree, in_power in roadm.ref_pch_in_dbm.items():
        if in_power < target_to_be_supported:
            logger.warning(
                f'WARNING: maximum target power {target_to_be_supported}dBm '
                + f'in ROADM "{roadm.uid}" can not be met for at least one crossing path. Min input power '
                + f'from "{from_degree}" direction is {round(in_power, 2)}dBm. Please correct input topology.'
            )
 | |
| 
 | |
| 
 | |
def set_fiber_input_power(network, fiber, equipment, pref_ch_db):
    """Set reference powers at fiber input for a reference channel.

    Supposes that target power out of ROADMs and amplifiers are consistent.
    This is only for visualisation purpose.

    The function walks upstream from the fiber, accumulating the loss of the Fused elements, until it
    finds an element that defines a power: another fiber (whose own input power is computed recursively
    if it is not known yet), a ROADM, an amplifier or a transceiver. The result is stored in
    ``fiber.ref_pch_in_dbm``. If the upstream element is of another kind, the attribute is left untouched.

    Parameters:
        network: graph containing the fiber.
        fiber: the Fiber or RamanFiber to update.
        equipment: only forwarded to the recursive call.
        pref_ch_db: reference channel power in dBm, used as the base for amplifier and transceiver outputs.
    """
    loss = 0.0
    # previous_node is the element that follows `node` on the way to the fiber: its uid names the egress
    # degree of an upstream ROADM. Start with the fiber itself so that a fiber directly connected to a
    # ROADM (no Fused in between) is handled instead of failing on an unbound name.
    previous_node = fiber
    node = next(network.predecessors(fiber))
    while isinstance(node, elements.Fused):
        loss += node.loss
        previous_node = node
        node = next(network.predecessors(node))
    if isinstance(node, (elements.Fiber, elements.RamanFiber)):
        if node.ref_pch_in_dbm is None:
            # upstream fiber not processed yet: compute its input power first
            set_fiber_input_power(network, node, equipment, pref_ch_db)
        fiber.ref_pch_in_dbm = node.ref_pch_in_dbm - loss - node.loss
    elif isinstance(node, elements.Roadm):
        fiber.ref_pch_in_dbm = \
            node.get_per_degree_ref_power(degree=previous_node.uid) - loss
    elif isinstance(node, elements.Edfa):
        fiber.ref_pch_in_dbm = pref_ch_db + node._delta_p - node.out_voa - loss
    elif isinstance(node, elements.Transceiver):
        fiber.ref_pch_in_dbm = pref_ch_db - loss
    elif isinstance(node, elements.Multiband_amplifier):
        # use the worst (min) value among amps
        fiber.ref_pch_in_dbm = min(pref_ch_db + amp._delta_p - amp.out_voa - loss
                                   for amp in node.amplifiers.values())
 | |
| 
 | |
| 
 | |
def set_roadm_internal_paths(roadm, network):
    """Set ROADM path types (express, add, drop)

    Uses implicit guess if no information is set in ROADM:

    - ingress degree to egress degree is an 'express' path,
    - ingress degree to a Transceiver successor is a 'drop' path,
    - Transceiver predecessor to egress degree is an 'add' path.

    The user defined impairment id of each path is used if it exists.

    Raises:
        NetworkTopologyError: if a user defined path type on an add or drop path is not consistent with
            the inferred one, or if an entry of roadm.per_degree_impairments refers to a from/to degree
            that is not connected to the ROADM.
    """
    next_oms = [n.uid for n in network.successors(roadm) if not isinstance(n, elements.Transceiver)]
    previous_oms = [n.uid for n in network.predecessors(roadm) if not isinstance(n, elements.Transceiver)]
    drop_port = [n.uid for n in network.successors(roadm) if isinstance(n, elements.Transceiver)]
    add_port = [n.uid for n in network.predecessors(roadm) if isinstance(n, elements.Transceiver)]

    default_express = 'express'
    default_add = 'add'
    default_drop = 'drop'
    # take user defined element impairment id if it exists
    for from_degree in previous_oms:
        for to_degree in next_oms:
            impairment_id = roadm.get_per_degree_impairment_id(from_degree, to_degree)
            roadm.set_roadm_paths(from_degree=from_degree, to_degree=to_degree, path_type=default_express,
                                  impairment_id=impairment_id)
        for drop in drop_port:
            impairment_id = roadm.get_per_degree_impairment_id(from_degree, drop)
            path_type = roadm.get_path_type_per_id(impairment_id)
            # a degree connected to a transceiver MUST be add or drop
            # but a degree connected to something else could be an express, add or drop
            # (for example case of external shelves)
            if path_type and path_type != 'drop':
                msg = f'Roadm {roadm.uid} path_type is defined as {path_type} but it should be drop'
                raise NetworkTopologyError(msg)
            roadm.set_roadm_paths(from_degree=from_degree, to_degree=drop, path_type=default_drop,
                                  impairment_id=impairment_id)
    for to_degree in next_oms:
        for add in add_port:
            impairment_id = roadm.get_per_degree_impairment_id(add, to_degree)
            path_type = roadm.get_path_type_per_id(impairment_id)
            if path_type and path_type != 'add':
                msg = f'Roadm {roadm.uid} path_type is defined as {path_type} but it should be add'
                raise NetworkTopologyError(msg)
            roadm.set_roadm_paths(from_degree=add, to_degree=to_degree, path_type=default_add,
                                  impairment_id=impairment_id)

    # sanity check: raise an error if per_degree from or to degrees are not connected to the ROADM.
    # The valid uids are taken directly from the ROADM neighbours (and computed once) so that they do not
    # depend on which of the loops above actually ran: e.g. an add path towards an egress degree stays
    # valid on a ROADM that has no ingress degree.
    valid_from_degrees = previous_oms + add_port
    valid_to_degrees = next_oms + drop_port
    for item in roadm.per_degree_impairments.values():
        if item['from_degree'] not in valid_from_degrees or item['to_degree'] not in valid_to_degrees:
            msg = f'Roadm {roadm.uid} has wrong from-to degree uid {item["from_degree"]} - {item["to_degree"]}'
            raise NetworkTopologyError(msg)
 | |
| 
 | |
| 
 | |
def add_roadm_booster(network, roadm):
    """Insert a booster Edfa between a ROADM and each of its successors that needs one.

    Successors that are Transceiver, Fused, Edfa or Multiband_amplifier are left untouched
    (no amplification for fused spans or TRX, and nothing to add when an amplifier is already there).
    The booster uses the default Edfa parameters, is located at the ROADM, and has no gain target yet.
    """
    not_boosted = (elements.Transceiver, elements.Fused, elements.Edfa, elements.Multiband_amplifier)
    successors = [n for n in network.successors(roadm) if not isinstance(n, not_boosted)]
    for successor in successors:
        # replace the edge roadm -> successor with roadm -> booster -> successor
        network.remove_edge(roadm, successor)
        location = {
            'latitude': roadm.lat,
            'longitude': roadm.lng,
            'city': roadm.loc.city,
            'region': roadm.loc.region,
        }
        operational = {
            'gain_target': None,
            'tilt_target': 0,
        }
        booster = elements.Edfa(uid=f'Edfa_booster_{roadm.uid}_to_{successor.uid}',
                                params=EdfaParams.default_values,
                                metadata={'location': location},
                                operational=operational)
        network.add_node(booster)
        network.add_edge(roadm, booster, weight=0.01)
        network.add_edge(booster, successor, weight=0.01)
 | |
| 
 | |
| 
 | |
def add_roadm_preamp(network, roadm):
    """Insert a preamplifier Edfa between a ROADM and each of its predecessors that needs one.

    Predecessors that are Transceiver, Fused, Edfa or Multiband_amplifier are left untouched
    (no amplification for fused spans or TRX, and nothing to add when an amplifier is already there).
    The preamp uses the default Edfa parameters, is located at the ROADM, and has no gain target yet.
    The edge entering the preamp is weighted with the fiber length when the predecessor is a Fiber.
    """
    not_amplified = (elements.Transceiver, elements.Fused, elements.Edfa, elements.Multiband_amplifier)
    predecessors = [n for n in network.predecessors(roadm) if not isinstance(n, not_amplified)]
    for predecessor in predecessors:
        # replace the edge predecessor -> roadm with predecessor -> preamp -> roadm
        network.remove_edge(predecessor, roadm)
        location = {
            'latitude': roadm.lat,
            'longitude': roadm.lng,
            'city': roadm.loc.city,
            'region': roadm.loc.region,
        }
        operational = {
            'gain_target': None,
            'tilt_target': 0,
        }
        preamp = elements.Edfa(uid=f'Edfa_preamp_{roadm.uid}_from_{predecessor.uid}',
                               params=EdfaParams.default_values,
                               metadata={'location': location},
                               operational=operational)
        network.add_node(preamp)
        edgeweight = predecessor.params.length if isinstance(predecessor, elements.Fiber) else 0.01
        network.add_edge(predecessor, preamp, weight=edgeweight)
        network.add_edge(preamp, roadm, weight=0.01)
 | |
| 
 | |
| 
 | |
def add_inline_amplifier(network, fiber):
    """Insert an inline Edfa after a fiber when the next element is a Fiber or a RamanFiber.

    Nothing is done for any other kind of successor. The amplifier uses the default Edfa parameters,
    is placed halfway between the two fibers (with the city and region of the first one) and has no
    gain target yet.

    Raises:
        NetworkTopologyError: raised by get_next_node if the fiber has no successor.
    """
    successor = get_next_node(fiber, network)
    if not isinstance(successor, (elements.Fiber, elements.RamanFiber)):
        # only fiber to fiber junctions get an inline amplifier
        return

    network.remove_edge(fiber, successor)
    location = {
        'latitude': (fiber.lat + successor.lat) / 2,
        'longitude': (fiber.lng + successor.lng) / 2,
        'city': fiber.loc.city,
        'region': fiber.loc.region,
    }
    operational = {
        'gain_target': None,
        'tilt_target': 0,
    }
    inline_amp = elements.Edfa(uid=f'Edfa_{fiber.uid}',
                               params=EdfaParams.default_values,
                               metadata={'location': location},
                               operational=operational)
    network.add_node(inline_amp)
    network.add_edge(fiber, inline_amp, weight=fiber.params.length)
    network.add_edge(inline_amp, successor, weight=0.01)
 | |
| 
 | |
| 
 | |
def calculate_new_length(fiber_length, bounds, target_length):
    """If fiber is over boundary, then assume this is a link "intent" and computes the set of
    identical fiber spans this link should be composed of.

    Two candidates are compared: ``fiber_length // target_length`` spans (longer spans) and one more
    span (shorter spans). If only one candidate gives a span length within
    ``[bounds.start, bounds.stop]``, it is selected. Otherwise the longer spans are preferred when they
    are at least as close to ``target_length`` as the shorter ones and do not exceed ``bounds.stop``.

    Parameters:
        fiber_length: total length of the link.
        bounds: object with ``start`` and ``stop`` attributes (e.g. a ``range``) giving the accepted
            span lengths.
        target_length: preferred span length.

    Returns:
        A tuple (span length, number of spans). The fiber is returned unchanged as a single span when
        it is shorter than ``bounds.stop``, or when it is shorter than ``target_length`` (it can not be
        cut in less than one span).
    """
    if fiber_length < bounds.stop:
        return fiber_length, 1

    n_spans2 = int(fiber_length // target_length)
    if n_spans2 == 0:
        # target_length is larger than the fiber (possible when the minimum span length exceeds the
        # maximum one): the only candidate is a single span, and dividing by n_spans2 would fail
        return fiber_length, 1
    n_spans1 = n_spans2 + 1

    length1 = fiber_length / n_spans1
    length2 = fiber_length / n_spans2

    def within_bounds(length):
        return bounds.start <= length <= bounds.stop

    shorter_fits = within_bounds(length1)
    longer_fits = within_bounds(length2)
    if shorter_fits and not longer_fits:
        return (length1, n_spans1)
    if longer_fits and not shorter_fits:
        return (length2, n_spans2)
    # both or none fit: take the candidate closest to the target, longer spans winning ties
    if length2 - target_length <= target_length - length1 and length2 <= bounds.stop:
        return (length2, n_spans2)
    return (length1, n_spans1)
 | |
| 
 | |
| 
 | |
def get_next_node(node, network):
    """Return the first successor of node in network.

    Raises:
        NetworkTopologyError: if the node has no successor.
    """
    successors = network.successors(node)
    try:
        return next(successors)
    except StopIteration:
        message = f'{type(node).__name__} {node.uid} is not properly connected, please check network topology'
        raise NetworkTopologyError(message)
 | |
| 
 | |
| 
 | |
def split_fiber(network, fiber, bounds, target_length):
    """If fiber length exceeds boundary then assume this is a link "intent", and replace this one-span link
    with an n_spans link, with identical fiber types.

    The original fiber is removed from the network and replaced by a chain of new Fiber elements of equal
    length, evenly placed between the previous and next nodes. Nothing is done when calculate_new_length
    returns a single span.

    Raises:
        NetworkTopologyError: if the fiber has no successor or no predecessor.
    """
    def weight_from(element):
        # edges leaving a fiber are weighted with its length, any other edge with a small constant
        return element.params.length if isinstance(element, elements.Fiber) else 0.01

    span_length, span_count = calculate_new_length(fiber.params.length, bounds, target_length)
    if span_count == 1:
        return

    try:
        next_node = next(network.successors(fiber))
        prev_node = next(network.predecessors(fiber))
    except StopIteration:
        raise NetworkTopologyError(f'Fiber {fiber.uid} is not properly connected, please check network topology')

    network.remove_node(fiber)

    # the new spans copy the parameters of the original fiber, with the reduced length
    fiber.params.length = span_length

    # each span is located at the middle of its share of the segment between the two neighbours
    positions = [(prev_node.lng + (next_node.lng - prev_node.lng) * (n + 0.5) / span_count,
                  prev_node.lat + (next_node.lat - prev_node.lat) * (n + 0.5) / span_count)
                 for n in range(span_count)]
    for index, (lng, lat) in enumerate(positions, start=1):
        location = {
            'latitude': lat,
            'longitude': lng,
            'city': fiber.loc.city,
            'region': fiber.loc.region,
        }
        new_span = elements.Fiber(uid=f'{fiber.uid}_({index}/{span_count})',
                                  type_variety=fiber.type_variety,
                                  metadata={'location': location},
                                  params=fiber.params.asdict())
        network.add_edge(prev_node, new_span, weight=weight_from(prev_node))
        prev_node = new_span
    # connect the last span to the original successor
    network.add_edge(prev_node, next_node, weight=weight_from(prev_node))
 | |
| 
 | |
| 
 | |
def add_connector_loss(network, fibers, default_con_in, default_con_out, EOL):
    """Add default connector loss if no loss are defined. EOL repair margin is added as a connector loss

    For each fiber, con_in and con_out are set to the defaults when they are None. The EOL margin is
    added to con_out, except when the next element is a Fused.

    Raises:
        NetworkTopologyError: raised by get_next_node if a fiber has no successor.
    """
    for fiber in fibers:
        successor = get_next_node(fiber, network)
        params = fiber.params
        if params.con_in is None:
            params.con_in = default_con_in
        if params.con_out is None:
            params.con_out = default_con_out
        if isinstance(successor, elements.Fused):
            # no EOL margin on a fiber followed by a Fused element
            continue
        params.con_out += EOL
 | |
| 
 | |
| 
 | |
def add_fiber_padding(network, fibers, padding, equipment):
    """Add a padding att_in at the input of the 1st fiber of a succession of fibers and fused

    Fibers followed by a Fused element and Raman fibers are skipped. For the others, the span loss
    returned by span_loss is recorded in fiber.design_span_loss; when it is lower than padding, the
    att_in of the first fiber of the span is increased by the missing amount.

    Raises:
        NetworkTopologyError: raised by get_next_node if a fiber has no successor.
    """
    for fiber in fibers:
        successor = get_next_node(fiber, network)
        # only the last fiber of a span is considered, and Raman fibers are never padded
        if isinstance(successor, elements.Fused) or isinstance(fiber, elements.RamanFiber):
            continue
        loss = span_loss(network, fiber, equipment)
        fiber.design_span_loss = loss
        if loss >= padding:
            continue
        # add a padding att_in at the input of the 1st fiber:
        # address the case when several fibers are spliced together
        span_start = find_first_node(network, fiber)
        # in order to support no booster, fused might be placed
        # just after a roadm: need to check that the first node is really a fiber
        if not isinstance(span_start, elements.Fiber):
            continue
        span_start.params.att_in = span_start.params.att_in + padding - loss
        fiber.design_span_loss += span_start.params.att_in
 | |
| 
 | |
| 
 | |
def add_missing_elements_in_network(network, equipment):
    """Autodesign network: add missing elements. split fibers if their length is too big
    add ROADM preamp or booster and inline amplifiers between fibers

    The span length bounds come from equipment['Span']['default']: the upper bound is max_length
    converted with length_units, the lower bound is derived from the padding (with a floor of 50_000).
    The target span length is 90_000 clipped to these bounds.
    """
    def nodes_of(kind):
        # snapshot of the matching nodes, because the graph is modified while they are processed
        return [n for n in network.nodes() if isinstance(n, kind)]

    span_defaults = equipment['Span']['default']
    max_length = int(convert_length(span_defaults.max_length, span_defaults.length_units))
    min_length = max(int(span_defaults.padding / 0.2 * 1e3), 50_000)
    bounds = range(min_length, max_length)
    target_length = max(min_length, min(max_length, 90_000))

    for fiber in nodes_of(elements.Fiber):
        split_fiber(network, fiber, bounds, target_length)
    for roadm in nodes_of(elements.Roadm):
        add_roadm_preamp(network, roadm)
        add_roadm_booster(network, roadm)
    # fibers are listed again: splitting replaced some of them with new spans
    for fiber in nodes_of(elements.Fiber):
        add_inline_amplifier(network, fiber)
 | |
| 
 | |
| 
 | |
def add_missing_fiber_attributes(network, equipment):
    """Fill in connector loss with default values. Add the padding loss if required.
    EOL is added as a connector loss

    Default values (con_in, con_out, EOL, padding) are read from equipment['Span']['default'].
    """
    span_defaults = equipment['Span']['default']
    fibers = [node for node in network.nodes() if isinstance(node, elements.Fiber)]
    # connector losses and padding are handled in two separate passes over the fibers:
    # this favours code clarity at the expense of speed
    add_connector_loss(network, fibers,
                       span_defaults.con_in, span_defaults.con_out, span_defaults.EOL)
    add_fiber_padding(network, fibers, span_defaults.padding, equipment)
 | |
| 
 | |
| 
 | |
def build_network(network, equipment, pref_ch_db, pref_total_db, set_connector_losses=True, verbose=True):
    """Set roadm equalization target and amplifier gain and power

    Steps, in order:

    1. optionally fill in the missing fiber attributes (connector losses, padding),
    2. set the reference carrier, per degree targets and per degree design bands of the ROADMs, and the
       per degree design bands of the transceivers,
    3. set the amplifiers on each OMS leaving a ROADM or a transceiver,
    4. set the ROADM input powers and internal paths,
    5. set the input power of each Fiber and RamanFiber.
    """
    def nodes_of(kind):
        return [n for n in network.nodes() if isinstance(n, kind)]

    roadms = nodes_of(elements.Roadm)
    transceivers = nodes_of(elements.Transceiver)

    if set_connector_losses:
        add_missing_fiber_attributes(network, equipment)

    # set roadm equalization targets first
    for roadm in roadms:
        set_roadm_ref_carrier(roadm, equipment)
        set_roadm_per_degree_targets(roadm, network)
        set_per_degree_design_band(roadm, network, equipment)
    for transceiver in transceivers:
        set_per_degree_design_band(transceiver, network, equipment)

    # then set amplifiers gain, delta_p and out_voa on each OMS
    for oms_start in [*roadms, *transceivers]:
        set_egress_amplifier(network, oms_start, equipment, pref_ch_db, pref_total_db, verbose)

    for roadm in roadms:
        set_roadm_input_powers(network, roadm, equipment, pref_ch_db)
        set_roadm_internal_paths(roadm, network)

    for fiber in nodes_of((elements.Fiber, elements.RamanFiber)):
        set_fiber_input_power(network, fiber, equipment, pref_ch_db)
 | |
| 
 | |
| 
 | |
def design_network(reference_channel, network, equipment, set_connector_losses=True, verbose=True):
    """Network is designed according to reference channel. Verbose indicate if the function should
    print all warnings or not

    The reference channel power is converted to dBm, and the reference total power is computed from the
    number of channels, capped to the number of channels that automatic_nch returns for the
    191.0e12 - 196.2e12 range with the reference spacing.
    """
    # reference channel power
    pref_ch_db = watt2dbm(reference_channel.power)
    # reference total power (limited to C band till C+L autodesign is not solved)
    c_band_nb_channel = automatic_nch(191.0e12, 196.2e12, reference_channel.spacing)
    designed_nb_channel = min(reference_channel.nb_channel, c_band_nb_channel)
    pref_total_db = pref_ch_db + lin2db(designed_nb_channel)
    build_network(network, equipment, pref_ch_db, pref_total_db,
                  set_connector_losses=set_connector_losses, verbose=verbose)
 |