mirror of
				https://github.com/Telecominfraproject/oopt-gnpy.git
				synced 2025-10-30 17:47:50 +00:00 
			
		
		
		
	 7a1b15a916
			
		
	
	7a1b15a916
	
	
	
		
			
			Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com> Change-Id: Ifdd6a566fda74c5b7d417f9d61c51d4d3da07bfd
		
			
				
	
	
		
			736 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			736 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| # SPDX-License-Identifier: BSD-3-Clause
 | |
| # gnpy.core.science_utils: Solver definitions to calculate the Raman effect and the nonlinear interference noise
 | |
| # Copyright (C) 2025 Telecom Infra Project and GNPy contributors
 | |
| # see AUTHORS.rst for a list of contributors
 | |
| 
 | |
| """
 | |
| gnpy.core.science_utils
 | |
| =======================
 | |
| 
 | |
| Solver definitions to calculate the Raman effect and the nonlinear interference noise
 | |
| 
 | |
| The solvers take as input instances of the spectral information, the fiber and the simulation parameters
 | |
| """
 | |
| 
 | |
| from numpy import interp, pi, zeros, cos, array, append, ones, exp, arange, sqrt, trapz, arcsinh, clip, abs, sum, \
 | |
|     concatenate, flip, outer, inner, transpose, max, format_float_scientific, diag, sort, unique, argsort, cumprod, \
 | |
|     polyfit, log, reshape, swapaxes, full, nan, cumsum
 | |
| from logging import getLogger
 | |
| from scipy.constants import k, h
 | |
| from scipy.interpolate import interp1d
 | |
| from math import isclose, factorial
 | |
| 
 | |
| from gnpy.core.utils import db2lin, lin2db
 | |
| from gnpy.core.exceptions import EquipmentConfigError, ParametersError
 | |
| from gnpy.core.parameters import SimParams
 | |
| from gnpy.core.info import SpectralInformation
 | |
| 
 | |
| logger = getLogger(__name__)
 | |
| sim_params = SimParams()
 | |
| 
 | |
| 
 | |
| def raised_cosine(frequency, channel_frequency, channel_baud_rate, channel_roll_off):
 | |
|     """Returns a unitary raised cosine profile for the given parame
 | |
| 
 | |
|     :param frequency: numpy array of frequencies in Hz for the resulting raised cosine
 | |
|     :param channel_frequency: channel frequencies in Hz
 | |
|     :param channel_baud_rate: channel baud rate in Hz
 | |
|     :param channel_roll_off: channel roll off
 | |
|     """
 | |
|     raised_cosine_mask = zeros(frequency.size)
 | |
|     base_frequency = frequency - channel_frequency
 | |
|     ts = 1 / channel_baud_rate
 | |
|     pass_band = (1 - channel_roll_off) * channel_baud_rate / 2
 | |
|     stop_band = (1 + channel_roll_off) * channel_baud_rate / 2
 | |
| 
 | |
|     flat_condition = (abs(base_frequency) <= pass_band) == 1
 | |
|     cosine_condition = (pass_band < abs(base_frequency)) * (abs(base_frequency) < stop_band) == 1
 | |
| 
 | |
|     raised_cosine_mask[flat_condition] = 1
 | |
|     raised_cosine_mask[cosine_condition] = \
 | |
|         0.5 * (1 + cos(pi * ts / channel_roll_off * (abs(base_frequency[cosine_condition]) - pass_band)))
 | |
|     return raised_cosine_mask
 | |
| 
 | |
| 
 | |
| class StimulatedRamanScattering:
 | |
|     def __init__(self, power_profile, loss_profile, frequency, z):
 | |
|         """
 | |
|         :params power_profile: power profile matrix along frequency and z [W]
 | |
|         :params loss_profile: power profile matrix along frequency and z [linear units]
 | |
|         :params frequency: channels frequencies array [Hz]
 | |
|         :params z: positions array [m]
 | |
|         """
 | |
|         self.power_profile = power_profile
 | |
|         self.loss_profile = loss_profile
 | |
|         # Field loss profile matrix along frequency and z
 | |
|         self.rho = sqrt(loss_profile)
 | |
|         self.frequency = frequency
 | |
|         self.z = z
 | |
| 
 | |
| 
 | |
| class RamanSolver:
 | |
|     """This class contains the methods to calculate the Raman scattering effect."""
 | |
| 
 | |
|     @staticmethod
 | |
|     def _create_lumped_losses(z, lumped_losses, z_lumped_losses):
 | |
|         lumped_losses = concatenate((lumped_losses, ones(z.size)))
 | |
|         z, unique_indices = unique(concatenate((z_lumped_losses, z)), return_index=True)
 | |
|         order = argsort(z)
 | |
|         lumped_losses = (lumped_losses[unique_indices])[order]
 | |
|         z = z[order]
 | |
|         return z, lumped_losses
 | |
| 
 | |
|     @staticmethod
 | |
|     def calculate_attenuation_profile(spectral_info: SpectralInformation, fiber):
 | |
|         """Evaluates the attenuation profile along the z axis for all the frequency propagating in the
 | |
|         fiber without considering the stimulated Raman scattering.
 | |
|         """
 | |
|         # z array definition
 | |
|         z = array([0, fiber.params.length])
 | |
| 
 | |
|         # Lumped losses array definition
 | |
|         z, lumped_losses = RamanSolver._create_lumped_losses(z, fiber.lumped_losses, fiber.z_lumped_losses)
 | |
| 
 | |
|         lumped_loss_acc = cumprod(lumped_losses)
 | |
|         frequency = spectral_info.frequency
 | |
|         alpha = fiber.alpha(frequency)
 | |
|         loss_profile = exp(- outer(alpha, z)) * lumped_loss_acc
 | |
|         power_profile = outer(spectral_info.signal, ones(z.size)) * loss_profile
 | |
|         return StimulatedRamanScattering(power_profile, loss_profile, spectral_info.frequency, z)
 | |
| 
 | |
|     @staticmethod
 | |
|     def calculate_stimulated_raman_scattering(spectral_info: SpectralInformation, fiber):
 | |
|         """Evaluates the Raman profile along the z axis for all the frequency propagated in the fiber
 | |
|         including the Raman pumps co- and counter-propagating
 | |
|         """
 | |
|         logger.debug('Start computing fiber Stimulated Raman Scattering')
 | |
| 
 | |
|         if sim_params.raman_params.flag:
 | |
|             # Raman parameters
 | |
|             z_resolution = sim_params.raman_params.result_spatial_resolution
 | |
|             z_step = sim_params.raman_params.solver_spatial_resolution
 | |
|             z = append(arange(0, fiber.params.length, z_step), fiber.params.length)
 | |
|             z_final = append(arange(0, fiber.params.length, z_resolution), fiber.params.length)
 | |
|             z_final = sort(unique(concatenate((fiber.z_lumped_losses, z_final))))
 | |
| 
 | |
|             # Lumped losses array definition
 | |
|             z, lumped_losses = RamanSolver._create_lumped_losses(z, fiber.lumped_losses, fiber.z_lumped_losses)
 | |
| 
 | |
|             if hasattr(fiber, 'raman_pumps'):
 | |
|                 # TODO: verify co-propagating pumps computation and in general unsorted frequency
 | |
|                 # Co-propagating spectrum definition
 | |
|                 co_raman_pump_power = array([pump.power for pump in fiber.raman_pumps
 | |
|                                              if pump.propagation_direction == 'coprop'])
 | |
|                 co_raman_pump_frequency = array([pump.frequency for pump in fiber.raman_pumps
 | |
|                                                  if pump.propagation_direction == 'coprop'])
 | |
| 
 | |
|                 co_power = concatenate((spectral_info.signal, co_raman_pump_power))
 | |
|                 co_frequency = concatenate((spectral_info.frequency, co_raman_pump_frequency))
 | |
| 
 | |
|                 # Counter-propagating spectrum definition
 | |
|                 cnt_power = array([pump.power for pump in fiber.raman_pumps
 | |
|                                    if pump.propagation_direction == 'counterprop'])
 | |
|                 cnt_frequency = array([pump.frequency for pump in fiber.raman_pumps
 | |
|                                        if pump.propagation_direction == 'counterprop'])
 | |
|                 # Co-propagating profile initialization
 | |
|                 co_power_profile = zeros([co_frequency.size, z.size])
 | |
|                 if co_frequency.size:
 | |
|                     co_cr = fiber.cr(co_frequency)
 | |
|                     co_alpha = fiber.alpha(co_frequency)
 | |
|                     co_power_profile = \
 | |
|                         RamanSolver.calculate_unidirectional_stimulated_raman_scattering(co_power, co_alpha, co_cr, z,
 | |
|                                                                                          lumped_losses)
 | |
|                 # Counter-propagating profile initialization
 | |
|                 cnt_power_profile = zeros([cnt_frequency.size, z.size])
 | |
|                 if cnt_frequency.size:
 | |
|                     cnt_cr = fiber.cr(cnt_frequency)
 | |
|                     cnt_alpha = fiber.alpha(cnt_frequency)
 | |
|                     cnt_power_profile = flip(
 | |
|                         RamanSolver.calculate_unidirectional_stimulated_raman_scattering(cnt_power, cnt_alpha, cnt_cr,
 | |
|                                                                                          z[-1] - flip(z),
 | |
|                                                                                          flip(lumped_losses)), axis=1)
 | |
|                 # Co-propagating and Counter-propagating Profile Computation
 | |
|                 if co_frequency.size and cnt_frequency.size:
 | |
|                     co_power_profile, cnt_power_profile = \
 | |
|                         RamanSolver.iterative_algorithm(co_power_profile, cnt_power_profile,
 | |
|                                                         co_frequency, cnt_frequency, z, fiber, lumped_losses)
 | |
|                 # Complete Power Profile
 | |
|                 power_profile = concatenate((co_power_profile, cnt_power_profile), axis=0)
 | |
|                 # Complete Loss Profile
 | |
|                 co_loss_profile = co_power_profile / outer(co_power, ones(z.size))
 | |
|                 cnt_loss_profile = cnt_power_profile / outer(cnt_power, ones(z.size))
 | |
|                 loss_profile = concatenate((co_loss_profile, cnt_loss_profile), axis=0)
 | |
|                 # Complete frequency
 | |
|                 frequency = concatenate((co_frequency, cnt_frequency))
 | |
|             else:
 | |
|                 # Without Raman pumps
 | |
|                 alpha = fiber.alpha(spectral_info.frequency)
 | |
|                 cr = fiber.cr(spectral_info.frequency)
 | |
|                 # Power profile
 | |
|                 power_profile = (
 | |
|                     RamanSolver.calculate_unidirectional_stimulated_raman_scattering(spectral_info.signal, alpha, cr, z,
 | |
|                                                                                      lumped_losses))
 | |
|                 # Loss profile
 | |
|                 loss_profile = power_profile / outer(spectral_info.signal, ones(z.size))
 | |
|                 frequency = spectral_info.frequency
 | |
|             power_profile = interp1d(z, power_profile, axis=1)(z_final)
 | |
|             loss_profile = interp1d(z, loss_profile, axis=1)(z_final)
 | |
|             stimulated_raman_scattering = StimulatedRamanScattering(power_profile, loss_profile, frequency, z_final)
 | |
|         else:
 | |
|             stimulated_raman_scattering = \
 | |
|                 RamanSolver.calculate_attenuation_profile(spectral_info, fiber)
 | |
|         return stimulated_raman_scattering
 | |
| 
 | |
|     @staticmethod
 | |
|     def calculate_spontaneous_raman_scattering(spectral_info: SpectralInformation, srs: StimulatedRamanScattering,
 | |
|                                                fiber):
 | |
|         """Evaluates the Raman profile along the z axis for all the frequency propagated in the fiber
 | |
|         including the Raman pumps co- and counter-propagating.
 | |
|         """
 | |
|         logger.debug('Start computing fiber Spontaneous Raman Scattering')
 | |
|         z = srs.z
 | |
|         baud_rate = spectral_info.baud_rate
 | |
|         frequency = spectral_info.frequency
 | |
|         channels_loss = srs.loss_profile[:spectral_info.number_of_channels, :]
 | |
| 
 | |
|         # calculate ase power
 | |
|         ase = zeros(spectral_info.number_of_channels)
 | |
|         cr = fiber.cr(srs.frequency)[:spectral_info.number_of_channels, spectral_info.number_of_channels:]
 | |
|         for i, pump in enumerate(fiber.raman_pumps):
 | |
|             pump_power = srs.power_profile[spectral_info.number_of_channels + i, :]
 | |
|             df = pump.frequency - frequency
 | |
|             eta = - 1 / (1 - exp(h * df / (k * fiber.temperature)))
 | |
|             integral = trapz(pump_power / channels_loss, z, axis=1)
 | |
|             ase += 2 * h * baud_rate * frequency * (1 + eta) * cr[:, i] * (df > 0) * integral  # 2 factor for double pol
 | |
|         return ase
 | |
| 
 | |
|     @staticmethod
 | |
|     def calculate_unidirectional_stimulated_raman_scattering(power_in, alpha, cr, z, lumped_losses):
 | |
|         """Solves the Raman equation
 | |
| 
 | |
|         :param power_in: launch power array
 | |
|         :param alpha: loss coefficient array
 | |
|         :param cr: Raman efficiency coefficients matrix
 | |
|         :param z: z position array
 | |
|         :param lumped_losses: concentrated losses array along the fiber span
 | |
|         :return: power profile matrix
 | |
|         """
 | |
|         if sim_params.raman_params.method == 'perturbative':
 | |
|             if sim_params.raman_params.order > 4:
 | |
|                 raise ValueError(f'Order {sim_params.raman_params.order} not implemented in Raman Solver.')
 | |
|             z_lumped_losses = append(z[lumped_losses != 1], z[-1])
 | |
|             llumped_losses = append(1, lumped_losses[lumped_losses != 1])
 | |
|             power = outer(power_in, ones(z.size))
 | |
|             last_position = 0
 | |
|             z_indices = arange(0, z.size)
 | |
| 
 | |
|             for z_lumped_loss, lumped_loss in zip(z_lumped_losses, llumped_losses):
 | |
|                 if last_position < z[-1]:
 | |
|                     interval = z_indices[(z >= last_position) * (z <= z_lumped_loss) == 1]
 | |
|                     z_interval = z[interval] - last_position
 | |
|                     dz = z_interval[1:] - z_interval[:-1]
 | |
|                     last_position = z[interval][-1]
 | |
|                     p0 = power_in * lumped_loss
 | |
|                     power_interval = outer(p0, ones(z_interval.size))
 | |
|                     alphaz = outer(alpha, z_interval)
 | |
|                     expz = exp(- alphaz)
 | |
|                     eff_length = 1 / outer(alpha, ones(z_interval.size)) * (1 - expz)
 | |
|                     crpz = transpose(ones([z_interval.size, cr.shape[0], cr.shape[1]]) * cr * p0, (1, 2, 0))
 | |
|                     exponent = - alphaz
 | |
|                     if sim_params.raman_params.order >= 1:
 | |
|                         gamma1 = sum(crpz * eff_length, 1)
 | |
|                         exponent += gamma1
 | |
|                     if sim_params.raman_params.order >= 2:
 | |
|                         z_integrand = expz * gamma1
 | |
|                         z_integral = cumsum((z_integrand[:, :-1] + z_integrand[:, 1:]) / 2 * dz, 1)
 | |
|                         gamma2 = zeros(gamma1.shape)
 | |
|                         gamma2[:, 1:] = sum(crpz[:, :, 1:] * z_integral, 1)
 | |
|                         exponent += gamma2
 | |
|                     if sim_params.raman_params.order >= 3:
 | |
|                         z_integrand = expz * (gamma2 + 1/2 * gamma1**2)
 | |
|                         z_integral = cumsum((z_integrand[:, :-1] + z_integrand[:, 1:]) / 2 * dz, 1)
 | |
|                         gamma3 = zeros(gamma1.shape)
 | |
|                         gamma3[:, 1:] = sum(crpz[:, :, 1:] * z_integral, 1)
 | |
|                         exponent += gamma3
 | |
|                     if sim_params.raman_params.order >= 4:
 | |
|                         z_integrand = expz * (gamma3 + gamma1 * gamma2 + 1/factorial(3) * gamma1**3)
 | |
|                         z_integral = cumsum((z_integrand[:, :-1] + z_integrand[:, 1:]) / 2 * dz, 1)
 | |
|                         gamma4 = zeros(gamma1.shape)
 | |
|                         gamma4[:, 1:] = sum(crpz[:, :, 1:] * z_integral, 1)
 | |
|                         exponent += gamma4
 | |
|                     power_interval *= exp(exponent)
 | |
|                     power[:, interval[1:]] = power_interval[:, 1:]
 | |
|                     power_in = power_interval[:, -1]
 | |
|         elif sim_params.raman_params.method == 'numerical':
 | |
|             dz = z[1:] - z[:-1]
 | |
|             power = outer(power_in, ones(z.size))
 | |
|             for i in range(1, z.size):
 | |
|                 power[:, i] = (power[:, i - 1] * (1 + (- alpha + sum(cr * power[:, i - 1], 1)) * dz[i - 1]) *
 | |
|                                lumped_losses[i - 1])
 | |
|         else:
 | |
|             raise ValueError(f'Method {sim_params.raman_params.method} not implemented in Raman Solver.')
 | |
|         return power
 | |
| 
 | |
|     @staticmethod
 | |
|     def iterative_algorithm(co_initial_guess_power, cnt_initial_guess_power, co_frequency, cnt_frequency, z, fiber,
 | |
|                             lumped_losses):
 | |
|         """Solves the Raman equation in case of both co- and counter-propagating frequencies
 | |
| 
 | |
|         :param co_initial_guess_power: co-propagationg Raman first order derivative equation solution
 | |
|         :param cnt_initial_guess_power: counter-propagationg Raman first order derivative equation solution
 | |
|         :param co_frequency: co-propagationg frequencies
 | |
|         :param cnt_frequency: counter-propagationg frequencies
 | |
|         :param z: z position array
 | |
|         :param fiber: instance of gnpy.core.elements.Fiber or gnpy.core.elements.RamanFiber
 | |
|         :param lumped_losses: concentrated losses array along the fiber span
 | |
|         :return: co- and counter-propagatng power profile matrix
 | |
|         """
 | |
|         logger.debug('  Start iterative algorithm')
 | |
|         residue = 1
 | |
|         residue_tol = 1e-6
 | |
|         accuracy = 1
 | |
|         accuracy_tol = 1e-3
 | |
|         iteration = 0
 | |
|         num_max_iter = 1000
 | |
|         prev_power = concatenate((co_initial_guess_power, cnt_initial_guess_power))
 | |
|         frequency = concatenate((co_frequency, cnt_frequency))
 | |
|         dz = z[1:] - z[:-1]
 | |
|         cr = fiber.cr(frequency)
 | |
|         alpha = fiber.alpha(frequency)
 | |
|         next_power = array(prev_power)
 | |
|         while residue > residue_tol and accuracy > accuracy_tol and iteration < num_max_iter:
 | |
|             iteration += 1
 | |
|             for i in range(1, z.size):
 | |
|                 dpdz = - alpha + sum(cr * next_power[:, i - 1], 1)
 | |
|                 next_power[:co_frequency.size, i] = \
 | |
|                     next_power[:co_frequency.size, i - 1] * (1 + dpdz[:co_frequency.size] * dz[i - 1]) * \
 | |
|                     lumped_losses[i - 1]
 | |
|             for i in range(1, z.size):
 | |
|                 dpdz = - alpha + sum(cr * next_power[:, -i], 1)
 | |
|                 next_power[co_frequency.size:, -i - 1] = \
 | |
|                     next_power[co_frequency.size:, -i] * (1 + dpdz[co_frequency.size:] * dz[-i]) * \
 | |
|                     lumped_losses[-i]
 | |
| 
 | |
|             dpdz_num = (next_power[:co_frequency.size, 1:] - next_power[:co_frequency.size, :-1]) / dz
 | |
|             dpdz_exp = next_power[:co_frequency.size, :-1] * \
 | |
|                 (- outer(alpha, ones(z.size)) + inner(cr, transpose(next_power)))[:co_frequency.size, :-1] * \
 | |
|                 lumped_losses[:-1]
 | |
| 
 | |
|             residue = max(abs((next_power - prev_power) / next_power))
 | |
|             accuracy = max(abs((dpdz_exp - dpdz_num) / dpdz_exp))
 | |
|             prev_power = array(next_power)
 | |
|             logger.debug(f'     Iteration: {iteration}  Accuracy: {format_float_scientific(accuracy, precision=3)}')
 | |
|         return next_power[:co_frequency.size, :], next_power[co_frequency.size:, :]
 | |
| 
 | |
| 
 | |
| class NliSolver:
 | |
|     """This class implements the NLI models.
 | |
|     Model and method can be specified in `sim_params.nli_params.method`.
 | |
|     List of implemented methods:
 | |
|     'gn_model_analytic': eq. 120 from arXiv:1209.0394
 | |
|     'ggn_spectrally_separated': eq. 21 from arXiv: 1710.02225
 | |
|     'ggn_approx': eq. 24-25 jlt:9741324
 | |
|     """
 | |
| 
 | |
|     SPM_WEIGHT = (16.0 / 27.0)
 | |
|     XPM_WEIGHT = 2 * (16.0 / 27.0)
 | |
| 
 | |
|     @staticmethod
 | |
|     def effective_length(alpha, length):
 | |
|         """The effective length identify the region in which the NLI has a significant contribution to
 | |
|         the signal degradation.
 | |
|         """
 | |
|         return (1 - exp(- alpha * length)) / alpha
 | |
| 
 | |
|     @staticmethod
 | |
|     def compute_nli(spectral_info: SpectralInformation, srs: StimulatedRamanScattering, fiber):
 | |
|         """Compute NLI power generated by the WDM comb `*carriers` on the channel under test `carrier`
 | |
|         at the end of the fiber span.
 | |
|         """
 | |
|         logger.debug('Start computing fiber NLI noise')
 | |
| 
 | |
|         if 'gn_model_analytic' == sim_params.nli_params.method:
 | |
|             eta = NliSolver._gn_analytic(spectral_info, fiber)
 | |
| 
 | |
|             cut_power = outer(spectral_info.signal, ones(spectral_info.number_of_channels))
 | |
|             pump_power = outer(ones(spectral_info.number_of_channels), spectral_info.signal)
 | |
|             nli_matrix = cut_power * pump_power ** 2 * eta
 | |
|             nli = sum(nli_matrix, 1)
 | |
|         elif 'ggn_spectrally_separated' in sim_params.nli_params.method:
 | |
|             if sim_params.nli_params.computed_channels is not None:
 | |
|                 cut_indices = array(sim_params.nli_params.computed_channels) - 1
 | |
|             elif sim_params.nli_params.computed_number_of_channels is not None:
 | |
|                 nb_ch_computed = sim_params.nli_params.computed_number_of_channels
 | |
|                 nb_ch = len(spectral_info.channel_number)
 | |
|                 cut_indices = array([round(i * (nb_ch - 1) / (nb_ch_computed - 1)) for i in range(0, nb_ch_computed)])
 | |
|             else:
 | |
|                 cut_indices = array(spectral_info.channel_number) - 1
 | |
| 
 | |
|             eta = NliSolver._ggn_spectrally_separated(cut_indices, spectral_info, fiber, srs)
 | |
| 
 | |
|             # Interpolation over the channels not indicated as compted channels in simulation parameters
 | |
|             cut_power = outer(spectral_info.signal[cut_indices], ones(spectral_info.number_of_channels))
 | |
|             cut_frequency = spectral_info.frequency[cut_indices]
 | |
|             pump_power = outer(ones(cut_indices.size), spectral_info.signal)
 | |
|             cut_baud_rate = outer(spectral_info.baud_rate[cut_indices], ones(spectral_info.number_of_channels))
 | |
| 
 | |
|             g_nli = eta * cut_power * pump_power**2 / cut_baud_rate
 | |
|             g_nli = sum(g_nli, 1)
 | |
|             g_nli = interp(spectral_info.frequency, cut_frequency, g_nli)
 | |
|             nli = spectral_info.baud_rate * g_nli  # Local white noise
 | |
|         elif 'ggn_approx' in sim_params.nli_params.method:
 | |
|             if sim_params.nli_params.computed_channels is not None:
 | |
|                 cut_indices = array(sim_params.nli_params.computed_channels) - 1
 | |
|             elif sim_params.nli_params.computed_number_of_channels is not None:
 | |
|                 nb_ch_computed = sim_params.nli_params.computed_number_of_channels
 | |
|                 nb_ch = len(spectral_info.channel_number)
 | |
|                 cut_indices = array([round(i * (nb_ch - 1) / (nb_ch_computed - 1)) for i in range(0, nb_ch_computed)])
 | |
|             else:
 | |
|                 cut_indices = array(spectral_info.channel_number) - 1
 | |
| 
 | |
|             eta = NliSolver._ggn_approx(cut_indices, spectral_info, fiber, srs)
 | |
| 
 | |
|             # Interpolation over the channels not indicated as computed channels in simulation parameters
 | |
|             cut_power = outer(spectral_info.signal[cut_indices], ones(spectral_info.number_of_channels))
 | |
|             cut_frequency = spectral_info.frequency[cut_indices]
 | |
|             pump_power = outer(ones(cut_indices.size), spectral_info.signal)
 | |
|             cut_baud_rate = outer(spectral_info.baud_rate[cut_indices], ones(spectral_info.number_of_channels))
 | |
| 
 | |
|             g_nli = eta * cut_power * pump_power ** 2 / cut_baud_rate
 | |
|             g_nli = sum(g_nli, 1)
 | |
|             g_nli = interp(spectral_info.frequency, cut_frequency, g_nli)
 | |
|             nli = spectral_info.baud_rate * g_nli  # Local white noise
 | |
|         else:
 | |
|             raise ValueError(f'Method {sim_params.nli_params.method} not implemented.')
 | |
| 
 | |
|         return nli
 | |
| 
 | |
|     # Methods for computing GN-model eta matrix
 | |
|     @staticmethod
 | |
|     def _gn_analytic(spectral_info, fiber, spm_weight=SPM_WEIGHT, xpm_weight=XPM_WEIGHT):
 | |
|         """Computes the nonlinear interference power evaluated at the fiber input.
 | |
|         The method uses eq. 120 from arXiv:1209.0394
 | |
|         """
 | |
|         # Spectral Features
 | |
|         nch = spectral_info.number_of_channels
 | |
|         frequency = spectral_info.frequency
 | |
|         baud_rate = spectral_info.baud_rate
 | |
|         delta_frequency = spectral_info.df
 | |
| 
 | |
|         # Physical fiber parameters
 | |
|         alpha = fiber.alpha(frequency)
 | |
|         beta2 = fiber.beta2(frequency)
 | |
|         gamma = outer(fiber.gamma(frequency), ones(nch))
 | |
|         length = fiber.params.length
 | |
| 
 | |
|         identity = diag(ones(nch))
 | |
|         weight = spm_weight * identity + xpm_weight * (ones([nch, nch]) - identity)
 | |
| 
 | |
|         effective_length = NliSolver.effective_length(alpha, length)
 | |
|         asymptotic_length = 1 / alpha
 | |
| 
 | |
|         cut_baud_rate = outer(baud_rate, ones(nch))
 | |
|         pump_baud_rate = outer(ones(nch), baud_rate)
 | |
| 
 | |
|         psi = NliSolver._psi(delta_frequency, baud_rate, beta2, effective_length, asymptotic_length)
 | |
|         eta_cut_central_frequency = gamma ** 2 * weight * psi / (cut_baud_rate * pump_baud_rate ** 2)
 | |
|         eta = cut_baud_rate * eta_cut_central_frequency  # Local white noise
 | |
|         return eta
 | |
| 
 | |
|     @staticmethod
 | |
|     def _psi(df, baud_rate, beta2, effective_length, asymptotic_length):
 | |
|         """Calculates eq. 123 from `arXiv:1209.0394 <https://arxiv.org/abs/1209.0394>`__"""
 | |
|         cut_baud_rate = outer(baud_rate, ones(baud_rate.size))
 | |
|         cut_beta = outer(beta2, ones(baud_rate.size))
 | |
|         pump_baud_rate = baud_rate
 | |
|         pump_beta = outer(ones(baud_rate.size), beta2)
 | |
|         beta2 = (cut_beta + pump_beta) / 2
 | |
|         right_extreme = df + pump_baud_rate / 2
 | |
|         left_extreme = df - pump_baud_rate / 2
 | |
|         psi = (arcsinh(pi ** 2 * asymptotic_length * abs(beta2) * cut_baud_rate * right_extreme) -
 | |
|                arcsinh(pi ** 2 * asymptotic_length * abs(beta2) * cut_baud_rate * left_extreme)) / 2
 | |
|         psi *= effective_length ** 2 / (2 * pi * abs(beta2) * asymptotic_length)
 | |
|         return psi
 | |
| 
 | |
|     # Methods for computing the GGN-model eta matrix
 | |
|     @staticmethod
 | |
|     def _ggn_spectrally_separated(cut_indices, spectral_info, fiber, srs, spm_weight=SPM_WEIGHT, xpm_weight=XPM_WEIGHT):
 | |
|         """Computes the nonlinear interference power evaluated at the fiber input.
 | |
|         The method uses eq. 21 from arXiv: 1710.02225
 | |
|         """
 | |
|         # Spectral Features
 | |
|         nch = spectral_info.number_of_channels
 | |
|         frequency = spectral_info.frequency
 | |
|         baud_rate = spectral_info.baud_rate
 | |
|         slot_width = spectral_info.slot_width
 | |
|         roll_off = spectral_info.roll_off
 | |
| 
 | |
|         # Physical fiber parameters
 | |
|         alpha = fiber.alpha(frequency)
 | |
|         beta2 = fiber.beta2(frequency)
 | |
|         gamma = outer(fiber.gamma(frequency[cut_indices]), ones(nch))
 | |
| 
 | |
|         identity = diag(ones(nch))
 | |
|         weight = spm_weight * identity + xpm_weight * (ones([nch, nch]) - identity)
 | |
|         weight = weight[cut_indices, :]
 | |
| 
 | |
|         dispersion_tolerance = sim_params.nli_params.dispersion_tolerance
 | |
|         phase_shift_tolerance = sim_params.nli_params.phase_shift_tolerance
 | |
|         max_slot_width = max(slot_width)
 | |
|         delta_z = sim_params.raman_params.result_spatial_resolution
 | |
| 
 | |
|         psi_cut_central_frequency = zeros([cut_indices.size, nch])
 | |
|         for i, cut_index in enumerate(cut_indices):
 | |
|             logger.debug(f'Start computing fiber NLI noise of cut: {cut_index + 1}')
 | |
|             cut_frequency = frequency[cut_index]
 | |
|             cut_baud_rate = baud_rate[cut_index]
 | |
|             cut_roll_off = roll_off[cut_index]
 | |
|             cut_number = cut_index + 1
 | |
|             cut_beta2 = beta2[cut_index]
 | |
|             cut_base_frequency = frequency - cut_frequency
 | |
|             cut_beta_coefficients = polyfit(cut_base_frequency, beta2, 2)
 | |
|             cut_beta3 = cut_beta_coefficients[1] / (2 * pi)
 | |
| 
 | |
|             for pump_index in range(nch):
 | |
|                 pump_frequency = frequency[pump_index]
 | |
|                 pump_baud_rate = baud_rate[pump_index]
 | |
|                 pump_roll_off = roll_off[pump_index]
 | |
|                 pump_number = pump_index + 1
 | |
|                 pump_alpha = alpha[pump_index]
 | |
|                 dn = abs(pump_number - cut_number)
 | |
|                 delta_f = abs(cut_frequency - pump_frequency)
 | |
|                 k_tol = dispersion_tolerance * abs(alpha[pump_index])
 | |
|                 phi_tol = phase_shift_tolerance / delta_z
 | |
|                 f_cut_resolution = min(k_tol, phi_tol) / abs(cut_beta2) / (4 * pi ** 2 * (1 + dn) * max_slot_width)
 | |
|                 f_pump_resolution = min(k_tol, phi_tol) / abs(cut_beta2) / (4 * pi ** 2 * max_slot_width)
 | |
|                 if cut_index == pump_index:  # SPM
 | |
|                     psi_cut_central_frequency[i, pump_index] = \
 | |
|                         NliSolver._generalized_psi(cut_frequency, cut_frequency, cut_baud_rate, cut_roll_off,
 | |
|                                                    pump_frequency, pump_baud_rate, pump_roll_off, f_cut_resolution,
 | |
|                                                    f_pump_resolution, srs, pump_alpha, cut_beta2, cut_beta3,
 | |
|                                                    cut_frequency)
 | |
|                 else:  # XPM
 | |
|                     frequency_offset_threshold = NliSolver._frequency_offset_threshold(cut_beta2, pump_baud_rate)
 | |
|                     if abs(delta_f) <= frequency_offset_threshold:
 | |
|                         psi_cut_central_frequency[i, pump_index] = \
 | |
|                             NliSolver._generalized_psi(cut_frequency, cut_frequency, cut_baud_rate, cut_roll_off,
 | |
|                                                        pump_frequency, pump_baud_rate, pump_roll_off, f_cut_resolution,
 | |
|                                                        f_pump_resolution, srs, pump_alpha, cut_beta2, cut_beta3,
 | |
|                                                        cut_frequency)
 | |
|                     else:
 | |
|                         psi_cut_central_frequency[i, pump_index] = \
 | |
|                             NliSolver._fast_generalized_psi(cut_frequency, cut_frequency, cut_baud_rate, cut_roll_off,
 | |
|                                                             pump_frequency, pump_baud_rate, pump_roll_off,
 | |
|                                                             f_cut_resolution, srs, pump_alpha, cut_beta2, cut_beta3,
 | |
|                                                             cut_frequency)
 | |
| 
 | |
|         cut_baud_rate = outer(baud_rate[cut_indices], ones(nch))
 | |
|         pump_baud_rate = outer(ones(cut_indices.size), baud_rate)
 | |
| 
 | |
|         eta_cut_central_frequency = \
 | |
|             gamma ** 2 * weight * psi_cut_central_frequency / (cut_baud_rate * pump_baud_rate ** 2)
 | |
|         eta = cut_baud_rate * eta_cut_central_frequency  # Local white noise
 | |
|         return eta
 | |
| 
 | |
|     @staticmethod
 | |
|     def _fast_generalized_psi(f_eval, cut_frequency, cut_baud_rate, cut_roll_off, pump_frequency, pump_baud_rate,
 | |
|                               pump_roll_off, f_cut_resolution, srs, alpha, beta2, beta3, f_ref_beta):
 | |
|         """Computes the generalized psi function similarly to the one used in the GN model."""
 | |
|         z = srs.z
 | |
|         rho_norm = srs.rho * exp(outer(alpha / 2, z))
 | |
|         rho_pump = interp1d(srs.frequency, rho_norm, axis=0)(pump_frequency)
 | |
| 
 | |
|         f1_array = array([pump_frequency - (pump_baud_rate * (1 + pump_roll_off) / 2),
 | |
|                           pump_frequency + (pump_baud_rate * (1 + pump_roll_off) / 2)])
 | |
|         f2_array = arange(cut_frequency, cut_frequency + (cut_baud_rate * (1 + cut_roll_off) / 2),
 | |
|                           f_cut_resolution)  # Only positive f2 is used since integrand_f2 is symmetric
 | |
| 
 | |
|         integrand_f1 = zeros(f1_array.size)
 | |
|         for f1_index, f1 in enumerate(f1_array):
 | |
|             delta_beta = 4 * pi ** 2 * (f1 - f_eval) * (f2_array - f_eval) * (
 | |
|                         beta2 + pi * beta3 * (f1 + f2_array - 2 * f_ref_beta))
 | |
|             integrand_f2 = NliSolver._generalized_rho_nli(delta_beta, rho_pump, z, alpha)
 | |
|             integrand_f1[f1_index] = 2 * trapz(integrand_f2, f2_array)  # 2x since integrand_f2 is symmetric in f2
 | |
|         generalized_psi = 0.5 * sum(integrand_f1) * pump_baud_rate
 | |
|         return generalized_psi
 | |
| 
 | |
|     @staticmethod
 | |
|     def _generalized_psi(f_eval, cut_frequency, cut_baud_rate, cut_roll_off, pump_frequency, pump_baud_rate,
 | |
|                          pump_roll_off, f_cut_resolution, f_pump_resolution, srs, alpha, beta2, beta3, f_ref_beta):
 | |
|         """Computes the generalized psi function similarly to the one used in the GN model."""
 | |
|         z = srs.z
 | |
|         rho_norm = srs.rho * exp(outer(alpha / 2, z))
 | |
|         rho_pump = interp1d(srs.frequency, rho_norm, axis=0)(pump_frequency)
 | |
| 
 | |
|         f1_array = arange(pump_frequency - (pump_baud_rate * (1 + pump_roll_off) / 2),
 | |
|                           pump_frequency + (pump_baud_rate * (1 + pump_roll_off) / 2),
 | |
|                           f_pump_resolution)
 | |
|         f2_array = arange(cut_frequency - (cut_baud_rate * (1 + cut_roll_off) / 2),
 | |
|                           cut_frequency + (cut_baud_rate * (1 + cut_roll_off) / 2),
 | |
|                           f_cut_resolution)
 | |
|         rc1 = raised_cosine(f1_array, pump_frequency, pump_baud_rate, pump_roll_off)
 | |
| 
 | |
|         integrand_f1 = zeros(f1_array.size)
 | |
|         for i in range(f1_array.size):
 | |
|             f3_array = f1_array[i] + f2_array - f_eval
 | |
|             rc2 = raised_cosine(f2_array, cut_frequency, cut_baud_rate, cut_roll_off)
 | |
|             rc3 = raised_cosine(f3_array, pump_frequency, pump_baud_rate, pump_roll_off)
 | |
|             delta_beta = 4 * pi ** 2 * (f1_array[i] - f_eval) * (f2_array - f_eval) * (
 | |
|                         beta2 + pi * beta3 * (f1_array[i] + f2_array - 2 * f_ref_beta))
 | |
|             integrand_f2 = rc1[i] * rc2 * rc3 * NliSolver._generalized_rho_nli(delta_beta, rho_pump, z, alpha)
 | |
|             integrand_f1[i] = trapz(integrand_f2, f2_array)
 | |
|         generalized_psi = trapz(integrand_f1, f1_array)
 | |
|         return generalized_psi
 | |
| 
 | |
|     @staticmethod
 | |
|     def _generalized_rho_nli(delta_beta, rho_pump, z, alpha):
 | |
|         w = 1j * delta_beta - alpha
 | |
|         generalized_rho_nli = (rho_pump[-1]**2 * exp(w * z[-1]) - rho_pump[0]**2 * exp(w * z[0])) / w
 | |
|         for z_ind in range(0, len(z) - 1):
 | |
|             derivative_rho = (rho_pump[z_ind + 1]**2 - rho_pump[z_ind]**2) / (z[z_ind + 1] - z[z_ind])
 | |
|             generalized_rho_nli -= derivative_rho * (exp(w * z[z_ind + 1]) - exp(w * z[z_ind])) / (w**2)
 | |
|         generalized_rho_nli = abs(generalized_rho_nli)**2
 | |
|         return generalized_rho_nli
 | |
| 
 | |
|     @staticmethod
 | |
|     def _frequency_offset_threshold(beta2, symbol_rate):
 | |
|         k_ref = 5
 | |
|         beta2_ref = 21.3e-27
 | |
|         delta_f_ref = 50e9
 | |
|         rs_ref = 32e9
 | |
|         beta2 = abs(beta2)
 | |
|         freq_offset_th = ((k_ref * delta_f_ref) * rs_ref * beta2_ref) / (beta2 * symbol_rate)
 | |
|         return freq_offset_th
 | |
| 
 | |
|     @staticmethod
 | |
|     def _ggn_approx(cut_indices, spectral_info: SpectralInformation, fiber, srs, spm_weight=SPM_WEIGHT,
 | |
|                     xpm_weight=XPM_WEIGHT):
 | |
|         """Computes the nonlinear interference power evaluated at the fiber input.
 | |
|         The method uses eq. 24-25 of https://ieeexplore.ieee.org/document/9741324
 | |
|         """
 | |
|         # Spectral Features
 | |
|         nch = spectral_info.number_of_channels
 | |
|         frequency = spectral_info.frequency
 | |
|         baud_rate = spectral_info.baud_rate
 | |
|         slot_width = spectral_info.slot_width
 | |
|         roll_off = spectral_info.roll_off
 | |
|         df = spectral_info.df + diag(full(nch, nan))
 | |
| 
 | |
|         # Physical fiber parameters
 | |
|         alpha = fiber.alpha(frequency)
 | |
|         beta2 = fiber.beta2(frequency)
 | |
|         gamma = outer(fiber.gamma(frequency[cut_indices]), ones(nch))
 | |
| 
 | |
|         identity = diag(ones(nch))
 | |
|         weight = spm_weight * identity + xpm_weight * (ones([nch, nch]) - identity)
 | |
|         weight = weight[cut_indices, :]
 | |
| 
 | |
|         dispersion_tolerance = sim_params.nli_params.dispersion_tolerance
 | |
|         phase_shift_tolerance = sim_params.nli_params.phase_shift_tolerance
 | |
|         max_slot_width = max(slot_width)
 | |
|         max_beta2 = max(abs(beta2))
 | |
|         delta_z = sim_params.raman_params.result_spatial_resolution
 | |
| 
 | |
|         # Approximation psi
 | |
|         loss_profile = srs.loss_profile[:nch]
 | |
|         z = srs.z
 | |
|         psi = NliSolver._approx_psi(df=df, frequency=frequency, beta2=beta2, baud_rate=baud_rate,
 | |
|                                     loss_profile=loss_profile, z=z)
 | |
| 
 | |
|         # GGN for SPM
 | |
|         for cut_index in cut_indices:
 | |
|             dn = 0
 | |
|             cut_frequency = frequency[cut_index]
 | |
|             cut_baud_rate = baud_rate[cut_index]
 | |
|             cut_roll_off = roll_off[cut_index]
 | |
|             cut_beta2 = beta2[cut_index]
 | |
|             cut_alpha = alpha[cut_index]
 | |
|             k_tol = dispersion_tolerance * abs(cut_alpha)
 | |
|             phi_tol = phase_shift_tolerance / delta_z
 | |
|             f_cut_resolution = min(k_tol, phi_tol) / abs(max_beta2) / (4 * pi ** 2 * (1 + dn) * max_slot_width)
 | |
|             f_pump_resolution = min(k_tol, phi_tol) / abs(max_beta2) / (4 * pi ** 2 * max_slot_width)
 | |
|             psi[cut_index, cut_index] = NliSolver._generalized_psi(cut_frequency, cut_frequency, cut_baud_rate,
 | |
|                                                                    cut_roll_off, cut_frequency, cut_baud_rate,
 | |
|                                                                    cut_roll_off, f_cut_resolution, f_pump_resolution,
 | |
|                                                                    srs, cut_alpha, cut_beta2, 0, cut_frequency)
 | |
|         psi = psi[cut_indices, :]
 | |
|         cut_baud_rate = outer(baud_rate[cut_indices], ones(nch))
 | |
|         pump_baud_rate = outer(ones(cut_indices.size), baud_rate)
 | |
| 
 | |
|         eta_cut_central_frequency = \
 | |
|             gamma ** 2 * weight * psi / (cut_baud_rate * pump_baud_rate ** 2)
 | |
|         eta = cut_baud_rate * eta_cut_central_frequency  # Local white noise
 | |
| 
 | |
|         return eta
 | |
| 
 | |
|     @staticmethod
 | |
|     def _approx_psi(df, frequency, baud_rate, beta2, loss_profile, z):
 | |
|         """Computes the approximated psi function similarly to the one used in the GN model.
 | |
|         The method uses eq. 25 of https://ieeexplore.ieee.org/document/9741324"""
 | |
|         pump_baud_rate = outer(ones(frequency.size), baud_rate)
 | |
|         cut_beta = outer(beta2, ones(frequency.size))
 | |
|         pump_beta = outer(ones(frequency.size), beta2)
 | |
|         delta_z = abs(z[:-1] - z[1:])
 | |
| 
 | |
|         loss_lin = log(loss_profile)
 | |
|         pump_alpha = (loss_lin[:, 1:] - loss_lin[:, :-1]) / delta_z
 | |
|         leff = abs((loss_profile[:, 1:] - loss_profile[:, :-1]) / sqrt(abs(pump_alpha))) * pump_alpha / abs(pump_alpha)
 | |
|         leff = reshape(outer(leff, ones(z.size - 1)), newshape=[leff.shape[0], leff.shape[1], leff.shape[1]])
 | |
|         leff2 = leff * swapaxes(leff, 2, 1)
 | |
|         leff2 = sum(leff2, axis=(1, 2))
 | |
|         z_int = outer(ones(frequency.size), leff2)
 | |
| 
 | |
|         delta_beta = (cut_beta + pump_beta) / 2
 | |
|         psi = z_int * pump_baud_rate / (4 * pi * abs(delta_beta * df))
 | |
|         return psi
 | |
| 
 | |
| 
 | |
| 
 | |
| def estimate_nf_model(type_variety, gain_min, gain_max, nf_min, nf_max):
 | |
|     if nf_min < -10:
 | |
|         raise EquipmentConfigError(f'Invalid nf_min value {nf_min!r} for amplifier {type_variety}')
 | |
|     if nf_max < -10:
 | |
|         raise EquipmentConfigError(f'Invalid nf_max value {nf_max!r} for amplifier {type_variety}')
 | |
| 
 | |
|     # NF estimation model based on nf_min and nf_max
 | |
|     # delta_p:  max power dB difference between first and second stage coils
 | |
|     # dB g1a:   first stage gain - internal VOA attenuation
 | |
|     # nf1, nf2: first and second stage coils
 | |
|     #           calculated by solving nf_{min,max} = nf1 + nf2 / g1a{min,max}
 | |
|     delta_p = 5
 | |
|     g1a_min = gain_min - (gain_max - gain_min) - delta_p
 | |
|     g1a_max = gain_max - delta_p
 | |
|     nf2 = lin2db((db2lin(nf_min) - db2lin(nf_max)) /
 | |
|                  (1 / db2lin(g1a_max) - 1 / db2lin(g1a_min)))
 | |
|     nf1 = lin2db(db2lin(nf_min) - db2lin(nf2) / db2lin(g1a_max))
 | |
| 
 | |
|     if nf1 < 4:
 | |
|         raise EquipmentConfigError(f'First coil value too low {nf1} for amplifier {type_variety}')
 | |
| 
 | |
|     # Check 1 dB < delta_p < 6 dB to ensure nf_min and nf_max values make sense.
 | |
|     # There shouldn't be high nf differences between the two coils:
 | |
|     #    nf2 should be nf1 + 0.3 < nf2 < nf1 + 2
 | |
|     # If not, recompute and check delta_p
 | |
|     if not nf1 + 0.3 < nf2 < nf1 + 2:
 | |
|         nf2 = clip(nf2, nf1 + 0.3, nf1 + 2)
 | |
|         g1a_max = lin2db(db2lin(nf2) / (db2lin(nf_min) - db2lin(nf1)))
 | |
|         delta_p = gain_max - g1a_max
 | |
|         g1a_min = gain_min - (gain_max - gain_min) - delta_p
 | |
|         if not 1 < delta_p < 11:
 | |
|             raise EquipmentConfigError(f'Computed \N{greek capital letter delta}P invalid \
 | |
|                 \n 1st coil vs 2nd coil calculated DeltaP {delta_p:.2f} for \
 | |
|                 \n amplifier {type_variety} is not valid: revise inputs \
 | |
|                 \n calculated 1st coil NF = {nf1:.2f}, 2nd coil NF = {nf2:.2f}')
 | |
|     # Check calculated values for nf1 and nf2
 | |
|     calc_nf_min = lin2db(db2lin(nf1) + db2lin(nf2) / db2lin(g1a_max))
 | |
|     if not isclose(nf_min, calc_nf_min, abs_tol=0.01):
 | |
|         raise EquipmentConfigError(f'nf_min does not match calc_nf_min, {nf_min} vs {calc_nf_min} for amp {type_variety}')
 | |
|     calc_nf_max = lin2db(db2lin(nf1) + db2lin(nf2) / db2lin(g1a_min))
 | |
|     if not isclose(nf_max, calc_nf_max, abs_tol=0.01):
 | |
|         raise EquipmentConfigError(f'nf_max does not match calc_nf_max, {nf_max} vs {calc_nf_max} for amp {type_variety}')
 | |
| 
 | |
|     return nf1, nf2, delta_p
 |