mirror of
https://github.com/Telecominfraproject/OpenCellular.git
synced 2025-12-30 10:31:02 +00:00
This patch makes cts.py call reset_halt and resume instead of calling send_openocd_commands directly. BUG=none BRANCH=none TEST=Run util/run_ects.py. Change-Id: I179fb73d41842b927fda81c153848887bb2dff57 Signed-off-by: Daisuke Nojiri <dnojiri@chromium.org> Reviewed-on: https://chromium-review.googlesource.com/553581 Reviewed-by: Randall Spangler <rspangler@chromium.org>
437 lines
14 KiB
Python
Executable File
437 lines
14 KiB
Python
Executable File
#!/usr/bin/python
|
|
#
|
|
# Copyright 2016 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
# A script which builds, flashes, and runs EC CTS
|
|
#
|
|
# Software prerequisites:
|
|
# - openocd version 0.10 or above
|
|
# - lsusb
|
|
# - udevadm
|
|
#
|
|
# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables
|
|
# to the host and execute the script:
|
|
# $ ./cts.py
|
|
# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR.
|
|
|
|
|
|
import argparse
|
|
import os
|
|
import shutil
|
|
import time
|
|
import common.board as board
|
|
|
|
|
|
CTS_RC_PREFIX = 'CTS_RC_'
|
|
DEFAULT_TH = 'stm32l476g-eval'
|
|
DEFAULT_DUT = 'nucleo-f072rb'
|
|
MAX_SUITE_TIME_SEC = 5
|
|
CTS_TEST_RESULT_DIR = '/tmp/ects'
|
|
|
|
# Host only return codes. Make sure they match values in cts.rc
|
|
CTS_RC_DID_NOT_START = -1 # test did not run.
|
|
CTS_RC_DID_NOT_END = -2 # test did not run.
|
|
CTS_RC_DUPLICATE_RUN = -3 # test was run multiple times.
|
|
CTS_RC_INVALID_RETURN_CODE = -4 # failed to parse return code
|
|
|
|
|
|
class Cts(object):
|
|
"""Class that represents a eCTS run.
|
|
|
|
Attributes:
|
|
dut: DeviceUnderTest object representing DUT
|
|
th: TestHarness object representing a test harness
|
|
module: Name of module to build/run tests for
|
|
testlist: List of strings of test names contained in given module
|
|
return_codes: Dict of strings of return codes, with a code's integer
|
|
value being the index for the corresponding string representation
|
|
"""
|
|
|
|
def __init__(self, ec_dir, th, dut, module):
|
|
"""Initializes cts class object with given arguments.
|
|
|
|
Args:
|
|
ec_dir: Path to ec directory
|
|
th: Name of the test harness board
|
|
dut: Name of the device under test board
|
|
module: Name of module to build/run tests for (e.g. gpio, interrupt)
|
|
"""
|
|
self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module)
|
|
if os.path.isdir(self.results_dir):
|
|
shutil.rmtree(self.results_dir)
|
|
else:
|
|
os.makedirs(self.results_dir)
|
|
self.ec_dir = ec_dir
|
|
self.module = module
|
|
serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial')
|
|
self.th = board.TestHarness(th, module, self.results_dir, serial_path)
|
|
self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir)
|
|
cts_dir = os.path.join(self.ec_dir, 'cts')
|
|
testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist')
|
|
return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc')
|
|
self.get_return_codes(return_codes_path)
|
|
self.testlist = self.get_macro_args(testlist_path, 'CTS_TEST')
|
|
|
|
def build(self):
|
|
"""Build images for DUT and TH."""
|
|
print 'Building DUT image...'
|
|
if not self.dut.build(self.ec_dir):
|
|
raise RuntimeError('Building module %s for DUT failed' % (self.module))
|
|
print 'Building TH image...'
|
|
if not self.th.build(self.ec_dir):
|
|
raise RuntimeError('Building module %s for TH failed' % (self.module))
|
|
|
|
def flash_boards(self):
|
|
"""Flashes TH and DUT with their most recently built ec.bin."""
|
|
cts_module = 'cts_' + self.module
|
|
image_path = os.path.join('build', self.th.board, cts_module, 'ec.bin')
|
|
self.identify_boards()
|
|
print 'Flashing TH with', image_path
|
|
if not self.th.flash(image_path):
|
|
raise RuntimeError('Flashing TH failed')
|
|
image_path = os.path.join('build', self.dut.board, cts_module, 'ec.bin')
|
|
print 'Flashing DUT with', image_path
|
|
if not self.dut.flash(image_path):
|
|
raise RuntimeError('Flashing DUT failed')
|
|
|
|
def setup(self):
|
|
"""Setup boards."""
|
|
self.th.save_serial()
|
|
|
|
def identify_boards(self):
|
|
"""Updates serials of TH and DUT in that order (order matters)."""
|
|
self.th.get_serial()
|
|
self.dut.get_serial()
|
|
|
|
def get_macro_args(self, filepath, macro):
|
|
"""Get list of args of a macro in a file when macro.
|
|
|
|
Args:
|
|
filepath: String containing absolute path to the file
|
|
macro: String containing text of macro to get args of
|
|
|
|
Returns:
|
|
List of dictionaries where each entry is:
|
|
'name': Test name,
|
|
'th_string': Expected string from TH,
|
|
'dut_string': Expected string from DUT,
|
|
"""
|
|
tests = []
|
|
with open(filepath, 'r') as f:
|
|
lines = f.readlines()
|
|
joined = ''.join(lines).replace('\\\n', '').splitlines()
|
|
for l in joined:
|
|
if not l.strip().startswith(macro):
|
|
continue
|
|
d = {}
|
|
l = l.strip()[len(macro):]
|
|
l = l.strip('()').split(',')
|
|
d['name'] = l[0].strip()
|
|
d['th_rc'] = self.get_return_code_value(l[1].strip().strip('"'))
|
|
d['th_string'] = l[2].strip().strip('"')
|
|
d['dut_rc'] = self.get_return_code_value(l[3].strip().strip('"'))
|
|
d['dut_string'] = l[4].strip().strip('"')
|
|
tests.append(d)
|
|
return tests
|
|
|
|
def get_return_codes(self, filepath):
|
|
"""Read return code names from the return code definition file."""
|
|
self.return_codes = {}
|
|
val = 0
|
|
with open(filepath, 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line.startswith(CTS_RC_PREFIX):
|
|
continue
|
|
line = line.split(',')[0]
|
|
if '=' in line:
|
|
tokens = line.split('=')
|
|
line = tokens[0].strip()
|
|
val = int(tokens[1].strip())
|
|
self.return_codes[line] = val
|
|
val += 1
|
|
|
|
def parse_output(self, output):
|
|
"""Parse console output from DUT or TH.
|
|
|
|
Args:
|
|
output: String containing consoule output
|
|
|
|
Returns:
|
|
List of dictionaries where each key and value are:
|
|
name = 'ects_test_x',
|
|
started = True/False,
|
|
ended = True/False,
|
|
rc = CTS_RC_*,
|
|
output = All text between 'ects_test_x start' and 'ects_test_x end'
|
|
"""
|
|
results = []
|
|
i = 0
|
|
for test in self.testlist:
|
|
results.append({})
|
|
results[i]['name'] = test['name']
|
|
results[i]['started'] = False
|
|
results[i]['rc'] = CTS_RC_DID_NOT_START
|
|
results[i]['string'] = False
|
|
results[i]['output'] = []
|
|
i += 1
|
|
|
|
i = 0
|
|
for ln in [ln.strip() for ln in output.split('\n')]:
|
|
if i + 1 > len(results):
|
|
break
|
|
tokens = ln.split()
|
|
if len(tokens) >= 2:
|
|
if tokens[0].strip() == results[i]['name']:
|
|
if tokens[1].strip() == 'start':
|
|
# start line found
|
|
if results[i]['started']: # Already started
|
|
results[i]['rc'] = CTS_RC_DUPLICATE_RUN
|
|
else:
|
|
results[i]['rc'] = CTS_RC_DID_NOT_END
|
|
results[i]['started'] = True
|
|
continue
|
|
elif results[i]['started'] and tokens[1].strip() == 'end':
|
|
# end line found
|
|
results[i]['rc'] = CTS_RC_INVALID_RETURN_CODE
|
|
if len(tokens) == 3:
|
|
try:
|
|
results[i]['rc'] = int(tokens[2].strip())
|
|
except ValueError:
|
|
pass
|
|
# Since index is incremented when 'end' is encountered, we don't
|
|
# need to check duplicate 'end'.
|
|
i += 1
|
|
continue
|
|
if results[i]['started']:
|
|
results[i]['output'].append(ln)
|
|
|
|
return results
|
|
|
|
def get_return_code_name(self, code, strip_prefix=False):
|
|
name = ''
|
|
for k, v in self.return_codes.iteritems():
|
|
if v == code:
|
|
if strip_prefix:
|
|
name = k[len(CTS_RC_PREFIX):]
|
|
else:
|
|
name = k
|
|
return name
|
|
|
|
def get_return_code_value(self, name):
|
|
if name:
|
|
return self.return_codes[name]
|
|
return 0
|
|
|
|
def evaluate_run(self, dut_output, th_output):
|
|
"""Parse outputs to derive test results.
|
|
|
|
Args:
|
|
dut_output: String output of DUT
|
|
th_output: String output of TH
|
|
|
|
Returns:
|
|
th_results: list of test results for TH
|
|
dut_results: list of test results for DUT
|
|
"""
|
|
th_results = self.parse_output(th_output)
|
|
dut_results = self.parse_output(dut_output)
|
|
|
|
# Search for expected string in each output
|
|
for i, v in enumerate(self.testlist):
|
|
if v['th_string'] in th_results[i]['output'] or not v['th_string']:
|
|
th_results[i]['string'] = True
|
|
if v['dut_string'] in dut_results[i]['output'] or not v['dut_string']:
|
|
dut_results[i]['string'] = True
|
|
|
|
return th_results, dut_results
|
|
|
|
def print_result(self, th_results, dut_results):
|
|
"""Print results to the screen.
|
|
|
|
Args:
|
|
th_results: list of test results for TH
|
|
dut_results: list of test results for DUT
|
|
"""
|
|
len_test_name = max(len(s['name']) for s in self.testlist)
|
|
len_code_name = max(len(self.get_return_code_name(v, True))
|
|
for v in self.return_codes.values())
|
|
|
|
head = '{:^' + str(len_test_name) + '} '
|
|
head += '{:^' + str(len_code_name) + '} '
|
|
head += '{:^' + str(len_code_name) + '}'
|
|
head += '{:^' + str(len(' TH_STR')) + '}'
|
|
head += '{:^' + str(len(' DUT_STR')) + '}'
|
|
head += '{:^' + str(len(' RESULT')) + '}\n'
|
|
fmt = '{:' + str(len_test_name) + '} '
|
|
fmt += '{:>' + str(len_code_name) + '} '
|
|
fmt += '{:>' + str(len_code_name) + '}'
|
|
fmt += '{:>' + str(len(' TH_STR')) + '}'
|
|
fmt += '{:>' + str(len(' DUT_STR')) + '}'
|
|
fmt += '{:>' + str(len(' RESULT')) + '}\n'
|
|
|
|
self.formatted_results = head.format(
|
|
'TEST NAME', 'TH_RC', 'DUT_RC',
|
|
' TH_STR', ' DUT_STR', ' RESULT')
|
|
for i, d in enumerate(dut_results):
|
|
th_cn = self.get_return_code_name(th_results[i]['rc'], True)
|
|
dut_cn = self.get_return_code_name(dut_results[i]['rc'], True)
|
|
th_res = self.evaluate_result(th_results[i],
|
|
self.testlist[i]['th_rc'],
|
|
self.testlist[i]['th_string'])
|
|
dut_res = self.evaluate_result(dut_results[i],
|
|
self.testlist[i]['dut_rc'],
|
|
self.testlist[i]['dut_string'])
|
|
self.formatted_results += fmt.format(
|
|
d['name'], th_cn, dut_cn,
|
|
'YES' if th_results[i]['string'] else 'NO',
|
|
'YES' if dut_results[i]['string'] else 'NO',
|
|
'PASS' if th_res and dut_res else 'FAIL')
|
|
|
|
def evaluate_result(self, result, expected_rc, expected_string):
|
|
if result['rc'] != expected_rc:
|
|
return False
|
|
if expected_string and expected_string not in result['output']:
|
|
return False
|
|
return True
|
|
|
|
def run(self):
|
|
"""Resets boards, records test results in results dir."""
|
|
print 'Reading serials...'
|
|
self.identify_boards()
|
|
print 'Opening DUT tty...'
|
|
self.dut.setup_tty()
|
|
print 'Opening TH tty...'
|
|
self.th.setup_tty()
|
|
|
|
# Boards might be still writing to tty. Wait a few seconds before flashing.
|
|
time.sleep(3)
|
|
|
|
# clear buffers
|
|
print 'Clearing DUT tty...'
|
|
self.dut.read_tty()
|
|
print 'Clearing TH tty...'
|
|
self.th.read_tty()
|
|
|
|
# Resets the boards and allows them to run tests
|
|
# Due to current (7/27/16) version of sync function,
|
|
# both boards must be rest and halted, with the th
|
|
# resuming first, in order for the test suite to run in sync
|
|
print 'Halting TH...'
|
|
if not self.th.reset_halt():
|
|
raise RuntimeError('Failed to halt TH')
|
|
print 'Halting DUT...'
|
|
if not self.dut.reset_halt():
|
|
raise RuntimeError('Failed to halt DUT')
|
|
print 'Resuming TH...'
|
|
if not self.th.resume():
|
|
raise RuntimeError('Failed to resume TH')
|
|
print 'Resuming DUT...'
|
|
if not self.dut.resume():
|
|
raise RuntimeError('Failed to resume DUT')
|
|
|
|
time.sleep(MAX_SUITE_TIME_SEC)
|
|
|
|
print 'Reading DUT tty...'
|
|
dut_output, _ = self.dut.read_tty()
|
|
self.dut.close_tty()
|
|
print 'Reading TH tty...'
|
|
th_output, _ = self.th.read_tty()
|
|
self.th.close_tty()
|
|
|
|
print 'Halting TH...'
|
|
if not self.th.reset_halt():
|
|
raise RuntimeError('Failed to halt TH')
|
|
print 'Halting DUT...'
|
|
if not self.dut.reset_halt():
|
|
raise RuntimeError('Failed to halt DUT')
|
|
|
|
if not dut_output or not th_output:
|
|
raise ValueError('Output missing from boards. If you have a process '
|
|
'reading ttyACMx, please kill that process and try '
|
|
'again.')
|
|
|
|
print 'Pursing results...'
|
|
th_results, dut_results = self.evaluate_run(dut_output, th_output)
|
|
|
|
# Print out results
|
|
self.print_result(th_results, dut_results)
|
|
|
|
# Write results
|
|
dest = os.path.join(self.results_dir, 'results.log')
|
|
with open(dest, 'w') as fl:
|
|
fl.write(self.formatted_results)
|
|
|
|
# Write UART outputs
|
|
dest = os.path.join(self.results_dir, 'uart_th.log')
|
|
with open(dest, 'w') as fl:
|
|
fl.write(th_output)
|
|
dest = os.path.join(self.results_dir, 'uart_dut.log')
|
|
with open(dest, 'w') as fl:
|
|
fl.write(dut_output)
|
|
|
|
print self.formatted_results
|
|
|
|
# TODO(chromium:735652): Should set exit code for the shell
|
|
|
|
|
|
def main():
|
|
ec_dir = os.path.realpath(os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)), '..'))
|
|
os.chdir(ec_dir)
|
|
|
|
dut = DEFAULT_DUT
|
|
module = 'meta'
|
|
|
|
parser = argparse.ArgumentParser(description='Used to build/flash boards')
|
|
parser.add_argument('-d',
|
|
'--dut',
|
|
help='Specify DUT you want to build/flash')
|
|
parser.add_argument('-m',
|
|
'--module',
|
|
help='Specify module you want to build/flash')
|
|
parser.add_argument('-s',
|
|
'--setup',
|
|
action='store_true',
|
|
help='Connect only the TH to save its serial')
|
|
parser.add_argument('-b',
|
|
'--build',
|
|
action='store_true',
|
|
help='Build test suite (no flashing)')
|
|
parser.add_argument('-f',
|
|
'--flash',
|
|
action='store_true',
|
|
help='Flash boards with most recent images')
|
|
parser.add_argument('-r',
|
|
'--run',
|
|
action='store_true',
|
|
help='Run tests without flashing')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.module:
|
|
module = args.module
|
|
|
|
if args.dut:
|
|
dut = args.dut
|
|
|
|
cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module)
|
|
|
|
if args.setup:
|
|
cts.setup()
|
|
elif args.build:
|
|
cts.build()
|
|
elif args.flash:
|
|
cts.flash_boards()
|
|
elif args.run:
|
|
cts.run()
|
|
else:
|
|
cts.build()
|
|
cts.flash_boards()
|
|
cts.run()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|