Introducing new power per channel attribute

Just new definition that will be used in next changes.
At this point there is not any changes in behaviour
and tests are only changed to adapt the new signature
of the spectral information.

Change-Id: Ib3a0fd556681d34978312dc5e80f0e15096ef79c
This commit is contained in:
AndreaDAmico
2025-12-02 12:03:35 -05:00
parent e552b7e70b
commit e152f66546
6 changed files with 94 additions and 78 deletions

View File

@@ -1121,7 +1121,8 @@ class Fiber(_Node):
stimulated_raman_scattering = RamanSolver.calculate_stimulated_raman_scattering(spectral_info, self)
# NLI noise evaluated at the fiber input
spectral_info.nli += NliSolver.compute_nli(spectral_info, stimulated_raman_scattering, self)
nli = NliSolver.compute_nli(spectral_info, stimulated_raman_scattering, self)
spectral_info.add_nli(nli)
# chromatic dispersion and pmd variations
spectral_info.chromatic_dispersion += self.chromatic_dispersion(spectral_info.frequency)
@@ -1221,8 +1222,10 @@ class RamanFiber(Fiber):
RamanSolver.calculate_spontaneous_raman_scattering(spectral_info, stimulated_raman_scattering, self)
# nli and ase noise evaluated at the fiber input
spectral_info.nli += NliSolver.compute_nli(spectral_info, stimulated_raman_scattering, self)
spectral_info.ase += spontaneous_raman_scattering
nli = NliSolver.compute_nli(spectral_info, stimulated_raman_scattering, self)
spectral_info.add_nli(nli)
ase = spontaneous_raman_scattering
spectral_info.add_ase(ase)
# chromatic dispersion and pmd variations
spectral_info.chromatic_dispersion += self.chromatic_dispersion(spectral_info.frequency)
@@ -1675,7 +1678,7 @@ class Edfa(_Node):
self.interpol_params(spectral_info)
ase = self.noise_profile(spectral_info)
spectral_info.ase += ase
spectral_info.add_ase(ase)
spectral_info.apply_gain_db(self.gprofile - self.out_voa)
spectral_info.pmd = sqrt(spectral_info.pmd ** 2 + self.params.pmd ** 2)

View File

@@ -54,7 +54,8 @@ class SpectralInformation(object):
delta_pdb_per_channel: (per frequency) per channel delta power in dbm for the actual mix of channels"""
def __init__(self, frequency: array, baud_rate: array, slot_width: array, signal: array, nli: array, ase: array,
def __init__(self, frequency: array, baud_rate: array, slot_width: array, pch: array,
signal_ratio: array, ase_ratio: array, nli_ratio: array,
roll_off: array, chromatic_dispersion: array, pmd: array, pdl: array, latency: array,
delta_pdb_per_channel: array, tx_osnr: array, tx_power: array, label: array):
indices = argsort(frequency)
@@ -74,9 +75,10 @@ class SpectralInformation(object):
if any(exceed):
raise SpectrumError(f'Spectrum baud rate, including the roll off, larger than the slot width for channels: '
f'{[ch for ch in exceed * self._channel_number if ch]}.')
self._signal = signal[indices]
self._nli = nli[indices]
self._ase = ase[indices]
self._pch = pch[indices]
self._signal_ratio = signal_ratio[indices]
self._nli_ratio = nli_ratio[indices]
self._ase_ratio = ase_ratio[indices]
self._roll_off = roll_off[indices]
self._chromatic_dispersion = chromatic_dispersion[indices]
self._pmd = pmd[indices]
@@ -109,28 +111,54 @@ class SpectralInformation(object):
return self._number_of_channels
@property
def signal(self):
return self._signal
def pch(self):
return self._pch
@signal.setter
def signal(self, signal):
self._signal = signal
@property
def ptot(self):
return sum(self._pch)
@pch.setter
def pch(self, pch):
self._pch = pch
@property
def signal(self):
return self._signal_ratio * self._pch
@property
def nli(self):
return self._nli
return self._nli_ratio * self._pch
@nli.setter
def nli(self, nli):
self._nli = nli
def add_nli(self, nli):
pch = self.pch + nli
self._signal_ratio *= self.pch / pch
self._ase_ratio *= self.pch / pch
self._nli_ratio = (self._nli_ratio * self.pch + nli) / pch
self.pch = pch
@property
def ase(self):
return self._ase
return self._ase_ratio * self._pch
@ase.setter
def ase(self, ase):
self._ase = ase
def add_ase(self, ase):
pch = self.pch + ase
self._signal_ratio *= self.pch / pch
self._nli_ratio *= self.pch / pch
self._ase_ratio = (self._ase_ratio * self.pch + ase) / pch
self.pch = pch
@property
def snr_lin(self):
return self._signal_ratio / self._ase_ratio
@property
def snr_nli(self):
return self._signal_ratio / self._nli_ratio
@property
def gsnr(self):
return self._signal_ratio / (self._ase_ratio + self._nli_ratio)
@property
def roll_off(self):
@@ -207,18 +235,14 @@ class SpectralInformation(object):
return [Channel(*entry) for entry in entries]
def apply_attenuation_lin(self, attenuation_lin):
self.signal *= attenuation_lin
self.nli *= attenuation_lin
self.ase *= attenuation_lin
self.pch *= attenuation_lin
def apply_attenuation_db(self, attenuation_db):
attenuation_lin = 1 / db2lin(attenuation_db)
self.apply_attenuation_lin(attenuation_lin)
def apply_gain_lin(self, gain_lin):
self.signal *= gain_lin
self.nli *= gain_lin
self.ase *= gain_lin
self.pch *= gain_lin
def apply_gain_db(self, gain_db):
gain_lin = db2lin(gain_db)
@@ -228,8 +252,10 @@ class SpectralInformation(object):
try:
return SpectralInformation(frequency=append(self.frequency, other.frequency),
slot_width=append(self.slot_width, other.slot_width),
signal=append(self.signal, other.signal), nli=append(self.nli, other.nli),
ase=append(self.ase, other.ase),
pch=append(self.pch, other.pch),
signal_ratio=append(self._signal_ratio, other._signal_ratio),
nli_ratio=append(self._nli_ratio, other._nli_ratio),
ase_ratio=append(self._ase_ratio, other._ase_ratio),
baud_rate=append(self.baud_rate, other.baud_rate),
roll_off=append(self.roll_off, other.roll_off),
chromatic_dispersion=append(self.chromatic_dispersion,
@@ -245,19 +271,9 @@ class SpectralInformation(object):
except SpectrumError:
raise SpectrumError('Spectra cannot be summed: channels overlapping.')
def _replace(self, carriers):
self.chromatic_dispersion = array([c.chromatic_dispersion for c in carriers])
self.pmd = array([c.pmd for c in carriers])
self.pdl = array([c.pdl for c in carriers])
self.latency = array([c.latency for c in carriers])
self.signal = array([c.power.signal for c in carriers])
self.nli = array([c.power.nli for c in carriers])
self.ase = array([c.power.ase for c in carriers])
return self
def create_arbitrary_spectral_information(frequency: Union[ndarray, Iterable, float],
signal: Union[float, ndarray, Iterable],
pch: Union[float, ndarray, Iterable],
baud_rate: Union[float, ndarray, Iterable],
tx_osnr: Union[float, ndarray, Iterable],
tx_power: Union[float, ndarray, Iterable] = None,
@@ -274,7 +290,7 @@ def create_arbitrary_spectral_information(frequency: Union[ndarray, Iterable, fl
frequency = asarray(frequency)
number_of_channels = frequency.size
try:
signal = full(number_of_channels, signal)
pch = full(number_of_channels, pch)
baud_rate = full(number_of_channels, baud_rate)
roll_off = full(number_of_channels, roll_off)
slot_width = full(number_of_channels, slot_width) if slot_width is not None else \
@@ -283,14 +299,15 @@ def create_arbitrary_spectral_information(frequency: Union[ndarray, Iterable, fl
pmd = full(number_of_channels, pmd)
pdl = full(number_of_channels, pdl)
latency = full(number_of_channels, latency)
nli = zeros(number_of_channels)
ase = zeros(number_of_channels)
signal_ratio = ones(number_of_channels)
nli_ratio = zeros(number_of_channels)
ase_ratio = zeros(number_of_channels)
delta_pdb_per_channel = full(number_of_channels, delta_pdb_per_channel)
tx_osnr = full(number_of_channels, tx_osnr)
tx_power = full(number_of_channels, tx_power)
label = full(number_of_channels, label)
return SpectralInformation(frequency=frequency, slot_width=slot_width,
signal=signal, nli=nli, ase=ase,
return SpectralInformation(frequency=frequency, slot_width=slot_width, pch=pch,
signal_ratio=signal_ratio, nli_ratio=nli_ratio, ase_ratio=ase_ratio,
baud_rate=baud_rate, roll_off=roll_off,
chromatic_dispersion=chromatic_dispersion,
pmd=pmd, pdl=pdl, latency=latency,
@@ -311,7 +328,7 @@ def create_input_spectral_information(f_min, f_max, roll_off, baud_rate, spacing
frequency = [(f_min + spacing * i) for i in range(1, number_of_channels + 1)]
delta_pdb_per_channel = delta_pdb * ones(number_of_channels)
label = [f'{baud_rate * 1e-9 :.2f}G' for i in range(number_of_channels)]
return create_arbitrary_spectral_information(frequency, slot_width=spacing, signal=tx_power, baud_rate=baud_rate,
return create_arbitrary_spectral_information(frequency, slot_width=spacing, pch=tx_power, baud_rate=baud_rate,
roll_off=roll_off, delta_pdb_per_channel=delta_pdb_per_channel,
tx_osnr=tx_osnr, tx_power=tx_power, label=label)
@@ -321,10 +338,12 @@ def select_channels(spectrum: SpectralInformation, select: array) -> SpectralInf
select: boolean array of indices to keep
"""
return SpectralInformation(frequency=spectrum.frequency[select], baud_rate=spectrum.baud_rate[select],
slot_width=spectrum.slot_width[select], signal=spectrum.signal[select],
nli=spectrum.nli[select], ase=spectrum.ase[select], roll_off=spectrum.roll_off[select],
chromatic_dispersion=spectrum.chromatic_dispersion[select], pmd=spectrum.pmd[select],
pdl=spectrum.pdl[select], latency=spectrum.latency[select],
slot_width=spectrum.slot_width[select], pch=spectrum.pch[select],
signal_ratio=spectrum._signal_ratio[select], nli_ratio=spectrum._nli_ratio[select],
ase_ratio=spectrum._ase_ratio[select],
roll_off=spectrum.roll_off[select],
chromatic_dispersion=spectrum.chromatic_dispersion[select],
pmd=spectrum.pmd[select], pdl=spectrum.pdl[select], latency=spectrum.latency[select],
delta_pdb_per_channel=spectrum.delta_pdb_per_channel[select],
tx_osnr=spectrum.tx_osnr[select], tx_power=spectrum.tx_power[select],
label=spectrum.label[select])
@@ -369,7 +388,7 @@ def carriers_to_spectral_information(initial_spectrum: dict[float, Carrier],
:param power: power of the request
"""
frequency = list(initial_spectrum.keys())
signal = [c.tx_power for c in initial_spectrum.values()]
pch = [c.tx_power for c in initial_spectrum.values()]
roll_off = [c.roll_off for c in initial_spectrum.values()]
baud_rate = [c.baud_rate for c in initial_spectrum.values()]
delta_pdb_per_channel = [c.delta_pdb for c in initial_spectrum.values()]
@@ -377,7 +396,7 @@ def carriers_to_spectral_information(initial_spectrum: dict[float, Carrier],
tx_osnr = [c.tx_osnr for c in initial_spectrum.values()]
tx_power = [c.tx_power for c in initial_spectrum.values()]
label = [c.label for c in initial_spectrum.values()]
return create_arbitrary_spectral_information(frequency=frequency, signal=signal, baud_rate=baud_rate,
return create_arbitrary_spectral_information(frequency=frequency, pch=pch, baud_rate=baud_rate,
slot_width=slot_width, roll_off=roll_off,
delta_pdb_per_channel=delta_pdb_per_channel, tx_osnr=tx_osnr,
tx_power=tx_power, label=label)

View File

@@ -124,9 +124,7 @@ def si(nch_and_spacing, bw):
def test_variable_gain_nf(gain, nf_expected, setup_edfa_variable_gain, si):
"""=> unitary test for variable gain model Edfa._calc_nf() (and Edfa.interpol_params)"""
edfa = setup_edfa_variable_gain
si.signal /= db2lin(gain)
si.nli /= db2lin(gain)
si.ase /= db2lin(gain)
si.apply_attenuation_db(gain)
edfa.operational.gain_target = gain
edfa.effective_gain = gain
edfa.interpol_params(si)
@@ -138,9 +136,7 @@ def test_variable_gain_nf(gain, nf_expected, setup_edfa_variable_gain, si):
def test_fixed_gain_nf(gain, nf_expected, setup_edfa_fixed_gain, si):
"""=> unitary test for fixed gain model Edfa._calc_nf() (and Edfa.interpol_params)"""
edfa = setup_edfa_fixed_gain
si.signal /= db2lin(gain)
si.nli /= db2lin(gain)
si.ase /= db2lin(gain)
si.apply_attenuation_db(gain)
edfa.operational.gain_target = gain
edfa.effective_gain = gain
edfa.interpol_params(si)
@@ -162,9 +158,7 @@ def test_compare_nf_models(gain, setup_edfa_variable_gain, si):
between gain_min and gain_flatmax some discrepancy is expected but target < 0.5dB
=> unitary test for Edfa._calc_nf (and Edfa.interpol_params)"""
edfa = setup_edfa_variable_gain
si.signal /= db2lin(gain)
si.nli /= db2lin(gain)
si.ase /= db2lin(gain)
si.apply_attenuation_db(gain)
edfa.operational.gain_target = gain
edfa.effective_gain = gain
# edfa is variable gain type
@@ -358,9 +352,9 @@ def test_amp_saturation(delta_pdb_per_channel, base_power, delta_p):
frequency = 193e12 + array([0, 50e9, 150e9, 225e9, 275e9])
slot_width = array([37.5e9, 50e9, 75e9, 50e9, 37.5e9])
baud_rate = array([32e9, 42e9, 64e9, 42e9, 32e9])
signal = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]) + array(delta_pdb_per_channel) + base_power)
pch = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]) + array(delta_pdb_per_channel) + base_power)
si = create_arbitrary_spectral_information(frequency=frequency, slot_width=slot_width,
signal=signal, baud_rate=baud_rate, roll_off=0.15,
pch=pch, baud_rate=baud_rate, roll_off=0.15,
delta_pdb_per_channel=delta_pdb_per_channel,
tx_osnr=None, tx_power=None)
total_sig_powerin = sum(si.signal)

View File

@@ -88,9 +88,9 @@ def test_equalization_combination_degree(delta_pdb_per_channel, degree, equaliza
frequency = 191e12 + array([0, 50e9, 150e9, 225e9, 275e9])
slot_width = array([37.5e9, 50e9, 75e9, 50e9, 37.5e9])
baud_rate = array([32e9, 42e9, 64e9, 42e9, 32e9])
signal = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]))
pch = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]))
si = create_arbitrary_spectral_information(frequency=frequency, slot_width=slot_width,
signal=signal, baud_rate=baud_rate, roll_off=0.15,
pch=pch, baud_rate=baud_rate, roll_off=0.15,
delta_pdb_per_channel=delta_pdb_per_channel,
tx_osnr=None)
to_json_before_propagation = {
@@ -225,10 +225,10 @@ def test_low_input_power(target_out, delta_pdb_per_channel, correction):
frequency = 191e12 + array([0, 50e9, 150e9, 225e9, 275e9])
slot_width = array([37.5e9, 50e9, 75e9, 50e9, 37.5e9])
baud_rate = array([32e9, 42e9, 64e9, 42e9, 32e9])
signal = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]))
pch = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]))
target = target_out + array(delta_pdb_per_channel)
si = create_arbitrary_spectral_information(frequency=frequency, slot_width=slot_width,
signal=signal, baud_rate=baud_rate, roll_off=0.15,
pch=pch, baud_rate=baud_rate, roll_off=0.15,
delta_pdb_per_channel=delta_pdb_per_channel,
tx_osnr=None)
roadm_config = {
@@ -262,7 +262,7 @@ def test_low_input_power(target_out, delta_pdb_per_channel, correction):
si = roadm(si, degree='toto', from_degree='tata')
assert_allclose(watt2dbm(si.signal), target - correction, rtol=1e-5)
# in other words check that if target is below input power, target is applied else power is unchanged
assert_allclose((watt2dbm(signal) >= target) * target + (watt2dbm(signal) < target) * watt2dbm(signal),
assert_allclose((watt2dbm(pch) >= target) * target + (watt2dbm(pch) < target) * watt2dbm(pch),
watt2dbm(si.signal), rtol=1e-5)
@@ -280,10 +280,10 @@ def test_2low_input_power(target_out, delta_pdb_per_channel, correction):
frequency = 191e12 + array([0, 50e9, 150e9, 225e9, 275e9])
slot_width = array([37.5e9, 50e9, 75e9, 50e9, 37.5e9])
baud_rate = array([32e9, 42e9, 64e9, 42e9, 32e9])
signal = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]))
pch = dbm2watt(array([-20.0, -18.0, -22.0, -25.0, -16.0]))
target = psd2powerdbm(target_out, baud_rate) + array(delta_pdb_per_channel)
si = create_arbitrary_spectral_information(frequency=frequency, slot_width=slot_width,
signal=signal, baud_rate=baud_rate, roll_off=0.15,
pch=pch, baud_rate=baud_rate, roll_off=0.15,
delta_pdb_per_channel=delta_pdb_per_channel,
tx_osnr=None)
roadm_config = {

View File

@@ -19,7 +19,7 @@ from gnpy.core.exceptions import SpectrumError
def test_create_arbitrary_spectral_information():
si = create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12],
baud_rate=32e9, signal=[1, 1, 1],
baud_rate=32e9, pch=[1, 1, 1],
delta_pdb_per_channel=[1, 1, 1],
tx_osnr=40.0, tx_power=[1, 1, 1])
assert_array_equal(si.baud_rate, array([32e9, 32e9, 32e9]))
@@ -41,25 +41,25 @@ def test_create_arbitrary_spectral_information():
si = create_arbitrary_spectral_information(frequency=array([193.35e12, 193.3e12, 193.25e12]),
slot_width=array([50e9, 50e9, 50e9]),
baud_rate=32e9, signal=array([1, 2, 3]),
baud_rate=32e9, pch=array([1, 2, 3]),
tx_osnr=40.0, tx_power=array([1, 2, 3]))
assert_array_equal(si.signal, array([3, 2, 1]))
with pytest.raises(SpectrumError, match='Spectrum baud rate, including the roll off, '
r'larger than the slot width for channels: \[1, 3\].'):
create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12], signal=1,
create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12], pch=1,
baud_rate=[64e9, 32e9, 64e9], slot_width=50e9,
tx_osnr=40.0, tx_power=1)
with pytest.raises(SpectrumError, match='Spectrum required slot widths larger than the frequency spectral '
r'distances between channels: \[\(1, 2\), \(3, 4\)\].'):
create_arbitrary_spectral_information(frequency=[193.26e12, 193.3e12, 193.35e12, 193.39e12], signal=1,
create_arbitrary_spectral_information(frequency=[193.26e12, 193.3e12, 193.35e12, 193.39e12], pch=1,
tx_osnr=40.0, baud_rate=32e9, slot_width=50e9, tx_power=1)
with pytest.raises(SpectrumError, match='Spectrum required slot widths larger than the frequency spectral '
r'distances between channels: \[\(1, 2\), \(2, 3\)\].'):
create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12], signal=1, baud_rate=49e9,
create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12], pch=1, baud_rate=49e9,
tx_osnr=40.0, roll_off=0.1, tx_power=1)
with pytest.raises(SpectrumError,
match='Dimension mismatch in input fields.'):
create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12], signal=[1, 2], baud_rate=49e9,
create_arbitrary_spectral_information(frequency=[193.25e12, 193.3e12, 193.35e12], pch=[1, 2], baud_rate=49e9,
tx_osnr=40.0, tx_power=1)

View File

@@ -51,10 +51,10 @@ def test_fiber():
frequency = 191e12 + array([0, 50e9, 150e9, 225e9, 275e9])
slot_width = array([37.5e9, 50e9, 75e9, 50e9, 37.5e9])
baud_rate = array([32e9, 42e9, 64e9, 42e9, 32e9])
signal = 1e-3 + array([0, -1e-4, 3e-4, -2e-4, +2e-4])
pch = 1e-3 + array([0, -1e-4, 3e-4, -2e-4, +2e-4])
delta_pdb_per_channel = [0, 0, 0, 0, 0]
spectral_info_input = create_arbitrary_spectral_information(frequency=frequency, slot_width=slot_width,
signal=signal, baud_rate=baud_rate, roll_off=0.15,
pch=pch, baud_rate=baud_rate, roll_off=0.15,
delta_pdb_per_channel=delta_pdb_per_channel,
tx_osnr=40.0, tx_power=1e-3)
# propagation without Raman