#!/usr/bin/env python3 # -*- coding: utf-8 -*- ''' gnpy.core.elements ================== This module contains standard network elements. A network element is a Python callable. It takes a :class:`.info.SpectralInformation` object and returns a copy with appropriate fields affected. This structure represents spectral information that is "propogated" by this network element. Network elements must have only a local "view" of the network and propogate :class:`.info.SpectralInformation` using only this information. They should be independent and self-contained. Network elements MUST implement two attributes .uid and .name representing a unique identifier and a printable name. ''' from numpy import abs, arange, array, exp, divide, errstate from numpy import interp, log10, mean, pi, polyfit, polyval, sum from scipy.constants import c, h from collections import namedtuple from gnpy.core.node import Node from gnpy.core.units import UNITS from gnpy.core.utils import lin2db, db2lin, itufs, itufl, snr_sum from gnpy.core.science_utils import propagate_raman_fiber, _psi class Transceiver(Node): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.osnr_ase_01nm = None self.osnr_ase = None self.osnr_nli = None self.snr = None self.passive = False self.baud_rate = None def _calc_snr(self, spectral_info): with errstate(divide='ignore'): self.baud_rate = [c.baud_rate for c in spectral_info.carriers] ratio_01nm = [lin2db(12.5e9/b_rate) for b_rate in self.baud_rate] #set raw values to record original calculation, before update_snr() self.raw_osnr_ase = [lin2db(divide(c.power.signal, c.power.ase)) for c in spectral_info.carriers] self.raw_osnr_ase_01nm = [ase - ratio for ase, ratio in zip(self.raw_osnr_ase, ratio_01nm)] self.raw_osnr_nli = [lin2db(divide(c.power.signal, c.power.nli)) for c in spectral_info.carriers] self.raw_snr = [lin2db(divide(c.power.signal, c.power.nli+c.power.ase)) for c in spectral_info.carriers] self.raw_snr_01nm = [snr - ratio for snr, ratio in zip(self.raw_snr, ratio_01nm)] self.osnr_ase = self.raw_osnr_ase self.osnr_ase_01nm = self.raw_osnr_ase_01nm self.osnr_nli = self.raw_osnr_nli self.snr = self.raw_snr self.snr_01nm = self.raw_snr_01nm def update_snr(self, *args): """ snr_added in 0.1nm compute SNR penalties such as transponder Tx_osnr or Roadm add_drop_osnr only applied in request.py / propagate on the last Trasceiver node of the path all penalties are added in a single call because to avoid uncontrolled cumul """ #use raw_values so that the added snr penalties are not cumulated snr_added = 0 for s in args: snr_added += db2lin(-s) snr_added = -lin2db(snr_added) self.osnr_ase = list(map(lambda x,y:snr_sum(x,y,snr_added), self.raw_osnr_ase, self.baud_rate)) self.snr = list(map(lambda x,y:snr_sum(x,y,snr_added), self.raw_snr, self.baud_rate)) self.osnr_ase_01nm = list(map(lambda x:snr_sum(x,12.5e9,snr_added), self.raw_osnr_ase_01nm)) self.snr_01nm = list(map(lambda x:snr_sum(x,12.5e9,snr_added), self.raw_snr_01nm)) @property def to_json(self): return {'uid' : self.uid, 'type' : type(self).__name__, 'metadata' : { 'location': self.metadata['location']._asdict() } } def __repr__(self): return (f'{type(self).__name__}(' f'uid={self.uid!r}, ' f'osnr_ase_01nm={self.osnr_ase_01nm!r}, ' f'osnr_ase={self.osnr_ase!r}, ' f'osnr_nli={self.osnr_nli!r}, ' f'snr={self.snr!r})') def __str__(self): if self.snr is None or self.osnr_ase is None: return f'{type(self).__name__} {self.uid}' snr = round(mean(self.snr),2) osnr_ase = round(mean(self.osnr_ase),2) osnr_ase_01nm = round(mean(self.osnr_ase_01nm), 2) snr_01nm = round(mean(self.snr_01nm),2) return '\n'.join([f'{type(self).__name__} {self.uid}', f' OSNR ASE (0.1nm, dB): {osnr_ase_01nm:.2f}', f' OSNR ASE (signal bw, dB): {osnr_ase:.2f}', f' SNR total (signal bw, dB): {snr:.2f}', f' SNR total (0.1nm, dB): {snr_01nm:.2f}']) def __call__(self, spectral_info): self._calc_snr(spectral_info) return spectral_info RoadmParams = namedtuple('RoadmParams', 'target_pch_out_db add_drop_osnr restrictions per_degree_target_pch_out_db') class Roadm(Node): def __init__(self, *args, params, **kwargs): if 'per_degree_target_pch_out_db' not in params.keys(): params['per_degree_target_pch_out_db'] = [] super().__init__(*args, params=RoadmParams(**params), **kwargs) self.loss = 0 #auto-design interest self.effective_loss = None self.effective_pch_out_db = self.params.target_pch_out_db self.passive = True self.restrictions = self.params.restrictions self.per_degree_target_pch_out_db = self.params.per_degree_target_pch_out_db @property def to_json(self): return {'uid' : self.uid, 'type' : type(self).__name__, 'params' : { 'target_pch_out_db' : self.effective_pch_out_db, 'restrictions' : self.restrictions, 'per_degree_target_pch_out_db': self.per_degree_target_pch_out_db }, 'metadata' : { 'location': self.metadata['location']._asdict() } } def __repr__(self): return f'{type(self).__name__}(uid={self.uid!r}, loss={self.loss!r})' def __str__(self): return '\n'.join([f'{type(self).__name__} {self.uid}', f' effective loss (dB): {self.effective_loss:.2f}', f' pch out (dBm): {self.effective_pch_out_db!r}']) def propagate(self, pref, *carriers, degree): #pin_target and loss are read from eqpt_config.json['Roadm'] #all ingress channels in xpress are set to this power level #but add channels are not, so we define an effective loss #in the case of add channels if self.per_degree_target_pch_out_db: # find the target power on this degree try: temp = next(el['target_pch_out_db'] \ for el in self.per_degree_target_pch_out_db if el['to_node']==degree) except StopIteration: # if no target power is defined on this degree use the global one temp = self.params.target_pch_out_db else: # if no per degree target power are defined, use the global one temp = self.params.target_pch_out_db self.effective_pch_out_db = min(pref.p_spani, temp) self.effective_loss = pref.p_spani - self.effective_pch_out_db carriers_power = array([c.power.signal +c.power.nli+c.power.ase for c in carriers]) carriers_att = list(map(lambda x : lin2db(x*1e3)-self.effective_pch_out_db, carriers_power)) exceeding_att = -min(list(filter(lambda x: x < 0, carriers_att)), default = 0) carriers_att = list(map(lambda x: db2lin(x+exceeding_att), carriers_att)) for carrier_att, carrier in zip(carriers_att, carriers) : pwr = carrier.power pwr = pwr._replace( signal = pwr.signal/carrier_att, nli = pwr.nli/carrier_att, ase = pwr.ase/carrier_att) yield carrier._replace(power=pwr) def update_pref(self, pref): return pref._replace(p_span0=pref.p_span0, p_spani=self.effective_pch_out_db) def __call__(self, spectral_info, degree): carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers, degree=degree)) pref = self.update_pref(spectral_info.pref) return spectral_info._replace(carriers=carriers, pref=pref) FusedParams = namedtuple('FusedParams', 'loss') class Fused(Node): def __init__(self, *args, params=None, **kwargs): if params is None: # default loss value if not mentioned in loaded network json params = {'loss':1} super().__init__(*args, params=FusedParams(**params), **kwargs) self.loss = self.params.loss self.passive = True @property def to_json(self): return {'uid' : self.uid, 'type' : type(self).__name__, 'params' :{ 'loss': self.loss }, 'metadata' : { 'location': self.metadata['location']._asdict() } } def __repr__(self): return f'{type(self).__name__}(uid={self.uid!r}, loss={self.loss!r})' def __str__(self): return '\n'.join([f'{type(self).__name__} {self.uid}', f' loss (dB): {self.loss:.2f}']) def propagate(self, *carriers): attenuation = db2lin(self.loss) for carrier in carriers: pwr = carrier.power pwr = pwr._replace(signal=pwr.signal/attenuation, nli=pwr.nli/attenuation, ase=pwr.ase/attenuation) yield carrier._replace(power=pwr) def update_pref(self, pref): return pref._replace(p_span0=pref.p_span0, p_spani=pref.p_spani - self.loss) def __call__(self, spectral_info): carriers = tuple(self.propagate(*spectral_info.carriers)) pref = self.update_pref(spectral_info.pref) return spectral_info._replace(carriers=carriers, pref=pref) FiberParams = namedtuple('FiberParams', 'type_variety length loss_coef length_units \ att_in con_in con_out dispersion gamma') class Fiber(Node): def __init__(self, *args, params=None, **kwargs): if params is None: params = {} if 'con_in' not in params: # if not defined in the network json connector loss in/out # the None value will be updated in network.py[build_network] # with default values from eqpt_config.json[Spans] params['con_in'] = None params['con_out'] = None if 'att_in' not in params: #fixed attenuator for padding params['att_in'] = 0 super().__init__(*args, params=FiberParams(**params), **kwargs) self.type_variety = self.params.type_variety self.length = self.params.length * UNITS[self.params.length_units] # in m self.loss_coef = self.params.loss_coef * 1e-3 # lineic loss dB/m self.lin_loss_coef = self.params.loss_coef / (20 * log10(exp(1))) self.att_in = self.params.att_in self.con_in = self.params.con_in self.con_out = self.params.con_out self.dispersion = self.params.dispersion # s/m/m self.gamma = self.params.gamma # 1/W/m self.pch_out_db = None self.carriers_in = None self.carriers_out = None # TODO|jla: discuss factor 2 in the linear lineic attenuation @property def to_json(self): return {'uid' : self.uid, 'type' : type(self).__name__, 'type_variety' : self.type_variety, 'params' : { #have to specify each because namedtupple cannot be updated :( 'type_variety' : self.type_variety, 'length' : self.length/UNITS[self.params.length_units], 'loss_coef' : self.loss_coef*1e3, 'length_units' : self.params.length_units, 'att_in' : self.att_in, 'con_in' : self.con_in, 'con_out' : self.con_out }, 'metadata' : { 'location': self.metadata['location']._asdict() } } def __repr__(self): return f'{type(self).__name__}(uid={self.uid!r}, length={round(self.length*1e-3,1)!r}km, loss={round(self.loss,1)!r}dB)' def __str__(self): return '\n'.join([f'{type(self).__name__} {self.uid}', f' type_variety: {self.type_variety}', f' length (km): {round(self.length*1e-3):.2f}', f' pad att_in (dB): {self.att_in:.2f}', f' total loss (dB): {self.loss:.2f}', f' (includes conn loss (dB) in: {self.con_in:.2f} out: {self.con_out:.2f})', f' (conn loss out includes EOL margin defined in eqpt_config.json)', f' pch out (dBm): {self.pch_out_db!r}']) @property def fiber_loss(self): """Fiber loss in dB, not including padding attenuator""" return self.loss_coef * self.length + self.con_in + self.con_out @property def loss(self): """total loss including padding att_in: useful for polymorphism with roadm loss""" return self.loss_coef * self.length + self.con_in + self.con_out + self.att_in @property def passive(self): return True @property def lin_attenuation(self): return db2lin(self.length * self.loss_coef) @property def effective_length(self): _, alpha = self.dbkm_2_lin() leff = (1 - exp(-2 * alpha * self.length)) / (2 * alpha) return leff @property def asymptotic_length(self): _, alpha = self.dbkm_2_lin() aleff = 1 / (2 * alpha) return aleff def carriers(self, loc, attr): """retrieve carriers information :param loc: (in, out) of the class element :param attr: (ase, nli, signal, total) power information """ if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')): yield None return loc_attr = 'carriers_'+loc for c in getattr(self, loc_attr) : if attr == 'total': yield c.power.ase+c.power.nli+c.power.signal else: yield c.power._asdict().get(attr, None) def beta2(self, ref_wavelength=1550e-9): """Returns beta2 from dispersion parameter. Dispersion is entered in ps/nm/km. Disperion can be a numpy array or a single value. :param ref_wavelength: can be a numpy array; default: 1550nm """ # TODO|jla: discuss beta2 as method or attribute D = abs(self.dispersion) b2 = (ref_wavelength ** 2) * D / (2 * pi * c) # 10^21 scales [ps^2/km] return b2 # s/Hz/m def dbkm_2_lin(self): """calculates the linear loss coefficient""" # linear loss coefficient in dB/km^-1 alpha_pcoef = self.loss_coef # linear loss field amplitude coefficient in m^-1 alpha_acoef = alpha_pcoef / (2 * 10 * log10(exp(1))) return alpha_pcoef, alpha_acoef def _gn_analytic(self, carrier, *carriers): """Computes the nonlinear interference power on a single carrier. The method uses eq. 120 from `arXiv:1209.0394 `__. :param carrier: the signal under analysis :param carriers: the full WDM comb :return: carrier_nli: the amount of nonlinear interference in W on the under analysis """ g_nli = 0 for interfering_carrier in carriers: psi = _psi(carrier, interfering_carrier, beta2=self.beta2(), asymptotic_length=self.asymptotic_length) g_nli += (interfering_carrier.power.signal/interfering_carrier.baud_rate)**2 \ * (carrier.power.signal/carrier.baud_rate) * psi g_nli *= (16 / 27) * (self.gamma * self.effective_length)**2 \ / (2 * pi * abs(self.beta2()) * self.asymptotic_length) carrier_nli = carrier.baud_rate * g_nli return carrier_nli def propagate(self, *carriers): # apply connector_att_in on all carriers before computing gn analytics premiere partie pas bonne attenuation = db2lin(self.con_in + self.att_in) chan = [] for carrier in carriers: pwr = carrier.power pwr = pwr._replace(signal=pwr.signal/attenuation, nli=pwr.nli/attenuation, ase=pwr.ase/attenuation) carrier = carrier._replace(power=pwr) chan.append(carrier) carriers = tuple(f for f in chan) # propagate in the fiber and apply attenuation out attenuation = db2lin(self.con_out) for carrier in carriers: pwr = carrier.power carrier_nli = self._gn_analytic(carrier, *carriers) pwr = pwr._replace(signal=pwr.signal/self.lin_attenuation/attenuation, nli=(pwr.nli+carrier_nli)/self.lin_attenuation/attenuation, ase=pwr.ase/self.lin_attenuation/attenuation) yield carrier._replace(power=pwr) def update_pref(self, pref): self.pch_out_db = round(pref.p_spani - self.loss, 2) return pref._replace(p_span0=pref.p_span0, p_spani=self.pch_out_db) def __call__(self, spectral_info): self.carriers_in = spectral_info.carriers carriers = tuple(self.propagate(*spectral_info.carriers)) pref = self.update_pref(spectral_info.pref) self.carriers_out = carriers return spectral_info._replace(carriers=carriers, pref=pref) RamanFiberParams = namedtuple('RamanFiberParams', 'type_variety length loss_coef length_units \ att_in con_in con_out dispersion gamma raman_efficiency') class RamanFiber(Fiber): def __init__(self, *args, params=None, **kwargs): if params is None: params = {} if 'con_in' not in params: # if not defined in the network json connector loss in/out # the None value will be updated in network.py[build_network] # with default values from eqpt_config.json[Spans] params['con_in'] = None params['con_out'] = None if 'att_in' not in params: #fixed attenuator for padding params['att_in'] = 0 # TODO: can we re-use the Fiber constructor in a better way? Node.__init__(self, *args, params=RamanFiberParams(**params), **kwargs) self.type_variety = self.params.type_variety self.length = self.params.length * UNITS[self.params.length_units] # in m self.loss_coef = self.params.loss_coef * 1e-3 # lineic loss dB/m self.lin_loss_coef = self.params.loss_coef / (20 * log10(exp(1))) self.att_in = self.params.att_in self.con_in = self.params.con_in self.con_out = self.params.con_out self.dispersion = self.params.dispersion # s/m/m self.gamma = self.params.gamma # 1/W/m self.pch_out_db = None self.carriers_in = None self.carriers_out = None # TODO|jla: discuss factor 2 in the linear lineic attenuation @property def sim_params(self): return self._sim_params @sim_params.setter def sim_params(self, sim_params=None): self._sim_params = sim_params def update_pref(self, pref, *carriers): pch_out_db = lin2db(mean([carrier.power.signal for carrier in carriers])) + 30 self.pch_out_db = round(pch_out_db, 2) return pref._replace(p_span0=pref.p_span0, p_spani=self.pch_out_db) def __call__(self, spectral_info): self.carriers_in = spectral_info.carriers carriers = tuple(self.propagate(*spectral_info.carriers)) pref = self.update_pref(spectral_info.pref, *carriers) self.carriers_out = carriers return spectral_info._replace(carriers=carriers, pref=pref) def propagate(self, *carriers): for propagated_carrier in propagate_raman_fiber(self, *carriers): yield propagated_carrier class EdfaParams: def __init__(self, **params): self.update_params(params) if params == {}: self.type_variety = '' self.type_def = '' # self.gain_flatmax = 0 # self.gain_min = 0 # self.p_max = 0 # self.nf_model = None # self.nf_fit_coeff = None # self.nf_ripple = None # self.dgt = None # self.gain_ripple = None # self.out_voa_auto = False # self.allowed_for_design = None def update_params(self, kwargs): for k,v in kwargs.items() : setattr(self, k, update_params(**v) if isinstance(v, dict) else v) class EdfaOperational: default_values = \ { 'gain_target': None, 'delta_p': None, 'out_voa': None, 'tilt_target': 0 } def __init__(self, **operational): self.update_attr(operational) def update_attr(self, kwargs): clean_kwargs = {k:v for k,v in kwargs.items() if v !=''} for k,v in self.default_values.items(): setattr(self, k, clean_kwargs.get(k,v)) def __repr__(self): return (f'{type(self).__name__}(' f'gain_target={self.gain_target!r}, ' f'tilt_target={self.tilt_target!r})') class Edfa(Node): def __init__(self, *args, params=None, operational=None, **kwargs): if params is None: params = {} if operational is None: operational = {} super().__init__( *args, params=EdfaParams(**params), operational=EdfaOperational(**operational), **kwargs ) self.interpol_dgt = None # interpolated dynamic gain tilt self.interpol_gain_ripple = None # gain ripple self.interpol_nf_ripple = None # nf_ripple self.channel_freq = None # SI channel frequencies # nf, gprofile, pin and pout attributes are set by interpol_params self.nf = None # dB edfa nf at operational.gain_target self.gprofile = None self.pin_db = None self.nch = None self.pout_db = None self.target_pch_out_db = None self.effective_pch_out_db = None self.passive = False self.att_in = None self.carriers_in = None self.carriers_out = None self.effective_gain = self.operational.gain_target self.delta_p = self.operational.delta_p #delta P with Pref (power swwep) in power mode self.tilt_target = self.operational.tilt_target self.out_voa = self.operational.out_voa @property def to_json(self): return {'uid' : self.uid, 'type' : type(self).__name__, 'type_variety' : self.params.type_variety, 'operational' : { 'gain_target' : self.effective_gain, 'delta_p' : self.delta_p, 'tilt_target' : self.tilt_target, 'out_voa' : self.out_voa }, 'metadata' : { 'location': self.metadata['location']._asdict() } } def __repr__(self): return (f'{type(self).__name__}(uid={self.uid!r}, ' f'type_variety={self.params.type_variety!r}, ' f'interpol_dgt={self.interpol_dgt!r}, ' f'interpol_gain_ripple={self.interpol_gain_ripple!r}, ' f'interpol_nf_ripple={self.interpol_nf_ripple!r}, ' f'channel_freq={self.channel_freq!r}, ' f'nf={self.nf!r}, ' f'gprofile={self.gprofile!r}, ' f'pin_db={self.pin_db!r}, ' f'pout_db={self.pout_db!r})') def __str__(self): if self.pin_db is None or self.pout_db is None: return f'{type(self).__name__} {self.uid}' nf = mean(self.nf) return '\n'.join([f'{type(self).__name__} {self.uid}', f' type_variety: {self.params.type_variety}', f' effective gain(dB): {self.effective_gain:.2f}', f' (before att_in and before output VOA)', f' noise figure (dB): {nf:.2f}', f' (including att_in)', f' pad att_in (dB): {self.att_in:.2f}', f' Power In (dBm): {self.pin_db:.2f}', f' Power Out (dBm): {self.pout_db:.2f}', f' Delta_P (dB): {self.delta_p!r}', f' target pch (dBm): {self.target_pch_out_db!r}', f' effective pch (dBm): {self.effective_pch_out_db!r}', f' output VOA (dB): {self.out_voa:.2f}']) def carriers(self, loc, attr): """retrieve carriers information :param loc: (in, out) of the class element :param attr: (ase, nli, signal, total) power information """ if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')): yield None return loc_attr = 'carriers_'+loc for c in getattr(self, loc_attr) : if attr == 'total': yield c.power.ase+c.power.nli+c.power.signal else: yield c.power._asdict().get(attr, None) def interpol_params(self, frequencies, pin, baud_rates, pref): """interpolate SI channel frequencies with the edfa dgt and gain_ripple frquencies from JSON set the edfa class __init__ None parameters : self.channel_freq, self.nf, self.interpol_dgt and self.interpol_gain_ripple """ # TODO|jla: read amplifier actual frequencies from additional params in json amplifier_freq = itufl(len(self.params.dgt), self.params.f_min, self.params.f_max) # Hz self.channel_freq = frequencies self.interpol_dgt = interp(self.channel_freq, amplifier_freq, self.params.dgt) self.interpol_gain_ripple = interp(self.channel_freq, amplifier_freq, self.params.gain_ripple) self.interpol_nf_ripple =interp(self.channel_freq, amplifier_freq, self.params.nf_ripple) self.nch = frequencies.size self.pin_db = lin2db(sum(pin*1e3)) """in power mode: delta_p is defined and can be used to calculate the power target This power target is used calculate the amplifier gain""" if self.delta_p is not None: self.target_pch_out_db = round(self.delta_p + pref.p_span0, 2) self.effective_gain = self.target_pch_out_db - pref.p_spani """check power saturation and correct effective gain & power accordingly:""" self.effective_gain = min( self.effective_gain, self.params.p_max - (pref.p_spani + pref.neq_ch) ) #print(self.uid, self.effective_gain, self.operational.gain_target) self.effective_pch_out_db = round(pref.p_spani + self.effective_gain, 2) """check power saturation and correct target_gain accordingly:""" #print(self.uid, self.effective_gain, self.pin_db, pref.p_spani) self.nf = self._calc_nf() self.gprofile = self._gain_profile(pin) pout = (pin + self.noise_profile(baud_rates))*db2lin(self.gprofile) self.pout_db = lin2db(sum(pout*1e3)) # ase & nli are only calculated in signal bandwidth # pout_db is not the absolute full output power (negligible if sufficient channels) def _nf(self, type_def, nf_model, nf_fit_coeff, gain_min, gain_flatmax, gain_target): #if hybrid raman, use edfa_gain_flatmax attribute, else use gain_flatmax #gain_flatmax = getattr(params, 'edfa_gain_flatmax', params.gain_flatmax) pad = max(gain_min - gain_target, 0) gain_target += pad dg = max(gain_flatmax - gain_target, 0) if type_def == 'variable_gain': g1a = gain_target - nf_model.delta_p - dg nf_avg = lin2db(db2lin(nf_model.nf1) + db2lin(nf_model.nf2)/db2lin(g1a)) elif type_def == 'fixed_gain': nf_avg = nf_model.nf0 elif type_def == 'openroadm': pin_ch = self.pin_db - lin2db(self.nch) # model OSNR = f(Pin) nf_avg = pin_ch - polyval(nf_model.nf_coef, pin_ch) + 58 elif type_def == 'advanced_model': nf_avg = polyval(nf_fit_coeff, -dg) else: assert False, "Unrecognized amplifier type, this should have been checked by the JSON loader" return nf_avg+pad, pad def _calc_nf(self, avg = False): """nf calculation based on 2 models: self.params.nf_model.enabled from json import: True => 2 stages amp modelling based on precalculated nf1, nf2 and delta_p in build_OA_json False => polynomial fit based on self.params.nf_fit_coeff""" # gain_min > gain_target TBD: if self.params.type_def == 'dual_stage': g1 = self.params.preamp_gain_flatmax g2 = self.effective_gain - g1 nf1_avg, pad = self._nf( self.params.preamp_type_def, self.params.preamp_nf_model, self.params.preamp_nf_fit_coeff, self.params.preamp_gain_min, self.params.preamp_gain_flatmax, g1) #no padding expected for the 1stage because g1 = gain_max nf2_avg, pad = self._nf( self.params.booster_type_def, self.params.booster_nf_model, self.params.booster_nf_fit_coeff, self.params.booster_gain_min, self.params.booster_gain_flatmax, g2) nf_avg = lin2db(db2lin(nf1_avg) + db2lin(nf2_avg-g1)) #no padding expected for the 1stage because g1 = gain_max pad = 0 else: nf_avg, pad = self._nf( self.params.type_def, self.params.nf_model, self.params.nf_fit_coeff, self.params.gain_min, self.params.gain_flatmax, self.effective_gain) self.att_in = pad # not used to attenuate carriers, only used in _repr_ and _str_ if avg: return nf_avg else: return self.interpol_nf_ripple + nf_avg # input VOA = 1 for 1 NF degradation def noise_profile(self, df): """noise_profile(bw) computes amplifier ase (W) in signal bw (Hz) noise is calculated at amplifier input :bw: signal bandwidth = baud rate in Hz :type bw: float :return: the asepower in W in the signal bandwidth bw for 96 channels :return type: numpy array of float ASE POWER USING PER CHANNEL GAIN PROFILE INPUTS: NF_dB - Noise figure in dB, vector of length number of channels or spectral slices G_dB - Actual gain calculated for the EDFA, vector of length number of channels or spectral slices ffs - Center frequency grid of the channels or spectral slices in THz, vector of length number of channels or spectral slices dF - width of each channel or spectral slice in THz, vector of length number of channels or spectral slices OUTPUT: ase_dBm - ase in dBm per channel or spectral slice NOTE: the output is the total ASE in the channel or spectral slice. For 50GHz channels the ASE BW is effectively 0.4nm. To get to noise power in 0.1nm, subtract 6dB. ONSR is usually quoted as channel power divided by the ASE power in 0.1nm RBW, regardless of the width of the actual channel. This is a historical convention from the days when optical signals were much smaller (155Mbps, 2.5Gbps, ... 10Gbps) than the resolution of the OSAs that were used to measure spectral power which were set to 0.1nm resolution for convenience. Moving forward into flexible grid and high baud rate signals, it may be convenient to begin quoting power spectral density in the same BW for both signal and ASE, e.g. 12.5GHz.""" ase = h * df * self.channel_freq * db2lin(self.nf) # W return ase # in W at amplifier input def _gain_profile(self, pin, err_tolerance=1.0e-11, simple_opt=True): """ Pin : input power / channel in W :param gain_ripple: design flat gain :param dgt: design gain tilt :param Pin: total input power in W :param gp: Average gain setpoint in dB units :param gtp: gain tilt setting :type gain_ripple: numpy.ndarray :type dgt: numpy.ndarray :type Pin: numpy.ndarray :type gp: float :type gtp: float :return: gain profile in dBm :rtype: numpy.ndarray AMPLIFICATION USING INPUT PROFILE INPUTS: gain_ripple - vector of length number of channels or spectral slices DGT - vector of length number of channels or spectral slices Pin - input powers vector of length number of channels or spectral slices Gp - provisioned gain length 1 GTp - provisioned tilt length 1 OUTPUT: amp gain per channel or spectral slice NOTE: there is no checking done for violations of the total output power capability of the amp. EDIT OF PREVIOUS NOTE: power violation now added in interpol_params Ported from Matlab version written by David Boerges at Ciena. Based on: R. di Muro, "The Er3+ fiber gain coefficient derived from a dynamic gain tilt technique", Journal of Lightwave Technology, Vol. 18, Iss. 3, Pp. 343-347, 2000. """ # TODO|jla: check what param should be used (currently length(dgt)) nb_channel = arange(len(self.interpol_dgt)) # TODO|jla: find a way to use these or lose them. Primarily we should have # a way to determine if exceeding the gain or output power of the amp tot_in_power_db = self.pin_db # Pin in W # linear fit to get the p = polyfit(nb_channel, self.interpol_dgt, 1) dgt_slope = p[0] # Calculate the target slope - currently assumes equal spaced channels # TODO|jla: support arbitrary channel spacing targ_slope = self.tilt_target / (len(nb_channel) - 1) # first estimate of DGT scaling if abs(dgt_slope) > 0.001: # check for zero value due to flat dgt dgts1 = targ_slope / dgt_slope else: dgts1 = 0 # when simple_opt is true, make 2 attempts to compute gain and # the internal voa value. This is currently here to provide direct # comparison with original Matlab code. Will be removed. # TODO|jla: replace with loop if not simple_opt: return # first estimate of Er gain & VOA loss g1st = array(self.interpol_gain_ripple) + self.params.gain_flatmax \ + array(self.interpol_dgt) * dgts1 voa = lin2db(mean(db2lin(g1st))) - self.effective_gain # second estimate of amp ch gain using the channel input profile g2nd = g1st - voa pout_db = lin2db(sum(pin*1e3*db2lin(g2nd))) dgts2 = self.effective_gain - (pout_db - tot_in_power_db) # center estimate of amp ch gain xcent = dgts2 gcent = g1st - voa + array(self.interpol_dgt) * xcent pout_db = lin2db(sum(pin*1e3*db2lin(gcent))) gavg_cent = pout_db - tot_in_power_db # Lower estimate of amp ch gain deltax = max(g1st) - min(g1st) # if no ripple deltax = 0 and xlow = xcent: div 0 # TODO|jla: add check for flat gain response if abs(deltax) <= 0.05: # not enough ripple to consider calculation return g1st - voa xlow = dgts2 - deltax glow = g1st - voa + array(self.interpol_dgt) * xlow pout_db = lin2db(sum(pin * 1e3 * db2lin(glow))) gavg_low = pout_db - tot_in_power_db # upper gain estimate xhigh = dgts2 + deltax ghigh = g1st - voa + array(self.interpol_dgt) * xhigh pout_db = lin2db(sum(pin * 1e3 * db2lin(ghigh))) gavg_high = pout_db - tot_in_power_db # compute slope slope1 = (gavg_low - gavg_cent) / (xlow - xcent) slope2 = (gavg_cent - gavg_high) / (xcent - xhigh) if abs(self.effective_gain - gavg_cent) <= err_tolerance: dgts3 = xcent elif self.effective_gain < gavg_cent: dgts3 = xcent - (gavg_cent - self.effective_gain) / slope1 else: dgts3 = xcent + (-gavg_cent + self.effective_gain) / slope2 return g1st - voa + array(self.interpol_dgt) * dgts3 def propagate(self, pref, *carriers): """add ASE noise to the propagating carriers of :class:`.info.SpectralInformation`""" pin = array([c.power.signal+c.power.nli+c.power.ase for c in carriers]) # pin in W freq = array([c.frequency for c in carriers]) brate = array([c.baud_rate for c in carriers]) # interpolate the amplifier vectors with the carriers freq, calculate nf & gain profile self.interpol_params(freq, pin, brate, pref) gains = db2lin(self.gprofile) carrier_ases = self.noise_profile(brate) att = db2lin(self.out_voa) for gain, carrier_ase, carrier in zip(gains, carrier_ases, carriers): pwr = carrier.power pwr = pwr._replace(signal=pwr.signal*gain/att, nli=pwr.nli*gain/att, ase=(pwr.ase+carrier_ase)*gain/att) yield carrier._replace(power=pwr) def update_pref(self, pref): return pref._replace(p_span0=pref.p_span0, p_spani=pref.p_spani + self.effective_gain - self.out_voa) def __call__(self, spectral_info): self.carriers_in = spectral_info.carriers carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers)) pref = self.update_pref(spectral_info.pref) self.carriers_out = carriers return spectral_info._replace(carriers=carriers, pref=pref)