cts: Refactor cts.py

Noteworthy changes:
- Move Board and its child classes in common/board.py
- Separate flashing and resetting. Flashing used to imply running tests.
- Move up constants up for better visibility
- Change default suite to 'meta'
- Removed redundant code
- Lots of renames (all lower case names, shorter names, etc.)

BUG=none
BRANCH=none
TEST=Ran meta test and verify the results match the expectations

Change-Id: I158d96e2ee104767d25b2e721d5206e528600381
Reviewed-on: https://chromium-review.googlesource.com/383911
Commit-Ready: Daisuke Nojiri <dnojiri@chromium.org>
Tested-by: Daisuke Nojiri <dnojiri@chromium.org>
Reviewed-by: Randall Spangler <rspangler@chromium.org>
This commit is contained in:
Daisuke Nojiri
2016-09-08 15:28:39 -07:00
committed by chrome-bot
parent 91bd09c856
commit d6b0a1cc88
3 changed files with 433 additions and 431 deletions

0
cts/common/__init__.py Normal file
View File

319
cts/common/board.py Normal file
View File

@@ -0,0 +1,319 @@
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from abc import ABCMeta, abstractmethod
import fcntl
import os
import select
import subprocess as sp
import time
OCD_SCRIPT_DIR = '/usr/local/share/openocd/scripts'
OPENOCD_CONFIGS = {
'stm32l476g-eval': 'board/stm32l4discovery.cfg',
'nucleo-f072rb': 'board/st_nucleo_f0.cfg',
}
FLASH_OFFSETS = {
'stm32l476g-eval': '0x08000000',
'nucleo-f072rb': '0x08000000',
}
class Board(object):
"""Class representing a single board connected to a host machine
Attributes:
board: String containing actual type of board, i.e. nucleo-f072rb
config: Directory of board config file relative to openocd's
scripts directory
hla_serial: String containing board's hla_serial number (if board
is an stm32 board)
tty_port: String that is the path to the tty port which board's
UART outputs to
tty: String of file descriptor for tty_port
"""
__metaclass__ = ABCMeta # This is an Abstract Base Class (ABC)
def __init__(self, board, hla_serial=None):
"""Initializes a board object with given attributes
Args:
board: String containing board name
hla_serial: Serial number if board's adaptor is an HLA
"""
if not board in OPENOCD_CONFIGS:
msg = 'OpenOcd configuration not found for ' + board
raise RuntimeError(msg)
self.openocd_config = OPENOCD_CONFIGS[board]
if not board in FLASH_OFFSETS:
msg = 'Flash offset not found for ' + board
raise RuntimeError(msg)
self.flash_offset = FLASH_OFFSETS[board]
self.board = board
self.hla_serial = hla_serial
self.tty_port = None
self.tty = None
@staticmethod
def get_stlink_serials():
"""Gets serial numbers of all st-link v2.1 board attached to host
Returns:
List of serials
"""
usb_args = ['lsusb', '-v', '-d', '0x0483:0x374b']
usb_process = sp.Popen(usb_args, stdout=sp.PIPE, shell=False)
st_link_info = usb_process.communicate()[0]
st_serials = []
for line in st_link_info.split('\n'):
if not 'iSerial' in line:
continue
words = line.split()
if len(words) <= 2:
continue
st_serials.append(words[2].strip())
return st_serials
@abstractmethod
def get_serial(self):
"""Subclass should implement this"""
pass
def send_open_ocd_commands(self, commands):
"""Send a command to the board via openocd
Args:
commands: A list of commands to send
"""
args = ['openocd', '-s', OCD_SCRIPT_DIR,
'-f', self.openocd_config, '-c', 'hla_serial ' + self.hla_serial]
for cmd in commands:
args += ['-c', cmd]
args += ['-c', 'shutdown']
sp.call(args)
def build(self, module, ec_dir, debug=False):
"""Builds test suite module for board
Args:
module: String of the test module you are building,
i.e. gpio, timer, etc.
ec_dir: String of the ec directory path
debug: True means compile in debug messages when building (may
affect test results)
"""
cmds = ['make',
'--directory=' + ec_dir,
'BOARD=' + self.board,
'CTS_MODULE=' + module,
'-j',
'-B']
if debug:
cmds.append('CTS_DEBUG=TRUE')
print ' '.join(cmds)
sp.call(cmds)
def flash(self):
"""Flashes board with most recent build ec.bin"""
image_path = os.path.join('build', self.board, 'ec.bin')
cmd = ['reset_config connect_assert_srst',
'init',
'reset init',
'flash write_image erase %s %s' % (image_path, self.flash_offset)]
self.send_open_ocd_commands(cmd)
def to_string(self):
s = ('Type: Board\n'
'board: ' + self.board + '\n'
'hla_serial: ' + self.hla_serial + '\n'
'openocd_config: ' + self.openocd_config + '\n'
'tty_port: ' + self.tty_port + '\n'
'tty: ' + str(self.tty) + '\n')
return s
def reset(self):
"""Reset board (used when can't connect to TTY)"""
self.send_open_ocd_commands(['init', 'reset init', 'resume'])
def setup_tty(self):
"""Call this before trying to call readOutput for the first time.
This is not in the initialization because caller only should call
this function after serial numbers are setup
"""
self.get_serial()
self.reset()
self.identify_tty_port()
# In testing 3 retries is enough to reset board (2 can fail)
num_file_setup_retries = 3
# In testing, 10 seconds is sufficient to allow board to reconnect
reset_wait_time_seconds = 10
tty = None
try:
tty = self.open_tty()
# If board was just connected, must be reset to be read from
except (IOError, OSError):
for i in range(num_file_setup_retries):
self.reset()
time.sleep(reset_wait_time_seconds)
try:
tty = self.open_tty()
break
except (IOError, OSError):
continue
if not tty:
raise ValueError('Unable to read ' + self.name + '\n'
'If you are running cat on a ttyACMx file,\n'
'please kill that process and try again')
self.tty = tty
def read_tty(self):
"""Read info from a serial port described by a file descriptor
Return:
Bytes that UART has output
"""
buf = []
while True:
if select.select([self.tty], [], [], 1)[0]:
buf.append(os.read(self.tty, 1))
else:
break
result = ''.join(buf)
return result
def identify_tty_port(self):
"""Saves this board's serial port"""
dev_dir = '/dev'
id_prefix = 'ID_SERIAL_SHORT='
num_reset_tries = 3
reset_wait_time_s = 10
com_devices = [f for f in os.listdir(dev_dir) if f.startswith('ttyACM')]
for i in range(num_reset_tries):
for device in com_devices:
self.tty_port = os.path.join(dev_dir, device)
properties = sp.check_output(['udevadm',
'info',
'-a',
'-n',
self.tty_port,
'--query=property'])
for line in [l.strip() for l in properties.split('\n')]:
if line.startswith(id_prefix):
if self.hla_serial == line[len(id_prefix):]:
return
if i != num_reset_tries - 1: # No need to reset the board the last time
self.reset() # May need to reset to connect
time.sleep(reset_wait_time_s)
# If we get here without returning, something is wrong
raise RuntimeError('The device dev path could not be found')
def open_tty(self):
"""Read available bytes from device dev path"""
fd = os.open(self.tty_port, os.O_RDONLY)
flag = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
return fd
class TestHarness(Board):
"""Subclass of Board representing a Test Harness
Attributes:
serial_path: Path to file containing serial number
"""
def __init__(self, board, serial_path):
"""Initializes a board object with given attributes
Args:
serial_path: Path to file containing serial number
"""
Board.__init__(self, board=board)
self.serial_path = serial_path
def get_serial(self):
"""Loads serial number from saved location"""
if self.hla_serial:
return # serial was already loaded
try:
with open(self.serial_path, mode='r') as ser_f:
self.hla_serial = ser_f.read()
return
except IOError:
msg = ('Your TH board has not been identified.\n'
'Connect only TH and run the script --setup, then try again.')
raise RuntimeError(msg)
def save_serial(self):
"""Saves the TH serial number to a file"""
serials = Board.get_stlink_serials()
if len(serials) > 1:
msg = ('There are more than one test board connected to the host.'
'\nConnect only the test harness and remove other boards.')
raise RuntimeError(msg)
if len(serials) < 1:
msg = ('No test boards were found.'
'\nTry to run the script outside chroot.')
raise RuntimeError(msg)
serial = serials[0]
dir = os.path.dirname(self.serial_path)
if not os.path.exists(dir):
os.makedirs(dir)
with open(self.serial_path, mode='w') as ser_f:
ser_f.write(serial)
self.hla_serial = serial
print 'Your TH serial', serial, 'has been saved as', self.serial_path
return
class DeviceUnderTest(Board):
"""Subclass of Board representing a DUT board
Attributes:
th: Reference to test harness board to which this DUT is attached
"""
def __init__(self, board, th, hla_ser=None):
"""Initializes a Device Under Test object with given attributes
Args:
board: String containing board name
th: Reference to test harness board to which this DUT is attached
hla_serial: Serial number if board uses an HLA adaptor
"""
Board.__init__(self, board, hla_serial=hla_ser)
self.th = th
def get_serial(self):
"""Get serial number.
Precondition: The DUT and TH must both be connected, and th.hla_serial
must hold the correct value (the th's serial #)
"""
if self.hla_serial != None:
# serial was already set ('' is a valid serial)
return
serials = Board.get_stlink_serials()
dut = [s for s in serials if self.th.hla_serial != s]
# If len(dut) is 0 then your dut doesn't use an st-link device, so we
# don't have to worry about its serial number
if len(dut) != 1:
msg = ('Your TH serial number is incorrect, or your have'
' too many st-link devices attached.')
raise RuntimeError(msg)
# Found your other st-link device serial!
self.hla_serial = dut[0]
return

View File

@@ -3,306 +3,41 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This file is a utility to quickly flash boards
# A script which builds, flashes, and runs EC CTS
#
# Software prerequisites:
# - openocd version 0.10 or above
# - lsusb
# - udevadm
#
# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables
# to the host and execute the script:
# $ ./cts.py --debug
# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR.
import argparse
import collections
import fcntl
import os
import select
import subprocess as sp
import time
import common.board as board
from copy import deepcopy
from abc import ABCMeta, abstractmethod
import xml.etree.ElementTree as et
from twisted.python.syslog import DEFAULT_FACILITY
CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly
CTS_CONFLICTING_CODE = -1 # Error codes should never conflict
CTS_SUCCESS_CODE = 0
CTS_COLOR_RED = '#fb7d7d'
CTS_COLOR_GREEN = '#7dfb9f'
TH_BOARD = 'stm32l476g-eval'
OCD_SCRIPT_DIR = '/usr/local/share/openocd/scripts'
DEFAULT_TH = 'stm32l476g-eval'
DEFAULT_DUT = 'nucleo-f072rb'
MAX_SUITE_TIME_SEC = 3
CTS_DEBUG_START = '[DEBUG]'
CTS_DEBUG_END = '[DEBUG_END]'
CTS_TEST_RESULT_DIR = '/tmp/cts'
class Board(object):
"""Class representing a single board connected to a host machine
This class is abstract, subclasses must define the updateSerial()
method
Attributes:
board: String containing actual type of board, i.e. nucleo-f072rb
config: Directory of board config file relative to openocd's
scripts directory
hla_serial: String containing board's hla_serial number (if board
is an stm32 board)
tty_port: String that is the path to the tty port which board's
UART outputs to
_tty_descriptor: String of file descriptor for tty_port
"""
configs = {
'stm32l476g-eval': 'board/stm32l4discovery.cfg',
'nucleo-f072rb': 'board/st_nucleo_f0.cfg'
}
__metaclass__ = ABCMeta # This is an Abstract Base Class (ABC)
def __init__(self, board, hla_serial=None, flash_offset='0x08000000'):
"""Initializes a board object with given attributes
Args:
board: String containing board name
hla_serial: Serial number if board's adaptor is an HLA
"""
self.board = board
self.hla_serial = hla_serial
self.tty_port = None
self._tty_descriptor = None
self.flash_offset = flash_offset
@abstractmethod
def updateSerial(self):
"""Subclass should implement this"""
pass
def sendOpenOcdCommands(self, commands):
"""Send a command to the board via openocd
Args:
commands: A list of commands to send
"""
args = ['openocd', '-s', OCD_SCRIPT_DIR,
'-f', Board.configs[self.board], '-c', 'hla_serial ' + self.hla_serial]
for cmd in commands:
args += ['-c', cmd]
args += ['-c', 'shutdown']
sp.call(args)
def make(self, module, ec_dir, debug=False):
"""Builds test suite module for board
Args:
module: String of the test module you are building,
i.e. gpio, timer, etc.
ec_dir: String of the ec directory path
debug: True means compile in debug messages when building (may
affect test results)
"""
cmds = ['make',
'--directory=' + ec_dir,
'BOARD=' + self.board,
'CTS_MODULE=' + module,
'-j',
'-B']
if debug:
cmds.append('CTS_DEBUG=TRUE')
print 'EC directory is ' + ec_dir
print (
'Building module \'' + module + '\' for ' + self.board +
'with debug = ' + str(debug))
sp.call(cmds)
def flash(self):
"""Flashes board with most recent build ec.bin"""
flash_cmds = [
'reset_config connect_assert_srst',
'init',
'reset init',
'flash write_image erase build/' +
self.board +
'/ec.bin ' +
self.flash_offset,
'reset']
self.sendOpenOcdCommands(flash_cmds)
def toString(self):
s = ('Type: Board\n'
'board: ' + self.board + '\n'
'hla_serial: ' + self.hla_serial + '\n'
'config: ' + Board.configs[self.board] + '\n'
'tty_port ' + self.tty_port + '\n'
'_tty_descriptor: ' + str(self._tty_descriptor) + '\n')
return s
def reset(self):
"""Reset board (used when can't connect to TTY)"""
self.sendOpenOcdCommands(['init', 'reset init', 'resume'])
def setupForOutput(self):
"""Call this before trying to call readOutput for the first time.
This is not in the initialization because caller only should call
this function after serial numbers are setup
"""
self.updateSerial()
self.reset()
self._identifyTtyPort()
# In testing 3 retries is enough to reset board (2 can fail)
num_file_setup_retries = 3
# In testing, 10 seconds is sufficient to allow board to reconnect
reset_wait_time_seconds = 10
try:
self._getDevFileDescriptor()
# If board was just connected, must be reset to be read from
except (IOError, OSError):
for i in range(num_file_setup_retries):
self.reset()
time.sleep(reset_wait_time_seconds)
try:
self._getDevFileDescriptor()
break
except (IOError, OSError):
continue
if self._tty_descriptor is None:
raise ValueError('Unable to read ' + self.name + '\n'
'If you are running cat on a ttyACMx file,\n'
'please kill that process and try again')
def readAvailableBytes(self):
"""Read info from a serial port described by a file descriptor
Return:
Bytes that UART has output
"""
buf = []
while True:
if select.select([self._tty_descriptor], [], [], 1)[0]:
buf.append(os.read(self._tty_descriptor, 1))
else:
break
result = ''.join(buf)
return result
def _identifyTtyPort(self):
"""Saves this board's serial port"""
dev_dir = '/dev/'
id_prefix = 'ID_SERIAL_SHORT='
num_reset_tries = 3
reset_wait_time_s = 10
com_devices = [f for f in os.listdir(
dev_dir) if f.startswith('ttyACM')]
for i in range(num_reset_tries):
for device in com_devices:
self.tty_port = os.path.join(dev_dir, device)
properties = sp.check_output(['udevadm',
'info',
'-a',
'-n',
self.tty_port,
'--query=property'])
for line in [l.strip() for l in properties.split('\n')]:
if line.startswith(id_prefix):
if self.hla_serial == line[len(id_prefix):]:
return
if i != num_reset_tries - 1: # No need to reset the obard the last time
self.reset() # May need to reset to connect
time.sleep(reset_wait_time_s)
# If we get here without returning, something is wrong
raise RuntimeError('The device dev path could not be found')
def _getDevFileDescriptor(self):
"""Read available bytes from device dev path"""
fd = os.open(self.tty_port, os.O_RDONLY)
flag = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
self._tty_descriptor = fd
class TestHarness(Board):
"""Subclass of Board representing a Test Harness
Attributes:
serial_path: Path to file containing serial number
"""
def __init__(self, serial_path=None):
"""Initializes a board object with given attributes
Args:
serial_path: Path to file containing serial number
"""
Board.__init__(self, TH_BOARD)
self.serial_path = serial_path
def updateSerial(self):
"""Loads serial number from saved location"""
if self.hla_serial:
return # serial was already loaded
try:
with open(self.serial_path, mode='r') as ser_f:
self.hla_serial = ser_f.read()
return
except IOError:
msg = ('Your th hla_serial may not have been saved.\n'
'Connect only your th and run ./cts --setup, then try again.')
raise RuntimeError(msg)
def saveSerial(self):
"""Saves the th serial number to a file
Return: the serial number saved
"""
serial = Cts.getSerialNumbers()
if len(serial) != 1:
msg = ('TH could not be identified.\n'
'\nConnect your TH and remove other st-link devices')
raise RuntimeError(msg)
else:
ser = serial[0]
if not ser:
msg = ('Unable to save serial')
raise RuntimeError(msg)
if not os.path.exists(os.path.dirname(self.serial_path)):
os.makedirs(os.path.dirname(self.serial_path))
with open(self.serial_path, mode='w') as ser_f:
ser_f.write(ser)
self.hla_serial = ser
return ser
class DeviceUnderTest(Board):
"""Subclass of Board representing a DUT board
Attributes:
th: Reference to test harness board to which this DUT is attached
"""
def __init__(self, board, th, hla_ser=None, f_offset='0x08000000'):
"""Initializes a Device Under Test object with given attributes
Args:
board: String containing board name
th: Reference to test harness board to which this DUT is attached
hla_serial: Serial number if board uses an HLA adaptor
"""
Board.__init__(self, board, hla_serial=hla_ser, flash_offset=f_offset)
self.th = th
def updateSerial(self):
"""Stores the DUT's serial number.
Precondition: The DUT and TH must both be connected, and th.hla_serial
must hold the correct value (the th's serial #)
"""
if self.hla_serial != None:
return # serial was already set ('' is a valid serial)
serials = Cts.getSerialNumbers()
dut = [s for s in serials if self.th.hla_serial != s]
if len(dut) == 1:
self.hla_serial = dut[0]
return # Found your other st-link device serial!
else:
raise RuntimeError('Your TH serial number is incorrect, or your have'
' too many st-link devices attached.')
# If len(dut) is 0 then your dut doesn't use an st-link device, so we
# don't have to worry about its serial number
class Cts(object):
"""Class that represents a CTS testing setup and provides
@@ -312,7 +47,6 @@ class Cts(object):
dut: DeviceUnderTest object representing dut
th: TestHarness object representing th
module: Name of module to build/run tests for
ec_directory: String containing path to EC top level directory
test_names: List of strings of test names contained in given module
test_results: Dictionary of results of each test from module, with
keys being test name strings and values being test result integers
@@ -324,8 +58,7 @@ class Cts(object):
messages sent while it was running
"""
def __init__(self, ec_dir,
dut='nucleo-f072rb', module='gpio', debug=False):
def __init__(self, ec_dir, dut, module, debug=False):
"""Initializes cts class object with given arguments.
Args:
@@ -334,105 +67,66 @@ class Cts(object):
dut: Name of board to use for DUT
module: Name of module to build/run tests for
debug: Boolean that indicates whether or not on-board debug message
printing should be enabled when building.
printing should be enabled.
"""
self.results_dir = '/tmp/cts_results'
self.results_dir = CTS_TEST_RESULT_DIR
self.ec_dir = ec_dir
self.module = module
self.debug = debug
self.ec_directory = ec_dir
self.th = TestHarness()
self.dut = DeviceUnderTest(dut, self.th) # DUT constructor needs TH
th_ser_path = os.path.join(
self.ec_directory,
'build',
self.th.board,
'th_hla_serial')
self.module = module
testlist_path = os.path.join(
self.ec_directory,
'cts',
self.module,
'cts.testlist')
self.test_names = Cts._getMacroArgs(testlist_path, 'CTS_TEST')
serial_path = os.path.join(self.ec_dir, 'build', 'cts_th_serial')
self.th = board.TestHarness(DEFAULT_TH, serial_path)
self.dut = board.DeviceUnderTest(dut, self.th)
cts_dir = os.path.join(self.ec_dir, 'cts')
testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist')
self.test_names = Cts.get_macro_args(testlist_path, 'CTS_TEST')
self.debug_output = {}
for test in self.test_names:
self.debug_output[test] = []
self.th.serial_path = th_ser_path
return_codes_path = os.path.join(self.ec_directory,
'cts',
'common',
'cts.rc')
self.return_codes = dict(enumerate(Cts._getMacroArgs(
return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc')
self.return_codes = dict(enumerate(Cts.get_macro_args(
return_codes_path, 'CTS_RC_')))
self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT'
self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED'
self.test_results = collections.OrderedDict()
def make(self):
self.dut.make(self.module, self.ec_directory, self.debug)
self.th.make(self.module, self.ec_directory, self.debug)
def build(self):
"""Build images for DUT and TH"""
self.dut.build(self.module, self.ec_dir, self.debug)
self.th.build(self.module, self.ec_dir, self.debug)
def flashBoards(self):
def flash_boards(self):
"""Flashes th and dut boards with their most recently build ec.bin"""
self.updateSerials()
self.identify_boards()
self.th.flash()
self.dut.flash()
def setup(self):
"""Saves th serial number if th only is connected.
"""Setup boards"""
self.th.save_serial()
Return:
Serial number that was saved
"""
return self.th.saveSerial()
def updateSerials(self):
def identify_boards(self):
"""Updates serials of both th and dut, in that order (order matters)"""
self.th.updateSerial()
self.dut.updateSerial()
self.th.get_serial()
self.dut.get_serial()
def resetBoards(self):
def reset_boards(self):
"""Resets the boards and allows them to run tests
Due to current (7/27/16) version of sync function,
both boards must be rest and halted, with the th
resuming first, in order for the test suite to run
in sync
"""
self.updateSerials()
self.th.sendOpenOcdCommands(['init', 'reset halt'])
self.dut.sendOpenOcdCommands(['init', 'reset halt'])
self.th.sendOpenOcdCommands(['init', 'resume'])
self.dut.sendOpenOcdCommands(['init', 'resume'])
self.identify_boards()
self.th.send_open_ocd_commands(['init', 'reset halt'])
self.dut.send_open_ocd_commands(['init', 'reset halt'])
self.th.send_open_ocd_commands(['init', 'resume'])
self.dut.send_open_ocd_commands(['init', 'resume'])
@staticmethod
def getSerialNumbers():
"""Gets serial numbers of all st-link v2.1 board attached to host
Returns:
List of serials
"""
usb_args = ['lsusb', '-v', '-d', '0x0483:0x374b']
usb_process = sp.Popen(usb_args, stdout=sp.PIPE, shell=False)
st_link_info = usb_process.communicate()[0]
st_serials = []
for line in st_link_info.split('\n'):
if 'iSerial' in line:
st_serials.append(line.split()[2].strip())
return st_serials
@staticmethod
def _getMacroArgs(filepath, macro):
def get_macro_args(filepath, macro):
"""Get list of args of a certain macro in a file when macro is used
by itself on a line
@@ -441,14 +135,15 @@ class Cts(object):
macro: String containing text of macro to get args of
"""
args = []
with open(filepath, 'r') as fl:
for ln in [ln for ln in fl.readlines(
) if ln.strip().startswith(macro)]:
ln = ln.strip()[len(macro):]
args.append(ln.strip('()').replace(',', ''))
with open(filepath, 'r') as f:
for l in f.readlines():
if not l.strip().startswith(macro):
continue
l = l.strip()[len(macro):]
args.append(l.strip('()').replace(',', ''))
return args
def extractDebugOutput(self, output):
def extract_debug_output(self, output):
"""Append the debug messages from output to self.debug_output
Args:
@@ -480,7 +175,7 @@ class Cts(object):
test_num += 1
i += 1
def _parseOutput(self, r1, r2):
def parse_output(self, r1, r2):
"""Parse the outputs of the DUT and TH together
Args;
@@ -491,8 +186,8 @@ class Cts(object):
first_corrupted_test = len(self.test_names)
self.extractDebugOutput(r1)
self.extractDebugOutput(r2)
self.extract_debug_output(r1)
self.extract_debug_output(r2)
for output_str in [r1, r2]:
test_num = 0
@@ -527,7 +222,7 @@ class Cts(object):
for test in self.test_names[first_corrupted_test:]:
self.test_results[test] = CTS_CORRUPTED_CODE
def _resultsAsString(self):
def _results_as_string(self):
"""Takes saved results and returns a duplicate of their dictionary
with the return codes replaces with their string representation
@@ -541,13 +236,13 @@ class Cts(object):
result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code)
return result
def prettyResults(self):
def prettify_results(self):
"""Takes saved results and returns a string representation of them
Return: Dictionary similar to self.test_results, but with strings
instead of error codes
"""
res = self._resultsAsString()
res = self._results_as_string()
t_long = max(len(s) for s in res.keys())
e_max_len = max(len(s) for s in res.values())
@@ -560,8 +255,8 @@ class Cts(object):
return pretty_results
def resultsAsHtml(self):
res = self._resultsAsString()
def results_as_html(self):
res = self._results_as_string()
root = et.Element('html')
head = et.SubElement(root, 'head')
style = et.SubElement(head, 'style')
@@ -607,39 +302,33 @@ class Cts(object):
return et.tostring(root, method='html')
def resetAndRecord(self):
def run(self):
"""Resets boards, records test results in results dir"""
self.updateSerials()
self.dut.setupForOutput()
self.th.setupForOutput()
self.identify_boards()
self.dut.setup_tty()
self.th.setup_tty()
self.dut.readAvailableBytes() # clear buffer
self.th.readAvailableBytes()
bad_cat_message = (
'Output missing from boards.\n'
'If you are running cat on a ttyACMx file,\n'
'please kill that process and try again'
)
# clear buffers
self.dut.read_tty()
self.th.read_tty()
self.resetBoards()
self.reset_boards()
time.sleep(MAX_SUITE_TIME_SEC)
dut_results = self.dut.readAvailableBytes()
th_results = self.th.readAvailableBytes()
dut_results = self.dut.read_tty()
th_results = self.th.read_tty()
if not dut_results or not th_results:
msg = ('Output missing from boards. If you have a process reading '
'ttyACMx, please kill that process and try again.')
raise ValueError(bad_cat_message)
self._parseOutput(dut_results, th_results)
pretty_results = self.prettyResults()
html_results = self.resultsAsHtml()
self.parse_output(dut_results, th_results)
pretty_results = self.prettify_results()
html_results = self.results_as_html()
dest = os.path.join(
self.results_dir,
self.dut.board,
self.module + '.html'
)
dest = os.path.join(self.results_dir, self.dut.board, self.module + '.html')
if not os.path.exists(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
@@ -648,42 +337,43 @@ class Cts(object):
print pretty_results
def main():
"""Main entry point for cts script from command line"""
ec_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
"""Main entry point for CTS script from command line"""
ec_dir = os.path.realpath(os.path.join(
os.path.dirname(os.path.abspath(__file__)), '..'))
os.chdir(ec_dir)
dut_board = 'nucleo-f072rb' # nucleo by default
module = 'gpio' # gpio by default
debug = False
dut = DEFAULT_DUT
module = 'meta'
parser = argparse.ArgumentParser(description='Used to build/flash boards')
parser.add_argument('-d',
'--dut',
help='Specify DUT you want to build/flash')
'--dut',
help='Specify DUT you want to build/flash')
parser.add_argument('-m',
'--module',
help='Specify module you want to build/flash')
'--module',
help='Specify module you want to build/flash')
parser.add_argument('--debug',
action='store_true',
help='If building, build with debug printing enabled. This may'
'change test results')
action='store_true',
help=('If building, build with debug printing enabled. '
'This may change test results'))
parser.add_argument('-s',
'--setup',
action='store_true',
help='Connect only the th to save its serial')
'--setup',
action='store_true',
help='Connect only the TH to save its serial')
parser.add_argument('-b',
'--build',
action='store_true',
help='Build test suite (no flashing)')
'--build',
action='store_true',
help='Build test suite (no flashing)')
parser.add_argument('-f',
'--flash',
action='store_true',
help='Flash boards with most recent image and record results')
'--flash',
action='store_true',
help='Flash boards with most recent images')
parser.add_argument('-r',
'--reset',
action='store_true',
help='Reset boards and save test results (no flashing)')
'--run',
action='store_true',
help='Run tests without flashing')
args = parser.parse_args()
@@ -691,29 +381,22 @@ def main():
module = args.module
if args.dut:
dut_board = args.dut
dut = args.dut
if args.debug:
debug = args.debug
cts_suite = Cts(ec_dir, module=module, dut=dut_board, debug=debug)
cts = Cts(ec_dir, dut=dut, module=module, debug=args.debug)
if args.setup:
serial = cts_suite.setup()
print 'Your th hla_serial # has been saved as: ' + serial
elif args.reset:
cts_suite.resetAndRecord()
cts.setup()
elif args.build:
cts_suite.make()
cts.build()
elif args.flash:
cts_suite.flashBoards()
cts.flash_boards()
elif args.run:
cts.run()
else:
cts_suite.make()
cts_suite.flashBoards()
cts.build()
cts.flash_boards()
cts.run()
if __name__ == "__main__":
main()