# Mirror of https://github.com/Telecominfraproject/wlan-testing.git
# (synced 2025-10-29 18:12:34 +00:00; 47 lines, 1.7 KiB, Python)
import json
|
|
import os
|
|
|
|
from cloudshell.api.cloudshell_api import UpdateTopologyGlobalInputsRequest
|
|
|
|
import argparse
|
|
|
|
from common import wait_for_provisioning_status, get_session
|
|
|
|
# Identification of the CI run, read from the environment. Every value is a
# string: os.environ only ever yields str, so the fallbacks are str as well
# (the run-number fallback used to be the int 1, which made the type of
# run_number depend on whether the variable was set).
run_number = os.environ.get('GITHUB_RUN_NUMBER', '1')
run_id = os.environ.get('GITHUB_JOB', 'job')
workflow = os.environ.get('GITHUB_WORKFLOW', 'workflow')
# Read here but not referenced by the code visible in this file.
marker_expression = os.environ.get('MARKER_EXPRESSION', 'sanity')
|
|
|
|
def main():
    """Create an immediate reservation from a blueprint and wait for it.

    Command-line arguments:
        blueprint: name of the blueprint to create the reservation from.
        --global-inputs: JSON object whose key/value pairs are passed to the
            reservation as global inputs (default ``{}``).
        --reservation-duration: reservation duration in minutes, as an
            integer (default 360).
        --reservation-id-file: file the reservation ID is written to
            (default ``./reservation_id.txt``).

    Exits with status 1 when ``--global-inputs`` is not valid JSON or does
    not decode to a JSON object. argparse exits with status 2 on a
    non-integer ``--reservation-duration``.
    """
    import sys  # local import: only needed for error reporting and exit

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--global-inputs',
        help="JSON dictionary that contains global inputs that will be passed to Quali",
        default="{}",
    )
    # type=int so a value given on the command line is validated and has the
    # same type as the default.
    parser.add_argument(
        '--reservation-duration',
        help='duration of reservation',
        type=int,
        default=360,
    )
    parser.add_argument(
        '--reservation-id-file',
        help='file that the reservation ID will be written to',
        default='./reservation_id.txt',
    )
    parser.add_argument('blueprint', help='name of blueprint to create reservation from')
    args = parser.parse_args()

    # Validate the inputs before opening a session so that a malformed
    # argument fails fast without touching the remote service.
    try:
        global_inputs = json.loads(args.global_inputs)
    except json.JSONDecodeError as e:
        print(f'failed to decode global inputs: {e}', file=sys.stderr)
        sys.exit(1)

    # Valid JSON that is not an object (a list, a number, ...) has no
    # .items(); reject it here instead of crashing with AttributeError below.
    if not isinstance(global_inputs, dict):
        print(
            'failed to decode global inputs: expected a JSON object, '
            f'got {type(global_inputs).__name__}',
            file=sys.stderr,
        )
        sys.exit(1)

    session = get_session()

    reservation = session.CreateImmediateTopologyReservation(
        reservationName=f'{workflow}/{run_number}/{run_id}',
        owner=session.username,
        durationInMinutes=args.reservation_duration,
        topologyFullPath=args.blueprint,
        globalInputs=[
            UpdateTopologyGlobalInputsRequest(key, value)
            for key, value in global_inputs.items()
        ],
    ).Reservation

    # Written before waiting, so the ID is already on disk if the wait below
    # fails or never returns.
    with open(args.reservation_id_file, 'w') as f:
        f.write(reservation.Id)

    # NOTE(review): presumably ['Ready'] are the statuses to wait for and
    # ['Teardown'] the ones treated as failure -- confirm against
    # common.wait_for_provisioning_status.
    wait_for_provisioning_status(session, reservation.Id, ['Ready'], ['Teardown'])
|
|
|
|
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
|