#!/usr/bin/env python3 # -*- coding: utf-8 -*- # SPDX-License-Identifier: BSD-3-Clause # test_spectrum_assignment # Copyright (C) 2025 Telecom Infra Project and GNPy contributors # see AUTHORS.rst for a list of contributors """ test spectrum assignment utilities """ from pathlib import Path from copy import deepcopy import json from math import ceil import pytest from gnpy.core.network import build_network from gnpy.core.utils import lin2db, automatic_nch from gnpy.core.elements import Roadm, Transceiver from gnpy.core.exceptions import ServiceError, SpectrumError from gnpy.topology.request import compute_path_dsjctn, find_reversed_path, deduplicate_disjunctions, PathRequest from gnpy.topology.spectrum_assignment import (build_oms_list, align_grids, nvalue_to_frequency, bitmap_sum, Bitmap, spectrum_selection, pth_assign_spectrum, build_path_oms_id_list, aggregate_oms_bitmap) from gnpy.tools.json_io import (load_equipment, load_network, requests_from_json, disjunctions_from_json, _check_one_request) TEST_DIR = Path(__file__).parent DATA_DIR = TEST_DIR / 'data' EQPT_FILENAME = DATA_DIR / 'eqpt_config.json' NETWORK_FILENAME = DATA_DIR / 'testTopology_auto_design_expected.json' SERVICE_FILENAME = DATA_DIR / 'testTopology_services_expected.json' EXTRA_CONFIGS = {"std_medium_gain_advanced_config.json": DATA_DIR / "std_medium_gain_advanced_config.json", "Juniper-BoosterHG.json": DATA_DIR / "Juniper-BoosterHG.json"} grid = 0.00625e12 slot = 0.0125e12 guardband = 25.0e9 cband_freq_min = 191.3e12 cband_freq_max = 196.1e12 @pytest.fixture() def equipment(): equipment = load_equipment(EQPT_FILENAME, EXTRA_CONFIGS) return equipment @pytest.fixture() def setup(equipment): """common setup for tests: builds network, equipment and oms only once""" network = load_network(NETWORK_FILENAME, equipment) spectrum = equipment['SI']['default'] p_db = spectrum.power_dbm p_total_db = p_db + lin2db(automatic_nch(spectrum.f_min, spectrum.f_max, spectrum.spacing)) build_network(network, equipment, p_db, p_total_db) oms_list = build_oms_list(network, equipment) return network, oms_list def test_oms(setup): """tests that the OMS is between two ROADMs, that there is no ROADM or transceivers in the OMS except end points, checks that the id of OMS is present in the element and that the element OMS id is consistant """ network, oms_list = setup for oms in oms_list: assert isinstance(oms.el_list[0], Roadm) and isinstance(oms.el_list[-1], Roadm) for i, elem in enumerate(oms.el_list[1:-1]): assert not isinstance(elem, Roadm) assert elem in network.nodes() assert elem.oms.oms_id == oms.oms_id assert elem.uid == oms.el_id_list[i + 1] @pytest.mark.parametrize('nval', [0, 10, -255]) @pytest.mark.parametrize('mval', [1, 8]) def test_acceptable_values(nval, mval, setup): """Reasonable parameters should not throw""" network, oms_list = setup random_oms = oms_list[5] random_oms.assign_spectrum(nval, mval) @pytest.mark.parametrize('nval,mval', ( (0, 600), (-300, 1), (500, 1), (0, -2), (0, 4.5), (0.5, 8), )) def test_wrong_values(nval, mval, setup): """Out of range or invalid values""" network, oms_list = setup random_oms = oms_list[5] with pytest.raises(SpectrumError): random_oms.assign_spectrum(nval, mval) @pytest.mark.parametrize('nmin', [-288, -260, -300]) @pytest.mark.parametrize('nmax', [480, 320, 450]) def test_aligned(nmin, nmax, setup): """Checks that the OMS grid is correctly aligned Note that bitmap index uses guardband on both ends so that if center frequencies nmin, nmax = -200, +200, min/max navalue in bitmap are -224, +223, which makes 223 -(-224) +1 frequencies. """ network, oms_list = setup nguard = guardband / grid center = 193.1e12 # amplification band freq_min = center + nmin * grid - guardband freq_max = center + nmax * grid + guardband random_oms = oms_list[10] # We're always starting with full C-band assert pytest.approx(nvalue_to_frequency(random_oms.spectrum_bitmap.freq_index_min) * 1e-12, abs=1e-12) == 191.3 assert pytest.approx(nvalue_to_frequency(random_oms.spectrum_bitmap.freq_index_max) * 1e-12, abs=1e-12) == 196.1 ind_max = len(random_oms.spectrum_bitmap.bitmap) - 1 # "inner" frequencies, without the guard band inner_n_min = random_oms.spectrum_bitmap.getn(0) + nguard inner_n_max = random_oms.spectrum_bitmap.getn(ind_max) - nguard assert inner_n_min == random_oms.spectrum_bitmap.freq_index_min assert inner_n_max == random_oms.spectrum_bitmap.freq_index_max assert inner_n_min == -288 assert inner_n_max == 480 # checks that changes are applied on bitmap and freq attributes of Bitmap object random_oms.update_spectrum(freq_min, freq_max, grid=grid, guardband=guardband) assert nmin == random_oms.spectrum_bitmap.freq_index_min assert nmax == random_oms.spectrum_bitmap.freq_index_max print('Adjusted spectrum: {:f} - {:f}'.format( nvalue_to_frequency(random_oms.spectrum_bitmap.freq_index_min) * 1e-12, nvalue_to_frequency(random_oms.spectrum_bitmap.freq_index_max) * 1e-12 )) ind_max = len(random_oms.spectrum_bitmap.bitmap) - 1 inner_n_min = random_oms.spectrum_bitmap.getn(0) + nguard inner_n_max = random_oms.spectrum_bitmap.getn(ind_max) - nguard assert inner_n_min == random_oms.spectrum_bitmap.freq_index_min assert inner_n_max == random_oms.spectrum_bitmap.freq_index_max oms_list = align_grids(oms_list) ind_max = len(random_oms.spectrum_bitmap.bitmap) - 1 nvalmin = random_oms.spectrum_bitmap.getn(0) nvalmax = random_oms.spectrum_bitmap.getn(ind_max) assert nvalmin <= nmin and nvalmax >= nmax @pytest.mark.parametrize('nval1', [0, 15, 24]) @pytest.mark.parametrize('nval2', [8, 12]) def test_assign_and_sum(nval1, nval2, setup): """checks that bitmap sum gives correct result""" network, oms_list = setup guardband = grid mval = 4 # slot in 12.5GHz freq_max = cband_freq_min + 24 * grid # arbitrary test on oms #10 and #11 # first reduce the grid to 24 center frequencies to ease reading when test fails oms1 = oms_list[10] oms1.update_spectrum(cband_freq_min, freq_max, grid=grid, guardband=guardband) oms2 = oms_list[11] oms2.update_spectrum(cband_freq_min, freq_max, grid=grid, guardband=guardband) print('initial spectrum') print(nvalue_to_frequency(oms1.spectrum_bitmap.freq_index_min) * 1e-12, nvalue_to_frequency(oms1.spectrum_bitmap.freq_index_max) * 1e-12) # checks initial values consistancy ind_max = len(oms1.spectrum_bitmap.bitmap) - 1 print('with guardband', oms1.spectrum_bitmap.getn(0), oms1.spectrum_bitmap.getn(ind_max)) print('without guardband', oms1.spectrum_bitmap.freq_index_min, oms1.spectrum_bitmap.freq_index_max) # if requested slots exceed grid spectrum should not be assigned and assignment # should return False if ((nval1 - mval) < oms1.spectrum_bitmap.getn(0) or (nval1 + mval - 1) > oms1.spectrum_bitmap.getn(ind_max)): with pytest.raises(SpectrumError): oms1.assign_spectrum(nval1, mval) for elem in oms1.spectrum_bitmap.bitmap: assert elem == 1 else: oms2.assign_spectrum(nval2, mval) print(oms2.spectrum_bitmap.bitmap) test2 = bitmap_sum(oms1.spectrum_bitmap.bitmap, oms2.spectrum_bitmap.bitmap) print(test2) range1 = range(oms1.spectrum_bitmap.geti(nval1) - mval, oms1.spectrum_bitmap.geti(nval1) + mval - 1) range2 = range(oms2.spectrum_bitmap.geti(nval2) - mval, oms2.spectrum_bitmap.geti(nval2) + mval - 1) for elem in range1: print(f'value should be zero at index {elem}') assert test2[elem] == 0 for elem in range2: print(f'value should be zero at index {elem}') assert test2[elem] == 0 def test_bitmap_assignment(setup): """test that a bitmap can be assigned""" _, oms_list = setup random_oms = oms_list[2] random_oms.assign_spectrum(13, 7) btmp = deepcopy(random_oms.spectrum_bitmap.bitmap) # try a first assignment that must pass freq_min = nvalue_to_frequency(random_oms.spectrum_bitmap.n_min) freq_max = nvalue_to_frequency(random_oms.spectrum_bitmap.n_max) _ = Bitmap(freq_min, freq_max, grid=0.00625e12, guardband=guardband, bitmap=btmp) # try a wrong assignment that should not pass btmp = btmp[1:-1] with pytest.raises(SpectrumError): _ = Bitmap(cband_freq_min, cband_freq_max, grid=0.00625e12, guardband=guardband, bitmap=btmp) @pytest.fixture() def services(equipment): """common setup for service list: builds service only once""" with open(SERVICE_FILENAME, encoding='utf-8') as my_f: services = json.loads(my_f.read()) return services @pytest.fixture() def requests(equipment, services): """common setup for requests, builds requests list only once""" requests = requests_from_json(services, equipment) return requests def test_spectrum_assignment_on_path(equipment, setup, requests): """test assignment functions on path and network""" network, oms_list = setup req = [deepcopy(requests[1])] paths = compute_path_dsjctn(network, equipment, req, []) first_path_oms = build_path_oms_id_list(paths[0]) print(req) for nval in range(100): req = [deepcopy(requests[1])] test_oms = aggregate_oms_bitmap(first_path_oms, oms_list) center_n, startn, stopn = spectrum_selection(test_oms, 4) pth_assign_spectrum(paths, req, oms_list, [find_reversed_path(paths[0])]) print(f'testing on following oms {first_path_oms}') # check that only 96 channels are feasible if nval >= 96: print(center_n, startn, stopn) print('only 96 channels of 4 slots pass in this grid') assert center_n is None and startn is None and stopn is None else: print(center_n, startn, stopn) print('at least 96 channels of 4 slots should pass in this grid') assert center_n is not None and startn is not None and stopn is not None req = [requests[2]] paths = compute_path_dsjctn(network, equipment, req, []) second_path_oms = build_path_oms_id_list(paths[0]) test_oms = aggregate_oms_bitmap(second_path_oms, oms_list) center_n, startn, stopn = spectrum_selection(test_oms, 4, 478) print(oms_list[0].spectrum_bitmap.freq_index_max) print(oms_list[0]) print(center_n, startn, stopn) print('spectrum selection error: should be None') assert center_n is None and startn is None and stopn is None center_n, startn, stopn = spectrum_selection(test_oms, 4, 477) print(center_n, startn, stopn) print('spectrum selection error should not be None') assert center_n is not None and startn is not None and stopn is not None @pytest.fixture() def request_set(): """creates default request dict""" return { 'request_id': '0', 'source': 'trx a', 'bidir': False, 'destination': 'trx g', 'trx_type': 'Voyager', 'trx_mode': 'mode 1', 'format': 'mode1', 'spacing': 50e9, 'nodes_list': [], 'loose_list': [], 'f_min': 191.1e12, 'f_max': 196.3e12, 'baud_rate': 32e9, 'OSNR': 14, 'bit_rate': 100e9, 'cost': 1, 'roll_off': 0.15, 'tx_osnr': 38, 'tx_power': 0.001, 'penalties': {}, 'min_spacing': 37.5e9, 'nb_channel': None, 'power': 0, 'path_bandwidth': 800e9, 'equalization_offset_db': 0} def test_freq_slot_exist(setup, equipment, request_set): """test that assignment works even if effective_freq_slot is not populated""" network, oms_list = setup params = request_set params['effective_freq_slot'] = [{'N': None, 'M': None}] rqs = [PathRequest(**params)] paths = compute_path_dsjctn(network, equipment, rqs, []) pth_assign_spectrum(paths, rqs, oms_list, [find_reversed_path(paths[0])]) assert rqs[0].N == [-256] assert rqs[0].M == [32] def test_inconsistant_freq_slot(setup, equipment, request_set): """test that an inconsistant M correctly raises an error""" network, oms_list = setup params = request_set # minimum required nb of slots is 32 (800Gbit/100Gbit/s channels each occupying 50GHz ie 4 slots) params['effective_freq_slot'] = [{'N': 0, 'M': 4}] with pytest.raises(ServiceError): _check_one_request(params, 196.05e12) params['trx_mode'] = None rqs = [PathRequest(**params)] paths = compute_path_dsjctn(network, equipment, rqs, []) pth_assign_spectrum(paths, rqs, oms_list, [find_reversed_path(paths[0])]) assert rqs[0].blocking_reason == 'NOT_ENOUGH_RESERVED_SPECTRUM' @pytest.mark.parametrize('req_n, req_m, final_n, final_m, blocking_reason, raises_error', [ # regular requests that should be correctly assigned: ([-100], [32], [-100], [32], None, False), ([150], [50], [150], [50], None, False), # if n is None, there should be an assignment (enough spectrum cases) # and the center frequency should be set on the lower part of the spectrum based on m value if it exists # or based on 32 ([None], [32], [-256], [32], None, False), ([None], [40], [-248], [40], None, False), ([-100], [None], [-100], [32], None, False), ([None], [None], [-256], [32], None, False), # -280 and 60 center indexes should result in unfeasible spectrum, either out of band or # overlapping with occupied spectrum. The requested spectrum is not available ([None], [300], None, None, 'NO_SPECTRUM', False), ([-280], [None], None, None, 'NO_SPECTRUM', False), ([-60], [40], None, None, 'NO_SPECTRUM', False), # raises service error: M value too small ([-60], [3], None, None, 'NOT_ENOUGH_RESERVED_SPECTRUM', True), # 20 is smaller than min 32 required nb of slots so should also be blocked ([-60], [20], None, None, 'NOT_ENOUGH_RESERVED_SPECTRUM', False), # multiple assignments ([-100, -164], [16, 16], [-100, -164], [16, 16], None, False), ([-100, -164], [32, 32], [-100, -164], [32, 32], None, False), ([-100, -164], [None, None], [-164], [32], None, False), ([None, None], [16, 16], [-272, -240], [16, 16], None, False), ([None, None, None], [16, 16, None], [-272, -240], [16, 16], None, False), ([None, None], [None, None], [-256], [32], None, False), ([-272, None], [16, 16], [-272, -240], [16, 16], None, False), ([-272, 100], [None, 16], [-272, 100], [16, 16], None, False), # first assign defined Ms whatever the N (but order them), and then uses imposed N. Fill in with the max # available nb of slots (centered on N). ([-88, -100, -116, None], [8, None, 12, None], [-88, -100, -116, -280], [8, 4, 12, 8], None, False), # If no M is defined, uses th Ns to fill in with the max possible nb of slots (with respecte to request, # here it is 32 slots) ([-88, -106, -116, None], [None, None, None, None], [-116], [32], None, False), # if one defined N, M is not applicable then blocks the spectrum (even f other slots are OK) # only 2 slots remains between -104 (-100 - 4) and -108 (-112 + 4). So (-106, None) is not feasible, because min # required M is 4 for Voyager, Mode 1 ([-100, -106, -112], [4, None, 4], None, None, 'NO_SPECTRUM', False), # required nb of channels is 8 with 4 slots each. Next two spectrum are not providing enough spectrum # raises service error: not enough nb of channels ([-88, -100, -116], [4, 4, 4], None, None, 'NOT_ENOUGH_RESERVED_SPECTRUM', True), ([-88, -100, -116], [4, None, 4], None, None, 'NO_SPECTRUM', False), # only 4 slots remains between -96 (-88 -8) and -104 (-116 + 12), and centered on -100, so N = -101 is not # feasible whatever the M. ([-88, -101, -116, None], [8, 4, 12, None], None, None, 'NO_SPECTRUM', False), ([-88, -101, -116, -250], [4, 4, 12, 12], None, None, 'NO_SPECTRUM', False), ([-88, -101, -116, None], [8, None, 12, None], None, None, 'NO_SPECTRUM', False), # raises service error: slots overlap ([-88, -81, -116, -136], [8, 8, 12, 8], None, None, 'NO_SPECTRUM', True), ]) def test_n_m_requests(setup, equipment, req_n, req_m, final_n, final_m, blocking_reason, raises_error, request_set): """test that various N and M values for a request end up with the correct path assignment""" network, oms_list = setup # add an occupation on one of the span of the expected path OMS list on both directions # as defined by its offsets within the OMS list: [17, 20, 13, 22] and reversed path [19, 16, 21, 26] expected_path = [17, 20, 13, 22] expected_oms = [13, 16, 17, 19, 20, 21, 22, 26] some_oms = oms_list[expected_oms[3]] some_oms.assign_spectrum(-30, 32) # means that spectrum is occupied from indexes -62 to 1 on reversed path params = request_set params['effective_freq_slot'] = [{'N': n, 'M': m} for n, m in zip(req_n, req_m)] if raises_error: with pytest.raises(ServiceError): _check_one_request(params, 196.3e12) rqs = [PathRequest(**params)] paths = compute_path_dsjctn(network, equipment, rqs, []) # check that the computed path is the expected one (independant of blocking issues due to spectrum) path_oms = list(set([e.oms_id for e in paths[0] if not isinstance(e, (Transceiver, Roadm))])) assert path_oms == expected_path # function to be tested: pth_assign_spectrum(paths, rqs, oms_list, [find_reversed_path(paths[0])]) # check that spectrum is correctly assigned assert rqs[0].N == final_n assert rqs[0].M == final_m assert getattr(rqs[0], 'blocking_reason', None) == blocking_reason def test_reversed_direction(equipment, setup, requests, services): """checks that if spectrum is selected on one direction it is also selected on reversed direction""" network, oms_list = setup dsjn = disjunctions_from_json(services) dsjn = deduplicate_disjunctions(dsjn) paths = compute_path_dsjctn(network, equipment, requests, dsjn) rev_pths = [] for pth in paths: if pth: rev_pths.append(find_reversed_path(pth)) else: rev_pths.append([]) # build the list of spectrum slots that will be used for each request. For this purpose # play the selection part of path_assign_spectrum spectrum_list = [] for i, pth in enumerate(paths): if pth: number_wl = ceil(requests[i].path_bandwidth / requests[i].bit_rate) requested_m = ceil(requests[i].spacing / slot) * number_wl path_oms = build_path_oms_id_list(pth) test_oms = aggregate_oms_bitmap(path_oms, oms_list) center_n, startn, stopn = spectrum_selection(test_oms, requested_m, requested_n=None) spectrum_list.append([center_n, startn, stopn]) else: spectrum_list.append([]) pth_assign_spectrum(paths, requests, oms_list, rev_pths) # pth-assign concatenates path and reversed path for i, pth in enumerate(paths): # verifies that each element (not trx and not roadm) in the path has same # spectrum occupation if pth: this_path = [elem for elem in pth if not isinstance(elem, Roadm) and not isinstance(elem, Transceiver)] print(f'path {[el.uid for el in this_path]}') this_revpath = [elem for elem in rev_pths[i] if not isinstance(elem, Roadm) and not isinstance(elem, Transceiver)] print(f'rev_path {[el.uid for el in this_revpath]}') print('') for j, elem in enumerate(this_revpath): imin = elem.oms.spectrum_bitmap.geti(spectrum_list[i][1]) imax = elem.oms.spectrum_bitmap.geti(spectrum_list[i][2]) print(f'rev_elem {elem.uid}') print(f' elem {this_path[len(this_path)-j-1].uid}') print(f'\trev_spectrum: {elem.oms.spectrum_bitmap.bitmap[imin:imax]}') print(f'\t spectrum: ' + f'{this_path[len(this_path)-j-1].oms.spectrum_bitmap.bitmap[imin:imax]}') assert elem.oms.spectrum_bitmap.bitmap[imin:imax] == \ this_path[len(this_path) - j - 1].oms.spectrum_bitmap.bitmap[imin:imax]