Files
OpenCellular/cts/cts.py
Chris Chen 62af706a02 cts: Fixed bug with conflicting error codes
BRANCH=None
BUG=None
TEST=Manual
- Build and run meta tests, look for listed output

Change-Id: Idd46fb92791c1d0576be95f3e4cda8cdca66daef
Reviewed-on: https://chromium-review.googlesource.com/367401
Commit-Ready: Chris Chen <twothreecc@google.com>
Tested-by: Chris Chen <twothreecc@google.com>
Reviewed-by: Randall Spangler <rspangler@chromium.org>
2016-08-10 15:30:27 -07:00

720 lines
22 KiB
Python
Executable File

#!/usr/bin/python2
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This file is a utility to quickly flash boards
import argparse
import collections
import fcntl
import os
import select
import subprocess as sp
import time
from copy import deepcopy
from abc import ABCMeta, abstractmethod
import xml.etree.ElementTree as et
CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly
CTS_CONFLICTING_CODE = -1 # Error codes should never conflict
CTS_SUCCESS_CODE = 0
CTS_COLOR_RED = '#fb7d7d'
CTS_COLOR_GREEN = '#7dfb9f'
TH_BOARD = 'stm32l476g-eval'
OCD_SCRIPT_DIR = '/usr/local/share/openocd/scripts'
MAX_SUITE_TIME_SEC = 3
CTS_DEBUG_START = '[DEBUG]'
CTS_DEBUG_END = '[DEBUG_END]'
class Board(object):
"""Class representing a single board connected to a host machine
This class is abstract, subclasses must define the updateSerial()
method
Attributes:
board: String containing actual type of board, i.e. nucleo-f072rb
config: Directory of board config file relative to openocd's
scripts directory
hla_serial: String containing board's hla_serial number (if board
is an stm32 board)
tty_port: String that is the path to the tty port which board's
UART outputs to
_tty_descriptor: String of file descriptor for tty_port
"""
configs = {
'stm32l476g-eval': 'board/stm32l4discovery.cfg',
'nucleo-f072rb': 'board/st_nucleo_f0.cfg'
}
__metaclass__ = ABCMeta # This is an Abstract Base Class (ABC)
def __init__(self, board, hla_serial=None, flash_offset='0x08000000'):
"""Initializes a board object with given attributes
Args:
board: String containing board name
hla_serial: Serial number if board's adaptor is an HLA
"""
self.board = board
self.hla_serial = hla_serial
self.tty_port = None
self._tty_descriptor = None
self.flash_offset = flash_offset
@abstractmethod
def updateSerial(self):
"""Subclass should implement this"""
pass
def sendOpenOcdCommands(self, commands):
"""Send a command to the board via openocd
Args:
commands: A list of commands to send
"""
args = ['openocd', '-s', OCD_SCRIPT_DIR,
'-f', Board.configs[self.board], '-c', 'hla_serial ' + self.hla_serial]
for cmd in commands:
args += ['-c', cmd]
args += ['-c', 'shutdown']
sp.call(args)
def make(self, module, ec_dir, debug=False):
"""Builds test suite module for board
Args:
module: String of the test module you are building,
i.e. gpio, timer, etc.
ec_dir: String of the ec directory path
debug: True means compile in debug messages when building (may
affect test results)
"""
cmds = ['make',
'--directory=' + ec_dir,
'BOARD=' + self.board,
'CTS_MODULE=' + module,
'-j',
'-B']
if debug:
cmds.append('CTS_DEBUG=TRUE')
print 'EC directory is ' + ec_dir
print (
'Building module \'' + module + '\' for ' + self.board +
'with debug = ' + str(debug))
sp.call(cmds)
def flash(self):
"""Flashes board with most recent build ec.bin"""
flash_cmds = [
'reset_config connect_assert_srst',
'init',
'reset init',
'flash write_image erase build/' +
self.board +
'/ec.bin ' +
self.flash_offset,
'reset']
self.sendOpenOcdCommands(flash_cmds)
def toString(self):
s = ('Type: Board\n'
'board: ' + self.board + '\n'
'hla_serial: ' + self.hla_serial + '\n'
'config: ' + Board.configs[self.board] + '\n'
'tty_port ' + self.tty_port + '\n'
'_tty_descriptor: ' + str(self._tty_descriptor) + '\n')
return s
def reset(self):
"""Reset board (used when can't connect to TTY)"""
self.sendOpenOcdCommands(['init', 'reset init', 'resume'])
def setupForOutput(self):
"""Call this before trying to call readOutput for the first time.
This is not in the initialization because caller only should call
this function after serial numbers are setup
"""
self.updateSerial()
self.reset()
self._identifyTtyPort()
# In testing 3 retries is enough to reset board (2 can fail)
num_file_setup_retries = 3
# In testing, 10 seconds is sufficient to allow board to reconnect
reset_wait_time_seconds = 10
try:
self._getDevFileDescriptor()
# If board was just connected, must be reset to be read from
except (IOError, OSError):
for i in range(num_file_setup_retries):
self.reset()
time.sleep(reset_wait_time_seconds)
try:
self._getDevFileDescriptor()
break
except (IOError, OSError):
continue
if self._tty_descriptor is None:
raise ValueError('Unable to read ' + self.name + '\n'
'If you are running cat on a ttyACMx file,\n'
'please kill that process and try again')
def readAvailableBytes(self):
"""Read info from a serial port described by a file descriptor
Return:
Bytes that UART has output
"""
buf = []
while True:
if select.select([self._tty_descriptor], [], [], 1)[0]:
buf.append(os.read(self._tty_descriptor, 1))
else:
break
result = ''.join(buf)
return result
def _identifyTtyPort(self):
"""Saves this board's serial port"""
dev_dir = '/dev/'
id_prefix = 'ID_SERIAL_SHORT='
num_reset_tries = 3
reset_wait_time_s = 10
com_devices = [f for f in os.listdir(
dev_dir) if f.startswith('ttyACM')]
for i in range(num_reset_tries):
for device in com_devices:
self.tty_port = os.path.join(dev_dir, device)
properties = sp.check_output(['udevadm',
'info',
'-a',
'-n',
self.tty_port,
'--query=property'])
for line in [l.strip() for l in properties.split('\n')]:
if line.startswith(id_prefix):
if self.hla_serial == line[len(id_prefix):]:
return
if i != num_reset_tries - 1: # No need to reset the obard the last time
self.reset() # May need to reset to connect
time.sleep(reset_wait_time_s)
# If we get here without returning, something is wrong
raise RuntimeError('The device dev path could not be found')
def _getDevFileDescriptor(self):
"""Read available bytes from device dev path"""
fd = os.open(self.tty_port, os.O_RDONLY)
flag = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
self._tty_descriptor = fd
class TestHarness(Board):
"""Subclass of Board representing a Test Harness
Attributes:
serial_path: Path to file containing serial number
"""
def __init__(self, serial_path=None):
"""Initializes a board object with given attributes
Args:
serial_path: Path to file containing serial number
"""
Board.__init__(self, TH_BOARD)
self.serial_path = serial_path
def updateSerial(self):
"""Loads serial number from saved location"""
if self.hla_serial:
return # serial was already loaded
try:
with open(self.serial_path, mode='r') as ser_f:
self.hla_serial = ser_f.read()
return
except IOError:
msg = ('Your th hla_serial may not have been saved.\n'
'Connect only your th and run ./cts --setup, then try again.')
raise RuntimeError(msg)
def saveSerial(self):
"""Saves the th serial number to a file
Return: the serial number saved
"""
serial = Cts.getSerialNumbers()
if len(serial) != 1:
msg = ('TH could not be identified.\n'
'\nConnect your TH and remove other st-link devices')
raise RuntimeError(msg)
else:
ser = serial[0]
if not ser:
msg = ('Unable to save serial')
raise RuntimeError(msg)
if not os.path.exists(os.path.dirname(self.serial_path)):
os.makedirs(os.path.dirname(self.serial_path))
with open(self.serial_path, mode='w') as ser_f:
ser_f.write(ser)
self.hla_serial = ser
return ser
class DeviceUnderTest(Board):
"""Subclass of Board representing a DUT board
Attributes:
th: Reference to test harness board to which this DUT is attached
"""
def __init__(self, board, th, hla_ser=None, f_offset='0x08000000'):
"""Initializes a Device Under Test object with given attributes
Args:
board: String containing board name
th: Reference to test harness board to which this DUT is attached
hla_serial: Serial number if board uses an HLA adaptor
"""
Board.__init__(self, board, hla_serial=hla_ser, flash_offset=f_offset)
self.th = th
def updateSerial(self):
"""Stores the DUT's serial number.
Precondition: The DUT and TH must both be connected, and th.hla_serial
must hold the correct value (the th's serial #)
"""
if self.hla_serial != None:
return # serial was already set ('' is a valid serial)
serials = Cts.getSerialNumbers()
dut = [s for s in serials if self.th.hla_serial != s]
if len(dut) == 1:
self.hla_serial = dut[0]
return # Found your other st-link device serial!
else:
raise RuntimeError('Your TH serial number is incorrect, or your have'
' too many st-link devices attached.')
# If len(dut) is 0 then your dut doesn't use an st-link device, so we
# don't have to worry about its serial number
class Cts(object):
"""Class that represents a CTS testing setup and provides
interface to boards (building, flashing, etc.)
Attributes:
dut: DeviceUnderTest object representing dut
th: TestHarness object representing th
module: Name of module to build/run tests for
ec_directory: String containing path to EC top level directory
test_names: List of strings of test names contained in given module
test_results: Dictionary of results of each test from module, with
keys being test name strings and values being test result integers
return_codes: Dict of strings of return codes, with a code's integer
value being the index for the corresponding string representation
debug: Boolean that indicates whether or not on-board debug message
printing should be enabled when building.
debug_output: Dictionary mapping test name to an array contain debug
messages sent while it was running
"""
def __init__(self, ec_dir,
dut='nucleo-f072rb', module='gpio', debug=False):
"""Initializes cts class object with given arguments.
Args:
dut: Name of Device Under Test (DUT) board
ec_dir: String path to ec directory
dut: Name of board to use for DUT
module: Name of module to build/run tests for
debug: Boolean that indicates whether or not on-board debug message
printing should be enabled when building.
"""
self.results_dir = '/tmp/cts_results'
self.module = module
self.debug = debug
self.ec_directory = ec_dir
self.th = TestHarness()
self.dut = DeviceUnderTest(dut, self.th) # DUT constructor needs TH
th_ser_path = os.path.join(
self.ec_directory,
'build',
self.th.board,
'th_hla_serial')
self.module = module
testlist_path = os.path.join(
self.ec_directory,
'cts',
self.module,
'cts.testlist')
self.test_names = Cts._getMacroArgs(testlist_path, 'CTS_TEST')
self.debug_output = {}
for test in self.test_names:
self.debug_output[test] = []
self.th.serial_path = th_ser_path
return_codes_path = os.path.join(self.ec_directory,
'cts',
'common',
'cts.rc')
self.return_codes = dict(enumerate(Cts._getMacroArgs(
return_codes_path, 'CTS_RC_')))
self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT'
self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED'
self.test_results = collections.OrderedDict()
def make(self):
self.dut.make(self.module, self.ec_directory, self.debug)
self.th.make(self.module, self.ec_directory, self.debug)
def flashBoards(self):
"""Flashes th and dut boards with their most recently build ec.bin"""
self.updateSerials()
self.th.flash()
self.dut.flash()
def setup(self):
"""Saves th serial number if th only is connected.
Return:
Serial number that was saved
"""
return self.th.saveSerial()
def updateSerials(self):
"""Updates serials of both th and dut, in that order (order matters)"""
self.th.updateSerial()
self.dut.updateSerial()
def resetBoards(self):
"""Resets the boards and allows them to run tests
Due to current (7/27/16) version of sync function,
both boards must be rest and halted, with the th
resuming first, in order for the test suite to run
in sync
"""
self.updateSerials()
self.th.sendOpenOcdCommands(['init', 'reset halt'])
self.dut.sendOpenOcdCommands(['init', 'reset halt'])
self.th.sendOpenOcdCommands(['init', 'resume'])
self.dut.sendOpenOcdCommands(['init', 'resume'])
@staticmethod
def getSerialNumbers():
"""Gets serial numbers of all st-link v2.1 board attached to host
Returns:
List of serials
"""
usb_args = ['lsusb', '-v', '-d', '0x0483:0x374b']
usb_process = sp.Popen(usb_args, stdout=sp.PIPE, shell=False)
st_link_info = usb_process.communicate()[0]
st_serials = []
for line in st_link_info.split('\n'):
if 'iSerial' in line:
st_serials.append(line.split()[2].strip())
return st_serials
@staticmethod
def _getMacroArgs(filepath, macro):
"""Get list of args of a certain macro in a file when macro is used
by itself on a line
Args:
filepath: String containing absolute path to the file
macro: String containing text of macro to get args of
"""
args = []
with open(filepath, 'r') as fl:
for ln in [ln for ln in fl.readlines(
) if ln.strip().startswith(macro)]:
ln = ln.strip()[len(macro):]
args.append(ln.strip('()').replace(',', ''))
return args
def extractDebugOutput(self, output):
"""Append the debug messages from output to self.debug_output
Args:
output: String containing output from which to extract debug
messages
"""
lines = [ln.strip() for ln in output.split('\n')]
test_num = 0
i = 0
message_buf = []
while i < len(lines):
if test_num >= len(self.test_names):
break
if lines[i].strip() == CTS_DEBUG_START:
i += 1
msg = ''
while i < len(lines):
if lines[i] == CTS_DEBUG_END:
break
else:
msg += lines[i] + '\n'
i += 1
message_buf.append(msg)
else:
current_test = self.test_names[test_num]
if lines[i].strip().startswith(current_test):
self.debug_output[current_test] += message_buf
message_buf = []
test_num += 1
i += 1
def _parseOutput(self, r1, r2):
"""Parse the outputs of the DUT and TH together
Args;
r1: String output of one of the DUT or the TH (order does not matter)
r2: String output of one of the DUT or the TH (order does not matter)
"""
self.test_results.clear() # empty out any old results
first_corrupted_test = len(self.test_names)
self.extractDebugOutput(r1)
self.extractDebugOutput(r2)
for output_str in [r1, r2]:
test_num = 0
for ln in [ln.strip() for ln in output_str.split('\n')]:
tokens = ln.split()
if len(tokens) != 2:
continue
test = tokens[0].strip()
if test not in self.test_names:
continue
try:
return_code = int(tokens[1])
except ValueError: # Second token is not an int
continue
if test != self.test_names[test_num]:
first_corrupted_test = test_num
break # Results after this test are corrupted
elif self.test_results.get(
test,
CTS_SUCCESS_CODE) == CTS_SUCCESS_CODE:
self.test_results[test] = return_code
elif return_code == CTS_SUCCESS_CODE:
pass
elif return_code != self.test_results[test]:
self.test_results[test] = CTS_CONFLICTING_CODE
test_num += 1
if test_num != len(self.test_names): # If a suite didn't finish
first_corrupted_test = min(first_corrupted_test, test_num)
if first_corrupted_test < len(self.test_names):
for test in self.test_names[first_corrupted_test:]:
self.test_results[test] = CTS_CORRUPTED_CODE
def _resultsAsString(self):
"""Takes saved results and returns a duplicate of their dictionary
with the return codes replaces with their string representation
Returns:
dictionary with test name strings as keys and test result strings
as values
"""
result = deepcopy(self.test_results)
# Convert codes to strings
for test, code in result.items():
result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code)
return result
def prettyResults(self):
"""Takes saved results and returns a string representation of them
Return: Dictionary similar to self.test_results, but with strings
instead of error codes
"""
res = self._resultsAsString()
t_long = max(len(s) for s in res.keys())
e_max_len = max(len(s) for s in res.values())
pretty_results = 'CTS Test Results for ' + self.module + ' module:\n'
for test, code in res.items():
align_str = '\n{0:<' + str(t_long) + \
'} {1:>' + str(e_max_len) + '}'
pretty_results += align_str.format(test, code)
return pretty_results
def resultsAsHtml(self):
res = self._resultsAsString()
root = et.Element('html')
head = et.SubElement(root, 'head')
style = et.SubElement(head, 'style')
style.text = ('table, td, th {border: 1px solid black;}'
'body {font-family: \"Lucida Console\", Monaco, monospace')
body = et.SubElement(root, 'body')
table = et.SubElement(body, 'table')
table.set('style','width:100%')
title_row = et.SubElement(table, 'tr')
test_name_title = et.SubElement(title_row, 'th')
test_name_title.text = 'Test Name'
test_name_title.set('style', 'white-space : nowrap')
test_results_title = et.SubElement(title_row, 'th')
test_results_title.text = 'Test Result'
test_results_title.set('style', 'white-space : nowrap')
test_debug_title = et.SubElement(title_row, 'th')
test_debug_title.text = 'Debug Output'
test_debug_title.set('style', 'width:99%')
for name, result in res.items():
row = et.SubElement(table, 'tr')
name_e = et.SubElement(row, 'td')
name_e.text = name
name_e.set('style', 'white-space : nowrap')
result_e = et.SubElement(row, 'td')
result_e.text = result
result_e.set('style', 'white-space : nowrap')
debug_e = et.SubElement(row, 'td')
debug_e.set('style', 'width:99%')
debug_e.set('style', 'white-space : pre-wrap')
if len(self.debug_output[name]) == 0:
debug_e.text = 'None'
else:
combined_message = ''
for msg in self.debug_output[name]:
combined_message += msg
combined_message = combined_message
debug_e.text = combined_message
if result == self.return_codes[CTS_SUCCESS_CODE]:
result_e.set('bgcolor', CTS_COLOR_GREEN)
else:
result_e.set('bgcolor', CTS_COLOR_RED)
return et.tostring(root, method='html')
def resetAndRecord(self):
"""Resets boards, records test results in results dir"""
self.updateSerials()
self.dut.setupForOutput()
self.th.setupForOutput()
self.dut.readAvailableBytes() # clear buffer
self.th.readAvailableBytes()
bad_cat_message = (
'Output missing from boards.\n'
'If you are running cat on a ttyACMx file,\n'
'please kill that process and try again'
)
self.resetBoards()
time.sleep(MAX_SUITE_TIME_SEC)
dut_results = self.dut.readAvailableBytes()
th_results = self.th.readAvailableBytes()
if not dut_results or not th_results:
raise ValueError(bad_cat_message)
self._parseOutput(dut_results, th_results)
pretty_results = self.prettyResults()
html_results = self.resultsAsHtml()
dest = os.path.join(
self.results_dir,
self.dut.board,
self.module + '.html'
)
if not os.path.exists(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
with open(dest, 'w') as fl:
fl.write(html_results)
print pretty_results
def main():
"""Main entry point for cts script from command line"""
ec_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
os.chdir(ec_dir)
dut_board = 'nucleo-f072rb' # nucleo by default
module = 'gpio' # gpio by default
debug = False
parser = argparse.ArgumentParser(description='Used to build/flash boards')
parser.add_argument('-d',
'--dut',
help='Specify DUT you want to build/flash')
parser.add_argument('-m',
'--module',
help='Specify module you want to build/flash')
parser.add_argument('--debug',
action='store_true',
help='If building, build with debug printing enabled. This may'
'change test results')
parser.add_argument('-s',
'--setup',
action='store_true',
help='Connect only the th to save its serial')
parser.add_argument('-b',
'--build',
action='store_true',
help='Build test suite (no flashing)')
parser.add_argument('-f',
'--flash',
action='store_true',
help='Flash boards with most recent image and record results')
parser.add_argument('-r',
'--reset',
action='store_true',
help='Reset boards and save test results (no flashing)')
args = parser.parse_args()
if args.module:
module = args.module
if args.dut:
dut_board = args.dut
if args.debug:
debug = args.debug
cts_suite = Cts(ec_dir, module=module, dut=dut_board, debug=debug)
if args.setup:
serial = cts_suite.setup()
print 'Your th hla_serial # has been saved as: ' + serial
elif args.reset:
cts_suite.resetAndRecord()
elif args.build:
cts_suite.make()
elif args.flash:
cts_suite.flashBoards()
else:
cts_suite.make()
cts_suite.flashBoards()
if __name__ == "__main__":
main()