From 7ac6e058ec44835fc70b095cf4ab2db7b7a4772d Mon Sep 17 00:00:00 2001 From: AndreaDAmico Date: Wed, 14 Apr 2021 20:30:06 +0200 Subject: [PATCH] EDFA new spectral information restructuring Change-Id: Ia30e0e9bd666e83394c7a0740b2117a2d9c9d485 --- gnpy/core/elements.py | 118 ++++++++++------------------------------ gnpy/core/info.py | 9 +++ gnpy/core/parameters.py | 44 +++++++++++++++ tests/test_amplifier.py | 54 ++++++++---------- 4 files changed, 105 insertions(+), 120 deletions(-) diff --git a/gnpy/core/elements.py b/gnpy/core/elements.py index b2edf21a..8a9f56c9 100644 --- a/gnpy/core/elements.py +++ b/gnpy/core/elements.py @@ -27,7 +27,7 @@ from scipy.interpolate import interp1d from collections import namedtuple from gnpy.core.utils import lin2db, db2lin, arrange_frequencies, snr_sum -from gnpy.core.parameters import RoadmParams, FusedParams, FiberParams, PumpParams +from gnpy.core.parameters import RoadmParams, FusedParams, FiberParams, PumpParams, EdfaParams, EdfaOperational from gnpy.core.science_utils import NliSolver, RamanSolver from gnpy.core.info import SpectralInformation from gnpy.core.exceptions import NetworkTopologyError, SpectrumError @@ -556,50 +556,6 @@ class RamanFiber(Fiber): spectral_info.apply_attenuation_db(attenuation_out_db) -class EdfaParams: - def __init__(self, **params): - self.update_params(params) - if params == {}: - self.type_variety = '' - self.type_def = '' - # self.gain_flatmax = 0 - # self.gain_min = 0 - # self.p_max = 0 - # self.nf_model = None - # self.nf_fit_coeff = None - # self.nf_ripple = None - # self.dgt = None - # self.gain_ripple = None - # self.out_voa_auto = False - # self.allowed_for_design = None - - def update_params(self, kwargs): - for k, v in kwargs.items(): - setattr(self, k, self.update_params(**v) if isinstance(v, dict) else v) - - -class EdfaOperational: - default_values = { - 'gain_target': None, - 'delta_p': None, - 'out_voa': None, - 'tilt_target': 0 - } - - def __init__(self, **operational): - self.update_attr(operational) - - def update_attr(self, kwargs): - clean_kwargs = {k: v for k, v in kwargs.items() if v != ''} - for k, v in self.default_values.items(): - setattr(self, k, clean_kwargs.get(k, v)) - - def __repr__(self): - return (f'{type(self).__name__}(' - f'gain_target={self.gain_target!r}, ' - f'tilt_target={self.tilt_target!r})') - - class Edfa(_Node): def __init__(self, *args, params=None, operational=None, **kwargs): if params is None: @@ -607,12 +563,7 @@ class Edfa(_Node): if operational is None: operational = {} self.variety_list = kwargs.pop('variety_list', None) - super().__init__( - *args, - params=EdfaParams(**params), - operational=EdfaOperational(**operational), - **kwargs - ) + super().__init__(*args, params=EdfaParams(**params), operational=EdfaOperational(**operational), **kwargs) self.interpol_dgt = None # interpolated dynamic gain tilt self.interpol_gain_ripple = None # gain ripple self.interpol_nf_ripple = None # nf_ripple @@ -678,22 +629,25 @@ class Edfa(_Node): f' effective pch (dBm): {self.effective_pch_out_db:.2f}', f' output VOA (dB): {self.out_voa:.2f}']) - def interpol_params(self, frequencies, pin, baud_rates, pref): + def interpol_params(self, spectral_info): """interpolate SI channel frequencies with the edfa dgt and gain_ripple frquencies from JSON + :param spectral_info: instance of gnpy.core.info.SpectralInformation + :return: None """ # TODO|jla: read amplifier actual frequencies from additional params in json - self.channel_freq = frequencies + self.channel_freq = spectral_info.frequency amplifier_freq = arrange_frequencies(len(self.params.dgt), self.params.f_min, self.params.f_max) # Hz - self.interpol_dgt = interp(self.channel_freq, amplifier_freq, self.params.dgt) + self.interpol_dgt = interp(spectral_info.frequency, amplifier_freq, self.params.dgt) amplifier_freq = arrange_frequencies(len(self.params.gain_ripple), self.params.f_min, self.params.f_max) # Hz - self.interpol_gain_ripple = interp(self.channel_freq, amplifier_freq, self.params.gain_ripple) + self.interpol_gain_ripple = interp(spectral_info.frequency, amplifier_freq, self.params.gain_ripple) amplifier_freq = arrange_frequencies(len(self.params.nf_ripple), self.params.f_min, self.params.f_max) # Hz - self.interpol_nf_ripple = interp(self.channel_freq, amplifier_freq, self.params.nf_ripple) + self.interpol_nf_ripple = interp(spectral_info.frequency, amplifier_freq, self.params.nf_ripple) - self.nch = frequencies.size + self.nch = spectral_info.number_of_channels + pin = spectral_info.signal + spectral_info.ase + spectral_info.nli self.pin_db = lin2db(sum(pin * 1e3)) # The following should be changed when we have the new spectral information including slot widths. # For now, with homogeneous spectrum, we can calculate it as the difference between neighbouring channels. @@ -701,6 +655,7 @@ class Edfa(_Node): """in power mode: delta_p is defined and can be used to calculate the power target This power target is used calculate the amplifier gain""" + pref = spectral_info.pref if self.delta_p is not None: self.target_pch_out_db = round(self.delta_p + pref.p_span0, 2) self.effective_gain = self.target_pch_out_db - pref.p_spani @@ -718,7 +673,7 @@ class Edfa(_Node): self.nf = self._calc_nf() self.gprofile = self._gain_profile(pin) - pout = (pin + self.noise_profile(baud_rates)) * db2lin(self.gprofile) + pout = (pin + self.noise_profile(spectral_info)) * db2lin(self.gprofile) self.pout_db = lin2db(sum(pout * 1e3)) # ase & nli are only calculated in signal bandwidth # pout_db is not the absolute full output power (negligible if sufficient channels) @@ -791,13 +746,8 @@ class Edfa(_Node): else: return self.interpol_nf_ripple + nf_avg # input VOA = 1 for 1 NF degradation - def noise_profile(self, df): - """noise_profile(bw) computes amplifier ASE (W) in signal bandwidth (Hz) - - Noise is calculated at amplifier input - - :bw: signal bandwidth = baud rate in Hz - :type bw: float + def noise_profile(self, spectral_info: SpectralInformation): + """Computes amplifier ASE noise integrated over the signal bandwidth. This is calculated at amplifier input. :return: the asepower in W in the signal bandwidth bw for 96 channels :return type: numpy array of float @@ -833,7 +783,7 @@ class Edfa(_Node): quoting power spectral density in the same BW for both signal and ASE, e.g. 12.5GHz.""" - ase = h * df * self.channel_freq * db2lin(self.nf) # W + ase = h * spectral_info.baud_rate * spectral_info.frequency * db2lin(self.nf) # W return ase # in W at amplifier input def _gain_profile(self, pin, err_tolerance=1.0e-11, simple_opt=True): @@ -939,32 +889,24 @@ class Edfa(_Node): return g1st - voa + array(self.interpol_dgt) * dgts3 - def propagate(self, pref, *carriers): + def propagate(self, spectral_info): """add ASE noise to the propagating carriers of :class:`.info.SpectralInformation`""" - pin = array([c.power.signal + c.power.nli + c.power.ase for c in carriers]) # pin in W - freq = array([c.frequency for c in carriers]) - brate = array([c.baud_rate for c in carriers]) # interpolate the amplifier vectors with the carriers freq, calculate nf & gain profile - self.interpol_params(freq, pin, brate, pref) + self.interpol_params(spectral_info) - gains = db2lin(self.gprofile) - carrier_ases = self.noise_profile(brate) - att = db2lin(self.out_voa) + ase = self.noise_profile(spectral_info) + spectral_info.ase += ase - for gain, carrier_ase, carrier in zip(gains, carrier_ases, carriers): - pwr = carrier.power - pwr = pwr._replace(signal=pwr.signal * gain / att, - nli=pwr.nli * gain / att, - ase=(pwr.ase + carrier_ase) * gain / att) - pmd = sqrt(carrier.pmd**2 + self.params.pmd**2) - pdl = sqrt(carrier.pdl**2 + self.params.pdl**2) - yield carrier._replace(power=pwr, pmd=pmd, pdl=pdl) + spectral_info.apply_gain_db(self.gprofile - self.out_voa) + spectral_info.pmd = sqrt(spectral_info.pmd ** 2 + self.params.pmd ** 2) + spectral_info.pdl = sqrt(spectral_info.pdl ** 2 + self.params.pdl ** 2) - def update_pref(self, pref): - return pref._replace(p_span0=pref.p_span0, - p_spani=pref.p_spani + self.effective_gain - self.out_voa) + def update_pref(self, spectral_info): + spectral_info.pref = \ + spectral_info.pref._replace(p_span0=spectral_info.pref.p_span0, + p_spani=spectral_info.pref.p_spani + self.effective_gain - self.out_voa) def __call__(self, spectral_info): - carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers)) - pref = self.update_pref(spectral_info.pref) - return spectral_info._replace(carriers=carriers, pref=pref) + self.propagate(spectral_info) + self.update_pref(spectral_info) + return spectral_info diff --git a/gnpy/core/info.py b/gnpy/core/info.py index b00e7923..f1a2a4ff 100644 --- a/gnpy/core/info.py +++ b/gnpy/core/info.py @@ -186,6 +186,15 @@ class SpectralInformation(object): attenuation_lin = 1 / db2lin(attenuation_db) self.apply_attenuation_lin(attenuation_lin) + def apply_gain_lin(self, gain_lin): + self.signal *= gain_lin + self.nli *= gain_lin + self.ase *= gain_lin + + def apply_gain_db(self, gain_db): + gain_lin = db2lin(gain_db) + self.apply_gain_lin(gain_lin) + def __add__(self, other: SpectralInformation): try: return SpectralInformation(frequency=append(self.frequency, other.frequency), diff --git a/gnpy/core/parameters.py b/gnpy/core/parameters.py index cc97727e..82c74ef8 100644 --- a/gnpy/core/parameters.py +++ b/gnpy/core/parameters.py @@ -268,3 +268,47 @@ class FiberParams(Parameters): if not self.raman_efficiency: dictionary.pop('raman_efficiency') return dictionary + + +class EdfaParams: + def __init__(self, **params): + self.update_params(params) + if params == {}: + self.type_variety = '' + self.type_def = '' + # self.gain_flatmax = 0 + # self.gain_min = 0 + # self.p_max = 0 + # self.nf_model = None + # self.nf_fit_coeff = None + # self.nf_ripple = None + # self.dgt = None + # self.gain_ripple = None + # self.out_voa_auto = False + # self.allowed_for_design = None + + def update_params(self, kwargs): + for k, v in kwargs.items(): + setattr(self, k, self.update_params(**v) if isinstance(v, dict) else v) + + +class EdfaOperational: + default_values = { + 'gain_target': None, + 'delta_p': None, + 'out_voa': None, + 'tilt_target': 0 + } + + def __init__(self, **operational): + self.update_attr(operational) + + def update_attr(self, kwargs): + clean_kwargs = {k: v for k, v in kwargs.items() if v != ''} + for k, v in self.default_values.items(): + setattr(self, k, clean_kwargs.get(k, v)) + + def __repr__(self): + return (f'{type(self).__name__}(' + f'gain_target={self.gain_target!r}, ' + f'tilt_target={self.tilt_target!r})') diff --git a/tests/test_amplifier.py b/tests/test_amplifier.py index 0c501805..19ec432e 100644 --- a/tests/test_amplifier.py +++ b/tests/test_amplifier.py @@ -80,13 +80,12 @@ def si(nch_and_spacing, bw): def test_variable_gain_nf(gain, nf_expected, setup_edfa_variable_gain, si): """=> unitary test for variable gain model Edfa._calc_nf() (and Edfa.interpol_params)""" edfa = setup_edfa_variable_gain - frequencies = array([c.frequency for c in si.carriers]) - pin = array([c.power.signal + c.power.nli + c.power.ase for c in si.carriers]) - pin = pin / db2lin(gain) - baud_rates = array([c.baud_rate for c in si.carriers]) + si.signal /= db2lin(gain) + si.nli /= db2lin(gain) + si.ase /= db2lin(gain) edfa.operational.gain_target = gain - pref = Pref(0, -gain, lin2db(len(frequencies))) - edfa.interpol_params(frequencies, pin, baud_rates, pref) + si.pref = si.pref._replace(p_span0=0, p_spani=-gain, neq_ch=lin2db(si.number_of_channels)) + edfa.interpol_params(si) result = edfa.nf assert pytest.approx(nf_expected, abs=0.01) == result[0] @@ -95,23 +94,20 @@ def test_variable_gain_nf(gain, nf_expected, setup_edfa_variable_gain, si): def test_fixed_gain_nf(gain, nf_expected, setup_edfa_fixed_gain, si): """=> unitary test for fixed gain model Edfa._calc_nf() (and Edfa.interpol_params)""" edfa = setup_edfa_fixed_gain - frequencies = array([c.frequency for c in si.carriers]) - pin = array([c.power.signal + c.power.nli + c.power.ase for c in si.carriers]) - pin = pin / db2lin(gain) - baud_rates = array([c.baud_rate for c in si.carriers]) + si.signal /= db2lin(gain) + si.nli /= db2lin(gain) + si.ase /= db2lin(gain) edfa.operational.gain_target = gain - pref = Pref(0, -gain, lin2db(len(frequencies))) - edfa.interpol_params(frequencies, pin, baud_rates, pref) - + si.pref = si.pref._replace(p_span0=0, p_spani=-gain, neq_ch=lin2db(si.number_of_channels)) + edfa.interpol_params(si) assert pytest.approx(nf_expected, abs=0.01) == edfa.nf[0] def test_si(si, nch_and_spacing): """basic total power check of the channel comb generation""" nb_channel = nch_and_spacing[0] - pin = array([c.power.signal + c.power.nli + c.power.ase for c in si.carriers]) - p_tot = pin.sum() - expected_p_tot = si.carriers[0].power.signal * nb_channel + p_tot = sum(si.signal + si.ase + si.nli) + expected_p_tot = si.signal[0] * nb_channel assert pytest.approx(expected_p_tot, abs=0.01) == p_tot @@ -122,14 +118,13 @@ def test_compare_nf_models(gain, setup_edfa_variable_gain, si): between gain_min and gain_flatmax some discrepancy is expected but target < 0.5dB => unitary test for Edfa._calc_nf (and Edfa.interpol_params)""" edfa = setup_edfa_variable_gain - frequencies = array([c.frequency for c in si.carriers]) - pin = array([c.power.signal + c.power.nli + c.power.ase for c in si.carriers]) - pin = pin / db2lin(gain) - baud_rates = array([c.baud_rate for c in si.carriers]) + si.signal /= db2lin(gain) + si.nli /= db2lin(gain) + si.ase /= db2lin(gain) edfa.operational.gain_target = gain # edfa is variable gain type - pref = Pref(0, -gain, lin2db(len(frequencies))) - edfa.interpol_params(frequencies, pin, baud_rates, pref) + si.pref = si.pref._replace(p_span0=0, p_spani=-gain, neq_ch=lin2db(si.number_of_channels)) + edfa.interpol_params(si) nf_model = edfa.nf[0] # change edfa type variety to a polynomial @@ -155,7 +150,7 @@ def test_compare_nf_models(gain, setup_edfa_variable_gain, si): edfa = Edfa(**el_config) # edfa is variable gain type - edfa.interpol_params(frequencies, pin, baud_rates, pref) + edfa.interpol_params(si) nf_poly = edfa.nf[0] print(nf_poly, nf_model) assert pytest.approx(nf_model, abs=0.5) == nf_poly @@ -183,21 +178,16 @@ def test_ase_noise(gain, si, setup_trx, bw): si = span(si) print(span) - frequencies = array([c.frequency for c in si.carriers]) - pin = array([c.power.signal + c.power.nli + c.power.ase for c in si.carriers]) - baud_rates = array([c.baud_rate for c in si.carriers]) - pref = Pref(0, -gain, lin2db(len(frequencies))) - edfa.interpol_params(frequencies, pin, baud_rates, pref) + si.pref = si.pref._replace(p_span0=0, p_spani=-gain, neq_ch=lin2db(si.number_of_channels)) + edfa.interpol_params(si) nf = edfa.nf print('nf', nf) - pin = lin2db(pin[0] * 1e3) + pin = lin2db((si.signal[0] + si.ase[0] + si.nli[0]) * 1e3) osnr_expected = pin - nf[0] + 58 si = edfa(si) print(edfa) - pout = array([c.power.signal for c in si.carriers]) - pase = array([c.power.ase for c in si.carriers]) - osnr = lin2db(pout[0] / pase[0]) - lin2db(12.5e9 / bw) + osnr = lin2db(si.signal[0] / si.ase[0]) - lin2db(12.5e9 / bw) assert pytest.approx(osnr_expected, abs=0.01) == osnr trx = setup_trx