mirror of
https://github.com/Telecominfraproject/oopt-gnpy.git
synced 2025-10-30 17:47:50 +00:00
184 lines
4.7 KiB
Python
184 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
'''
|
|
gnpy.core.utils
|
|
===============
|
|
|
|
This module contains utility functions that are used with gnpy.
|
|
'''
|
|
|
|
|
|
import json
|
|
|
|
import numpy as np
|
|
from csv import writer
|
|
from numpy import pi, cos, sqrt, log10
|
|
from scipy import constants
|
|
|
|
|
|
def load_json(filename):
|
|
with open(filename, 'r') as f:
|
|
data = json.load(f)
|
|
return data
|
|
|
|
|
|
def save_json(obj, filename):
|
|
with open(filename, 'w') as f:
|
|
json.dump(obj, f, indent=2)
|
|
|
|
def write_csv(obj, filename):
|
|
"""
|
|
convert dictionary items to a csv file
|
|
the dictionary format :
|
|
|
|
{'result category 1':
|
|
[
|
|
# 1st line of results
|
|
{'header 1' : value_xxx,
|
|
'header 2' : value_yyy},
|
|
# 2nd line of results: same headers, different results
|
|
{'header 1' : value_www,
|
|
'header 2' : value_zzz}
|
|
],
|
|
'result_category 2':
|
|
[
|
|
{},{}
|
|
]
|
|
}
|
|
|
|
the generated csv file will be:
|
|
result_category 1
|
|
header 1 header 2
|
|
value_xxx value_yyy
|
|
value_www value_zzz
|
|
result_category 2
|
|
...
|
|
"""
|
|
with open(filename, 'w') as f:
|
|
w = writer(f)
|
|
for data_key, data_list in obj.items():
|
|
#main header
|
|
w.writerow([data_key])
|
|
#sub headers:
|
|
headers = [_ for _ in data_list[0].keys()]
|
|
w.writerow(headers)
|
|
for data_dict in data_list:
|
|
w.writerow([_ for _ in data_dict.values()])
|
|
|
|
def c():
|
|
"""
|
|
Returns the speed of light in meters per second
|
|
"""
|
|
return constants.c
|
|
|
|
|
|
def itufs(spacing, startf=191.35, stopf=196.10):
|
|
"""Creates an array of frequencies whose default range is
|
|
191.35-196.10 THz
|
|
|
|
:param spacing: Frequency spacing in THz
|
|
:param starf: Start frequency in THz
|
|
:param stopf: Stop frequency in THz
|
|
:type spacing: float
|
|
:type startf: float
|
|
:type stopf: float
|
|
:return an array of frequnecies determined by the spacing parameter
|
|
:rtype: numpy.ndarray
|
|
"""
|
|
return np.arange(startf, stopf + spacing / 2, spacing)
|
|
|
|
|
|
def h():
|
|
"""
|
|
Returns plank's constant in J*s
|
|
"""
|
|
return constants.h
|
|
|
|
|
|
def lin2db(value):
|
|
return 10 * log10(value)
|
|
|
|
|
|
def db2lin(value):
|
|
return 10**(value / 10)
|
|
|
|
def round2float(number, step):
|
|
step = round(step, 1)
|
|
if step >= 0.01:
|
|
number = round(number / step, 0)
|
|
number = round(number * step, 1)
|
|
else:
|
|
number = round(number, 2)
|
|
return number
|
|
|
|
wavelength2freq = constants.lambda2nu
|
|
freq2wavelength = constants.nu2lambda
|
|
|
|
def freq2wavelength(value):
|
|
""" Converts frequency units to wavelength units.
|
|
"""
|
|
return c() / value
|
|
|
|
|
|
def deltawl2deltaf(delta_wl, wavelength):
|
|
""" deltawl2deltaf(delta_wl, wavelength):
|
|
delta_wl is BW in wavelength units
|
|
wavelength is the center wl
|
|
units for delta_wl and wavelength must be same
|
|
|
|
:param delta_wl: delta wavelength BW in same units as wavelength
|
|
:param wavelength: wavelength BW is relevant for
|
|
:type delta_wl: float or numpy.ndarray
|
|
:type wavelength: float
|
|
:return: The BW in frequency units
|
|
:rtype: float or ndarray
|
|
|
|
"""
|
|
f = wavelength2freq(wavelength)
|
|
return delta_wl * f / wavelength
|
|
|
|
|
|
def deltaf2deltawl(delta_f, frequency):
|
|
""" deltawl2deltaf(delta_f, frequency):
|
|
converts delta frequency to delta wavelength
|
|
units for delta_wl and wavelength must be same
|
|
|
|
:param delta_f: delta frequency in same units as frequency
|
|
:param frequency: frequency BW is relevant for
|
|
:type delta_f: float or numpy.ndarray
|
|
:type frequency: float
|
|
:return: The BW in wavelength units
|
|
:rtype: float or ndarray
|
|
|
|
"""
|
|
wl = freq2wavelength(frequency)
|
|
return delta_f * wl / frequency
|
|
|
|
|
|
def rrc(ffs, baud_rate, alpha):
|
|
""" rrc(ffs, baud_rate, alpha): computes the root-raised cosine filter
|
|
function.
|
|
|
|
:param ffs: A numpy array of frequencies
|
|
:param baud_rate: The Baud Rate of the System
|
|
:param alpha: The roll-off factor of the filter
|
|
:type ffs: numpy.ndarray
|
|
:type baud_rate: float
|
|
:type alpha: float
|
|
:return: hf a numpy array of the filter shape
|
|
:rtype: numpy.ndarray
|
|
|
|
"""
|
|
Ts = 1 / baud_rate
|
|
l_lim = (1 - alpha) / (2 * Ts)
|
|
r_lim = (1 + alpha) / (2 * Ts)
|
|
hf = np.zeros(np.shape(ffs))
|
|
slope_inds = np.where(
|
|
np.logical_and(np.abs(ffs) > l_lim, np.abs(ffs) < r_lim))
|
|
hf[slope_inds] = 0.5 * (1 + cos((pi * Ts / alpha) *
|
|
(np.abs(ffs[slope_inds]) - l_lim)))
|
|
p_inds = np.where(np.logical_and(np.abs(ffs) > 0, np.abs(ffs) < l_lim))
|
|
hf[p_inds] = 1
|
|
return sqrt(hf)
|