mirror of
https://github.com/Telecominfraproject/OpenCellular.git
synced 2026-01-04 14:01:54 +00:00
tigertail: tigertool command line api
This tool allows an easy commandline interface to set the USB-C mux position, as well as init and reboot. BRANCH=None BUG=b:35849284 TEST=flash, control tigertail successfully Change-Id: I8d60c215fee04de158c22edca5377c3c6cd48cf0 Reviewed-on: https://chromium-review.googlesource.com/493617 Commit-Ready: Nick Sanders <nsanders@chromium.org> Tested-by: Nick Sanders <nsanders@chromium.org> Reviewed-by: Aseda Aboagye <aaboagye@chromium.org>
This commit is contained in:
18
extra/tigertool/README.md
Normal file
18
extra/tigertool/README.md
Normal file
@@ -0,0 +1,18 @@
|
||||
# tigertool
|
||||
|
||||
tigertool.py is a commandline utility to control the tigertail USB-C mux.
|
||||
It supports changing the mux status to port A, B, or off.
|
||||
You can set a serial number to use multiple tigertails at once.
|
||||
|
||||
## Usage
|
||||
Typical usage to set the mux port<br>
|
||||
```./tigertail.py -m [A|B|off] -s [serialno]```<br>
|
||||
|
||||
Reboot the tigertail<br>
|
||||
```./tigertail.py --reboot```<br>
|
||||
|
||||
Set the serial number, when only one tigertail is plugged<br>
|
||||
```./tigertail.py --setserialno=[serialno]```<br>
|
||||
|
||||
Tigertail can support up to 20V 3A on the mux and passes through all
|
||||
USB-C lines except SBU.
|
||||
5
extra/tigertool/ecusb/__init__.py
Normal file
5
extra/tigertool/ecusb/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
__all__ = ['tiny_servo_common', 'stm32usb', 'stm32uart', 'pty_driver']
|
||||
291
extra/tigertool/ecusb/pty_driver.py
Normal file
291
extra/tigertool/ecusb/pty_driver.py
Normal file
@@ -0,0 +1,291 @@
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
"""ptyDriver class
|
||||
|
||||
This class takes a pty interface and can send commands and expect results
|
||||
as regex. This is useful for automating console based interfaces, such as
|
||||
the CrOS EC console commands.
|
||||
"""
|
||||
|
||||
import ast
|
||||
import errno
|
||||
import fcntl
|
||||
import os
|
||||
import pexpect
|
||||
import fdpexpect
|
||||
|
||||
|
||||
# Expecting a result in 3 seconds is plenty even for slow platforms.
|
||||
DEFAULT_UART_TIMEOUT = 3
|
||||
|
||||
|
||||
class ptyError(Exception):
|
||||
"""Exception class for pty errors."""
|
||||
|
||||
|
||||
UART_PARAMS = {
|
||||
'uart_cmd': None,
|
||||
'uart_multicmd': None,
|
||||
'uart_regexp': None,
|
||||
'uart_timeout': DEFAULT_UART_TIMEOUT,
|
||||
}
|
||||
|
||||
|
||||
class ptyDriver(object):
|
||||
"""Automate interactive commands on a pty interface."""
|
||||
def __init__(self, interface, params, fast=False):
|
||||
"""Init class variables."""
|
||||
self._child = None
|
||||
self._fd = None
|
||||
self._interface = interface
|
||||
self._pty_path = self._interface.get_pty()
|
||||
self._dict = UART_PARAMS.copy()
|
||||
self._fast = fast
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
"""Close any open files and interfaces."""
|
||||
if self._fd:
|
||||
self._close()
|
||||
self._interface.close()
|
||||
|
||||
def _open(self):
|
||||
"""Connect to serial device and create pexpect interface."""
|
||||
assert self._fd is None
|
||||
self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
|
||||
# Don't allow forked processes to access.
|
||||
fcntl.fcntl(self._fd, fcntl.F_SETFD,
|
||||
fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
|
||||
self._child = fdpexpect.fdspawn(self._fd)
|
||||
# pexpect defaults to a 100ms delay before sending characters, to
|
||||
# work around race conditions in ssh. We don't need this feature
|
||||
# so we'll change delaybeforesend from 0.1 to 0.001 to speed things up.
|
||||
if self._fast:
|
||||
self._child.delaybeforesend = 0.001
|
||||
|
||||
def _close(self):
|
||||
"""Close serial device connection."""
|
||||
os.close(self._fd)
|
||||
self._fd = None
|
||||
self._child = None
|
||||
|
||||
def _flush(self):
|
||||
"""Flush device output to prevent previous messages interfering."""
|
||||
if self._child.sendline('') != 1:
|
||||
raise ptyError('Failed to send newline.')
|
||||
while True:
|
||||
try:
|
||||
self._child.expect('.', timeout=0.2)
|
||||
except (pexpect.TIMEOUT, pexpect.EOF):
|
||||
break
|
||||
except OSError as e:
|
||||
# EAGAIN indicates no data available, maybe we didn't wait long enough.
|
||||
if e.errno != errno.EAGAIN:
|
||||
raise
|
||||
break
|
||||
|
||||
def _send(self, cmds):
|
||||
"""Send command to EC.
|
||||
|
||||
This function always flushes serial device before sending, and is used as
|
||||
a wrapper function to make sure the channel is always flushed before
|
||||
sending commands.
|
||||
|
||||
Args:
|
||||
cmds: The commands to send to the device, either a list or a string.
|
||||
|
||||
Raises:
|
||||
ptyError: Raised when writing to the device fails.
|
||||
"""
|
||||
self._flush()
|
||||
if not isinstance(cmds, list):
|
||||
cmds = [cmds]
|
||||
for cmd in cmds:
|
||||
if self._child.sendline(cmd) != len(cmd) + 1:
|
||||
raise ptyError('Failed to send command.')
|
||||
|
||||
def _issue_cmd(self, cmds):
|
||||
"""Send command to the device and do not wait for response.
|
||||
|
||||
Args:
|
||||
cmds: The commands to send to the device, either a list or a string.
|
||||
"""
|
||||
self._issue_cmd_get_results(cmds, [])
|
||||
|
||||
def _issue_cmd_get_results(self, cmds,
|
||||
regex_list, timeout=DEFAULT_UART_TIMEOUT):
|
||||
"""Send command to the device and wait for response.
|
||||
|
||||
This function waits for response message matching a regular
|
||||
expressions.
|
||||
|
||||
Args:
|
||||
cmds: The commands issued, either a list or a string.
|
||||
regex_list: List of Regular expressions used to match response message.
|
||||
Note1, list must be ordered.
|
||||
Note2, empty list sends and returns.
|
||||
timeout: time to wait for matching results before failing.
|
||||
|
||||
Returns:
|
||||
List of tuples, each of which contains the entire matched string and
|
||||
all the subgroups of the match. None if not matched.
|
||||
For example:
|
||||
response of the given command:
|
||||
High temp: 37.2
|
||||
Low temp: 36.4
|
||||
regex_list:
|
||||
['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
|
||||
returns:
|
||||
[('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
|
||||
|
||||
Raises:
|
||||
ptyError: If timed out waiting for a response
|
||||
"""
|
||||
result_list = []
|
||||
self._open()
|
||||
try:
|
||||
self._send(cmds)
|
||||
for regex in regex_list:
|
||||
self._child.expect(regex, timeout)
|
||||
match = self._child.match
|
||||
lastindex = match.lastindex if match and match.lastindex else 0
|
||||
# Create a tuple which contains the entire matched string and all
|
||||
# the subgroups of the match.
|
||||
result = match.group(*range(lastindex + 1)) if match else None
|
||||
result_list.append(result)
|
||||
except pexpect.TIMEOUT:
|
||||
raise ptyError('Timeout waiting for response.')
|
||||
finally:
|
||||
self._close()
|
||||
return result_list
|
||||
|
||||
def _issue_cmd_get_multi_results(self, cmd, regex):
|
||||
"""Send command to the device and wait for multiple response.
|
||||
|
||||
This function waits for arbitrary number of response message
|
||||
matching a regular expression.
|
||||
|
||||
Args:
|
||||
cmd: The command issued.
|
||||
regex: Regular expression used to match response message.
|
||||
|
||||
Returns:
|
||||
List of tuples, each of which contains the entire matched string and
|
||||
all the subgroups of the match. None if not matched.
|
||||
"""
|
||||
result_list = []
|
||||
self._open()
|
||||
try:
|
||||
self._send(cmd)
|
||||
while True:
|
||||
try:
|
||||
self._child.expect(regex, timeout=0.1)
|
||||
match = self._child.match
|
||||
lastindex = match.lastindex if match and match.lastindex else 0
|
||||
# Create a tuple which contains the entire matched string and all
|
||||
# the subgroups of the match.
|
||||
result = match.group(*range(lastindex + 1)) if match else None
|
||||
result_list.append(result)
|
||||
except pexpect.TIMEOUT:
|
||||
break
|
||||
finally:
|
||||
self._close()
|
||||
return result_list
|
||||
|
||||
def _Set_uart_timeout(self, timeout):
|
||||
"""Set timeout value for waiting for the device response.
|
||||
|
||||
Args:
|
||||
timeout: Timeout value in second.
|
||||
"""
|
||||
self._dict['uart_timeout'] = timeout
|
||||
|
||||
def _Get_uart_timeout(self):
|
||||
"""Get timeout value for waiting for the device response.
|
||||
|
||||
Returns:
|
||||
Timeout value in second.
|
||||
"""
|
||||
return self._dict['uart_timeout']
|
||||
|
||||
def _Set_uart_regexp(self, regexp):
|
||||
"""Set the list of regular expressions which matches the command response.
|
||||
|
||||
Args:
|
||||
regexp: A string which contains a list of regular expressions.
|
||||
"""
|
||||
if not isinstance(regexp, str):
|
||||
raise ptyError('The argument regexp should be a string.')
|
||||
self._dict['uart_regexp'] = ast.literal_eval(regexp)
|
||||
|
||||
def _Get_uart_regexp(self):
|
||||
"""Get the list of regular expressions which matches the command response.
|
||||
|
||||
Returns:
|
||||
A string which contains a list of regular expressions.
|
||||
"""
|
||||
return str(self._dict['uart_regexp'])
|
||||
|
||||
def _Set_uart_cmd(self, cmd):
|
||||
"""Set the UART command and send it to the device.
|
||||
|
||||
If ec_uart_regexp is 'None', the command is just sent and it doesn't care
|
||||
about its response.
|
||||
|
||||
If ec_uart_regexp is not 'None', the command is send and its response,
|
||||
which matches the regular expression of ec_uart_regexp, will be kept.
|
||||
Use its getter to obtain this result. If no match after ec_uart_timeout
|
||||
seconds, a timeout error will be raised.
|
||||
|
||||
Args:
|
||||
cmd: A string of UART command.
|
||||
"""
|
||||
if self._dict['uart_regexp']:
|
||||
self._dict['uart_cmd'] = self._issue_cmd_get_results(
|
||||
cmd, self._dict['uart_regexp'], self._dict['uart_timeout'])
|
||||
else:
|
||||
self._dict['uart_cmd'] = None
|
||||
self._issue_cmd(cmd)
|
||||
|
||||
def _Set_uart_multicmd(self, cmds):
|
||||
"""Set multiple UART commands and send them to the device.
|
||||
|
||||
Note that ec_uart_regexp is not supported to match the results.
|
||||
|
||||
Args:
|
||||
cmds: A semicolon-separated string of UART commands.
|
||||
"""
|
||||
self._issue_cmd(cmds.split(';'))
|
||||
|
||||
def _Get_uart_cmd(self):
|
||||
"""Get the result of the latest UART command.
|
||||
|
||||
Returns:
|
||||
A string which contains a list of tuples, each of which contains the
|
||||
entire matched string and all the subgroups of the match. 'None' if
|
||||
the ec_uart_regexp is 'None'.
|
||||
"""
|
||||
return str(self._dict['uart_cmd'])
|
||||
|
||||
def _Set_uart_capture(self, cmd):
|
||||
"""Set UART capture mode (on or off).
|
||||
|
||||
Once capture is enabled, UART output could be collected periodically by
|
||||
invoking _Get_uart_stream() below.
|
||||
|
||||
Args:
|
||||
cmd: True for on, False for off
|
||||
"""
|
||||
self._interface.set_capture_active(cmd)
|
||||
|
||||
def _Get_uart_capture(self):
|
||||
"""Get the UART capture mode (on or off)."""
|
||||
return self._interface.get_capture_active()
|
||||
|
||||
def _Get_uart_stream(self):
|
||||
"""Get uart stream generated since last time."""
|
||||
return self._interface.get_stream()
|
||||
235
extra/tigertool/ecusb/stm32uart.py
Normal file
235
extra/tigertool/ecusb/stm32uart.py
Normal file
@@ -0,0 +1,235 @@
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
"""Allow creation of uart/console interface via stm32 usb endpoint."""
|
||||
|
||||
import os
|
||||
import select
|
||||
import sys
|
||||
import termios
|
||||
import threading
|
||||
import time
|
||||
import tty
|
||||
import usb
|
||||
|
||||
import stm32usb
|
||||
|
||||
|
||||
class SuartError(Exception):
|
||||
"""Class for exceptions of Suart."""
|
||||
def __init__(self, msg, value=0):
|
||||
"""SuartError constructor.
|
||||
|
||||
Args:
|
||||
msg: string, message describing error in detail
|
||||
value: integer, value of error when non-zero status returned. Default=0
|
||||
"""
|
||||
super(SuartError, self).__init__(msg, value)
|
||||
self.msg = msg
|
||||
self.value = value
|
||||
|
||||
|
||||
class Suart(object):
|
||||
"""Provide interface to stm32 serial usb endpoint."""
|
||||
def __init__(self, vendor=0x18d1, product=0x501a, interface=0,
|
||||
serialname=None, ftdi_context=None):
|
||||
"""Suart contstructor.
|
||||
|
||||
Initializes stm32 USB stream interface.
|
||||
|
||||
Args:
|
||||
vendor: usb vendor id of stm32 device
|
||||
product: usb product id of stm32 device
|
||||
interface: interface number of stm32 device to use
|
||||
serialname: n/a. Defaults to None.
|
||||
ftdi_context: n/a. Defaults to None.
|
||||
|
||||
Raises:
|
||||
SuartError: If init fails
|
||||
"""
|
||||
self._ptym = None
|
||||
self._ptys = None
|
||||
self._ptyname = None
|
||||
self._rx_thread = None
|
||||
self._tx_thread = None
|
||||
self._susb = stm32usb.Susb(vendor=vendor, product=product,
|
||||
interface=interface, serialname=serialname)
|
||||
self._running = False
|
||||
|
||||
def __del__(self):
|
||||
"""Suart destructor."""
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
"""Stop all running threads."""
|
||||
self._running = False
|
||||
if self._rx_thread:
|
||||
self._rx_thread.join(2)
|
||||
self._rx_thread = None
|
||||
if self._tx_thread:
|
||||
self._tx_thread.join(2)
|
||||
self._tx_thread = None
|
||||
|
||||
def run_rx_thread(self):
|
||||
"""Background loop to pass data from USB to pty."""
|
||||
ep = select.epoll()
|
||||
ep.register(self._ptym, select.EPOLLHUP)
|
||||
try:
|
||||
while self._running:
|
||||
events = ep.poll(0)
|
||||
# Check if the pty is connected to anything, or hungup.
|
||||
if not events:
|
||||
try:
|
||||
r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS)
|
||||
if r:
|
||||
os.write(self._ptym, r)
|
||||
|
||||
# If we miss some characters on pty disconnect, that's fine.
|
||||
# ep.read() also throws USBError on timeout, which we discard.
|
||||
except OSError:
|
||||
pass
|
||||
except usb.core.USBError:
|
||||
pass
|
||||
else:
|
||||
time.sleep(.1)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
def run_tx_thread(self):
|
||||
"""Background loop to pass data from pty to USB."""
|
||||
ep = select.epoll()
|
||||
ep.register(self._ptym, select.EPOLLHUP)
|
||||
try:
|
||||
while self._running:
|
||||
events = ep.poll(0)
|
||||
# Check if the pty is connected to anything, or hungup.
|
||||
if not events:
|
||||
try:
|
||||
r = os.read(self._ptym, 64)
|
||||
if r:
|
||||
self._susb._write_ep.write(r, self._susb.TIMEOUT_MS)
|
||||
|
||||
except OSError:
|
||||
pass
|
||||
except usb.core.USBError:
|
||||
pass
|
||||
else:
|
||||
time.sleep(.1)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
def run(self):
|
||||
"""Creates pthreads to poll stm32 & PTY for data."""
|
||||
m, s = os.openpty()
|
||||
self._ptyname = os.ttyname(s)
|
||||
|
||||
self._ptym = m
|
||||
self._ptys = s
|
||||
|
||||
os.fchmod(s, 0o660)
|
||||
|
||||
# Change the owner and group of the PTY to the user who started servod.
|
||||
try:
|
||||
uid = int(os.environ.get('SUDO_UID', -1))
|
||||
except TypeError:
|
||||
uid = -1
|
||||
|
||||
try:
|
||||
gid = int(os.environ.get('SUDO_GID', -1))
|
||||
except TypeError:
|
||||
gid = -1
|
||||
os.fchown(s, uid, gid)
|
||||
|
||||
tty.setraw(self._ptym, termios.TCSADRAIN)
|
||||
|
||||
# Generate a HUP flag on pty slave fd.
|
||||
os.fdopen(s).close()
|
||||
|
||||
self._running = True
|
||||
|
||||
self._rx_thread = threading.Thread(target=self.run_rx_thread, args=[])
|
||||
self._rx_thread.daemon = True
|
||||
self._rx_thread.start()
|
||||
|
||||
self._tx_thread = threading.Thread(target=self.run_tx_thread, args=[])
|
||||
self._tx_thread.daemon = True
|
||||
self._tx_thread.start()
|
||||
|
||||
def get_uart_props(self):
|
||||
"""Get the uart's properties.
|
||||
|
||||
Returns:
|
||||
dict where:
|
||||
baudrate: integer of uarts baudrate
|
||||
bits: integer, number of bits of data Can be 5|6|7|8 inclusive
|
||||
parity: integer, parity of 0-2 inclusive where:
|
||||
0: no parity
|
||||
1: odd parity
|
||||
2: even parity
|
||||
sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
|
||||
0: 1 stop bit
|
||||
1: 1.5 stop bits
|
||||
2: 2 stop bits
|
||||
"""
|
||||
return {
|
||||
'baudrate': 115200,
|
||||
'bits': 8,
|
||||
'parity': 0,
|
||||
'sbits': 1,
|
||||
}
|
||||
|
||||
def set_uart_props(self, line_props):
|
||||
"""Set the uart's properties.
|
||||
|
||||
Note that Suart cannot set properties
|
||||
and will fail if the properties are not the default 115200,8n1.
|
||||
|
||||
Args:
|
||||
line_props: dict where:
|
||||
baudrate: integer of uarts baudrate
|
||||
bits: integer, number of bits of data ( prior to stop bit)
|
||||
parity: integer, parity of 0-2 inclusive where
|
||||
0: no parity
|
||||
1: odd parity
|
||||
2: even parity
|
||||
sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
|
||||
0: 1 stop bit
|
||||
1: 1.5 stop bits
|
||||
2: 2 stop bits
|
||||
|
||||
Raises:
|
||||
SuartError: If requested line properties are not the default.
|
||||
"""
|
||||
curr_props = self.get_uart_props()
|
||||
for prop in line_props:
|
||||
if line_props[prop] != curr_props[prop]:
|
||||
raise SuartError('Line property %s cannot be set from %s to %s' % (
|
||||
prop, curr_props[prop], line_props[prop]))
|
||||
return True
|
||||
|
||||
def get_pty(self):
|
||||
"""Gets path to pty for communication to/from uart.
|
||||
|
||||
Returns:
|
||||
String path to the pty connected to the uart
|
||||
"""
|
||||
return self._ptyname
|
||||
|
||||
|
||||
def main():
|
||||
"""Run a suart test with the default parameters."""
|
||||
try:
|
||||
sobj = Suart()
|
||||
sobj.run()
|
||||
|
||||
# run() is a thread so just busy wait to mimic server.
|
||||
while True:
|
||||
# Ours sleeps to eleven!
|
||||
time.sleep(11)
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
108
extra/tigertool/ecusb/stm32usb.py
Normal file
108
extra/tigertool/ecusb/stm32usb.py
Normal file
@@ -0,0 +1,108 @@
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
"""Allows creation of an interface via stm32 usb."""
|
||||
|
||||
import usb
|
||||
|
||||
|
||||
class SusbError(Exception):
|
||||
"""Class for exceptions of Susb."""
|
||||
def __init__(self, msg, value=0):
|
||||
"""SusbError constructor.
|
||||
|
||||
Args:
|
||||
msg: string, message describing error in detail
|
||||
value: integer, value of error when non-zero status returned. Default=0
|
||||
"""
|
||||
super(SusbError, self).__init__(msg, value)
|
||||
self.msg = msg
|
||||
self.value = value
|
||||
|
||||
|
||||
class Susb(object):
|
||||
"""Provide stm32 USB functionality.
|
||||
|
||||
Instance Variables:
|
||||
_read_ep: pyUSB read endpoint for this interface
|
||||
_write_ep: pyUSB write endpoint for this interface
|
||||
"""
|
||||
READ_ENDPOINT = 0x81
|
||||
WRITE_ENDPOINT = 0x1
|
||||
TIMEOUT_MS = 100
|
||||
|
||||
def __init__(self, vendor=0x18d1,
|
||||
product=0x5027, interface=1, serialname=None, logger=None):
|
||||
"""Susb constructor.
|
||||
|
||||
Discovers and connects to stm32 USB endpoints.
|
||||
|
||||
Args:
|
||||
vendor: usb vendor id of stm32 device.
|
||||
product: usb product id of stm32 device.
|
||||
interface: interface number ( 1 - 4 ) of stm32 device to use.
|
||||
serialname: string of device serialname.
|
||||
logger: none
|
||||
|
||||
Raises:
|
||||
SusbError: An error accessing Susb object
|
||||
"""
|
||||
self._vendor = vendor
|
||||
self._product = product
|
||||
self._interface = interface
|
||||
self._serialname = serialname
|
||||
self._find_device()
|
||||
|
||||
def _find_device(self):
|
||||
"""Set up the usb endpoint"""
|
||||
# Find the stm32.
|
||||
dev_list = usb.core.find(idVendor=self._vendor, idProduct=self._product,
|
||||
find_all=True)
|
||||
if not dev_list:
|
||||
raise SusbError('USB device not found')
|
||||
|
||||
# Check if we have multiple stm32s and we've specified the serial.
|
||||
dev = None
|
||||
if self._serialname:
|
||||
for d in dev_list:
|
||||
dev_serial = usb.util.get_string(d, d.iSerialNumber)
|
||||
if dev_serial == self._serialname:
|
||||
dev = d
|
||||
break
|
||||
if dev is None:
|
||||
raise SusbError('USB device(%s) not found' % self._serialname)
|
||||
else:
|
||||
try:
|
||||
dev = dev_list.next()
|
||||
except StopIteration:
|
||||
raise SusbError('USB device %04x:%04x not found' % (
|
||||
self._vendor, self._product))
|
||||
|
||||
# If we can't set configuration, it's already been set.
|
||||
try:
|
||||
dev.set_configuration()
|
||||
except usb.core.USBError:
|
||||
pass
|
||||
|
||||
# Get an endpoint instance.
|
||||
cfg = dev.get_active_configuration()
|
||||
intf = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interface)
|
||||
self._intf = intf
|
||||
if not intf:
|
||||
raise SusbError('Interface %04x:%04x - 0x%x not found' % (
|
||||
self._vendor, self._product, self._interface))
|
||||
|
||||
# Detach raiden.ko if it is loaded. CCD endpoints support either a kernel
|
||||
# module driver that produces a ttyUSB, or direct endpoint access, but
|
||||
# can't do both at the same time.
|
||||
if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True:
|
||||
dev.detach_kernel_driver(intf.bInterfaceNumber)
|
||||
|
||||
read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
|
||||
read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
|
||||
self._read_ep = read_ep
|
||||
|
||||
write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
|
||||
write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
|
||||
self._write_ep = write_ep
|
||||
145
extra/tigertool/ecusb/tiny_servo_common.py
Normal file
145
extra/tigertool/ecusb/tiny_servo_common.py
Normal file
@@ -0,0 +1,145 @@
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
"""Utilities for using lightweight console functions."""
|
||||
|
||||
import datetime
|
||||
import errno
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pty_driver
|
||||
import stm32uart
|
||||
|
||||
|
||||
class TinyServoError(Exception):
|
||||
"""Exceptions."""
|
||||
|
||||
|
||||
def log(output):
|
||||
"""Print output to console, logfiles can be added here.
|
||||
|
||||
Args:
|
||||
output: string to output.
|
||||
"""
|
||||
sys.stdout.write(output)
|
||||
sys.stdout.write('\n')
|
||||
sys.stdout.flush()
|
||||
|
||||
def check_usb(vidpid):
|
||||
"""Check if |vidpid| is present on the system's USB.
|
||||
|
||||
Args:
|
||||
vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
|
||||
|
||||
Returns: True if found, Flase, otherwise.
|
||||
"""
|
||||
if subprocess.call(['lsusb', '-d', vidpid], stdout=open('/dev/null', 'w')):
|
||||
return False
|
||||
return True
|
||||
|
||||
def check_usb_sn(vidpid):
|
||||
"""Return the serial number
|
||||
|
||||
Return the serial number of the first USB device with VID:PID vidpid,
|
||||
or None if no device is found. This will not work well with two of
|
||||
the same device attached.
|
||||
|
||||
Args:
|
||||
vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
|
||||
|
||||
Returns: string serial number if found, None otherwise.
|
||||
"""
|
||||
output = subprocess.check_output(['lsusb', '-v', '-d', vidpid])
|
||||
m = re.search(r'^\s*iSerial\s+(.*)$', output, flags=re.M)
|
||||
if m:
|
||||
return m.group(1)
|
||||
|
||||
return None
|
||||
|
||||
def wait_for_usb_remove(vidpid, timeout=None):
|
||||
"""Wait for USB device with vidpid to be removed.
|
||||
|
||||
Wrapper for wait_for_usb below
|
||||
"""
|
||||
wait_for_usb(vidpid, timeout=timeout, desiredpresence=False)
|
||||
|
||||
def wait_for_usb(vidpid, timeout=None, desiredpresence=True):
|
||||
"""Wait for usb device with vidpid to be present/absent.
|
||||
|
||||
Args:
|
||||
vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
|
||||
timeout: timeout in seconds, None for no timeout.
|
||||
desiredpresence: True for present, False for not present.
|
||||
|
||||
Raises:
|
||||
TinyServoError: on timeout.
|
||||
"""
|
||||
if timeout:
|
||||
finish = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
|
||||
while check_usb(vidpid) != desiredpresence:
|
||||
time.sleep(.1)
|
||||
if timeout:
|
||||
if datetime.datetime.now() > finish:
|
||||
raise TinyServoError('Timeout', 'Timeout waiting for USB %s' % vidpid)
|
||||
|
||||
def do_serialno(serialno, pty):
|
||||
"""Set serialnumber 'serialno' via ec console 'pty'.
|
||||
|
||||
Commands are:
|
||||
# > serialno set 1234
|
||||
# Saving serial number
|
||||
# Serial number: 1234
|
||||
|
||||
Args:
|
||||
serialno: string serial number to set.
|
||||
pty: tinyservo console to send commands.
|
||||
|
||||
Raises:
|
||||
TinyServoError: on failure to set.
|
||||
ptyError: on command interface error.
|
||||
"""
|
||||
cmd = 'serialno set %s' % serialno
|
||||
regex = 'Serial number: (.*)$'
|
||||
|
||||
results = pty._issue_cmd_get_results(cmd, [regex])[0]
|
||||
sn = results[1].strip().strip('\n\r')
|
||||
|
||||
if sn == serialno:
|
||||
log('Success !')
|
||||
log('Serial set to %s' % sn)
|
||||
else:
|
||||
log('Serial number set to %s but saved as %s.' % (serialno, sn))
|
||||
raise TinyServoError(
|
||||
'Serial Number',
|
||||
'Serial number set to %s but saved as %s.' % (serialno, sn))
|
||||
|
||||
def setup_tinyservod(vidpid, interface, serialno=None):
|
||||
"""Set up a pty
|
||||
|
||||
Set up a pty to the ec console in order
|
||||
to send commands. Returns a pty_driver object.
|
||||
|
||||
Args:
|
||||
vidpid: string vidpid of device to access.
|
||||
interface: not used.
|
||||
serialno: string serial no of device requested, optional.
|
||||
|
||||
Returns: pty object
|
||||
|
||||
Raises:
|
||||
UsbError, SusbError: on device not found
|
||||
"""
|
||||
vidstr, pidstr = vidpid.split(':')
|
||||
vid = int(vidstr, 16)
|
||||
pid = int(pidstr, 16)
|
||||
suart = stm32uart.Suart(vendor=vid, product=pid,
|
||||
interface=interface, serialname=serialno)
|
||||
suart.run()
|
||||
pty = pty_driver.ptyDriver(suart, [])
|
||||
|
||||
return pty
|
||||
57
extra/tigertool/flash_dfu.sh
Executable file
57
extra/tigertool/flash_dfu.sh
Executable file
@@ -0,0 +1,57 @@
|
||||
#!/bin/bash
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
FLAGS_timeout=600
|
||||
IMG=${1:-tigertail.bin}
|
||||
|
||||
echo "Flashing ${IMG}"
|
||||
|
||||
error() {
|
||||
printf "%s\n" "$*" >&2
|
||||
}
|
||||
|
||||
die() {
|
||||
[[ "$#*" == "0" ]] || error "$@"
|
||||
exit 1
|
||||
}
|
||||
|
||||
flash_stm32_dfu() {
|
||||
local DFU_DEVICE=0483:df11
|
||||
local ADDR=0x08000000
|
||||
|
||||
[[ -e "${IMG}" ]] || die "File ${IMG} not found!"
|
||||
|
||||
# Check for a suitable local dfu-util
|
||||
local LOCAL_DFU_UTIL=$(which dfu-util)
|
||||
if [[ -n "${LOCAL_DFU_UTIL}" ]]; then
|
||||
DFU_VERSION=$("${LOCAL_DFU_UTIL}" -V | head -n1 | cut -d' ' -f2)
|
||||
if [[ "${DFU_VERSION}" < "0.7" ]]; then
|
||||
LOCAL_DFU_UTIL=""
|
||||
fi
|
||||
fi
|
||||
local DFU_UTIL=${LOCAL_DFU_UTIL:-'./dfu-util'}
|
||||
|
||||
which "${DFU_UTIL}" &> /dev/null || die \
|
||||
"no dfu-util util found. Did you 'sudo emerge dfu-util'."
|
||||
|
||||
local dev_cnt=$(lsusb -d "${DFU_DEVICE}" | wc -l)
|
||||
if [ $dev_cnt -eq 0 ] ; then
|
||||
die "unable to locate dfu device at ${DFU_DEVICE}."
|
||||
elif [ $dev_cnt -ne 1 ] ; then
|
||||
die "too many dfu devices (${dev_cnt}). Disconnect all but one."
|
||||
fi
|
||||
|
||||
local SIZE=$(wc -c "${IMG}" | cut -d' ' -f1)
|
||||
# Remove read protection.
|
||||
sudo timeout -k 10 -s 9 "${FLAGS_timeout}" \
|
||||
${DFU_UTIL} -a 0 -s ${ADDR}:${SIZE}:force:unprotect -D "${IMG}"
|
||||
# Wait for mass-erase and reboot after unprotection.
|
||||
sleep 1
|
||||
# Actual image flashing.
|
||||
sudo timeout -k 10 -s 9 "${FLAGS_timeout}" \
|
||||
$DFU_UTIL -a 0 -s ${ADDR}:${SIZE} -D "${IMG}"
|
||||
}
|
||||
|
||||
flash_stm32_dfu
|
||||
28
extra/tigertool/make_pkg.sh
Executable file
28
extra/tigertool/make_pkg.sh
Executable file
@@ -0,0 +1,28 @@
|
||||
#!/bin/bash
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
# Make sure we are in the correct dir.
|
||||
cd "$( dirname "${BASH_SOURCE[0]}" )" || exit
|
||||
|
||||
# Clean and previous cruft.
|
||||
rm -rf build
|
||||
|
||||
DEST=build/tigertool
|
||||
DATE=$(date +"%Y%m%d")
|
||||
|
||||
mkdir -p "${DEST}"
|
||||
cp ../usb_serial/console.py "${DEST}"
|
||||
cp ../../../../../chroot/usr/bin/dfu-util "${DEST}"
|
||||
cp flash_dfu.sh "${DEST}"
|
||||
cp tigertool.py "${DEST}"
|
||||
|
||||
cp -r ecusb "${DEST}"
|
||||
cp -r ../../../../../chroot/usr/lib64/python2.7/site-packages/usb "${DEST}"
|
||||
find "${DEST}" -name "*.py[co]" -delete
|
||||
cp -r ../usb_serial "${DEST}"
|
||||
|
||||
(cd build; tar -czf tigertool_${DATE}.tgz tigertool)
|
||||
|
||||
echo "Done packaging tigertool_${DATE}.tgz"
|
||||
133
extra/tigertool/tigertool.py
Executable file
133
extra/tigertool/tigertool.py
Executable file
@@ -0,0 +1,133 @@
|
||||
#!/usr/bin/python2
|
||||
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
"""Script to control tigertail USB-C Mux board."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
|
||||
import ecusb.tiny_servo_common as c
|
||||
|
||||
STM_VIDPID = '18d1:5027'
|
||||
serialno = 'Uninitialized'
|
||||
|
||||
def do_mux(mux, pty):
|
||||
"""Set mux via ec console 'pty'.
|
||||
|
||||
Commands are:
|
||||
# > mux A
|
||||
# TYPE-C mux is A
|
||||
"""
|
||||
validmux = ['A', 'B', 'off']
|
||||
if mux not in validmux:
|
||||
c.log('Mux setting %s invalid, try one of %s' % (mux, validmux))
|
||||
return False
|
||||
|
||||
cmd = '\r\nmux %s\r\n' % mux
|
||||
regex = 'TYPE\-C mux is ([^\s\r\n]*)\r'
|
||||
|
||||
results = pty._issue_cmd_get_results(cmd, [regex])[0]
|
||||
result = results[1].strip().strip('\n\r')
|
||||
|
||||
if result != mux:
|
||||
c.log('Mux set to %s but saved as %s.' % (mux, result))
|
||||
return False
|
||||
c.log('Mux set to %s' % result)
|
||||
return True
|
||||
|
||||
def do_reboot(pty):
|
||||
"""Reboot via ec console pty
|
||||
|
||||
Command is: reboot.
|
||||
"""
|
||||
cmd = '\r\nreboot\r\n'
|
||||
regex = 'Rebooting'
|
||||
|
||||
try:
|
||||
results = pty._issue_cmd_get_results(cmd, [regex])[0]
|
||||
time.sleep(1)
|
||||
c.log(results)
|
||||
except Exception as e:
|
||||
c.log(e)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def do_sysjump(region, pty):
|
||||
"""Set region via ec console 'pty'.
|
||||
|
||||
Commands are:
|
||||
# > sysjump rw
|
||||
"""
|
||||
validregion = ['ro', 'rw']
|
||||
if region not in validregion:
|
||||
c.log('Region setting %s invalid, try one of %s' % (
|
||||
region, validregion))
|
||||
return False
|
||||
|
||||
cmd = '\r\nsysjump %s\r\n' % region
|
||||
try:
|
||||
pty._issue_cmd(cmd)
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
c.log(e)
|
||||
return False
|
||||
|
||||
c.log('Region requested %s' % region)
|
||||
return True
|
||||
|
||||
def get_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
description=__doc__)
|
||||
parser.add_argument('-s', '--serialno', type=str, default=None,
|
||||
help='serial number of board to use')
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument('--setserialno', type=str, default=None,
|
||||
help='serial number to set on the board.')
|
||||
group.add_argument('-m', '--mux', type=str, default=None,
|
||||
help='mux selection')
|
||||
group.add_argument('-r', '--sysjump', type=str, default=None,
|
||||
help='region selection')
|
||||
group.add_argument('--reboot', action='store_true',
|
||||
help='reboot tigertail')
|
||||
return parser
|
||||
|
||||
def main(argv):
|
||||
parser = get_parser()
|
||||
opts = parser.parse_args(argv)
|
||||
|
||||
result = True
|
||||
|
||||
# Let's make sure there's a tigertail
|
||||
# If nothing found in 5 seconds, fail.
|
||||
c.wait_for_usb(STM_VIDPID, 5.)
|
||||
|
||||
pty = c.setup_tinyservod(STM_VIDPID, 0, serialno=opts.serialno)
|
||||
|
||||
if opts.setserialno:
|
||||
try:
|
||||
c.do_serialno(opts.setserialno, pty)
|
||||
except Exception:
|
||||
result = False
|
||||
|
||||
elif opts.mux:
|
||||
result &= do_mux(opts.mux, pty)
|
||||
|
||||
elif opts.sysjump:
|
||||
result &= do_sysjump(opts.sysjump, pty)
|
||||
|
||||
elif opts.reboot:
|
||||
result &= do_reboot(pty)
|
||||
|
||||
if result:
|
||||
c.log('PASS')
|
||||
else:
|
||||
c.log('FAIL')
|
||||
exit(-1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main(sys.argv[1:]))
|
||||
Reference in New Issue
Block a user