mirror of
https://github.com/Telecominfraproject/oopt-gnpy.git
synced 2025-10-30 01:32:21 +00:00
Signed-off-by: Florian FRANK <florian1.frank@orange.com> Change-Id: I2a0c249c7e3278e282c2c45ea8be52073f014de3
1817 lines
86 KiB
Python
1817 lines
86 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
# gnpy.core.elements: Standard network elements which propagate optical spectrum
|
|
# Copyright (C) 2025 Telecom Infra Project and GNPy contributors
|
|
# see AUTHORS.rst for a list of contributors
|
|
|
|
"""
|
|
gnpy.core.elements
|
|
==================
|
|
|
|
Standard network elements which propagate optical spectrum.
|
|
|
|
A network element is a Python callable. It takes a :class:`.info.SpectralInformation`
|
|
object and returns a copy with appropriate fields affected. This structure
|
|
represents spectral information that is "propagated" by this network element.
|
|
Network elements must have only a local "view" of the network and propagate
|
|
:class:`.info.SpectralInformation` using only this information. They should be independent and
|
|
self-contained.
|
|
|
|
Network elements MUST implement two attributes :py:attr:`uid` and :py:attr:`name` representing a
|
|
unique identifier and a printable name, and provide the :py:meth:`__call__` method taking a
|
|
:class:`SpectralInformation` as an input and returning another :class:`SpectralInformation`
|
|
instance as a result.
|
|
"""
|
|
|
|
from copy import deepcopy
|
|
from collections import namedtuple
|
|
from typing import Union, List
|
|
from logging import getLogger
|
|
import warnings
|
|
from numpy import abs, array, errstate, ones, interp, mean, pi, polyfit, polyval, sum, sqrt, log10, exp, asarray, \
|
|
full, squeeze, zeros, outer, ndarray
|
|
from scipy.constants import h, c
|
|
from scipy.interpolate import interp1d
|
|
|
|
from gnpy.core.utils import lin2db, db2lin, arrange_frequencies, snr_sum, per_label_average, pretty_summary_print, \
|
|
watt2dbm, psd2powerdbm, calculate_absolute_min_or_zero, nice_column_str
|
|
from gnpy.core.parameters import RoadmParams, FusedParams, FiberParams, PumpParams, EdfaParams, EdfaOperational, \
|
|
MultiBandParams, RoadmPath, RoadmImpairment, TransceiverParams, find_band_name, FrequencyBand
|
|
from gnpy.core.science_utils import NliSolver, RamanSolver
|
|
from gnpy.core.info import SpectralInformation, muxed_spectral_information, demuxed_spectral_information
|
|
from gnpy.core.exceptions import NetworkTopologyError, SpectrumError, ParametersError
|
|
|
|
|
|
_logger = getLogger(__name__)
|
|
|
|
|
|
class Location(namedtuple('Location', 'latitude longitude city region')):
    """Geographical position of a network element.

    Every field is optional: ``latitude`` and ``longitude`` default to 0, ``city`` and ``region``
    default to ``None``.
    """
    def __new__(cls, latitude: float = 0, longitude: float = 0, city: str = None, region: str = None):
        # gather the fields in declaration order and let the namedtuple base build the tuple
        fields = (latitude, longitude, city, region)
        return super().__new__(cls, *fields)
|
|
|
|
|
|
class _Node:
    """Convenience class for providing common functionality of all network elements

    This class is just an internal implementation detail; do **not** assume that all network elements
    inherit from :class:`_Node`.

    :ivar uid: Unique identifier for the node.
    :vartype uid: str
    :ivar name: Printable name of the node (defaults to ``uid``).
    :vartype name: str
    :ivar params: Parameters associated with the node.
    :vartype params: Any
    :ivar metadata: Metadata; its ``'location'`` entry is always a :class:`Location`.
    :vartype metadata: Dict[str, Any]
    :ivar operational: Operational parameters.
    :vartype operational: Any
    :ivar type_variety: Type variety of the node (only set when a truthy value is given).
    :vartype type_variety: str
    """
    def __init__(self, uid, name=None, params=None, metadata=None, operational=None, type_variety=None):
        """Constructor method

        :param uid: Unique identifier for the node.
        :param name: Printable name; ``uid`` is used when omitted.
        :param params: Parameters object stored as is.
        :param metadata: Optional dictionary; its ``'location'`` entry (a dict of
            :class:`Location` fields, or already a :class:`Location`) is normalised in place.
        :param operational: Operational parameters stored as is.
        :param type_variety: Optional type variety; the attribute is only created when truthy.
        """
        if name is None:
            name = uid
        self.uid, self.name = uid, name
        if metadata is None:
            metadata = {}
        # Normalise the location whatever the metadata content is: a missing entry, an explicitly
        # empty metadata dict or a ``None`` location all give the default Location, so that the
        # ``location`` property can never fail with a KeyError.
        # The dictionary handed in by the caller is updated in place, as before.
        if not isinstance(metadata.get('location'), Location):
            metadata['location'] = Location(**(metadata.pop('location', None) or {}))
        self.params, self.metadata, self.operational = params, metadata, operational
        if type_variety:
            self.type_variety = type_variety

    @property
    def location(self):
        """Returns the location of the node."""
        return self.metadata['location']
    loc = location

    @property
    def longitude(self):
        """Returns the longitude of the node."""
        return self.location.longitude
    lng = longitude

    @property
    def latitude(self):
        """Returns the latitude of the node."""
        return self.location.latitude
    lat = latitude
|
|
|
|
|
|
class Transceiver(_Node):
    """Represents a logical start for propagation in the optical network.

    :ivar osnr_ase_01nm: OSNR in 0.1 nm bandwidth per carrier in the spectrum.
    :vartype osnr_ase_01nm: numpy.ndarray
    :ivar osnr_ase: OSNR ASE value per carrier in the spectrum.
    :vartype osnr_ase: numpy.ndarray
    :ivar osnr_nli: OSNR NLI value per carrier in the spectrum.
    :vartype osnr_nli: numpy.ndarray
    :ivar snr: Generalized Signal-to-noise ratio per carrier in the spectrum.
    :vartype snr: numpy.ndarray
    :ivar snr_01nm: Generalized Signal-to-noise ratio in 0.1 nm bandwidth per carrier in the spectrum.
    :vartype snr_01nm: numpy.ndarray
    :ivar passive: Indicates if the system is passive (default is False).
    :vartype passive: bool
    :ivar baud_rate: Baud rate of each carrier of the emitted spectrum.
    :vartype baud_rate: numpy.ndarray
    :ivar chromatic_dispersion: Chromatic dispersion value per carrier in the spectrum.
    :vartype chromatic_dispersion: numpy.ndarray
    :ivar pmd: PMD value per carrier in the spectrum.
    :vartype pmd: numpy.ndarray
    :ivar pdl: PDL value per carrier in the spectrum.
    :vartype pdl: numpy.ndarray
    :ivar latency: Latency value per carrier in the spectrum.
    :vartype latency: numpy.ndarray
    :ivar penalties: Penalties for various impairments.
    :vartype penalties: Dict[str, float]
    :ivar total_penalty: Total penalty value per carrier in the spectrum.
    :vartype total_penalty: numpy.ndarray
    :ivar propagated_labels: Labels propagated by the transceiver.
    :vartype propagated_labels: numpy.ndarray[str]
    :ivar tx_power: Transmit power.
    :vartype tx_power: numpy.ndarray
    :ivar design_bands: Design bands parameters.
    :vartype design_bands: list
    :ivar per_degree_design_bands: Per degree design bands parameters.
    :vartype per_degree_design_bands: dict
    """
    def __init__(self, *args, params=None, **kwargs):
        """Constructor method

        :param args: Positional arguments forwarded to :class:`_Node` (the first one is the uid).
        :param params: Dictionary of transceiver parameters used to build a ``TransceiverParams``.
        :param kwargs: Keyword arguments forwarded to :class:`_Node`.

        :raises ParametersError: if the parameters are rejected; the message is prefixed with the uid.
        """
        if not params:
            params = {}
        # The uid may be passed by keyword or as the first positional argument: read it in a way
        # that can not raise a KeyError, which would otherwise hide the real warning or error.
        uid = kwargs['uid'] if 'uid' in kwargs else (args[0] if args else None)
        try:
            with warnings.catch_warnings(record=True) as caught_warnings:
                super().__init__(*args, params=TransceiverParams(**params), **kwargs)
                if caught_warnings:
                    # only the first recorded warning is reported
                    msg = f'In Transceiver {uid}: {caught_warnings[0].message}'
                    _logger.warning(msg)
        except ParametersError as e:
            msg = f'Config error in {uid}: {e}'
            _logger.critical(msg)
            raise ParametersError(msg) from e

        self.osnr_ase_01nm = None
        self.osnr_ase = None
        self.osnr_nli = None
        self.snr = None
        self.snr_01nm = None
        # raw values record the original calculation, before update_snr() is applied
        self.raw_osnr_ase = None
        self.raw_osnr_ase_01nm = None
        self.raw_osnr_nli = None
        self.raw_snr = None
        self.raw_snr_01nm = None
        self.passive = False
        self.baud_rate = None
        self.chromatic_dispersion = None
        self.pmd = None
        self.pdl = None
        self.latency = None
        self.penalties = {}
        self.total_penalty = 0
        self.propagated_labels = [""]
        self.tx_power = None
        self.design_bands = self.params.design_bands
        self.per_degree_design_bands = self.params.per_degree_design_bands

    def _calc_cd(self, spectral_info):
        """Updates the Transceiver property with the CD of the received channels. CD in ps/nm.
        """
        self.chromatic_dispersion = spectral_info.chromatic_dispersion * 1e3

    def _calc_pmd(self, spectral_info):
        """Updates the Transceiver property with the PMD of the received channels. PMD in ps.
        """
        self.pmd = spectral_info.pmd * 1e12

    def _calc_pdl(self, spectral_info):
        """Updates the Transceiver property with the PDL of the received channels. PDL in dB.
        """
        self.pdl = spectral_info.pdl

    def _calc_latency(self, spectral_info):
        """Updates the Transceiver property with the latency of the received channels. Latency in ms.
        """
        self.latency = spectral_info.latency * 1e3

    def _calc_penalty(self, impairment_value, boundary_list):
        """Computes the SNR penalty given the impairment value.

        The penalty is linearly interpolated between the points of ``boundary_list`` and is infinite
        outside of the interval covered by ``boundary_list['up_to_boundary']``.

        :param impairment_value: The impairment value.
        :type impairment_value: float
        :param boundary_list: The boundary list for penalties, with ``'up_to_boundary'`` and
            ``'penalty_value'`` entries.
        :type boundary_list: Dict[str, Any]

        :return float: The computed penalty.
        """
        return interp(impairment_value, boundary_list['up_to_boundary'], boundary_list['penalty_value'],
                      left=float('inf'), right=float('inf'))

    def calc_penalties(self, penalties):
        """Updates the Transceiver property with penalties (CD, PMD, etc.) of the received channels in dB.
        Penalties are linearly interpolated between given points and set to 'inf' outside interval.

        :param penalties: Boundary lists indexed by the name of the Transceiver attribute holding the
            impairment value (the name is read with ``getattr``).
        :type penalties: Dict[str, Dict[str, Any]]
        """
        self.penalties = {impairment: self._calc_penalty(getattr(self, impairment), boundary_list)
                          for impairment, boundary_list in penalties.items()}
        self.total_penalty = sum(list(self.penalties.values()), axis=0)

    def _calc_snr(self, spectral_info):
        """Records labels, baud rates and the OSNR/GSNR values computed from the spectral information.

        Both the ``raw_*`` attributes and the public ones are set; the public ones may later be
        replaced by :py:meth:`update_snr`.
        """
        # a null noise power gives an infinite ratio: this is not worth a numpy warning
        with errstate(divide='ignore'):
            self.propagated_labels = spectral_info.label
            self.baud_rate = spectral_info.baud_rate
            ratio_01nm = lin2db(12.5e9 / self.baud_rate)
            # set raw values to record original calculation, before update_snr()
            self.raw_osnr_ase = lin2db(spectral_info.signal / spectral_info.ase)
            self.raw_osnr_ase_01nm = self.raw_osnr_ase - ratio_01nm
            self.raw_osnr_nli = lin2db(spectral_info.signal / spectral_info.nli)
            self.raw_snr = lin2db(spectral_info.signal / (spectral_info.ase + spectral_info.nli))
            self.raw_snr_01nm = self.raw_snr - ratio_01nm

            self.osnr_ase = self.raw_osnr_ase
            self.osnr_ase_01nm = self.raw_osnr_ase_01nm
            self.osnr_nli = self.raw_osnr_nli
            self.snr = self.raw_snr
            self.snr_01nm = self.raw_snr_01nm

    def update_snr(self, *args):
        """
        snr_added in 0.1nm
        compute SNR penalties such as transponder Tx_osnr or Roadm add_drop_osnr
        only applied in request.py / propagate on the last Transceiver node of the path
        all penalties are added in a single call to avoid uncontrolled accumulation

        :param args: SNR contributions in dB; ``None`` entries are ignored.
        """
        # use raw_values so that the added SNR penalties are not cumulated
        snr_added = 0
        for s in args:
            if s is not None:
                snr_added += db2lin(-s)
        # Without any contribution the linear sum stays at 0, whose dB conversion is not finite:
        # silence the resulting divide warning, consistently with _calc_snr.
        with errstate(divide='ignore'):
            snr_added = -lin2db(snr_added)
        self.osnr_ase = snr_sum(self.raw_osnr_ase, self.baud_rate, snr_added)
        self.snr = snr_sum(self.raw_snr, self.baud_rate, snr_added)
        self.osnr_ase_01nm = snr_sum(self.raw_osnr_ase_01nm, 12.5e9, snr_added)
        self.snr_01nm = snr_sum(self.raw_snr_01nm, 12.5e9, snr_added)

    @property
    def to_json(self):
        """Converts the transceiver's state to a JSON-compatible dictionary.

        :return Dict[str, Any]: JSON representation of the transceiver.
        """
        return {'uid': self.uid,
                'type': type(self).__name__,
                'metadata': {
                    'location': self.metadata['location']._asdict()
                }
                }

    def __repr__(self):
        """Returns a string representation of the transceiver.

        :return str: String representation of the transceiver.
        """
        return (f'{type(self).__name__}('
                f'uid={self.uid!r}, '
                f'osnr_ase_01nm={self.osnr_ase_01nm!r}, '
                f'osnr_ase={self.osnr_ase!r}, '
                f'osnr_nli={self.osnr_nli!r}, '
                f'snr={self.snr!r}, '
                f'chromatic_dispersion={self.chromatic_dispersion!r}, '
                f'pmd={self.pmd!r}, '
                f'pdl={self.pdl!r}, '
                f'latency={self.latency!r}, '
                f'penalties={self.penalties!r})')

    def __str__(self):
        """Returns a formatted string representation of the transceiver.

        Only the type and uid are printed as long as no spectral information has been received.

        :return str: Formatted string representation of the transceiver.
        """
        if self.snr is None or self.osnr_ase is None:
            return f'{type(self).__name__} {self.uid}'

        snr = per_label_average(self.snr, self.propagated_labels)
        osnr_ase = per_label_average(self.osnr_ase, self.propagated_labels)
        osnr_ase_01nm = per_label_average(self.osnr_ase_01nm, self.propagated_labels)
        snr_01nm = per_label_average(self.snr_01nm, self.propagated_labels)
        tx_power_dbm = per_label_average(watt2dbm(self.tx_power), self.propagated_labels)
        cd = mean(self.chromatic_dispersion)
        pmd = mean(self.pmd)
        pdl = mean(self.pdl)
        latency = mean(self.latency)

        result = '\n'.join([f'{type(self).__name__} {self.uid}',
                            f' GSNR (0.1nm, dB): {pretty_summary_print(snr_01nm)}',
                            f' GSNR (signal bw, dB): {pretty_summary_print(snr)}',
                            f' OSNR ASE (0.1nm, dB): {pretty_summary_print(osnr_ase_01nm)}',
                            f' OSNR ASE (signal bw, dB): {pretty_summary_print(osnr_ase)}',
                            f' CD (ps/nm): {cd:.2f}',
                            f' PMD (ps): {pmd:.2f}',
                            f' PDL (dB): {pdl:.2f}',
                            f' Latency (ms): {latency:.2f}',
                            f' Actual pch out (dBm): {pretty_summary_print(tx_power_dbm)}'])

        # penalties are only printed for the impairments that have been computed
        cd_penalty = self.penalties.get('chromatic_dispersion')
        if cd_penalty is not None:
            result += f'\n CD penalty (dB): {mean(cd_penalty):.2f}'
        pmd_penalty = self.penalties.get('pmd')
        if pmd_penalty is not None:
            result += f'\n PMD penalty (dB): {mean(pmd_penalty):.2f}'
        pdl_penalty = self.penalties.get('pdl')
        if pdl_penalty is not None:
            result += f'\n PDL penalty (dB): {mean(pdl_penalty):.2f}'

        return result

    def __call__(self, spectral_info):
        """Propagates spectral information through the transceiver:
        i) records the accumulated impairments (CD, PMD, PDL, latency) of each carrier,
        ii) computes the resulting OSNR and GSNR per carrier and records the values into the attributes

        :param spectral_info: The spectral information object.
        :type spectral_info: SpectralInformation
        :return SpectralInformation: The spectral information object, returned as received.
        """
        self.tx_power = spectral_info.tx_power
        self._calc_snr(spectral_info)
        self._calc_cd(spectral_info)
        self._calc_pmd(spectral_info)
        self._calc_pdl(spectral_info)
        self._calc_latency(spectral_info)
        return spectral_info
|
|
|
|
|
|
class Roadm(_Node):
|
|
"""Represents a Reconfigurable Optical Add-Drop Multiplexer (ROADM).
|
|
|
|
:ivar ref_pch_out_dbm: Reference output power in dBm.
|
|
:vartype ref_pch_out_dbm: float
|
|
:ivar loss: Total loss experienced by the ROADM.
|
|
:vartype loss: float
|
|
:ivar loss_pch_db: Loss per channel in dB.
|
|
:vartype loss_pch_db: numpy.ndarray
|
|
:ivar ref_effective_loss: Effective loss for a reference carrier.
|
|
:vartype ref_effective_loss: float
|
|
:ivar passive: True, indicates that the ROADM is passive.
|
|
:vartype passive: bool
|
|
:ivar restrictions: Restrictions on the ROADM.
|
|
:vartype restrictions: dict
|
|
:ivar propagated_labels: Labels propagated by the ROADM.
|
|
:vartype propagated_labels: numpy.ndarray[str]
|
|
:ivar target_pch_out_dbm: Target output power in dBm.
|
|
:vartype target_pch_out_dbm: float
|
|
:ivar target_psd_out_mWperGHz: Target PSD output in mW/GHz.
|
|
:vartype target_psd_out_mWperGHz: float
|
|
:ivar target_out_mWperSlotWidth: Target output power per slot width.
|
|
:vartype target_out_mWperSlotWidth: float
|
|
:ivar per_degree_pch_out_dbm: Per degree target output power.
|
|
:vartype per_degree_pch_out_dbm: Dict[str, float]
|
|
:ivar per_degree_pch_psd: Per degree target PSD output.
|
|
:vartype per_degree_pch_psd: Dict[str, float]
|
|
:ivar per_degree_pch_psw: Per degree target output per slot width.
|
|
:vartype per_degree_pch_psw: Dict[str, float]
|
|
:ivar ref_pch_in_dbm: Reference input power in dBm.
|
|
:vartype ref_pch_in_dbm: Dict[str, float]
|
|
:ivar ref_carrier: Reference carrier.
|
|
:vartype ref_carrier: ReferenceCarrier
|
|
:ivar roadm_paths: Internal paths for the ROADM.
|
|
:vartype roadm_paths: List[RoadmPath]
|
|
:ivar roadm_path_impairments: Impairment profiles for the ROADM paths.
|
|
:vartype roadm_path_impairments: Dict
|
|
:ivar per_degree_impairments: Per degree impairments.
|
|
:vartype per_degree_impairments: Dict[str, Any]
|
|
:ivar design_bands: Design bands parameters.
|
|
:vartype design_bands: List[Dict]
|
|
:ivar per_degree_design_bands: Per degree design bands parameters.
|
|
:vartype per_degree_design_bands: Dict[str, Dict]
|
|
"""
|
|
def __init__(self, *args, params=None, **kwargs):
    """Constructor method

    :param args: Positional arguments forwarded to the parent constructor (the first one is the uid).
    :param params: Dictionary of ROADM parameters used to build a ``RoadmParams``.
    :param kwargs: Keyword arguments forwarded to the parent constructor.

    :raises ParametersError: if the parameters are rejected; the message is prefixed with the uid.
    """
    # pylint: disable=C0103
    if not params:
        params = {}
    # The uid may be passed by keyword or as the first positional argument: read it in a way
    # that can not raise a KeyError, which would otherwise hide the real warning or error.
    uid = kwargs['uid'] if 'uid' in kwargs else (args[0] if args else None)
    try:
        with warnings.catch_warnings(record=True) as caught_warnings:
            super().__init__(*args, params=RoadmParams(**params), **kwargs)
            if caught_warnings:
                # only the first recorded warning is reported
                msg = f'In ROADM {uid}: {caught_warnings[0].message}'
                _logger.warning(msg)
    except ParametersError as e:
        msg = f'Config error in {uid}: {e}'
        raise ParametersError(msg) from e

    # Target output power for the reference carrier, can only be computed on the fly, because it depends
    # on the path, since it depends on the equalization definition on the degree.
    self.ref_pch_out_dbm = None
    self.loss = 0  # auto-design interest
    self.loss_pch_db = None

    # Optical power of carriers are equalized by the ROADM, so that the experienced loss is not the same for
    # different carriers. The ref_effective_loss records the loss for a reference carrier.
    self.ref_effective_loss = None

    self.passive = True
    self.restrictions = self.params.restrictions
    self.propagated_labels = [""]
    # element contains the three types of equalisation parameters, but only one is not None or empty
    # target for equalization for the ROADM only one must be not None
    self.target_pch_out_dbm = self.params.target_pch_out_db
    self.target_psd_out_mWperGHz = self.params.target_psd_out_mWperGHz
    self.target_out_mWperSlotWidth = self.params.target_out_mWperSlotWidth
    self.per_degree_pch_out_dbm = self.params.per_degree_pch_out_db
    self.per_degree_pch_psd = self.params.per_degree_pch_psd
    self.per_degree_pch_psw = self.params.per_degree_pch_psw
    self.ref_pch_in_dbm = {}
    self.ref_carrier = None
    # Define the nature of from-to internal connection: express-path, drop-path, add-path
    # roadm_paths contains a list of RoadmPath object for each path crossing the ROADM
    self.roadm_paths = []
    # roadm_path_impairments contains a dictionary of impairments profiles corresponding to type_variety
    # first listed add, drop and express constitute the default
    self.roadm_path_impairments = self.params.roadm_path_impairments
    # per degree definitions, in case some degrees have particular deviations with respect to default.
    # entries are indexed with the '<from_degree>-<to_degree>' string
    self.per_degree_impairments = {f'{i["from_degree"]}-{i["to_degree"]}': {"from_degree": i["from_degree"],
                                                                            "to_degree": i["to_degree"],
                                                                            "impairment_id": i["impairment_id"]}
                                   for i in self.params.per_degree_impairments}
    # copies, so that changes on the element do not alter the parameters object
    self.design_bands = deepcopy(self.params.design_bands)
    self.per_degree_design_bands = deepcopy(self.params.per_degree_design_bands)
|
|
|
|
@property
def to_json(self):
    """Converts the ROADM's state to a JSON-compatible dictionary.

    The ROADM level equalization is exported with the first defined target among
    ``target_pch_out_db``, ``target_psd_out_mWperGHz`` and ``target_out_mWperSlotWidth``.
    Per degree settings are only exported when they are not empty.

    :return Dict[str, Any]: JSON representation of the ROADM.

    :raises AssertionError: if none of the ROADM level equalization targets is defined.
    """
    if self.target_pch_out_dbm is not None:
        equalisation, value = 'target_pch_out_db', self.target_pch_out_dbm
    elif self.target_psd_out_mWperGHz is not None:
        equalisation, value = 'target_psd_out_mWperGHz', self.target_psd_out_mWperGHz
    elif self.target_out_mWperSlotWidth is not None:
        equalisation, value = 'target_out_mWperSlotWidth', self.target_out_mWperSlotWidth
    else:
        # explicit raise instead of ``assert False``: assert statements are removed when Python runs
        # with -O, which would let the code continue with ``equalisation`` undefined
        raise AssertionError('There must be one default equalization defined in ROADM')
    to_json = {
        'uid': self.uid,
        'type': type(self).__name__,
        'type_variety': self.type_variety,
        'params': {
            equalisation: value,
            'restrictions': self.restrictions
        },
        'metadata': {
            'location': self.metadata['location']._asdict()
        }
    }
    # several per_degree equalization may coexist on different degrees
    if self.per_degree_pch_out_dbm:
        to_json['params']['per_degree_pch_out_db'] = self.per_degree_pch_out_dbm
    if self.per_degree_pch_psd:
        to_json['params']['per_degree_psd_out_mWperGHz'] = self.per_degree_pch_psd
    if self.per_degree_pch_psw:
        to_json['params']['per_degree_psd_out_mWperSlotWidth'] = self.per_degree_pch_psw
    if self.per_degree_impairments:
        to_json['params']['per_degree_impairments'] = list(self.per_degree_impairments.values())

    # design bands are exported from the parameters object, and only if more than one band is defined
    if self.params.design_bands is not None:
        if len(self.params.design_bands) > 1:
            to_json['params']['design_bands'] = self.params.design_bands
    if self.params.per_degree_design_bands:
        to_json['params']['per_degree_design_bands'] = self.params.per_degree_design_bands
    return to_json
|
|
|
|
def __repr__(self):
|
|
"""Returns a string representation of the ROADM.
|
|
|
|
:return str: String representation of the ROADM.
|
|
"""
|
|
return f'{type(self).__name__}(uid={self.uid!r}, loss={self.loss!r})'
|
|
|
|
def __str__(self):
|
|
"""Returns a formatted string representation of the ROADM.
|
|
|
|
:return str: Formatted string representation of the ROADM.
|
|
"""
|
|
if self.ref_effective_loss is None:
|
|
return f'{type(self).__name__} {self.uid}'
|
|
|
|
total_pch = pretty_summary_print(per_label_average(self.pch_out_dbm, self.propagated_labels))
|
|
total_loss = pretty_summary_print(per_label_average(self.loss_pch_db, self.propagated_labels))
|
|
return '\n'.join([f'{type(self).__name__} {self.uid}',
|
|
f' Type_variety: {self.type_variety}',
|
|
f' Reference loss (dB): {self.ref_effective_loss:.2f}',
|
|
f' Actual loss (dB): {total_loss}',
|
|
f' Reference pch out (dBm): {self.ref_pch_out_dbm:.2f}',
|
|
f' Actual pch out (dBm): {total_pch}'])
|
|
|
|
def get_roadm_target_power(self, spectral_info: 'SpectralInformation' = None) -> Union[float, ndarray]:
    """Computes the power in dBm for a reference carrier or for a spectral information.
    power is computed based on equalization target.
    if spectral_info baud_rate is baud_rate = [32e9, 42e9, 64e9, 42e9, 32e9], and
    target_pch_out_dbm is defined to -20 dbm, then the function returns an array of powers
    [-20, -20, -20, -20, -20]
    if target_psd_out_mWperGHz is defined instead with 3.125e-4mW/GHz then it returns
    [-20, -18.819, -16.9897, -18.819, -20]
    if no spectral_info is given, the function computes the result for the single reference
    carrier ``self.ref_carrier``.

    :param spectral_info: The spectral information object.
    :type spectral_info: SpectralInformation, optional
    :return: Target power in dBm, or None if no ROADM level equalization target is defined.
    :rtype: Union[float, ndarray]
    """
    # identity test: a spectral information must be used as soon as one is supplied, whatever its
    # truth value is
    per_channel = spectral_info is not None
    if self.target_pch_out_dbm is not None:
        if per_channel:
            # same constant power target for every channel
            return full(len(spectral_info.channel_number), self.target_pch_out_dbm)
        return self.target_pch_out_dbm
    if self.target_psd_out_mWperGHz is not None:
        # power spectral density target: the power depends on the baud rate
        baud_rate = spectral_info.baud_rate if per_channel else self.ref_carrier.baud_rate
        return psd2powerdbm(self.target_psd_out_mWperGHz, baud_rate)
    if self.target_out_mWperSlotWidth is not None:
        # power per slot width target: the power depends on the slot width
        slot_width = spectral_info.slot_width if per_channel else self.ref_carrier.slot_width
        return psd2powerdbm(self.target_out_mWperSlotWidth, slot_width)
    return None
|
|
|
|
def get_per_degree_ref_power(self, degree):
    """Returns the target power (dBm) of the reference carrier at the output of a ROADM degree.

    Per degree settings are looked up in this order: power target, power spectral density target,
    power per slot width target. The ROADM level equalization is used when the degree has none.

    :param degree: The degree identifier.
    :type degree: str

    :return float: Target power in dBm
    """
    if degree in self.per_degree_pch_out_dbm:
        ref_power = self.per_degree_pch_out_dbm[degree]
    elif degree in self.per_degree_pch_psd:
        psd_target = self.per_degree_pch_psd[degree]
        ref_power = psd2powerdbm(psd_target, self.ref_carrier.baud_rate)
    elif degree in self.per_degree_pch_psw:
        psw_target = self.per_degree_pch_psw[degree]
        ref_power = psd2powerdbm(psw_target, self.ref_carrier.slot_width)
    else:
        # nothing specific on this degree: fall back on the ROADM level target
        ref_power = self.get_roadm_target_power()
    return ref_power
|
|
|
|
def get_per_degree_power(self, degree, spectral_info):
    """Returns the target power (dBm) at the output of a ROADM degree for a spectral information.

    Per degree settings are looked up in this order: power target, power spectral density target,
    power per slot width target. The ROADM level equalization is used when the degree has none.

    :param degree: The degree identifier.
    :type degree: str
    :param spectral_info: The spectral information object.
    :type spectral_info: SpectralInformation

    :return float: Target power in dBm.
    """
    if degree in self.per_degree_pch_out_dbm:
        target_power = self.per_degree_pch_out_dbm[degree]
    elif degree in self.per_degree_pch_psd:
        psd_target = self.per_degree_pch_psd[degree]
        target_power = psd2powerdbm(psd_target, spectral_info.baud_rate)
    elif degree in self.per_degree_pch_psw:
        psw_target = self.per_degree_pch_psw[degree]
        target_power = psd2powerdbm(psw_target, spectral_info.slot_width)
    else:
        # nothing specific on this degree: fall back on the ROADM level target
        target_power = self.get_roadm_target_power(spectral_info=spectral_info)
    return target_power
|
|
|
|
def propagate(self, spectral_info, degree, from_degree):
    """Equalization targets are read from topology file if defined and completed with default
    definition of the library.
    If the input power is lower than the target one, use the input power minus the ROADM loss
    if it exists, because a ROADM doesn't amplify, it can only attenuate.
    There is no difference for add or express : the same target is applied.
    For the moment propagate operates with spectral info carriers all having the same source or destination.

    The spectral information is modified in place (attenuation, PMD and PDL) and the resulting
    reference power, reference loss, per channel output power, per channel loss and labels are
    recorded on the ROADM.

    :param spectral_info: The spectral information object.
    :type spectral_info: SpectralInformation
    :param degree: The egress degree.
    :type degree: str
    :param from_degree: The ingress degree.
    :type from_degree: str
    """
    # record input powers to compute the actual loss at the end of the process
    input_power_dbm = watt2dbm(spectral_info.signal + spectral_info.nli + spectral_info.ase)
    # apply min ROADM loss if it exists (read from the 'roadm-maxloss' impairment of the path)
    roadm_maxloss_db = self.get_impairment('roadm-maxloss', spectral_info.frequency, from_degree, degree)
    spectral_info.apply_attenuation_db(roadm_maxloss_db)
    # records the total power after applying minimum loss
    net_input_power_dbm = watt2dbm(spectral_info.signal + spectral_info.nli + spectral_info.ase)
    # find the target power for the reference carrier
    ref_per_degree_pch = self.get_per_degree_ref_power(degree)
    # find the target powers for each signal carrier
    per_degree_pch = self.get_per_degree_power(degree, spectral_info=spectral_info)

    # Definition of ref_pch_out_dbm for the reference channel:
    # Depending on propagation upstream from this ROADM, the input power might be smaller than
    # the target power out configured for this ROADM degree's egress. Since ROADM does not amplify,
    # the power out of the ROADM for the ref channel is the min value between target power and input power.
    ref_pch_in_dbm = self.ref_pch_in_dbm[from_degree]
    # Calculate the output power for the reference channel (only for visualization)
    self.ref_pch_out_dbm = min(ref_pch_in_dbm - max(roadm_maxloss_db), ref_per_degree_pch)

    # Definition of effective_loss:
    # Optical power of carriers are equalized by the ROADM, so that the experienced loss is not the same for
    # different carriers. effective_loss records the loss for the reference carrier.
    # Calculate the effective loss for the reference channel
    self.ref_effective_loss = ref_pch_in_dbm - self.ref_pch_out_dbm

    # Calculate the target power per channel according to the equalization policy
    target_power_per_channel = per_degree_pch + spectral_info.delta_pdb_per_channel
    # Computation of the correction according to equalization policy
    # If target_power_per_channel has some channels power above input power, then the whole target is reduced.
    # For example, if user specifies delta_pdb_per_channel:
    # freq1: 1dB, freq2: 3dB, freq3: -3dB, and target is -20dBm out of the ROADM,
    # then the target power for each channel uses the specified delta_pdb_per_channel.
    # target_power_per_channel[f1, f2, f3] = -19, -17, -23
    # However if input_signal = -23, -16, -26, then the target can not be applied, because
    # -23 < -19dBm and -26 < -23dBm. Then the target is only applied to signals whose power is above the
    # threshold. others are left unchanged and unequalized.
    # the new target is [-23, -17, -26]
    # and the attenuation to apply is [-23, -16, -26] - [-23, -17, -26] = [0, 1, 0]
    # note that this changes the previous behaviour that equalized all identical channels based on the one
    # that had the min power.
    # This change corresponds to a discussion held during coders call. Please look at this document for
    # a reference: https://telecominfraproject.atlassian.net/wiki/spaces/OOPT/pages/669679645/PSE+Meeting+Minutes
    correction = calculate_absolute_min_or_zero(net_input_power_dbm - target_power_per_channel)
    new_target = target_power_per_channel - correction
    delta_power = net_input_power_dbm - new_target

    spectral_info.apply_attenuation_db(delta_power)

    # Update the PMD information (quadratic sum with the ROADM contribution)
    pmd_impairment = self.get_impairment('roadm-pmd', spectral_info.frequency, from_degree, degree)
    spectral_info.pmd = sqrt(spectral_info.pmd ** 2 + pmd_impairment ** 2)

    # Update the PDL information (quadratic sum with the ROADM contribution)
    pdl_impairment = self.get_impairment('roadm-pdl', spectral_info.frequency, from_degree, degree)
    spectral_info.pdl = sqrt(spectral_info.pdl ** 2 + pdl_impairment ** 2)

    # Update the per channel power with the result of propagation
    self.pch_out_dbm = watt2dbm(spectral_info.signal + spectral_info.nli + spectral_info.ase)

    # Update the loss per channel and the labels
    self.loss_pch_db = input_power_dbm - self.pch_out_dbm
    self.propagated_labels = spectral_info.label
|
|
|
|
def set_roadm_paths(self, from_degree, to_degree, path_type, impairment_id=None):
    """Registers an internal path (express, drop or add) together with its impairment.

    When no impairment id is given, the first profile of ``roadm_path_impairments`` matching
    ``path_type`` is used. If there is none, the impairment is built from the global ROADM
    parameters and the recorded impairment id stays ``None``.

    :param from_degree: The ingress degree.
    :type from_degree: str
    :param to_degree: The egress degree.
    :type to_degree: str
    :param path_type: Type of the path (express, drop, add).
    :type path_type: str
    :param impairment_id: Impairment profile ID. This parameter is optional.
    :type impairment_id: int, optional

    :raises NetworkTopologyError: if ``impairment_id`` is given but not defined in the library.
    """
    # default impairment built from the global ROADM parameters (pmd, pdl), valid on any frequency;
    # it is replaced below if a more detailed profile is available
    global_profile = {
        'roadm-pmd': self.params.pmd,
        'roadm-pdl': self.params.pdl,
        'frequency-range': {
            'lower-frequency': None,
            'upper-frequency': None
        }
    }
    if path_type in ['add', 'drop']:
        # without detailed impairments, the add OSNR contribution is assumed equal to the drop one:
        # add_drop_osnr_db = - 10log10(1/add_osnr + 1/drop_osnr) with add_osnr = drop_osnr
        #                  = add_osnr_db - 10log10(2), hence add_osnr_db = add_drop_osnr_db + 10log10(2)
        global_profile['roadm-osnr'] = self.params.add_drop_osnr + lin2db(2)
    impairment = RoadmImpairment({'impairment': [global_profile]})

    if impairment_id is not None:
        # an explicit profile is requested: it must exist in the library
        if impairment_id not in self.roadm_path_impairments:
            msg = f'ROADM {self.uid}: impairment profile id {impairment_id} is not defined in library'
            raise NetworkTopologyError(msg)
        impairment = self.roadm_path_impairments[impairment_id]
    else:
        # take the first profile of the library that matches the path_type, if any
        for candidate_id, candidate in self.roadm_path_impairments.items():
            if candidate.path_type == path_type:
                impairment_id, impairment = candidate_id, candidate
                break
    new_path = RoadmPath(from_degree=from_degree, to_degree=to_degree, path_type=path_type,
                         impairment_id=impairment_id, impairment=impairment)
    self.roadm_paths.append(new_path)
|
|
|
|
def get_roadm_path(self, from_degree: str, to_degree: str):
    """Look up the internal ROADM path joining two degrees.

    The registered paths in ``self.roadm_paths`` are scanned in order and the first one whose
    ``from_degree`` and ``to_degree`` both match is returned.

    :param from_degree: The ingress degree.
    :type from_degree: str
    :param to_degree: The egress degree.
    :type to_degree: str

    :return Any: The first matching roadm path object.
    :raises NetworkTopologyError: If no registered path connects the two degrees.
    """
    matching_paths = (candidate for candidate in self.roadm_paths
                      if candidate.from_degree == from_degree and candidate.to_degree == to_degree)
    found = next(matching_paths, None)
    if found is None:
        msg = f'Could not find from_degree-to_degree {from_degree}-{to_degree} path in ROADM {self.uid}'
        raise NetworkTopologyError(msg)
    return found
|
|
|
|
def get_per_degree_impairment_id(self, from_degree: str, to_degree: str) -> Union[int, None]:
    """Return the impairment id configured for a specific degree pair, if any.

    ``self.per_degree_impairments`` is keyed by the string ``"<from_degree>-<to_degree>"``.

    :param from_degree: The ingress degree.
    :type from_degree: str
    :param to_degree: The egress degree.
    :type to_degree: str

    :return Union[int, None]: The ``"impairment_id"`` entry for this degree pair, or None when the
        pair has no per-degree entry.
    """
    degree_pair = f'{from_degree}-{to_degree}'
    per_degree = self.per_degree_impairments
    if degree_pair not in per_degree:
        return None
    return per_degree[degree_pair]["impairment_id"]
|
|
|
|
def get_path_type_per_id(self, impairment_id: int) -> Union[str, None]:
    """Return the path type of the impairment profile registered under ``impairment_id``.

    :param impairment_id: The impairment ID.
    :type impairment_id: int

    :return Union[str, None]: The ``path_type`` of the profile, or None when the id is not a key of
        ``self.roadm_path_impairments``.
    """
    profiles = self.roadm_path_impairments
    if impairment_id not in profiles:
        return None
    return profiles[impairment_id].path_type
|
|
|
|
def get_impairment(self, impairment: str, frequency_array: array, from_degree: str, degree: str) \
        -> array:
    """Collect the value of one impairment for each frequency of ``frequency_array``.

    For every frequency, the frequency bands of the ROADM internal path are scanned in order. A band
    applies when its lower bound is None (no frequency restriction) or when the frequency lies within
    its inclusive [lower, upper] range. The first applicable band whose value is not None provides
    the value for that frequency. A band that lacks the impairment key is completed in place with
    ``RoadmImpairment.default_values[impairment]``.

    NOTE: frequencies for which no band yields a value contribute nothing, so the result may be
    shorter than ``frequency_array``.

    :param impairment: The type of impairment to retrieve (e.g., roadm-pmd, roadm-maxloss).
    :type impairment: str
    :param frequency_array: The frequencies at which to check for impairments.
    :type frequency_array: numpy.ndarray
    :param from_degree: The ingress degree for the ROADM internal path.
    :type from_degree: str
    :param degree: The egress degree for the ROADM internal path.
    :type degree: str

    :return array: The collected impairment values, or None when no value was collected at all.
    """
    bands = self.get_roadm_path(from_degree, degree).impairment.impairments
    collected = []
    for channel_frequency in frequency_array:
        for band in bands:
            band_range = band['frequency-range']
            lower = band_range['lower-frequency']
            upper = band_range['upper-frequency']
            if lower is not None and not lower <= channel_frequency <= upper:
                continue  # this band does not cover the frequency
            # fill in the library default when the band does not define this impairment
            value = band.setdefault(impairment, RoadmImpairment.default_values[impairment])
            if value is None:
                continue  # nothing usable here, keep looking in the next bands
            collected.append(value)
            break  # Stop searching after the first match for this frequency
    return array(collected) if collected else None
|
|
|
|
def __call__(self, spectral_info: SpectralInformation, degree: str, from_degree: str) -> SpectralInformation:
    """Propagate the spectrum from ``from_degree`` to ``degree`` through the ROADM.

    The work is delegated to :py:meth:`propagate`; its return value is ignored and the very same
    ``spectral_info`` object that was passed in is handed back to the caller.

    :param spectral_info: The spectral information object.
    :type spectral_info: SpectralInformation
    :param degree: The egress degree.
    :type degree: str
    :param from_degree: The ingress degree.
    :type from_degree: str

    :return SpectralInformation: The updated spectral information object.
    """
    routing = {'degree': degree, 'from_degree': from_degree}
    self.propagate(spectral_info, **routing)
    return spectral_info
|
|
|
|
|
|
class Fused(_Node):
    """A fused (spliced) passive element that only attenuates the spectrum.

    :ivar loss (float): Loss experienced by the fused element, taken from its parameters.
    :ivar passive (bool): Indicates if the fused element is passive (always True).
    """

    def __init__(self, *args, params=None, **kwargs):
        """Build the element from a (possibly missing or empty) parameter dictionary.

        :param params: Keyword arguments forwarded to ``FusedParams``; an empty dictionary is used
            when nothing (or something falsy) is given.
        """
        fused_params = FusedParams(**(params or {}))
        super().__init__(*args, params=fused_params, **kwargs)
        self.loss = self.params.loss
        self.passive = True

    @property
    def to_json(self):
        """Export the element as a JSON-compatible dictionary.

        :return Dict[str, Any]: uid, type name, loss parameter and location metadata.
        """
        location = self.metadata['location']._asdict()
        return {
            'uid': self.uid,
            'type': type(self).__name__,
            'params': {'loss': self.loss},
            'metadata': {'location': location},
        }

    def __repr__(self):
        """Return the developer-oriented representation of the element.

        :return str: Class name followed by uid and loss.
        """
        return '{}(uid={!r}, loss={!r})'.format(type(self).__name__, self.uid, self.loss)

    def __str__(self):
        """Return the two-line human readable summary of the element.

        :return str: Class name and uid on the first line, loss in dB on the second one.
        """
        header = f'{type(self).__name__} {self.uid}'
        loss_line = f' loss (dB): {self.loss:.2f}'
        return header + '\n' + loss_line

    def propagate(self, spectral_info):
        """Attenuate the spectral information by ``self.loss`` dB.

        :param spectral_info: The spectral information object.
        :type spectral_info: SpectralInformation
        """
        spectral_info.apply_attenuation_db(self.loss)

    def __call__(self, spectral_info):
        """Propagate the spectral information through the element and hand it back.

        :param spectral_info: The spectral information object.
        :type spectral_info: SpectralInformation

        :return SpectralInformation: The same object, after attenuation.
        """
        self.propagate(spectral_info)
        return spectral_info
|
|
|
|
|
|
class Fiber(_Node):
    """Represents an optical fiber element in the network.

    :ivar pch_out_db: Reference output power per channel in dBm (None until the fiber is called).
    :vartype pch_out_db: float
    :ivar passive: Indicates if the fiber is passive.
    :vartype passive: bool
    :ivar propagated_labels: Labels propagated by the fiber.
    :vartype propagated_labels: List[str]
    :ivar lumped_losses: Lumped losses along the fiber, in linear units.
    :vartype lumped_losses: numpy.ndarray
    :ivar z_lumped_losses: Positions of the lumped losses, in m.
    :vartype z_lumped_losses: numpy.ndarray
    :ivar ref_pch_in_dbm: Reference input power per channel; initialised to None and expected to be
        set by the caller before the fiber is called.
    """

    def __init__(self, *args, params=None, **kwargs):
        """Constructor method

        :param args: Positional arguments for the parent class.
        :param params: Fiber parameters forwarded to ``FiberParams``; an empty dictionary is used
            when nothing (or something falsy) is given.
        :type params: dict, optional
        :raises ParametersError: If ``FiberParams`` or the parent constructor rejects the
            configuration; the message is prefixed with the element uid.
        :raises NetworkTopologyError: If a lumped loss is not strictly inside the fiber.
        """
        if not params:
            params = {}
        try:
            super().__init__(*args, params=FiberParams(**params), **kwargs)
        except ParametersError as e:
            # Do not index kwargs["uid"] directly: when the uid was given positionally this would
            # raise a KeyError that hides the real configuration error.
            # NOTE(review): assumes uid is the first positional argument of _Node - TODO confirm.
            uid = kwargs.get('uid', args[0] if args else None)
            msg = f'Config error in {uid}: {e}'
            raise ParametersError(msg) from e
        self.pch_out_db = None
        self.passive = True
        self.propagated_labels = [""]

        # Lumped losses
        z_lumped_losses = array([lumped['position'] for lumped in self.params.lumped_losses])  # km
        lumped_losses_power = array([lumped['loss'] for lumped in self.params.lumped_losses])  # dB
        # positions are given in km while params.length is in m, hence the 1e-3 factor
        if not ((z_lumped_losses > 0) * (z_lumped_losses < 1e-3 * self.params.length)).all():
            raise NetworkTopologyError("Lumped loss positions must be between 0 and the fiber length "
                                       f"({1e-3 * self.params.length} km), boundaries excluded.")
        self.lumped_losses = db2lin(- lumped_losses_power)  # [linear units]
        self.z_lumped_losses = array(z_lumped_losses) * 1e3  # [m]
        self.ref_pch_in_dbm = None

    @property
    def to_json(self):
        """Converts the fiber's state to a JSON-compatible dictionary.

        Length is exported in km and the loss coefficient in dB/km, both rounded to 6 decimals.
        ``pmd_coef`` is only exported when it was explicitly defined (not a library default).

        :return Dict[str, Any]: JSON representation of the fiber.
        """
        params = {
            # have to specify each because namedtuple cannot be updated :(
            'length': round(self.params.length * 1e-3, 6),
            'loss_coef': round(self.params.loss_coef * 1e3, 6),
            'length_units': 'km',
            'att_in': self.params.att_in,
            'con_in': self.params.con_in,
            'con_out': self.params.con_out
        }
        # Export pmd_coef only if it is not a default from library.
        if self.params.pmd_coef_defined:
            params['pmd_coef'] = self.params.pmd_coef

        return {'uid': self.uid,
                'type': type(self).__name__,
                'type_variety': self.type_variety,
                'params': params,
                'metadata': {
                    'location': self.metadata['location']._asdict()
                }
                }

    def __repr__(self):
        """Returns a string representation of the fiber.

        :return str: Class name with uid, length in km and total loss in dB (1 decimal each).
        """
        return f'{type(self).__name__}(uid={self.uid!r}, ' \
            f'length={round(self.params.length * 1e-3, 1)!r}km, ' \
            f'loss={round(self.loss, 1)!r}dB)'

    def __str__(self):
        """Returns a formatted string representation of the fiber.

        Before the fiber has been called (``pch_out_db`` is None) only the class name and uid are
        returned; afterwards a multi-line summary including output powers is produced.

        :return str: Formatted string representation of the fiber.
        """
        if self.pch_out_db is None:
            return f'{type(self).__name__} {self.uid}'

        total_pch = pretty_summary_print(per_label_average(self.pch_out_dbm, self.propagated_labels))
        return '\n'.join([f'{type(self).__name__} {self.uid}',
                          f' type_variety: {self.type_variety}',
                          f' length (km): {self.params.length * 1e-3:.2f}',
                          f' pad att_in (dB): {self.params.att_in:.2f}',
                          f' total loss (dB): {self.loss:.2f}',
                          f' (includes conn loss (dB) in: {self.params.con_in:.2f} out: {self.params.con_out:.2f})',
                          ' (conn loss out includes EOL margin defined in eqpt_config.json)',
                          f' reference pch out (dBm): {self.pch_out_db:.2f}',
                          f' actual pch out (dBm): {total_pch}'])

    def interpolate_parameter_over_spectrum(self, parameter, ref_frequency, spectrum_frequency, name):
        """Interpolates a frequency-dependent fiber parameter at the spectrum frequencies.

        Used for the loss coefficient and for the chromatic dispersion. No extrapolation is done:
        when ``interp1d`` raises a ValueError, it is reported as a :class:`SpectrumError` saying
        that the spectrum exceeds the interval on which the parameter is defined.

        :param parameter (ndarray): The parameter to interpolate.
        :param ref_frequency (ndarray): Reference frequencies.
        :param spectrum_frequency (ndarray): Frequencies of the spectrum.
        :param name (str): Name of the parameter for error messages.

        :return ndarray: Interpolated values.
        :raises SpectrumError: If the interpolation fails with a ValueError.
        """
        try:
            interpolation = interp1d(ref_frequency, parameter)(spectrum_frequency)
            return interpolation
        except ValueError:
            try:
                start = spectrum_frequency[0]
                stop = spectrum_frequency[-1]
            except IndexError:
                # when frequency is a 0-dimensional array
                start = spectrum_frequency
                stop = spectrum_frequency
            raise SpectrumError('The spectrum bandwidth exceeds the frequency interval used to define the fiber '
                                f'{name} in "{type(self).__name__} {self.uid}".'
                                f'\nSpectrum f_min-f_max: {round(start * 1e-12, 2)}-'
                                f'{round(stop * 1e-12, 2)}'
                                f'\n{name} f_min-f_max: {round(ref_frequency[0] * 1e-12, 2)}-'
                                f'{round(ref_frequency[-1] * 1e-12, 2)}')

    def loss_coef_func(self, frequency):
        """
        Returns the loss coefficient (of a fibre) which can be uniform,
        or made frequency-dependent defined via a dictionary-model per instance
        in the topology-file, or via a list-model ('LUT') in the equipment-file.

        If a list-model is declared, then the legacy 'loss_coef' scalar is used to
        offset the provided LUT-model for the given 'self.params.ref_frequency' such
        that at the 'self.params.ref_frequency' the offset model values the 'loss_coef' scalar;
        in case the 'loss_coef' scalar is not defined then no offset is applied.

        In case a dictionary-model as well as a list-model are declared, then the legacy
        dictionary-based model has priority.

        In this method itself: a multi-valued ``params.loss_coef`` is interpolated over
        ``params.f_loss_ref``; a single value is broadcast to every requested frequency.

        :param frequency (Union[float, ndarray]): Frequency at which to compute the loss coefficient.

        :return ndarray: Loss coefficient values (squeezed, so a scalar input gives a 0-d result).
        """
        frequency = asarray(frequency)
        if self.params.loss_coef.size > 1:
            loss_coef = self.interpolate_parameter_over_spectrum(self.params.loss_coef, self.params.f_loss_ref,
                                                                 frequency, 'Loss Coefficient')
        else:
            loss_coef = full(frequency.size, self.params.loss_coef)
        return squeeze(loss_coef)

    @property
    def loss(self):
        """total loss including padding att_in: useful for polymorphism with roadm loss

        Sum of the fiber attenuation at the reference frequency, the input and output connector
        losses, the input padding attenuation and all lumped losses.

        :return float: Total loss in dB.
        """
        return self.loss_coef_func(self.params.ref_frequency) * self.params.length + \
            self.params.con_in + self.params.con_out + self.params.att_in + sum(lin2db(1 / self.lumped_losses))

    def alpha(self, frequency):
        """Returns the linear exponent attenuation coefficient such that
        :math: `lin_attenuation = e^{- alpha length}`

        :param frequency: the frequency at which alpha is computed [Hz]
        :return: alpha: power attenuation coefficient for f in frequency [Neper/m]
        """
        # 10 * log10(e) converts the loss coefficient from dB to Neper
        return self.loss_coef_func(frequency) / (10 * log10(exp(1)))

    def beta2(self, frequency=None):
        """Returns the beta2 chromatic dispersion coefficient as the second order term of the beta function
        expanded as a Taylor series evaluated at the given frequency

        The dispersion is either interpolated (multi-valued ``params.dispersion``), scaled
        quadratically with frequency (no dispersion slope), or linearised in wavelength around
        ``params.f_dispersion_ref`` using ``params.dispersion_slope``.

        :param frequency: the frequency at which beta2 is computed [Hz]; defaults to the reference frequency
        :return: beta2: beta2 chromatic dispersion coefficient for f in frequency # 1/(m * Hz^2)
        """
        frequency = asarray(self.params.ref_frequency if frequency is None else frequency)
        if self.params.dispersion.size > 1:
            dispersion = self.interpolate_parameter_over_spectrum(self.params.dispersion, self.params.f_dispersion_ref,
                                                                  frequency, 'Chromatic Dispersion')
        else:
            if self.params.dispersion_slope is None:
                dispersion = (frequency / self.params.f_dispersion_ref) ** 2 * self.params.dispersion
            else:
                wavelength = c / frequency
                dispersion = self.params.dispersion + self.params.dispersion_slope * \
                    (wavelength - c / self.params.f_dispersion_ref)  # noqa E127
        beta2 = -((c / frequency) ** 2 * dispersion) / (2 * pi * c)
        return beta2

    def beta3(self, frequency=None):
        """Returns the beta3 chromatic dispersion coefficient as the third order term of the beta function
        expanded as a Taylor series evaluated at the given frequency

        With a multi-valued dispersion, beta3 is the linear coefficient of a second order
        polynomial fit of beta2 around the reference frequency (one value, repeated for every
        frequency). Without dispersion slope it is zero.

        :param frequency: the frequency at which beta3 is computed [Hz]; defaults to the reference frequency
        :return: beta3: beta3 chromatic dispersion coefficient for f in frequency # 1/(m * Hz^3)
        """
        frequency = asarray(self.params.ref_frequency if frequency is None else frequency)
        if self.params.dispersion.size > 1:
            beta3 = polyfit(self.params.f_dispersion_ref - self.params.ref_frequency,
                            self.beta2(self.params.f_dispersion_ref), 2)[1] / (2 * pi)
            beta3 = full(frequency.size, beta3)
        else:
            if self.params.dispersion_slope is None:
                beta3 = zeros(frequency.size)
            else:
                dispersion_slope = self.params.dispersion_slope
                beta2 = self.beta2(frequency)
                beta3 = (dispersion_slope - (4 * pi * frequency ** 3 / c ** 2) * beta2) / (
                    2 * pi * frequency ** 2 / c) ** 2  # noqa E127
        return beta3

    def gamma(self, frequency=None):
        """Returns the nonlinear interference coefficient such that
        :math: `gamma(f) = 2 pi f n_2 c^{-1} A_{eff}^{-1}`

        The computation is delegated to ``self.params.gamma_scaling``.

        :param frequency: the frequency at which gamma is computed [Hz]; defaults to the reference frequency
        :return: gamma: nonlinear interference coefficient for f in frequency [1/(W m)]
        """
        frequency = self.params.ref_frequency if frequency is None else frequency
        return self.params.gamma_scaling(frequency)

    def cr(self, frequency):
        """Returns the raman gain coefficient matrix including the vibrational loss

        :param frequency: the frequency at which cr is computed [Hz]
        :return: cr: raman gain coefficient matrix [1 / (W m)]
        """
        # df[i, j] = frequency[j] - frequency[i]
        df = outer(ones(frequency.shape), frequency) - outer(frequency, ones(frequency.shape))
        effective_area_overlap = self.params.effective_area_overlap(frequency, frequency)
        cr = interp(df, self.params.raman_coefficient.frequency_offset,
                    self.params.raman_coefficient.normalized_gamma_raman) * frequency / effective_area_overlap
        # vibrational_loss[i, j] = frequency[i] / frequency[j]; it only scales the negative entries
        vibrational_loss = outer(frequency, ones(frequency.shape)) / outer(ones(frequency.shape), frequency)
        return cr * (cr >= 0) + cr * (cr < 0) * vibrational_loss  # [1/(W m)]

    def chromatic_dispersion(self, freq=None):
        """Returns accumulated chromatic dispersion (CD).

        :param freq: the frequency at which the chromatic dispersion is computed; defaults to the
            reference frequency
        :return: chromatic dispersion: the accumulated dispersion [s/m]
        """
        freq = self.params.ref_frequency if freq is None else freq
        beta2 = self.beta2(freq)
        beta3 = self.beta3(freq)
        ref_f = self.params.ref_frequency
        length = self.params.length
        beta = beta2 + 2 * pi * beta3 * (freq - ref_f)
        dispersion = -beta * 2 * pi * ref_f**2 / c
        return dispersion * length

    @property
    def pmd(self):
        """differential group delay (PMD) [s]"""
        return self.params.pmd_coef * sqrt(self.params.length)

    def propagate(self, spectral_info: SpectralInformation):
        """Modifies the spectral information computing the attenuation, the non-linear interference generation,
        the CD and PMD accumulation.

        Also records the per-channel output power (signal + NLI + ASE, in dBm) and the propagated
        labels on the fiber.

        :param spectral_info: The spectral information object, modified in place.
        :type spectral_info: SpectralInformation
        """
        # apply the attenuation due to the input connector loss
        attenuation_in_db = self.params.con_in + self.params.att_in
        spectral_info.apply_attenuation_db(attenuation_in_db)

        # inter channels Raman effect
        stimulated_raman_scattering = RamanSolver.calculate_stimulated_raman_scattering(spectral_info, self)

        # NLI noise evaluated at the fiber input
        spectral_info.nli += NliSolver.compute_nli(spectral_info, stimulated_raman_scattering, self)

        # chromatic dispersion and pmd variations
        spectral_info.chromatic_dispersion += self.chromatic_dispersion(spectral_info.frequency)
        spectral_info.pmd = sqrt(spectral_info.pmd ** 2 + self.pmd ** 2)

        # latency
        spectral_info.latency += self.params.latency

        # apply the attenuation due to the fiber losses
        attenuation_fiber = stimulated_raman_scattering.loss_profile[:, -1]
        spectral_info.apply_attenuation_lin(attenuation_fiber)

        # apply the attenuation due to the output connector loss
        attenuation_out_db = self.params.con_out
        spectral_info.apply_attenuation_db(attenuation_out_db)
        self.pch_out_dbm = watt2dbm(spectral_info.signal + spectral_info.nli + spectral_info.ase)
        self.propagated_labels = spectral_info.label

    def __call__(self, spectral_info):
        """Propagate through the fiber

        NOTE(review): ``self.ref_pch_in_dbm`` must have been set to a number beforehand; it is
        None right after construction - confirm that callers always set it.

        :param spectral_info: The spectral information object.
        :type spectral_info: SpectralInformation

        :return SpectralInformation: The updated spectral information object.
        """
        # _psig_in records the total signal power of the spectral information before propagation.
        self._psig_in = sum(spectral_info.signal)
        self.propagate(spectral_info)
        # In case of Raman, the resulting loss of the fiber is not equivalent to self.loss
        # because of Raman gain. The resulting loss is:
        # power_out - power_in. We use the total signal power (sum on all channels) to compute
        # this loss.
        loss = round(lin2db(self._psig_in / sum(spectral_info.signal)), 2)
        self.pch_out_db = self.ref_pch_in_dbm - loss
        return spectral_info
|
|
|
|
|
|
class RamanFiber(Fiber):
    """Class representing a Raman fiber in a network.

    Inherits from the Fiber class and adds specific parameters and methods for Raman amplification.

    :ivar raman_pumps: A tuple of pump parameters for the Raman amplification.
    :vartype raman_pumps: Tuple[PumpParams]
    :ivar temperature: The operational temperature of the Raman fiber.
    :vartype temperature: float
    :raises NetworkTopologyError: If the fiber is defined as a RamanFiber without operational parameters,
        or if required operational parameters are missing.
    """

    def __init__(self, *args, params=None, **kwargs):
        """Constructor method

        :param args: Positional arguments for the parent class.
        :param params: Fiber parameters, forwarded unchanged to :class:`Fiber`.
        :type params: dict, optional
        :raises NetworkTopologyError: If ``operational`` is empty, or lacks the ``raman_pumps`` or
            ``temperature`` entries.
        """
        super().__init__(*args, params=params, **kwargs)
        if not self.operational:
            raise NetworkTopologyError(f'Fiber element uid:{self.uid} '
                                       'defined as RamanFiber without operational parameters')

        if 'raman_pumps' not in self.operational:
            raise NetworkTopologyError(f'Fiber element uid:{self.uid} '
                                       'defined as RamanFiber without raman pumps description in operational')

        if 'temperature' not in self.operational:
            raise NetworkTopologyError(f'Fiber element uid:{self.uid} '
                                       'defined as RamanFiber without temperature in operational')

        # the configured pump power is reduced by the output connector loss
        pump_loss = db2lin(self.params.con_out)
        self.raman_pumps = tuple(PumpParams(p['power'] / pump_loss, p['frequency'], p['propagation_direction'])
                                 for p in self.operational['raman_pumps'])
        self.temperature = self.operational['temperature']

    @property
    def to_json(self):
        """Converts the RamanFiber's state to a JSON-compatible dictionary.

        This is the :class:`Fiber` export extended with the ``operational`` entry.

        :return Dict[str, Any]: JSON representation of the RamanFiber.
        """
        return dict(super().to_json, operational=self.operational)

    def __str__(self):
        """Returns the Fiber summary followed by the reference and actual Raman gains.

        ``estimated_gain`` and ``actual_raman_gain`` are not set by the constructor
        (``actual_raman_gain`` is set by :py:meth:`propagate`; ``estimated_gain`` is presumably set
        during network design - TODO confirm). When either is not available yet, only the Fiber
        summary is returned instead of failing with an AttributeError.

        :return str: Formatted string representation of the Raman fiber.
        """
        summary = super().__str__()
        estimated_gain = getattr(self, 'estimated_gain', None)
        actual_raman_gain = getattr(self, 'actual_raman_gain', None)
        if estimated_gain is None or actual_raman_gain is None:
            return summary
        return summary + f'\n reference gain (dB): {round(estimated_gain, 2)}' \
            + f'\n actual gain (dB): {round(actual_raman_gain, 2)}'

    def propagate(self, spectral_info: SpectralInformation):
        """Modifies the spectral information computing the attenuation, the non-linear interference generation,
        the CD and PMD accumulation.

        Compared with :class:`Fiber`, the spontaneous Raman scattering is added to the ASE noise and
        the actual Raman gain is recorded in ``self.actual_raman_gain``.

        :param spectral_info: The spectral information object, modified in place.
        :type spectral_info: SpectralInformation
        """
        # total signal power before any attenuation, used below for the actual Raman gain
        pin = watt2dbm(sum(spectral_info.signal))
        # apply the attenuation due to the input connector loss
        attenuation_in_db = self.params.con_in + self.params.att_in
        spectral_info.apply_attenuation_db(attenuation_in_db)

        # Raman pumps and inter channel Raman effect
        stimulated_raman_scattering = RamanSolver.calculate_stimulated_raman_scattering(spectral_info, self)
        spontaneous_raman_scattering = \
            RamanSolver.calculate_spontaneous_raman_scattering(spectral_info, stimulated_raman_scattering, self)

        # nli and ase noise evaluated at the fiber input
        spectral_info.nli += NliSolver.compute_nli(spectral_info, stimulated_raman_scattering, self)
        spectral_info.ase += spontaneous_raman_scattering

        # chromatic dispersion and pmd variations
        spectral_info.chromatic_dispersion += self.chromatic_dispersion(spectral_info.frequency)
        spectral_info.pmd = sqrt(spectral_info.pmd ** 2 + self.pmd ** 2)

        # latency
        spectral_info.latency += self.params.latency

        # apply the attenuation due to the fiber losses
        # only the first number_of_channels rows are used; presumably the remaining rows of
        # loss_profile describe the pumps - TODO confirm
        attenuation_fiber = stimulated_raman_scattering.loss_profile[:spectral_info.number_of_channels, -1]

        spectral_info.apply_attenuation_lin(attenuation_fiber)

        # apply the attenuation due to the output connector loss
        attenuation_out_db = self.params.con_out
        spectral_info.apply_attenuation_db(attenuation_out_db)
        self.pch_out_dbm = watt2dbm(spectral_info.signal + spectral_info.nli + spectral_info.ase)
        self.propagated_labels = spectral_info.label
        pout = watt2dbm(sum(spectral_info.signal))
        # gain = nominal (Raman-free) loss minus the loss actually observed (pin - pout)
        self.actual_raman_gain = self.loss + pout - pin
|
|
|
|
|
|
class Edfa(_Node):
|
|
"""Class representing an Erbium-Doped Fiber Amplifier (EDFA).
|
|
|
|
This class models the behavior of an EDFA, including its parameters, operational characteristics,
|
|
and methods for propagation of spectral information.
|
|
|
|
:ivar variety_list: A list of type_variety associated with the amplifier.
|
|
:vartype variety_list: Union[List[str], None]
|
|
:ivar interpol_dgt: Interpolated dynamic gain tilt defined per frequency on the amplifier band.
|
|
:vartype interpol_dgt: numpy.ndarray
|
|
:ivar interpol_gain_ripple: Interpolated gain ripple.
|
|
:vartype interpol_gain_ripple: numpy.ndarray
|
|
:ivar interpol_nf_ripple: Interpolated noise figure ripple.
|
|
:vartype interpol_nf_ripple: numpy.ndarray
|
|
:ivar channel_freq: SI channel frequencies.
|
|
:vartype channel_freq: numpy.ndarray
|
|
:ivar nf: Noise figure in dB at the operational gain target.
|
|
:vartype nf: numpy.ndarray
|
|
:ivar gprofile: Gain profile of the amplifier.
|
|
:vartype gprofile: numpy.ndarray
|
|
:ivar pin_db: Input power in dBm.
|
|
:vartype pin_db: float
|
|
:ivar nch: Number of channels.
|
|
:vartype nch: int
|
|
:ivar pout_db: Output power in dBm.
|
|
:vartype pout_db: float
|
|
:ivar target_pch_out_dbm: Target output power per channel in dBm.
|
|
:vartype target_pch_out_dbm: float
|
|
:ivar effective_pch_out_db: Effective output power per channel in dBm.
|
|
:vartype effective_pch_out_db: float
|
|
:ivar passive: Indicates if the amplifier is passive.
|
|
:vartype passive: bool
|
|
:ivar att_in: Input attenuation in dB.
|
|
:vartype att_in: float
|
|
:ivar effective_gain: Effective gain of the amplifier.
|
|
:vartype effective_gain: float
|
|
:ivar delta_p: Delta power defined by the operational parameters.
|
|
:vartype delta_p: float
|
|
:ivar _delta_p: Computed delta power during design.
|
|
:vartype _delta_p: float
|
|
:ivar tilt_target: Target tilt defined per wavelength on the amplifier band.
|
|
:vartype tilt_target: float
|
|
:ivar out_voa: Output variable optical attenuator setting.
|
|
:vartype out_voa: float
|
|
:ivar in_voa: Input variable optical attenuator setting.
|
|
:vartype in_voa: float
|
|
:ivar propagated_labels: Labels propagated by the amplifier.
|
|
:vartype propagated_labels: numpy.ndarray
|
|
:raises ParametersError: If there are conflicting amplifier definitions for the same frequency
|
|
band during initialization.
|
|
:raises ValueError: If the input spectral information does not match any defined amplifier bands
|
|
during propagation.
|
|
"""
|
|
def __init__(self, *args, params=None, operational=None, **kwargs):
    """Constructor method for initializing the EDFA.

    :param args: Positional arguments for the parent class.
    :param params: Parameters for the EDFA, defaults to an empty dictionary if None.
    :type params: dict, optional
    :param operational: Operational parameters for the EDFA, defaults to an empty dictionary if None.
    :type operational: dict, optional
    :param kwargs: Keyword arguments for the parent class; an optional ``variety_list`` entry is
        removed from them and stored on the instance.
    :raises ParametersError: If the parameters are rejected; the message is prefixed with the uid.
    """
    if params is None:
        params = {}
    if operational is None:
        operational = {}
    self.variety_list = kwargs.pop('variety_list', None)
    try:
        super().__init__(*args, params=EdfaParams(**params), operational=EdfaOperational(**operational), **kwargs)
    except ParametersError as e:
        # Do not index kwargs["uid"] directly: when the uid was given positionally this would
        # raise a KeyError that hides the real configuration error.
        # NOTE(review): assumes uid is the first positional argument of _Node - TODO confirm.
        uid = kwargs.get('uid', args[0] if args else None)
        raise ParametersError(f'{uid}: {e}') from e
    self.interpol_dgt = None  # interpolated dynamic gain tilt defined per frequency on amp band
    self.interpol_gain_ripple = None  # gain ripple
    self.interpol_nf_ripple = None  # nf_ripple
    self.channel_freq = None  # SI channel frequencies
    # nf, gprofile, pin and pout attributes are set by interpol_params
    self.nf = None  # dB edfa nf at operational.gain_target
    self.gprofile = None
    self.pin_db = None
    self.nch = None
    self.pout_db = None
    self.target_pch_out_dbm = None
    self.effective_pch_out_db = None
    self.passive = False
    self.att_in = None
    self.effective_gain = self.operational.gain_target
    # self.operational.delta_p is defined by user for reference channel
    # self.delta_p is set with self.operational.delta_p, but it may be changed during design:
    # - if operational.delta_p is None, self.delta_p is computed at design phase
    # - if operational.delta_p can not be applied because of saturation, self.delta_p is recomputed
    # - if power_mode is False, then it is set to None
    self.delta_p = self.operational.delta_p
    # self._delta_p contains computed delta_p during design even if power_mode is False
    self._delta_p = None
    self.tilt_target = self.operational.tilt_target  # defined per lambda on the amp band
    self.out_voa = self.operational.out_voa
    self.in_voa = self.operational.in_voa
    self.propagated_labels = [""]
|
|
|
|
@property
def to_json(self):
    """Export the amplifier as a JSON-compatible dictionary.

    The effective gain is rounded to 6 decimals and the tilt target to 5 decimals. None values are
    kept as None, and a tilt target equal to zero (including ``-0.0``) is exported as ``0``.

    :return Dict[str, Any]: JSON representation of the Edfa.
    """
    gain = self.effective_gain
    if gain is not None:
        gain = round(gain, 6)

    # tilt is defined per lambda on the amp band; normalising zero avoids exporting -0.0
    tilt = self.tilt_target
    if tilt is not None:
        tilt = round(0 if tilt == -0.0 else tilt, 5)

    operational = {
        'gain_target': gain,
        'delta_p': self.delta_p,
        'tilt_target': tilt,
        'out_voa': self.out_voa,
        'in_voa': self.in_voa,
    }
    return {
        'uid': self.uid,
        'type': type(self).__name__,
        'type_variety': self.params.type_variety,
        'operational': operational,
        'metadata': {'location': self.metadata['location']._asdict()},
    }
|
|
|
|
def __repr__(self):
    """Return the developer-oriented representation of the amplifier.

    :return str: Class name followed by uid, type variety, the interpolated profiles, the channel
        frequencies, the noise figure, the gain profile and the input/output powers.
    """
    state_attributes = ('interpol_dgt', 'interpol_gain_ripple', 'interpol_nf_ripple', 'channel_freq',
                        'nf', 'gprofile', 'pin_db', 'pout_db')
    fields = [f'uid={self.uid!r}', f'type_variety={self.params.type_variety!r}']
    fields.extend(f'{attribute}={getattr(self, attribute)!r}' for attribute in state_attributes)
    return f'{type(self).__name__}(' + ', '.join(fields) + ')'
|
|
|
|
def __str__(self):
    """Return the human readable summary of the amplifier.

    As long as the input or output power is unknown (``pin_db`` or ``pout_db`` is None), only the
    class name and uid are returned. Otherwise a multi-line report is built; the input VOA line is
    only shown when ``in_voa`` is truthy.

    :return str: Formatted string representation of the amplifier.
    """
    header = f'{type(self).__name__} {self.uid}'
    if self.pin_db is None or self.pout_db is None:
        return header

    nf = mean(self.nf)
    total_pch = pretty_summary_print(per_label_average(self.pch_out_dbm, self.propagated_labels))

    voa_lines = f' output VOA (dB): {self.out_voa:.2f}'
    if self.in_voa:
        voa_lines = f' input VOA (dB): {self.in_voa:.2f}\n' + voa_lines

    # tilt_target is per lambda on the amp band; None and -0.0 are shown as 0 to avoid "-0.00"
    tilt = self.tilt_target
    if tilt in [None, -0.0]:
        tilt = 0

    delta_p_text = 'None' if self.delta_p is None else f'{self.delta_p:.2f}'
    target_pch_text = 'None' if self.target_pch_out_dbm is None else f'{self.target_pch_out_dbm:.2f}'

    report = [
        header,
        f' type_variety: {self.params.type_variety}',
        f' effective gain(dB): {self.effective_gain:.2f}',
        ' (before att_in and before output VOA)',
        f' tilt-target(dB) {tilt:.2f}',
        f' noise figure (dB): {nf:.2f}',
        ' (including att_in)',
        f' pad att_in (dB): {self.att_in:.2f}',
        f' Power In (dBm): {self.pin_db:.2f}',
        f' Power Out (dBm): {self.pout_db:.2f}',
        ' Delta_P (dB): ' + delta_p_text,
        ' target pch (dBm): ' + target_pch_text,
        f' actual pch out (dBm): {total_pch}',
        voa_lines,
    ]
    return '\n'.join(report)
|
|
|
|
def interpol_params(self, spectral_info):
    """Interpolate the amplifier profiles on the channel frequencies and update the power figures.

    The dynamic gain tilt, gain ripple and noise figure ripple tables from the amplifier parameters
    are each interpolated on ``spectral_info.frequency``. The total input power, the saturation
    limited effective gain, the noise figure, the gain profile and the total output power are then
    stored on the instance.

    NOTE(review): the slot width is taken as the spacing of the first two channels, so at least two
    channels are needed - confirm that single-channel spectra never reach this method.

    :param spectral_info: The spectral information object.
    :type spectral_info: SpectralInformation
    """
    channel_frequencies = spectral_info.frequency
    self.channel_freq = channel_frequencies

    # each table is assumed by arrange_frequencies to span the amplifier band f_min..f_max  # Hz
    band_start, band_stop = self.params.f_min, self.params.f_max
    profile_tables = (
        ('interpol_dgt', self.params.dgt),
        ('interpol_gain_ripple', self.params.gain_ripple),
        ('interpol_nf_ripple', self.params.nf_ripple),
    )
    for attribute, table in profile_tables:
        table_frequencies = arrange_frequencies(len(table), band_start, band_stop)
        setattr(self, attribute, interp(channel_frequencies, table_frequencies, table))

    self.nch = spectral_info.number_of_channels
    pin = spectral_info.signal + spectral_info.ase + spectral_info.nli
    self.pin_db = watt2dbm(sum(pin))
    # The following should be changed when we have the new spectral information including slot widths.
    # For now, with homogeneous spectrum, we can calculate it as the difference between neighbouring channels.
    self.slot_width = self.channel_freq[1] - self.channel_freq[0]

    # saturation: the gain cannot exceed what p_max allows for the actual input power
    max_gain_before_saturation = self.params.p_max - self.pin_db
    self.effective_gain = min(self.effective_gain, max_gain_before_saturation)

    # noise figure and gain profile are evaluated with the (possibly reduced) effective gain
    self.nf = self._calc_nf()
    self.gprofile = self._gain_profile(pin)

    linear_gain = db2lin(self.gprofile)
    pout = (pin + self.noise_profile(spectral_info)) * linear_gain
    self.pout_db = lin2db(sum(pout * 1e3))
    # ase & nli are only calculated in signal bandwidth
    # pout_db is not the absolute full output power (negligible if sufficient channels)
|
|
|
|
def _nf(self, type_def, nf_model, nf_fit_coeff, gain_min, gain_flatmax, gain_target):
|
|
# if hybrid raman, use edfa_gain_flatmax attribute, else use gain_flatmax
|
|
# gain_flatmax = getattr(params, 'edfa_gain_flatmax', params.gain_flatmax)
|
|
# pylint: disable=C0103
|
|
pad = max(gain_min - gain_target, 0)
|
|
gain_target += pad
|
|
dg = max(gain_flatmax - gain_target, 0)
|
|
if type_def == 'variable_gain':
|
|
g1a = gain_target - nf_model.delta_p - dg
|
|
nf_avg = lin2db(db2lin(nf_model.nf1) + db2lin(nf_model.nf2) / db2lin(g1a))
|
|
elif type_def == 'fixed_gain':
|
|
nf_avg = nf_model.nf0
|
|
elif type_def == 'openroadm':
|
|
# OpenROADM specifies OSNR vs. input power per channel for 50 GHz slot width so we
|
|
# scale it to 50 GHz based on actual slot width.
|
|
pin_ch_50GHz = self.pin_db - lin2db(self.nch) + lin2db(50e9 / self.slot_width)
|
|
# model OSNR = f(Pin per 50 GHz channel)
|
|
nf_avg = pin_ch_50GHz - polyval(nf_model.nf_coef, pin_ch_50GHz) + 58
|
|
elif type_def == 'openroadm_preamp':
|
|
# OpenROADM specifies OSNR vs. input power per channel for 50 GHz slot width so we
|
|
# scale it to 50 GHz based on actual slot width.
|
|
pin_ch_50GHz = self.pin_db - lin2db(self.nch) + lin2db(50e9 / self.slot_width)
|
|
# model OSNR = f(Pin per 50 GHz channel)
|
|
nf_avg = pin_ch_50GHz - min((4 * pin_ch_50GHz + 275) / 7, 33) + 58
|
|
elif type_def == 'openroadm_booster':
|
|
# model a zero-noise amp with "infinitely negative" (in dB) NF
|
|
nf_avg = float('-inf')
|
|
elif type_def == 'advanced_model':
|
|
nf_avg = polyval(nf_fit_coeff, -dg)
|
|
return nf_avg + pad, pad
|
|
|
|
def _calc_nf(self, avg=False):
|
|
"""nf calculation based on 2 models: self.params.nf_model.enabled from json import:
|
|
True => 2 stages amp modelling based on precalculated nf1, nf2 and delta_p in build_OA_json
|
|
False => polynomial fit based on self.params.nf_fit_coeff"""
|
|
# gain_min > gain_target TBD:
|
|
if self.params.type_def == 'dual_stage':
|
|
g1 = self.params.preamp_gain_flatmax
|
|
g2 = self.effective_gain - g1
|
|
nf1_avg, pad = self._nf(self.params.preamp_type_def,
|
|
self.params.preamp_nf_model,
|
|
self.params.preamp_nf_fit_coeff,
|
|
self.params.preamp_gain_min,
|
|
self.params.preamp_gain_flatmax,
|
|
g1)
|
|
# no padding expected for the 1stage because g1 = gain_max
|
|
nf2_avg, pad = self._nf(self.params.booster_type_def,
|
|
self.params.booster_nf_model,
|
|
self.params.booster_nf_fit_coeff,
|
|
self.params.booster_gain_min,
|
|
self.params.booster_gain_flatmax,
|
|
g2)
|
|
nf_avg = lin2db(db2lin(nf1_avg) + db2lin(nf2_avg - g1))
|
|
# no padding expected for the 1stage because g1 = gain_max
|
|
pad = 0
|
|
else:
|
|
nf_avg, pad = self._nf(self.params.type_def,
|
|
self.params.nf_model,
|
|
self.params.nf_fit_coeff,
|
|
self.params.gain_min,
|
|
self.params.gain_flatmax,
|
|
self.effective_gain)
|
|
|
|
self.att_in = pad # not used to attenuate carriers, only used in _repr_ and _str_
|
|
if avg:
|
|
return nf_avg
|
|
return self.interpol_nf_ripple + nf_avg # input VOA = 1 for 1 NF degradation
|
|
|
|
def noise_profile(self, spectral_info: SpectralInformation):
    """Compute the amplifier ASE noise integrated over each channel bandwidth, at the amplifier input.

    The noise is evaluated per channel as ``h * baud_rate * frequency * NF`` with the noise figure
    ``self.nf`` (dB) converted to linear units.

    :param spectral_info: the spectral information object; its ``baud_rate`` and ``frequency``
        are read.
    :type spectral_info: SpectralInformation
    :return: the ASE power in W, referred to the amplifier input.
    :rtype: numpy.ndarray

    NOTE:

    The result is the total ASE in the channel bandwidth. For 50GHz channels the ASE BW is
    effectively 0.4nm. To get to noise power in 0.1nm, subtract 6dB.

    OSNR is usually quoted as channel power divided by the ASE power in 0.1nm RBW, regardless of the
    width of the actual channel. This is a historical convention from the days when optical signals
    were much smaller (155Mbps, 2.5Gbps, ... 10Gbps) than the resolution of the OSAs that were used
    to measure spectral power which were set to 0.1nm resolution for convenience. Moving forward into
    flexible grid and high baud rate signals, it may be convenient to begin quoting power spectral
    density in the same BW for both signal and ASE, e.g. 12.5GHz.
    """
    nf_lin = db2lin(self.nf)
    # in W at amplifier input
    return h * spectral_info.baud_rate * spectral_info.frequency * nf_lin
|
|
|
|
def _gain_profile(self, pin, err_tolerance=1.0e-11, simple_opt=True):
|
|
"""
|
|
Pin : input power / channel in W
|
|
|
|
:param gain_ripple: design flat gain
|
|
:param dgt: design gain tilt
|
|
:param Pin: total input power in W
|
|
:param gp: Average gain setpoint in dB units (provisioned gain)
|
|
:param gtp: gain tilt setting (provisioned tilt)
|
|
:type gain_ripple: numpy.ndarray
|
|
:type dgt: numpy.ndarray
|
|
:type Pin: numpy.ndarray
|
|
:type gp: float
|
|
:type gtp: float
|
|
:return: gain profile in dBm, per channel or spectral slice
|
|
:rtype: numpy.ndarray
|
|
|
|
Checking of output power clamping is implemented in interpol_params().
|
|
|
|
|
|
Based on:
|
|
|
|
R. di Muro, "The Er3+ fiber gain coefficient derived from a dynamic
|
|
gain tilt technique", Journal of Lightwave Technology, Vol. 18,
|
|
Iss. 3, Pp. 343-347, 2000.
|
|
|
|
Ported from Matlab version written by David Boerges at Ciena.
|
|
"""
|
|
|
|
# TODO|jla: check what param should be used (currently length(dgt))
|
|
if len(self.interpol_dgt) == 1:
|
|
return array([self.effective_gain])
|
|
|
|
# TODO|jla: find a way to use these or lose them. Primarily we should have
|
|
# a way to determine if exceeding the gain or output power of the amp
|
|
tot_in_power_db = self.pin_db # Pin in W
|
|
|
|
# linear fit to get the
|
|
p = polyfit(self.channel_freq, self.interpol_dgt, 1)
|
|
dgt_slope = p[0]
|
|
|
|
# Calculate the target slope defined per frequency on the amp band
|
|
targ_slope = -self.tilt_target / (self.params.f_max - self.params.f_min)
|
|
|
|
# first estimate of DGT scaling
|
|
dgts1 = targ_slope / dgt_slope if dgt_slope != 0. else 0.
|
|
|
|
# when simple_opt is true, make 2 attempts to compute gain and
|
|
# the internal voa value. This is currently here to provide direct
|
|
# comparison with original Matlab code. Will be removed.
|
|
# TODO|jla: replace with loop
|
|
|
|
if not simple_opt:
|
|
return
|
|
|
|
# first estimate of Er gain & VOA loss
|
|
g1st = array(self.interpol_gain_ripple) + self.params.gain_flatmax \
|
|
+ array(self.interpol_dgt) * dgts1
|
|
voa = lin2db(mean(db2lin(g1st))) - self.effective_gain
|
|
|
|
# second estimate of amp ch gain using the channel input profile
|
|
g2nd = g1st - voa
|
|
|
|
pout_db = lin2db(sum(pin * 1e3 * db2lin(g2nd)))
|
|
dgts2 = self.effective_gain - (pout_db - tot_in_power_db)
|
|
|
|
# center estimate of amp ch gain
|
|
xcent = dgts2
|
|
gcent = g1st - voa + array(self.interpol_dgt) * xcent
|
|
pout_db = lin2db(sum(pin * 1e3 * db2lin(gcent)))
|
|
gavg_cent = pout_db - tot_in_power_db
|
|
|
|
# Lower estimate of amp ch gain
|
|
deltax = max(g1st) - min(g1st)
|
|
# if no ripple deltax = 0 and xlow = xcent: div 0
|
|
# TODO|jla: add check for flat gain response
|
|
if abs(deltax) <= 0.05: # not enough ripple to consider calculation
|
|
return g1st - voa
|
|
|
|
xlow = dgts2 - deltax
|
|
glow = g1st - voa + array(self.interpol_dgt) * xlow
|
|
pout_db = lin2db(sum(pin * 1e3 * db2lin(glow)))
|
|
gavg_low = pout_db - tot_in_power_db
|
|
|
|
# upper gain estimate
|
|
xhigh = dgts2 + deltax
|
|
ghigh = g1st - voa + array(self.interpol_dgt) * xhigh
|
|
pout_db = lin2db(sum(pin * 1e3 * db2lin(ghigh)))
|
|
gavg_high = pout_db - tot_in_power_db
|
|
|
|
# compute slope
|
|
slope1 = (gavg_low - gavg_cent) / (xlow - xcent)
|
|
slope2 = (gavg_cent - gavg_high) / (xcent - xhigh)
|
|
|
|
if abs(self.effective_gain - gavg_cent) <= err_tolerance:
|
|
dgts3 = xcent
|
|
elif self.effective_gain < gavg_cent:
|
|
dgts3 = xcent - (gavg_cent - self.effective_gain) / slope1
|
|
else:
|
|
dgts3 = xcent + (-gavg_cent + self.effective_gain) / slope2
|
|
|
|
return g1st - voa + array(self.interpol_dgt) * dgts3
|
|
|
|
def propagate(self, spectral_info):
    """Amplify the carriers of :class:`.info.SpectralInformation` and add the amplifier ASE noise.

    The input VOA (if any) is applied first, then the amplifier parameters are interpolated on the
    carriers frequencies, the ASE is added at the amplifier input and the gain profile minus the output
    VOA is applied. PMD and PDL of the amplifier are accumulated quadratically. The per channel output
    power and the propagated labels are recorded on the amplifier.

    :param spectral_info: The spectral information object, modified in place.
    :type spectral_info: SpectralInformation
    """
    input_attenuation = self.in_voa
    if input_attenuation is not None:
        spectral_info.apply_attenuation_db(input_attenuation)

    # interpolate the amplifier vectors with the carriers freq, calculate nf & gain profile
    self.interpol_params(spectral_info)

    # ASE is referred to the amplifier input, so it is added before the gain is applied
    added_ase = self.noise_profile(spectral_info)
    spectral_info.ase += added_ase

    net_gain_db = self.gprofile - self.out_voa
    spectral_info.apply_gain_db(net_gain_db)

    # quadratic accumulation of the polarisation impairments
    amp_params = self.params
    spectral_info.pmd = sqrt(spectral_info.pmd ** 2 + amp_params.pmd ** 2)
    spectral_info.pdl = sqrt(spectral_info.pdl ** 2 + amp_params.pdl ** 2)

    total_power = spectral_info.signal + spectral_info.nli + spectral_info.ase
    self.pch_out_dbm = watt2dbm(total_power)
    self.propagated_labels = spectral_info.label
|
|
|
|
def __call__(self, spectral_info):
    """Propagate through the amplifier.

    Only the carriers that fit in the amplifier band (first entry of ``self.params.bands``) are kept
    and propagated.

    :param spectral_info: The spectral information object.
    :type spectral_info: SpectralInformation

    :return SpectralInformation: The updated spectral information object, restricted to the amplifier band.
    :raises ValueError: if no carrier of ``spectral_info`` fits in the amplifier band.
    """
    # filter out carriers outside the amplifier band
    band = next(b for b in self.params.bands)
    spectral_info = demuxed_spectral_information(spectral_info, band)
    # Multiband_amplifier.__call__ documents that the demux helper gives None when no frequency is in
    # the band: guard against it so that the explicit ValueError is raised instead of an
    # AttributeError on None.
    if spectral_info is None or not spectral_info.carriers:
        raise ValueError(f'Amp {self.uid} Defined propagation band does not match amplifiers band.')
    self.propagate(spectral_info)
    return spectral_info
|
|
|
|
|
|
class Multiband_amplifier(_Node):
    """Represents a multiband amplifier that manages multiple amplifiers across different frequency bands.

    This class allows for the initialization and management of amplifiers, each associated with a specific
    frequency band. It provides methods for signal propagation through the amplifiers and for exporting
    to JSON format.

    :param amplifiers: list of dictionaries, each containing parameters for setting an individual amplifier.
    :type amplifiers: List[Dict]
    :param params: dictionary of parameters for the multiband amplifier, which must include necessary
        configuration settings.
    :type params: Dict
    :param args: Additional positional arguments; accepted but not forwarded.
    :param kwargs: Additional keyword arguments passed to the parent class `_Node` and, except for
        ``variety_list`` and ``type_variety``, to each individual amplifier.
    :ivar variety_list: A list of varieties associated with the amplifier.
    :vartype variety_list: list
    :ivar amplifiers: A dictionary mapping band names to their corresponding amplifier instances.
    :vartype amplifiers: dict
    :raises ParametersError: If there are conflicting amplifier definitions for the same frequency
        band during initialization.
    :raises ValueError: If the input spectral information does not match any defined amplifier bands
        during propagation.
    """
    # pylint: disable=C0103
    # separate the top level type_variety from kwargs to avoid having multiple type_varieties on each element processing
    def __init__(self, *args, amplifiers: List[dict], params: dict, **kwargs):
        """Constructor method: builds one :class:`Edfa` per entry of ``amplifiers`` and indexes it by band name.
        """
        self.variety_list = kwargs.pop('variety_list', None)
        try:
            super().__init__(params=MultiBandParams(**params), **kwargs)
        except ParametersError as e:
            raise ParametersError(f'{kwargs["uid"]}: {e}') from e
        self.amplifiers = {}
        # the top level type_variety must not leak into the individual amplifiers
        kwargs.pop('type_variety', None)
        self.passive = False
        for amp_dict in amplifiers:
            # amplifiers dict uses default names as key to represent the band
            amp = Edfa(**amp_dict, **kwargs)
            band = next(b for b in amp.params.bands)
            band_name = find_band_name(FrequencyBand(f_min=band["f_min"], f_max=band["f_max"]))
            if band_name in self.amplifiers:
                raise ParametersError(f'{kwargs["uid"]}: has more than one amp defined for the same band')
            # register the band at multiband level when it is not already declared there
            if band not in self.params.bands:
                self.params.bands.append(band)
            self.amplifiers[band_name] = amp

    def __call__(self, spectral_info: SpectralInformation):
        """propagates in each amp and returns the muxed spectrum

        :param spectral_info: The spectral information object.
        :type spectral_info: SpectralInformation

        :return SpectralInformation: The updated spectral information object.
        :raises ValueError: if none of the amplifiers band contains carriers of ``spectral_info``.
        """
        out_si = []
        for amp in self.amplifiers.values():
            si = demuxed_spectral_information(spectral_info, amp.params.bands[0])
            # if spectral_info frequencies are outside amp band, si is None
            if si:
                out_si.append(amp(si))
        if not out_si:
            raise ValueError('Defined propagation band does not match amplifiers band.')
        return muxed_spectral_information(out_si)

    @property
    def to_json(self):
        """Converts the MultibandAmplifier's state to a JSON-compatible dictionary.

        :return Dict[str, Any]: JSON representation of the MultibandAmplifier.
        """
        amplifiers_json = [
            {
                'type_variety': amp.params.type_variety,
                'operational': {
                    'gain_target': round(amp.effective_gain, 6) if amp.effective_gain else None,
                    'delta_p': amp.delta_p,
                    'tilt_target': amp.tilt_target,
                    'out_voa': amp.out_voa
                }
            }
            for amp in self.amplifiers.values()
        ]
        return {'uid': self.uid,
                'type': type(self).__name__,
                'type_variety': self.params.type_variety,
                'amplifiers': amplifiers_json,
                'metadata': {
                    'location': self.metadata['location']._asdict()
                }
                }

    def __repr__(self):
        """Returns a string representation of the MultibandAmplifier.

        :return str: String representation of the MultibandAmplifier.
        """
        # closing parenthesis added: the representation used to end with a dangling ', '
        return (f'{type(self).__name__}(uid={self.uid!r}, '
                f'type_variety={self.type_variety!r})')

    def __str__(self):
        """Returns a formatted string representation of the MultibandAmplifier.

        The description of each amplifier is printed in its own column.

        :return str: Formatted string representation of the MultibandAmplifier.
        """
        amp_str = [f'{type(self).__name__} {self.uid}',
                   f'  type_variety:           {self.type_variety}']
        multi_str_data = []
        max_width = 0
        for amp in self.amplifiers.values():
            lines = str(amp).split('\n')
            # start at index 1 to remove uid from each amp list of strings
            # records only if amp is used ie si has frequencies in amp) otherwise there is no other string than the uid
            if len(lines) > 1:
                max_width = max(max_width, max(len(line) for line in lines[1:]))
                multi_str_data.append(lines[1:])
        # multi_str_data contains lines with each amp str, instead we want to print per column: transpose the string
        transposed_data = list(map(list, zip(*multi_str_data)))
        return '\n'.join(amp_str) + '\n' + nice_column_str(data=transposed_data, max_length=max_width + 2, padding=3)
|