#!/usr/bin/env python3 # # Copyright (C) 2021 Broadcom. All rights reserved. The term "Broadcom" # refers solely to the Broadcom Inc. corporate affiliate that owns # the software below. This work is licensed under the OpenAFC Project License, # a copy of which is included with this software program # """ Description The execution always done in specific order: configuration, database, testing. In case there are not available valid responses to compare with need to make acquisition and create new database as following. ./afc_tests.py --addr
--log debug --cmd run """ import argparse import certifi import csv import datetime import hashlib import inspect import io import json import logging import openpyxl as oxl import os import re import requests import shutil import sqlite3 import subprocess import sys import time import smtplib import ssl from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from bs4 import BeautifulSoup from deepdiff import DeepDiff from multiprocessing.pool import Pool from string import Template from _afc_types import * from _afc_errors import * from _version import __version__ from _wfa_types import * AFC_URL_SUFFIX = '/fbrat/ap-afc/' AFC_REQ_NAME = 'availableSpectrumInquiry' AFC_WEBUI_URL_SUFFIX = '/fbrat/ratapi/v1/' AFC_WEBUI_REQ_NAME = 'availableSpectrumInquirySec' AFC_WEBUI_TOKEN = 'about_csrf' headers = {'content-type': 'application/json'} AFC_TEST_DB_FILENAME = 'afc_input.sqlite3' TBL_REQS_NAME = 'test_vectors' TBL_RESPS_NAME = 'test_data' TBL_USERS_NAME = 'user_config' TBL_AFC_CFG_NAME = 'afc_config' TBL_AP_CFG_NAME = 'ap_config' AFC_PROT_NAME = 'https' # metadata variables TESTCASE_ID = "testCaseId" # mandatory keys that need to be read from input text file MANDATORY_METADATA_KEYS = {TESTCASE_ID} app_log = logging.getLogger(__name__) class TestCfg(dict): """Keep test configuration""" def __init__(self): dict.__init__(self) self.update({ 'cmd': '', 'port': 443, 'url_path': AFC_PROT_NAME + '://', 'log_level': logging.INFO, 'db_filename': AFC_TEST_DB_FILENAME, 'tests': None, 'is_test_by_index': True, 'resp': '', 'stress': 0, 'precision': None}) def _send_recv(self, params): """Run AFC test and wait for respons""" data = params.split("'") get_req = '' for item in data: try: json.loads(item) except ValueError as e: continue get_req = item break new_req_json = json.loads(get_req.encode('utf-8')) new_req = json.dumps(new_req_json, sort_keys=True) if (self['webui'] is False): params_data = { 'conn_type': self['conn_type'], 'debug': self['debug'], 'edebug': cfg['elaborated_debug'], 'gui': self['gui'] } else: # emulating request calll from webui params_data = { 'debug': 'True', 'edebug': cfg['elaborated_debug'], 'gui': 'True' } if (self['cache'] == False): params_data['nocache'] = 'True' ser_cert = not self['verif'] cli_certs = () if (self['prot'] == AFC_PROT_NAME and self['verif'] == False): # add mtls certificates if explicitly provided if not isinstance(self['cli_cert'], type(None)): cli_certs = ("".join(self['cli_cert']), "".join(self['cli_key'])) # add tls certificates if explicitly provided if not isinstance(self['ca_cert'], type(None)): ser_cert = "".join(self['ca_cert']) app_log.debug(f"Client {cli_certs}, Server {ser_cert}") before_ts = time.monotonic() rawresp = requests.post( self['url_path'], params=params_data, data=new_req, headers=headers, timeout=600, # 10 min verify=self['verif']) resp = rawresp.json() tId = resp.get('taskId') if ((self['conn_type'] == 'async') and (not isinstance(tId, type(None)))): tState = resp.get('taskState') params_data['task_id'] = tId while (tState == 'PENDING') or (tState == 'PROGRESS'): app_log.debug('_run_test() state %s, tid %s, status %d', tState, tId, rawresp.status_code) time.sleep(2) rawresp = requests.get(self['url_path'], params=params_data) if rawresp.status_code == 200: resp = rawresp.json() break tm_secs = time.monotonic() - before_ts app_log.info('Test done at %.1f secs', tm_secs) return new_req, resp class TestResultComparator: """ AFC Response comparator Private instance attributes: _precision -- Precision for results' comparison in dB. 0 means exact match """ def __init__(self, precision): """ Constructor Arguments: precision -- Precision for results' comparison in dB. 0 means exact match """ assert precision >= 0 self._precision = precision def compare_results(self, ref_str, result_str): """ Compares reference and actual AFC responses Arguments: ref_str -- Reference response JSON in string representation result_str -- Actual response json in string representation Returns list of difference description strings. Empty list means match """ # List of difference description strings diffs = [] # Reference and actual JSON dictionaries jsons = [] for s, kind in [(ref_str, "reference"), (result_str, "result")]: try: jsons.append(json.loads(s)) except json.JSONDecodeError as ex: diffs.append(f"Failed to decode {kind} JSON data: {ex}") return diffs self._recursive_compare(jsons[0], jsons[1], [], diffs) return diffs def _recursive_compare(self, ref_json, result_json, path, diffs): """ Recursive comparator of JSON nodes Arguments: ref_json -- Reference response JSON dictionary result_json -- Actual response JSON dictionary path -- Path (sequence of indices) to node in question diffs -- List of difference description strings to update """ # Items in questions in JSON dictionaries ref_item = self._get_item(ref_json, path) result_item = self._get_item(result_json, path) # Human readable path representation for difference messages path_repr = f"[{']['.join(str(idx) for idx in path)}]" if ref_item == result_item: return # Items are equal - nothing to do # So, items are different. What's the difference? if isinstance(ref_item, dict): # One item is dictionary. Other should also should be dictionary... if not isinstance(result_item, dict): diffs.append(f"Different item types at {path_repr}") return # ... with same set of keys ref_keys = set(ref_item.keys()) result_keys = set(result_item.keys()) for unique_key in (ref_keys ^ result_keys): if self._compare_channel_lists(ref_json, result_json, path + [unique_key], diffs): ref_keys -= {unique_key} result_keys -= {unique_key} if ref_keys != result_keys: msg = f"Different set of keys at {path_repr}" for kind, elems in [("reference", ref_keys - result_keys), ("result", result_keys - ref_keys)]: if elems: msg += \ f" Unique {kind} keys: {', '.join(sorted(elems))}." diffs.append(msg) return # Comparing values for individual keys for key in sorted(ref_keys): self._recursive_compare(ref_json, result_json, path + [key], diffs) elif isinstance(ref_item, list): # One item is list. Other should also be list... if not isinstance(result_item, list): diffs.append(f"Different item types at {path_repr}") return # If this is a channel list (or part thereof - handle it) if self._compare_channel_lists(ref_json, result_json, path, diffs): return # Proceeding with comparison of other list kinds if len(ref_item) != len(result_item): diffs.append( (f"Different list lengths at at {path_repr}: " f"{len(ref_item)} elements in reference vs " f"{len(result_item)} elements in result")) return # Comparing individual elements for i in range(len(ref_item)): self._recursive_compare(ref_json, result_json, path + [i], diffs) else: # Items should be scalars for item, kind in [(ref_item, "Reference"), (result_item, "Result")]: if not isinstance(item, (int, float, str)): diffs.append((f"{kind} data contains unrecognized item " f"type at {path_repr}")) return diffs.append((f"Difference at {path}: reference content is " f"{ref_item}, result content is {result_item}")) def _compare_channel_lists(self, ref_json, result_json, path, diffs): """ Trying to compare channel lists Arguments: ref_json -- Reference response JSON dictionary result_json -- Actual response JSON dictionary path -- Path (sequence of indices) to node in question diffs -- List of difference description strings to update Returns true if channel list comparison was done, no further action required, False if node is not a channel list, should be compared as usual """ if path[-1] == "channelCfi": # Comparison will be made at "maxEirp" return True # Human readable path representation for difference messages path_repr = f"[{']['.join(str(idx) for idx in path)}]" # EIRP dictionaries, indexed by channel identification (number or # frequency range) ref_channels = {} result_channels = {} if path[-1] == "maxEirp": # Channel numbers for kind, src, chan_dict in \ [("reference", ref_json, ref_channels), ("result", result_json, result_channels)]: try: numbers = self._get_item(src, path[:-1] + ["channelCfi"], default_last=[]) chan_dict.update( dict(zip([str(n) for n in numbers], self._get_item(src, path, default_last=[])))) except (TypeError, ValueError, KeyError): diffs.append((f"Unrecognized channel list structure at " f"{path_repr} in {kind}")) return True elif path[-1] == "availableFrequencyInfo": # Channel frequencies for kind, src, chan_dict in \ [("reference", ref_json, ref_channels), ("result", result_json, result_channels)]: try: for freq_info in self._get_item(src, path, default_last=[]): fr = freq_info["frequencyRange"] low = fr['lowFrequency'] high = fr['highFrequency'] for freq in range(low, high): chan_dict[f"[{freq} - {freq+1}"] = \ float(freq_info.get("maxPSD") or freq_info.get("maxPsd")) except (TypeError, ValueError, KeyError): diffs.append((f"Unrecognized frequency list structure at " f"{path_repr} in {kind}")) return True else: return False # Now will compare two channel dictionaries # First looking for unique channels for this_kind, this_channels, other_kind, other_channels in \ [("reference", ref_channels, "result", result_channels), ("result", result_channels, "reference", ref_channels)]: for channel in sorted(set(this_channels.keys()) - set(other_channels.keys())): diffs.append( (f"Channel {channel} present in {path_repr} of " f"{this_kind} with EIRP limit of " f"{this_channels[channel]}dBm, but absent in " f"{other_kind}")) # Then looking for different EIRPs on common channels for channel in sorted(set(ref_channels.keys()) & set(result_channels.keys())): diff = abs(ref_channels[channel] - result_channels[channel]) if diff <= self._precision: continue diffs.append( (f"Different values in {path_repr} for channel {channel}: " f"reference has EIRP of {ref_channels[channel]}dBm, " f"result has EIRP of {result_channels[channel]}dBm, " f"difference is: {diff:g}dB")) return True def _get_item(self, j, path, default_last=None): """ Retrieves item by sequence of indices Arguments: j -- JSON dictionary path -- Sequence of indices default_last -- What to return if item at last index is absent. None means throw exception (if nonlast item is absent - exception is also thrown) Returns retrieved item """ for path_idx, elem_idx in enumerate(path): try: j = j[elem_idx] except (KeyError, IndexError): if (default_last is not None) and \ (path_idx == (len(path) - 1)): return default_last raise return j def json_lookup(key, json_obj, val): """Lookup for key in json and change it value if required""" keepit = [] def lookup(key, json_obj, val, keepit): if isinstance(json_obj, dict): for k, v in json_obj.items(): if isinstance(v, (dict, list)): lookup(key, v, val, keepit) elif k == key: keepit.append(v) if val: json_obj[k] = val elif isinstance(json_obj, list): for node in json_obj: lookup(key, node, val, keepit) return keepit found = lookup(key, json_obj, val, keepit) return found def get_md5(fname): hash_md5 = hashlib.md5() with open(fname, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def create_email_attachment(filename): part = None with open(filename, "rb") as attachment: # Add file as application/octet-stream # Email client can usually download this automatically as attachment part = MIMEBase("application", "octet-stream") part.set_payload(attachment.read()) # Add header as key/value pair to attachment part encoders.encode_base64(part) part.add_header("Content-Disposition", f"attachment; filename= {filename}",) return part def send_email(cfg): """Send an email to predefined adress using gmail smtp server""" sender = cfg['email_from'] recipient = cfg['email_to'] app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()" f" from: {sender}, to: {recipient}") context = ssl.create_default_context() with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server: server.login(sender, cfg['email_pwd']) body = f"Please find attached responses." message = MIMEMultipart("alternative") message['Subject'] = f"AFC test results" message['From'] = sender message['To'] = recipient if not isinstance(cfg['email_cc'], type(None)): message['Cc'] = cfg['email_cc'] # Turn these into plain/html MIMEText objects message.attach(MIMEText(body, "plain")) message.attach(create_email_attachment(cfg['outfile'][0])) server.sendmail(sender, recipient, message.as_string()) def _send_recv(cfg, req_data, ssn=None): """Send AFC request and receiver it's response""" app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") new_req_json = json.loads(req_data.encode('utf-8')) new_req = json.dumps(new_req_json, sort_keys=True) if (cfg['webui'] is False): params_data = { 'conn_type': cfg['conn_type'], 'debug': cfg['debug'], 'edebug': cfg['elaborated_debug'], 'gui': cfg['gui'] } if (cfg['cache'] == False): params_data['nocache'] = 'True' post_func = requests.post else: # emulating request call from webui params_data = { 'debug': 'True', 'gui': 'True' } headers['Accept-Encoding'] = 'gzip, defalte' headers['Referer'] = cfg['base_url'] + 'fbrat/www/index.html' headers['X-Csrf-Token'] = cfg['token'] app_log.debug( f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"Cookies: {requests.utils.dict_from_cookiejar(ssn.cookies)}") post_func = ssn.post ser_cert = () cli_certs = None if ((cfg['prot'] == AFC_PROT_NAME and cfg['verif']) or (cfg['ca_cert'])): # add mtls certificates if explicitly provided if not isinstance(cfg['cli_cert'], type(None)): cli_certs = ("".join(cfg['cli_cert']), "".join(cfg['cli_key'])) # add tls certificates if explicitly provided if not isinstance(cfg['ca_cert'], type(None)): ser_cert = "".join(cfg['ca_cert']) cfg['verif'] = True else: os.environ['REQUESTS_CA_BUNDLE'] = certifi.where() app_log.debug(f"REQUESTS_CA_BUNDLE " f"{os.environ.get('REQUESTS_CA_BUNDLE')}") if "REQUESTS_CA_BUNDLE" in os.environ: ser_cert = "".join(os.environ.get('REQUESTS_CA_BUNDLE')) cfg['verif'] = True else: app_log.error(f"Missing CA certificate while forced.") return app_log.debug(f"Client {cli_certs}, Server {ser_cert}") try: rawresp = post_func( cfg['url_path'], params=params_data, data=new_req, headers=headers, timeout=600, # 10 min cert=cli_certs, verify=ser_cert if cfg['verif'] else False) rawresp.raise_for_status() except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as err: app_log.error(f"{err}") return resp = rawresp.json() tId = resp.get('taskId') if ((cfg['conn_type'] == 'async') and (not isinstance(tId, type(None)))): tState = resp.get('taskState') params_data['task_id'] = tId while (tState == 'PENDING') or (tState == 'PROGRESS'): app_log.debug('_run_test() state %s, tid %s, status %d', tState, tId, rawresp.status_code) time.sleep(2) rawresp = requests.get(cfg['url_path'], params=params_data) if rawresp.status_code == 200: resp = rawresp.json() break app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()" f" Resp status: {rawresp.status_code}") return resp def _send_recv_token(cfg, ssn): """Making login, open session and getting CSRF token""" app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") token = '' ser_cert = () cli_certs = None app_log.debug(f"=== ser {type(ser_cert)}") if ((cfg['prot'] == AFC_PROT_NAME and cfg['verif']) or (cfg['ca_cert'])): # add mtls certificates if explicitly provided if not isinstance(cfg['cli_cert'], type(None)): cli_certs = ("".join(cfg['cli_cert']), "".join(cfg['cli_key'])) # add tls certificates if explicitly provided if not isinstance(cfg['ca_cert'], type(None)): ser_cert = "".join(cfg['ca_cert']) cfg['verif'] = True else: os.environ['REQUESTS_CA_BUNDLE'] = certifi.where() app_log.debug(f"REQUESTS_CA_BUNDLE " f"{os.environ.get('REQUESTS_CA_BUNDLE')}") if "REQUESTS_CA_BUNDLE" in os.environ: ser_cert = "".join(os.environ.get('REQUESTS_CA_BUNDLE')) cfg['verif'] = True else: app_log.error(f"Missing CA certificate while forced.") return token app_log.debug(f"Client {cli_certs}, Server {ser_cert}") # get login ssn.headers.update({ 'Accept-Encoding': 'gzip, defalte' }) url_login = cfg['base_url'] + 'fbrat/user/sign-in' app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"===> URL {url_login}\n" f"===> Status {ssn.headers}\n" f"===> Cookies: {ssn.cookies}\n") try: rawresp = ssn.get(url_login, stream=False, cert=cli_certs, verify=ser_cert if cfg['verif'] else False) except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as err: app_log.error(f"{err}") return token soup = BeautifulSoup(rawresp.text, 'html.parser') inp_tkn = soup.find('input', id='csrf_token') app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"<--- Status {rawresp.status_code}\n" f"<--- Headers {rawresp.headers}\n" f"<--- Cookies: {ssn.cookies}\n" f"<--- Input: {inp_tkn}\n") token = inp_tkn.get('value') # fetch username and password from test db con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s\n' % TBL_USERS_NAME) found_user = cur.fetchall() con.close() found_json = json.loads(found_user[0][1]) app_log.debug(f"Found Users: {found_json['username']}") form_data = { 'next': '/', 'reg_next': '/', 'csrf_token': token, 'username': found_json['username'], 'password': found_json['password'] } ssn.headers.update({ 'Content-Type': 'application/x-www-form-urlencoded', 'Referer': url_login }) try: rawresp = ssn.post(url_login, data=form_data, stream=False, cert=cli_certs, verify=ser_cert if cfg['verif'] else False) except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as err: app_log.error(f"{err}") return token app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"<--- Status {rawresp.status_code}\n" f"<--- Headers {rawresp.headers}\n" f"<--- Cookies: {ssn.cookies}\n") return token def make_db(filename): """Create DB file only with schema""" app_log.debug('%s()', inspect.stack()[0][3]) if os.path.isfile(filename): app_log.debug('%s() The db file is exists, no need to create new one.', inspect.stack()[0][3]) return True app_log.info('Create DB tables (%s, %s) from source files', TBL_REQS_NAME, TBL_RESPS_NAME) con = sqlite3.connect(filename) cur = con.cursor() cur.execute('CREATE TABLE IF NOT EXISTS ' + TBL_REQS_NAME + ' (test_id varchar(50), data json)') cur.execute('CREATE TABLE IF NOT EXISTS ' + TBL_RESPS_NAME + ' (test_id varchar(50), data json, hash varchar(255))') con.close() return True def compare_afc_config(cfg): """ Compare AFC configuration from the DB with provided one. """ app_log.debug('%s()', inspect.stack()[0][3]) if not os.path.isfile(cfg['db_filename']): app_log.error('Missing DB file %s', cfg['db_filename']) return AFC_ERR con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s' % TBL_AFC_CFG_NAME) found_cfgs = cur.fetchall() con.close() # get record from the input file if isinstance(cfg['infile'], type(None)): app_log.debug('Missing input file to compare with.') return AFC_OK filename = cfg['infile'][0] with open(filename, 'r') as fp_test: while True: rec = fp_test.read() if not rec: break try: get_rec = json.loads(rec) except (ValueError, TypeError) as e: continue break app_log.debug(json.dumps(get_rec, sort_keys=True, indent=4)) get_cfg = '' app_log.debug('Found %d config records', len(found_cfgs)) idx = 0 max_idx = len(found_cfgs) if not isinstance(cfg['idx'], type(None)): idx = cfg['idx'] if idx >= max_idx: app_log.error("The index (%d) is out of range (0 - %d).", idx, max_idx - 1) return AFC_ERR max_idx = idx + 1 while idx < max_idx: for item in list(found_cfgs[idx]): try: get_cfg = json.loads(item) except (ValueError, TypeError) as e: continue break app_log.debug("Record %d:\n%s", idx, json.dumps(get_cfg['afcConfig'], sort_keys=True, indent=4)) get_diff = DeepDiff(get_cfg['afcConfig'], get_rec, report_repetition=True) app_log.info("rec %d:\n%s", idx, get_diff) idx += 1 return AFC_OK def start_acquisition(cfg): """ Fetch test vectors from the DB, drop previous response table, run tests and fill responses in the DB with hash values """ app_log.debug(f"{inspect.stack()[0][3]}()") found_reqs, found_resp, ids, test_ids = _convert_reqs_n_resps_to_dict(cfg) # check if to make acquisition of all tests con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() # drop response table and create new one if all testcases required # to reacquisition all_resps = False if (len(test_ids) == len(found_reqs)): all_resps = True if all_resps: try: app_log.debug(f"{inspect.stack()[0][3]}() " f"drop table {TBL_RESPS_NAME}") cur.execute('DROP TABLE ' + TBL_RESPS_NAME) except Exception as OperationalError: # awkward but bearable app_log.debug('Missing table %s', TBL_RESPS_NAME) cur.execute('CREATE TABLE ' + TBL_RESPS_NAME + ' (test_id varchar(50), data json, hash varchar(255))') app_log.info(f"Number of tests to make acquisition - {len(test_ids)}") for test_id in test_ids: req_id = ids[test_id][0] app_log.debug(f"Request: {req_id}") resp = _send_recv(cfg, json.dumps(found_reqs[test_id][0])) if isinstance(resp, type(None)): app_log.error(f"Test {test_ids} ({req_id}) is Failed.") continue json_lookup('availabilityExpireTime', resp, '0') upd_data = json.dumps(resp, sort_keys=True) hash_obj = hashlib.sha256(upd_data.encode('utf-8')) app_log.debug(f"{inspect.stack()[0][3]}() new: " f"{hash_obj.hexdigest()}") if all_resps: cur.execute('INSERT INTO ' + TBL_RESPS_NAME + ' values ( ?, ?, ?)', [req_id, upd_data, hash_obj.hexdigest()]) con.commit() elif (test_id in found_resp.keys() and found_resp[test_id][1] == hash_obj.hexdigest()): app_log.debug(f"Skip to update hash for {req_id}. " f"Found the same value.") continue else: hash = hash_obj.hexdigest() cur.execute('UPDATE ' + TBL_RESPS_NAME + ' SET ' + 'data = ?, hash = ? WHERE test_id =?', (upd_data, hash, ids[test_id][0])) con.commit() con.close() return AFC_OK def process_jsonline(line): """ Function to process the input line from .txt file containing comma separated json strings """ # convert the input line to list of dictioanry/dictionaries line_list = json.loads("[" + line + "]") request_dict = line_list[0] metadata_dict = line_list[1] if len(line_list) > 1 else {} return request_dict, metadata_dict def get_db_req_resp(cfg): """ Function to retrieve request and response records from the database """ con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s' % TBL_REQS_NAME) found_reqs = cur.fetchall() db_reqs_list = [row[0] for row in found_reqs] cur.execute('SELECT * FROM %s' % TBL_RESPS_NAME) found_resps = cur.fetchall() db_resp_list = [row[0] for row in found_resps] con.close() return db_reqs_list, db_resp_list def insert_reqs_int(filename, con, cur): """ Insert requests from input file to a table in test db. """ with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline: break # process dataline arguments app_log.debug(f"= {dataline}") request_json, metadata_json = process_jsonline(dataline) # reject the request if mandatory metadata arguments are not # present if not MANDATORY_METADATA_KEYS.issubset( set(metadata_json.keys())): # missing mandatory keys in test case input app_log.error("Test case input does not contain required" " mandatory arguments: %s", ", ".join(list( MANDATORY_METADATA_KEYS - set(metadata_json.keys())))) return AFC_ERR app_log.info(f"Insert new request in DB " f"({metadata_json[TESTCASE_ID]})") app_log.debug(f"+ {metadata_json[TESTCASE_ID]}") cur.execute('INSERT INTO ' + TBL_REQS_NAME + ' VALUES ( ?, ?)', (metadata_json[TESTCASE_ID], json.dumps(request_json),)) con.commit() con.close() return AFC_OK def insert_reqs(cfg): """ Insert requests from input file to a table in test db. Drop previous table of requests. """ app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error(f"Missing input file") return AFC_ERR filename = cfg['infile'][0] app_log.debug(f"{inspect.stack()[0][3]}() {filename}") if not os.path.isfile(filename): app_log.error(f"Missing raw test data file {filename}") return AFC_ERR if not os.path.isfile(cfg['db_filename']): app_log.error(f"Unable to find test db file.") return AFC_ERR con = sqlite3.connect(cfg['db_filename']) # drop existing table of requests and create new one app_log.info(f"Drop table of requests ({TBL_REQS_NAME})") cur = con.cursor() try: cur.execute('DROP TABLE ' + TBL_REQS_NAME) except Exception as OperationalError: app_log.debug(f"Fail to drop, missing table {TBL_REQS_NAME}") cur.execute('CREATE TABLE ' + TBL_REQS_NAME + ' (test_id varchar(50), data json)') con.commit() return insert_reqs_int(filename, con, cur) def extend_reqs(cfg): """ Insert requests from input file to a table in test db. Drop previous table of requests. """ app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error(f"Missing input file") return AFC_ERR filename = cfg['infile'][0] app_log.debug(f"{inspect.stack()[0][3]}() {filename}") if not os.path.isfile(filename): app_log.error(f"Missing raw test data file {filename}") return AFC_ERR if not os.path.isfile(cfg['db_filename']): app_log.error(f"Unable to find test db file.") return AFC_ERR con = sqlite3.connect(cfg['db_filename']) # add more rows to existing table of requests app_log.info(f"Extending table of requests ({TBL_REQS_NAME})") cur = con.cursor() return insert_reqs_int(filename, con, cur) def insert_devs(cfg): """ Insert device descriptors from input file to a table in test db. Drop previous table of devices. """ app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error(f"Missing input file") return AFC_ERR filename = cfg['infile'][0] app_log.debug(f"{inspect.stack()[0][3]}() {filename}") if not os.path.isfile(filename): app_log.error(f"Missing raw test data file {filename}") return AFC_ERR if not os.path.isfile(cfg['db_filename']): app_log.error(f"Unable to find test db file.") return AFC_ERR con = sqlite3.connect(cfg['db_filename']) # drop existing table of requests and create new one app_log.info(f"Drop table of devices ({TBL_AP_CFG_NAME})") cur = con.cursor() try: cur.execute('DROP TABLE ' + TBL_AP_CFG_NAME) except Exception as OperationalError: app_log.debug(f"Fail to drop, missing table {TBL_AP_CFG_NAME}") cur.execute('CREATE TABLE ' + TBL_AP_CFG_NAME + ' (ap_config_id, data json, user_id)') cnt = 1 con.commit() with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline or (len(dataline) < 72): break # process dataline arguments app_log.debug(f"= {dataline}") cur.execute('INSERT INTO ' + TBL_AP_CFG_NAME + ' VALUES ( ?, ?, ?)', (cnt, dataline[:-1], 1)) con.commit() cnt += 1 con.close() return AFC_OK def add_reqs(cfg): """Prepare DB source files""" app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error('Missing input file') return AFC_ERR filename = cfg['infile'][0] app_log.debug('%s() %s', inspect.stack()[0][3], filename) if not os.path.isfile(filename): app_log.error('Missing raw test data file %s', filename) return AFC_ERR if not make_db(cfg['db_filename']): return AFC_ERR # fetch available requests and responses db_reqs_list, db_resp_list = get_db_req_resp(cfg) con = sqlite3.connect(cfg['db_filename']) with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline: break # process dataline arguments request_json, metadata_json = process_jsonline(dataline) # reject the request if mandatory metadata arguments are not # present if not MANDATORY_METADATA_KEYS.issubset( set(metadata_json.keys())): # missing mandatory keys in test case input app_log.error("Test case input does not contain required" " mandatory arguments: %s", ", ".join(list( MANDATORY_METADATA_KEYS - set(metadata_json.keys())))) return AFC_ERR # check if the test case already exists in the database test # vectors if metadata_json[TESTCASE_ID] in db_reqs_list: app_log.error("Test case: %s already exists in database", metadata_json[TESTCASE_ID]) break app_log.info("Executing test case: %s", metadata_json[TESTCASE_ID]) new_req, resp = cfg._send_recv(json.dumps(request_json)) # get request id from a request, response not always has it # the request contains test category new_req_json = json.loads(new_req.encode('utf-8')) req_id = json_lookup('requestId', new_req_json, None) resp_res = json_lookup('shortDescription', resp, None) if (resp_res[0] != 'Success') \ and (req_id[0].lower().find('urs') == -1) \ and (req_id[0].lower().find('ibp') == -1): app_log.error('Failed in test response - %s', resp_res) break app_log.info('Got response for the request') json_lookup('availabilityExpireTime', resp, '0') app_log.info('Insert new request in DB') cur = con.cursor() cur.execute('INSERT INTO ' + TBL_REQS_NAME + ' VALUES ( ?, ?)', (metadata_json[TESTCASE_ID], new_req,)) con.commit() app_log.info('Insert new resp in DB') upd_data = json.dumps(resp, sort_keys=True) hash_obj = hashlib.sha256(upd_data.encode('utf-8')) cur = con.cursor() cur.execute('INSERT INTO ' + TBL_RESPS_NAME + ' values ( ?, ?, ?)', [metadata_json[TESTCASE_ID], upd_data, hash_obj.hexdigest()]) con.commit() con.close() return AFC_OK def dump_table(conn, tbl_name, out_file, pref): app_log.debug(f"{inspect.stack()[0][3]}() {tbl_name}") fp_new = '' if 'single' in out_file: fp_new = open(out_file['single'], 'w') conn.execute(f"SELECT * FROM {tbl_name}") found_data = conn.fetchall() for val in enumerate(found_data): if isinstance(fp_new, io.IOBase): fp_new.write(f"{str(val)}\n") elif 'split' in out_file: tbl_fname = { TBL_REQS_NAME: '_Request.txt', TBL_RESPS_NAME: '_Response.txt' } new_json = json.loads(val[1][1].encode('utf-8')) prefix, name, nbr = val[1][0].split('.') app_log.debug(f"{inspect.stack()[0][3]}() {name} {nbr}") # omit URS testcases if (name.lower().find('urs') != -1) or (pref and not pref == prefix): continue fp_test = open(f"{out_file['split']}/{prefix}_{name}_{nbr}" + f"{tbl_fname[tbl_name]}", 'a') fp_test.write(f"{val[1][1]}\n") fp_test.close() else: # Just dump to the console app_log.info(f"{val[1]}") if isinstance(fp_new, io.IOBase): fp_new.close() def dump_database(cfg): """Dump data from test DB tables""" app_log.debug(f"{inspect.stack()[0][3]}()") find_key = '' found_tables = [] # keep configuration for output path and files # 'single' - only single file for whole output # 'split' - separate file for each response out_file = {} if not os.path.isfile(cfg['db_filename']): app_log.error(f"Missing DB file {cfg['db_filename']}") return AFC_ERR set_dump_db_opts = { 'wfa': [(TBL_REQS_NAME,), (TBL_RESPS_NAME,)], 'all': [(TBL_REQS_NAME,), (TBL_RESPS_NAME,)], 'req': [(TBL_REQS_NAME,)], 'resp': [(TBL_RESPS_NAME,)], 'ap': [('ap_config',)], 'cfg': [('afc_config',)], 'user': [('user_config',)] } prefix = { 'wfa': "AFCS", 'all': None } tbl = 'True' if isinstance(cfg['table'], list): tbl = cfg['table'][0] con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() if tbl in set_dump_db_opts: # Dump only tables with requests and responses found_tables.extend(set_dump_db_opts[tbl]) elif tbl == 'True': # Dump all tables if no options provided cur.execute(f"SELECT name FROM sqlite_master WHERE type='table';") found_tables = cur.fetchall() pref = None if tbl == 'wfa' or tbl == 'all': if tbl in prefix: pref = prefix[tbl] out_file['split'] = './' if not isinstance(cfg['outpath'], type(None)): out_file['split'] = cfg['outpath'][0] + '/' out_file['split'] += WFA_TEST_DIR if os.path.exists(out_file['split']): shutil.rmtree(out_file['split']) os.mkdir(out_file['split']) elif isinstance(cfg['outfile'], type(None)): app_log.error(f"Missing output filename.\n") return AFC_ERR else: out_file['single'] = cfg['outfile'][0] for tbl in enumerate(found_tables): app_log.debug(f"Dump {tbl} to {out_file}") dump_table(cur, tbl[1][0], out_file, pref) con.close() return AFC_OK def export_admin_config(cfg): """Export admin server configuration""" app_log.debug('%s()', inspect.stack()[0][3]) con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT COUNT(*) FROM ' + TBL_AP_CFG_NAME) found_rcds = cur.fetchall() with open(cfg['outfile'][0], 'w') as fp_exp: cur.execute('SELECT * FROM %s' % TBL_AFC_CFG_NAME) found_cfg = cur.fetchall() app_log.debug('Found AfcCfg: %s', found_cfg) cur.execute('SELECT * FROM %s\n' % TBL_USERS_NAME) found_user = cur.fetchall() app_log.debug('Found Users: %s\n', found_user) cur.execute('SELECT * FROM %s\n' % TBL_AP_CFG_NAME) found_aps = cur.fetchall() con.close() aps = '' idx = 0 for count, val in enumerate(found_aps): aps += str(val[1]) + ',' app_log.debug('Found APs: %s\n', aps[:-1]) out_str = '{"afcAdminConfig":' + found_cfg[0][1] + ', '\ '"userConfig":' + found_user[0][1] + ', '\ '"apConfig":[' + aps[:-1] + ']}' fp_exp.write(out_str) app_log.info('Server admin config exported to %s', cfg['outfile'][0]) return AFC_OK def dry_run_test(cfg): """Run one or more requests from provided file""" if isinstance(cfg['infile'], type(None)): app_log.error('Missing input file') return AFC_ERR filename = cfg['infile'][0] app_log.debug('%s() %s', inspect.stack()[0][3], filename) if not os.path.isfile(filename): app_log.error('Missing raw test data file %s', filename) return AFC_ERR with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline: break app_log.info('Request:') app_log.info(dataline) # process dataline arguments request_json, _ = process_jsonline(dataline) resp = _send_recv(cfg, json.dumps(request_json)) # get request id from a request, response not always has it # the request contains test category new_req_json = json.loads(json.dumps(request_json).encode('utf-8')) req_id = json_lookup('requestId', new_req_json, None) resp_res = json_lookup('shortDescription', resp, None) if (resp_res[0] != 'Success') \ and (req_id[0].lower().find('urs') == -1): app_log.error('Failed in test response - %s', resp_res) app_log.debug(resp) break app_log.info('Got response for the request') app_log.info('Resp:') app_log.info(resp) app_log.info('\n\n') json_lookup('availabilityExpireTime', resp, '0') upd_data = json.dumps(resp, sort_keys=True) hash_obj = hashlib.sha256(upd_data.encode('utf-8')) return AFC_OK def get_nbr_testcases(cfg): """ Find APs count on DB table """ if not os.path.isfile(cfg['db_filename']): print('INFO: Missing DB file %s', cfg['db_filename']) return False con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT count("requestId") from ' + TBL_REQS_NAME) found_data = cur.fetchall() db_inquiry_count = found_data[0][0] con.close() app_log.debug("found %s ap lists from db table", db_inquiry_count) return db_inquiry_count def collect_tests2combine(sh, rows, t_ident, t2cmb, cmb_t): """ Lookup for combined test vectors, build of required test vectors to combine """ app_log.debug('%s()\n', inspect.stack()[0][3]) for i in range(1, rows + 1): cell = sh.cell(row=i, column=PURPOSE_CLM) if ((cell.value is None) or (AFC_TEST_IDENT.get(cell.value.lower()) is None) or (cell.value == 'SRI')): continue if (t_ident != 'all') and (cell.value.lower() != t_ident): continue cell = sh.cell(row=i, column=COMBINED_CLM) if cell.value is not None and \ cell.value.upper() != 'NO': raw_list = str(cell.value) test_case_id = sh.cell(row=i, column=UNIT_NAME_CLM).value test_case_id += "." test_case_id += sh.cell(row=i, column=PURPOSE_CLM).value test_case_id += "." test_case_id += str(sh.cell(row=i, column=TEST_VEC_CLM).value) cmb_t[test_case_id] = [] for t in raw_list.split(','): if '-' in t: # found range of test vectors left, right = t.split('-') t2cmb_ident = '' for r in AFC_TEST_IDENT: if r in left.lower(): min = int(left.replace(r.upper(), '')) max = int(right.replace(r.upper(), '')) + 1 t2cmb_ident = r.upper() for cnt in range(min, max): tcase = t2cmb_ident + str(cnt) t2cmb[tcase] = '' cmb_t[test_case_id] += [tcase] else: # found single test vector t2cmb[t] = '' cmb_t[test_case_id] += [t] def _parse_tests_dev_desc(sheet, fp_new, rows): app_log.debug('%s()\n', inspect.stack()[0][3]) for i in range(1, rows + 1): res_str = "" cell = sheet.cell(row=i, column=PURPOSE_CLM) if ((cell.value is None) or (AFC_TEST_IDENT.get(cell.value.lower()) is None) or (cell.value == 'SRI')): continue # skip combined test vectors because device descriptor is missing cell = sheet.cell(row=i, column=COMBINED_CLM) if cell.value is not None and \ cell.value.upper() != 'NO': continue res_str += build_device_desc( sheet.cell(row=i, column=INDOOR_DEPL_CLM).value, sheet.cell(row=i, column=SER_NBR_CLM).value, sheet.cell(row=i, column=RULESET_CLM).value, sheet.cell(row=i, column=ID_CLM).value, True) fp_new.write(res_str + '\n') return res_str def _parse_tests_all(sheet, fp_new, rows, test_ident): app_log.debug('%s()\n', inspect.stack()[0][3]) # collect tests to combine in next loop tests2combine = dict() # gather combined tests combined_tests = dict() collect_tests2combine(sheet, rows, test_ident, tests2combine, combined_tests) if len(combined_tests): app_log.info('Found combined test vectors: %s', ' '.join(combined_tests)) app_log.info('Found test vectors to combine: %s', ' '.join(tests2combine)) for i in range(1, rows + 1): cell = sheet.cell(row=i, column=PURPOSE_CLM) if ((cell.value is None) or (AFC_TEST_IDENT.get(cell.value.lower()) is None) or (cell.value == 'SRI')): continue if (test_ident != 'all') and (cell.value.lower() != test_ident): continue uut = sheet.cell(row=i, column=UNIT_NAME_CLM).value purpose = sheet.cell(row=i, column=PURPOSE_CLM).value test_vec = sheet.cell(row=i, column=TEST_VEC_CLM).value test_case_id = uut + "." + purpose + "." + str(test_vec) # Prepare request header '{"availableSpectrumInquiryRequests": [{' res_str = REQ_INQUIRY_HEADER # check if the test case is combined cell = sheet.cell(row=i, column=COMBINED_CLM) if cell.value is not None and \ cell.value.upper() != 'NO': for item in combined_tests[test_case_id]: res_str += tests2combine[item] + ',' res_str = res_str[:-1] else: # # Inquired Channels # res_str += '{' + REQ_INQ_CHA_HEADER cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_131) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_131) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_132) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_132) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_133) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_133) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_134) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_134) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_136) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_136) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_137) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_137) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}' + REQ_INQ_CHA_FOOTER + ' ' # # Device descriptor # res_str += REQ_DEV_DESC_HEADER res_str += build_device_desc( sheet.cell(row=i, column=INDOOR_DEPL_CLM).value, sheet.cell(row=i, column=SER_NBR_CLM).value, sheet.cell(row=i, column=RULESET_CLM).value, sheet.cell(row=i, column=ID_CLM).value, False) res_str += ',' # # Inquired Frequency Range # res_str += REQ_INQ_FREQ_RANG_HEADER freq_range = AfcFreqRange() freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_LOWFREQ_A), 'low') freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_HIGHFREQ_A), 'high') try: res_str += freq_range.append_range() except IncompleteFreqRange as e: app_log.debug(f"{e} - row {i}") freq_range = AfcFreqRange() freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_LOWFREQ_B), 'low') freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_HIGHFREQ_B), 'high') try: tmp_str = freq_range.append_range() res_str += ', ' + tmp_str except IncompleteFreqRange as e: app_log.debug(f"{e} - row {i}") res_str += REQ_INQ_FREQ_RANG_FOOTER cell = sheet.cell(row=i, column=MINDESIREDPOWER) if (cell.value): res_str += REQ_MIN_DESIRD_PWR + str(cell.value) + ', ' # # Location # res_str += REQ_LOC_HEADER cell = sheet.cell(row=i, column=INDOORDEPLOYMENT) res_str += REQ_LOC_INDOORDEPL + str(cell.value) + ', ' # Location - elevation res_str += REQ_LOC_ELEV_HEADER cell = sheet.cell(row=i, column=ELE_VERTICALUNCERTAINTY) if isinstance(cell.value, int): res_str += REQ_LOC_VERT_UNCERT + str(cell.value) + ', ' cell = sheet.cell(row=i, column=ELE_HEIGHTTYPE) res_str += REQ_LOC_HEIGHT_TYPE + '"' + str(cell.value) + '"' cell = sheet.cell(row=i, column=ELE_HEIGHT) if isinstance(cell.value, int) or isinstance(cell.value, float): res_str += ', ' + REQ_LOC_HEIGHT + str(cell.value) res_str += '}, ' # Location - uncertainty reqion geo_coor = AfcGeoCoordinates(sheet, i) try: res_str += geo_coor.collect_coordinates() except IncompleteGeoCoordinates as e: app_log.debug(e) res_str += REQ_LOC_FOOTER cell = sheet.cell(row=i, column=REQ_ID_CLM) if isinstance(cell.value, str): req_id = cell.value else: req_id = "" res_str += REQ_REQUEST_ID + '"' + req_id + '"' res_str += '}' # collect test vectors required for combining # build test case id in format