Merge pull request #281 from jktjkt/no-convenience-access

Remove property aliases
This commit is contained in:
Jan Kundrát
2019-08-06 09:46:36 +00:00
committed by GitHub
2 changed files with 39 additions and 73 deletions

View File

@@ -154,8 +154,8 @@ class Roadm(Node):
#all ingress channels in xpress are set to this power level
#but add channels are not, so we define an effective loss
#in the case of add channels
self.effective_pch_out_db = min(pref.pi, self.params.target_pch_out_db)
self.effective_loss = pref.pi - self.effective_pch_out_db
self.effective_pch_out_db = min(pref.p_spani, self.params.target_pch_out_db)
self.effective_loss = pref.p_spani - self.effective_pch_out_db
carriers_power = array([c.power.signal +c.power.nli+c.power.ase for c in carriers])
carriers_att = list(map(lambda x : lin2db(x*1e3)-self.params.target_pch_out_db, carriers_power))
exceeding_att = -min(list(filter(lambda x: x < 0, carriers_att)), default = 0)
@@ -163,17 +163,17 @@ class Roadm(Node):
for carrier_att, carrier in zip(carriers_att, carriers) :
pwr = carrier.power
pwr = pwr._replace( signal = pwr.signal/carrier_att,
nonlinear_interference = pwr.nli/carrier_att,
amplified_spontaneous_emission = pwr.ase/carrier_att)
nli = pwr.nli/carrier_att,
ase = pwr.ase/carrier_att)
yield carrier._replace(power=pwr)
def update_pref(self, pref):
return pref._replace(p_span0=pref.p0, p_spani=self.effective_pch_out_db)
return pref._replace(p_span0=pref.p_span0, p_spani=self.effective_pch_out_db)
def __call__(self, spectral_info):
carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers))
pref = self.update_pref(spectral_info.pref)
return spectral_info.update(carriers=carriers, pref=pref)
return spectral_info._replace(carriers=carriers, pref=pref)
FusedParams = namedtuple('FusedParams', 'loss')
@@ -211,17 +211,17 @@ class Fused(Node):
for carrier in carriers:
pwr = carrier.power
pwr = pwr._replace(signal=pwr.signal/attenuation,
nonlinear_interference=pwr.nli/attenuation,
amplified_spontaneous_emission=pwr.ase/attenuation)
nli=pwr.nli/attenuation,
ase=pwr.ase/attenuation)
yield carrier._replace(power=pwr)
def update_pref(self, pref):
return pref._replace(p_span0=pref.p0, p_spani=pref.pi - self.loss)
return pref._replace(p_span0=pref.p_span0, p_spani=pref.p_spani - self.loss)
def __call__(self, spectral_info):
carriers = tuple(self.propagate(*spectral_info.carriers))
pref = self.update_pref(spectral_info.pref)
return spectral_info.update(carriers=carriers, pref=pref)
return spectral_info._replace(carriers=carriers, pref=pref)
FiberParams = namedtuple('FiberParams', 'type_variety length loss_coef length_units \
att_in con_in con_out dispersion gamma')
@@ -327,11 +327,6 @@ class Fiber(Node):
if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')):
yield None
return
power_dict = {
'nli': 'nonlinear_interference',
'ase': 'amplified_spontaneous_emission'
}
attr = power_dict.get(attr, attr)
loc_attr = 'carriers_'+loc
for c in getattr(self, loc_attr) :
if attr == 'total':
@@ -361,11 +356,11 @@ class Fiber(Node):
def _psi(self, carrier, interfering_carrier):
"""Calculates eq. 123 from `arXiv:1209.0394 <https://arxiv.org/abs/1209.0394>`__"""
if carrier.num_chan == interfering_carrier.num_chan: # SCI
if carrier.channel_number == interfering_carrier.channel_number: # SCI
psi = arcsinh(0.5 * pi**2 * self.asymptotic_length
* abs(self.beta2()) * carrier.baud_rate**2)
else: # XCI
delta_f = carrier.freq - interfering_carrier.freq
delta_f = carrier.frequency - interfering_carrier.frequency
psi = arcsinh(pi**2 * self.asymptotic_length * abs(self.beta2())
* carrier.baud_rate * (delta_f + 0.5 * interfering_carrier.baud_rate))
psi -= arcsinh(pi**2 * self.asymptotic_length * abs(self.beta2())
@@ -403,8 +398,8 @@ class Fiber(Node):
for carrier in carriers:
pwr = carrier.power
pwr = pwr._replace(signal=pwr.signal/attenuation,
nonlinear_interference=pwr.nli/attenuation,
amplified_spontaneous_emission=pwr.ase/attenuation)
nli=pwr.nli/attenuation,
ase=pwr.ase/attenuation)
carrier = carrier._replace(power=pwr)
chan.append(carrier)
@@ -416,20 +411,20 @@ class Fiber(Node):
pwr = carrier.power
carrier_nli = self._gn_analytic(carrier, *carriers)
pwr = pwr._replace(signal=pwr.signal/self.lin_attenuation/attenuation,
nonlinear_interference=(pwr.nli+carrier_nli)/self.lin_attenuation/attenuation,
amplified_spontaneous_emission=pwr.ase/self.lin_attenuation/attenuation)
nli=(pwr.nli+carrier_nli)/self.lin_attenuation/attenuation,
ase=pwr.ase/self.lin_attenuation/attenuation)
yield carrier._replace(power=pwr)
def update_pref(self, pref):
self.pch_out_db = round(pref.pi - self.loss, 2)
return pref._replace(p_span0=pref.p0, p_spani=self.pch_out_db)
self.pch_out_db = round(pref.p_spani - self.loss, 2)
return pref._replace(p_span0=pref.p_span0, p_spani=self.pch_out_db)
def __call__(self, spectral_info):
self.carriers_in = spectral_info.carriers
carriers = tuple(self.propagate(*spectral_info.carriers))
pref = self.update_pref(spectral_info.pref)
self.carriers_out = carriers
return spectral_info.update(carriers=carriers, pref=pref)
return spectral_info._replace(carriers=carriers, pref=pref)
class EdfaParams:
def __init__(self, **params):
@@ -563,11 +558,6 @@ class Edfa(Node):
if not (loc in ('in', 'out') and attr in ('nli', 'signal', 'total', 'ase')):
yield None
return
power_dict = {
'nli': 'nonlinear_interference',
'ase': 'amplified_spontaneous_emission'
}
attr = power_dict.get(attr, attr)
loc_attr = 'carriers_'+loc
for c in getattr(self, loc_attr) :
if attr == 'total':
@@ -594,19 +584,19 @@ class Edfa(Node):
"""in power mode: delta_p is defined and can be used to calculate the power target
This power target is used calculate the amplifier gain"""
if self.delta_p is not None:
self.target_pch_out_db = round(self.delta_p + pref.p0, 2)
self.effective_gain = self.target_pch_out_db - pref.pi
self.target_pch_out_db = round(self.delta_p + pref.p_span0, 2)
self.effective_gain = self.target_pch_out_db - pref.p_spani
"""check power saturation and correct effective gain & power accordingly:"""
self.effective_gain = min(
self.effective_gain,
self.params.p_max - (pref.pi + pref.neq_ch)
self.params.p_max - (pref.p_spani + pref.neq_ch)
)
#print(self.uid, self.effective_gain, self.operational.gain_target)
self.effective_pch_out_db = round(pref.pi + self.effective_gain, 2)
self.effective_pch_out_db = round(pref.p_spani + self.effective_gain, 2)
"""check power saturation and correct target_gain accordingly:"""
#print(self.uid, self.effective_gain, self.pin_db, pref.pi)
#print(self.uid, self.effective_gain, self.pin_db, pref.p_spani)
self.nf = self._calc_nf()
self.gprofile = self._gain_profile(pin)
@@ -844,17 +834,17 @@ class Edfa(Node):
for gain, carrier_ase, carrier in zip(gains, carrier_ases, carriers):
pwr = carrier.power
pwr = pwr._replace(signal=pwr.signal*gain/att,
nonlinear_interference=pwr.nli*gain/att,
amplified_spontaneous_emission=(pwr.ase+carrier_ase)*gain/att)
nli=pwr.nli*gain/att,
ase=(pwr.ase+carrier_ase)*gain/att)
yield carrier._replace(power=pwr)
def update_pref(self, pref):
return pref._replace(p_span0=pref.p0,
p_spani=pref.pi + self.effective_gain - self.out_voa)
return pref._replace(p_span0=pref.p_span0,
p_spani=pref.p_spani + self.effective_gain - self.out_voa)
def __call__(self, spectral_info):
self.carriers_in = spectral_info.carriers
carriers = tuple(self.propagate(spectral_info.pref, *spectral_info.carriers))
pref = self.update_pref(spectral_info.pref)
self.carriers_out = carriers
return spectral_info.update(carriers=carriers, pref=pref)
return spectral_info._replace(carriers=carriers, pref=pref)

View File

@@ -16,50 +16,26 @@ from json import loads
from gnpy.core.utils import load_json
from gnpy.core.equipment import automatic_nch, automatic_spacing
class ConvenienceAccess:
def __init_subclass__(cls):
for abbrev, field in getattr(cls, '_ABBREVS', {}).items():
setattr(cls, abbrev, property(lambda self, f=field: getattr(self, f)))
def update(self, **kwargs):
for abbrev, field in getattr(self, '_ABBREVS', {}).items():
if abbrev in kwargs:
kwargs[field] = kwargs.pop(abbrev)
return self._replace(**kwargs)
class Power(namedtuple('Power', 'signal nonlinear_interference amplified_spontaneous_emission'), ConvenienceAccess):
class Power(namedtuple('Power', 'signal nli ase')):
"""carriers power in W"""
_ABBREVS = {'nli': 'nonlinear_interference',
'ase': 'amplified_spontaneous_emission',}
class Channel(namedtuple('Channel', 'channel_number frequency baud_rate roll_off power'), ConvenienceAccess):
class Channel(namedtuple('Channel', 'channel_number frequency baud_rate roll_off power')):
pass
_ABBREVS = {'channel': 'channel_number',
'num_chan': 'channel_number',
'ffs': 'frequency',
'freq': 'frequency',}
class Pref(namedtuple('Pref', 'p_span0, p_spani, neq_ch '), ConvenienceAccess):
class Pref(namedtuple('Pref', 'p_span0, p_spani, neq_ch ')):
"""noiseless reference power in dBm:
p0: inital target carrier power
pi: carrier power after element i
neqch: equivalent channel count in dB"""
p_span0: inital target carrier power
p_spani: carrier power after element i
neq_ch: equivalent channel count in dB"""
_ABBREVS = {'p0' : 'p_span0',
'pi' : 'p_spani'}
class SpectralInformation(namedtuple('SpectralInformation', 'pref carriers'), ConvenienceAccess):
class SpectralInformation(namedtuple('SpectralInformation', 'pref carriers')):
def __new__(cls, pref, carriers):
return super().__new__(cls, pref, carriers)
def merge_input_spectral_information(*si):
"""mix channel combs of different baud rates and power"""
#TODO
pass
def create_input_spectral_information(f_min, f_max, roll_off, baud_rate, power, spacing):
# pref in dB : convert power lin into power in dB
@@ -86,11 +62,11 @@ if __name__ == '__main__':
si = SpectralInformation()
spacing = 0.05 # THz
si = si.update(carriers=tuple(Channel(f+1, 191.3+spacing*(f+1), 32e9, 0.15, Power(1e-3, f, 1)) for f in range(96)))
si = si._replace(carriers=tuple(Channel(f+1, 191.3+spacing*(f+1), 32e9, 0.15, Power(1e-3, f, 1)) for f in range(96)))
print(f'si = {si}')
print(f'si = {si.carriers[0].power.nli}')
print(f'si = {si.carriers[20].power.nli}')
si2 = si.update(carriers=tuple(c.update(power = c.power.update(nli = c.power.nli * 1e5))
si2 = si._replace(carriers=tuple(c._replace(power = c.power._replace(nli = c.power.nli * 1e5))
for c in si.carriers))
print(f'si2 = {si2}')