Files
openafc_final/tests/afc_tests.py
2024-03-25 10:11:24 -04:00

2191 lines
79 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright (C) 2021 Broadcom. All rights reserved. The term "Broadcom"
# refers solely to the Broadcom Inc. corporate affiliate that owns
# the software below. This work is licensed under the OpenAFC Project License,
# a copy of which is included with this software program
#
"""
Description
The execution always done in specific order: configuration, database, testing.
In case there are not available valid responses to compare with
need to make acquisition and create new database as following.
./afc_tests.py --addr <address> --log debug --cmd run
"""
import argparse
import certifi
import csv
import datetime
import hashlib
import inspect
import io
import json
import logging
import openpyxl as oxl
import os
import re
import requests
import shutil
import sqlite3
import subprocess
import sys
import time
import smtplib
import ssl
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from bs4 import BeautifulSoup
from deepdiff import DeepDiff
from multiprocessing.pool import Pool
from string import Template
from _afc_types import *
from _afc_errors import *
from _version import __version__
from _wfa_types import *
AFC_URL_SUFFIX = '/fbrat/ap-afc/'
AFC_REQ_NAME = 'availableSpectrumInquiry'
AFC_WEBUI_URL_SUFFIX = '/fbrat/ratapi/v1/'
AFC_WEBUI_REQ_NAME = 'availableSpectrumInquirySec'
AFC_WEBUI_TOKEN = 'about_csrf'
headers = {'content-type': 'application/json'}
AFC_TEST_DB_FILENAME = 'afc_input.sqlite3'
TBL_REQS_NAME = 'test_vectors'
TBL_RESPS_NAME = 'test_data'
TBL_USERS_NAME = 'user_config'
TBL_AFC_CFG_NAME = 'afc_config'
TBL_AP_CFG_NAME = 'ap_config'
AFC_PROT_NAME = 'https'
# metadata variables
TESTCASE_ID = "testCaseId"
# mandatory keys that need to be read from input text file
MANDATORY_METADATA_KEYS = {TESTCASE_ID}
app_log = logging.getLogger(__name__)
class TestCfg(dict):
"""Keep test configuration"""
def __init__(self):
dict.__init__(self)
self.update({
'cmd': '',
'port': 443,
'url_path': AFC_PROT_NAME + '://',
'log_level': logging.INFO,
'db_filename': AFC_TEST_DB_FILENAME,
'tests': None,
'is_test_by_index': True,
'resp': '',
'stress': 0,
'precision': None})
def _send_recv(self, params):
"""Run AFC test and wait for respons"""
data = params.split("'")
get_req = ''
for item in data:
try:
json.loads(item)
except ValueError as e:
continue
get_req = item
break
new_req_json = json.loads(get_req.encode('utf-8'))
new_req = json.dumps(new_req_json, sort_keys=True)
if (self['webui'] is False):
params_data = {
'conn_type': self['conn_type'],
'debug': self['debug'],
'edebug': cfg['elaborated_debug'],
'gui': self['gui']
}
else:
# emulating request calll from webui
params_data = {
'debug': 'True',
'edebug': cfg['elaborated_debug'],
'gui': 'True'
}
if (self['cache'] == False):
params_data['nocache'] = 'True'
ser_cert = not self['verif']
cli_certs = ()
if (self['prot'] == AFC_PROT_NAME and
self['verif'] == False):
# add mtls certificates if explicitly provided
if not isinstance(self['cli_cert'], type(None)):
cli_certs = ("".join(self['cli_cert']),
"".join(self['cli_key']))
# add tls certificates if explicitly provided
if not isinstance(self['ca_cert'], type(None)):
ser_cert = "".join(self['ca_cert'])
app_log.debug(f"Client {cli_certs}, Server {ser_cert}")
before_ts = time.monotonic()
rawresp = requests.post(
self['url_path'],
params=params_data,
data=new_req,
headers=headers,
timeout=600, # 10 min
verify=self['verif'])
resp = rawresp.json()
tId = resp.get('taskId')
if ((self['conn_type'] == 'async') and
(not isinstance(tId, type(None)))):
tState = resp.get('taskState')
params_data['task_id'] = tId
while (tState == 'PENDING') or (tState == 'PROGRESS'):
app_log.debug('_run_test() state %s, tid %s, status %d',
tState, tId, rawresp.status_code)
time.sleep(2)
rawresp = requests.get(self['url_path'],
params=params_data)
if rawresp.status_code == 200:
resp = rawresp.json()
break
tm_secs = time.monotonic() - before_ts
app_log.info('Test done at %.1f secs', tm_secs)
return new_req, resp
class TestResultComparator:
""" AFC Response comparator
Private instance attributes:
_precision -- Precision for results' comparison in dB. 0 means exact match
"""
def __init__(self, precision):
""" Constructor
Arguments:
precision -- Precision for results' comparison in dB. 0 means exact
match
"""
assert precision >= 0
self._precision = precision
def compare_results(self, ref_str, result_str):
""" Compares reference and actual AFC responses
Arguments:
ref_str -- Reference response JSON in string representation
result_str -- Actual response json in string representation
Returns list of difference description strings. Empty list means match
"""
# List of difference description strings
diffs = []
# Reference and actual JSON dictionaries
jsons = []
for s, kind in [(ref_str, "reference"), (result_str, "result")]:
try:
jsons.append(json.loads(s))
except json.JSONDecodeError as ex:
diffs.append(f"Failed to decode {kind} JSON data: {ex}")
return diffs
self._recursive_compare(jsons[0], jsons[1], [], diffs)
return diffs
def _recursive_compare(self, ref_json, result_json, path, diffs):
""" Recursive comparator of JSON nodes
Arguments:
ref_json -- Reference response JSON dictionary
result_json -- Actual response JSON dictionary
path -- Path (sequence of indices) to node in question
diffs -- List of difference description strings to update
"""
# Items in questions in JSON dictionaries
ref_item = self._get_item(ref_json, path)
result_item = self._get_item(result_json, path)
# Human readable path representation for difference messages
path_repr = f"[{']['.join(str(idx) for idx in path)}]"
if ref_item == result_item:
return # Items are equal - nothing to do
# So, items are different. What's the difference?
if isinstance(ref_item, dict):
# One item is dictionary. Other should also should be dictionary...
if not isinstance(result_item, dict):
diffs.append(f"Different item types at {path_repr}")
return
# ... with same set of keys
ref_keys = set(ref_item.keys())
result_keys = set(result_item.keys())
for unique_key in (ref_keys ^ result_keys):
if self._compare_channel_lists(ref_json, result_json,
path + [unique_key], diffs):
ref_keys -= {unique_key}
result_keys -= {unique_key}
if ref_keys != result_keys:
msg = f"Different set of keys at {path_repr}"
for kind, elems in [("reference", ref_keys - result_keys),
("result", result_keys - ref_keys)]:
if elems:
msg += \
f" Unique {kind} keys: {', '.join(sorted(elems))}."
diffs.append(msg)
return
# Comparing values for individual keys
for key in sorted(ref_keys):
self._recursive_compare(ref_json, result_json, path + [key],
diffs)
elif isinstance(ref_item, list):
# One item is list. Other should also be list...
if not isinstance(result_item, list):
diffs.append(f"Different item types at {path_repr}")
return
# If this is a channel list (or part thereof - handle it)
if self._compare_channel_lists(ref_json, result_json, path, diffs):
return
# Proceeding with comparison of other list kinds
if len(ref_item) != len(result_item):
diffs.append(
(f"Different list lengths at at {path_repr}: "
f"{len(ref_item)} elements in reference vs "
f"{len(result_item)} elements in result"))
return
# Comparing individual elements
for i in range(len(ref_item)):
self._recursive_compare(ref_json, result_json, path + [i],
diffs)
else:
# Items should be scalars
for item, kind in [(ref_item, "Reference"),
(result_item, "Result")]:
if not isinstance(item, (int, float, str)):
diffs.append((f"{kind} data contains unrecognized item "
f"type at {path_repr}"))
return
diffs.append((f"Difference at {path}: reference content is "
f"{ref_item}, result content is {result_item}"))
def _compare_channel_lists(self, ref_json, result_json, path, diffs):
""" Trying to compare channel lists
Arguments:
ref_json -- Reference response JSON dictionary
result_json -- Actual response JSON dictionary
path -- Path (sequence of indices) to node in question
diffs -- List of difference description strings to update
Returns true if channel list comparison was done, no further action
required, False if node is not a channel list, should be compared as
usual
"""
if path[-1] == "channelCfi":
# Comparison will be made at "maxEirp"
return True
# Human readable path representation for difference messages
path_repr = f"[{']['.join(str(idx) for idx in path)}]"
# EIRP dictionaries, indexed by channel identification (number or
# frequency range)
ref_channels = {}
result_channels = {}
if path[-1] == "maxEirp":
# Channel numbers
for kind, src, chan_dict in \
[("reference", ref_json, ref_channels),
("result", result_json, result_channels)]:
try:
numbers = self._get_item(src, path[:-1] + ["channelCfi"],
default_last=[])
chan_dict.update(
dict(zip([str(n) for n in numbers],
self._get_item(src, path, default_last=[]))))
except (TypeError, ValueError, KeyError):
diffs.append((f"Unrecognized channel list structure at "
f"{path_repr} in {kind}"))
return True
elif path[-1] == "availableFrequencyInfo":
# Channel frequencies
for kind, src, chan_dict in \
[("reference", ref_json, ref_channels),
("result", result_json, result_channels)]:
try:
for freq_info in self._get_item(src, path,
default_last=[]):
fr = freq_info["frequencyRange"]
low = fr['lowFrequency']
high = fr['highFrequency']
for freq in range(low, high):
chan_dict[f"[{freq} - {freq+1}"] = \
float(freq_info.get("maxPSD") or
freq_info.get("maxPsd"))
except (TypeError, ValueError, KeyError):
diffs.append((f"Unrecognized frequency list structure at "
f"{path_repr} in {kind}"))
return True
else:
return False
# Now will compare two channel dictionaries
# First looking for unique channels
for this_kind, this_channels, other_kind, other_channels in \
[("reference", ref_channels, "result", result_channels),
("result", result_channels, "reference", ref_channels)]:
for channel in sorted(set(this_channels.keys()) -
set(other_channels.keys())):
diffs.append(
(f"Channel {channel} present in {path_repr} of "
f"{this_kind} with EIRP limit of "
f"{this_channels[channel]}dBm, but absent in "
f"{other_kind}"))
# Then looking for different EIRPs on common channels
for channel in sorted(set(ref_channels.keys()) &
set(result_channels.keys())):
diff = abs(ref_channels[channel] - result_channels[channel])
if diff <= self._precision:
continue
diffs.append(
(f"Different values in {path_repr} for channel {channel}: "
f"reference has EIRP of {ref_channels[channel]}dBm, "
f"result has EIRP of {result_channels[channel]}dBm, "
f"difference is: {diff:g}dB"))
return True
def _get_item(self, j, path, default_last=None):
""" Retrieves item by sequence of indices
Arguments:
j -- JSON dictionary
path -- Sequence of indices
default_last -- What to return if item at last index is absent. None
means throw exception (if nonlast item is absent -
exception is also thrown)
Returns retrieved item
"""
for path_idx, elem_idx in enumerate(path):
try:
j = j[elem_idx]
except (KeyError, IndexError):
if (default_last is not None) and \
(path_idx == (len(path) - 1)):
return default_last
raise
return j
def json_lookup(key, json_obj, val):
"""Lookup for key in json and change it value if required"""
keepit = []
def lookup(key, json_obj, val, keepit):
if isinstance(json_obj, dict):
for k, v in json_obj.items():
if isinstance(v, (dict, list)):
lookup(key, v, val, keepit)
elif k == key:
keepit.append(v)
if val:
json_obj[k] = val
elif isinstance(json_obj, list):
for node in json_obj:
lookup(key, node, val, keepit)
return keepit
found = lookup(key, json_obj, val, keepit)
return found
def get_md5(fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def create_email_attachment(filename):
part = None
with open(filename, "rb") as attachment:
# Add file as application/octet-stream
# Email client can usually download this automatically as attachment
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
# Add header as key/value pair to attachment part
encoders.encode_base64(part)
part.add_header("Content-Disposition",
f"attachment; filename= {filename}",)
return part
def send_email(cfg):
"""Send an email to predefined adress using gmail smtp server"""
sender = cfg['email_from']
recipient = cfg['email_to']
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()"
f" from: {sender}, to: {recipient}")
context = ssl.create_default_context()
with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
server.login(sender, cfg['email_pwd'])
body = f"Please find attached responses."
message = MIMEMultipart("alternative")
message['Subject'] = f"AFC test results"
message['From'] = sender
message['To'] = recipient
if not isinstance(cfg['email_cc'], type(None)):
message['Cc'] = cfg['email_cc']
# Turn these into plain/html MIMEText objects
message.attach(MIMEText(body, "plain"))
message.attach(create_email_attachment(cfg['outfile'][0]))
server.sendmail(sender, recipient, message.as_string())
def _send_recv(cfg, req_data, ssn=None):
"""Send AFC request and receiver it's response"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()")
new_req_json = json.loads(req_data.encode('utf-8'))
new_req = json.dumps(new_req_json, sort_keys=True)
if (cfg['webui'] is False):
params_data = {
'conn_type': cfg['conn_type'],
'debug': cfg['debug'],
'edebug': cfg['elaborated_debug'],
'gui': cfg['gui']
}
if (cfg['cache'] == False):
params_data['nocache'] = 'True'
post_func = requests.post
else:
# emulating request call from webui
params_data = {
'debug': 'True',
'gui': 'True'
}
headers['Accept-Encoding'] = 'gzip, defalte'
headers['Referer'] = cfg['base_url'] + 'fbrat/www/index.html'
headers['X-Csrf-Token'] = cfg['token']
app_log.debug(
f"({os.getpid()}) {inspect.stack()[0][3]}()\n"
f"Cookies: {requests.utils.dict_from_cookiejar(ssn.cookies)}")
post_func = ssn.post
ser_cert = ()
cli_certs = None
if ((cfg['prot'] == AFC_PROT_NAME and
cfg['verif']) or (cfg['ca_cert'])):
# add mtls certificates if explicitly provided
if not isinstance(cfg['cli_cert'], type(None)):
cli_certs = ("".join(cfg['cli_cert']), "".join(cfg['cli_key']))
# add tls certificates if explicitly provided
if not isinstance(cfg['ca_cert'], type(None)):
ser_cert = "".join(cfg['ca_cert'])
cfg['verif'] = True
else:
os.environ['REQUESTS_CA_BUNDLE'] = certifi.where()
app_log.debug(f"REQUESTS_CA_BUNDLE "
f"{os.environ.get('REQUESTS_CA_BUNDLE')}")
if "REQUESTS_CA_BUNDLE" in os.environ:
ser_cert = "".join(os.environ.get('REQUESTS_CA_BUNDLE'))
cfg['verif'] = True
else:
app_log.error(f"Missing CA certificate while forced.")
return
app_log.debug(f"Client {cli_certs}, Server {ser_cert}")
try:
rawresp = post_func(
cfg['url_path'],
params=params_data,
data=new_req,
headers=headers,
timeout=600, # 10 min
cert=cli_certs,
verify=ser_cert if cfg['verif'] else False)
rawresp.raise_for_status()
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
app_log.error(f"{err}")
return
resp = rawresp.json()
tId = resp.get('taskId')
if ((cfg['conn_type'] == 'async') and
(not isinstance(tId, type(None)))):
tState = resp.get('taskState')
params_data['task_id'] = tId
while (tState == 'PENDING') or (tState == 'PROGRESS'):
app_log.debug('_run_test() state %s, tid %s, status %d',
tState, tId, rawresp.status_code)
time.sleep(2)
rawresp = requests.get(cfg['url_path'],
params=params_data)
if rawresp.status_code == 200:
resp = rawresp.json()
break
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()"
f" Resp status: {rawresp.status_code}")
return resp
def _send_recv_token(cfg, ssn):
"""Making login, open session and getting CSRF token"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()")
token = ''
ser_cert = ()
cli_certs = None
app_log.debug(f"=== ser {type(ser_cert)}")
if ((cfg['prot'] == AFC_PROT_NAME and
cfg['verif']) or (cfg['ca_cert'])):
# add mtls certificates if explicitly provided
if not isinstance(cfg['cli_cert'], type(None)):
cli_certs = ("".join(cfg['cli_cert']), "".join(cfg['cli_key']))
# add tls certificates if explicitly provided
if not isinstance(cfg['ca_cert'], type(None)):
ser_cert = "".join(cfg['ca_cert'])
cfg['verif'] = True
else:
os.environ['REQUESTS_CA_BUNDLE'] = certifi.where()
app_log.debug(f"REQUESTS_CA_BUNDLE "
f"{os.environ.get('REQUESTS_CA_BUNDLE')}")
if "REQUESTS_CA_BUNDLE" in os.environ:
ser_cert = "".join(os.environ.get('REQUESTS_CA_BUNDLE'))
cfg['verif'] = True
else:
app_log.error(f"Missing CA certificate while forced.")
return token
app_log.debug(f"Client {cli_certs}, Server {ser_cert}")
# get login
ssn.headers.update({
'Accept-Encoding': 'gzip, defalte'
})
url_login = cfg['base_url'] + 'fbrat/user/sign-in'
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n"
f"===> URL {url_login}\n"
f"===> Status {ssn.headers}\n"
f"===> Cookies: {ssn.cookies}\n")
try:
rawresp = ssn.get(url_login,
stream=False,
cert=cli_certs,
verify=ser_cert if cfg['verif'] else False)
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
app_log.error(f"{err}")
return token
soup = BeautifulSoup(rawresp.text, 'html.parser')
inp_tkn = soup.find('input', id='csrf_token')
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n"
f"<--- Status {rawresp.status_code}\n"
f"<--- Headers {rawresp.headers}\n"
f"<--- Cookies: {ssn.cookies}\n"
f"<--- Input: {inp_tkn}\n")
token = inp_tkn.get('value')
# fetch username and password from test db
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
cur.execute('SELECT * FROM %s\n' % TBL_USERS_NAME)
found_user = cur.fetchall()
con.close()
found_json = json.loads(found_user[0][1])
app_log.debug(f"Found Users: {found_json['username']}")
form_data = {
'next': '/',
'reg_next': '/',
'csrf_token': token,
'username': found_json['username'],
'password': found_json['password']
}
ssn.headers.update({
'Content-Type': 'application/x-www-form-urlencoded',
'Referer': url_login
})
try:
rawresp = ssn.post(url_login,
data=form_data,
stream=False,
cert=cli_certs,
verify=ser_cert if cfg['verif'] else False)
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
app_log.error(f"{err}")
return token
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()\n"
f"<--- Status {rawresp.status_code}\n"
f"<--- Headers {rawresp.headers}\n"
f"<--- Cookies: {ssn.cookies}\n")
return token
def make_db(filename):
"""Create DB file only with schema"""
app_log.debug('%s()', inspect.stack()[0][3])
if os.path.isfile(filename):
app_log.debug('%s() The db file is exists, no need to create new one.',
inspect.stack()[0][3])
return True
app_log.info('Create DB tables (%s, %s) from source files',
TBL_REQS_NAME, TBL_RESPS_NAME)
con = sqlite3.connect(filename)
cur = con.cursor()
cur.execute('CREATE TABLE IF NOT EXISTS ' + TBL_REQS_NAME +
' (test_id varchar(50), data json)')
cur.execute('CREATE TABLE IF NOT EXISTS ' + TBL_RESPS_NAME +
' (test_id varchar(50), data json, hash varchar(255))')
con.close()
return True
def compare_afc_config(cfg):
"""
Compare AFC configuration from the DB with provided one.
"""
app_log.debug('%s()', inspect.stack()[0][3])
if not os.path.isfile(cfg['db_filename']):
app_log.error('Missing DB file %s', cfg['db_filename'])
return AFC_ERR
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
cur.execute('SELECT * FROM %s' % TBL_AFC_CFG_NAME)
found_cfgs = cur.fetchall()
con.close()
# get record from the input file
if isinstance(cfg['infile'], type(None)):
app_log.debug('Missing input file to compare with.')
return AFC_OK
filename = cfg['infile'][0]
with open(filename, 'r') as fp_test:
while True:
rec = fp_test.read()
if not rec:
break
try:
get_rec = json.loads(rec)
except (ValueError, TypeError) as e:
continue
break
app_log.debug(json.dumps(get_rec, sort_keys=True, indent=4))
get_cfg = ''
app_log.debug('Found %d config records', len(found_cfgs))
idx = 0
max_idx = len(found_cfgs)
if not isinstance(cfg['idx'], type(None)):
idx = cfg['idx']
if idx >= max_idx:
app_log.error("The index (%d) is out of range (0 - %d).",
idx, max_idx - 1)
return AFC_ERR
max_idx = idx + 1
while idx < max_idx:
for item in list(found_cfgs[idx]):
try:
get_cfg = json.loads(item)
except (ValueError, TypeError) as e:
continue
break
app_log.debug("Record %d:\n%s",
idx,
json.dumps(get_cfg['afcConfig'],
sort_keys=True,
indent=4))
get_diff = DeepDiff(get_cfg['afcConfig'],
get_rec,
report_repetition=True)
app_log.info("rec %d:\n%s", idx, get_diff)
idx += 1
return AFC_OK
def start_acquisition(cfg):
"""
Fetch test vectors from the DB, drop previous response table,
run tests and fill responses in the DB with hash values
"""
app_log.debug(f"{inspect.stack()[0][3]}()")
found_reqs, found_resp, ids, test_ids = _convert_reqs_n_resps_to_dict(cfg)
# check if to make acquisition of all tests
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
# drop response table and create new one if all testcases required
# to reacquisition
all_resps = False
if (len(test_ids) == len(found_reqs)):
all_resps = True
if all_resps:
try:
app_log.debug(f"{inspect.stack()[0][3]}() "
f"drop table {TBL_RESPS_NAME}")
cur.execute('DROP TABLE ' + TBL_RESPS_NAME)
except Exception as OperationalError:
# awkward but bearable
app_log.debug('Missing table %s', TBL_RESPS_NAME)
cur.execute('CREATE TABLE ' + TBL_RESPS_NAME +
' (test_id varchar(50), data json, hash varchar(255))')
app_log.info(f"Number of tests to make acquisition - {len(test_ids)}")
for test_id in test_ids:
req_id = ids[test_id][0]
app_log.debug(f"Request: {req_id}")
resp = _send_recv(cfg, json.dumps(found_reqs[test_id][0]))
if isinstance(resp, type(None)):
app_log.error(f"Test {test_ids} ({req_id}) is Failed.")
continue
json_lookup('availabilityExpireTime', resp, '0')
upd_data = json.dumps(resp, sort_keys=True)
hash_obj = hashlib.sha256(upd_data.encode('utf-8'))
app_log.debug(f"{inspect.stack()[0][3]}() new: "
f"{hash_obj.hexdigest()}")
if all_resps:
cur.execute('INSERT INTO ' + TBL_RESPS_NAME + ' values ( ?, ?, ?)',
[req_id,
upd_data,
hash_obj.hexdigest()])
con.commit()
elif (test_id in found_resp.keys() and
found_resp[test_id][1] == hash_obj.hexdigest()):
app_log.debug(f"Skip to update hash for {req_id}. "
f"Found the same value.")
continue
else:
hash = hash_obj.hexdigest()
cur.execute('UPDATE ' + TBL_RESPS_NAME + ' SET ' +
'data = ?, hash = ? WHERE test_id =?',
(upd_data, hash, ids[test_id][0]))
con.commit()
con.close()
return AFC_OK
def process_jsonline(line):
"""
Function to process the input line from .txt file containing comma
separated json strings
"""
# convert the input line to list of dictioanry/dictionaries
line_list = json.loads("[" + line + "]")
request_dict = line_list[0]
metadata_dict = line_list[1] if len(line_list) > 1 else {}
return request_dict, metadata_dict
def get_db_req_resp(cfg):
"""
Function to retrieve request and response records from the database
"""
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
cur.execute('SELECT * FROM %s' % TBL_REQS_NAME)
found_reqs = cur.fetchall()
db_reqs_list = [row[0] for row in found_reqs]
cur.execute('SELECT * FROM %s' % TBL_RESPS_NAME)
found_resps = cur.fetchall()
db_resp_list = [row[0] for row in found_resps]
con.close()
return db_reqs_list, db_resp_list
def insert_reqs_int(filename, con, cur):
"""
Insert requests from input file to a table in test db.
"""
with open(filename, 'r') as fp_test:
while True:
dataline = fp_test.readline()
if not dataline:
break
# process dataline arguments
app_log.debug(f"= {dataline}")
request_json, metadata_json = process_jsonline(dataline)
# reject the request if mandatory metadata arguments are not
# present
if not MANDATORY_METADATA_KEYS.issubset(
set(metadata_json.keys())):
# missing mandatory keys in test case input
app_log.error("Test case input does not contain required"
" mandatory arguments: %s",
", ".join(list(
MANDATORY_METADATA_KEYS - set(metadata_json.keys()))))
return AFC_ERR
app_log.info(f"Insert new request in DB "
f"({metadata_json[TESTCASE_ID]})")
app_log.debug(f"+ {metadata_json[TESTCASE_ID]}")
cur.execute('INSERT INTO ' + TBL_REQS_NAME + ' VALUES ( ?, ?)',
(metadata_json[TESTCASE_ID],
json.dumps(request_json),))
con.commit()
con.close()
return AFC_OK
def insert_reqs(cfg):
"""
Insert requests from input file to a table in test db.
Drop previous table of requests.
"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if isinstance(cfg['infile'], type(None)):
app_log.error(f"Missing input file")
return AFC_ERR
filename = cfg['infile'][0]
app_log.debug(f"{inspect.stack()[0][3]}() {filename}")
if not os.path.isfile(filename):
app_log.error(f"Missing raw test data file {filename}")
return AFC_ERR
if not os.path.isfile(cfg['db_filename']):
app_log.error(f"Unable to find test db file.")
return AFC_ERR
con = sqlite3.connect(cfg['db_filename'])
# drop existing table of requests and create new one
app_log.info(f"Drop table of requests ({TBL_REQS_NAME})")
cur = con.cursor()
try:
cur.execute('DROP TABLE ' + TBL_REQS_NAME)
except Exception as OperationalError:
app_log.debug(f"Fail to drop, missing table {TBL_REQS_NAME}")
cur.execute('CREATE TABLE ' + TBL_REQS_NAME +
' (test_id varchar(50), data json)')
con.commit()
return insert_reqs_int(filename, con, cur)
def extend_reqs(cfg):
"""
Insert requests from input file to a table in test db.
Drop previous table of requests.
"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if isinstance(cfg['infile'], type(None)):
app_log.error(f"Missing input file")
return AFC_ERR
filename = cfg['infile'][0]
app_log.debug(f"{inspect.stack()[0][3]}() {filename}")
if not os.path.isfile(filename):
app_log.error(f"Missing raw test data file {filename}")
return AFC_ERR
if not os.path.isfile(cfg['db_filename']):
app_log.error(f"Unable to find test db file.")
return AFC_ERR
con = sqlite3.connect(cfg['db_filename'])
# add more rows to existing table of requests
app_log.info(f"Extending table of requests ({TBL_REQS_NAME})")
cur = con.cursor()
return insert_reqs_int(filename, con, cur)
def insert_devs(cfg):
"""
Insert device descriptors from input file to a table in test db.
Drop previous table of devices.
"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if isinstance(cfg['infile'], type(None)):
app_log.error(f"Missing input file")
return AFC_ERR
filename = cfg['infile'][0]
app_log.debug(f"{inspect.stack()[0][3]}() {filename}")
if not os.path.isfile(filename):
app_log.error(f"Missing raw test data file {filename}")
return AFC_ERR
if not os.path.isfile(cfg['db_filename']):
app_log.error(f"Unable to find test db file.")
return AFC_ERR
con = sqlite3.connect(cfg['db_filename'])
# drop existing table of requests and create new one
app_log.info(f"Drop table of devices ({TBL_AP_CFG_NAME})")
cur = con.cursor()
try:
cur.execute('DROP TABLE ' + TBL_AP_CFG_NAME)
except Exception as OperationalError:
app_log.debug(f"Fail to drop, missing table {TBL_AP_CFG_NAME}")
cur.execute('CREATE TABLE ' + TBL_AP_CFG_NAME +
' (ap_config_id, data json, user_id)')
cnt = 1
con.commit()
with open(filename, 'r') as fp_test:
while True:
dataline = fp_test.readline()
if not dataline or (len(dataline) < 72):
break
# process dataline arguments
app_log.debug(f"= {dataline}")
cur.execute('INSERT INTO ' + TBL_AP_CFG_NAME +
' VALUES ( ?, ?, ?)', (cnt, dataline[:-1], 1))
con.commit()
cnt += 1
con.close()
return AFC_OK
def add_reqs(cfg):
"""Prepare DB source files"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if isinstance(cfg['infile'], type(None)):
app_log.error('Missing input file')
return AFC_ERR
filename = cfg['infile'][0]
app_log.debug('%s() %s', inspect.stack()[0][3], filename)
if not os.path.isfile(filename):
app_log.error('Missing raw test data file %s', filename)
return AFC_ERR
if not make_db(cfg['db_filename']):
return AFC_ERR
# fetch available requests and responses
db_reqs_list, db_resp_list = get_db_req_resp(cfg)
con = sqlite3.connect(cfg['db_filename'])
with open(filename, 'r') as fp_test:
while True:
dataline = fp_test.readline()
if not dataline:
break
# process dataline arguments
request_json, metadata_json = process_jsonline(dataline)
# reject the request if mandatory metadata arguments are not
# present
if not MANDATORY_METADATA_KEYS.issubset(
set(metadata_json.keys())):
# missing mandatory keys in test case input
app_log.error("Test case input does not contain required"
" mandatory arguments: %s",
", ".join(list(
MANDATORY_METADATA_KEYS - set(metadata_json.keys()))))
return AFC_ERR
# check if the test case already exists in the database test
# vectors
if metadata_json[TESTCASE_ID] in db_reqs_list:
app_log.error("Test case: %s already exists in database",
metadata_json[TESTCASE_ID])
break
app_log.info("Executing test case: %s", metadata_json[TESTCASE_ID])
new_req, resp = cfg._send_recv(json.dumps(request_json))
# get request id from a request, response not always has it
# the request contains test category
new_req_json = json.loads(new_req.encode('utf-8'))
req_id = json_lookup('requestId', new_req_json, None)
resp_res = json_lookup('shortDescription', resp, None)
if (resp_res[0] != 'Success') \
and (req_id[0].lower().find('urs') == -1) \
and (req_id[0].lower().find('ibp') == -1):
app_log.error('Failed in test response - %s', resp_res)
break
app_log.info('Got response for the request')
json_lookup('availabilityExpireTime', resp, '0')
app_log.info('Insert new request in DB')
cur = con.cursor()
cur.execute('INSERT INTO ' + TBL_REQS_NAME + ' VALUES ( ?, ?)',
(metadata_json[TESTCASE_ID], new_req,))
con.commit()
app_log.info('Insert new resp in DB')
upd_data = json.dumps(resp, sort_keys=True)
hash_obj = hashlib.sha256(upd_data.encode('utf-8'))
cur = con.cursor()
cur.execute('INSERT INTO ' + TBL_RESPS_NAME + ' values ( ?, ?, ?)',
[metadata_json[TESTCASE_ID],
upd_data, hash_obj.hexdigest()])
con.commit()
con.close()
return AFC_OK
def dump_table(conn, tbl_name, out_file, pref):
app_log.debug(f"{inspect.stack()[0][3]}() {tbl_name}")
fp_new = ''
if 'single' in out_file:
fp_new = open(out_file['single'], 'w')
conn.execute(f"SELECT * FROM {tbl_name}")
found_data = conn.fetchall()
for val in enumerate(found_data):
if isinstance(fp_new, io.IOBase):
fp_new.write(f"{str(val)}\n")
elif 'split' in out_file:
tbl_fname = {
TBL_REQS_NAME: '_Request.txt',
TBL_RESPS_NAME: '_Response.txt'
}
new_json = json.loads(val[1][1].encode('utf-8'))
prefix, name, nbr = val[1][0].split('.')
app_log.debug(f"{inspect.stack()[0][3]}() {name} {nbr}")
# omit URS testcases
if (name.lower().find('urs') != -1) or (pref and not pref == prefix):
continue
fp_test = open(f"{out_file['split']}/{prefix}_{name}_{nbr}" +
f"{tbl_fname[tbl_name]}", 'a')
fp_test.write(f"{val[1][1]}\n")
fp_test.close()
else:
# Just dump to the console
app_log.info(f"{val[1]}")
if isinstance(fp_new, io.IOBase):
fp_new.close()
def dump_database(cfg):
"""Dump data from test DB tables"""
app_log.debug(f"{inspect.stack()[0][3]}()")
find_key = ''
found_tables = []
# keep configuration for output path and files
# 'single' - only single file for whole output
# 'split' - separate file for each response
out_file = {}
if not os.path.isfile(cfg['db_filename']):
app_log.error(f"Missing DB file {cfg['db_filename']}")
return AFC_ERR
set_dump_db_opts = {
'wfa': [(TBL_REQS_NAME,), (TBL_RESPS_NAME,)],
'all': [(TBL_REQS_NAME,), (TBL_RESPS_NAME,)],
'req': [(TBL_REQS_NAME,)],
'resp': [(TBL_RESPS_NAME,)],
'ap': [('ap_config',)],
'cfg': [('afc_config',)],
'user': [('user_config',)]
}
prefix = {
'wfa': "AFCS",
'all': None
}
tbl = 'True'
if isinstance(cfg['table'], list):
tbl = cfg['table'][0]
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
if tbl in set_dump_db_opts:
# Dump only tables with requests and responses
found_tables.extend(set_dump_db_opts[tbl])
elif tbl == 'True':
# Dump all tables if no options provided
cur.execute(f"SELECT name FROM sqlite_master WHERE type='table';")
found_tables = cur.fetchall()
pref = None
if tbl == 'wfa' or tbl == 'all':
if tbl in prefix:
pref = prefix[tbl]
out_file['split'] = './'
if not isinstance(cfg['outpath'], type(None)):
out_file['split'] = cfg['outpath'][0] + '/'
out_file['split'] += WFA_TEST_DIR
if os.path.exists(out_file['split']):
shutil.rmtree(out_file['split'])
os.mkdir(out_file['split'])
elif isinstance(cfg['outfile'], type(None)):
app_log.error(f"Missing output filename.\n")
return AFC_ERR
else:
out_file['single'] = cfg['outfile'][0]
for tbl in enumerate(found_tables):
app_log.debug(f"Dump {tbl} to {out_file}")
dump_table(cur, tbl[1][0], out_file, pref)
con.close()
return AFC_OK
def export_admin_config(cfg):
"""Export admin server configuration"""
app_log.debug('%s()', inspect.stack()[0][3])
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
cur.execute('SELECT COUNT(*) FROM ' + TBL_AP_CFG_NAME)
found_rcds = cur.fetchall()
with open(cfg['outfile'][0], 'w') as fp_exp:
cur.execute('SELECT * FROM %s' % TBL_AFC_CFG_NAME)
found_cfg = cur.fetchall()
app_log.debug('Found AfcCfg: %s', found_cfg)
cur.execute('SELECT * FROM %s\n' % TBL_USERS_NAME)
found_user = cur.fetchall()
app_log.debug('Found Users: %s\n', found_user)
cur.execute('SELECT * FROM %s\n' % TBL_AP_CFG_NAME)
found_aps = cur.fetchall()
con.close()
aps = ''
idx = 0
for count, val in enumerate(found_aps):
aps += str(val[1]) + ','
app_log.debug('Found APs: %s\n', aps[:-1])
out_str = '{"afcAdminConfig":' + found_cfg[0][1] + ', '\
'"userConfig":' + found_user[0][1] + ', '\
'"apConfig":[' + aps[:-1] + ']}'
fp_exp.write(out_str)
app_log.info('Server admin config exported to %s', cfg['outfile'][0])
return AFC_OK
def dry_run_test(cfg):
"""Run one or more requests from provided file"""
if isinstance(cfg['infile'], type(None)):
app_log.error('Missing input file')
return AFC_ERR
filename = cfg['infile'][0]
app_log.debug('%s() %s', inspect.stack()[0][3], filename)
if not os.path.isfile(filename):
app_log.error('Missing raw test data file %s', filename)
return AFC_ERR
with open(filename, 'r') as fp_test:
while True:
dataline = fp_test.readline()
if not dataline:
break
app_log.info('Request:')
app_log.info(dataline)
# process dataline arguments
request_json, _ = process_jsonline(dataline)
resp = _send_recv(cfg, json.dumps(request_json))
# get request id from a request, response not always has it
# the request contains test category
new_req_json = json.loads(json.dumps(request_json).encode('utf-8'))
req_id = json_lookup('requestId', new_req_json, None)
resp_res = json_lookup('shortDescription', resp, None)
if (resp_res[0] != 'Success') \
and (req_id[0].lower().find('urs') == -1):
app_log.error('Failed in test response - %s', resp_res)
app_log.debug(resp)
break
app_log.info('Got response for the request')
app_log.info('Resp:')
app_log.info(resp)
app_log.info('\n\n')
json_lookup('availabilityExpireTime', resp, '0')
upd_data = json.dumps(resp, sort_keys=True)
hash_obj = hashlib.sha256(upd_data.encode('utf-8'))
return AFC_OK
def get_nbr_testcases(cfg):
""" Find APs count on DB table """
if not os.path.isfile(cfg['db_filename']):
print('INFO: Missing DB file %s', cfg['db_filename'])
return False
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
cur.execute('SELECT count("requestId") from ' + TBL_REQS_NAME)
found_data = cur.fetchall()
db_inquiry_count = found_data[0][0]
con.close()
app_log.debug("found %s ap lists from db table", db_inquiry_count)
return db_inquiry_count
def collect_tests2combine(sh, rows, t_ident, t2cmb, cmb_t):
"""
Lookup for combined test vectors,
build of required test vectors to combine
"""
app_log.debug('%s()\n', inspect.stack()[0][3])
for i in range(1, rows + 1):
cell = sh.cell(row=i, column=PURPOSE_CLM)
if ((cell.value is None) or
(AFC_TEST_IDENT.get(cell.value.lower()) is None) or
(cell.value == 'SRI')):
continue
if (t_ident != 'all') and (cell.value.lower() != t_ident):
continue
cell = sh.cell(row=i, column=COMBINED_CLM)
if cell.value is not None and \
cell.value.upper() != 'NO':
raw_list = str(cell.value)
test_case_id = sh.cell(row=i, column=UNIT_NAME_CLM).value
test_case_id += "."
test_case_id += sh.cell(row=i, column=PURPOSE_CLM).value
test_case_id += "."
test_case_id += str(sh.cell(row=i, column=TEST_VEC_CLM).value)
cmb_t[test_case_id] = []
for t in raw_list.split(','):
if '-' in t:
# found range of test vectors
left, right = t.split('-')
t2cmb_ident = ''
for r in AFC_TEST_IDENT:
if r in left.lower():
min = int(left.replace(r.upper(), ''))
max = int(right.replace(r.upper(), '')) + 1
t2cmb_ident = r.upper()
for cnt in range(min, max):
tcase = t2cmb_ident + str(cnt)
t2cmb[tcase] = ''
cmb_t[test_case_id] += [tcase]
else:
# found single test vector
t2cmb[t] = ''
cmb_t[test_case_id] += [t]
def _parse_tests_dev_desc(sheet, fp_new, rows):
app_log.debug('%s()\n', inspect.stack()[0][3])
for i in range(1, rows + 1):
res_str = ""
cell = sheet.cell(row=i, column=PURPOSE_CLM)
if ((cell.value is None) or
(AFC_TEST_IDENT.get(cell.value.lower()) is None) or
(cell.value == 'SRI')):
continue
# skip combined test vectors because device descriptor is missing
cell = sheet.cell(row=i, column=COMBINED_CLM)
if cell.value is not None and \
cell.value.upper() != 'NO':
continue
res_str += build_device_desc(
sheet.cell(row=i, column=INDOOR_DEPL_CLM).value,
sheet.cell(row=i, column=SER_NBR_CLM).value,
sheet.cell(row=i, column=RULESET_CLM).value,
sheet.cell(row=i, column=ID_CLM).value,
True)
fp_new.write(res_str + '\n')
return res_str
def _parse_tests_all(sheet, fp_new, rows, test_ident):
app_log.debug('%s()\n', inspect.stack()[0][3])
# collect tests to combine in next loop
tests2combine = dict()
# gather combined tests
combined_tests = dict()
collect_tests2combine(sheet, rows, test_ident,
tests2combine,
combined_tests)
if len(combined_tests):
app_log.info('Found combined test vectors: %s',
' '.join(combined_tests))
app_log.info('Found test vectors to combine: %s',
' '.join(tests2combine))
for i in range(1, rows + 1):
cell = sheet.cell(row=i, column=PURPOSE_CLM)
if ((cell.value is None) or
(AFC_TEST_IDENT.get(cell.value.lower()) is None) or
(cell.value == 'SRI')):
continue
if (test_ident != 'all') and (cell.value.lower() != test_ident):
continue
uut = sheet.cell(row=i, column=UNIT_NAME_CLM).value
purpose = sheet.cell(row=i, column=PURPOSE_CLM).value
test_vec = sheet.cell(row=i, column=TEST_VEC_CLM).value
test_case_id = uut + "." + purpose + "." + str(test_vec)
# Prepare request header '{"availableSpectrumInquiryRequests": [{'
res_str = REQ_INQUIRY_HEADER
# check if the test case is combined
cell = sheet.cell(row=i, column=COMBINED_CLM)
if cell.value is not None and \
cell.value.upper() != 'NO':
for item in combined_tests[test_case_id]:
res_str += tests2combine[item] + ','
res_str = res_str[:-1]
else:
#
# Inquired Channels
#
res_str += '{' + REQ_INQ_CHA_HEADER
cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_131)
res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value)
cell = sheet.cell(row=i, column=CHANNEL_CFI_131)
if cell.value is not None:
res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value)
res_str += '}, '
cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_132)
res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value)
cell = sheet.cell(row=i, column=CHANNEL_CFI_132)
if cell.value is not None:
res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value)
res_str += '}, '
cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_133)
res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value)
cell = sheet.cell(row=i, column=CHANNEL_CFI_133)
if cell.value is not None:
res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value)
res_str += '}, '
cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_134)
res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value)
cell = sheet.cell(row=i, column=CHANNEL_CFI_134)
if cell.value is not None:
res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value)
res_str += '}, '
cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_136)
res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value)
cell = sheet.cell(row=i, column=CHANNEL_CFI_136)
if cell.value is not None:
res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value)
res_str += '}, '
cell = sheet.cell(row=i, column=GLOBALOPERATINGCLASS_137)
res_str += '{' + REQ_INQ_CHA_GL_OPER_CLS + str(cell.value)
cell = sheet.cell(row=i, column=CHANNEL_CFI_137)
if cell.value is not None:
res_str += ', ' + REQ_INQ_CHA_CHANCFI + str(cell.value)
res_str += '}' + REQ_INQ_CHA_FOOTER + ' '
#
# Device descriptor
#
res_str += REQ_DEV_DESC_HEADER
res_str += build_device_desc(
sheet.cell(row=i, column=INDOOR_DEPL_CLM).value,
sheet.cell(row=i, column=SER_NBR_CLM).value,
sheet.cell(row=i, column=RULESET_CLM).value,
sheet.cell(row=i, column=ID_CLM).value,
False)
res_str += ','
#
# Inquired Frequency Range
#
res_str += REQ_INQ_FREQ_RANG_HEADER
freq_range = AfcFreqRange()
freq_range.set_range_limit(
sheet.cell(
row=i,
column=INQ_FREQ_RNG_LOWFREQ_A),
'low')
freq_range.set_range_limit(
sheet.cell(
row=i,
column=INQ_FREQ_RNG_HIGHFREQ_A),
'high')
try:
res_str += freq_range.append_range()
except IncompleteFreqRange as e:
app_log.debug(f"{e} - row {i}")
freq_range = AfcFreqRange()
freq_range.set_range_limit(
sheet.cell(
row=i,
column=INQ_FREQ_RNG_LOWFREQ_B),
'low')
freq_range.set_range_limit(
sheet.cell(
row=i,
column=INQ_FREQ_RNG_HIGHFREQ_B),
'high')
try:
tmp_str = freq_range.append_range()
res_str += ', ' + tmp_str
except IncompleteFreqRange as e:
app_log.debug(f"{e} - row {i}")
res_str += REQ_INQ_FREQ_RANG_FOOTER
cell = sheet.cell(row=i, column=MINDESIREDPOWER)
if (cell.value):
res_str += REQ_MIN_DESIRD_PWR + str(cell.value) + ', '
#
# Location
#
res_str += REQ_LOC_HEADER
cell = sheet.cell(row=i, column=INDOORDEPLOYMENT)
res_str += REQ_LOC_INDOORDEPL + str(cell.value) + ', '
# Location - elevation
res_str += REQ_LOC_ELEV_HEADER
cell = sheet.cell(row=i, column=ELE_VERTICALUNCERTAINTY)
if isinstance(cell.value, int):
res_str += REQ_LOC_VERT_UNCERT + str(cell.value) + ', '
cell = sheet.cell(row=i, column=ELE_HEIGHTTYPE)
res_str += REQ_LOC_HEIGHT_TYPE + '"' + str(cell.value) + '"'
cell = sheet.cell(row=i, column=ELE_HEIGHT)
if isinstance(cell.value, int) or isinstance(cell.value, float):
res_str += ', ' + REQ_LOC_HEIGHT + str(cell.value)
res_str += '}, '
# Location - uncertainty reqion
geo_coor = AfcGeoCoordinates(sheet, i)
try:
res_str += geo_coor.collect_coordinates()
except IncompleteGeoCoordinates as e:
app_log.debug(e)
res_str += REQ_LOC_FOOTER
cell = sheet.cell(row=i, column=REQ_ID_CLM)
if isinstance(cell.value, str):
req_id = cell.value
else:
req_id = ""
res_str += REQ_REQUEST_ID + '"' + req_id + '"'
res_str += '}'
# collect test vectors required for combining
# build test case id in format <purpose><test vector>
short_tcid = ''.join(test_case_id.split('.', 1)[1].split('.'))
if short_tcid in tests2combine.keys():
tests2combine[short_tcid] = res_str.split(
REQ_INQUIRY_HEADER)[1]
res_str += REQ_INQUIRY_FOOTER
cell = sheet.cell(row=i, column=VERSION_CLM)
res_str += REQ_VERSION + '"' + str(cell.value) + '"'
res_str += REQ_FOOTER
# adding metadata parameters
res_str += ', '
# adding test case id
res_str += META_HEADER
res_str += META_TESTCASE_ID + '"' + test_case_id + '"'
res_str += META_FOOTER
fp_new.write(res_str + '\n')
return AFC_OK
def parse_tests(cfg):
app_log.debug('%s()\n', inspect.stack()[0][3])
filename = ''
out_fname = ''
if isinstance(cfg['infile'], type(None)):
app_log.error('Missing input file')
return AFC_ERR
filename = cfg['infile'][0]
if not os.path.isfile(filename):
app_log.error('Missing raw test data file %s', filename)
return AFC_ERR
test_ident = cfg['test_id']
if isinstance(cfg['outfile'], type(None)):
out_fname = test_ident + NEW_AFC_TEST_SUFX
else:
out_fname = cfg['outfile'][0]
wb = oxl.load_workbook(filename, data_only=True)
for sh in wb:
app_log.debug('Sheet title: %s', sh.title)
app_log.debug('rows %d, cols %d\n', sh.max_row, sh.max_column)
# Look for request sheet
for s in range(len(wb.sheetnames)):
if wb.sheetnames[s] == "Availability Requests":
break
wb.active = s
sheet = wb.active
nbr_rows = sheet.max_row
app_log.debug('Rows range 1 - %d', nbr_rows + 1)
app_log.info('Export tests into file: %s', out_fname)
fp_new = open(out_fname, 'w')
# partial parse if only certain parameters required.
# only for device descriptor for now.
if cfg['dev_desc']:
res_str = _parse_tests_dev_desc(sheet, fp_new, nbr_rows)
fp_new.write(res_str + '\n')
else:
_parse_tests_all(sheet, fp_new, nbr_rows, test_ident)
fp_new.close()
return AFC_OK
def test_report(fname, runtimedata, testnumdata, testvectordata,
test_result, upd_data):
"""Procedure to generate AFC test results report.
Args:
runtimedata(str) : Tested case running time.
testnumdata(str): Tested case id.
testvectordata(int): Tested vector id.
test_result(str: Test result Pass is 0 or Fail is 1)
upd_data(list): List for test response data
Return:
Create test results file.
"""
ts_time = datetime.datetime.fromtimestamp(time.time()).\
strftime('%Y_%m_%d_%H%M%S')
# Test results output args
data_names = ['Date', 'Test Number', 'Test Vector', 'Running Time',
'Test Result', 'Response data']
data = {'Date': ts_time, 'Test Number': testnumdata,
'Test Vector': testvectordata, 'Running Time': runtimedata,
'Test Result': test_result, 'Response data': upd_data}
with open(fname, "a") as f:
file_writer = csv.DictWriter(f, fieldnames=data_names)
if os.stat(fname).st_size == 0:
file_writer.writeheader()
file_writer.writerow(data)
def _run_tests(cfg, reqs, resps, comparator, ids, test_cases):
"""
Run tests
reqs: {testcaseid: [request_json_str]}
resps: {testcaseid: [response_json_str, response_hash]}
comparator: reference to object
test_cases: [testcaseids]
"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() {test_cases} "
f"{cfg['url_path']}")
if not len(resps):
app_log.info(f"Unable to compare response data."
f"Suggest to make acquisition of responses.")
return AFC_ERR
all_test_res = AFC_OK
accum_secs = 0
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() "
f"{type(cfg['webui'])} ")
ssn = None
if cfg['webui'] is True:
ssn = requests.Session()
cfg['token'] = _send_recv_token(cfg, ssn)
for test_case in test_cases:
# Default reset test_res value
test_res = AFC_OK
req_id = ids[test_case][0]
app_log.info(f"Prepare to run test - {req_id}")
if test_case not in reqs:
app_log.warning(f"The requested test case {test_case} is "
f"invalid/not available in database")
continue
request_data = reqs[test_case][0]
app_log.debug(f"{inspect.stack()[0][3]}() {request_data}")
before_ts = time.monotonic()
resp = _send_recv(cfg, json.dumps(request_data), ssn)
tm_secs = time.monotonic() - before_ts
res = f"id {test_case} name {req_id} status $status time {tm_secs:.1f}"
res_template = Template(res)
if isinstance(resp, type(None)):
test_res = AFC_ERR
all_test_res = AFC_ERR
elif cfg['webui'] is True:
pass
else:
json_lookup('availabilityExpireTime', resp, '0')
upd_data = json.dumps(resp, sort_keys=True)
diffs = []
hash_obj = hashlib.sha256(upd_data.encode('utf-8'))
diffs = comparator.compare_results(ref_str=resps[test_case][0],
result_str=upd_data)
if (resps[test_case][1] == hash_obj.hexdigest()) \
if cfg['precision'] is None else (not diffs):
res = res_template.substitute(status="Ok")
else:
for line in diffs:
app_log.error(f" Difference: {line}")
app_log.error(hash_obj.hexdigest())
test_res = AFC_ERR
if test_res == AFC_ERR:
res = res_template.substitute(status="Fail")
app_log.error(res)
all_test_res = AFC_ERR
accum_secs += tm_secs
app_log.info(res)
# For saving test results option
if not isinstance(cfg['outfile'], type(None)):
test_report(cfg['outfile'][0], float(tm_secs),
test_case, req_id,
("PASS" if test_res == AFC_OK else "FAIL"),
upd_data)
app_log.info(f"Total testcases runtime : {round(accum_secs, 1)} secs")
if not isinstance(cfg['outfile'], type(None)):
send_email(cfg)
return all_test_res
def prep_and_run_tests(cfg, reqs, resps, ids, test_cases):
"""
Run tests
reqs: {testcaseid: [request_json_str]}
resps: {testcaseid: [response_json_str, response_hash]}
test_cases: [testcaseids]
"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()")
test_res = AFC_OK
results_comparator = TestResultComparator(precision=cfg['precision'] or 0)
# calculate max number of tests to run
max_nbr_tests = len(test_cases)
if not isinstance(cfg['tests2run'], type(None)):
max_nbr_tests = int("".join(cfg['tests2run']))
while (max_nbr_tests != 0):
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() "
f"Number of tests to run: {max_nbr_tests}")
if max_nbr_tests < len(test_cases):
del test_cases[-(len(test_cases) - max_nbr_tests):]
# if stress mode execute testcases in parallel and concurrent
if cfg['stress'] == 1:
app_log.debug(f"{inspect.stack()[0][3]}() max {max_nbr_tests}"
f" len {len(test_cases)}")
inputs = [(cfg, reqs, resps, results_comparator, ids, [test])
for test in test_cases]
with Pool(max_nbr_tests) as my_pool:
results = my_pool.starmap(_run_tests, inputs)
if not any(r == 0 for r in results):
test_res = AFC_ERR
else:
res = _run_tests(cfg, reqs, resps,
results_comparator, ids, test_cases)
if res != AFC_OK:
test_res = res
# when required to run more tests than there are testcases
# start run run from the beginning
max_nbr_tests -= len(test_cases)
return test_res
def _convert_reqs_n_resps_to_dict(cfg):
"""
Fetch test vectors and responses from the DB
Convert to dictionaries indexed by id or original index from table
Prepare list of new indexes
"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()")
if not os.path.isfile(cfg['db_filename']):
app_log.error('Missing DB file %s', cfg['db_filename'])
return AFC_ERR
con = sqlite3.connect(cfg['db_filename'])
cur = con.cursor()
cur.execute('SELECT * FROM %s' % TBL_REQS_NAME)
found_reqs = cur.fetchall()
cur.execute('SELECT * FROM %s' % TBL_RESPS_NAME)
found_resps = cur.fetchall()
con.close()
# reformat the reqs_dict and resp_dict accordingly
if not isinstance(cfg['testcase_ids'], type(None)):
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() by id")
reqs_dict = {
row[0]: [json.loads(row[1])]
for row in found_reqs
}
resp_dict = {
row[0]: [row[1], row[2]]
for row in found_resps
}
ids_dict = {
row[0]: [row[0], req_index + 1]
for req_index, row in enumerate(found_reqs)
}
test_indx = list(map(str.strip, cfg.pop("testcase_ids").split(',')))
cfg.pop("testcase_indexes")
else:
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() by index")
reqs_dict = {
str(req_index + 1): [json.loads(row[1])]
for req_index, row in enumerate(found_reqs)
}
resp_dict = {
str(resp_index + 1): [row[1], row[2]]
for resp_index, row in enumerate(found_resps)
}
ids_dict = {
str(req_index + 1): [row[0], req_index + 1]
for req_index, row in enumerate(found_reqs)
}
if not isinstance(cfg['testcase_indexes'], type(None)):
test_indx = list(map(str.strip,
cfg.pop("testcase_indexes").split(',')))
cfg.pop("testcase_ids")
else:
test_indx = [
str(item) for item in list(range(1, len(reqs_dict) + 1))
]
# build list of indexes, omitting non-existing elements
test_cases = list()
for i in range(0, len(test_indx)):
if reqs_dict.get(test_indx[i]) is None:
app_log.debug(f"Missing value for index {test_indx[i]}")
continue
test_cases.append(test_indx[i])
app_log.debug(f"{inspect.stack()[0][3]}() Final list of indexes. "
f"{test_cases}")
return reqs_dict, resp_dict, ids_dict, test_cases
def run_test(cfg):
"""Fetch test vectors from the DB and run tests"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() "
f"{cfg['tests']}, {cfg['url_path']}")
reqs_dict, resp_dict, ids, test_cases = _convert_reqs_n_resps_to_dict(cfg)
return prep_and_run_tests(cfg, reqs_dict, resp_dict, ids, test_cases)
def stress_run(cfg):
"""
Get test vectors from the database and run tests
in parallel
"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}() "
f"{cfg['tests']}, {cfg['url_path']}")
cfg['stress'] = 1
return run_test(cfg)
def _run_cert_tests(cfg):
"""
Run tests
"""
app_log.debug(
f"({os.getpid()}) {inspect.stack()[0][3]}() {cfg['url_path']}")
test_res = AFC_OK
try:
if isinstance(cfg['cli_cert'], type(None)):
rawresp = requests.get(cfg['url_path'],
verify="".join(cfg['ca_cert']))
else:
rawresp = requests.get(cfg['url_path'],
cert=("".join(cfg['cli_cert']),
"".join(cfg['cli_key'])),
verify="".join(cfg['ca_cert']))
except Exception as e:
app_log.error(f"({os.getpid()}) {inspect.stack()[0][3]}() {e}")
test_res = AFC_ERR
except OSError as os_err:
proc = psutil.Process()
app_log.error(f"({os.getpid()}) {inspect.stack()[0][3]}() "
f"{os_err} - {proc.open_files()}")
test_res = AFC_ERR
else:
if rawresp.status_code != 200:
test_res = AFC_ERR
return test_res
def run_cert_tests(cfg):
"""
Run certificate tests
"""
app_log.debug(f"({os.getpid()}) {inspect.stack()[0][3]}()")
test_res = AFC_OK
# calculate max number of tests to run
if isinstance(cfg['tests2run'], type(None)):
app_log.error(f"Missing number of tests to run.")
return AFC_ERR
max_nbr_tests = int("".join(cfg['tests2run']))
before_ts = time.monotonic()
while (max_nbr_tests != 0):
# if stress mode execute testcases in parallel and concurrent
inputs = [(cfg)]
with Pool(max_nbr_tests) as my_pool:
results = my_pool.map(_run_cert_tests, inputs)
if not any(r == 0 for r in results):
test_res = AFC_ERR
max_nbr_tests -= 1
tm_secs = time.monotonic() - before_ts
app_log.info(f"Tests {max_nbr_tests} done at {tm_secs:.3f} s")
return test_res
log_level_map = {
'debug': logging.DEBUG, # 10
'info': logging.INFO, # 20
'warn': logging.WARNING, # 30
'err': logging.ERROR, # 40
'crit': logging.CRITICAL, # 50
}
def set_log_level(opt):
app_log.setLevel(log_level_map[opt])
return log_level_map[opt]
def get_version(cfg):
"""Get AFC test utility version"""
app_log.info('AFC Test utility version %s', __version__)
app_log.info('AFC Test db hash %s', get_md5(AFC_TEST_DB_FILENAME))
def pre_hook(cfg):
"""Execute provided functionality prior to main command"""
app_log.debug(f"{inspect.stack()[0][3]}()")
return subprocess.call(cfg['prefix_cmd'])
def parse_run_test_args(cfg):
"""Parse arguments for command 'run_test'"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if isinstance(cfg['addr'], list):
# check if provided required certification files
if (cfg['prot'] != AFC_PROT_NAME):
# update URL if not the default protocol
cfg['url_path'] = cfg['prot'] + '://'
cfg['url_path'] += str(cfg['addr'][0]) + ':' + str(cfg['port'])
cfg['base_url'] = cfg['url_path'] + '/'
if cfg['webui'] is False:
cfg['url_path'] += AFC_URL_SUFFIX + AFC_REQ_NAME
else:
cfg['url_token'] = cfg['url_path'] + AFC_WEBUI_URL_SUFFIX +\
AFC_WEBUI_TOKEN
cfg['url_path'] += AFC_URL_SUFFIX + AFC_WEBUI_REQ_NAME
return AFC_OK
def parse_run_cert_args(cfg):
"""Parse arguments for command 'run_cert'"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if ((not isinstance(cfg['addr'], list)) or
isinstance(cfg['ca_cert'], type(None))):
app_log.error(f"{inspect.stack()[0][3]}() Missing arguments")
return AFC_ERR
cfg['url_path'] = cfg['prot'] + '://' + str(cfg['addr'][0]) +\
':' + str(cfg['port']) + '/'
return AFC_OK
# available commands to execute in alphabetical order
execution_map = {
'add_reqs': [add_reqs, parse_run_test_args],
'ins_reqs': [insert_reqs, parse_run_test_args],
'ext_reqs': [extend_reqs, parse_run_test_args],
'ins_devs': [insert_devs, parse_run_test_args],
'cmp_cfg': [compare_afc_config, parse_run_test_args],
'dry_run': [dry_run_test, parse_run_test_args],
'dump_db': [dump_database, parse_run_test_args],
'get_nbr_testcases': [get_nbr_testcases, parse_run_test_args],
'exp_adm_cfg': [export_admin_config, parse_run_test_args],
'parse_tests': [parse_tests, parse_run_test_args],
'reacq': [start_acquisition, parse_run_test_args],
'run': [run_test, parse_run_test_args],
'run_cert': [run_cert_tests, parse_run_cert_args],
'stress': [stress_run, parse_run_test_args],
'ver': [get_version, parse_run_test_args],
}
def make_arg_parser():
"""Define command line options"""
args_parser = argparse.ArgumentParser(
epilog=__doc__.strip(),
formatter_class=argparse.RawTextHelpFormatter)
args_parser.add_argument('--addr', nargs=1, type=str,
help="<address> - set AFC Server address.\n")
args_parser.add_argument('--prot', nargs='?', choices=['https', 'http'],
default='https',
help="<http|https> - set connection protocol "
"(default=https).\n")
args_parser.add_argument('--port', nargs='?', default='443',
type=int,
help="<port> - set connection port "
"(default=443).\n")
args_parser.add_argument('--conn_type', nargs='?',
choices=['sync', 'async'], default='sync',
help="<sync|async> - set connection to be "
"synchronous or asynchronous (default=sync).\n")
args_parser.add_argument('--conn_tm', nargs='?', default=None, type=int,
help="<secs> - set timeout for asynchronous "
"connection (default=None). \n")
args_parser.add_argument('--verif', action='store_true',
help="<verif> - skip SSL verification "
"on post request.\n")
args_parser.add_argument('--outfile', nargs=1, type=str,
help="<filename> - set filename for output "
"of tests results.\n")
args_parser.add_argument(
'--outpath',
nargs=1,
type=str,
help="<filepath> - set path to output filename for "
"results output.\n")
args_parser.add_argument('--infile', nargs=1, type=str,
help="<filename> - set filename as a source "
"for test requests.\n")
args_parser.add_argument('--debug', action='store_true',
help="during a request make files "
"with details for debugging.\n")
args_parser.add_argument('--elaborated_debug', action='store_true',
help="during a request make files "
"with even more details for debugging.\n")
args_parser.add_argument('--gui', action='store_true',
help="during a request make files "
"with details for debugging.\n")
args_parser.add_argument('--webui', action='store_true',
help="during a request make files\n")
args_parser.add_argument('--log', type=set_log_level,
default='info', dest='log_level',
help="<info|debug|warn|err|crit> - set "
"logging level (default=info).\n")
args_parser.add_argument('--testcase_indexes', nargs='?',
help="<N1,...> - set single or group of tests "
"to run.\n")
args_parser.add_argument(
'--testcase_ids',
nargs='?',
help="<N1,...> - set single or group of test case ids "
"to run.\n")
args_parser.add_argument('--table', nargs=1, type=str,
help="<wfa|req|resp|ap|cfg|user> - set "
"database table name.\n")
args_parser.add_argument('--idx', nargs='?',
type=int,
help="<index> - set table record index.\n")
args_parser.add_argument('--test_id',
default='all',
help="WFA test identifier, for example "
"srs, urs, fsp, ibp, sip, etc (default=all).\n")
args_parser.add_argument(
"--precision",
metavar="PRECISION_DB",
type=float,
help="Maximum allowed deviation of power limits from "
"reference values in dB. 0 means exact match is "
"required. Default is to use hash-based exact match "
"comparison")
args_parser.add_argument('--cache', action='store_true',
help="enable cache during a request, otherwise "
"disabled.\n")
args_parser.add_argument(
'--tests2run',
nargs=1,
type=str,
help="<nbr> - the total number of tests to run.\n")
args_parser.add_argument(
'--ca_cert',
nargs=1,
type=str,
help="<filename> - set CA certificate filename to "
"verify the remote server.\n")
args_parser.add_argument(
'--cli_cert',
nargs=1,
type=str,
help="<filename> - set client certificate filename.\n")
args_parser.add_argument(
'--cli_key',
nargs=1,
type=str,
help="<filename> - set client private key filename.\n")
args_parser.add_argument('--dev_desc', action='store_true',
help="parse only device descriptors values.\n")
args_parser.add_argument(
'--prefix_cmd',
nargs='*',
type=str,
help="hook to call currently provided command before "
"main command specified by --cmd option.\n")
args_parser.add_argument('--email_from', type=str,
help="<email address> - set sender email.\n")
args_parser.add_argument('--email_to', type=str,
help="<email address> - set receiver email.\n")
args_parser.add_argument(
'--email_cc',
type=str,
help="<email address> - set receiver of cc email.\n")
args_parser.add_argument('--email_pwd', type=str,
help="<filename> - set sender email password.\n")
args_parser.add_argument(
'--cmd',
choices=execution_map.keys(),
nargs='?',
help="run - run test from DB and compare.\n"
"dry_run - run test from file and show response.\n"
"exp_adm_cfg - export admin config into a file.\n"
"add_reqs - run test from provided file and insert with response into "
"the databsse.\n"
"ins_reqs - insert test vectors from provided file into the test db.\n"
"ins_devs - insert device descriptors from provided file "
"into the test db.\n"
"dump_db - dump tables from the database.\n"
"get_nbr_testcases - return number of testcases.\n"
"parse_tests - parse WFA provided tests into a files.\n"
"reacq - reacquision every test from the database and insert new "
"responses.\n"
"cmp_cfg - compare AFC config from the DB to provided from a file.\n"
"stress - run tests in stress mode.\n"
"ver - get version.\n")
return args_parser
def prepare_args(parser, cfg):
"""Prepare required parameters"""
app_log.debug(f"{inspect.stack()[0][3]}() {parser.parse_args()}")
cfg.update(vars(parser.parse_args()))
# check if test indexes and test ids are given
if cfg["testcase_indexes"] and cfg["testcase_ids"]:
# reject the request
app_log.error('Please use either "--testcase_indexes"'
' or "--testcase_ids" but not both')
return AFC_ERR
return execution_map[cfg['cmd']][1](cfg)
def main_execution(cfg):
"""Call all requested commands"""
app_log.debug(f"{inspect.stack()[0][3]}()")
if (isinstance(cfg['prefix_cmd'], list) and
(pre_hook(cfg) == AFC_ERR)):
return AFC_ERR
if isinstance(cfg['cmd'], type(None)):
parser.print_help()
return AFC_ERR
return execution_map[cfg['cmd']][0](cfg)
def main():
"""Main function of the utility"""
app_log.setLevel(logging.INFO)
console_log = logging.StreamHandler()
console_log.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
app_log.addHandler(console_log)
res = AFC_OK
parser = make_arg_parser()
test_cfg = TestCfg()
if prepare_args(parser, test_cfg) == AFC_ERR:
# error in preparing arguments
res = AFC_ERR
else:
res = main_execution(test_cfg)
sys.exit(res)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
sys.exit(1)
# Local Variables:
# mode: Python
# indent-tabs-mode: nil
# python-indent: 4
# End:
#
# vim: sw=4:et:tw=80:cc=+1