Files
oopt-gnpy/gnpy/tools/yang_convert_utils.py
EstherLerouzic 1a795639c7 feat: Add conversion utilities for YANG and legacy formats in GNPy
This commit introduces new functions for converting between YANG formatted files and
legacy formats. The conversion processes adhere to RFC7951 for encoding YANG data.

Key changes include:
- Conversion of float and empty type representations.
- Transformation of Span and SI lists xx_power_range into dictionaries.
- Addition of necessary namespaces.
- use of oopt-gnpy-libyang to enforce compliancy to yang models

These utilities enable full compatibility with GNPy.

Co-authored-by: Renato Ambrosone <renato.ambrosone@polito.it>

Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com>
Change-Id: Ia004113bca2b0631d1648564e5ccb60504fe80f8
2025-09-03 10:34:14 +02:00

961 lines
37 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: BSD-3-Clause
# Utils for yang <-> legacy format conversion
# Copyright (C) 2025 Telecom Infra Project and GNPy contributors
# see AUTHORS.rst for a list of contributors
"""
Utils for yang <-> legacy format conversion
===========================================
Format conversion utils.
"""
from pathlib import Path
from copy import deepcopy
from typing import Dict, Union, List, Any, NamedTuple
import json
import os
import oopt_gnpy_libyang as ly
from gnpy.yang.precision_dict import PRECISION_DICT
ELEMENTS_KEY = 'elements'
ROADM_KEY = 'Roadm'
PARAMS_KEY = 'params'
METADATA_KEY = 'metadata'
LOCATION_KEY = 'location'
DEGREE_KEY = 'degree_uid'
PATH_REQUEST_KEY = 'path-request'
RESPONSE_KEY = 'response'
SPECTRUM_KEY = 'spectrum'
LOSS_COEF_KEY = 'loss_coef'
LOSS_COEF_KEY_PER_FREQ = 'loss_coef_per_frequency'
RAMAN_COEF_KEY = 'raman_coefficient'
RAMAN_EFFICIENCY_KEY = 'raman_efficiency'
EQPT_TYPES = ['Edfa', 'Transceiver', 'Fiber', 'Roadm']
EDFA_CONFIG_KEYS = ['nf_fit_coeff', 'nf_ripple', 'gain_ripple', 'dgt']
SIM_PARAMS_KEYS = ['raman_params', 'nli_params']
TOPO_NMSP = 'gnpy-network-topology:topology'
EQPT_NMSP = 'gnpy-eqpt-config:equipment'
SERV_NMSP = 'gnpy-path-computation:services'
RESP_NMSP = 'gnpy-path-computation:responses'
EDFA_CONFIG_NMSP = 'gnpy-edfa-config:edfa-config'
SIM_PARAMS_NMSP = 'gnpy-sim-params:sim-params'
SPECTRUM_NMSP = 'gnpy-spectrum:spectrum'
class PrettyFloat(float):
""""A float subclass for formatting according to specific fraction digit requirements.
>>> PrettyFloat(3.1245)
3.12
>>> PrettyFloat(100.65, 5)
100.65
>>> PrettyFloat(2.1e-5, 8)
0.000021
>>> PrettyFloat(10, 3)
10.0
>>> PrettyFloat(-0.3110761646066259, 18)
-0.3110761646066259
"""
def __new__(cls, value: float, fraction_digit: int = 2):
"""Create a new instance of PrettyFloat"""
instance = super().__new__(cls, value)
instance.fraction_digit = fraction_digit
instance.value = value
return instance
def __repr__(self) -> str:
"""Return the string representation of the float formatted to the specified fraction digits. It removes
scientific notation ("e-x").
"""
# When fraction digit is over 16, the usual formatting does not works properly because of floating point issues.
# For example -0.3110761646066259 is represented as "-0.311076164606625905". The following function makes
# sure that the unwanted floating point issue does not change the value. Maximum fraction digit in YANG is 18.
if self.fraction_digit in range(0, 19):
temp = str(self.value)
if 'e' in temp or '.' not in temp or self.fraction_digit < 17:
formatted_value = f'{self:.{self.fraction_digit}f}' # noqa E231
if '.' in formatted_value:
formatted_value = formatted_value.rstrip('0')
if formatted_value.endswith('.'):
formatted_value += '0'
return formatted_value
if '.' in temp:
parts = temp.split('.')
formatted_value = parts[0] + '.' + parts[1][0:min(self.fraction_digit, len(parts[1]))]
formatted_value = formatted_value.rstrip('0')
if formatted_value.endswith('.'):
formatted_value += '0'
return formatted_value
return temp
raise ValueError(f'Fraction digit {self.fraction_digit} not handled')
def gnpy_precision_dict() -> Dict[str, int]:
"""Return a dictionary of fraction-digit definitions for GNPy.
Precision correspond to fraction digit number if it is a decimal64 yang type, or 0 if it is an
(u)int < 64 or -1 if it is a string or an (u)int64 type.
:return: Dictionnary mapping key names with digit numbers for values.
:rtype: Dict[str, int]
"""
return PRECISION_DICT
def convert_dict(data: Dict, fraction_digit: int = 2, precision: Union[Dict[str, int], None] = None) \
-> Union[Dict, List, float, int, str, None]:
"""Recursive conversion from float to str, conformed to RFC7951
does not work for int64 (will not returm str as stated in standard)
If nothing is stated precision is using gnpy_precision_dict.
:param data: the input dictionary to convert.
:type data: data: Dict
:param fraction_digit: the number of decimal places to format.
:type fraction_digit: int
:param precision: A dictionary defining precision for specific keys.
:type precision: Union[Dict[str, int], None]
:return: A new dictionary with converted values.
:rtype: Dict
>>> convert_dict({"y": "amp", "t": "vn", "g": 25, "gamma": 0.0016, "p": 21.5, "o": True, \
"output-power": 14.12457896})
{'y': 'amp', 't': 'vn', 'g': '25.0', 'gamma': '0.0016', 'p': '21.5', 'o': True, 'output-power': '14.12457896'}
"""
if not precision:
precision = gnpy_precision_dict()
if isinstance(data, dict):
for k, v in data.items():
fraction_digit = precision.get(k, 2)
data[k] = convert_dict(v, fraction_digit, precision=precision)
elif isinstance(data, list):
temp = deepcopy(data)
for i, el in enumerate(temp):
if isinstance(el, float):
data[i] = PrettyFloat(el, fraction_digit)
data[i] = str(data[i])
else:
data[i] = convert_dict(el, fraction_digit=fraction_digit, precision=precision)
elif isinstance(data, bool):
return data
elif isinstance(data, int):
data = PrettyFloat(data)
data.fraction_digit = fraction_digit
if fraction_digit > 0:
return str(data)
if fraction_digit < 0:
return data
return int(data)
elif isinstance(data, float):
data = PrettyFloat(data)
data.fraction_digit = fraction_digit
return str(data)
return data
def convert_back(data: Dict, fraction_digit: Union[int, None] = None, precision: Union[Dict[str, int], None] = None) \
-> Union[Dict, List, float, int, str, None]:
"""Recursively convert strings back to their original types int, float according to RFC7951.
:param data: the input dictionary to convert.
:type data: Dict
:param fraction_digit: the number of decimal places to format.
:type fraction_digit: Union[int, None]
:param precision: A dictionary defining precision for specific keys.
:type precision: Union[Dict[str, int], None]
:return: A new dictionary with converted values.
:rtype: Dict
>>> a = {'y': 'amp', 't': 'vn', 'N': '25', 'gamma': '0.0000000000000016', 'p': '21.50', 'o': True, \
'output-power': '14.12458'}
>>> convert_back({'a': a, 'delta_power_range_db': ['12.3', '10.6', True]})
{'a': {'y': 'amp', 't': 'vn', 'N': 25, 'gamma': 1.6e-15, 'p': '21.50', 'o': True, 'output-power': 14.12458}, \
'delta_power_range_db': ['12.3', '10.6', True]}
"""
if not precision:
precision = gnpy_precision_dict()
if isinstance(data, dict):
for k, v in data.items():
fraction_digit = None
if k in precision:
fraction_digit = precision[k]
data[k] = convert_back(v, fraction_digit, precision=precision)
elif isinstance(data, list):
for i, el in enumerate(data):
if isinstance(el, str) and fraction_digit not in [None, -1]:
data[i] = float(data[i])
else:
data[i] = convert_back(el, fraction_digit=fraction_digit, precision=precision)
elif isinstance(data, (bool, int, float)):
return data
elif isinstance(data, str) and fraction_digit is not None:
if fraction_digit > 0:
return float(data)
if fraction_digit < 0:
return data
return int(data)
return data
def model_path() -> Path:
"""Filesystem path to YANG models.
return: path to the GNPy YANG modules.
rtype: Path
"""
return Path(__file__).parent.parent / 'yang'
def external_yang() -> Path:
"""Filesystem to the IETF external yang modules.
return: path to the IETF modules.
rtype: Path
"""
return Path(__file__).parent.parent / 'yang' / 'ext'
def yang_lib() -> Path:
"""Path to the json library of needed yang modules.
return: path to the library describing all modules and revisions for this gnpy release.
rtype: Path
"""
return Path(__file__).parent.parent / 'yang' / 'yang-library-gnpy.json'
def _create_context(yang_library) -> ly.Context:
"""Prepare a libyang context for validating data against GNPy YANG models.
:param yang_library: path to the library describing all modules and revisions to be considered for the formatted
string generation.
:type yang_library: Path
:return: Context used to hold all information about schemas.
:rtype: ly.Context
"""
ly.set_log_options(ly.LogOptions.Log | ly.LogOptions.Store)
ctx = ly.Context(str(model_path()) + os.pathsep + str(external_yang()),
ly.ContextOptions.AllImplemented | ly.ContextOptions.DisableSearchCwd)
with open(yang_library, 'r', encoding='utf-8') as file:
data = json.load(file)
yang_modules = [{'name': e['name'], 'revision': e['revision']}
for e in data['ietf-yang-library:modules-state']['module']]
for module in yang_modules:
ctx.load_module(module['name'], revision=module['revision'])
return ctx
class ErrorMessage(NamedTuple):
# pylint: disable=C0115
what: str
where: str
def load_data(s: str, yang_library: Path = yang_lib()) -> ly.DataNode:
"""Load data from YANG-based JSON input and validate them.
:param data: a string contating the json data to be loaded.
:type data: str
:param yang_library: path to the library describing all modules and revisions to be considered for the formatted
string generation.
:type yang_library: Path
:return: DataNode containing the loaded data
:rtype: ly.DataNode
"""
ctx = _create_context(yang_library)
try:
data = ctx.parse_data(s, ly.DataFormat.JSON,
ly.ParseOptions.Strict | ly.ParseOptions.Ordered,
ly.ValidationOptions.Present
| ly.ValidationOptions.MultiError)
except ly.Error as exc:
raise ly.Error(exc, [ErrorMessage(err.message, err.path) for err in ctx.errors()]) from None
return data
def dump_data(data: Dict, yang_library: Path = yang_lib()) -> str:
"""Creates a formatted string using oopt-gnpy-libyang.
:param data: a json dict with data already formatted
:type data: Dict
:param yang_library: path to the library describing all modules and revisions to be considered for the formatted
string generation.
:type yang_library: Path
:return: formatted string data
:rtype: str
"""
return load_data(json.dumps(data), yang_library).print(ly.DataFormat.JSON, ly.PrintFlags.WithSiblings)
def convert_degree(json_data: Dict) -> Dict:
"""Convert legacy json topology format to gnpy yang format revision 2025-01-20:
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if elem['type'] == ROADM_KEY and PARAMS_KEY in elem:
new_targets = []
for equalization_type in ['per_degree_pch_out_db', 'per_degree_psd_out_mWperGHz',
'per_degree_psd_out_mWperSlotWidth']:
targets = elem[PARAMS_KEY].pop(equalization_type, None)
if targets:
new_targets.extend([{DEGREE_KEY: degree, equalization_type: target}
for degree, target in targets.items()])
if new_targets:
elem[PARAMS_KEY]['per_degree_power_targets'] = new_targets
return json_data
def convert_back_degree(json_data: Dict) -> Dict:
"""Convert gnpy yang format back to legacy json topology format.
:param json_data: The input JSON topology data to convert back.
:type json_data: Dict
:return: the converted JSON data
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if elem['type'] != ROADM_KEY or PARAMS_KEY not in elem:
continue
power_targets = elem[PARAMS_KEY].pop('per_degree_power_targets', None)
if not power_targets:
continue
# Process each power target
process_power_targets(elem, power_targets)
return json_data
def process_power_targets(elem: Dict, power_targets: List[Dict]) -> None:
"""Process power targets and update element parameters.
:param elem: The element to update
:type elem: Dict
:param power_targets: List of power target configurations
:type power_targets: List[Dict]
"""
equalization_types = [
'per_degree_pch_out_db',
'per_degree_psd_out_mWperGHz',
'per_degree_psd_out_mWperSlotWidth'
]
for target in power_targets:
degree_uid = target[DEGREE_KEY]
for eq_type in equalization_types:
if eq_type not in target:
continue
# Initialize the equalization type dict if needed
if eq_type not in elem[PARAMS_KEY]:
elem[PARAMS_KEY][eq_type] = {}
# Set the value for this degree
elem[PARAMS_KEY][eq_type][degree_uid] = target[eq_type]
def convert_loss_coeff_list(json_data: Dict) -> Dict:
"""Convert legacy json topology format to gnpy yang format revision 2025-01-20:
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if PARAMS_KEY in elem and LOSS_COEF_KEY in elem[PARAMS_KEY] \
and isinstance(elem[PARAMS_KEY][LOSS_COEF_KEY], dict):
loss_coef_per_frequency = elem[PARAMS_KEY].pop(LOSS_COEF_KEY)
loss_coef_list = loss_coef_per_frequency.pop('loss_coef_value', None)
frequency_list = loss_coef_per_frequency.pop('frequency', None)
if loss_coef_list:
new_loss_coef_per_frequency = [{'frequency': f, 'loss_coef_value': v}
for f, v in zip(frequency_list, loss_coef_list)]
elem[PARAMS_KEY][LOSS_COEF_KEY_PER_FREQ] = new_loss_coef_per_frequency
return json_data
def convert_back_loss_coeff_list(json_data: Dict) -> Dict:
"""Convert gnpy yang format revision 2025-01-20 back to legacy json topology format
:param json_data: The input JSON topology data to convert back
:type json_data: Dict
:return: the converted JSON data
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if PARAMS_KEY in elem and LOSS_COEF_KEY_PER_FREQ in elem[PARAMS_KEY]:
loss_coef_per_frequency = elem[PARAMS_KEY].pop(LOSS_COEF_KEY_PER_FREQ)
if loss_coef_per_frequency:
new_loss_coef_per_frequency = {
'frequency': [item['frequency'] for item in loss_coef_per_frequency],
'loss_coef_value': [item['loss_coef_value'] for item in loss_coef_per_frequency]}
elem[PARAMS_KEY]['loss_coef'] = new_loss_coef_per_frequency
return json_data
def convert_design_band(json_data: Dict) -> Dict:
"""Convert legacy json topology format to gnpy yang format revision 2025-01-20:
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if elem['type'] == ROADM_KEY and PARAMS_KEY in elem:
new_targets = []
targets = elem[PARAMS_KEY].pop('per_degree_design_bands', None)
if targets:
new_targets.extend([{DEGREE_KEY: degree, 'design_bands': target}
for degree, target in targets.items()])
if new_targets:
elem[PARAMS_KEY]['per_degree_design_bands_targets'] = new_targets
return json_data
def convert_back_design_band(json_data: Dict) -> Dict:
"""Convert gnpy yang format revision 2025-01-20 back to legacy json topology format
:param json_data: The input JSON topology data to convert back
:type json_data: Dict
:return: the converted JSON data
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if elem['type'] == ROADM_KEY and PARAMS_KEY in elem:
targets = elem[PARAMS_KEY].pop('per_degree_design_bands_targets', None)
if targets:
design_bands = {}
for target in targets:
design_bands[target[DEGREE_KEY]] = target['design_bands']
if design_bands:
elem[PARAMS_KEY]['per_degree_design_bands'] = design_bands
return json_data
def convert_range_to_dict(range_values: List[float]) -> Dict[str, float]:
"""Convert a range list to a dictionary format:
:param range_values: range of loat values defined with the format [min, max, step].
:type range_value: List[float]
:return: range formatted as a dict {"min_value": min, "max_value": max, "step": step}
:rtype: Dict[str, float]
"""
return {
'min_value': range_values[0],
'max_value': range_values[1],
'step': range_values[2]
}
def process_span_data(span: Dict) -> None:
"""Convert Span data with range in dict format
:param span: The span data to process.
:type span: Dict
"""
if 'delta_power_range_dict_db' in span:
return
if 'delta_power_range_db' not in span:
raise KeyError('delta_power_range or delta_power_range_dict_db missing in Span dict.')
delta_power_range_db = span.get('delta_power_range_db', [0, 0, 0])
span['delta_power_range_dict_db'] = convert_range_to_dict(delta_power_range_db)
del span['delta_power_range_db']
def process_si_data(si: Dict) -> None:
"""Convert Span data with range in dict format
:param si: The span data to process.
:type si: Dict
"""
if 'power_range_dict_db' in si:
return
if 'power_range_db' not in si:
raise KeyError('power_range_db or power_range_dict_db missing in SI dict.')
power_range_db = si.get('power_range_db', [0, 0, 0])
si['power_range_dict_db'] = convert_range_to_dict(power_range_db)
del si['power_range_db']
def convert_delta_power_range(json_data: Dict) -> Dict:
"""Convert legacy json equipment format to GNPy yang format revision 2025-01-20
:param json_data: the input JSON data to convert.
:type json_data: Dict
:return: The converted JSON data.
:rtype: Dict
"""
if 'Span' in json_data:
for span in json_data['Span']:
process_span_data(span)
if 'SI' in json_data:
for si in json_data['SI']:
process_si_data(si)
return json_data
def convert_back_delta_power_range(json_data: Dict) -> Dict:
"""Convert Yang JSON revision 2025-01-20 equipment format to legacy GNPy format.
:param json_data: the input JSON data to convert.
:type json_data: Dict
:return: The converted JSON data.
:rtype: Dict
"""
if 'Span' in json_data and 'delta_power_range_dict_db' in json_data['Span'][0]:
delta_power_range_db = json_data['Span'][0]['delta_power_range_dict_db']
json_data['Span'][0]['delta_power_range_db'] = [
delta_power_range_db['min_value'],
delta_power_range_db['max_value'],
delta_power_range_db['step']]
del json_data['Span'][0]['delta_power_range_dict_db']
if 'SI' in json_data and 'power_range_dict_db' in json_data['SI'][0]:
power_range_db = json_data['SI'][0]['power_range_dict_db']
json_data['SI'][0]['power_range_db'] = [
power_range_db['min_value'],
power_range_db['max_value'],
power_range_db['step']]
del json_data['SI'][0]['power_range_dict_db']
return json_data
def add_missing_default_type_variety(json_data: Dict) -> Dict:
"""Case of ROADM: legacy does not enforce type_variety to be present.
This utils ensures that 'default' type_variety is inserted if the key is missing.
:param json_data: the input JSON data to convert.
:type json_data: Dict
:return: The converted JSON data.
:rtype: Dict
"""
if 'Roadm' not in json_data:
return json_data
for i, elem in enumerate(json_data['Roadm']):
if 'type_variety' not in elem:
# make sure type_variety is the first key in the elem
temp = {'type_variety': 'default'}
temp.update(elem)
json_data['Roadm'][i] = temp
break
return json_data
def remove_null_region_city(json_data: Dict) -> Dict:
"""if present, name should not be None.
:param json_data: the input JSON data to convert.
:type json_data: Dict
:return: The converted JSON data.
:rtype: Dict
"""
for elem in json_data[ELEMENTS_KEY]:
if "metadata" in elem and "location" in elem[METADATA_KEY]:
for name in ['city', 'region']:
if name in elem[METADATA_KEY][LOCATION_KEY] \
and elem[METADATA_KEY][LOCATION_KEY][name] is None:
elem[METADATA_KEY][LOCATION_KEY][name] = ""
return json_data
def remove_union_that_fail(json_data: Dict) -> Dict:
"""Convert GNPy legacy JSON request format to GNPy yang format revision 2025-01-20
If present "N": or "M": should not contain empy data.
If present max-nb-of-channel should not contain empty data.
:param json_data: the input JSON data to convert.
:type json_data: Dict
:return: The converted JSON data.
:rtype: Dict
"""
for elem in json_data[PATH_REQUEST_KEY]:
te = elem['path-constraints']['te-bandwidth']
freq_slot = te.get('effective-freq-slot', None)
if freq_slot:
for slot in freq_slot:
if slot.get('N', None) is None:
slot.pop('N', None)
if slot.get('M', None) is None:
slot.pop('M', None)
if not slot:
te['effective-freq-slot'].remove(slot)
if not te['effective-freq-slot']:
te.pop('effective-freq-slot', None)
for attribute in ['max-nb-of-channel', 'trx_mode', 'output-power']:
if te.get(attribute) is None:
te.pop(attribute, None)
return json_data
def convert_none_to_empty(json_data: Any):
"""Convert all instances of None in the input to [None].
This function recursively traverses the input and replaces any None
values with a list containing None. If the input is already a list
containing None, it returns the input unchanged.
:param json_data: The input data to process, which can be of any type.
:type json_data: Any
:return: A new representation of the input with None values replaced by [None].
:rtype: Any
:example:
>>> a = {'uid': '[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13', 'type_variety': 'AMP',
... 'metadata': {'location': {'latitude': 0.0, 'longitude': 0.0, 'city': 'Zion', 'region': ''}},
... 'type': 'Multiband_amplifier', 'amplifiers': [{'type_variety': 'AMP_LOW_C',
... 'operational': {'gain_target': 12.22, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0,
... 'f_min': 191.3, 'f_max': 196.1}}, {'type_variety': 'AMP_LOW_L',
... 'operational': {'gain_target': 12.05, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0,
... 'f_min': 186.1, 'f_max': 190.9}}]}
>>> convert_none_to_empty(a)
{'uid': '[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13', 'type_variety': 'AMP', \
'metadata': {'location': {'latitude': 0.0, 'longitude': 0.0, 'city': 'Zion', 'region': ''}}, \
'type': 'Multiband_amplifier', 'amplifiers': [{'type_variety': 'AMP_LOW_C', \
'operational': {'gain_target': 12.22, 'delta_p': 4.19, 'out_voa': [None], 'tilt_target': 0.0, \
'f_min': 191.3, 'f_max': 196.1}}, {'type_variety': 'AMP_LOW_L', \
'operational': {'gain_target': 12.05, 'delta_p': 4.19, 'out_voa': [None], 'tilt_target': 0.0, \
'f_min': 186.1, 'f_max': 190.9}}]}
"""
if json_data == [None]:
# already conformed
return json_data
if isinstance(json_data, dict):
for key, value in json_data.items():
json_data[key] = convert_none_to_empty(value)
elif isinstance(json_data, list):
for i, elem in enumerate(json_data):
json_data[i] = convert_none_to_empty(elem)
elif json_data is None:
return [None]
return json_data
def convert_empty_to_none(json_data: Any):
"""Convert all instances of [None] in the input to None.
This function recursively traverses the input data and replaces any
lists containing a single None element with None. If the input is
already None, it returns None unchanged.
:param json_data: The input data to process, which can be of any type.
:type json_data: Any
:return: A new representation of the input with [None] replaced by None.
:rtype: Any
>>> json_data = {
... "uid": "[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13",
... "type_variety": "AMP",
... "metadata": {
... "location": {
... "latitude": 0.000000,
... "longitude": 0.000000,
... "city": "Zion",
... "region": ""
... }
... },
... "type": "Multiband_amplifier",
... "amplifiers": [{
... "type_variety": "AMP_LOW_C",
... "operational": {
... "gain_target": 12.22,
... "delta_p": 4.19,
... "out_voa": [None],
... "tilt_target": 0.00,
... "f_min": 191.3,
... "f_max": 196.1
... }
... }, {
... "type_variety": "AMP_LOW_L",
... "operational": {
... "gain_target": 12.05,
... "delta_p": 4.19,
... "out_voa": [None],
... "tilt_target": 0.00,
... "f_min": 186.1,
... "f_max": 190.9
... }
... }
... ]
... }
>>> convert_empty_to_none(json_data)
{'uid': '[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13', 'type_variety': 'AMP', \
'metadata': {'location': {'latitude': 0.0, 'longitude': 0.0, 'city': 'Zion', 'region': ''}}, \
'type': 'Multiband_amplifier', 'amplifiers': [{'type_variety': 'AMP_LOW_C', \
'operational': {'gain_target': 12.22, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0, \
'f_min': 191.3, 'f_max': 196.1}}, {'type_variety': 'AMP_LOW_L', \
'operational': {'gain_target': 12.05, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0, \
'f_min': 186.1, 'f_max': 190.9}}]}
"""
if isinstance(json_data, dict):
for key, value in json_data.items():
json_data[key] = convert_empty_to_none(value)
elif isinstance(json_data, list):
if len(json_data) == 1 and json_data[0] is None:
return None
for i, elem in enumerate(json_data):
json_data[i] = convert_empty_to_none(elem)
return json_data
def remove_namespace_context(json_data: Union[Dict, List, float, int, str, bool, None], namespace: str) \
-> Union[Dict, List, float, int, str, bool, None]:
"""Serialisation with yang introduces a namespace in values that
are defined as identity. this function filter them out.
:param json_data: The input JSON topology data to process.
:type json_data: Union[Dict, List, float, int, str, bool, None]
:param namespace: a namespace string
:type namespace: str
:return: the converted JSON data
:rtype: Union[Dict, List, float, int, str, bool, None]
>>> a = [{"a": 123, "b": "123:alkdje"}, {"a": 456, "c": "123", "d": "123:123"}]
>>> remove_namespace_context(a, "123:")
[{'a': 123, 'b': 'alkdje'}, {'a': 456, 'c': '123', 'd': '123'}]
"""
if isinstance(json_data, dict):
for key, value in json_data.items():
json_data[key] = remove_namespace_context(value, namespace)
elif isinstance(json_data, list):
for i, elem in enumerate(json_data):
json_data[i] = remove_namespace_context(elem, namespace)
elif isinstance(json_data, str) and namespace in json_data:
return json_data.split(namespace)[1]
return json_data
def convert_nf_coef(json_data: Dict) -> Dict:
"""Convert gnpy legacy format yang topology format.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
if 'Edfa' not in json_data:
return json_data
for edfa in json_data['Edfa']:
if 'nf_coef' in edfa and not isinstance(edfa['nf_coef'][0], dict):
nf_coef = edfa.pop('nf_coef')
new_nf_coef = [{'coef_order': i, 'nf_coef': c} for i, c in enumerate(nf_coef)]
edfa['nf_coef'] = new_nf_coef
return json_data
def convert_back_nf_coef(json_data: Dict) -> Dict:
"""Convert gnpy yang format back to legacy json topology format.
:param json_data: The input JSON topology data to convert back.
:type json_data: Dict
:return: the converted back JSON data
:rtype: dict
"""
if 'Edfa' not in json_data:
return json_data
for edfa in json_data['Edfa']:
if 'nf_coef' in edfa and isinstance(edfa['nf_coef'][0], dict):
nf_coef = edfa.pop('nf_coef')
sorted_nf_coef = sorted(nf_coef, key=lambda x: x['coef_order'])
new_nf_coef = [c['nf_coef'] for c in sorted_nf_coef]
edfa['nf_coef'] = new_nf_coef
return json_data
def convert_nf_fit_coef(json_data: Dict) -> Dict:
"""Convert gnpy legacy format yang topology format.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
if 'nf_fit_coeff' in json_data and not isinstance(json_data['nf_fit_coeff'][0], dict):
nf_coef = json_data.pop('nf_fit_coeff')
new_nf_coef = [{'coef_order': i, 'nf_coef': c} for i, c in enumerate(nf_coef)]
json_data['nf_fit_coeff'] = new_nf_coef
return json_data
def convert_back_nf_fit_coef(json_data: Dict) -> Dict:
"""Convert gnpy yang format back to legacy json topology format.
:param json_data: The input JSON topology data to convert back.
:type json_data: Dict
:return: the converted back JSON data
:rtype: dict
"""
if 'nf_fit_coeff' in json_data and isinstance(json_data['nf_fit_coeff'][0], dict):
nf_coef = json_data.pop('nf_fit_coeff')
sorted_nf_coef = sorted(nf_coef, key=lambda x: x['coef_order'])
new_nf_coef = [c['nf_coef'] for c in sorted_nf_coef]
json_data['nf_fit_coeff'] = new_nf_coef
return json_data
def convert_raman_coef(json_data: Dict) -> Dict:
"""Convert gnpy legacy format yang topology format.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
for elem in json_data[ELEMENTS_KEY]:
if PARAMS_KEY in elem and RAMAN_COEF_KEY in elem[PARAMS_KEY] \
and 'g0' in elem[PARAMS_KEY][RAMAN_COEF_KEY]:
raman_coef = elem[PARAMS_KEY].pop(RAMAN_COEF_KEY)
g0_list = raman_coef.pop('g0', [])
frequency_offset_list = raman_coef.pop('frequency_offset', [])
if frequency_offset_list:
new_raman_coef = {'reference_frequency': raman_coef['reference_frequency'],
'g0_per_frequency': [{'frequency_offset': f, 'g0': v}
for f, v in zip(frequency_offset_list, g0_list)]}
elem[PARAMS_KEY][RAMAN_COEF_KEY] = new_raman_coef
return json_data
def convert_back_raman_coef(json_data: Dict) -> Dict:
"""Convert gnpy yang format back to legacy json topology format.
:param json_data: The input JSON topology data to convert back.
:type json_data: Dict
:return: the converted back JSON data
:rtype: dict
"""
for elem in json_data[ELEMENTS_KEY]:
if PARAMS_KEY in elem and RAMAN_COEF_KEY in elem[PARAMS_KEY] \
and 'g0_per_frequency' in elem[PARAMS_KEY][RAMAN_COEF_KEY]:
raman_coef = elem[PARAMS_KEY].pop(RAMAN_COEF_KEY)
g0_list = [g['g0'] for g in raman_coef.pop('g0_per_frequency', [])]
frequency_offset_list = [f['frequency_offset'] for f in raman_coef.pop('g0_per_frequency', [])]
if frequency_offset_list:
new_raman_coef = {'reference_frequency': raman_coef['reference_frequency'],
'g0': g0_list,
'frequency_offset': frequency_offset_list}
elem[PARAMS_KEY][RAMAN_COEF_KEY] = new_raman_coef
return json_data
def convert_raman_efficiency(json_data: Dict) -> Dict:
"""Convert gnpy legacy format yang topology format.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
if 'RamanFiber' not in json_data:
return json_data
for fiber_eqpt in json_data['RamanFiber']:
if RAMAN_EFFICIENCY_KEY in fiber_eqpt \
and 'cr' in fiber_eqpt[RAMAN_EFFICIENCY_KEY]:
raman_efficiency = fiber_eqpt.pop(RAMAN_EFFICIENCY_KEY)
cr_list = raman_efficiency.pop('cr', [])
frequency_offset_list = raman_efficiency.pop('frequency_offset', [])
if frequency_offset_list:
new_raman_efficiency = [{'frequency_offset': f, 'cr': v}
for f, v in zip(frequency_offset_list, cr_list)]
fiber_eqpt[RAMAN_EFFICIENCY_KEY] = new_raman_efficiency
return json_data
def convert_back_raman_efficiency(json_data: Dict) -> Dict:
"""Convert gnpy yang format back to legacy json topology format.
:param json_data: The input JSON topology data to convert back.
:type json_data: Dict
:return: the converted back JSON data
:rtype: dict
"""
if 'RamanFiber' not in json_data:
return json_data
for fiber_eqpt in json_data['RamanFiber']:
if RAMAN_EFFICIENCY_KEY in fiber_eqpt and isinstance(fiber_eqpt[RAMAN_EFFICIENCY_KEY], list):
raman_efficiency = fiber_eqpt.pop(RAMAN_EFFICIENCY_KEY)
cr_list = [c['cr'] for c in raman_efficiency]
frequency_offset_list = [f['frequency_offset'] for f in raman_efficiency]
if frequency_offset_list:
old_raman_efficiency = {'cr': cr_list,
'frequency_offset': frequency_offset_list}
fiber_eqpt[RAMAN_COEF_KEY] = old_raman_efficiency
return json_data
def reorder_keys(data_list: List, key: str) -> List:
"""Roarder item in a dict placing the key (the key of a list with YANG meaning) first.
This is required because oopt-gnpy-libyang does not recognize the key when it is not placed first in the data node.
:param json_data: the list of dictionary items.
:type data_list: List
:return: the converted back JSON data
:rtype: List
"""
for item in data_list:
index_value = item.pop(key, None)
if index_value is not None:
# Place key first
new_item = {key: index_value}
# add other items
new_item.update(item)
# replace old element with new element
for k in list(item.keys()):
item.pop(k)
item.update(new_item)
return data_list
# next functions because ly requires that the key of a list be in the first position in the item
def reorder_route_objects(json_data: Dict) -> Dict:
"""Make sure that the index of a route object is placed first in the object.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
for request in json_data['path-request']:
if "explicit-route-objects" in request:
request["explicit-route-objects"]["route-object-include-exclude"] = \
reorder_keys(request["explicit-route-objects"]["route-object-include-exclude"], "index")
return json_data
def reorder_lumped_losses_objects(json_data: Dict) -> Dict:
"""Make sure that the position of a lumped loss object is placed first in the object.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
for element in json_data['elements']:
if "params" in element and "lumped_losses" in element["params"]:
element["params"]["lumped_losses"] = reorder_keys(element["params"]["lumped_losses"], "position")
return json_data
def reorder_raman_pumps(json_data: Dict) -> Dict:
"""Make sure that the frequency of a Raman pum object is placed first in the object.
:param json_data: The input JSON topology data to convert.
:type json_data: Dict
:return: the converted JSON data
:rtype: dict
"""
for element in json_data['elements']:
if "operational" in element and "raman_pumps" in element["operational"]:
element["operational"]["raman_pumps"] = reorder_keys(element["operational"]["raman_pumps"], "frequency")
return json_data