#!/usr/bin/env python3 # # Copyright (C) 2021 Broadcom. All rights reserved. The term "Broadcom" # refers solely to the Broadcom Inc. corporate affiliate that owns # the software below. This work is licensed under the OpenAFC Project License, # a copy of which is included with this software program # """ Description The execution always done in specific order: configuration, database, testing. In case there are not available valid responses to compare with need to make acquisition and create new database as following. ./afc_tests.py --addr
--log debug --cmd run """ import argparse import certifi import csv import datetime import hashlib import inspect import io import json import logging import openpyxl as oxl import os import re import requests import shutil import sqlite3 import subprocess import sys import time import smtplib import ssl from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from bs4 import BeautifulSoup from deepdiff import DeepDiff from multiprocessing.pool import Pool from string import Template from _afc_types import * from _afc_errors import * from _version import __version__ from _wfa_types import * AFC_URL_SUFFIX = '/fbrat/ap-afc/' AFC_REQ_NAME = 'availableSpectrumInquiry' AFC_WEBUI_URL_SUFFIX = '/fbrat/ratapi/v1/' AFC_WEBUI_REQ_NAME = 'availableSpectrumInquirySec' AFC_WEBUI_TOKEN = 'about_csrf' headers = {'content-type': 'application/json'} AFC_TEST_DB_FILENAME = 'afc_input.sqlite3' TBL_REQS_NAME = 'test_vectors' TBL_RESPS_NAME = 'test_data' TBL_USERS_NAME = 'user_config' TBL_AFC_CFG_NAME = 'afc_config' TBL_AP_CFG_NAME = 'ap_config' AFC_PROT_NAME = 'https' # metadata variables TESTCASE_ID = "testCaseId" # mandatory keys that need to be read from input text file MANDATORY_METADATA_KEYS = {TESTCASE_ID} app_log = logging.getLogger(__name__) class TestCfg(dict): """Keep test configuration""" def __init__(self): dict.__init__(self) self.update({ 'cmd': '', 'port': 443, 'url_path': AFC_PROT_NAME + '://', 'log_level': logging.INFO, 'db_filename': AFC_TEST_DB_FILENAME, 'tests': None, 'is_test_by_index': True, 'resp': '', 'stress': 0, 'precision': None}) def _send_recv(self, params): """Run AFC test and wait for respons""" data = params.split("'") get_req = '' for item in data: try: json.loads(item) except ValueError as e: continue get_req = item break new_req_json = json.loads(get_req.encode('utf-8')) new_req = json.dumps(new_req_json, sort_keys=True) if (self['webui'] is False): params_data = { 'conn_type': self['conn_type'], 'debug': self['debug'], 'edebug': cfg['elaborated_debug'], 'gui': self['gui'] } else: # emulating request calll from webui params_data = { 'debug': 'True', 'edebug': cfg['elaborated_debug'], 'gui': 'True' } if (self['cache'] == False): params_data['nocache'] = 'True' ser_cert = not self['verif'] cli_certs = () if (self['prot'] == AFC_PROT_NAME and self['verif'] == False): # add mtls certificates if explicitly provided if not isinstance(self['cli_cert'], type(None)): cli_certs = ("".join(self['cli_cert']), "".join(self['cli_key'])) # add tls certificates if explicitly provided if not isinstance(self['ca_cert'], type(None)): ser_cert = "".join(self['ca_cert']) app_log.debug(f"Client {cli_certs}, Server {ser_cert}") before_ts = time.monotonic() rawresp = requests.post( self['url_path'], params=params_data, data=new_req, headers=headers, timeout=600, # 10 min verify=self['verif']) resp = rawresp.json() tId = resp.get('taskId') if ((self['conn_type'] == 'async') and (not isinstance(tId, type(None)))): tState = resp.get('taskState') params_data['task_id'] = tId while (tState == 'PENDING') or (tState == 'PROGRESS'): app_log.debug('_run_test() state %s, tid %s, status %d', tState, tId, rawresp.status_code) time.sleep(2) rawresp = requests.get(self['url_path'], params=params_data) if rawresp.status_code == 200: resp = rawresp.json() break tm_secs = time.monotonic() - before_ts app_log.info('Test done at %.1f secs', tm_secs) return new_req, resp class TestResultComparator: """ AFC Response comparator Private instance attributes: _precision -- Precision for results' comparison in dB. 0 means exact match """ def __init__(self, precision): """ Constructor Arguments: precision -- Precision for results' comparison in dB. 0 means exact match """ assert precision >= 0 self._precision = precision def compare_results(self, ref_str, result_str): """ Compares reference and actual AFC responses Arguments: ref_str -- Reference response JSON in string representation result_str -- Actual response json in string representation Returns list of difference description strings. Empty list means match """ # List of difference description strings diffs = [] # Reference and actual JSON dictionaries jsons = [] for s, kind in [(ref_str, "reference"), (result_str, "result")]: try: jsons.append(json.loads(s)) except json.JSONDecodeError as ex: diffs.append(f"Failed to decode {kind} JSON data: {ex}") return diffs self._recursive_compare(jsons[0], jsons[1], [], diffs) return diffs def _recursive_compare(self, ref_json, result_json, path, diffs): """ Recursive comparator of JSON nodes Arguments: ref_json -- Reference response JSON dictionary result_json -- Actual response JSON dictionary path -- Path (sequence of indices) to node in question diffs -- List of difference description strings to update """ # Items in questions in JSON dictionaries ref_item = self._get_item(ref_json, path) result_item = self._get_item(result_json, path) # Human readable path representation for difference messages path_repr = f"[{']['.join(str(idx) for idx in path)}]" if ref_item == result_item: return # Items are equal - nothing to do # So, items are different. What's the difference? if isinstance(ref_item, dict): # One item is dictionary. Other should also should be dictionary... if not isinstance(result_item, dict): diffs.append(f"Different item types at {path_repr}") return # ... with same set of keys ref_keys = set(ref_item.keys()) result_keys = set(result_item.keys()) for unique_key in (ref_keys ^ result_keys): if self._compare_channel_lists(ref_json, result_json, path + [unique_key], diffs): ref_keys -= {unique_key} result_keys -= {unique_key} if ref_keys != result_keys: msg = f"Different set of keys at {path_repr}" for kind, elems in [("reference", ref_keys - result_keys), ("result", result_keys - ref_keys)]: if elems: msg += \ f" Unique {kind} keys: {', '.join(sorted(elems))}." diffs.append(msg) return # Comparing values for individual keys for key in sorted(ref_keys): self._recursive_compare(ref_json, result_json, path + [key], diffs) elif isinstance(ref_item, list): # One item is list. Other should also be list... if not isinstance(result_item, list): diffs.append(f"Different item types at {path_repr}") return # If this is a channel list (or part thereof - handle it) if self._compare_channel_lists(ref_json, result_json, path, diffs): return # Proceeding with comparison of other list kinds if len(ref_item) != len(result_item): diffs.append( (f"Different list lengths at at {path_repr}: " f"{len(ref_item)} elements in reference vs " f"{len(result_item)} elements in result")) return # Comparing individual elements for i in range(len(ref_item)): self._recursive_compare(ref_json, result_json, path + [i], diffs) else: # Items should be scalars for item, kind in [(ref_item, "Reference"), (result_item, "Result")]: if not isinstance(item, (int, float, str)): diffs.append((f"{kind} data contains unrecognized item " f"type at {path_repr}")) return diffs.append((f"Difference at {path}: reference content is " f"{ref_item}, result content is {result_item}")) def _compare_channel_lists(self, ref_json, result_json, path, diffs): """ Trying to compare channel lists Arguments: ref_json -- Reference response JSON dictionary result_json -- Actual response JSON dictionary path -- Path (sequence of indices) to node in question diffs -- List of difference description strings to update Returns true if channel list comparison was done, no further action required, False if node is not a channel list, should be compared as usual """ if path[-1] == "channelCfi": # Comparison will be made at "maxEirp" return True # Human readable path representation for difference messages path_repr = f"[{']['.join(str(idx) for idx in path)}]" # EIRP dictionaries, indexed by channel identification (number or # frequency range) ref_channels = {} result_channels = {} if path[-1] == "maxEirp": # Channel numbers for kind, src, chan_dict in \ [("reference", ref_json, ref_channels), ("result", result_json, result_channels)]: try: numbers = self._get_item(src, path[:-1] + ["channelCfi"], default_last=[]) chan_dict.update( dict(zip([str(n) for n in numbers], self._get_item(src, path, default_last=[])))) except (TypeError, ValueError, KeyError): diffs.append((f"Unrecognized channel list structure at " f"{path_repr} in {kind}")) return True elif path[-1] == "availableFrequencyInfo": # Channel frequencies for kind, src, chan_dict in \ [("reference", ref_json, ref_channels), ("result", result_json, result_channels)]: try: for freq_info in self._get_item(src, path, default_last=[]): fr = freq_info["frequencyRange"] low = fr['lowFrequency'] high = fr['highFrequency'] for freq in range(low, high): chan_dict[f"[{freq} - {freq+1}"] = \ float(freq_info.get("maxPSD") or freq_info.get("maxPsd")) except (TypeError, ValueError, KeyError): diffs.append((f"Unrecognized frequency list structure at " f"{path_repr} in {kind}")) return True else: return False # Now will compare two channel dictionaries # First looking for unique channels for this_kind, this_channels, other_kind, other_channels in \ [("reference", ref_channels, "result", result_channels), ("result", result_channels, "reference", ref_channels)]: for channel in sorted(set(this_channels.keys()) - set(other_channels.keys())): diffs.append( (f"Channel {channel} present in {path_repr} of " f"{this_kind} with EIRP limit of " f"{this_channels[channel]}dBm, but absent in " f"{other_kind}")) # Then looking for different EIRPs on common channels for channel in sorted(set(ref_channels.keys()) & set(result_channels.keys())): diff = abs(ref_channels[channel] - result_channels[channel]) if diff <= self._precision: continue diffs.append( (f"Different values in {path_repr} for channel {channel}: " f"reference has EIRP of {ref_channels[channel]}dBm, " f"result has EIRP of {result_channels[channel]}dBm, " f"difference is: {diff:g}dB")) return True def _get_item(self, j, path, default_last=None): """ Retrieves item by sequence of indices Arguments: j -- JSON dictionary path -- Sequence of indices default_last -- What to return if item at last index is absent. None means throw exception (if nonlast item is absent - exception is also thrown) Returns retrieved item """ for path_idx, elem_idx in enumerate(path): try: j = j[elem_idx] except (KeyError, IndexError): if (default_last is not None) and \ (path_idx == (len(path) - 1)): return default_last raise return j def json_lookup(key, json_obj, val): """Lookup for key in json and change it value if required""" keepit = [] def lookup(key, json_obj, val, keepit): if isinstance(json_obj, dict): for k, v in json_obj.items(): if isinstance(v, (dict, list)): lookup(key, v, val, keepit) elif k == key: keepit.append(v) if val: json_obj[k] = val elif isinstance(json_obj, list): for node in json_obj: lookup(key, node, val, keepit) return keepit found = lookup(key, json_obj, val, keepit) return found def get_md5(fname): hash_md5 = hashlib.md5() with open(fname, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def create_email_attachment(filename): part = None with open(filename, "rb") as attachment: # Add file as application/octet-stream # Email client can usually download this automatically as attachment part = MIMEBase("application", "octet-stream") part.set_payload(attachment.read()) # Add header as key/value pair to attachment part encoders.encode_base64(part) part.add_header("Content-Disposition", f"attachment; filename= {filename}",) return part def send_email(cfg): """Send an email to predefined adress using gmail smtp server""" sender = cfg['email_from'] recipient = cfg['email_to'] app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()" f" from: {sender}, to: {recipient}") context = ssl.create_default_context() with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server: server.login(sender, cfg['email_pwd']) body = f"Please find attached responses." message = MIMEMultipart("alternative") message['Subject'] = f"AFC test results" message['From'] = sender message['To'] = recipient if not isinstance(cfg['email_cc'], type(None)): message['Cc'] = cfg['email_cc'] # Turn these into plain/html MIMEText objects message.attach(MIMEText(body, "plain")) message.attach(create_email_attachment(cfg['outfile'][0])) server.sendmail(sender, recipient, message.as_string()) def _send_recv(cfg, req_data, ssn=None): """Send AFC request and receiver it's response""" app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") new_req_json = json.loads(req_data.encode('utf-8')) new_req = json.dumps(new_req_json, sort_keys=True) if (cfg['webui'] is False): params_data = { 'conn_type': cfg['conn_type'], 'debug': cfg['debug'], 'edebug': cfg['elaborated_debug'], 'gui': cfg['gui'] } if (cfg['cache'] == False): params_data['nocache'] = 'True' post_func = requests.post else: # emulating request call from webui params_data = { 'debug': 'True', 'gui': 'True' } headers['Accept-Encoding'] = 'gzip, defalte' headers['Referer'] = cfg['base_url'] + 'fbrat/www/index.html' headers['X-Csrf-Token'] = cfg['token'] app_log.debug( f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"Cookies: {requests.utils.dict_from_cookiejar(ssn.cookies)}") post_func = ssn.post ser_cert = () cli_certs = None if ((cfg['prot'] == AFC_PROT_NAME and cfg['verif']) or (cfg['ca_cert'])): # add mtls certificates if explicitly provided if not isinstance(cfg['cli_cert'], type(None)): cli_certs = ("".join(cfg['cli_cert']), "".join(cfg['cli_key'])) # add tls certificates if explicitly provided if not isinstance(cfg['ca_cert'], type(None)): ser_cert = "".join(cfg['ca_cert']) cfg['verif'] = True else: os.environ['REQUESTS_CA_BUNDLE'] = certifi.where() app_log.debug(f"REQUESTS_CA_BUNDLE " f"{os.environ.get('REQUESTS_CA_BUNDLE')}") if "REQUESTS_CA_BUNDLE" in os.environ: ser_cert = "".join(os.environ.get('REQUESTS_CA_BUNDLE')) cfg['verif'] = True else: app_log.error(f"Missing CA certificate while forced.") return app_log.debug(f"Client {cli_certs}, Server {ser_cert}") try: rawresp = post_func( cfg['url_path'], params=params_data, data=new_req, headers=headers, timeout=600, # 10 min cert=cli_certs, verify=ser_cert if cfg['verif'] else False) rawresp.raise_for_status() except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as err: app_log.error(f"{err}") return resp = rawresp.json() tId = resp.get('taskId') if ((cfg['conn_type'] == 'async') and (not isinstance(tId, type(None)))): tState = resp.get('taskState') params_data['task_id'] = tId while (tState == 'PENDING') or (tState == 'PROGRESS'): app_log.debug('_run_test() state %s, tid %s, status %d', tState, tId, rawresp.status_code) time.sleep(2) rawresp = requests.get(cfg['url_path'], params=params_data) if rawresp.status_code == 200: resp = rawresp.json() break app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()" f" Resp status: {rawresp.status_code}") return resp def _send_recv_token(cfg, ssn): """Making login, open session and getting CSRF token""" app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") token = '' ser_cert = () cli_certs = None app_log.debug(f"=== ser {type(ser_cert)}") if ((cfg['prot'] == AFC_PROT_NAME and cfg['verif']) or (cfg['ca_cert'])): # add mtls certificates if explicitly provided if not isinstance(cfg['cli_cert'], type(None)): cli_certs = ("".join(cfg['cli_cert']), "".join(cfg['cli_key'])) # add tls certificates if explicitly provided if not isinstance(cfg['ca_cert'], type(None)): ser_cert = "".join(cfg['ca_cert']) cfg['verif'] = True else: os.environ['REQUESTS_CA_BUNDLE'] = certifi.where() app_log.debug(f"REQUESTS_CA_BUNDLE " f"{os.environ.get('REQUESTS_CA_BUNDLE')}") if "REQUESTS_CA_BUNDLE" in os.environ: ser_cert = "".join(os.environ.get('REQUESTS_CA_BUNDLE')) cfg['verif'] = True else: app_log.error(f"Missing CA certificate while forced.") return token app_log.debug(f"Client {cli_certs}, Server {ser_cert}") # get login ssn.headers.update({ 'Accept-Encoding': 'gzip, defalte' }) url_login = cfg['base_url'] + 'fbrat/user/sign-in' app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"===> URL {url_login}\n" f"===> Status {ssn.headers}\n" f"===> Cookies: {ssn.cookies}\n") try: rawresp = ssn.get(url_login, stream=False, cert=cli_certs, verify=ser_cert if cfg['verif'] else False) except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as err: app_log.error(f"{err}") return token soup = BeautifulSoup(rawresp.text, 'html.parser') inp_tkn = soup.find('input', id='csrf_token') app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"<--- Status {rawresp.status_code}\n" f"<--- Headers {rawresp.headers}\n" f"<--- Cookies: {ssn.cookies}\n" f"<--- Input: {inp_tkn}\n") token = inp_tkn.get('value') # fetch username and password from test db con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s\n' % TBL_USERS_NAME) found_user = cur.fetchall() con.close() found_json = json.loads(found_user[0][1]) app_log.debug(f"Found Users: {found_json['username']}") form_data = { 'next': '/', 'reg_next': '/', 'csrf_token': token, 'username': found_json['username'], 'password': found_json['password'] } ssn.headers.update({ 'Content-Type': 'application/x-www-form-urlencoded', 'Referer': url_login }) try: rawresp = ssn.post(url_login, data=form_data, stream=False, cert=cli_certs, verify=ser_cert if cfg['verif'] else False) except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as err: app_log.error(f"{err}") return token app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n" f"<--- Status {rawresp.status_code}\n" f"<--- Headers {rawresp.headers}\n" f"<--- Cookies: {ssn.cookies}\n") return token def make_db(filename): """Create DB file only with schema""" app_log.debug('%s()', inspect.stack()[0][3]) if os.path.isfile(filename): app_log.debug('%s() The db file is exists, no need to create new one.', inspect.stack()[0][3]) return True app_log.info('Create DB tables (%s, %s) from source files', TBL_REQS_NAME, TBL_RESPS_NAME) con = sqlite3.connect(filename) cur = con.cursor() cur.execute('CREATE TABLE IF NOT EXISTS ' + TBL_REQS_NAME + ' (test_id varchar(50), data json)') cur.execute('CREATE TABLE IF NOT EXISTS ' + TBL_RESPS_NAME + ' (test_id varchar(50), data json, hash varchar(255))') con.close() return True def compare_afc_config(cfg): """ Compare AFC configuration from the DB with provided one. """ app_log.debug('%s()', inspect.stack()[0][3]) if not os.path.isfile(cfg['db_filename']): app_log.error('Missing DB file %s', cfg['db_filename']) return AFC_ERR con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s' % TBL_AFC_CFG_NAME) found_cfgs = cur.fetchall() con.close() # get record from the input file if isinstance(cfg['infile'], type(None)): app_log.debug('Missing input file to compare with.') return AFC_OK filename = cfg['infile'][0] with open(filename, 'r') as fp_test: while True: rec = fp_test.read() if not rec: break try: get_rec = json.loads(rec) except (ValueError, TypeError) as e: continue break app_log.debug(json.dumps(get_rec, sort_keys=True, indent=4)) get_cfg = '' app_log.debug('Found %d config records', len(found_cfgs)) idx = 0 max_idx = len(found_cfgs) if not isinstance(cfg['idx'], type(None)): idx = cfg['idx'] if idx >= max_idx: app_log.error("The index (%d) is out of range (0 - %d).", idx, max_idx - 1) return AFC_ERR max_idx = idx + 1 while idx < max_idx: for item in list(found_cfgs[idx]): try: get_cfg = json.loads(item) except (ValueError, TypeError) as e: continue break app_log.debug("Record %d:\n%s", idx, json.dumps(get_cfg['afcConfig'], sort_keys=True, indent=4)) get_diff = DeepDiff(get_cfg['afcConfig'], get_rec, report_repetition=True) app_log.info("rec %d:\n%s", idx, get_diff) idx += 1 return AFC_OK def start_acquisition(cfg): """ Fetch test vectors from the DB, drop previous response table, run tests and fill responses in the DB with hash values """ app_log.debug(f"{inspect.stack()[0][3]}()") found_reqs, found_resp, ids, test_ids = _convert_reqs_n_resps_to_dict(cfg) # check if to make acquisition of all tests con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() # drop response table and create new one if all testcases required # to reacquisition all_resps = False if (len(test_ids) == len(found_reqs)): all_resps = True if all_resps: try: app_log.debug(f"{inspect.stack()[0][3]}() " f"drop table {TBL_RESPS_NAME}") cur.execute('DROP TABLE ' + TBL_RESPS_NAME) except Exception as OperationalError: # awkward but bearable app_log.debug('Missing table %s', TBL_RESPS_NAME) cur.execute('CREATE TABLE ' + TBL_RESPS_NAME + ' (test_id varchar(50), data json, hash varchar(255))') app_log.info(f"Number of tests to make acquisition - {len(test_ids)}") for test_id in test_ids: req_id = ids[test_id][0] app_log.debug(f"Request: {req_id}") resp = _send_recv(cfg, json.dumps(found_reqs[test_id][0])) if isinstance(resp, type(None)): app_log.error(f"Test {test_ids} ({req_id}) is Failed.") continue json_lookup('availabilityExpireTime', resp, '0') upd_data = json.dumps(resp, sort_keys=True) hash_obj = hashlib.sha256(upd_data.encode('utf-8')) app_log.debug(f"{inspect.stack()[0][3]}() new: " f"{hash_obj.hexdigest()}") if all_resps: cur.execute('INSERT INTO ' + TBL_RESPS_NAME + ' values ( ?, ?, ?)', [req_id, upd_data, hash_obj.hexdigest()]) con.commit() elif (test_id in found_resp.keys() and found_resp[test_id][1] == hash_obj.hexdigest()): app_log.debug(f"Skip to update hash for {req_id}. " f"Found the same value.") continue else: hash = hash_obj.hexdigest() cur.execute('UPDATE ' + TBL_RESPS_NAME + ' SET ' + 'data = ?, hash = ? WHERE test_id =?', (upd_data, hash, ids[test_id][0])) con.commit() con.close() return AFC_OK def process_jsonline(line): """ Function to process the input line from .txt file containing comma separated json strings """ # convert the input line to list of dictioanry/dictionaries line_list = json.loads("[" + line + "]") request_dict = line_list[0] metadata_dict = line_list[1] if len(line_list) > 1 else {} return request_dict, metadata_dict def get_db_req_resp(cfg): """ Function to retrieve request and response records from the database """ con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s' % TBL_REQS_NAME) found_reqs = cur.fetchall() db_reqs_list = [row[0] for row in found_reqs] cur.execute('SELECT * FROM %s' % TBL_RESPS_NAME) found_resps = cur.fetchall() db_resp_list = [row[0] for row in found_resps] con.close() return db_reqs_list, db_resp_list def insert_reqs_int(filename, con, cur): """ Insert requests from input file to a table in test db. """ with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline: break # process dataline arguments app_log.debug(f"= {dataline}") request_json, metadata_json = process_jsonline(dataline) # reject the request if mandatory metadata arguments are not # present if not MANDATORY_METADATA_KEYS.issubset( set(metadata_json.keys())): # missing mandatory keys in test case input app_log.error("Test case input does not contain required" " mandatory arguments: %s", ", ".join(list( MANDATORY_METADATA_KEYS - set(metadata_json.keys())))) return AFC_ERR app_log.info(f"Insert new request in DB " f"({metadata_json[TESTCASE_ID]})") app_log.debug(f"+ {metadata_json[TESTCASE_ID]}") cur.execute('INSERT INTO ' + TBL_REQS_NAME + ' VALUES ( ?, ?)', (metadata_json[TESTCASE_ID], json.dumps(request_json),)) con.commit() con.close() return AFC_OK def insert_reqs(cfg): """ Insert requests from input file to a table in test db. Drop previous table of requests. """ app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error(f"Missing input file") return AFC_ERR filename = cfg['infile'][0] app_log.debug(f"{inspect.stack()[0][3]}() {filename}") if not os.path.isfile(filename): app_log.error(f"Missing raw test data file {filename}") return AFC_ERR if not os.path.isfile(cfg['db_filename']): app_log.error(f"Unable to find test db file.") return AFC_ERR con = sqlite3.connect(cfg['db_filename']) # drop existing table of requests and create new one app_log.info(f"Drop table of requests ({TBL_REQS_NAME})") cur = con.cursor() try: cur.execute('DROP TABLE ' + TBL_REQS_NAME) except Exception as OperationalError: app_log.debug(f"Fail to drop, missing table {TBL_REQS_NAME}") cur.execute('CREATE TABLE ' + TBL_REQS_NAME + ' (test_id varchar(50), data json)') con.commit() return insert_reqs_int(filename, con, cur) def extend_reqs(cfg): """ Insert requests from input file to a table in test db. Drop previous table of requests. """ app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error(f"Missing input file") return AFC_ERR filename = cfg['infile'][0] app_log.debug(f"{inspect.stack()[0][3]}() {filename}") if not os.path.isfile(filename): app_log.error(f"Missing raw test data file {filename}") return AFC_ERR if not os.path.isfile(cfg['db_filename']): app_log.error(f"Unable to find test db file.") return AFC_ERR con = sqlite3.connect(cfg['db_filename']) # add more rows to existing table of requests app_log.info(f"Extending table of requests ({TBL_REQS_NAME})") cur = con.cursor() return insert_reqs_int(filename, con, cur) def insert_devs(cfg): """ Insert device descriptors from input file to a table in test db. Drop previous table of devices. """ app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error(f"Missing input file") return AFC_ERR filename = cfg['infile'][0] app_log.debug(f"{inspect.stack()[0][3]}() {filename}") if not os.path.isfile(filename): app_log.error(f"Missing raw test data file {filename}") return AFC_ERR if not os.path.isfile(cfg['db_filename']): app_log.error(f"Unable to find test db file.") return AFC_ERR con = sqlite3.connect(cfg['db_filename']) # drop existing table of requests and create new one app_log.info(f"Drop table of devices ({TBL_AP_CFG_NAME})") cur = con.cursor() try: cur.execute('DROP TABLE ' + TBL_AP_CFG_NAME) except Exception as OperationalError: app_log.debug(f"Fail to drop, missing table {TBL_AP_CFG_NAME}") cur.execute('CREATE TABLE ' + TBL_AP_CFG_NAME + ' (ap_config_id, data json, user_id)') cnt = 1 con.commit() with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline or (len(dataline) < 72): break # process dataline arguments app_log.debug(f"= {dataline}") cur.execute('INSERT INTO ' + TBL_AP_CFG_NAME + ' VALUES ( ?, ?, ?)', (cnt, dataline[:-1], 1)) con.commit() cnt += 1 con.close() return AFC_OK def add_reqs(cfg): """Prepare DB source files""" app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['infile'], type(None)): app_log.error('Missing input file') return AFC_ERR filename = cfg['infile'][0] app_log.debug('%s() %s', inspect.stack()[0][3], filename) if not os.path.isfile(filename): app_log.error('Missing raw test data file %s', filename) return AFC_ERR if not make_db(cfg['db_filename']): return AFC_ERR # fetch available requests and responses db_reqs_list, db_resp_list = get_db_req_resp(cfg) con = sqlite3.connect(cfg['db_filename']) with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline: break # process dataline arguments request_json, metadata_json = process_jsonline(dataline) # reject the request if mandatory metadata arguments are not # present if not MANDATORY_METADATA_KEYS.issubset( set(metadata_json.keys())): # missing mandatory keys in test case input app_log.error("Test case input does not contain required" " mandatory arguments: %s", ", ".join(list( MANDATORY_METADATA_KEYS - set(metadata_json.keys())))) return AFC_ERR # check if the test case already exists in the database test # vectors if metadata_json[TESTCASE_ID] in db_reqs_list: app_log.error("Test case: %s already exists in database", metadata_json[TESTCASE_ID]) break app_log.info("Executing test case: %s", metadata_json[TESTCASE_ID]) new_req, resp = cfg._send_recv(json.dumps(request_json)) # get request id from a request, response not always has it # the request contains test category new_req_json = json.loads(new_req.encode('utf-8')) req_id = json_lookup('requestId', new_req_json, None) resp_res = json_lookup('shortDescription', resp, None) if (resp_res[0] != 'Success') \ and (req_id[0].lower().find('urs') == -1) \ and (req_id[0].lower().find('ibp') == -1): app_log.error('Failed in test response - %s', resp_res) break app_log.info('Got response for the request') json_lookup('availabilityExpireTime', resp, '0') app_log.info('Insert new request in DB') cur = con.cursor() cur.execute('INSERT INTO ' + TBL_REQS_NAME + ' VALUES ( ?, ?)', (metadata_json[TESTCASE_ID], new_req,)) con.commit() app_log.info('Insert new resp in DB') upd_data = json.dumps(resp, sort_keys=True) hash_obj = hashlib.sha256(upd_data.encode('utf-8')) cur = con.cursor() cur.execute('INSERT INTO ' + TBL_RESPS_NAME + ' values ( ?, ?, ?)', [metadata_json[TESTCASE_ID], upd_data, hash_obj.hexdigest()]) con.commit() con.close() return AFC_OK def dump_table(conn, tbl_name, out_file, pref): app_log.debug(f"{inspect.stack()[0][3]}() {tbl_name}") fp_new = '' if 'single' in out_file: fp_new = open(out_file['single'], 'w') conn.execute(f"SELECT * FROM {tbl_name}") found_data = conn.fetchall() for val in enumerate(found_data): if isinstance(fp_new, io.IOBase): fp_new.write(f"{str(val)}\n") elif 'split' in out_file: tbl_fname = { TBL_REQS_NAME: '_Request.txt', TBL_RESPS_NAME: '_Response.txt' } new_json = json.loads(val[1][1].encode('utf-8')) prefix, name, nbr = val[1][0].split('.') app_log.debug(f"{inspect.stack()[0][3]}() {name} {nbr}") # omit URS testcases if (name.lower().find('urs') != -1) or (pref and not pref == prefix): continue fp_test = open(f"{out_file['split']}/{prefix}_{name}_{nbr}" + f"{tbl_fname[tbl_name]}", 'a') fp_test.write(f"{val[1][1]}\n") fp_test.close() else: # Just dump to the console app_log.info(f"{val[1]}") if isinstance(fp_new, io.IOBase): fp_new.close() def dump_database(cfg): """Dump data from test DB tables""" app_log.debug(f"{inspect.stack()[0][3]}()") find_key = '' found_tables = [] # keep configuration for output path and files # 'single' - only single file for whole output # 'split' - separate file for each response out_file = {} if not os.path.isfile(cfg['db_filename']): app_log.error(f"Missing DB file {cfg['db_filename']}") return AFC_ERR set_dump_db_opts = { 'wfa': [(TBL_REQS_NAME,), (TBL_RESPS_NAME,)], 'all': [(TBL_REQS_NAME,), (TBL_RESPS_NAME,)], 'req': [(TBL_REQS_NAME,)], 'resp': [(TBL_RESPS_NAME,)], 'ap': [('ap_config',)], 'cfg': [('afc_config',)], 'user': [('user_config',)] } prefix = { 'wfa': "AFCS", 'all': None } tbl = 'True' if isinstance(cfg['table'], list): tbl = cfg['table'][0] con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() if tbl in set_dump_db_opts: # Dump only tables with requests and responses found_tables.extend(set_dump_db_opts[tbl]) elif tbl == 'True': # Dump all tables if no options provided cur.execute(f"SELECT name FROM sqlite_master WHERE type='table';") found_tables = cur.fetchall() pref = None if tbl == 'wfa' or tbl == 'all': if tbl in prefix: pref = prefix[tbl] out_file['split'] = './' if not isinstance(cfg['outpath'], type(None)): out_file['split'] = cfg['outpath'][0] + '/' out_file['split'] += WFA_TEST_DIR if os.path.exists(out_file['split']): shutil.rmtree(out_file['split']) os.mkdir(out_file['split']) elif isinstance(cfg['outfile'], type(None)): app_log.error(f"Missing output filename.\n") return AFC_ERR else: out_file['single'] = cfg['outfile'][0] for tbl in enumerate(found_tables): app_log.debug(f"Dump {tbl} to {out_file}") dump_table(cur, tbl[1][0], out_file, pref) con.close() return AFC_OK def export_admin_config(cfg): """Export admin server configuration""" app_log.debug('%s()', inspect.stack()[0][3]) con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT COUNT(*) FROM ' + TBL_AP_CFG_NAME) found_rcds = cur.fetchall() with open(cfg['outfile'][0], 'w') as fp_exp: cur.execute('SELECT * FROM %s' % TBL_AFC_CFG_NAME) found_cfg = cur.fetchall() app_log.debug('Found AfcCfg: %s', found_cfg) cur.execute('SELECT * FROM %s\n' % TBL_USERS_NAME) found_user = cur.fetchall() app_log.debug('Found Users: %s\n', found_user) cur.execute('SELECT * FROM %s\n' % TBL_AP_CFG_NAME) found_aps = cur.fetchall() con.close() aps = '' idx = 0 for count, val in enumerate(found_aps): aps += str(val[1]) + ',' app_log.debug('Found APs: %s\n', aps[:-1]) out_str = '{"afcAdminConfig":' + found_cfg[0][1] + ', '\ '"userConfig":' + found_user[0][1] + ', '\ '"apConfig":[' + aps[:-1] + ']}' fp_exp.write(out_str) app_log.info('Server admin config exported to %s', cfg['outfile'][0]) return AFC_OK def dry_run_test(cfg): """Run one or more requests from provided file""" if isinstance(cfg['infile'], type(None)): app_log.error('Missing input file') return AFC_ERR filename = cfg['infile'][0] app_log.debug('%s() %s', inspect.stack()[0][3], filename) if not os.path.isfile(filename): app_log.error('Missing raw test data file %s', filename) return AFC_ERR with open(filename, 'r') as fp_test: while True: dataline = fp_test.readline() if not dataline: break app_log.info('Request:') app_log.info(dataline) # process dataline arguments request_json, _ = process_jsonline(dataline) resp = _send_recv(cfg, json.dumps(request_json)) # get request id from a request, response not always has it # the request contains test category new_req_json = json.loads(json.dumps(request_json).encode('utf-8')) req_id = json_lookup('requestId', new_req_json, None) resp_res = json_lookup('shortDescription', resp, None) if (resp_res[0] != 'Success') \ and (req_id[0].lower().find('urs') == -1): app_log.error('Failed in test response - %s', resp_res) app_log.debug(resp) break app_log.info('Got response for the request') app_log.info('Resp:') app_log.info(resp) app_log.info('\n\n') json_lookup('availabilityExpireTime', resp, '0') upd_data = json.dumps(resp, sort_keys=True) hash_obj = hashlib.sha256(upd_data.encode('utf-8')) return AFC_OK def get_nbr_testcases(cfg): """ Find APs count on DB table """ if not os.path.isfile(cfg['db_filename']): print('INFO: Missing DB file %s', cfg['db_filename']) return False con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT count("requestId") from ' + TBL_REQS_NAME) found_data = cur.fetchall() db_inquiry_count = found_data[0][0] con.close() app_log.debug("found %s ap lists from db table", db_inquiry_count) return db_inquiry_count def collect_tests2combine(sh, rows, t_ident, t2cmb, cmb_t): """ Lookup for combined test vectors, build of required test vectors to combine """ app_log.debug('%s()\n', inspect.stack()[0][3]) for i in range(1, rows + 1): cell = sh.cell(row=i, column=PURPOSE_CLM) if ((cell.value is None) or (AFC_TEST_IDENT.get(cell.value.lower()) is None) or (cell.value == 'SRI')): continue if (t_ident != 'all') and (cell.value.lower() != t_ident): continue cell = sh.cell(row=i, column=COMBINED_CLM) if cell.value is not None and \ cell.value.upper() != 'NO': raw_list = str(cell.value) test_case_id = sh.cell(row=i, column=UNIT_NAME_CLM).value test_case_id += "." test_case_id += sh.cell(row=i, column=PURPOSE_CLM).value test_case_id += "." test_case_id += str(sh.cell(row=i, column=TEST_VEC_CLM).value) cmb_t[test_case_id] = [] for t in raw_list.split(','): if '-' in t: # found range of test vectors left, right = t.split('-') t2cmb_ident = '' for r in AFC_TEST_IDENT: if r in left.lower(): min = int(left.replace(r.upper(), '')) max = int(right.replace(r.upper(), '')) + 1 t2cmb_ident = r.upper() for cnt in range(min, max): tcase = t2cmb_ident + str(cnt) t2cmb[tcase] = '' cmb_t[test_case_id] += [tcase] else: # found single test vector t2cmb[t] = '' cmb_t[test_case_id] += [t] def _parse_tests_dev_desc(sheet, fp_new, rows): app_log.debug('%s()\n', inspect.stack()[0][3]) for i in range(1, rows + 1): res_str = "" cell = sheet.cell(row=i, column=PURPOSE_CLM) if ((cell.value is None) or (AFC_TEST_IDENT.get(cell.value.lower()) is None) or (cell.value == 'SRI')): continue # skip combined test vectors because device descriptor is missing cell = sheet.cell(row=i, column=COMBINED_CLM) if cell.value is not None and \ cell.value.upper() != 'NO': continue res_str += build_device_desc( sheet.cell(row=i, column=INDOOR_DEPL_CLM).value, sheet.cell(row=i, column=SER_NBR_CLM).value, sheet.cell(row=i, column=RULESET_CLM).value, sheet.cell(row=i, column=ID_CLM).value, True) fp_new.write(res_str + '\n') return res_str def _parse_tests_all(sheet, fp_new, rows, test_ident): app_log.debug('%s()\n', inspect.stack()[0][3]) # collect tests to combine in next loop tests2combine = dict() # gather combined tests combined_tests = dict() collect_tests2combine(sheet, rows, test_ident, tests2combine, combined_tests) if len(combined_tests): app_log.info('Found combined test vectors: %s', ' '.join(combined_tests)) app_log.info('Found test vectors to combine: %s', ' '.join(tests2combine)) for i in range(1, rows + 1): cell = sheet.cell(row=i, column=PURPOSE_CLM) if ((cell.value is None) or (AFC_TEST_IDENT.get(cell.value.lower()) is None) or (cell.value == 'SRI')): continue if (test_ident != 'all') and (cell.value.lower() != test_ident): continue uut = sheet.cell(row=i, column=UNIT_NAME_CLM).value purpose = sheet.cell(row=i, column=PURPOSE_CLM).value test_vec = sheet.cell(row=i, column=TEST_VEC_CLM).value test_case_id = uut + "." + purpose + "." + str(test_vec) # Prepare request header '{"availableSpectrumInquiryRequests": [{' res_str = REQ_INQUIRY_HEADER # check if the test case is combined cell = sheet.cell(row=i, column=COMBINED_CLM) if cell.value is not None and \ cell.value.upper() != 'NO': for item in combined_tests[test_case_id]: res_str += tests2combine[item] + ',' res_str = res_str[:-1] else: # # Inquired Channels # res_str += '{' + REQ_INQ_CHA_HEADER cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_131) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_131) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_132) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_132) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_133) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_133) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_134) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_134) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_136) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_136) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}, ' cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_137) res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value) cell = sheet.cell(row=i, column=CHANNEL_CFI_137) if cell.value is not None: res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value) res_str += '}' + REQ_INQ_CHA_FOOTER + ' ' # # Device descriptor # res_str += REQ_DEV_DESC_HEADER res_str += build_device_desc( sheet.cell(row=i, column=INDOOR_DEPL_CLM).value, sheet.cell(row=i, column=SER_NBR_CLM).value, sheet.cell(row=i, column=RULESET_CLM).value, sheet.cell(row=i, column=ID_CLM).value, False) res_str += ',' # # Inquired Frequency Range # res_str += REQ_INQ_FREQ_RANG_HEADER freq_range = AfcFreqRange() freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_LOWFREQ_A), 'low') freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_HIGHFREQ_A), 'high') try: res_str += freq_range.append_range() except IncompleteFreqRange as e: app_log.debug(f"{e} - row {i}") freq_range = AfcFreqRange() freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_LOWFREQ_B), 'low') freq_range.set_range_limit( sheet.cell( row=i, column=INQ_FREQ_RNG_HIGHFREQ_B), 'high') try: tmp_str = freq_range.append_range() res_str += ', ' + tmp_str except IncompleteFreqRange as e: app_log.debug(f"{e} - row {i}") res_str += REQ_INQ_FREQ_RANG_FOOTER cell = sheet.cell(row=i, column=MINDESIREDPOWER) if (cell.value): res_str += REQ_MIN_DESIRD_PWR + str(cell.value) + ', ' # # Location # res_str += REQ_LOC_HEADER cell = sheet.cell(row=i, column=INDOORDEPLOYMENT) res_str += REQ_LOC_INDOORDEPL + str(cell.value) + ', ' # Location - elevation res_str += REQ_LOC_ELEV_HEADER cell = sheet.cell(row=i, column=ELE_VERTICALUNCERTAINTY) if isinstance(cell.value, int): res_str += REQ_LOC_VERT_UNCERT + str(cell.value) + ', ' cell = sheet.cell(row=i, column=ELE_HEIGHTTYPE) res_str += REQ_LOC_HEIGHT_TYPE + '"' + str(cell.value) + '"' cell = sheet.cell(row=i, column=ELE_HEIGHT) if isinstance(cell.value, int) or isinstance(cell.value, float): res_str += ', ' + REQ_LOC_HEIGHT + str(cell.value) res_str += '}, ' # Location - uncertainty reqion geo_coor = AfcGeoCoordinates(sheet, i) try: res_str += geo_coor.collect_coordinates() except IncompleteGeoCoordinates as e: app_log.debug(e) res_str += REQ_LOC_FOOTER cell = sheet.cell(row=i, column=REQ_ID_CLM) if isinstance(cell.value, str): req_id = cell.value else: req_id = "" res_str += REQ_REQUEST_ID + '"' + req_id + '"' res_str += '}' # collect test vectors required for combining # build test case id in format short_tcid = ''.join(test_case_id.split('.', 1)[1].split('.')) if short_tcid in tests2combine.keys(): tests2combine[short_tcid] = res_str.split( REQ_INQUIRY_HEADER)[1] res_str += REQ_INQUIRY_FOOTER cell = sheet.cell(row=i, column=VERSION_CLM) res_str += REQ_VERSION + '"' + str(cell.value) + '"' res_str += REQ_FOOTER # adding metadata parameters res_str += ', ' # adding test case id res_str += META_HEADER res_str += META_TESTCASE_ID + '"' + test_case_id + '"' res_str += META_FOOTER fp_new.write(res_str + '\n') return AFC_OK def parse_tests(cfg): app_log.debug('%s()\n', inspect.stack()[0][3]) filename = '' out_fname = '' if isinstance(cfg['infile'], type(None)): app_log.error('Missing input file') return AFC_ERR filename = cfg['infile'][0] if not os.path.isfile(filename): app_log.error('Missing raw test data file %s', filename) return AFC_ERR test_ident = cfg['test_id'] if isinstance(cfg['outfile'], type(None)): out_fname = test_ident + NEW_AFC_TEST_SUFX else: out_fname = cfg['outfile'][0] wb = oxl.load_workbook(filename, data_only=True) for sh in wb: app_log.debug('Sheet title: %s', sh.title) app_log.debug('rows %d, cols %d\n', sh.max_row, sh.max_column) # Look for request sheet for s in range(len(wb.sheetnames)): if wb.sheetnames[s] == "Availability Requests": break wb.active = s sheet = wb.active nbr_rows = sheet.max_row app_log.debug('Rows range 1 - %d', nbr_rows + 1) app_log.info('Export tests into file: %s', out_fname) fp_new = open(out_fname, 'w') # partial parse if only certain parameters required. # only for device descriptor for now. if cfg['dev_desc']: res_str = _parse_tests_dev_desc(sheet, fp_new, nbr_rows) fp_new.write(res_str + '\n') else: _parse_tests_all(sheet, fp_new, nbr_rows, test_ident) fp_new.close() return AFC_OK def test_report(fname, runtimedata, testnumdata, testvectordata, test_result, upd_data): """Procedure to generate AFC test results report. Args: runtimedata(str) : Tested case running time. testnumdata(str): Tested case id. testvectordata(int): Tested vector id. test_result(str: Test result Pass is 0 or Fail is 1) upd_data(list): List for test response data Return: Create test results file. """ ts_time = datetime.datetime.fromtimestamp(time.time()).\ strftime('%Y_%m_%d_%H%M%S') # Test results output args data_names = ['Date', 'Test Number', 'Test Vector', 'Running Time', 'Test Result', 'Response data'] data = {'Date': ts_time, 'Test Number': testnumdata, 'Test Vector': testvectordata, 'Running Time': runtimedata, 'Test Result': test_result, 'Response data': upd_data} with open(fname, "a") as f: file_writer = csv.DictWriter(f, fieldnames=data_names) if os.stat(fname).st_size == 0: file_writer.writeheader() file_writer.writerow(data) def _run_tests(cfg, reqs, resps, comparator, ids, test_cases): """ Run tests reqs: {testcaseid: [request_json_str]} resps: {testcaseid: [response_json_str, response_hash]} comparator: reference to object test_cases: [testcaseids] """ app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() {test_cases} " f"{cfg['url_path']}") if not len(resps): app_log.info(f"Unable to compare response data." f"Suggest to make acquisition of responses.") return AFC_ERR all_test_res = AFC_OK accum_secs = 0 app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() " f"{type(cfg['webui'])} ") ssn = None if cfg['webui'] is True: ssn = requests.Session() cfg['token'] = _send_recv_token(cfg, ssn) for test_case in test_cases: # Default reset test_res value test_res = AFC_OK req_id = ids[test_case][0] app_log.info(f"Prepare to run test - {req_id}") if test_case not in reqs: app_log.warning(f"The requested test case {test_case} is " f"invalid/not available in database") continue request_data = reqs[test_case][0] app_log.debug(f"{inspect.stack()[0][3]}() {request_data}") before_ts = time.monotonic() resp = _send_recv(cfg, json.dumps(request_data), ssn) tm_secs = time.monotonic() - before_ts res = f"id {test_case} name {req_id} status $status time {tm_secs:.1f}" res_template = Template(res) if isinstance(resp, type(None)): test_res = AFC_ERR all_test_res = AFC_ERR elif cfg['webui'] is True: pass else: json_lookup('availabilityExpireTime', resp, '0') upd_data = json.dumps(resp, sort_keys=True) diffs = [] hash_obj = hashlib.sha256(upd_data.encode('utf-8')) diffs = comparator.compare_results(ref_str=resps[test_case][0], result_str=upd_data) if (resps[test_case][1] == hash_obj.hexdigest()) \ if cfg['precision'] is None else (not diffs): res = res_template.substitute(status="Ok") else: for line in diffs: app_log.error(f" Difference: {line}") app_log.error(hash_obj.hexdigest()) test_res = AFC_ERR if test_res == AFC_ERR: res = res_template.substitute(status="Fail") app_log.error(res) all_test_res = AFC_ERR accum_secs += tm_secs app_log.info(res) # For saving test results option if not isinstance(cfg['outfile'], type(None)): test_report(cfg['outfile'][0], float(tm_secs), test_case, req_id, ("PASS" if test_res == AFC_OK else "FAIL"), upd_data) app_log.info(f"Total testcases runtime : {round(accum_secs, 1)} secs") if not isinstance(cfg['outfile'], type(None)): send_email(cfg) return all_test_res def prep_and_run_tests(cfg, reqs, resps, ids, test_cases): """ Run tests reqs: {testcaseid: [request_json_str]} resps: {testcaseid: [response_json_str, response_hash]} test_cases: [testcaseids] """ app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") test_res = AFC_OK results_comparator = TestResultComparator(precision=cfg['precision'] or 0) # calculate max number of tests to run max_nbr_tests = len(test_cases) if not isinstance(cfg['tests2run'], type(None)): max_nbr_tests = int("".join(cfg['tests2run'])) while (max_nbr_tests != 0): app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() " f"Number of tests to run: {max_nbr_tests}") if max_nbr_tests < len(test_cases): del test_cases[-(len(test_cases) - max_nbr_tests):] # if stress mode execute testcases in parallel and concurrent if cfg['stress'] == 1: app_log.debug(f"{inspect.stack()[0][3]}() max {max_nbr_tests}" f" len {len(test_cases)}") inputs = [(cfg, reqs, resps, results_comparator, ids, [test]) for test in test_cases] with Pool(max_nbr_tests) as my_pool: results = my_pool.starmap(_run_tests, inputs) if not any(r == 0 for r in results): test_res = AFC_ERR else: res = _run_tests(cfg, reqs, resps, results_comparator, ids, test_cases) if res != AFC_OK: test_res = res # when required to run more tests than there are testcases # start run run from the beginning max_nbr_tests -= len(test_cases) return test_res def _convert_reqs_n_resps_to_dict(cfg): """ Fetch test vectors and responses from the DB Convert to dictionaries indexed by id or original index from table Prepare list of new indexes """ app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") if not os.path.isfile(cfg['db_filename']): app_log.error('Missing DB file %s', cfg['db_filename']) return AFC_ERR con = sqlite3.connect(cfg['db_filename']) cur = con.cursor() cur.execute('SELECT * FROM %s' % TBL_REQS_NAME) found_reqs = cur.fetchall() cur.execute('SELECT * FROM %s' % TBL_RESPS_NAME) found_resps = cur.fetchall() con.close() # reformat the reqs_dict and resp_dict accordingly if not isinstance(cfg['testcase_ids'], type(None)): app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() by id") reqs_dict = { row[0]: [json.loads(row[1])] for row in found_reqs } resp_dict = { row[0]: [row[1], row[2]] for row in found_resps } ids_dict = { row[0]: [row[0], req_index + 1] for req_index, row in enumerate(found_reqs) } test_indx = list(map(str.strip, cfg.pop("testcase_ids").split(','))) cfg.pop("testcase_indexes") else: app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() by index") reqs_dict = { str(req_index + 1): [json.loads(row[1])] for req_index, row in enumerate(found_reqs) } resp_dict = { str(resp_index + 1): [row[1], row[2]] for resp_index, row in enumerate(found_resps) } ids_dict = { str(req_index + 1): [row[0], req_index + 1] for req_index, row in enumerate(found_reqs) } if not isinstance(cfg['testcase_indexes'], type(None)): test_indx = list(map(str.strip, cfg.pop("testcase_indexes").split(','))) cfg.pop("testcase_ids") else: test_indx = [ str(item) for item in list(range(1, len(reqs_dict) + 1)) ] # build list of indexes, omitting non-existing elements test_cases = list() for i in range(0, len(test_indx)): if reqs_dict.get(test_indx[i]) is None: app_log.debug(f"Missing value for index {test_indx[i]}") continue test_cases.append(test_indx[i]) app_log.debug(f"{inspect.stack()[0][3]}() Final list of indexes. " f"{test_cases}") return reqs_dict, resp_dict, ids_dict, test_cases def run_test(cfg): """Fetch test vectors from the DB and run tests""" app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() " f"{cfg['tests']}, {cfg['url_path']}") reqs_dict, resp_dict, ids, test_cases = _convert_reqs_n_resps_to_dict(cfg) return prep_and_run_tests(cfg, reqs_dict, resp_dict, ids, test_cases) def stress_run(cfg): """ Get test vectors from the database and run tests in parallel """ app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() " f"{cfg['tests']}, {cfg['url_path']}") cfg['stress'] = 1 return run_test(cfg) def _run_cert_tests(cfg): """ Run tests """ app_log.debug( f"({os.getpid()}) {inspect.stack()[0][3]}() {cfg['url_path']}") test_res = AFC_OK try: if isinstance(cfg['cli_cert'], type(None)): rawresp = requests.get(cfg['url_path'], verify="".join(cfg['ca_cert'])) else: rawresp = requests.get(cfg['url_path'], cert=("".join(cfg['cli_cert']), "".join(cfg['cli_key'])), verify="".join(cfg['ca_cert'])) except Exception as e: app_log.error(f"({os.getpid()}) {inspect.stack()[0][3]}() {e}") test_res = AFC_ERR except OSError as os_err: proc = psutil.Process() app_log.error(f"({os.getpid()}) {inspect.stack()[0][3]}() " f"{os_err} - {proc.open_files()}") test_res = AFC_ERR else: if rawresp.status_code != 200: test_res = AFC_ERR return test_res def run_cert_tests(cfg): """ Run certificate tests """ app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()") test_res = AFC_OK # calculate max number of tests to run if isinstance(cfg['tests2run'], type(None)): app_log.error(f"Missing number of tests to run.") return AFC_ERR max_nbr_tests = int("".join(cfg['tests2run'])) before_ts = time.monotonic() while (max_nbr_tests != 0): # if stress mode execute testcases in parallel and concurrent inputs = [(cfg)] with Pool(max_nbr_tests) as my_pool: results = my_pool.map(_run_cert_tests, inputs) if not any(r == 0 for r in results): test_res = AFC_ERR max_nbr_tests -= 1 tm_secs = time.monotonic() - before_ts app_log.info(f"Tests {max_nbr_tests} done at {tm_secs:.3f} s") return test_res log_level_map = { 'debug': logging.DEBUG, # 10 'info': logging.INFO, # 20 'warn': logging.WARNING, # 30 'err': logging.ERROR, # 40 'crit': logging.CRITICAL, # 50 } def set_log_level(opt): app_log.setLevel(log_level_map[opt]) return log_level_map[opt] def get_version(cfg): """Get AFC test utility version""" app_log.info('AFC Test utility version %s', __version__) app_log.info('AFC Test db hash %s', get_md5(AFC_TEST_DB_FILENAME)) def pre_hook(cfg): """Execute provided functionality prior to main command""" app_log.debug(f"{inspect.stack()[0][3]}()") return subprocess.call(cfg['prefix_cmd']) def parse_run_test_args(cfg): """Parse arguments for command 'run_test'""" app_log.debug(f"{inspect.stack()[0][3]}()") if isinstance(cfg['addr'], list): # check if provided required certification files if (cfg['prot'] != AFC_PROT_NAME): # update URL if not the default protocol cfg['url_path'] = cfg['prot'] + '://' cfg['url_path'] += str(cfg['addr'][0]) + ':' + str(cfg['port']) cfg['base_url'] = cfg['url_path'] + '/' if cfg['webui'] is False: cfg['url_path'] += AFC_URL_SUFFIX + AFC_REQ_NAME else: cfg['url_token'] = cfg['url_path'] + AFC_WEBUI_URL_SUFFIX +\ AFC_WEBUI_TOKEN cfg['url_path'] += AFC_URL_SUFFIX + AFC_WEBUI_REQ_NAME return AFC_OK def parse_run_cert_args(cfg): """Parse arguments for command 'run_cert'""" app_log.debug(f"{inspect.stack()[0][3]}()") if ((not isinstance(cfg['addr'], list)) or isinstance(cfg['ca_cert'], type(None))): app_log.error(f"{inspect.stack()[0][3]}() Missing arguments") return AFC_ERR cfg['url_path'] = cfg['prot'] + '://' + str(cfg['addr'][0]) +\ ':' + str(cfg['port']) + '/' return AFC_OK # available commands to execute in alphabetical order execution_map = { 'add_reqs': [add_reqs, parse_run_test_args], 'ins_reqs': [insert_reqs, parse_run_test_args], 'ext_reqs': [extend_reqs, parse_run_test_args], 'ins_devs': [insert_devs, parse_run_test_args], 'cmp_cfg': [compare_afc_config, parse_run_test_args], 'dry_run': [dry_run_test, parse_run_test_args], 'dump_db': [dump_database, parse_run_test_args], 'get_nbr_testcases': [get_nbr_testcases, parse_run_test_args], 'exp_adm_cfg': [export_admin_config, parse_run_test_args], 'parse_tests': [parse_tests, parse_run_test_args], 'reacq': [start_acquisition, parse_run_test_args], 'run': [run_test, parse_run_test_args], 'run_cert': [run_cert_tests, parse_run_cert_args], 'stress': [stress_run, parse_run_test_args], 'ver': [get_version, parse_run_test_args], } def make_arg_parser(): """Define command line options""" args_parser = argparse.ArgumentParser( epilog=__doc__.strip(), formatter_class=argparse.RawTextHelpFormatter) args_parser.add_argument('--addr', nargs=1, type=str, help="
- set AFC Server address.\n") args_parser.add_argument('--prot', nargs='?', choices=['https', 'http'], default='https', help=" - set connection protocol " "(default=https).\n") args_parser.add_argument('--port', nargs='?', default='443', type=int, help=" - set connection port " "(default=443).\n") args_parser.add_argument('--conn_type', nargs='?', choices=['sync', 'async'], default='sync', help=" - set connection to be " "synchronous or asynchronous (default=sync).\n") args_parser.add_argument('--conn_tm', nargs='?', default=None, type=int, help=" - set timeout for asynchronous " "connection (default=None). \n") args_parser.add_argument('--verif', action='store_true', help=" - skip SSL verification " "on post request.\n") args_parser.add_argument('--outfile', nargs=1, type=str, help=" - set filename for output " "of tests results.\n") args_parser.add_argument( '--outpath', nargs=1, type=str, help=" - set path to output filename for " "results output.\n") args_parser.add_argument('--infile', nargs=1, type=str, help=" - set filename as a source " "for test requests.\n") args_parser.add_argument('--debug', action='store_true', help="during a request make files " "with details for debugging.\n") args_parser.add_argument('--elaborated_debug', action='store_true', help="during a request make files " "with even more details for debugging.\n") args_parser.add_argument('--gui', action='store_true', help="during a request make files " "with details for debugging.\n") args_parser.add_argument('--webui', action='store_true', help="during a request make files\n") args_parser.add_argument('--log', type=set_log_level, default='info', dest='log_level', help=" - set " "logging level (default=info).\n") args_parser.add_argument('--testcase_indexes', nargs='?', help=" - set single or group of tests " "to run.\n") args_parser.add_argument( '--testcase_ids', nargs='?', help=" - set single or group of test case ids " "to run.\n") args_parser.add_argument('--table', nargs=1, type=str, help=" - set " "database table name.\n") args_parser.add_argument('--idx', nargs='?', type=int, help=" - set table record index.\n") args_parser.add_argument('--test_id', default='all', help="WFA test identifier, for example " "srs, urs, fsp, ibp, sip, etc (default=all).\n") args_parser.add_argument( "--precision", metavar="PRECISION_DB", type=float, help="Maximum allowed deviation of power limits from " "reference values in dB. 0 means exact match is " "required. Default is to use hash-based exact match " "comparison") args_parser.add_argument('--cache', action='store_true', help="enable cache during a request, otherwise " "disabled.\n") args_parser.add_argument( '--tests2run', nargs=1, type=str, help=" - the total number of tests to run.\n") args_parser.add_argument( '--ca_cert', nargs=1, type=str, help=" - set CA certificate filename to " "verify the remote server.\n") args_parser.add_argument( '--cli_cert', nargs=1, type=str, help=" - set client certificate filename.\n") args_parser.add_argument( '--cli_key', nargs=1, type=str, help=" - set client private key filename.\n") args_parser.add_argument('--dev_desc', action='store_true', help="parse only device descriptors values.\n") args_parser.add_argument( '--prefix_cmd', nargs='*', type=str, help="hook to call currently provided command before " "main command specified by --cmd option.\n") args_parser.add_argument('--email_from', type=str, help=" - set sender email.\n") args_parser.add_argument('--email_to', type=str, help=" - set receiver email.\n") args_parser.add_argument( '--email_cc', type=str, help=" - set receiver of cc email.\n") args_parser.add_argument('--email_pwd', type=str, help=" - set sender email password.\n") args_parser.add_argument( '--cmd', choices=execution_map.keys(), nargs='?', help="run - run test from DB and compare.\n" "dry_run - run test from file and show response.\n" "exp_adm_cfg - export admin config into a file.\n" "add_reqs - run test from provided file and insert with response into " "the databsse.\n" "ins_reqs - insert test vectors from provided file into the test db.\n" "ins_devs - insert device descriptors from provided file " "into the test db.\n" "dump_db - dump tables from the database.\n" "get_nbr_testcases - return number of testcases.\n" "parse_tests - parse WFA provided tests into a files.\n" "reacq - reacquision every test from the database and insert new " "responses.\n" "cmp_cfg - compare AFC config from the DB to provided from a file.\n" "stress - run tests in stress mode.\n" "ver - get version.\n") return args_parser def prepare_args(parser, cfg): """Prepare required parameters""" app_log.debug(f"{inspect.stack()[0][3]}() {parser.parse_args()}") cfg.update(vars(parser.parse_args())) # check if test indexes and test ids are given if cfg["testcase_indexes"] and cfg["testcase_ids"]: # reject the request app_log.error('Please use either "--testcase_indexes"' ' or "--testcase_ids" but not both') return AFC_ERR return execution_map[cfg['cmd']][1](cfg) def main_execution(cfg): """Call all requested commands""" app_log.debug(f"{inspect.stack()[0][3]}()") if (isinstance(cfg['prefix_cmd'], list) and (pre_hook(cfg) == AFC_ERR)): return AFC_ERR if isinstance(cfg['cmd'], type(None)): parser.print_help() return AFC_ERR return execution_map[cfg['cmd']][0](cfg) def main(): """Main function of the utility""" app_log.setLevel(logging.INFO) console_log = logging.StreamHandler() console_log.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) app_log.addHandler(console_log) res = AFC_OK parser = make_arg_parser() test_cfg = TestCfg() if prepare_args(parser, test_cfg) == AFC_ERR: # error in preparing arguments res = AFC_ERR else: res = main_execution(test_cfg) sys.exit(res) if __name__ == '__main__': try: main() except KeyboardInterrupt: sys.exit(1) # Local Variables: # mode: Python # indent-tabs-mode: nil # python-indent: 4 # End: # # vim: sw=4:et:tw=80:cc=+1