mirror of
				https://github.com/Telecominfraproject/oopt-gnpy.git
				synced 2025-11-03 03:28:04 +00:00 
			
		
		
		
	This change enables to use a different tx_osnr per carrier. If tx_osnr is defined via spectrum then use it to define a tx_osnr per carrier in si else use the tx_osnr of request to set tx_osnr of si. Then, the propagate function for requests is changed to update OSNR with tx_OSNR per carrier defined in si. TODO: The tx_osnr defined in spectrum is not yet taken into account for the propagate_and_optimize function, because the loop that optimizes the choice for the mode only loops on baudrate. Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com> Change-Id: I0fcdf559d4f1f8f0047faa257076084ec7adcc77
		
			
				
	
	
		
			358 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			358 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
"""
 | 
						|
gnpy.core.info
 | 
						|
==============
 | 
						|
 | 
						|
This module contains classes for modelling :class:`SpectralInformation`.
 | 
						|
"""
 | 
						|
 | 
						|
from __future__ import annotations
 | 
						|
from collections import namedtuple
 | 
						|
from collections.abc import Iterable
 | 
						|
from typing import Union
 | 
						|
from dataclasses import dataclass
 | 
						|
from numpy import argsort, mean, array, append, ones, ceil, any, zeros, outer, full, ndarray, asarray
 | 
						|
 | 
						|
from gnpy.core.utils import automatic_nch, db2lin, watt2dbm
 | 
						|
from gnpy.core.exceptions import SpectrumError
 | 
						|
 | 
						|
DEFAULT_SLOT_WIDTH_STEP = 12.5e9  # Hz
 | 
						|
"""Channels with unspecified slot width will have their slot width evaluated as the baud rate rounded up to the minimum
 | 
						|
multiple of the DEFAULT_SLOT_WIDTH_STEP (the baud rate is extended including the roll off in this evaluation)"""
 | 
						|
 | 
						|
 | 
						|
class Power(namedtuple('Power', 'signal nli ase')):
 | 
						|
    """carriers power in W"""
 | 
						|
 | 
						|
 | 
						|
class Channel(namedtuple('Channel',
 | 
						|
                         'channel_number frequency baud_rate slot_width roll_off power chromatic_dispersion pmd pdl')):
 | 
						|
    """ Class containing the parameters of a WDM signal.
 | 
						|
        :param channel_number: channel number in the WDM grid
 | 
						|
        :param frequency: central frequency of the signal (Hz)
 | 
						|
        :param baud_rate: the symbol rate of the signal (Baud)
 | 
						|
        :param slot_width: the slot width (Hz)
 | 
						|
        :param roll_off: the roll off of the signal. It is a pure number between 0 and 1
 | 
						|
        :param power (gnpy.core.info.Power): power of signal, ASE noise and NLI (W)
 | 
						|
        :param chromatic_dispersion: chromatic dispersion (s/m)
 | 
						|
        :param pmd: polarization mode dispersion (s)
 | 
						|
        :param pdl: polarization dependent loss (dB)
 | 
						|
    """
 | 
						|
 | 
						|
 | 
						|
class Pref(namedtuple('Pref', 'p_span0, p_spani')):
 | 
						|
    """noiseless reference power in dBm:
 | 
						|
    p_span0: inital target carrier power for a reference channel defined by user
 | 
						|
    p_spani: carrier power after element i for a reference channel defined by user
 | 
						|
    """
 | 
						|
 | 
						|
 | 
						|
class SpectralInformation(object):
 | 
						|
    """ Class containing the parameters of the entire WDM comb.
 | 
						|
    delta_pdb_per_channel: (per frequency) per channel delta power in dbm for the actual mix of channels"""
 | 
						|
 | 
						|
    def __init__(self, frequency: array, baud_rate: array, slot_width: array, signal: array, nli: array, ase: array,
 | 
						|
                 roll_off: array, chromatic_dispersion: array, pmd: array, pdl: array, delta_pdb_per_channel: array,
 | 
						|
                 tx_osnr: array, ref_power: Pref):
 | 
						|
        indices = argsort(frequency)
 | 
						|
        self._frequency = frequency[indices]
 | 
						|
        self._df = outer(ones(frequency.shape), frequency) - outer(frequency, ones(frequency.shape))
 | 
						|
        self._number_of_channels = len(self._frequency)
 | 
						|
        self._channel_number = [*range(1, self._number_of_channels + 1)]
 | 
						|
        self._slot_width = slot_width[indices]
 | 
						|
        self._baud_rate = baud_rate[indices]
 | 
						|
        overlap = self._frequency[:-1] + self._slot_width[:-1] / 2 > self._frequency[1:] - self._slot_width[1:] / 2
 | 
						|
        if any(overlap):
 | 
						|
            overlap = [pair for pair in zip(overlap * self._channel_number[:-1], overlap * self._channel_number[1:])
 | 
						|
                       if pair != (0, 0)]
 | 
						|
            raise SpectrumError(f'Spectrum required slot widths larger than the frequency spectral distances '
 | 
						|
                                f'between channels: {overlap}.')
 | 
						|
        exceed = self._baud_rate > self._slot_width
 | 
						|
        if any(exceed):
 | 
						|
            raise SpectrumError(f'Spectrum baud rate, including the roll off, larger than the slot width for channels: '
 | 
						|
                                f'{[ch for ch in exceed * self._channel_number if ch]}.')
 | 
						|
        self._signal = signal[indices]
 | 
						|
        self._nli = nli[indices]
 | 
						|
        self._ase = ase[indices]
 | 
						|
        self._roll_off = roll_off[indices]
 | 
						|
        self._chromatic_dispersion = chromatic_dispersion[indices]
 | 
						|
        self._pmd = pmd[indices]
 | 
						|
        self._pdl = pdl[indices]
 | 
						|
        self._delta_pdb_per_channel = delta_pdb_per_channel[indices]
 | 
						|
        self._tx_osnr = tx_osnr[indices]
 | 
						|
        self._pref = ref_power
 | 
						|
 | 
						|
    @property
 | 
						|
    def pref(self):
 | 
						|
        """Instance of gnpy.info.Pref"""
 | 
						|
        return self._pref
 | 
						|
 | 
						|
    @pref.setter
 | 
						|
    def pref(self, pref: Pref):
 | 
						|
        self._pref = pref
 | 
						|
 | 
						|
    @property
 | 
						|
    def frequency(self):
 | 
						|
        return self._frequency
 | 
						|
 | 
						|
    @property
 | 
						|
    def df(self):
 | 
						|
        """Matrix of relative frequency distances between all channels. Positive elements in the upper right side."""
 | 
						|
        return self._df
 | 
						|
 | 
						|
    @property
 | 
						|
    def slot_width(self):
 | 
						|
        return self._slot_width
 | 
						|
 | 
						|
    @property
 | 
						|
    def baud_rate(self):
 | 
						|
        return self._baud_rate
 | 
						|
 | 
						|
    @property
 | 
						|
    def number_of_channels(self):
 | 
						|
        return self._number_of_channels
 | 
						|
 | 
						|
    @property
 | 
						|
    def powers(self):
 | 
						|
        powers = zip(self.signal, self.nli, self.ase)
 | 
						|
        return [Power(*p) for p in powers]
 | 
						|
 | 
						|
    @property
 | 
						|
    def signal(self):
 | 
						|
        return self._signal
 | 
						|
 | 
						|
    @signal.setter
 | 
						|
    def signal(self, signal):
 | 
						|
        self._signal = signal
 | 
						|
 | 
						|
    @property
 | 
						|
    def nli(self):
 | 
						|
        return self._nli
 | 
						|
 | 
						|
    @nli.setter
 | 
						|
    def nli(self, nli):
 | 
						|
        self._nli = nli
 | 
						|
 | 
						|
    @property
 | 
						|
    def ase(self):
 | 
						|
        return self._ase
 | 
						|
 | 
						|
    @ase.setter
 | 
						|
    def ase(self, ase):
 | 
						|
        self._ase = ase
 | 
						|
 | 
						|
    @property
 | 
						|
    def roll_off(self):
 | 
						|
        return self._roll_off
 | 
						|
 | 
						|
    @property
 | 
						|
    def chromatic_dispersion(self):
 | 
						|
        return self._chromatic_dispersion
 | 
						|
 | 
						|
    @chromatic_dispersion.setter
 | 
						|
    def chromatic_dispersion(self, chromatic_dispersion):
 | 
						|
        self._chromatic_dispersion = chromatic_dispersion
 | 
						|
 | 
						|
    @property
 | 
						|
    def pmd(self):
 | 
						|
        return self._pmd
 | 
						|
 | 
						|
    @pmd.setter
 | 
						|
    def pmd(self, pmd):
 | 
						|
        self._pmd = pmd
 | 
						|
 | 
						|
    @property
 | 
						|
    def pdl(self):
 | 
						|
        return self._pdl
 | 
						|
 | 
						|
    @pdl.setter
 | 
						|
    def pdl(self, pdl):
 | 
						|
        self._pdl = pdl
 | 
						|
 | 
						|
    @property
 | 
						|
    def delta_pdb_per_channel(self):
 | 
						|
        return self._delta_pdb_per_channel
 | 
						|
 | 
						|
    @delta_pdb_per_channel.setter
 | 
						|
    def delta_pdb_per_channel(self, delta_pdb_per_channel):
 | 
						|
        self._delta_pdb_per_channel = delta_pdb_per_channel
 | 
						|
 | 
						|
    @property
 | 
						|
    def tx_osnr(self):
 | 
						|
        return self._tx_osnr
 | 
						|
 | 
						|
    @tx_osnr.setter
 | 
						|
    def tx_osnr(self, tx_osnr):
 | 
						|
        self._tx_osnr = tx_osnr
 | 
						|
 | 
						|
    @property
 | 
						|
    def channel_number(self):
 | 
						|
        return self._channel_number
 | 
						|
 | 
						|
    @property
 | 
						|
    def carriers(self):
 | 
						|
        entries = zip(self.channel_number, self.frequency, self.baud_rate, self.slot_width,
 | 
						|
                      self.roll_off, self.powers, self.chromatic_dispersion, self.pmd, self.pdl)
 | 
						|
        return [Channel(*entry) for entry in entries]
 | 
						|
 | 
						|
    def apply_attenuation_lin(self, attenuation_lin):
 | 
						|
        self.signal *= attenuation_lin
 | 
						|
        self.nli *= attenuation_lin
 | 
						|
        self.ase *= attenuation_lin
 | 
						|
 | 
						|
    def apply_attenuation_db(self, attenuation_db):
 | 
						|
        attenuation_lin = 1 / db2lin(attenuation_db)
 | 
						|
        self.apply_attenuation_lin(attenuation_lin)
 | 
						|
 | 
						|
    def apply_gain_lin(self, gain_lin):
 | 
						|
        self.signal *= gain_lin
 | 
						|
        self.nli *= gain_lin
 | 
						|
        self.ase *= gain_lin
 | 
						|
 | 
						|
    def apply_gain_db(self, gain_db):
 | 
						|
        gain_lin = db2lin(gain_db)
 | 
						|
        self.apply_gain_lin(gain_lin)
 | 
						|
 | 
						|
    def __add__(self, other: SpectralInformation):
 | 
						|
        try:
 | 
						|
            # Note that pref.p_spanx from "self" and "other" must be identical for a given simulation (correspond to the
 | 
						|
            # the simulation setup):
 | 
						|
            # - for a given simulation there is only one design (one p_span0),
 | 
						|
            # - and p_spani is the propagation result of p_span0 so there should not be different p_spani either.
 | 
						|
            if (self.pref.p_span0 != other.pref.p_span0) or (self.pref.p_spani != other.pref.p_spani):
 | 
						|
                raise SpectrumError('reference powers of the spectrum are not identical')
 | 
						|
            return SpectralInformation(frequency=append(self.frequency, other.frequency),
 | 
						|
                                       slot_width=append(self.slot_width, other.slot_width),
 | 
						|
                                       signal=append(self.signal, other.signal), nli=append(self.nli, other.nli),
 | 
						|
                                       ase=append(self.ase, other.ase),
 | 
						|
                                       baud_rate=append(self.baud_rate, other.baud_rate),
 | 
						|
                                       roll_off=append(self.roll_off, other.roll_off),
 | 
						|
                                       chromatic_dispersion=append(self.chromatic_dispersion,
 | 
						|
                                                                   other.chromatic_dispersion),
 | 
						|
                                       pmd=append(self.pmd, other.pmd),
 | 
						|
                                       pdl=append(self.pdl, other.pdl),
 | 
						|
                                       delta_pdb_per_channel=append(self.delta_pdb_per_channel,
 | 
						|
                                                                    other.delta_pdb_per_channel),
 | 
						|
                                       tx_osnr=append(self.tx_osnr, other.tx_osnr),
 | 
						|
                                       ref_power=Pref(self.pref.p_span0, self.pref.p_spani))
 | 
						|
        except SpectrumError:
 | 
						|
            raise SpectrumError('Spectra cannot be summed: channels overlapping.')
 | 
						|
 | 
						|
 | 
						|
    def _replace(self, carriers, pref):
 | 
						|
        self.chromatic_dispersion = array([c.chromatic_dispersion for c in carriers])
 | 
						|
        self.pmd = array([c.pmd for c in carriers])
 | 
						|
        self.pdl = array([c.pdl for c in carriers])
 | 
						|
        self.signal = array([c.power.signal for c in carriers])
 | 
						|
        self.nli = array([c.power.nli for c in carriers])
 | 
						|
        self.ase = array([c.power.ase for c in carriers])
 | 
						|
        self.pref = pref
 | 
						|
        return self
 | 
						|
 | 
						|
 | 
						|
def create_arbitrary_spectral_information(frequency: Union[ndarray, Iterable, int, float],
 | 
						|
                                          signal: Union[int, float, ndarray, Iterable],
 | 
						|
                                          baud_rate: Union[int, float, ndarray, Iterable],
 | 
						|
                                          tx_osnr: Union[int, float, ndarray, Iterable],
 | 
						|
                                          delta_pdb_per_channel: Union[int, float, ndarray, Iterable] = 0.,
 | 
						|
                                          slot_width: Union[int, float, ndarray, Iterable] = None,
 | 
						|
                                          roll_off: Union[int, float, ndarray, Iterable] = 0.,
 | 
						|
                                          chromatic_dispersion: Union[int, float, ndarray, Iterable] = 0.,
 | 
						|
                                          pmd: Union[int, float, ndarray, Iterable] = 0.,
 | 
						|
                                          pdl: Union[int, float, ndarray, Iterable] = 0.,
 | 
						|
                                          ref_power: Pref = None):
 | 
						|
    """This is just a wrapper around the SpectralInformation.__init__() that simplifies the creation of
 | 
						|
    a non-uniform spectral information with NLI and ASE powers set to zero."""
 | 
						|
    frequency = asarray(frequency)
 | 
						|
    number_of_channels = frequency.size
 | 
						|
    try:
 | 
						|
        signal = full(number_of_channels, signal)
 | 
						|
        baud_rate = full(number_of_channels, baud_rate)
 | 
						|
        roll_off = full(number_of_channels, roll_off)
 | 
						|
        slot_width = full(number_of_channels, slot_width) if slot_width is not None else \
 | 
						|
            ceil((1 + roll_off) * baud_rate / DEFAULT_SLOT_WIDTH_STEP) * DEFAULT_SLOT_WIDTH_STEP
 | 
						|
        chromatic_dispersion = full(number_of_channels, chromatic_dispersion)
 | 
						|
        pmd = full(number_of_channels, pmd)
 | 
						|
        pdl = full(number_of_channels, pdl)
 | 
						|
        nli = zeros(number_of_channels)
 | 
						|
        ase = zeros(number_of_channels)
 | 
						|
        delta_pdb_per_channel = full(number_of_channels, delta_pdb_per_channel)
 | 
						|
        tx_osnr = full(number_of_channels, tx_osnr)
 | 
						|
        return SpectralInformation(frequency=frequency, slot_width=slot_width,
 | 
						|
                                   signal=signal, nli=nli, ase=ase,
 | 
						|
                                   baud_rate=baud_rate, roll_off=roll_off,
 | 
						|
                                   chromatic_dispersion=chromatic_dispersion,
 | 
						|
                                   pmd=pmd, pdl=pdl,
 | 
						|
                                   delta_pdb_per_channel=delta_pdb_per_channel,
 | 
						|
                                   tx_osnr=tx_osnr,
 | 
						|
                                   ref_power=ref_power)
 | 
						|
    except ValueError as e:
 | 
						|
        if 'could not broadcast' in str(e):
 | 
						|
            raise SpectrumError('Dimension mismatch in input fields.')
 | 
						|
        else:
 | 
						|
            raise
 | 
						|
 | 
						|
 | 
						|
def create_input_spectral_information(f_min, f_max, roll_off, baud_rate, power, spacing, tx_osnr):
 | 
						|
    """ Creates a fixed slot width spectral information with flat power.
 | 
						|
    all arguments are scalar values"""
 | 
						|
    number_of_channels = automatic_nch(f_min, f_max, spacing)
 | 
						|
    frequency = [(f_min + spacing * i) for i in range(1, number_of_channels + 1)]
 | 
						|
    p_span0 = watt2dbm(power)
 | 
						|
    p_spani = watt2dbm(power)
 | 
						|
    delta_pdb_per_channel = zeros(number_of_channels)
 | 
						|
    return create_arbitrary_spectral_information(frequency, slot_width=spacing, signal=power, baud_rate=baud_rate,
 | 
						|
                                                 roll_off=roll_off, delta_pdb_per_channel=delta_pdb_per_channel,
 | 
						|
                                                 tx_osnr=tx_osnr, ref_power=Pref(p_span0=p_span0, p_spani=p_spani))
 | 
						|
 | 
						|
 | 
						|
def carriers_to_spectral_information(initial_spectrum: dict[Union[int, float], Carrier],
 | 
						|
                                     ref_carrier: ReferenceCarrier) -> SpectralInformation:
 | 
						|
    """Initial spectrum is a dict with key = carrier frequency, and value a Carrier object.
 | 
						|
    :param initial_spectrum: indexed by frequency in Hz, with power offset (delta_pdb), baudrate, slot width,
 | 
						|
    tx_osnr and roll off.
 | 
						|
    :param ref_carrier: reference carrier (baudrate and power) used for the reference channel
 | 
						|
    """
 | 
						|
    frequency = list(initial_spectrum.keys())
 | 
						|
    signal = [ref_carrier.req_power * db2lin(c.delta_pdb) for c in initial_spectrum.values()]
 | 
						|
    roll_off = [c.roll_off for c in initial_spectrum.values()]
 | 
						|
    baud_rate = [c.baud_rate for c in initial_spectrum.values()]
 | 
						|
    delta_pdb_per_channel = array([c.delta_pdb for c in initial_spectrum.values()])
 | 
						|
    slot_width = [c.slot_width for c in initial_spectrum.values()]
 | 
						|
    tx_osnr = [c.tx_osnr for c in initial_spectrum.values()]
 | 
						|
    p_span0 = watt2dbm(ref_carrier.req_power)
 | 
						|
    p_spani = watt2dbm(ref_carrier.req_power)
 | 
						|
    return create_arbitrary_spectral_information(frequency=frequency, signal=signal, baud_rate=baud_rate,
 | 
						|
                                                 slot_width=slot_width, roll_off=roll_off,
 | 
						|
                                                 delta_pdb_per_channel=delta_pdb_per_channel, tx_osnr=tx_osnr,
 | 
						|
                                                 ref_power=Pref(p_span0=p_span0, p_spani=p_spani))
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class Carrier:
 | 
						|
    """One channel in the initial mixed-type spectrum definition, each type being defined by
 | 
						|
    its delta_pdb (power offset with respect to reference power), baud rate, slot_width, roll_off
 | 
						|
    and tx_osnr. delta_pdb offset is applied to target power out of Roadm.
 | 
						|
    """
 | 
						|
    delta_pdb: float
 | 
						|
    baud_rate: float
 | 
						|
    slot_width: float
 | 
						|
    roll_off: float
 | 
						|
    tx_osnr: float
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class ReferenceCarrier:
 | 
						|
    """Reference channel is used during autodesign to determine target power
 | 
						|
    based on power spectral density values during propagation in ROADMs for equalization purpose.
 | 
						|
    It is also required to correctly compute the loss experienced by p_span_i in Roadm element.
 | 
						|
 | 
						|
    In typical scenarios, users would pick a 32 GBaud channel at 0dBm, which will
 | 
						|
    neatly lead to the same power spectral density for a 64 GBaud channel at 3 dBm.
 | 
						|
    Other attributes (like slot_width or roll-off) may be added there for future equalization purpose.
 | 
						|
    """
 | 
						|
    baud_rate: float
 | 
						|
    req_power: float
 |