mirror of
https://github.com/Telecominfraproject/oopt-gnpy.git
synced 2025-11-01 02:28:05 +00:00
Given that everything else just uses these constants as imported from numpy, there's no point keeping these wrappers around. Change-Id: I0e19e05f40dc79d8005e915cf3ffb5e36328421a
242 lines
6.5 KiB
Python
242 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
'''
|
|
gnpy.core.utils
|
|
===============
|
|
|
|
This module contains utility functions that are used with gnpy.
|
|
'''
|
|
|
|
|
|
import json
|
|
|
|
from csv import writer
|
|
import numpy as np
|
|
from numpy import pi, cos, sqrt, log10
|
|
from scipy import constants
|
|
|
|
|
|
def load_json(filename):
|
|
with open(filename, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
return data
|
|
|
|
|
|
def save_json(obj, filename):
|
|
with open(filename, 'w', encoding='utf-8') as f:
|
|
json.dump(obj, f, indent=2, ensure_ascii=False)
|
|
|
|
def write_csv(obj, filename):
|
|
"""
|
|
Convert dictionary items to a CSV file the dictionary format:
|
|
::
|
|
|
|
{'result category 1':
|
|
[
|
|
# 1st line of results
|
|
{'header 1' : value_xxx,
|
|
'header 2' : value_yyy},
|
|
# 2nd line of results: same headers, different results
|
|
{'header 1' : value_www,
|
|
'header 2' : value_zzz}
|
|
],
|
|
'result_category 2':
|
|
[
|
|
{},{}
|
|
]
|
|
}
|
|
|
|
The generated csv file will be:
|
|
::
|
|
|
|
result_category 1
|
|
header 1 header 2
|
|
value_xxx value_yyy
|
|
value_www value_zzz
|
|
result_category 2
|
|
...
|
|
"""
|
|
with open(filename, 'w', encoding='utf-8') as f:
|
|
w = writer(f)
|
|
for data_key, data_list in obj.items():
|
|
#main header
|
|
w.writerow([data_key])
|
|
#sub headers:
|
|
headers = [_ for _ in data_list[0].keys()]
|
|
w.writerow(headers)
|
|
for data_dict in data_list:
|
|
w.writerow([_ for _ in data_dict.values()])
|
|
|
|
|
|
def arrange_frequencies(length, start, stop):
|
|
"""Create an array of frequencies
|
|
|
|
:param length: number of elements
|
|
:param start: Start frequency in THz
|
|
:param stop: Stop frequency in THz
|
|
:type length: integer
|
|
:type start: float
|
|
:type stop: float
|
|
:return: an array of frequencies determined by the spacing parameter
|
|
:rtype: numpy.ndarray
|
|
"""
|
|
return np.linspace(start, stop, length)
|
|
|
|
|
|
def lin2db(value):
|
|
"""Convert linear unit to logarithmic (dB)
|
|
|
|
>>> lin2db(0.001)
|
|
-30.0
|
|
>>> round(lin2db(1.0), 2)
|
|
0.0
|
|
>>> round(lin2db(1.26), 2)
|
|
1.0
|
|
>>> round(lin2db(10.0), 2)
|
|
10.0
|
|
>>> round(lin2db(100.0), 2)
|
|
20.0
|
|
"""
|
|
return 10 * log10(value)
|
|
|
|
|
|
def db2lin(value):
|
|
"""Convert logarithimic units to linear
|
|
|
|
>>> round(db2lin(10.0), 2)
|
|
10.0
|
|
>>> round(db2lin(20.0), 2)
|
|
100.0
|
|
>>> round(db2lin(1.0), 2)
|
|
1.26
|
|
>>> round(db2lin(0.0), 2)
|
|
1.0
|
|
>>> round(db2lin(-10.0), 2)
|
|
0.1
|
|
"""
|
|
return 10**(value / 10)
|
|
|
|
def round2float(number, step):
|
|
step = round(step, 1)
|
|
if step >= 0.01:
|
|
number = round(number / step, 0)
|
|
number = round(number * step, 1)
|
|
else:
|
|
number = round(number, 2)
|
|
return number
|
|
|
|
wavelength2freq = constants.lambda2nu
|
|
freq2wavelength = constants.nu2lambda
|
|
|
|
def freq2wavelength(value):
|
|
""" Converts frequency units to wavelength units.
|
|
"""
|
|
return constants.c / value
|
|
|
|
def snr_sum(snr, bw, snr_added, bw_added=12.5e9):
|
|
snr_added = snr_added - lin2db(bw/bw_added)
|
|
snr = -lin2db(db2lin(-snr)+db2lin(-snr_added))
|
|
return snr
|
|
|
|
def deltawl2deltaf(delta_wl, wavelength):
|
|
""" deltawl2deltaf(delta_wl, wavelength):
|
|
delta_wl is BW in wavelength units
|
|
wavelength is the center wl
|
|
units for delta_wl and wavelength must be same
|
|
|
|
:param delta_wl: delta wavelength BW in same units as wavelength
|
|
:param wavelength: wavelength BW is relevant for
|
|
:type delta_wl: float or numpy.ndarray
|
|
:type wavelength: float
|
|
:return: The BW in frequency units
|
|
:rtype: float or ndarray
|
|
|
|
"""
|
|
f = wavelength2freq(wavelength)
|
|
return delta_wl * f / wavelength
|
|
|
|
|
|
def deltaf2deltawl(delta_f, frequency):
|
|
""" deltawl2deltaf(delta_f, frequency):
|
|
converts delta frequency to delta wavelength
|
|
units for delta_wl and wavelength must be same
|
|
|
|
:param delta_f: delta frequency in same units as frequency
|
|
:param frequency: frequency BW is relevant for
|
|
:type delta_f: float or numpy.ndarray
|
|
:type frequency: float
|
|
:return: The BW in wavelength units
|
|
:rtype: float or ndarray
|
|
|
|
"""
|
|
wl = freq2wavelength(frequency)
|
|
return delta_f * wl / frequency
|
|
|
|
|
|
def rrc(ffs, baud_rate, alpha):
|
|
""" rrc(ffs, baud_rate, alpha): computes the root-raised cosine filter
|
|
function.
|
|
|
|
:param ffs: A numpy array of frequencies
|
|
:param baud_rate: The Baud Rate of the System
|
|
:param alpha: The roll-off factor of the filter
|
|
:type ffs: numpy.ndarray
|
|
:type baud_rate: float
|
|
:type alpha: float
|
|
:return: hf a numpy array of the filter shape
|
|
:rtype: numpy.ndarray
|
|
|
|
"""
|
|
Ts = 1 / baud_rate
|
|
l_lim = (1 - alpha) / (2 * Ts)
|
|
r_lim = (1 + alpha) / (2 * Ts)
|
|
hf = np.zeros(np.shape(ffs))
|
|
slope_inds = np.where(
|
|
np.logical_and(np.abs(ffs) > l_lim, np.abs(ffs) < r_lim))
|
|
hf[slope_inds] = 0.5 * (1 + cos((pi * Ts / alpha) *
|
|
(np.abs(ffs[slope_inds]) - l_lim)))
|
|
p_inds = np.where(np.logical_and(np.abs(ffs) > 0, np.abs(ffs) < l_lim))
|
|
hf[p_inds] = 1
|
|
return sqrt(hf)
|
|
|
|
def merge_amplifier_restrictions(dict1, dict2):
|
|
"""Updates contents of dicts recursively
|
|
|
|
>>> d1 = {'params': {'restrictions': {'preamp_variety_list': [], 'booster_variety_list': []}}}
|
|
>>> d2 = {'params': {'target_pch_out_db': -20}}
|
|
>>> merge_amplifier_restrictions(d1, d2)
|
|
{'params': {'restrictions': {'preamp_variety_list': [], 'booster_variety_list': []}, 'target_pch_out_db': -20}}
|
|
|
|
>>> d3 = {'params': {'restrictions': {'preamp_variety_list': ['foo'], 'booster_variety_list': ['bar']}}}
|
|
>>> merge_amplifier_restrictions(d1, d3)
|
|
{'params': {'restrictions': {'preamp_variety_list': [], 'booster_variety_list': []}}}
|
|
"""
|
|
|
|
copy_dict1 = dict1.copy()
|
|
for key in dict2:
|
|
if key in dict1:
|
|
if isinstance(dict1[key], dict):
|
|
copy_dict1[key] = merge_amplifier_restrictions(copy_dict1[key], dict2[key])
|
|
else:
|
|
copy_dict1[key] = dict2[key]
|
|
return copy_dict1
|
|
|
|
def silent_remove(this_list, elem):
|
|
"""Remove matching elements from a list without raising ValueError
|
|
|
|
>>> li = [0, 1]
|
|
>>> li = silent_remove(li, 1)
|
|
>>> li
|
|
[0]
|
|
>>> li = silent_remove(li, 1)
|
|
>>> li
|
|
[0]
|
|
"""
|
|
|
|
try:
|
|
this_list.remove(elem)
|
|
except ValueError:
|
|
pass
|
|
return this_list
|