Simulation Parameters

This change siplifies the structure of the simulation parameters, removing the gnpy.science_utils.simulation layer, provides some documentation of the parameters and define a mock fixture for testing in safe mode

Change-Id: If5ef341e0585586127d5dae3f39dca2c232236f1
This commit is contained in:
AndreaDAmico
2021-06-17 12:17:15 +02:00
committed by EstherLerouzic
parent 3910803dfa
commit def82b6515
7 changed files with 105 additions and 166 deletions

View File

@@ -34,92 +34,63 @@ class PumpParams(Parameters):
class RamanParams(Parameters):
def __init__(self, **kwargs):
self._flag_raman = kwargs['flag_raman']
self._space_resolution = kwargs['space_resolution'] if 'space_resolution' in kwargs else None
self._tolerance = kwargs['tolerance'] if 'tolerance' in kwargs else None
@property
def flag_raman(self):
return self._flag_raman
@property
def space_resolution(self):
return self._space_resolution
@property
def tolerance(self):
return self._tolerance
def __init__(self, flag=False, space_resolution=10e3, tolerance=None):
""" Simulation parameters used within the Raman Solver
:params flag: boolean for enabling/disable the evaluation of the Raman Power profile in frequency and position
:params space_resolution: spatial resolution of the evaluated Raman Power profile
:params tolerance: tuning parameter for scipy.integrate.solve_bvp solution
"""
self.flag = flag
self.space_resolution = space_resolution # [m]
self.tolerance = tolerance
class NLIParams(Parameters):
def __init__(self, **kwargs):
self._nli_method_name = kwargs['nli_method_name']
self._wdm_grid_size = kwargs['wdm_grid_size']
self._dispersion_tolerance = kwargs['dispersion_tolerance']
self._phase_shift_tolerance = kwargs['phase_shift_tolerance']
self._f_cut_resolution = None
self._f_pump_resolution = None
self._computed_channels = kwargs['computed_channels'] if 'computed_channels' in kwargs else None
@property
def nli_method_name(self):
return self._nli_method_name
@property
def wdm_grid_size(self):
return self._wdm_grid_size
@property
def dispersion_tolerance(self):
return self._dispersion_tolerance
@property
def phase_shift_tolerance(self):
return self._phase_shift_tolerance
@property
def f_cut_resolution(self):
return self._f_cut_resolution
@f_cut_resolution.setter
def f_cut_resolution(self, f_cut_resolution):
self._f_cut_resolution = f_cut_resolution
@property
def f_pump_resolution(self):
return self._f_pump_resolution
@f_pump_resolution.setter
def f_pump_resolution(self, f_pump_resolution):
self._f_pump_resolution = f_pump_resolution
@property
def computed_channels(self):
return self._computed_channels
def __init__(self, method='gn_model_analytic', dispersion_tolerance=1, phase_shift_tolerance=0.1,
computed_channels=None, wdm_grid_size=None, f_cut_resolution=None, f_pump_resolution=None):
""" Simulation parameters used within the Nli Solver
:params method: formula for NLI calculation
:params dispersion_tolerance: tuning parameter for ggn model solution
:params phase_shift_tolerance: tuning parameter for ggn model solution
:params computed_channels: the NLI is evaluated for these channels and extrapolated for the others
"""
self.method = method.lower()
self.dispersion_tolerance = dispersion_tolerance
self.phase_shift_tolerance = phase_shift_tolerance
self.computed_channels = computed_channels
self.wdm_grid_size = wdm_grid_size
self.f_cut_resolution = f_cut_resolution
self.f_pump_resolution = f_pump_resolution
class SimParams(Parameters):
def __init__(self, **kwargs):
try:
if 'nli_parameters' in kwargs:
self._nli_params = NLIParams(**kwargs['nli_parameters'])
else:
self._nli_params = None
if 'raman_parameters' in kwargs:
self._raman_params = RamanParams(**kwargs['raman_parameters'])
else:
self._raman_params = None
except KeyError as e:
raise ParametersError(f'Simulation parameters must include {e}. Configuration: {kwargs}')
_shared_dict = {'nli_params': NLIParams(), 'raman_params': RamanParams()}
def __init__(self):
if type(self) == SimParams:
raise NotImplementedError('Instances of SimParams cannot be generated')
@classmethod
def set_params(cls, sim_params):
cls._shared_dict['nli_params'] = NLIParams(**sim_params.get('nli_params', {}))
cls._shared_dict['raman_params'] = RamanParams(**sim_params.get('raman_params', {}))
@classmethod
def get(cls):
self = cls.__new__(cls)
return self
@property
def nli_params(self):
return self._nli_params
return self._shared_dict['nli_params']
@property
def raman_params(self):
return self._raman_params
return self._shared_dict['raman_params']
@classmethod
def default(cls):
cls._shared_dict = {'nli_params': NLIParams(), 'raman_params': RamanParams()}
class FiberParams(Parameters):

View File

@@ -23,13 +23,12 @@ from math import isclose
from gnpy.core.utils import db2lin, lin2db
from gnpy.core.exceptions import EquipmentConfigError
from gnpy.core.parameters import SimParams
logger = getLogger(__name__)
sim_params = SimParams.get()
def propagate_raman_fiber(fiber, *carriers):
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
raman_params = sim_params.raman_params
nli_params = sim_params.nli_params
# apply input attenuation to carriers
@@ -51,11 +50,11 @@ def propagate_raman_fiber(fiber, *carriers):
stimulated_raman_scattering = raman_solver.stimulated_raman_scattering
fiber_attenuation = (stimulated_raman_scattering.rho[:, -1])**-2
if not raman_params.flag_raman:
if not raman_params.flag:
fiber_attenuation = tuple(fiber.params.lin_attenuation for _ in carriers)
# evaluate Raman ASE noise if required by sim_params and if raman pumps are present
if raman_params.flag_raman and fiber.raman_pumps:
if raman_params.flag and fiber.raman_pumps:
raman_ase = raman_solver.spontaneous_raman_scattering.power[:, -1]
else:
raman_ase = tuple(0 for _ in carriers)
@@ -146,27 +145,6 @@ def raised_cosine_comb(f, *carriers):
return psd
class Simulation:
_shared_dict = {}
def __init__(self):
if type(self) == Simulation:
raise NotImplementedError('Simulation cannot be instatiated')
@classmethod
def set_params(cls, sim_params):
cls._shared_dict['sim_params'] = sim_params
@classmethod
def get_simulation(cls):
self = cls.__new__(cls)
return self
@property
def sim_params(self):
return self._shared_dict['sim_params']
class SpontaneousRamanScattering:
def __init__(self, frequency, z, power):
self.frequency = frequency
@@ -298,9 +276,6 @@ class RamanSolver:
cr_raman_matrix, freq_diff, ase_bc, bn_array, temperature):
spontaneous_raman_scattering = OptimizeResult()
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
dx = sim_params.raman_params.space_resolution
h = ph.value('Planck constant')
kb = ph.value('Boltzmann constant')
@@ -339,10 +314,8 @@ class RamanSolver:
# fiber parameters
fiber_length = self.fiber.params.length
raman_efficiency = self.fiber.params.raman_efficiency
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
if not sim_params.raman_params.flag_raman:
if not sim_params.raman_params.flag:
raman_efficiency['cr'] = zeros(len(raman_efficiency['cr']))
# raman solver parameters
z_resolution = sim_params.raman_params.space_resolution
@@ -476,15 +449,13 @@ class NliSolver:
""" Compute NLI power generated by the WDM comb `*carriers` on the channel under test `carrier`
at the end of the fiber span.
"""
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
if 'gn_model_analytic' == sim_params.nli_params.nli_method_name.lower():
if 'gn_model_analytic' == sim_params.nli_params.method:
carrier_nli = self._gn_analytic(carrier, *carriers)
elif 'ggn_spectrally_separated' in sim_params.nli_params.nli_method_name.lower():
elif 'ggn_spectrally_separated' in sim_params.nli_params.method:
eta_matrix = self._compute_eta_matrix(carrier, *carriers)
carrier_nli = self._carrier_nli_from_eta_matrix(eta_matrix, carrier, *carriers)
else:
raise ValueError(f'Method {sim_params.nli_params.nli_method_name} not implemented.')
raise ValueError(f'Method {sim_params.nli_params.method} not implemented.')
return carrier_nli
@@ -501,8 +472,6 @@ class NliSolver:
def _compute_eta_matrix(self, cut_carrier, *carriers):
cut_index = cut_carrier.channel_number - 1
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
# Matrix initialization
matrix_size = max(carriers, key=lambda x: getattr(x, 'channel_number')).channel_number
eta_matrix = zeros(shape=(matrix_size, matrix_size))
@@ -510,10 +479,10 @@ class NliSolver:
# SPM
logger.debug(f'Start computing SPM on channel #{cut_carrier.channel_number}')
# SPM GGN
if 'ggn' in sim_params.nli_params.nli_method_name.lower():
if 'ggn' in sim_params.nli_params.method:
partial_nli = self._generalized_spectrally_separated_spm(cut_carrier)
# SPM GN
elif 'gn' in sim_params.nli_params.nli_method_name.lower():
elif 'gn' in sim_params.nli_params.method:
partial_nli = self._gn_analytic(cut_carrier, *[cut_carrier])
eta_matrix[cut_index, cut_index] = partial_nli / (cut_carrier.power.signal**3)
@@ -524,10 +493,10 @@ class NliSolver:
logger.debug(f'Start computing XPM on channel #{cut_carrier.channel_number} '
f'from channel #{pump_carrier.channel_number}')
# XPM GGN
if 'ggn' in sim_params.nli_params.nli_method_name.lower():
if 'ggn' in sim_params.nli_params.method:
partial_nli = self._generalized_spectrally_separated_xpm(cut_carrier, pump_carrier)
# XPM GGN
elif 'gn' in sim_params.nli_params.nli_method_name.lower():
elif 'gn' in sim_params.nli_params.method:
partial_nli = self._gn_analytic(cut_carrier, *[pump_carrier])
eta_matrix[pump_index, pump_index] = \
partial_nli / (cut_carrier.power.signal * pump_carrier.power.signal**2)
@@ -560,8 +529,6 @@ class NliSolver:
# Methods for computing the GGN-model
def _generalized_spectrally_separated_spm(self, carrier):
gamma = self.fiber.params.gamma
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
f_cut_resolution = sim_params.nli_params.f_cut_resolution['delta_0']
f_eval = carrier.frequency
g_cut = (carrier.power.signal / carrier.baud_rate)
@@ -572,8 +539,6 @@ class NliSolver:
def _generalized_spectrally_separated_xpm(self, cut_carrier, pump_carrier):
gamma = self.fiber.params.gamma
simulation = Simulation.get_simulation()
sim_params = simulation.sim_params
delta_index = pump_carrier.channel_number - cut_carrier.channel_number
f_cut_resolution = sim_params.nli_params.f_cut_resolution[f'delta_{delta_index}']
f_pump_resolution = sim_params.nli_params.f_pump_resolution

View File

@@ -1,14 +1,14 @@
{
"raman_parameters": {
"flag_raman": true,
"raman_params": {
"flag": true,
"space_resolution": 10e3,
"tolerance": 1e-8
},
"nli_parameters": {
"nli_method_name": "ggn_spectrally_separated",
"wdm_grid_size": 50e9,
"dispersion_tolerance": 1,
"phase_shift_tolerance": 0.1,
"computed_channels": [1, 18, 37, 56, 75]
"nli_params": {
"method": "ggn_spectrally_separated",
"wdm_grid_size": 50e9,
"dispersion_tolerance": 1,
"phase_shift_tolerance": 0.1,
"computed_channels": [1, 18, 37, 56, 75]
}
}

View File

@@ -21,7 +21,6 @@ from gnpy.core.equipment import trx_mode_params
import gnpy.core.exceptions as exceptions
from gnpy.core.network import build_network
from gnpy.core.parameters import SimParams
from gnpy.core.science_utils import Simulation
from gnpy.core.utils import db2lin, lin2db, automatic_nch
from gnpy.topology.request import (ResultElement, jsontocsv, compute_path_dsjctn, requests_aggregation,
BLOCKING_NOPATH, correct_json_route_list,
@@ -57,14 +56,14 @@ def load_common_data(equipment_filename, topology_filename, simulation_filename,
if save_raw_network_filename is not None:
save_network(network, save_raw_network_filename)
print(f'{ansi_escapes.blue}Raw network (no optimizations) saved to {save_raw_network_filename}{ansi_escapes.reset}')
sim_params = SimParams(**load_json(simulation_filename)) if simulation_filename is not None else None
if not sim_params:
if not simulation_filename:
SimParams.default()
if next((node for node in network if isinstance(node, RamanFiber)), None) is not None:
print(f'{ansi_escapes.red}Invocation error:{ansi_escapes.reset} '
f'RamanFiber requires passing simulation params via --sim-params')
sys.exit(1)
else:
Simulation.set_params(sim_params)
SimParams.set_params(load_json(simulation_filename))
except exceptions.EquipmentConfigError as e:
print(f'{ansi_escapes.red}Configuration error in the equipment library:{ansi_escapes.reset} {e}')
sys.exit(1)

View File

@@ -1,11 +1,11 @@
{
"raman_parameters": {
"flag_raman": true,
"raman_params": {
"flag": true,
"space_resolution": 10e3,
"tolerance": 1e-8
},
"nli_parameters": {
"nli_method_name": "ggn_spectrally_separated",
"nli_params": {
"method": "ggn_spectrally_separated",
"wdm_grid_size": 50e9,
"dispersion_tolerance": 1,
"phase_shift_tolerance": 0.1,

View File

@@ -1,26 +1,32 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
"""
Checks that the class SimParams behaves as a mutable Singleton.
"""
from gnpy.core.parameters import SimParams
from gnpy.core.science_utils import Simulation
from gnpy.tools.json_io import load_json
TEST_DIR = Path(__file__).parent
DATA_DIR = TEST_DIR / 'data'
import pytest
from gnpy.core.parameters import SimParams, NLIParams, RamanParams
def test_sim_parameters():
j = load_json(DATA_DIR / 'sim_params.json')
sim_params = SimParams(**j)
Simulation.set_params(sim_params)
s1 = Simulation.get_simulation()
assert s1.sim_params.raman_params.flag_raman
s2 = Simulation.get_simulation()
assert s2.sim_params.raman_params.flag_raman
j['raman_parameters']['flag_raman'] = False
sim_params = SimParams(**j)
Simulation.set_params(sim_params)
assert not s2.sim_params.raman_params.flag_raman
assert not s1.sim_params.raman_params.flag_raman
class MockSimParams(SimParams):
"""Mock simulation parameters for monkey patch"""
_shared_dict = {'nli_params': NLIParams(), 'raman_params': RamanParams()}
@pytest.fixture
def set_sim_params(monkeypatch):
monkeypatch.setattr(SimParams, '_shared_dict', MockSimParams._shared_dict)
def test_sim_parameters(set_sim_params):
sim_params = {'nli_params': {}, 'raman_params': {}}
MockSimParams.set_params(sim_params)
s1 = SimParams.get()
assert s1.nli_params.method == 'gn_model_analytic'
s2 = SimParams.get()
assert not s1.raman_params.flag
sim_params['raman_params']['flag'] = True
MockSimParams.set_params(sim_params)
assert s2.raman_params.flag
assert s1.raman_params.flag

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: Alessio Ferrari
"""
Checks that RamanFiber propagates properly the spectral information. In this way, also the RamanSolver and the NliSolver
are tested.
@@ -12,14 +12,14 @@ from numpy.testing import assert_allclose
from gnpy.core.info import create_input_spectral_information
from gnpy.core.elements import RamanFiber
from gnpy.core.parameters import SimParams
from gnpy.core.science_utils import Simulation
from gnpy.tools.json_io import load_json
from tests.test_parameters import MockSimParams, set_sim_params
TEST_DIR = Path(__file__).parent
def test_raman_fiber():
def test_raman_fiber(set_sim_params):
""" Test the accuracy of propagating the RamanFiber."""
# spectral information generation
power = 1e-3
@@ -30,9 +30,7 @@ def test_raman_fiber():
spectral_info_params.pop('tx_osnr')
spectral_info_params.pop('sys_margins')
spectral_info_input = create_input_spectral_information(power=power, **spectral_info_params)
sim_params = SimParams(**load_json(TEST_DIR / 'data' / 'sim_params.json'))
Simulation.set_params(sim_params)
MockSimParams.set_params(load_json(TEST_DIR / 'data' / 'sim_params.json'))
fiber = RamanFiber(**load_json(TEST_DIR / 'data' / 'raman_fiber_config.json'))
# propagation