mirror of
				https://github.com/Telecominfraproject/oopt-gnpy.git
				synced 2025-10-31 10:07:57 +00:00 
			
		
		
		
	 1a795639c7
			
		
	
	1a795639c7
	
	
	
		
			
			This commit introduces new functions for converting between YANG formatted files and legacy formats. The conversion processes adhere to RFC7951 for encoding YANG data. Key changes include: - Conversion of float and empty type representations. - Transformation of Span and SI lists xx_power_range into dictionaries. - Addition of necessary namespaces. - use of oopt-gnpy-libyang to enforce compliancy to yang models These utilities enable full compatibility with GNPy. Co-authored-by: Renato Ambrosone <renato.ambrosone@polito.it> Signed-off-by: EstherLerouzic <esther.lerouzic@orange.com> Change-Id: Ia004113bca2b0631d1648564e5ccb60504fe80f8
		
			
				
	
	
		
			961 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			961 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| # SPDX-License-Identifier: BSD-3-Clause
 | |
| # Utils for yang <-> legacy format conversion
 | |
| # Copyright (C) 2025 Telecom Infra Project and GNPy contributors
 | |
| # see AUTHORS.rst for a list of contributors
 | |
| 
 | |
| """
 | |
| Utils for yang <-> legacy format conversion
 | |
| ===========================================
 | |
| 
 | |
| Format conversion utils.
 | |
| """
 | |
| 
 | |
| from pathlib import Path
 | |
| from copy import deepcopy
 | |
| from typing import Dict, Union, List, Any, NamedTuple
 | |
| import json
 | |
| import os
 | |
| 
 | |
| import oopt_gnpy_libyang as ly
 | |
| 
 | |
| from gnpy.yang.precision_dict import PRECISION_DICT
 | |
| 
 | |
| ELEMENTS_KEY = 'elements'
 | |
| ROADM_KEY = 'Roadm'
 | |
| PARAMS_KEY = 'params'
 | |
| METADATA_KEY = 'metadata'
 | |
| LOCATION_KEY = 'location'
 | |
| DEGREE_KEY = 'degree_uid'
 | |
| PATH_REQUEST_KEY = 'path-request'
 | |
| RESPONSE_KEY = 'response'
 | |
| SPECTRUM_KEY = 'spectrum'
 | |
| LOSS_COEF_KEY = 'loss_coef'
 | |
| LOSS_COEF_KEY_PER_FREQ = 'loss_coef_per_frequency'
 | |
| RAMAN_COEF_KEY = 'raman_coefficient'
 | |
| RAMAN_EFFICIENCY_KEY = 'raman_efficiency'
 | |
| EQPT_TYPES = ['Edfa', 'Transceiver', 'Fiber', 'Roadm']
 | |
| EDFA_CONFIG_KEYS = ['nf_fit_coeff', 'nf_ripple', 'gain_ripple', 'dgt']
 | |
| SIM_PARAMS_KEYS = ['raman_params', 'nli_params']
 | |
| TOPO_NMSP = 'gnpy-network-topology:topology'
 | |
| EQPT_NMSP = 'gnpy-eqpt-config:equipment'
 | |
| SERV_NMSP = 'gnpy-path-computation:services'
 | |
| RESP_NMSP = 'gnpy-path-computation:responses'
 | |
| EDFA_CONFIG_NMSP = 'gnpy-edfa-config:edfa-config'
 | |
| SIM_PARAMS_NMSP = 'gnpy-sim-params:sim-params'
 | |
| SPECTRUM_NMSP = 'gnpy-spectrum:spectrum'
 | |
| 
 | |
| 
 | |
| class PrettyFloat(float):
 | |
|     """"A float subclass for formatting according to specific fraction digit requirements.
 | |
| 
 | |
|     >>> PrettyFloat(3.1245)
 | |
|     3.12
 | |
|     >>> PrettyFloat(100.65, 5)
 | |
|     100.65
 | |
|     >>> PrettyFloat(2.1e-5, 8)
 | |
|     0.000021
 | |
|     >>> PrettyFloat(10, 3)
 | |
|     10.0
 | |
|     >>> PrettyFloat(-0.3110761646066259, 18)
 | |
|     -0.3110761646066259
 | |
|     """
 | |
|     def __new__(cls, value: float, fraction_digit: int = 2):
 | |
|         """Create a new instance of PrettyFloat"""
 | |
|         instance = super().__new__(cls, value)
 | |
|         instance.fraction_digit = fraction_digit
 | |
|         instance.value = value
 | |
|         return instance
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         """Return the string representation of the float formatted to the specified fraction digits. It removes
 | |
|         scientific notation ("e-x").
 | |
|         """
 | |
|         # When fraction digit is over 16, the usual formatting does not works properly because of floating point issues.
 | |
|         # For example -0.3110761646066259 is represented as "-0.311076164606625905". The following function makes
 | |
|         # sure that the unwanted floating point issue does not change the value. Maximum fraction digit in YANG is 18.
 | |
|         if self.fraction_digit in range(0, 19):
 | |
|             temp = str(self.value)
 | |
|             if 'e' in temp or '.' not in temp or self.fraction_digit < 17:
 | |
|                 formatted_value = f'{self:.{self.fraction_digit}f}'   # noqa E231
 | |
|                 if '.' in formatted_value:
 | |
|                     formatted_value = formatted_value.rstrip('0')
 | |
|                     if formatted_value.endswith('.'):
 | |
|                         formatted_value += '0'
 | |
|                 return formatted_value
 | |
|             if '.' in temp:
 | |
|                 parts = temp.split('.')
 | |
|                 formatted_value = parts[0] + '.' + parts[1][0:min(self.fraction_digit, len(parts[1]))]
 | |
|                 formatted_value = formatted_value.rstrip('0')
 | |
|                 if formatted_value.endswith('.'):
 | |
|                     formatted_value += '0'
 | |
|                 return formatted_value
 | |
|             return temp
 | |
|         raise ValueError(f'Fraction digit {self.fraction_digit} not handled')
 | |
| 
 | |
| 
 | |
| def gnpy_precision_dict() -> Dict[str, int]:
 | |
|     """Return a dictionary of fraction-digit definitions for GNPy.
 | |
|     Precision correspond to fraction digit number if it is a decimal64 yang type, or 0 if it is an
 | |
|     (u)int < 64 or -1 if it is a string or an (u)int64 type.
 | |
| 
 | |
|     :return: Dictionnary mapping key names with digit numbers for values.
 | |
|     :rtype: Dict[str, int]
 | |
|     """
 | |
|     return PRECISION_DICT
 | |
| 
 | |
| 
 | |
| def convert_dict(data: Dict, fraction_digit: int = 2, precision: Union[Dict[str, int], None] = None) \
 | |
|         -> Union[Dict, List, float, int, str, None]:
 | |
|     """Recursive conversion from float to str, conformed to RFC7951
 | |
|     does not work for int64 (will not returm str as stated in standard)
 | |
|     If nothing is stated precision is using gnpy_precision_dict.
 | |
| 
 | |
|     :param data: the input dictionary to convert.
 | |
|     :type data: data: Dict
 | |
|     :param fraction_digit: the number of decimal places to format.
 | |
|     :type fraction_digit: int
 | |
|     :param precision: A dictionary defining precision for specific keys.
 | |
|     :type precision: Union[Dict[str, int], None]
 | |
|     :return: A new dictionary with converted values.
 | |
|     :rtype: Dict
 | |
| 
 | |
|     >>> convert_dict({"y": "amp", "t": "vn", "g": 25, "gamma": 0.0016, "p": 21.5, "o": True, \
 | |
|     "output-power": 14.12457896})
 | |
|     {'y': 'amp', 't': 'vn', 'g': '25.0', 'gamma': '0.0016', 'p': '21.5', 'o': True, 'output-power': '14.12457896'}
 | |
|     """
 | |
|     if not precision:
 | |
|         precision = gnpy_precision_dict()
 | |
|     if isinstance(data, dict):
 | |
|         for k, v in data.items():
 | |
|             fraction_digit = precision.get(k, 2)
 | |
|             data[k] = convert_dict(v, fraction_digit, precision=precision)
 | |
|     elif isinstance(data, list):
 | |
|         temp = deepcopy(data)
 | |
|         for i, el in enumerate(temp):
 | |
|             if isinstance(el, float):
 | |
|                 data[i] = PrettyFloat(el, fraction_digit)
 | |
|                 data[i] = str(data[i])
 | |
|             else:
 | |
|                 data[i] = convert_dict(el, fraction_digit=fraction_digit, precision=precision)
 | |
|     elif isinstance(data, bool):
 | |
|         return data
 | |
|     elif isinstance(data, int):
 | |
|         data = PrettyFloat(data)
 | |
|         data.fraction_digit = fraction_digit
 | |
|         if fraction_digit > 0:
 | |
|             return str(data)
 | |
|         if fraction_digit < 0:
 | |
|             return data
 | |
|         return int(data)
 | |
|     elif isinstance(data, float):
 | |
|         data = PrettyFloat(data)
 | |
|         data.fraction_digit = fraction_digit
 | |
|         return str(data)
 | |
|     return data
 | |
| 
 | |
| 
 | |
| def convert_back(data: Dict, fraction_digit: Union[int, None] = None, precision: Union[Dict[str, int], None] = None) \
 | |
|         -> Union[Dict, List, float, int, str, None]:
 | |
|     """Recursively convert strings back to their original types int, float according to RFC7951.
 | |
| 
 | |
|     :param data: the input dictionary to convert.
 | |
|     :type data: Dict
 | |
|     :param fraction_digit: the number of decimal places to format.
 | |
|     :type fraction_digit: Union[int, None]
 | |
|     :param precision: A dictionary defining precision for specific keys.
 | |
|     :type precision: Union[Dict[str, int], None]
 | |
|     :return: A new dictionary with converted values.
 | |
|     :rtype: Dict
 | |
| 
 | |
|     >>> a = {'y': 'amp', 't': 'vn', 'N': '25', 'gamma': '0.0000000000000016', 'p': '21.50', 'o': True, \
 | |
|       'output-power': '14.12458'}
 | |
|     >>> convert_back({'a': a, 'delta_power_range_db': ['12.3', '10.6', True]})
 | |
|     {'a': {'y': 'amp', 't': 'vn', 'N': 25, 'gamma': 1.6e-15, 'p': '21.50', 'o': True, 'output-power': 14.12458}, \
 | |
| 'delta_power_range_db': ['12.3', '10.6', True]}
 | |
|     """
 | |
|     if not precision:
 | |
|         precision = gnpy_precision_dict()
 | |
|     if isinstance(data, dict):
 | |
|         for k, v in data.items():
 | |
|             fraction_digit = None
 | |
|             if k in precision:
 | |
|                 fraction_digit = precision[k]
 | |
|             data[k] = convert_back(v, fraction_digit, precision=precision)
 | |
|     elif isinstance(data, list):
 | |
|         for i, el in enumerate(data):
 | |
|             if isinstance(el, str) and fraction_digit not in [None, -1]:
 | |
|                 data[i] = float(data[i])
 | |
|             else:
 | |
|                 data[i] = convert_back(el, fraction_digit=fraction_digit, precision=precision)
 | |
|     elif isinstance(data, (bool, int, float)):
 | |
|         return data
 | |
|     elif isinstance(data, str) and fraction_digit is not None:
 | |
|         if fraction_digit > 0:
 | |
|             return float(data)
 | |
|         if fraction_digit < 0:
 | |
|             return data
 | |
|         return int(data)
 | |
|     return data
 | |
| 
 | |
| 
 | |
| def model_path() -> Path:
 | |
|     """Filesystem path to YANG models.
 | |
| 
 | |
|     return: path to the GNPy YANG modules.
 | |
|     rtype: Path
 | |
|     """
 | |
|     return Path(__file__).parent.parent / 'yang'
 | |
| 
 | |
| 
 | |
| def external_yang() -> Path:
 | |
|     """Filesystem to the IETF external yang modules.
 | |
| 
 | |
|     return: path to the IETF modules.
 | |
|     rtype: Path
 | |
|     """
 | |
|     return Path(__file__).parent.parent / 'yang' / 'ext'
 | |
| 
 | |
| 
 | |
| def yang_lib() -> Path:
 | |
|     """Path to the json library of needed yang modules.
 | |
| 
 | |
|     return: path to the library describing all modules and revisions for this gnpy release.
 | |
|     rtype: Path
 | |
|     """
 | |
|     return Path(__file__).parent.parent / 'yang' / 'yang-library-gnpy.json'
 | |
| 
 | |
| 
 | |
| def _create_context(yang_library) -> ly.Context:
 | |
|     """Prepare a libyang context for validating data against GNPy YANG models.
 | |
| 
 | |
|     :param yang_library: path to the library describing all modules and revisions to be considered for the formatted
 | |
|                          string generation.
 | |
|     :type yang_library: Path
 | |
|     :return: Context used to hold all information about schemas.
 | |
|     :rtype: ly.Context
 | |
|     """
 | |
|     ly.set_log_options(ly.LogOptions.Log | ly.LogOptions.Store)
 | |
|     ctx = ly.Context(str(model_path()) + os.pathsep + str(external_yang()),
 | |
|                      ly.ContextOptions.AllImplemented | ly.ContextOptions.DisableSearchCwd)
 | |
|     with open(yang_library, 'r', encoding='utf-8') as file:
 | |
|         data = json.load(file)
 | |
|     yang_modules = [{'name': e['name'], 'revision': e['revision']}
 | |
|                     for e in data['ietf-yang-library:modules-state']['module']]
 | |
|     for module in yang_modules:
 | |
|         ctx.load_module(module['name'], revision=module['revision'])
 | |
|     return ctx
 | |
| 
 | |
| 
 | |
| class ErrorMessage(NamedTuple):
 | |
|     # pylint: disable=C0115
 | |
|     what: str
 | |
|     where: str
 | |
| 
 | |
| 
 | |
| def load_data(s: str, yang_library: Path = yang_lib()) -> ly.DataNode:
 | |
|     """Load data from YANG-based JSON input and validate them.
 | |
| 
 | |
|     :param data: a string contating the json data to be loaded.
 | |
|     :type data: str
 | |
|     :param yang_library: path to the library describing all modules and revisions to be considered for the formatted
 | |
|                          string generation.
 | |
|     :type yang_library: Path
 | |
|     :return: DataNode containing the loaded data
 | |
|     :rtype: ly.DataNode
 | |
|     """
 | |
|     ctx = _create_context(yang_library)
 | |
|     try:
 | |
|         data = ctx.parse_data(s, ly.DataFormat.JSON,
 | |
|                               ly.ParseOptions.Strict | ly.ParseOptions.Ordered,
 | |
|                               ly.ValidationOptions.Present
 | |
|                               | ly.ValidationOptions.MultiError)
 | |
|     except ly.Error as exc:
 | |
|         raise ly.Error(exc, [ErrorMessage(err.message, err.path) for err in ctx.errors()]) from None
 | |
|     return data
 | |
| 
 | |
| 
 | |
| def dump_data(data: Dict, yang_library: Path = yang_lib()) -> str:
 | |
|     """Creates a formatted string using oopt-gnpy-libyang.
 | |
| 
 | |
|     :param data: a json dict with data already formatted
 | |
|     :type data: Dict
 | |
|     :param yang_library: path to the library describing all modules and revisions to be considered for the formatted
 | |
|                          string generation.
 | |
|     :type yang_library: Path
 | |
|     :return: formatted string data
 | |
|     :rtype: str
 | |
|     """
 | |
|     return load_data(json.dumps(data), yang_library).print(ly.DataFormat.JSON, ly.PrintFlags.WithSiblings)
 | |
| 
 | |
| 
 | |
| def convert_degree(json_data: Dict) -> Dict:
 | |
|     """Convert legacy json topology format to gnpy yang format revision 2025-01-20:
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if elem['type'] == ROADM_KEY and PARAMS_KEY in elem:
 | |
|             new_targets = []
 | |
|             for equalization_type in ['per_degree_pch_out_db', 'per_degree_psd_out_mWperGHz',
 | |
|                                       'per_degree_psd_out_mWperSlotWidth']:
 | |
|                 targets = elem[PARAMS_KEY].pop(equalization_type, None)
 | |
|                 if targets:
 | |
|                     new_targets.extend([{DEGREE_KEY: degree, equalization_type: target}
 | |
|                                         for degree, target in targets.items()])
 | |
|             if new_targets:
 | |
|                 elem[PARAMS_KEY]['per_degree_power_targets'] = new_targets
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_degree(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format back to legacy json topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if elem['type'] != ROADM_KEY or PARAMS_KEY not in elem:
 | |
|             continue
 | |
|         power_targets = elem[PARAMS_KEY].pop('per_degree_power_targets', None)
 | |
|         if not power_targets:
 | |
|             continue
 | |
|         # Process each power target
 | |
|         process_power_targets(elem, power_targets)
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def process_power_targets(elem: Dict, power_targets: List[Dict]) -> None:
 | |
|     """Process power targets and update element parameters.
 | |
| 
 | |
|     :param elem: The element to update
 | |
|     :type elem: Dict
 | |
|     :param power_targets: List of power target configurations
 | |
|     :type power_targets: List[Dict]
 | |
|     """
 | |
|     equalization_types = [
 | |
|         'per_degree_pch_out_db',
 | |
|         'per_degree_psd_out_mWperGHz',
 | |
|         'per_degree_psd_out_mWperSlotWidth'
 | |
|     ]
 | |
| 
 | |
|     for target in power_targets:
 | |
|         degree_uid = target[DEGREE_KEY]
 | |
|         for eq_type in equalization_types:
 | |
|             if eq_type not in target:
 | |
|                 continue
 | |
|             # Initialize the equalization type dict if needed
 | |
|             if eq_type not in elem[PARAMS_KEY]:
 | |
|                 elem[PARAMS_KEY][eq_type] = {}
 | |
|             # Set the value for this degree
 | |
|             elem[PARAMS_KEY][eq_type][degree_uid] = target[eq_type]
 | |
| 
 | |
| 
 | |
| def convert_loss_coeff_list(json_data: Dict) -> Dict:
 | |
|     """Convert legacy json topology format to gnpy yang format revision 2025-01-20:
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if PARAMS_KEY in elem and LOSS_COEF_KEY in elem[PARAMS_KEY] \
 | |
|                 and isinstance(elem[PARAMS_KEY][LOSS_COEF_KEY], dict):
 | |
|             loss_coef_per_frequency = elem[PARAMS_KEY].pop(LOSS_COEF_KEY)
 | |
|             loss_coef_list = loss_coef_per_frequency.pop('loss_coef_value', None)
 | |
|             frequency_list = loss_coef_per_frequency.pop('frequency', None)
 | |
|             if loss_coef_list:
 | |
|                 new_loss_coef_per_frequency = [{'frequency': f, 'loss_coef_value': v}
 | |
|                                                for f, v in zip(frequency_list, loss_coef_list)]
 | |
|                 elem[PARAMS_KEY][LOSS_COEF_KEY_PER_FREQ] = new_loss_coef_per_frequency
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_loss_coeff_list(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format revision 2025-01-20 back to legacy json topology format
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if PARAMS_KEY in elem and LOSS_COEF_KEY_PER_FREQ in elem[PARAMS_KEY]:
 | |
|             loss_coef_per_frequency = elem[PARAMS_KEY].pop(LOSS_COEF_KEY_PER_FREQ)
 | |
|             if loss_coef_per_frequency:
 | |
|                 new_loss_coef_per_frequency = {
 | |
|                     'frequency': [item['frequency'] for item in loss_coef_per_frequency],
 | |
|                     'loss_coef_value': [item['loss_coef_value'] for item in loss_coef_per_frequency]}
 | |
|                 elem[PARAMS_KEY]['loss_coef'] = new_loss_coef_per_frequency
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_design_band(json_data: Dict) -> Dict:
 | |
|     """Convert legacy json topology format to gnpy yang format revision 2025-01-20:
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if elem['type'] == ROADM_KEY and PARAMS_KEY in elem:
 | |
|             new_targets = []
 | |
|             targets = elem[PARAMS_KEY].pop('per_degree_design_bands', None)
 | |
|             if targets:
 | |
|                 new_targets.extend([{DEGREE_KEY: degree, 'design_bands': target}
 | |
|                                     for degree, target in targets.items()])
 | |
|             if new_targets:
 | |
|                 elem[PARAMS_KEY]['per_degree_design_bands_targets'] = new_targets
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_design_band(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format revision 2025-01-20 back to legacy json topology format
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if elem['type'] == ROADM_KEY and PARAMS_KEY in elem:
 | |
|             targets = elem[PARAMS_KEY].pop('per_degree_design_bands_targets', None)
 | |
|             if targets:
 | |
|                 design_bands = {}
 | |
|                 for target in targets:
 | |
|                     design_bands[target[DEGREE_KEY]] = target['design_bands']
 | |
|                 if design_bands:
 | |
|                     elem[PARAMS_KEY]['per_degree_design_bands'] = design_bands
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_range_to_dict(range_values: List[float]) -> Dict[str, float]:
 | |
|     """Convert a range list to a dictionary format:
 | |
| 
 | |
|     :param range_values: range of loat values defined with the format [min, max, step].
 | |
|     :type range_value: List[float]
 | |
|     :return: range formatted as a dict {"min_value": min, "max_value": max, "step": step}
 | |
|     :rtype: Dict[str, float]
 | |
|     """
 | |
|     return {
 | |
|         'min_value': range_values[0],
 | |
|         'max_value': range_values[1],
 | |
|         'step': range_values[2]
 | |
|     }
 | |
| 
 | |
| 
 | |
| def process_span_data(span: Dict) -> None:
 | |
|     """Convert Span data with range in dict format
 | |
|     :param span: The span data to process.
 | |
|     :type span: Dict
 | |
|     """
 | |
|     if 'delta_power_range_dict_db' in span:
 | |
|         return
 | |
| 
 | |
|     if 'delta_power_range_db' not in span:
 | |
|         raise KeyError('delta_power_range or delta_power_range_dict_db missing in Span dict.')
 | |
| 
 | |
|     delta_power_range_db = span.get('delta_power_range_db', [0, 0, 0])
 | |
|     span['delta_power_range_dict_db'] = convert_range_to_dict(delta_power_range_db)
 | |
|     del span['delta_power_range_db']
 | |
| 
 | |
| 
 | |
| def process_si_data(si: Dict) -> None:
 | |
|     """Convert Span data with range in dict format
 | |
|     :param si: The span data to process.
 | |
|     :type si: Dict
 | |
|     """
 | |
|     if 'power_range_dict_db' in si:
 | |
|         return
 | |
| 
 | |
|     if 'power_range_db' not in si:
 | |
|         raise KeyError('power_range_db or power_range_dict_db missing in SI dict.')
 | |
| 
 | |
|     power_range_db = si.get('power_range_db', [0, 0, 0])
 | |
|     si['power_range_dict_db'] = convert_range_to_dict(power_range_db)
 | |
|     del si['power_range_db']
 | |
| 
 | |
| 
 | |
| def convert_delta_power_range(json_data: Dict) -> Dict:
 | |
|     """Convert legacy json equipment format to GNPy yang format revision 2025-01-20
 | |
| 
 | |
|     :param json_data: the input JSON data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: The converted JSON data.
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     if 'Span' in json_data:
 | |
|         for span in json_data['Span']:
 | |
|             process_span_data(span)
 | |
| 
 | |
|     if 'SI' in json_data:
 | |
|         for si in json_data['SI']:
 | |
|             process_si_data(si)
 | |
| 
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_delta_power_range(json_data: Dict) -> Dict:
 | |
|     """Convert Yang JSON revision 2025-01-20 equipment format to legacy GNPy format.
 | |
| 
 | |
|     :param json_data: the input JSON data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: The converted JSON data.
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     if 'Span' in json_data and 'delta_power_range_dict_db' in json_data['Span'][0]:
 | |
|         delta_power_range_db = json_data['Span'][0]['delta_power_range_dict_db']
 | |
|         json_data['Span'][0]['delta_power_range_db'] = [
 | |
|             delta_power_range_db['min_value'],
 | |
|             delta_power_range_db['max_value'],
 | |
|             delta_power_range_db['step']]
 | |
|         del json_data['Span'][0]['delta_power_range_dict_db']
 | |
|     if 'SI' in json_data and 'power_range_dict_db' in json_data['SI'][0]:
 | |
|         power_range_db = json_data['SI'][0]['power_range_dict_db']
 | |
|         json_data['SI'][0]['power_range_db'] = [
 | |
|             power_range_db['min_value'],
 | |
|             power_range_db['max_value'],
 | |
|             power_range_db['step']]
 | |
|         del json_data['SI'][0]['power_range_dict_db']
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def add_missing_default_type_variety(json_data: Dict) -> Dict:
 | |
|     """Case of ROADM: legacy does not enforce type_variety to be present.
 | |
|     This utils ensures that 'default' type_variety is inserted if the key is missing.
 | |
| 
 | |
|     :param json_data: the input JSON data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: The converted JSON data.
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     if 'Roadm' not in json_data:
 | |
|         return json_data
 | |
|     for i, elem in enumerate(json_data['Roadm']):
 | |
|         if 'type_variety' not in elem:
 | |
|             # make sure type_variety is the first key in the elem
 | |
|             temp = {'type_variety': 'default'}
 | |
|             temp.update(elem)
 | |
|             json_data['Roadm'][i] = temp
 | |
|             break
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def remove_null_region_city(json_data: Dict) -> Dict:
 | |
|     """if present, name should not be None.
 | |
| 
 | |
|     :param json_data: the input JSON data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: The converted JSON data.
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if "metadata" in elem and "location" in elem[METADATA_KEY]:
 | |
|             for name in ['city', 'region']:
 | |
|                 if name in elem[METADATA_KEY][LOCATION_KEY] \
 | |
|                         and elem[METADATA_KEY][LOCATION_KEY][name] is None:
 | |
|                     elem[METADATA_KEY][LOCATION_KEY][name] = ""
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def remove_union_that_fail(json_data: Dict) -> Dict:
 | |
|     """Convert GNPy legacy JSON request format to GNPy yang format revision 2025-01-20
 | |
|     If present "N": or "M": should not contain empy data.
 | |
|     If present max-nb-of-channel should not contain empty data.
 | |
| 
 | |
|     :param json_data: the input JSON data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: The converted JSON data.
 | |
|     :rtype: Dict
 | |
|     """
 | |
|     for elem in json_data[PATH_REQUEST_KEY]:
 | |
|         te = elem['path-constraints']['te-bandwidth']
 | |
|         freq_slot = te.get('effective-freq-slot', None)
 | |
|         if freq_slot:
 | |
|             for slot in freq_slot:
 | |
|                 if slot.get('N', None) is None:
 | |
|                     slot.pop('N', None)
 | |
|                 if slot.get('M', None) is None:
 | |
|                     slot.pop('M', None)
 | |
|                 if not slot:
 | |
|                     te['effective-freq-slot'].remove(slot)
 | |
|             if not te['effective-freq-slot']:
 | |
|                 te.pop('effective-freq-slot', None)
 | |
|         for attribute in ['max-nb-of-channel', 'trx_mode', 'output-power']:
 | |
|             if te.get(attribute) is None:
 | |
|                 te.pop(attribute, None)
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_none_to_empty(json_data: Any):
 | |
|     """Convert all instances of None in the input to [None].
 | |
| 
 | |
|     This function recursively traverses the input and replaces any None
 | |
|     values with a list containing None. If the input is already a list
 | |
|     containing None, it returns the input unchanged.
 | |
| 
 | |
|     :param json_data: The input data to process, which can be of any type.
 | |
|     :type json_data: Any
 | |
|     :return: A new representation of the input with None values replaced by [None].
 | |
|     :rtype: Any
 | |
| 
 | |
|     :example:
 | |
|     >>> a = {'uid': '[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13', 'type_variety': 'AMP',
 | |
|     ... 'metadata': {'location': {'latitude': 0.0, 'longitude': 0.0, 'city': 'Zion', 'region': ''}},
 | |
|     ... 'type': 'Multiband_amplifier', 'amplifiers': [{'type_variety': 'AMP_LOW_C',
 | |
|     ... 'operational': {'gain_target': 12.22, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0,
 | |
|     ... 'f_min': 191.3, 'f_max': 196.1}}, {'type_variety': 'AMP_LOW_L',
 | |
|     ... 'operational': {'gain_target': 12.05, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0,
 | |
|     ... 'f_min': 186.1, 'f_max': 190.9}}]}
 | |
|     >>> convert_none_to_empty(a)
 | |
|     {'uid': '[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13', 'type_variety': 'AMP', \
 | |
| 'metadata': {'location': {'latitude': 0.0, 'longitude': 0.0, 'city': 'Zion', 'region': ''}}, \
 | |
| 'type': 'Multiband_amplifier', 'amplifiers': [{'type_variety': 'AMP_LOW_C', \
 | |
| 'operational': {'gain_target': 12.22, 'delta_p': 4.19, 'out_voa': [None], 'tilt_target': 0.0, \
 | |
| 'f_min': 191.3, 'f_max': 196.1}}, {'type_variety': 'AMP_LOW_L', \
 | |
| 'operational': {'gain_target': 12.05, 'delta_p': 4.19, 'out_voa': [None], 'tilt_target': 0.0, \
 | |
| 'f_min': 186.1, 'f_max': 190.9}}]}
 | |
| 
 | |
|     """
 | |
|     if json_data == [None]:
 | |
|         # already conformed
 | |
|         return json_data
 | |
|     if isinstance(json_data, dict):
 | |
|         for key, value in json_data.items():
 | |
|             json_data[key] = convert_none_to_empty(value)
 | |
|     elif isinstance(json_data, list):
 | |
|         for i, elem in enumerate(json_data):
 | |
|             json_data[i] = convert_none_to_empty(elem)
 | |
|     elif json_data is None:
 | |
|         return [None]
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_empty_to_none(json_data: Any):
 | |
|     """Convert all instances of [None] in the input to None.
 | |
| 
 | |
|     This function recursively traverses the input data and replaces any
 | |
|     lists containing a single None element with None. If the input is
 | |
|     already None, it returns None unchanged.
 | |
| 
 | |
|     :param json_data: The input data to process, which can be of any type.
 | |
|     :type json_data: Any
 | |
|     :return: A new representation of the input with [None] replaced by None.
 | |
|     :rtype: Any
 | |
| 
 | |
|     >>> json_data = {
 | |
|     ...     "uid": "[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13",
 | |
|     ...     "type_variety": "AMP",
 | |
|     ...     "metadata": {
 | |
|     ...         "location": {
 | |
|     ...             "latitude": 0.000000,
 | |
|     ...             "longitude": 0.000000,
 | |
|     ...             "city": "Zion",
 | |
|     ...             "region": ""
 | |
|     ...         }
 | |
|     ...     },
 | |
|     ...     "type": "Multiband_amplifier",
 | |
|     ...     "amplifiers": [{
 | |
|     ...             "type_variety": "AMP_LOW_C",
 | |
|     ...             "operational": {
 | |
|     ...                 "gain_target": 12.22,
 | |
|     ...                 "delta_p": 4.19,
 | |
|     ...                 "out_voa": [None],
 | |
|     ...                 "tilt_target": 0.00,
 | |
|     ...                 "f_min": 191.3,
 | |
|     ...                 "f_max": 196.1
 | |
|     ...             }
 | |
|     ...         }, {
 | |
|     ...             "type_variety": "AMP_LOW_L",
 | |
|     ...             "operational": {
 | |
|     ...                 "gain_target": 12.05,
 | |
|     ...                 "delta_p": 4.19,
 | |
|     ...                 "out_voa": [None],
 | |
|     ...                 "tilt_target": 0.00,
 | |
|     ...                 "f_min": 186.1,
 | |
|     ...                 "f_max": 190.9
 | |
|     ...             }
 | |
|     ...         }
 | |
|     ...     ]
 | |
|     ... }
 | |
|     >>> convert_empty_to_none(json_data)
 | |
|     {'uid': '[930/WRT-2-2-SIG=>923/WRT-1-9-SIG]-923/AMP-1-13', 'type_variety': 'AMP', \
 | |
| 'metadata': {'location': {'latitude': 0.0, 'longitude': 0.0, 'city': 'Zion', 'region': ''}}, \
 | |
| 'type': 'Multiband_amplifier', 'amplifiers': [{'type_variety': 'AMP_LOW_C', \
 | |
| 'operational': {'gain_target': 12.22, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0, \
 | |
| 'f_min': 191.3, 'f_max': 196.1}}, {'type_variety': 'AMP_LOW_L', \
 | |
| 'operational': {'gain_target': 12.05, 'delta_p': 4.19, 'out_voa': None, 'tilt_target': 0.0, \
 | |
| 'f_min': 186.1, 'f_max': 190.9}}]}
 | |
| 
 | |
|     """
 | |
|     if isinstance(json_data, dict):
 | |
|         for key, value in json_data.items():
 | |
|             json_data[key] = convert_empty_to_none(value)
 | |
|     elif isinstance(json_data, list):
 | |
|         if len(json_data) == 1 and json_data[0] is None:
 | |
|             return None
 | |
|         for i, elem in enumerate(json_data):
 | |
|             json_data[i] = convert_empty_to_none(elem)
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def remove_namespace_context(json_data: Union[Dict, List, float, int, str, bool, None], namespace: str) \
 | |
|         -> Union[Dict, List, float, int, str, bool, None]:
 | |
|     """Serialisation with yang introduces a namespace in values that
 | |
|     are defined as identity. this function filter them out.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to process.
 | |
|     :type json_data: Union[Dict, List, float, int, str, bool, None]
 | |
|     :param namespace: a namespace string
 | |
|     :type namespace: str
 | |
|     :return: the converted JSON data
 | |
|     :rtype: Union[Dict, List, float, int, str, bool, None]
 | |
| 
 | |
|     >>> a = [{"a": 123, "b": "123:alkdje"}, {"a": 456, "c": "123", "d": "123:123"}]
 | |
|     >>> remove_namespace_context(a, "123:")
 | |
|     [{'a': 123, 'b': 'alkdje'}, {'a': 456, 'c': '123', 'd': '123'}]
 | |
| 
 | |
|     """
 | |
|     if isinstance(json_data, dict):
 | |
|         for key, value in json_data.items():
 | |
|             json_data[key] = remove_namespace_context(value, namespace)
 | |
|     elif isinstance(json_data, list):
 | |
|         for i, elem in enumerate(json_data):
 | |
|             json_data[i] = remove_namespace_context(elem, namespace)
 | |
|     elif isinstance(json_data, str) and namespace in json_data:
 | |
|         return json_data.split(namespace)[1]
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_nf_coef(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy legacy format yang topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     if 'Edfa' not in json_data:
 | |
|         return json_data
 | |
|     for edfa in json_data['Edfa']:
 | |
|         if 'nf_coef' in edfa and not isinstance(edfa['nf_coef'][0], dict):
 | |
|             nf_coef = edfa.pop('nf_coef')
 | |
|             new_nf_coef = [{'coef_order': i, 'nf_coef': c} for i, c in enumerate(nf_coef)]
 | |
|             edfa['nf_coef'] = new_nf_coef
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_nf_coef(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format back to legacy json topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted back JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     if 'Edfa' not in json_data:
 | |
|         return json_data
 | |
|     for edfa in json_data['Edfa']:
 | |
|         if 'nf_coef' in edfa and isinstance(edfa['nf_coef'][0], dict):
 | |
|             nf_coef = edfa.pop('nf_coef')
 | |
|             sorted_nf_coef = sorted(nf_coef, key=lambda x: x['coef_order'])
 | |
|             new_nf_coef = [c['nf_coef'] for c in sorted_nf_coef]
 | |
|             edfa['nf_coef'] = new_nf_coef
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_nf_fit_coef(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy legacy format yang topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     if 'nf_fit_coeff' in json_data and not isinstance(json_data['nf_fit_coeff'][0], dict):
 | |
|         nf_coef = json_data.pop('nf_fit_coeff')
 | |
|         new_nf_coef = [{'coef_order': i, 'nf_coef': c} for i, c in enumerate(nf_coef)]
 | |
|         json_data['nf_fit_coeff'] = new_nf_coef
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_nf_fit_coef(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format back to legacy json topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted back JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     if 'nf_fit_coeff' in json_data and isinstance(json_data['nf_fit_coeff'][0], dict):
 | |
|         nf_coef = json_data.pop('nf_fit_coeff')
 | |
|         sorted_nf_coef = sorted(nf_coef, key=lambda x: x['coef_order'])
 | |
|         new_nf_coef = [c['nf_coef'] for c in sorted_nf_coef]
 | |
|         json_data['nf_fit_coeff'] = new_nf_coef
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_raman_coef(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy legacy format yang topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if PARAMS_KEY in elem and RAMAN_COEF_KEY in elem[PARAMS_KEY] \
 | |
|                 and 'g0' in elem[PARAMS_KEY][RAMAN_COEF_KEY]:
 | |
|             raman_coef = elem[PARAMS_KEY].pop(RAMAN_COEF_KEY)
 | |
|             g0_list = raman_coef.pop('g0', [])
 | |
|             frequency_offset_list = raman_coef.pop('frequency_offset', [])
 | |
|             if frequency_offset_list:
 | |
|                 new_raman_coef = {'reference_frequency': raman_coef['reference_frequency'],
 | |
|                                   'g0_per_frequency': [{'frequency_offset': f, 'g0': v}
 | |
|                                                        for f, v in zip(frequency_offset_list, g0_list)]}
 | |
|                 elem[PARAMS_KEY][RAMAN_COEF_KEY] = new_raman_coef
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_raman_coef(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format back to legacy json topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted back JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     for elem in json_data[ELEMENTS_KEY]:
 | |
|         if PARAMS_KEY in elem and RAMAN_COEF_KEY in elem[PARAMS_KEY] \
 | |
|                 and 'g0_per_frequency' in elem[PARAMS_KEY][RAMAN_COEF_KEY]:
 | |
|             raman_coef = elem[PARAMS_KEY].pop(RAMAN_COEF_KEY)
 | |
|             g0_list = [g['g0'] for g in raman_coef.pop('g0_per_frequency', [])]
 | |
|             frequency_offset_list = [f['frequency_offset'] for f in raman_coef.pop('g0_per_frequency', [])]
 | |
|             if frequency_offset_list:
 | |
|                 new_raman_coef = {'reference_frequency': raman_coef['reference_frequency'],
 | |
|                                   'g0': g0_list,
 | |
|                                   'frequency_offset': frequency_offset_list}
 | |
|                 elem[PARAMS_KEY][RAMAN_COEF_KEY] = new_raman_coef
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_raman_efficiency(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy legacy format yang topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     if 'RamanFiber' not in json_data:
 | |
|         return json_data
 | |
|     for fiber_eqpt in json_data['RamanFiber']:
 | |
|         if RAMAN_EFFICIENCY_KEY in fiber_eqpt \
 | |
|                 and 'cr' in fiber_eqpt[RAMAN_EFFICIENCY_KEY]:
 | |
|             raman_efficiency = fiber_eqpt.pop(RAMAN_EFFICIENCY_KEY)
 | |
|             cr_list = raman_efficiency.pop('cr', [])
 | |
|             frequency_offset_list = raman_efficiency.pop('frequency_offset', [])
 | |
|             if frequency_offset_list:
 | |
|                 new_raman_efficiency = [{'frequency_offset': f, 'cr': v}
 | |
|                                         for f, v in zip(frequency_offset_list, cr_list)]
 | |
|                 fiber_eqpt[RAMAN_EFFICIENCY_KEY] = new_raman_efficiency
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def convert_back_raman_efficiency(json_data: Dict) -> Dict:
 | |
|     """Convert gnpy yang format back to legacy json topology format.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert back.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted back JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     if 'RamanFiber' not in json_data:
 | |
|         return json_data
 | |
|     for fiber_eqpt in json_data['RamanFiber']:
 | |
|         if RAMAN_EFFICIENCY_KEY in fiber_eqpt and isinstance(fiber_eqpt[RAMAN_EFFICIENCY_KEY], list):
 | |
|             raman_efficiency = fiber_eqpt.pop(RAMAN_EFFICIENCY_KEY)
 | |
|             cr_list = [c['cr'] for c in raman_efficiency]
 | |
|             frequency_offset_list = [f['frequency_offset'] for f in raman_efficiency]
 | |
|             if frequency_offset_list:
 | |
|                 old_raman_efficiency = {'cr': cr_list,
 | |
|                                         'frequency_offset': frequency_offset_list}
 | |
|                 fiber_eqpt[RAMAN_COEF_KEY] = old_raman_efficiency
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def reorder_keys(data_list: List, key: str) -> List:
 | |
|     """Roarder item in a dict placing the key (the key of a list with YANG meaning) first.
 | |
|     This is required because oopt-gnpy-libyang does not recognize the key when it is not placed first in the data node.
 | |
| 
 | |
|     :param json_data: the list of dictionary items.
 | |
|     :type data_list: List
 | |
|     :return: the converted back JSON data
 | |
|     :rtype: List
 | |
|     """
 | |
|     for item in data_list:
 | |
|         index_value = item.pop(key, None)
 | |
|         if index_value is not None:
 | |
|             # Place key first
 | |
|             new_item = {key: index_value}
 | |
|             # add other items
 | |
|             new_item.update(item)
 | |
|             # replace old element with new element
 | |
|             for k in list(item.keys()):
 | |
|                 item.pop(k)
 | |
|             item.update(new_item)
 | |
|     return data_list
 | |
| 
 | |
| 
 | |
| # next functions because ly requires that the key of a list be in the first position in the item
 | |
| def reorder_route_objects(json_data: Dict) -> Dict:
 | |
|     """Make sure that the index of a route object is placed first in the object.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     for request in json_data['path-request']:
 | |
|         if "explicit-route-objects" in request:
 | |
|             request["explicit-route-objects"]["route-object-include-exclude"] = \
 | |
|                 reorder_keys(request["explicit-route-objects"]["route-object-include-exclude"], "index")
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def reorder_lumped_losses_objects(json_data: Dict) -> Dict:
 | |
|     """Make sure that the position of a lumped loss object is placed first in the object.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     for element in json_data['elements']:
 | |
|         if "params" in element and "lumped_losses" in element["params"]:
 | |
|             element["params"]["lumped_losses"] = reorder_keys(element["params"]["lumped_losses"], "position")
 | |
|     return json_data
 | |
| 
 | |
| 
 | |
| def reorder_raman_pumps(json_data: Dict) -> Dict:
 | |
|     """Make sure that the frequency of a Raman pum object is placed first in the object.
 | |
| 
 | |
|     :param json_data: The input JSON topology data to convert.
 | |
|     :type json_data: Dict
 | |
|     :return: the converted JSON data
 | |
|     :rtype: dict
 | |
|     """
 | |
|     for element in json_data['elements']:
 | |
|         if "operational" in element and "raman_pumps" in element["operational"]:
 | |
|             element["operational"]["raman_pumps"] = reorder_keys(element["operational"]["raman_pumps"], "frequency")
 | |
|     return json_data
 |