mirror of
https://github.com/Telecominfraproject/oopt-gnpy.git
synced 2025-10-30 09:42:22 +00:00
Merge branch 'no-convenience-access' into raman
This required some adaptations in the new Raman code now that the property aliases are gone.
This commit is contained in:
@@ -287,7 +287,7 @@ if __name__ == '__main__':
|
||||
ch_snr_nl = ch_power - 10 * log10(final_carrier.power.nli)
|
||||
ch_snr = ch_power - 10 * log10(final_carrier.power.nli + final_carrier.power.ase)
|
||||
if not isnan(ch_snr):
|
||||
print(f'{final_carrier.num_chan} \t\t {round(ch_freq, 2):.2f} \t\t\t {round(ch_snr_nl, 2):.2f} '
|
||||
print(f'{final_carrier.channel_number} \t\t {round(ch_freq, 2):.2f} \t\t\t {round(ch_snr_nl, 2):.2f} '
|
||||
f'\t\t\t\t {round(ch_snr, 2):.2f}')
|
||||
|
||||
if not args.source:
|
||||
|
||||
@@ -155,8 +155,8 @@ class Roadm(Node):
|
||||
#all ingress channels in xpress are set to this power level
|
||||
#but add channels are not, so we define an effective loss
|
||||
#in the case of add channels
|
||||
self.effective_pch_out_db = min(pref.pi, self.params.target_pch_out_db)
|
||||
self.effective_loss = pref.pi - self.effective_pch_out_db
|
||||
self.effective_pch_out_db = min(pref.p_spani, self.params.target_pch_out_db)
|
||||
self.effective_loss = pref.p_spani - self.effective_pch_out_db
|
||||
carriers_power = array([c.power.signal +c.power.nli+c.power.ase for c in carriers])
|
||||
carriers_att = list(map(lambda x : lin2db(x*1e3)-self.params.target_pch_out_db, carriers_power))
|
||||
exceeding_att = -min(list(filter(lambda x: x < 0, carriers_att)), default = 0)
|
||||
@@ -164,17 +164,17 @@ class Roadm(Node):
|
||||
for carrier_att, carrier in zip(carriers_att, carriers) :
|
||||
pwr = carrier.power
|
||||
pwr = pwr._replace( signal = pwr.signal/carrier_att,
|
||||
nonlinear_interference = pwr.nli/carrier_att,
|
||||
amplified_spontaneous_emission = pwr.ase/carrier_att)
|
||||
nli = pwr.nli/carrier_att,
|
||||
ase = pwr.ase/carrier_att)
|
||||
yield carrier._replace(power=pwr)
|
||||
|
||||
def update_pref(self, pref):
|
||||
return pref._replace(p_span0=pref.p0, p_spani=self.effective_pch_out_db)
|
||||
return pref._replace(p_span0=pref.p_span0, p_spani=self.effective_pch_out_db)
|
||||
|
||||
def __call__(self, spectral_info):
|
||||
carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers))
|
||||
pref = self.update_pref(spectral_info.pref)
|
||||
return spectral_info.update(carriers=carriers, pref=pref)
|
||||
return spectral_info._replace(carriers=carriers, pref=pref)
|
||||
|
||||
FusedParams = namedtuple('FusedParams', 'loss')
|
||||
|
||||
@@ -212,17 +212,17 @@ class Fused(Node):
|
||||
for carrier in carriers:
|
||||
pwr = carrier.power
|
||||
pwr = pwr._replace(signal=pwr.signal/attenuation,
|
||||
nonlinear_interference=pwr.nli/attenuation,
|
||||
amplified_spontaneous_emission=pwr.ase/attenuation)
|
||||
nli=pwr.nli/attenuation,
|
||||
ase=pwr.ase/attenuation)
|
||||
yield carrier._replace(power=pwr)
|
||||
|
||||
def update_pref(self, pref):
|
||||
return pref._replace(p_span0=pref.p0, p_spani=pref.pi - self.loss)
|
||||
return pref._replace(p_span0=pref.p_span0, p_spani=pref.p_spani - self.loss)
|
||||
|
||||
def __call__(self, spectral_info):
|
||||
carriers = tuple(self.propagate(*spectral_info.carriers))
|
||||
pref = self.update_pref(spectral_info.pref)
|
||||
return spectral_info.update(carriers=carriers, pref=pref)
|
||||
return spectral_info._replace(carriers=carriers, pref=pref)
|
||||
|
||||
FiberParams = namedtuple('FiberParams', 'type_variety length loss_coef length_units \
|
||||
att_in con_in con_out dispersion gamma')
|
||||
@@ -328,11 +328,6 @@ class Fiber(Node):
|
||||
if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')):
|
||||
yield None
|
||||
return
|
||||
power_dict = {
|
||||
'nli': 'nonlinear_interference',
|
||||
'ase': 'amplified_spontaneous_emission'
|
||||
}
|
||||
attr = power_dict.get(attr, attr)
|
||||
loc_attr = 'carriers_'+loc
|
||||
for c in getattr(self, loc_attr) :
|
||||
if attr == 'total':
|
||||
@@ -362,11 +357,11 @@ class Fiber(Node):
|
||||
|
||||
def _psi(self, carrier, interfering_carrier):
|
||||
"""Calculates eq. 123 from `arXiv:1209.0394 <https://arxiv.org/abs/1209.0394>`__"""
|
||||
if carrier.num_chan == interfering_carrier.num_chan: # SCI
|
||||
if carrier.channel_number == interfering_carrier.channel_number: # SCI
|
||||
psi = arcsinh(0.5 * pi**2 * self.asymptotic_length
|
||||
* abs(self.beta2()) * carrier.baud_rate**2)
|
||||
else: # XCI
|
||||
delta_f = carrier.freq - interfering_carrier.freq
|
||||
delta_f = carrier.frequency - interfering_carrier.frequency
|
||||
psi = arcsinh(pi**2 * self.asymptotic_length * abs(self.beta2())
|
||||
* carrier.baud_rate * (delta_f + 0.5 * interfering_carrier.baud_rate))
|
||||
psi -= arcsinh(pi**2 * self.asymptotic_length * abs(self.beta2())
|
||||
@@ -403,8 +398,8 @@ class Fiber(Node):
|
||||
for carrier in carriers:
|
||||
pwr = carrier.power
|
||||
pwr = pwr._replace(signal=pwr.signal/attenuation,
|
||||
nonlinear_interference=pwr.nli/attenuation,
|
||||
amplified_spontaneous_emission=pwr.ase/attenuation)
|
||||
nli=pwr.nli/attenuation,
|
||||
ase=pwr.ase/attenuation)
|
||||
carrier = carrier._replace(power=pwr)
|
||||
chan.append(carrier)
|
||||
|
||||
@@ -416,20 +411,20 @@ class Fiber(Node):
|
||||
pwr = carrier.power
|
||||
carrier_nli = self._gn_analytic(carrier, *carriers)
|
||||
pwr = pwr._replace(signal=pwr.signal/self.lin_attenuation/attenuation,
|
||||
nonlinear_interference=(pwr.nli+carrier_nli)/self.lin_attenuation/attenuation,
|
||||
amplified_spontaneous_emission=pwr.ase/self.lin_attenuation/attenuation)
|
||||
nli=(pwr.nli+carrier_nli)/self.lin_attenuation/attenuation,
|
||||
ase=pwr.ase/self.lin_attenuation/attenuation)
|
||||
yield carrier._replace(power=pwr)
|
||||
|
||||
def update_pref(self, pref):
|
||||
self.pch_out_db = round(pref.pi - self.loss, 2)
|
||||
return pref._replace(p_span0=pref.p0, p_spani=self.pch_out_db)
|
||||
self.pch_out_db = round(pref.p_spani - self.loss, 2)
|
||||
return pref._replace(p_span0=pref.p_span0, p_spani=self.pch_out_db)
|
||||
|
||||
def __call__(self, spectral_info):
|
||||
self.carriers_in = spectral_info.carriers
|
||||
carriers = tuple(self.propagate(*spectral_info.carriers))
|
||||
pref = self.update_pref(spectral_info.pref)
|
||||
self.carriers_out = carriers
|
||||
return spectral_info.update(carriers=carriers, pref=pref)
|
||||
return spectral_info._replace(carriers=carriers, pref=pref)
|
||||
|
||||
RamanFiberParams = namedtuple('RamanFiberParams', 'type_variety length loss_coef length_units \
|
||||
att_in con_in con_out dispersion gamma raman_efficiency')
|
||||
@@ -541,11 +536,6 @@ class RamanFiber(Node):
|
||||
if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')):
|
||||
yield None
|
||||
return
|
||||
power_dict = {
|
||||
'nli': 'nonlinear_interference',
|
||||
'ase': 'amplified_spontaneous_emission'
|
||||
}
|
||||
attr = power_dict.get(attr, attr)
|
||||
loc_attr = 'carriers_'+loc
|
||||
for c in getattr(self, loc_attr) :
|
||||
if attr == 'total':
|
||||
@@ -578,11 +568,11 @@ class RamanFiber(Node):
|
||||
def _psi(self, carrier, interfering_carrier):
|
||||
""" Calculates eq. 123 from arXiv:1209.0394.
|
||||
"""
|
||||
if carrier.num_chan == interfering_carrier.num_chan: # SCI
|
||||
if carrier.channel_number == interfering_carrier.channel_number: # SCI
|
||||
psi = arcsinh(0.5 * pi**2 * self.asymptotic_length
|
||||
* abs(self.beta2()) * carrier.baud_rate**2)
|
||||
else: # XCI
|
||||
delta_f = carrier.freq - interfering_carrier.freq
|
||||
delta_f = carrier.frequency - interfering_carrier.frequency
|
||||
psi = arcsinh(pi**2 * self.asymptotic_length * abs(self.beta2())
|
||||
* carrier.baud_rate * (delta_f + 0.5 * interfering_carrier.baud_rate))
|
||||
psi -= arcsinh(pi**2 * self.asymptotic_length * abs(self.beta2())
|
||||
@@ -613,14 +603,14 @@ class RamanFiber(Node):
|
||||
def update_pref(self, pref, *carriers):
|
||||
pch_out_db = 10*log10(mean([carrier.power.signal for carrier in carriers])) + 30
|
||||
self.pch_out_db = round(pch_out_db, 2)
|
||||
return pref._replace(p_span0=pref.p0, p_spani=self.pch_out_db)
|
||||
return pref._replace(p_span0=pref.p_span0, p_spani=self.pch_out_db)
|
||||
|
||||
def __call__(self, spectral_info):
|
||||
self.carriers_in = spectral_info.carriers
|
||||
carriers = tuple(self.propagate(*spectral_info.carriers))
|
||||
pref = self.update_pref(spectral_info.pref, *carriers)
|
||||
self.carriers_out = carriers
|
||||
return spectral_info.update(carriers=carriers, pref=pref)
|
||||
return spectral_info._replace(carriers=carriers, pref=pref)
|
||||
|
||||
def propagate(self, *carriers):
|
||||
for propagated_carrier in propagate_raman_fiber(self, *carriers):
|
||||
@@ -758,11 +748,6 @@ class Edfa(Node):
|
||||
if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')):
|
||||
yield None
|
||||
return
|
||||
power_dict = {
|
||||
'nli': 'nonlinear_interference',
|
||||
'ase': 'amplified_spontaneous_emission'
|
||||
}
|
||||
attr = power_dict.get(attr, attr)
|
||||
loc_attr = 'carriers_'+loc
|
||||
for c in getattr(self, loc_attr) :
|
||||
if attr == 'total':
|
||||
@@ -789,19 +774,19 @@ class Edfa(Node):
|
||||
"""in power mode: delta_p is defined and can be used to calculate the power target
|
||||
This power target is used calculate the amplifier gain"""
|
||||
if self.delta_p is not None:
|
||||
self.target_pch_out_db = round(self.delta_p + pref.p0, 2)
|
||||
self.effective_gain = self.target_pch_out_db - pref.pi
|
||||
self.target_pch_out_db = round(self.delta_p + pref.p_span0, 2)
|
||||
self.effective_gain = self.target_pch_out_db - pref.p_spani
|
||||
|
||||
"""check power saturation and correct effective gain & power accordingly:"""
|
||||
self.effective_gain = min(
|
||||
self.effective_gain,
|
||||
self.params.p_max - (pref.pi + pref.neq_ch)
|
||||
self.params.p_max - (pref.p_spani + pref.neq_ch)
|
||||
)
|
||||
#print(self.uid, self.effective_gain, self.operational.gain_target)
|
||||
self.effective_pch_out_db = round(pref.pi + self.effective_gain, 2)
|
||||
self.effective_pch_out_db = round(pref.p_spani + self.effective_gain, 2)
|
||||
|
||||
"""check power saturation and correct target_gain accordingly:"""
|
||||
#print(self.uid, self.effective_gain, self.pin_db, pref.pi)
|
||||
#print(self.uid, self.effective_gain, self.pin_db, pref.p_spani)
|
||||
self.nf = self._calc_nf()
|
||||
self.gprofile = self._gain_profile(pin)
|
||||
|
||||
@@ -1039,17 +1024,17 @@ class Edfa(Node):
|
||||
for gain, carrier_ase, carrier in zip(gains, carrier_ases, carriers):
|
||||
pwr = carrier.power
|
||||
pwr = pwr._replace(signal=pwr.signal*gain/att,
|
||||
nonlinear_interference=pwr.nli*gain/att,
|
||||
amplified_spontaneous_emission=(pwr.ase+carrier_ase)*gain/att)
|
||||
nli=pwr.nli*gain/att,
|
||||
ase=(pwr.ase+carrier_ase)*gain/att)
|
||||
yield carrier._replace(power=pwr)
|
||||
|
||||
def update_pref(self, pref):
|
||||
return pref._replace(p_span0=pref.p0,
|
||||
p_spani=pref.pi + self.effective_gain - self.out_voa)
|
||||
return pref._replace(p_span0=pref.p_span0,
|
||||
p_spani=pref.p_spani + self.effective_gain - self.out_voa)
|
||||
|
||||
def __call__(self, spectral_info):
|
||||
self.carriers_in = spectral_info.carriers
|
||||
carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers))
|
||||
pref = self.update_pref(spectral_info.pref)
|
||||
self.carriers_out = carriers
|
||||
return spectral_info.update(carriers=carriers, pref=pref)
|
||||
return spectral_info._replace(carriers=carriers, pref=pref)
|
||||
|
||||
@@ -16,50 +16,26 @@ from json import loads
|
||||
from gnpy.core.utils import load_json
|
||||
from gnpy.core.equipment import automatic_nch, automatic_spacing
|
||||
|
||||
class ConvenienceAccess:
|
||||
|
||||
def __init_subclass__(cls):
|
||||
for abbrev, field in getattr(cls, '_ABBREVS', {}).items():
|
||||
setattr(cls, abbrev, property(lambda self, f=field: getattr(self, f)))
|
||||
|
||||
def update(self, **kwargs):
|
||||
for abbrev, field in getattr(self, '_ABBREVS', {}).items():
|
||||
if abbrev in kwargs:
|
||||
kwargs[field] = kwargs.pop(abbrev)
|
||||
return self._replace(**kwargs)
|
||||
|
||||
|
||||
class Power(namedtuple('Power', 'signal nonlinear_interference amplified_spontaneous_emission'), ConvenienceAccess):
|
||||
class Power(namedtuple('Power', 'signal nli ase')):
|
||||
"""carriers power in W"""
|
||||
_ABBREVS = {'nli': 'nonlinear_interference',
|
||||
'ase': 'amplified_spontaneous_emission',}
|
||||
|
||||
|
||||
class Channel(namedtuple('Channel', 'channel_number frequency baud_rate roll_off power'), ConvenienceAccess):
|
||||
class Channel(namedtuple('Channel', 'channel_number frequency baud_rate roll_off power')):
|
||||
pass
|
||||
|
||||
_ABBREVS = {'channel': 'channel_number',
|
||||
'num_chan': 'channel_number',
|
||||
'ffs': 'frequency',
|
||||
'freq': 'frequency',}
|
||||
|
||||
class Pref(namedtuple('Pref', 'p_span0, p_spani, neq_ch '), ConvenienceAccess):
|
||||
class Pref(namedtuple('Pref', 'p_span0, p_spani, neq_ch ')):
|
||||
"""noiseless reference power in dBm:
|
||||
p0: inital target carrier power
|
||||
pi: carrier power after element i
|
||||
neqch: equivalent channel count in dB"""
|
||||
p_span0: inital target carrier power
|
||||
p_spani: carrier power after element i
|
||||
neq_ch: equivalent channel count in dB"""
|
||||
|
||||
_ABBREVS = {'p0' : 'p_span0',
|
||||
'pi' : 'p_spani'}
|
||||
|
||||
class SpectralInformation(namedtuple('SpectralInformation', 'pref carriers'), ConvenienceAccess):
|
||||
class SpectralInformation(namedtuple('SpectralInformation', 'pref carriers')):
|
||||
|
||||
def __new__(cls, pref, carriers):
|
||||
return super().__new__(cls, pref, carriers)
|
||||
|
||||
def merge_input_spectral_information(*si):
|
||||
"""mix channel combs of different baud rates and power"""
|
||||
#TODO
|
||||
pass
|
||||
|
||||
def create_input_spectral_information(f_min, f_max, roll_off, baud_rate, power, spacing):
|
||||
# pref in dB : convert power lin into power in dB
|
||||
@@ -86,11 +62,11 @@ if __name__ == '__main__':
|
||||
si = SpectralInformation()
|
||||
spacing = 0.05 # THz
|
||||
|
||||
si = si.update(carriers=tuple(Channel(f+1, 191.3+spacing*(f+1), 32e9, 0.15, Power(1e-3, f, 1)) for f in range(96)))
|
||||
si = si._replace(carriers=tuple(Channel(f+1, 191.3+spacing*(f+1), 32e9, 0.15, Power(1e-3, f, 1)) for f in range(96)))
|
||||
|
||||
print(f'si = {si}')
|
||||
print(f'si = {si.carriers[0].power.nli}')
|
||||
print(f'si = {si.carriers[20].power.nli}')
|
||||
si2 = si.update(carriers=tuple(c.update(power = c.power.update(nli = c.power.nli * 1e5))
|
||||
si2 = si._replace(carriers=tuple(c._replace(power = c.power._replace(nli = c.power.nli * 1e5))
|
||||
for c in si.carriers))
|
||||
print(f'si2 = {si2}')
|
||||
|
||||
@@ -177,8 +177,8 @@ def propagate_raman_fiber(fiber, *carriers):
|
||||
for carrier in carriers:
|
||||
pwr = carrier.power
|
||||
pwr = pwr._replace(signal=pwr.signal / attenuation_in,
|
||||
nonlinear_interference=pwr.nli / attenuation_in,
|
||||
amplified_spontaneous_emission=pwr.ase / attenuation_in)
|
||||
nli=pwr.nli / attenuation_in,
|
||||
ase=pwr.ase / attenuation_in)
|
||||
carrier = carrier._replace(power=pwr)
|
||||
chan.append(carrier)
|
||||
carriers = tuple(f for f in chan)
|
||||
@@ -219,8 +219,8 @@ def propagate_raman_fiber(fiber, *carriers):
|
||||
else:
|
||||
carrier_nli = np.nan
|
||||
pwr = pwr._replace(signal=pwr.signal/attenuation/attenuation_out,
|
||||
nonlinear_interference=(pwr.nli+carrier_nli)/attenuation/attenuation_out,
|
||||
amplified_spontaneous_emission=((pwr.ase/attenuation)+rmn_ase)/attenuation_out)
|
||||
nli=(pwr.nli+carrier_nli)/attenuation/attenuation_out,
|
||||
ase=((pwr.ase/attenuation)+rmn_ase)/attenuation_out)
|
||||
new_carriers.append(carrier._replace(power=pwr))
|
||||
return new_carriers
|
||||
|
||||
|
||||
Reference in New Issue
Block a user