diff --git a/util/ec3po/__init__.py b/util/ec3po/__init__.py new file mode 100755 index 0000000000..e69de29bb2 diff --git a/util/ec3po/console.py b/util/ec3po/console.py new file mode 100755 index 0000000000..de657b7ae0 --- /dev/null +++ b/util/ec3po/console.py @@ -0,0 +1,694 @@ +#!/usr/bin/python2 +# Copyright 2015 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""EC-3PO Interactive Console Interface + +console provides the console interface between the user and the interpreter. It +handles the presentation of the EC console including editing methods as well as +session-persistent command history. +""" +from __future__ import print_function +import argparse +from chromite.lib import cros_logging as logging +import multiprocessing +import os +import pty +import select +import sys + +import interpreter + + +PROMPT = '> ' +CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name. +CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user. + + +class EscState(object): + """Class which contains an enumeration for states of ESC sequences.""" + ESC_START = 1 + ESC_BRACKET = 2 + ESC_BRACKET_1 = 3 + ESC_BRACKET_3 = 4 + ESC_BRACKET_8 = 5 + + +class ControlKey(object): + """Class which contains codes for various control keys.""" + BACKSPACE = 0x08 + CTRL_A = 0x01 + CTRL_B = 0x02 + CTRL_D = 0x04 + CTRL_E = 0x05 + CTRL_F = 0x06 + CTRL_K = 0x0b + CTRL_N = 0xe + CTRL_P = 0x10 + CARRIAGE_RETURN = 0x0d + ESC = 0x1b + + +class MoveCursorError(Exception): + """Exception class for errors when moving the cursor.""" + pass + + +class Console(object): + """Class which provides the console interface between the EC and the user. + + This class essentially represents the console interface between the user and + the EC. It handles all of the console editing behaviour + + Attributes: + master_pty: File descriptor to the master side of the PTY. Used for driving + output to the user and receiving user input. + user_pty: A string representing the PTY name of the served console. + cmd_pipe: A multiprocessing.Connection object which represents the console + side of the command pipe. This must be a bidirectional pipe. Console + commands and responses utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the console's + read-only side of the debug pipe. This must be a unidirectional pipe + attached to the intepreter. EC debug messages use this pipe. + input_buffer: A string representing the current input command. + input_buffer_pos: An integer representing the current position in the buffer + to insert a char. + partial_cmd: A string representing the command entered on a line before + pressing the up arrow keys. + esc_state: An integer represeting the current state within an escape + sequence. + line_limit: An integer representing the maximum number of characters on a + line. + history: A list of strings containing the past entered console commands. + history_pos: An integer representing the current history buffer position. + This index is used to show previous commands. + prompt: A string representing the console prompt displayed to the user. + """ + + def __init__(self, master_pty, user_pty, cmd_pipe, dbg_pipe): + """Initalises a Console object with the provided arguments. + + Args: + master_pty: File descriptor to the master side of the PTY. Used for driving + output to the user and receiving user input. + user_pty: A string representing the PTY name of the served console. + cmd_pipe: A multiprocessing.Connection object which represents the console + side of the command pipe. This must be a bidirectional pipe. Console + commands and responses utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the console's + read-only side of the debug pipe. This must be a unidirectional pipe + attached to the intepreter. EC debug messages use this pipe. + """ + self.master_pty = master_pty + self.user_pty = user_pty + self.cmd_pipe = cmd_pipe + self.dbg_pipe = dbg_pipe + self.input_buffer = '' + self.input_buffer_pos = 0 + self.partial_cmd = '' + self.esc_state = 0 + self.line_limit = CONSOLE_INPUT_LINE_SIZE + self.history = [] + self.history_pos = 0 + self.prompt = PROMPT + + def __str__(self): + """Show internal state of Console object as a string.""" + string = [] + string.append('master_pty: %s' % self.master_pty) + string.append('user_pty: %s' % self.user_pty) + string.append('cmd_pipe: %s' % self.cmd_pipe) + string.append('dbg_pipe: %s' % self.dbg_pipe) + string.append('input_buffer: %s' % self.input_buffer) + string.append('input_buffer_pos: %d' % self.input_buffer_pos) + string.append('esc_state: %d' % self.esc_state) + string.append('line_limit: %d' % self.line_limit) + string.append('history: [\'' + '\', \''.join(self.history) + '\']') + string.append('history_pos: %d' % self.history_pos) + string.append('prompt: \'%s\'' % self.prompt) + string.append('partial_cmd: \'%s\''% self.partial_cmd) + return '\n'.join(string) + + def PrintHistory(self): + """Print the history of entered commands.""" + fd = self.master_pty + # Make it pretty by figuring out how wide to pad the numbers. + wide = (len(self.history) / 10) + 1 + for i in range(len(self.history)): + line = ' %*d %s\r\n' % (wide, i, self.history[i]) + os.write(fd, line) + + def ShowPreviousCommand(self): + """Shows the previous command from the history list.""" + # There's nothing to do if there's no history at all. + if not self.history: + logging.debug('No history to print.') + return + + # Don't do anything if there's no more history to show. + if self.history_pos == 0: + logging.debug('No more history to show.') + return + + logging.debug('current history position: %d.', self.history_pos) + + # Decrement the history buffer position. + self.history_pos -= 1 + logging.debug('new history position.: %d', self.history_pos) + + # Save the text entered on the console if any. + if self.history_pos == len(self.history)-1: + logging.debug('saving partial_cmd: \'%s\'', self.input_buffer) + self.partial_cmd = self.input_buffer + + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + + # Print the last entry in the history buffer. + logging.debug('printing previous entry %d - %s', self.history_pos, + self.history[self.history_pos]) + fd = self.master_pty + prev_cmd = self.history[self.history_pos] + os.write(fd, prev_cmd) + # Update the input buffer. + self.input_buffer = prev_cmd + self.input_buffer_pos = len(prev_cmd) + + def ShowNextCommand(self): + """Shows the next command from the history list.""" + # Don't do anything if there's no history at all. + if not self.history: + logging.debug('History buffer is empty.') + return + + fd = self.master_pty + + logging.debug('current history position: %d', self.history_pos) + # Increment the history position. + self.history_pos += 1 + + # Restore the partial cmd. + if self.history_pos == len(self.history): + logging.debug('Restoring partial command of \'%s\'', self.partial_cmd) + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + # Print the partially entered command if any. + os.write(fd, self.partial_cmd) + self.input_buffer = self.partial_cmd + self.input_buffer_pos = len(self.input_buffer) + # Now that we've printed it, clear the partial cmd storage. + self.partial_cmd = '' + # Reset history position. + self.history_pos = len(self.history) + return + + logging.debug('new history position: %d', self.history_pos) + if self.history_pos > len(self.history)-1: + logging.debug('No more history to show.') + self.history_pos -= 1 + logging.debug('Reset history position to %d', self.history_pos) + return + + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + + # Print the newer entry from the history buffer. + logging.debug('printing next entry %d - %s', self.history_pos, + self.history[self.history_pos]) + next_cmd = self.history[self.history_pos] + os.write(fd, next_cmd) + # Update the input buffer. + self.input_buffer = next_cmd + self.input_buffer_pos = len(next_cmd) + logging.debug('new history position: %d.', self.history_pos) + + def SliceOutChar(self): + """Remove a char from the line and shift everything over 1 column.""" + fd = self.master_pty + # Remove the character at the input_buffer_pos by slicing it out. + self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + \ + self.input_buffer[self.input_buffer_pos+1:] + # Write the rest of the line + moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos:]) + # Write a space to clear out the last char + moved_col += os.write(fd, ' ') + # Update the input buffer position. + self.input_buffer_pos += moved_col + # Reset the cursor + self.MoveCursor('left', moved_col) + + def HandleEsc(self, byte): + """HandleEsc processes escape sequences. + + Args: + byte: An integer representing the current byte in the sequence. + """ + # We shouldn't be handling an escape sequence if we haven't seen one. + assert self.esc_state != 0 + + if self.esc_state is EscState.ESC_START: + logging.debug('ESC_START') + if byte == ord('['): + self.esc_state = EscState.ESC_BRACKET + return + + else: + logging.error('Unexpected sequence. %c' % byte) + self.esc_state = 0 + + elif self.esc_state is EscState.ESC_BRACKET: + logging.debug('ESC_BRACKET') + # Left Arrow key was pressed. + if byte == ord('D'): + logging.debug('Left arrow key pressed.') + self.MoveCursor('left', 1) + self.esc_state = 0 # Reset the state. + return + + # Right Arrow key. + elif byte == ord('C'): + logging.debug('Right arrow key pressed.') + self.MoveCursor('right', 1) + self.esc_state = 0 # Reset the state. + return + + # Up Arrow key. + elif byte == ord('A'): + logging.debug('Up arrow key pressed.') + self.ShowPreviousCommand() + # Reset the state. + self.esc_state = 0 # Reset the state. + return + + # Down Arrow key. + elif byte == ord('B'): + logging.debug('Down arrow key pressed.') + self.ShowNextCommand() + # Reset the state. + self.esc_state = 0 # Reset the state. + return + + # For some reason, minicom sends a 1 instead of 7. /shrug + # TODO(aaboagye): Figure out why this happens. + elif byte == ord('1') or byte == ord('7'): + self.esc_state = EscState.ESC_BRACKET_1 + + elif byte == ord('3'): + self.esc_state = EscState.ESC_BRACKET_3 + + elif byte == ord('8'): + self.esc_state = EscState.ESC_BRACKET_8 + + else: + logging.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)' + % (chr(byte), byte)) + self.esc_state = 0 + return + + elif self.esc_state is EscState.ESC_BRACKET_1: + logging.debug('ESC_BRACKET_1') + # HOME key. + if byte == ord('~'): + logging.debug('Home key pressed.') + self.MoveCursor('left', self.input_buffer_pos) + self.esc_state = 0 # Reset the state. + logging.debug('ESC sequence complete.') + return + + elif self.esc_state is EscState.ESC_BRACKET_3: + logging.debug('ESC_BRACKET_3') + # DEL key. + if byte == ord('~'): + logging.debug('Delete key pressed.') + if self.input_buffer_pos != len(self.input_buffer): + self.SliceOutChar() + self.esc_state = 0 # Reset the state. + + elif self.esc_state is EscState.ESC_BRACKET_8: + logging.debug('ESC_BRACKET_8') + # END key. + if byte == ord('~'): + logging.debug('End key pressed.') + self.MoveCursor('right', + len(self.input_buffer) - self.input_buffer_pos) + self.esc_state = 0 # Reset the state. + logging.debug('ESC sequence complete.') + return + + else: + logging.error('Unexpected sequence. %c' % byte) + self.esc_state = 0 + + else: + logging.error('Unexpected sequence. %c' % byte) + self.esc_state = 0 + + def ProcessInput(self): + """Captures the input determines what actions to take.""" + # There's nothing to do if the input buffer is empty. + if len(self.input_buffer) == 0: + return + + # Don't store 2 consecutive identical commands in the history. + if (self.history and self.history[-1] != self.input_buffer + or not self.history): + self.history.append(self.input_buffer) + + # Split the command up by spaces. + line = self.input_buffer.split(' ') + logging.debug('cmd: %s' % (self.input_buffer)) + cmd = line[0].lower() + + # The 'history' command is a special case that we handle locally. + if cmd == 'history': + self.PrintHistory() + return + + # Send the command to the interpreter. + logging.debug('Sending command to interpreter.') + self.cmd_pipe.send(self.input_buffer) + + def HandleChar(self, byte): + """HandleChar does a certain action when it receives a character. + + Args: + byte: An integer representing the character received from the user. + """ + # Keep handling the ESC sequence if we're in the middle of it. + if self.esc_state != 0: + self.HandleEsc(byte) + return + + # When we're at the end of the line, we should only allow going backwards, + # backspace, carriage return, up, or down. The arrow keys are escape + # sequences, so we let the escape...escape. + if (self.input_buffer_pos >= self.line_limit and + byte not in [ControlKey.CTRL_B, ControlKey.ESC, ControlKey.BACKSPACE, + ControlKey.CTRL_A, ControlKey.CARRIAGE_RETURN, + ControlKey.CTRL_P, ControlKey.CTRL_N]): + return + + # If the input buffer is full we can't accept new chars. + buffer_full = len(self.input_buffer) >= self.line_limit + + fd = self.master_pty + + # Carriage_Return/Enter + if byte == ControlKey.CARRIAGE_RETURN: + logging.debug('Enter key pressed.') + # Put a carriage return/newline and the print the prompt. + os.write(fd, '\r\n') + + # TODO(aaboagye): When we control the printing of all output, print the + # prompt AFTER printing all the output. We can't do it yet because we + # don't know how much is coming from the EC. + + # Print the prompt. + os.write(fd, self.prompt) + # Process the input. + self.ProcessInput() + # Now, clear the buffer. + self.input_buffer = '' + self.input_buffer_pos = 0 + # Reset history buffer pos. + self.history_pos = len(self.history) + # Clear partial command. + self.partial_cmd = '' + + # Backspace + elif byte == ControlKey.BACKSPACE: + logging.debug('Backspace pressed.') + if self.input_buffer_pos > 0: + # Move left 1 column. + self.MoveCursor('left', 1) + # Remove the character at the input_buffer_pos by slicing it out. + self.SliceOutChar() + + logging.debug('input_buffer_pos: %d' % (self.input_buffer_pos)) + + # Ctrl+A. Move cursor to beginning of the line + elif byte == ControlKey.CTRL_A: + logging.debug('Control+A pressed.') + self.MoveCursor('left', self.input_buffer_pos) + + # Ctrl+B. Move cursor left 1 column. + elif byte == ControlKey.CTRL_B: + logging.debug('Control+B pressed.') + self.MoveCursor('left', 1) + + # Ctrl+D. Delete a character. + elif byte == ControlKey.CTRL_D: + logging.debug('Control+D pressed.') + if self.input_buffer_pos != len(self.input_buffer): + # Remove the character by slicing it out. + self.SliceOutChar() + + # Ctrl+E. Move cursor to end of the line. + elif byte == ControlKey.CTRL_E: + logging.debug('Control+E pressed.') + self.MoveCursor('right', + len(self.input_buffer) - self.input_buffer_pos) + + # Ctrl+F. Move cursor right 1 column. + elif byte == ControlKey.CTRL_F: + logging.debug('Control+F pressed.') + self.MoveCursor('right', 1) + + # Ctrl+K. Kill line. + elif byte == ControlKey.CTRL_K: + logging.debug('Control+K pressed.') + self.KillLine() + + # Ctrl+N. Next line. + elif byte == ControlKey.CTRL_N: + logging.debug('Control+N pressed.') + self.ShowNextCommand() + + # Ctrl+P. Previous line. + elif byte == ControlKey.CTRL_P: + logging.debug('Control+P pressed.') + self.ShowPreviousCommand() + + # ESC sequence + elif byte == ControlKey.ESC: + # Starting an ESC sequence + self.esc_state = EscState.ESC_START + + # Only print printable chars. + elif IsPrintable(byte): + # Drop the character if we're full. + if buffer_full: + logging.debug('Dropped char: %c(%d)', byte, byte) + return + # Print the character. + os.write(fd, chr(byte)) + # Print the rest of the line (if any). + extra_bytes_written = os.write(fd, + self.input_buffer[self.input_buffer_pos:]) + + # Recreate the input buffer. + self.input_buffer = (self.input_buffer[0:self.input_buffer_pos] + + ('%c' % byte) + + self.input_buffer[self.input_buffer_pos:]) + # Update the input buffer position. + self.input_buffer_pos += 1 + extra_bytes_written + + # Reset the cursor if we wrote any extra bytes. + if extra_bytes_written: + self.MoveCursor('left', extra_bytes_written) + + logging.debug('input_buffer_pos: %d' % (self.input_buffer_pos)) + + def MoveCursor(self, direction, count): + """MoveCursor moves the cursor left or right by count columns. + + Args: + direction: A string that should be either 'left' or 'right' representing + the direction to move the cursor on the console. + count: An integer representing how many columns the cursor should be + moved. + + Raises: + ValueError: If the direction is not equal to 'left' or 'right'. + """ + # If there's nothing to move, we're done. + if not count: + return + fd = self.master_pty + seq = '\033[' + str(count) + if direction == 'left': + # Bind the movement. + if count > self.input_buffer_pos: + count = self.input_buffer_pos + seq += 'D' + logging.debug('move cursor left %d', count) + self.input_buffer_pos -= count + + elif direction == 'right': + # Bind the movement. + if (count + self.input_buffer_pos) > len(self.input_buffer): + count = 0 + seq += 'C' + logging.debug('move cursor right %d', count) + self.input_buffer_pos += count + + else: + raise MoveCursorError(('The only valid directions are \'left\' and ' + '\'right\'')) + + logging.debug('input_buffer_pos: %d' % self.input_buffer_pos) + # Move the cursor. + if count != 0: + os.write(fd, seq) + + def KillLine(self): + """Kill the rest of the line based on the input buffer position.""" + # Killing the line is killing all the text to the right. + diff = len(self.input_buffer) - self.input_buffer_pos + logging.debug('diff: %d' % diff) + # Diff shouldn't be negative, but if it is for some reason, let's try to + # correct the cursor. + if diff < 0: + logging.warning('Resetting input buffer position to %d...', + len(self.input_buffer)) + self.MoveCursor('left', -diff) + return + if diff: + self.MoveCursor('right', diff) + for _ in range(diff): + self.SendBackspace() + self.input_buffer_pos -= diff + self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + + def SendBackspace(self): + """Backspace a character on the console.""" + os.write(self.master_pty, '\033[1D \033[1D') + + +def IsPrintable(byte): + """Determines if a byte is printable. + + Args: + byte: An integer potentially representing a printable character. + + Returns: + A boolean indicating whether the byte is a printable character. + """ + return byte >= ord(' ') and byte <= ord('~') + + +def StartLoop(console): + """Starts the infinite loop of console processing. + + Args: + console: A Console object that has been properly initialzed. + """ + logging.info('EC Console is being served on %s.', console.user_pty) + logging.debug(console) + while True: + # Check to see if pipes or the console are ready for reading. + read_list = [console.master_pty, console.cmd_pipe, console.dbg_pipe] + ready_for_reading = select.select(read_list, [], [])[0] + + for obj in ready_for_reading: + if obj is console.master_pty: + logging.debug('Input from user') + # Convert to bytes so we can look for non-printable chars such as + # Ctrl+A, Ctrl+E, etc. + line = bytearray(os.read(console.master_pty, CONSOLE_MAX_READ)) + for i in line: + # Handle each character as it arrives. + console.HandleChar(i) + + elif obj is console.cmd_pipe: + data = console.cmd_pipe.recv() + # Write it to the user console. + logging.debug('|CMD|->\'%s\'', data) + os.write(console.master_pty, data) + + elif obj is console.dbg_pipe: + data = console.dbg_pipe.recv() + # Write it to the user console. + logging.debug('|DBG|->\'%s\'', data) + os.write(console.master_pty, data) + + +def main(): + """Kicks off the EC-3PO interactive console interface and interpreter. + + We create some pipes to communicate with an interpreter, instantiate an + interpreter, create a PTY pair, and begin serving the console interface. + """ + # Set up argument parser. + parser = argparse.ArgumentParser(description=('Start interactive EC console ' + 'and interpreter.')) + # TODO(aaboagye): Eventually get this from servod. + parser.add_argument('ec_uart_pty', + help=('The full PTY name that the EC UART' + ' is present on. eg: /dev/pts/12')) + parser.add_argument('--log-level', + default='info', + help=('info, debug, warning, error, or critical')) + + # Parse arguments. + args = parser.parse_args() + + # Can't do much without an EC to talk to. + if not args.ec_uart_pty: + parser.print_help() + sys.exit(1) + + # Set logging level. + args.log_level = args.log_level.lower() + if args.log_level == 'info': + log_level = logging.INFO + elif args.log_level == 'debug': + log_level = logging.DEBUG + elif args.log_level == 'warning': + log_level = logging.WARNING + elif args.log_level == 'error': + log_level = logging.ERROR + elif args.log_level == 'critical': + log_level = logging.CRITICAL + else: + print('Error: Invalid log level.') + parser.print_help() + sys.exit(1) + + # Start logging with a timestamp, module, and log level shown in each log + # entry. + logging.basicConfig(level=log_level, format=('%(asctime)s - %(module)s -' + ' %(levelname)s - %(message)s')) + + # Create some pipes to communicate between the interpreter and the console. + # The command pipe is bidirectional. + cmd_pipe_interactive, cmd_pipe_interp = multiprocessing.Pipe() + # The debug pipe is unidirectional from interpreter to console only. + dbg_pipe_interactive, dbg_pipe_interp = multiprocessing.Pipe(duplex=False) + + # Create an interpreter instance. + itpr = interpreter.Interpreter(args.ec_uart_pty, cmd_pipe_interp, + dbg_pipe_interp, log_level) + + # Spawn an interpreter process. + itpr_process = multiprocessing.Process(target=interpreter.StartLoop, + args=(itpr,)) + # Make sure to kill the interpreter when we terminate. + itpr_process.daemon = True + # Start the interpreter. + itpr_process.start() + + # Open a new pseudo-terminal pair + (master_pty, user_pty) = pty.openpty() + # Create a console. + console = Console(master_pty, os.ttyname(user_pty), cmd_pipe_interactive, + dbg_pipe_interactive) + # Start serving the console. + StartLoop(console) + + +if __name__ == '__main__': + main() diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py new file mode 100755 index 0000000000..7e9e4ab9f1 --- /dev/null +++ b/util/ec3po/console_unittest.py @@ -0,0 +1,1112 @@ +#!/usr/bin/python2 +# Copyright 2015 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Unit tests for the EC-3PO Console interface.""" +from __future__ import print_function +import binascii +import mock +import multiprocessing +import tempfile +import unittest + +import console + +ESC_STRING = chr(console.ControlKey.ESC) + +class Keys(object): + """A class that contains the escape sequences for special keys.""" + LEFT_ARROW = [console.ControlKey.ESC, ord('['), ord('D')] + RIGHT_ARROW = [console.ControlKey.ESC, ord('['), ord('C')] + UP_ARROW = [console.ControlKey.ESC, ord('['), ord('A')] + DOWN_ARROW = [console.ControlKey.ESC, ord('['), ord('B')] + HOME = [console.ControlKey.ESC, ord('['), ord('1'), ord('~')] + END = [console.ControlKey.ESC, ord('['), ord('8'), ord('~')] + DEL = [console.ControlKey.ESC, ord('['), ord('3'), ord('~')] + +class OutputStream(object): + """A class that has methods which return common console output.""" + + @staticmethod + def MoveCursorLeft(count): + """Produces what would be printed to the console if the cursor moved left. + + Args: + count: An integer representing how many columns to move left. + + Returns: + string: A string which contains what would be printed to the console if + the cursor moved left. + """ + string = ESC_STRING + string += '[' + str(count) + 'D' + return string + + @staticmethod + def MoveCursorRight(count): + """Produces what would be printed to the console if the cursor moved right. + + Args: + count: An integer representing how many columns to move right. + + Returns: + string: A string which contains what would be printed to the console if + the cursor moved right. + """ + string = ESC_STRING + string += '[' + str(count) + 'C' + return string + +BACKSPACE_STRING = '' +# Move cursor left 1 column. +BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) +# Write a space. +BACKSPACE_STRING += ' ' +# Move cursor left 1 column. +BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) + +class TestConsoleEditingMethods(unittest.TestCase): + """Test case to verify all console editing methods.""" + + def setUp(self): + """Setup the test harness.""" + # Create a temp file and set both the master and slave PTYs to the file to + # create a loopback. + self.tempfile = tempfile.TemporaryFile() + + # Create some dummy pipes. These won't be used since we'll mock out sends + # to the interpreter. + dummy_pipe_end_0, dummy_pipe_end_1 = multiprocessing.Pipe() + self.console = console.Console(self.tempfile.fileno(), self.tempfile, + dummy_pipe_end_0, dummy_pipe_end_1) + + # Mock out sends to the interpreter. + multiprocessing.Pipe.send = mock.MagicMock() + + def StringToByteList(self, string): + """Converts a string to list of bytes. + + Args: + string: A literal string to turn into a list of bytes. + + Returns: + A list of integers representing the byte value of each character in the + string. + """ + return [ord(c) for c in string] + + def BadConsoleOutput(self, expected, got): + """Format the console output into readable text. + + Args: + expected: A list of bytes representing the expected output console + stream. + got: A list of byte representing the actual output console stream. + + Returns: + string: A formatted string which shows the expected console output stream + and the actual console output stream. + """ + esc_state = 0 + string = 'Incorrect console output stream.\n' + string += 'exp: |' + count = 0 + for char in expected: + if esc_state != 0: + if esc_state == console.EscState.ESC_START: + if char == '[': + esc_state = console.EscState.ESC_BRACKET + elif esc_state == console.EscState.ESC_BRACKET: + if char == 'D': + string += '[cursor left ' + str(count) + ' cols]' + esc_state = 0 + elif char == 'C': + string += '[cursor right ' + str(count) + ' cols]' + esc_state = 0 + else: + count = int(char) + # Print if it's printable. + elif console.IsPrintable(ord(char)): + string += char + else: + # It might be a sequence of some type. + if ord(char) == console.ControlKey.ESC: + # Need to look at the following sequence. + esc_state = console.EscState.ESC_START + else: + string += '{' + binascii.hexlify(char) + '}' + + string += '|\n\ngot: |' + for char in got: + if esc_state != 0: + if esc_state == console.EscState.ESC_START: + if char == '[': + esc_state = console.EscState.ESC_BRACKET + elif esc_state == console.EscState.ESC_BRACKET: + if char == 'D': + string += '[cursor left ' + str(count) + ' cols]' + esc_state = 0 + elif char == 'C': + string += '[cursor right ' + str(count) + ' cols]' + esc_state = 0 + else: + count = int(char) + # Print if it's printable. + elif console.IsPrintable(ord(char)): + string += char + else: + # It might be a sequence of some type. + if ord(char) == console.ControlKey.ESC: + # Need to look at the following sequence. + esc_state = console.EscState.ESC_START + else: + string += '{' + binascii.hexlify(char) + '}' + string += '|\n\n' + + # TODO(aaboagye): It would be nice to replace all those move left 1, ' ', + # move left 1, with backspace. + + return string + + def CheckConsoleOutput(self, exp_console_out): + """Verify what was sent out the console matches what we expect. + + Args: + exp_console_out: A string representing the console output stream. + """ + # Read what was sent out the console. + self.tempfile.seek(0) + console_out = self.tempfile.read() + + self.assertEqual(exp_console_out, + console_out, + (self.BadConsoleOutput(exp_console_out, console_out) + + str(self.console))) + + def CheckInputBuffer(self, exp_input_buffer): + """Verify that the input buffer contains what we expect. + + Args: + exp_input_buffer: A string containing the contents of the current input + buffer. + """ + self.assertEqual(exp_input_buffer, self.console.input_buffer, + ('input buffer does not match expected.\n' + 'expected: |' + exp_input_buffer + '|\n' + 'got: |' + self.console.input_buffer + '|\n' + + str(self.console))) + + def CheckInputBufferPosition(self, exp_pos): + """Verify the input buffer position. + + Args: + exp_pos: An integer representing the expected input buffer position. + """ + self.assertEqual(exp_pos, self.console.input_buffer_pos, + 'input buffer position is incorrect.\ngot: ' + + str(self.console.input_buffer_pos) + '\nexp: ' + + str(exp_pos) + '\n' + str(self.console)) + + def CheckHistoryBuffer(self, exp_history): + """Verify that the items in the history buffer are what we expect. + + Args: + exp_history: A list of strings representing the expected contents of the + history buffer. + """ + # First, check to see if the length is what we expect. + self.assertEqual(len(exp_history), len(self.console.history), + ('The number of items in the history is unexpected.\n' + 'exp: ' + str(len(exp_history)) + '\n' + 'got: ' + str(len(self.console.history)) + '\n' + 'internal state:\n' + str(self.console))) + + # Next, check the actual contents of the history buffer. + for i in range(len(exp_history)): + self.assertEqual(exp_history[i], self.console.history[i], + ('history buffer contents are incorrect.\n' + 'exp: ' + exp_history[i] + '\n' + 'got: ' + self.console.history[i] + '\n' + 'internal state:\n' + str(self.console))) + + def test_EnteringChars(self): + """Verify that characters are echoed onto the console.""" + test_str = 'abc' + input_stream = self.StringToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # Check the input position. + exp_pos = len(test_str) + self.CheckInputBufferPosition(exp_pos) + + # Verify that the input buffer is correct. + expected_buffer = test_str + self.CheckInputBuffer(expected_buffer) + + # Check console output + exp_console_out = test_str + self.CheckConsoleOutput(exp_console_out) + + def test_EnteringDeletingMoreCharsThanEntered(self): + """Verify that we can press backspace more than we have entered chars.""" + test_str = 'spamspam' + input_stream = self.StringToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # Now backspace 1 more than what we sent. + input_stream = [] + for _ in range(len(test_str) + 1): + input_stream.append(console.ControlKey.BACKSPACE) + + # Send that sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that input buffer position is 0. + self.CheckInputBufferPosition(0) + + # Next, examine the output stream for the correct sequence. + exp_console_out = test_str + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + + # Now, verify that we got what we expected. + self.CheckConsoleOutput(exp_console_out) + + def test_EnteringMoreThanCharLimit(self): + """Verify that we drop characters when the line is too long.""" + test_str = self.console.line_limit * 'o' # All allowed. + test_str += 5 * 'x' # All should be dropped. + input_stream = self.StringToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, we expect that input buffer position should be equal to the line + # limit. + exp_pos = self.console.line_limit + self.CheckInputBufferPosition(exp_pos) + + # The input buffer should only hold until the line limit. + exp_buffer = test_str[0:self.console.line_limit] + self.CheckInputBuffer(exp_buffer) + + # Lastly, check that the extra characters are not printed. + exp_console_out = exp_buffer + self.CheckConsoleOutput(exp_console_out) + + def test_ValidKeysOnLongLine(self): + """Verify that we can still press valid keys if the line is too long.""" + # Fill the line. + test_str = self.console.line_limit * 'o' + exp_console_out = test_str + # Try to fill it even more; these should all be dropped. + test_str += 5 * 'x' + input_stream = self.StringToByteList(test_str) + + # We should be able to press the following keys: + # - Backspace + # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N + # - Delete + # - Home/CTRL+A + # - End/CTRL+E + # - Carriage Return + + # Backspace 1 character + input_stream.append(console.ControlKey.BACKSPACE) + exp_console_out += BACKSPACE_STRING + # Refill the line. + input_stream.extend(self.StringToByteList('o')) + exp_console_out += 'o' + + # Left arrow key. + input_stream.extend(Keys.LEFT_ARROW) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Right arrow key. + input_stream.extend(Keys.RIGHT_ARROW) + exp_console_out += OutputStream.MoveCursorRight(1) + + # CTRL+B + input_stream.append(console.ControlKey.CTRL_B) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # CTRL+F + input_stream.append(console.ControlKey.CTRL_F) + exp_console_out += OutputStream.MoveCursorRight(1) + + # Let's press enter now so we can test up and down. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + exp_console_out += '\r\n' + self.console.prompt + + # Up arrow key. + input_stream.extend(Keys.UP_ARROW) + exp_console_out += test_str[:self.console.line_limit] + + # Down arrow key. + input_stream.extend(Keys.DOWN_ARROW) + # Since the line was blank, we have to backspace the entire line. + exp_console_out += self.console.line_limit * BACKSPACE_STRING + + # CTRL+P + input_stream.append(console.ControlKey.CTRL_P) + exp_console_out += test_str[:self.console.line_limit] + + # CTRL+N + input_stream.append(console.ControlKey.CTRL_N) + # Since the line was blank, we have to backspace the entire line. + exp_console_out += self.console.line_limit * BACKSPACE_STRING + + # Press the Up arrow key to reprint the long line. + input_stream.extend(Keys.UP_ARROW) + exp_console_out += test_str[:self.console.line_limit] + + # Press the Home key to jump to the beginning of the line. + input_stream.extend(Keys.HOME) + exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) + + # Press the End key to jump to the end of the line. + input_stream.extend(Keys.END) + exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) + + # Press CTRL+A to jump to the beginning of the line. + input_stream.append(console.ControlKey.CTRL_A) + exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) + + # Press CTRL+E to jump to the end of the line. + input_stream.extend(Keys.END) + exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) + + # Move left one column so we can delete a character. + input_stream.extend(Keys.LEFT_ARROW) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Press the delete key. + input_stream.extend(Keys.DEL) + # This should look like a space, and then move cursor left 1 column since + # we're at the end of line. + exp_console_out += ' ' + OutputStream.MoveCursorLeft(1) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify everything happened correctly. + self.CheckConsoleOutput(exp_console_out) + + def test_BackspaceOnEmptyLine(self): + """Verify that we can backspace on an empty line with no bad effects.""" + # Send a single backspace. + test_str = [console.ControlKey.BACKSPACE] + + # Send the characters in. + for byte in test_str: + self.console.HandleChar(byte) + + # Check the input position. + exp_pos = 0 + self.CheckInputBufferPosition(exp_pos) + + # Check that buffer is empty. + exp_input_buffer = '' + self.CheckInputBuffer(exp_input_buffer) + + # Check that the console output is empty. + exp_console_out = '' + self.CheckConsoleOutput(exp_console_out) + + def test_BackspaceWithinLine(self): + """Verify that we shift the chars over when backspacing within a line.""" + # Misspell 'help' + test_str = 'heelp' + input_stream = self.StringToByteList(test_str) + # Use the arrow key to go back to fix it. + # Move cursor left 1 column. + input_stream.extend(2*Keys.LEFT_ARROW) + # Backspace once to remove the extra 'e'. + input_stream.append(console.ControlKey.BACKSPACE) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify the input buffer + exp_input_buffer = 'help' + self.CheckInputBuffer(exp_input_buffer) + + # Verify the input buffer position. It should be at 2 (cursor over the 'l') + self.CheckInputBufferPosition(2) + + # We expect the console output to be the test string, with two moves to the + # left, another move left, and then the rest of the line followed by a + # space. + exp_console_out = test_str + exp_console_out += 2 * OutputStream.MoveCursorLeft(1) + + # Move cursor left 1 column. + exp_console_out += OutputStream.MoveCursorLeft(1) + # Rest of the line and a space. (test_str in this case) + exp_console_out += 'lp ' + # Reset the cursor 2 + 1 to the left. + exp_console_out += OutputStream.MoveCursorLeft(3) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_JumpToBeginningOfLineViaCtrlA(self): + """Verify that we can jump to the beginning of a line with Ctrl+A.""" + # Enter some chars and press CTRL+A + test_str = 'abc' + input_stream = self.StringToByteList(test_str) + [console.ControlKey.CTRL_A] + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect to see our test string followed by a move cursor left. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + + # Check to see what whas printed on the console. + self.CheckConsoleOutput(exp_console_out) + + # Check that the input buffer position is now 0. + self.CheckInputBufferPosition(0) + + # Check input buffer still contains our test string. + self.CheckInputBuffer(test_str) + + def test_JumpToBeginningOfLineViaHomeKey(self): + """Jump to beginning of line via HOME key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + input_stream.extend(Keys.HOME) + + # Send out the stream. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that input buffer position is now 0. + self.CheckInputBufferPosition(0) + + # Next, verify that the input buffer did not change. + self.CheckInputBuffer(test_str) + + # Lastly, check that the cursor moved correctly. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + self.CheckConsoleOutput(exp_console_out) + + def test_JumpToEndOfLineViaEndKey(self): + """Jump to the end of the line using the END key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + input_stream += [console.ControlKey.CTRL_A] + # Now, jump to the end of the line. + input_stream.extend(Keys.END) + + # Send out the stream. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is correct. This should be at the + # end of the test string. + self.CheckInputBufferPosition(len(test_str)) + + # The expected output should be the test string, followed by a jump to the + # beginning of the line, and lastly a jump to the end of the line. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Now the jump back to the end of the line. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + + # Verify console output stream. + self.CheckConsoleOutput(exp_console_out) + + def test_JumpToEndOfLineViaCtrlE(self): + """Enter some chars and then try to jump to the end. (Should be a no-op)""" + test_str = 'sysinfo' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_E) + + # Send out the stream + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position isn't any further than we expect. + # At this point, the position should be at the end of the test string. + self.CheckInputBufferPosition(len(test_str)) + + # Now, let's try to jump to the beginning and then jump back to the end. + input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E] + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Perform the same verification. + self.CheckInputBufferPosition(len(test_str)) + + # Lastly try to jump again, beyond the end. + input_stream = [console.ControlKey.CTRL_E] + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Perform the same verification. + self.CheckInputBufferPosition(len(test_str)) + + # We expect to see the test string, a jump to the begining of the line, and + # one jump to the end of the line. + exp_console_out = test_str + # Jump to beginning. + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Jump back to end. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + + # Verify the console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveLeftWithArrowKey(self): + """Move cursor left one column with arrow key.""" + test_str = 'tastyspam' + input_stream = self.StringToByteList(test_str) + input_stream.extend(Keys.LEFT_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1 less than the length. + self.CheckInputBufferPosition(len(test_str) - 1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a one column move left. + exp_console_out = test_str + OutputStream.MoveCursorLeft(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveLeftWithCtrlB(self): + """Move cursor back one column with Ctrl+B.""" + test_str = 'tastyspam' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_B) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1 less than the length. + self.CheckInputBufferPosition(len(test_str) - 1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a one column move left. + exp_console_out = test_str + OutputStream.MoveCursorLeft(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveRightWithArrowKey(self): + """Move cursor one column to the right with the arrow key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + # Jump to beginning of line. + input_stream.append(console.ControlKey.CTRL_A) + # Press right arrow key. + input_stream.extend(Keys.RIGHT_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1. + self.CheckInputBufferPosition(1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a jump to the beginning of the + # line, and finally a move right 1. + exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) + + # A move right 1 column. + exp_console_out += OutputStream.MoveCursorRight(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_MoveRightWithCtrlF(self): + """Move cursor forward one column with Ctrl+F.""" + test_str = 'panicinfo' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_A) + # Now, move right one column. + input_stream.append(console.ControlKey.CTRL_F) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1. + self.CheckInputBufferPosition(1) + + # Also, verify that the input buffer is not modified. + self.CheckInputBuffer(test_str) + + # We expect the test string, followed by a jump to the beginning of the + # line, and finally a move right 1. + exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) + + # A move right 1 column. + exp_console_out += OutputStream.MoveCursorRight(1) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_ImpossibleMoveLeftWithArrowKey(self): + """Verify that we can't move left at the beginning of the line.""" + # We shouldn't be able to move left if we're at the beginning of the line. + input_stream = Keys.LEFT_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Nothing should have been output. + exp_console_output = '' + self.CheckConsoleOutput(exp_console_output) + + # The input buffer position should still be 0. + self.CheckInputBufferPosition(0) + + # The input buffer itself should be empty. + self.CheckInputBuffer('') + + def test_ImpossibleMoveRightWithArrowKey(self): + """Verify that we can't move right at the end of the line.""" + # We shouldn't be able to move right if we're at the end of the line. + input_stream = Keys.RIGHT_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Nothing should have been output. + exp_console_output = '' + self.CheckConsoleOutput(exp_console_output) + + # The input buffer position should still be 0. + self.CheckInputBufferPosition(0) + + # The input buffer itself should be empty. + self.CheckInputBuffer('') + + def test_KillEntireLine(self): + """Verify that we can kill an entire line with Ctrl+K.""" + test_str = 'accelinfo on' + input_stream = self.StringToByteList(test_str) + # Jump to beginning of line and then kill it with Ctrl+K. + input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, we expect that the input buffer is empty. + self.CheckInputBuffer('') + + # The buffer position should be 0. + self.CheckInputBufferPosition(0) + + # What we expect to see on the console stream should be the following. The + # test string, a jump to the beginning of the line, then jump back to the + # end of the line and replace the line with spaces. + exp_console_out = test_str + # Jump to beginning of line. + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Jump to end of line. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + # Replace line with spaces, which looks like backspaces. + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + + # Verify the console output. + self.CheckConsoleOutput(exp_console_out) + + def test_KillPartialLine(self): + """Verify that we can kill a portion of a line.""" + test_str = 'accelread 0 1' + input_stream = self.StringToByteList(test_str) + len_to_kill = 5 + for _ in range(len_to_kill): + # Move cursor left + input_stream.extend(Keys.LEFT_ARROW) + # Now kill + input_stream.append(console.ControlKey.CTRL_K) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, check that the input buffer was truncated. + exp_input_buffer = test_str[:-len_to_kill] + self.CheckInputBuffer(exp_input_buffer) + + # Verify the input buffer position. + self.CheckInputBufferPosition(len(test_str) - len_to_kill) + + # The console output stream that we expect is the test string followed by a + # move left of len_to_kill, then a jump to the end of the line and backspace + # of len_to_kill. + exp_console_out = test_str + for _ in range(len_to_kill): + # Move left 1 column. + exp_console_out += OutputStream.MoveCursorLeft(1) + # Then jump to the end of the line + exp_console_out += OutputStream.MoveCursorRight(len_to_kill) + # Backspace of len_to_kill + for _ in range(len_to_kill): + exp_console_out += BACKSPACE_STRING + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + def test_InsertingCharacters(self): + """Verify that we can insert charcters within the line.""" + test_str = 'accel 0 1' # Here we forgot the 'read' part in 'accelread' + input_stream = self.StringToByteList(test_str) + # We need to move over to the 'l' and add read. + insertion_point = test_str.find('l') + 1 + for i in range(len(test_str) - insertion_point): + # Move cursor left. + input_stream.extend(Keys.LEFT_ARROW) + # Now, add in 'read' + added_str = 'read' + input_stream.extend(self.StringToByteList(added_str)) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that the input buffer is correct. + exp_input_buffer = test_str[:insertion_point] + added_str + exp_input_buffer += test_str[insertion_point:] + self.CheckInputBuffer(exp_input_buffer) + + # Verify that the input buffer position is correct. + exp_input_buffer_pos = insertion_point + len(added_str) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + # The console output stream that we expect is the test string, followed by + # move cursor left until the 'l' was found, the added test string while + # shifting characters around. + exp_console_out = test_str + for i in range(len(test_str) - insertion_point): + # Move cursor left. + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Now for each character, write the rest of the line will be shifted to the + # right one column. + for i in range(len(added_str)): + # Printed character. + exp_console_out += added_str[i] + # The rest of the line + exp_console_out += test_str[insertion_point:] + # Reset the cursor back left + reset_dist = len(test_str[insertion_point:]) + exp_console_out += OutputStream.MoveCursorLeft(reset_dist) + + # Verify the console output. + self.CheckConsoleOutput(exp_console_out) + + def test_StoreCommandHistory(self): + """Verify that entered commands are stored in the history.""" + test_commands = [] + test_commands.append('help') + test_commands.append('version') + test_commands.append('accelread 0 1') + input_stream = [] + for c in test_commands: + input_stream.extend(self.StringToByteList(c)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect to have the test commands in the history buffer. + exp_history_buf = test_commands + self.CheckHistoryBuffer(exp_history_buf) + + def test_CycleUpThruCommandHistory(self): + """Verify that the UP arrow key will print itmes in the history buffer.""" + # Enter some commands. + test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] + input_stream = [] + for command in test_commands: + input_stream.extend(self.StringToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Now, hit the UP arrow key to print the previous entries. + for i in range(len(test_commands)): + input_stream.extend(Keys.UP_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be test commands with prompts printed in + # between, followed by line kills with the previous test commands printed. + exp_console_out = '' + for i in range(len(test_commands)): + exp_console_out += test_commands[i] + '\r\n' + self.console.prompt + + # When we press up, the line should be cleared and print the previous buffer + # entry. + for i in range(len(test_commands)-1, 0, -1): + exp_console_out += test_commands[i] + # Backspace to the beginning. + for i in range(len(test_commands[i])): + exp_console_out += BACKSPACE_STRING + + # The last command should just be printed out with no backspacing. + exp_console_out += test_commands[0] + + # Now, verify. + self.CheckConsoleOutput(exp_console_out) + + def test_UpArrowOnEmptyHistory(self): + """Ensure nothing happens if the history is empty.""" + # Press the up arrow key twice. + input_stream = 2 * Keys.UP_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect nothing to have happened. + exp_console_out = '' + exp_input_buffer = '' + exp_input_buffer_pos = 0 + exp_history_buf = [] + + # Verify. + self.CheckConsoleOutput(exp_console_out) + self.CheckInputBufferPosition(exp_input_buffer_pos) + self.CheckInputBuffer(exp_input_buffer) + self.CheckHistoryBuffer(exp_history_buf) + + def test_UpArrowDoesNotGoOutOfBounds(self): + """Verify that pressing the up arrow many times won't go out of bounds.""" + # Enter one command. + test_str = 'help version' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + # Then press the up arrow key twice. + input_stream.extend(2 * Keys.UP_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the history buffer is correct. + exp_history_buf = [test_str] + self.CheckHistoryBuffer(exp_history_buf) + + # We expect that the console output should only contain our entered command, + # a new prompt, and then our command aggain. + exp_console_out = test_str + '\r\n' + self.console.prompt + # Pressing up should reprint the command we entered. + exp_console_out += test_str + + # Verify. + self.CheckConsoleOutput(exp_console_out) + + def test_CycleDownThruCommandHistory(self): + """Verify that we can select entries by hitting the down arrow.""" + # Enter at least 4 commands. + test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] + input_stream = [] + for command in test_commands: + input_stream.extend(self.StringToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Now, hit the UP arrow key twice to print the previous two entries. + for i in range(2): + input_stream.extend(Keys.UP_ARROW) + + # Now, hit the DOWN arrow key twice to print the newer entries. + input_stream.extend(2*Keys.DOWN_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be commands that we entered, followed by + # prompts, then followed by our last two commands in reverse. Then, we + # should see the last entry in the list, followed by the saved partial cmd + # of a blank line. + exp_console_out = '' + for i in range(len(test_commands)): + exp_console_out += test_commands[i] + '\r\n' + self.console.prompt + + # When we press up, the line should be cleared and print the previous buffer + # entry. + for i in range(len(test_commands)-1, 1, -1): + exp_console_out += test_commands[i] + # Backspace to the beginning. + for i in range(len(test_commands[i])): + exp_console_out += BACKSPACE_STRING + + # When we press down, it should have cleared the last command (which we + # covered with the previous for loop), and then prints the next command. + exp_console_out += test_commands[3] + for i in range(len(test_commands[3])): + exp_console_out += BACKSPACE_STRING + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + # Verify input buffer. + exp_input_buffer = '' # Empty because our partial command was empty. + exp_input_buffer_pos = len(exp_input_buffer) + self.CheckInputBuffer(exp_input_buffer) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + def test_SavingPartialCommandWhenNavigatingHistory(self): + """Verify that partial commands are saved when navigating history.""" + # Enter a command. + test_str = 'accelinfo' + input_stream = self.StringToByteList(test_str) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Enter a partial command. + partial_cmd = 'ver' + input_stream.extend(self.StringToByteList(partial_cmd)) + + # Hit the UP arrow key. + input_stream.extend(Keys.UP_ARROW) + # Then, the DOWN arrow key. + input_stream.extend(Keys.DOWN_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be the command we entered, a prompt, the + # partial command, clearing of the partial command, the command entered, + # clearing of the command entered, and then the partial command. + exp_console_out = test_str + '\r\n' + self.console.prompt + exp_console_out += partial_cmd + for _ in range(len(partial_cmd)): + exp_console_out += BACKSPACE_STRING + exp_console_out += test_str + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + exp_console_out += partial_cmd + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + # Verify input buffer. + exp_input_buffer = partial_cmd + exp_input_buffer_pos = len(exp_input_buffer) + self.CheckInputBuffer(exp_input_buffer) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + def test_DownArrowOnEmptyHistory(self): + """Ensure nothing happens if the history is empty.""" + # Then press the up down arrow twice. + input_stream = 2 * Keys.DOWN_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect nothing to have happened. + exp_console_out = '' + exp_input_buffer = '' + exp_input_buffer_pos = 0 + exp_history_buf = [] + + # Verify. + self.CheckConsoleOutput(exp_console_out) + self.CheckInputBufferPosition(exp_input_buffer_pos) + self.CheckInputBuffer(exp_input_buffer) + self.CheckHistoryBuffer(exp_history_buf) + + def test_DeleteCharsUsingDELKey(self): + """Verify that we can delete characters using the DEL key.""" + test_str = 'version' + input_stream = self.StringToByteList(test_str) + + # Hit the left arrow key 2 times. + input_stream.extend(2 * Keys.LEFT_ARROW) + + # Press the DEL key. + input_stream.extend(Keys.DEL) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be the command we entered, 2 individual cursor + # moves to the left, and then removing a char and shifting everything to the + # left one column. + exp_console_out = test_str + exp_console_out += 2 * OutputStream.MoveCursorLeft(1) + + # Remove the char by shifting everything to the left one, slicing out the + # remove char. + exp_console_out += test_str[-1:] + ' ' + + # Reset the cursor by moving back 2 columns because of the 'n' and space. + exp_console_out += OutputStream.MoveCursorLeft(2) + + # Verify console output. + self.CheckConsoleOutput(exp_console_out) + + # Verify input buffer. The input buffer should have the char sliced out and + # be positioned where the char was removed. + exp_input_buffer = test_str[:-2] + test_str[-1:] + exp_input_buffer_pos = len(exp_input_buffer) - 1 + self.CheckInputBuffer(exp_input_buffer) + self.CheckInputBufferPosition(exp_input_buffer_pos) + + def test_RepeatedCommandInHistory(self): + """Verify that we don't store 2 consecutive identical commands in history""" + # Enter a few commands. + test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] + # Repeat the last command. + test_commands.append(test_commands[len(test_commands)-1]) + + input_stream = [] + for command in test_commands: + input_stream.extend(self.StringToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the history buffer is correct. The last command, since + # it was repeated, should not have been added to the history. + exp_history_buf = test_commands[0:len(test_commands)-1] + self.CheckHistoryBuffer(exp_history_buf) + + +if __name__ == '__main__': + unittest.main() diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py new file mode 100755 index 0000000000..4c31037584 --- /dev/null +++ b/util/ec3po/interpreter.py @@ -0,0 +1,259 @@ +#!/usr/bin/python2 +# Copyright 2015 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""EC-3PO EC Interpreter + +interpreter provides the interpretation layer between the EC UART and the user. +It recives commands through its command pipe, formats the commands for the EC, +and sends the command to the EC. It also presents data from the EC to either be +displayed via the interactive console interface, or some other consumer. It +additionally supports automatic command retrying if the EC drops a character in +a command. +""" +from __future__ import print_function +from chromite.lib import cros_logging as logging +import os +import Queue +import select + + +COMMAND_RETRIES = 3 # Number of attempts to retry a command. +EC_MAX_READ = 1024 # Max bytes to read at a time from the EC. + + +class Interpreter(object): + """Class which provides the interpretation layer between the EC and user. + + This class essentially performs all of the intepretation for the EC and the + user. It handles all of the automatic command retrying as well as the + formation of commands. + + Attributes: + ec_uart_pty: A string representing the EC UART to connect to. + cmd_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the command pipe. This must be a bidirectional pipe. + Commands and responses will utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the debug pipe. This must be a unidirectional pipe + with write capabilities. EC debug output will utilize this pipe. + cmd_retries: An integer representing the number of attempts the console + should retry commands if it receives an error. + log_level: An integer representing the numeric value of the log level. + inputs: A list of objects that the intpreter selects for reading. + Initially, these are the EC UART and the command pipe. + outputs: A list of objects that the interpreter selects for writing. + ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART. + cmd_in_progress: A string that represents the current command sent to the + EC that is pending reception verification. + """ + def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO): + """Intializes an Interpreter object with the provided args. + + Args: + ec_uart_pty: A string representing the EC UART to connect to. + cmd_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the command pipe. This must be a bidirectional + pipe. Commands and responses will utilize this pipe. + dbg_pipe: A multiprocessing.Connection object which represents the + Interpreter side of the debug pipe. This must be a unidirectional pipe + with write capabilities. EC debug output will utilize this pipe. + cmd_retries: An integer representing the number of attempts the console + should retry commands if it receives an error. + log_level: An optional integer representing the numeric value of the log + level. By default, the log level will be logging.INFO (20). + """ + self.ec_uart_pty = open(ec_uart_pty, 'a+') + self.cmd_pipe = cmd_pipe + self.dbg_pipe = dbg_pipe + self.cmd_retries = COMMAND_RETRIES + self.log_level = log_level + self.inputs = [self.ec_uart_pty, self.cmd_pipe] + self.outputs = [] + self.ec_cmd_queue = Queue.Queue() + self.cmd_in_progress = '' + + def EnqueueCmd(self, packed_cmd): + """Enqueue a packed console command to be sent to the EC UART. + + Args: + packed_cmd: A string which contains the packed command to be sent. + """ + # Enqueue a packed command to be sent to the EC. + self.ec_cmd_queue.put(packed_cmd) + logging.debug('Commands now in queue: %d', self.ec_cmd_queue.qsize()) + # Add the EC UART as an output to be serviced. + self.outputs.append(self.ec_uart_pty) + + def PackCommand(self, raw_cmd): + r"""Packs a command for use with error checking. + + For error checking, we pack console commands in a particular format. The + format is as follows: + + &&[x][x][x][x]&{cmd}\n\n + ^ ^ ^^ ^^ ^ ^-- 2 newlines. + | | || || |-- the raw console command. + | | || ||-- 1 ampersand. + | | ||____|--- 2 hex digits representing the CRC8 of cmd. + | |____|-- 2 hex digits reprsenting the length of cmd. + |-- 2 ampersands + + Args: + raw_cmd: A pre-packed string which contains the raw command. + + Returns: + A string which contains the packed command. + """ + # The command format is as follows. + # &&[x][x][x][x]&{cmd}\n\n + packed_cmd = [] + packed_cmd.append('&&') + # The first pair of hex digits are the length of the command. + packed_cmd.append('%02x' % len(raw_cmd)) + # Then the CRC8 of cmd. + packed_cmd.append('%02x' % Crc8(raw_cmd)) + packed_cmd.append('&') + # Now, the raw command followed by 2 newlines. + packed_cmd.append(raw_cmd) + packed_cmd.append('\n\n') + return ''.join(packed_cmd) + + def ProcessCommand(self, command): + """Captures the input determines what actions to take. + + Args: + command: A string representing the command sent by the user. + """ + command = command.strip() + # There's nothing to do if the command is empty. + if len(command) == 0: + return + + # All other commands need to be packed first before they go to the EC. + packed_cmd = self.PackCommand(command) + logging.debug('packed cmd: ' + packed_cmd) + self.EnqueueCmd(packed_cmd) + # TODO(aaboagye): Make a dict of commands and keys and eventually, handle + # partial matching based on unique prefixes. + + def CheckECResponse(self): + """Checks the response from the EC for any errors.""" + # An invalid response is at most 4 bytes. + data = os.read(self.ec_uart_pty.fileno(), 4) + if '&E' not in data: + # No error received. Clear the command in progress. + self.cmd_in_progress = '' + # Reset the retry count. + self.cmd_retries = COMMAND_RETRIES + # Forward the data to the user. + self.dbg_pipe.send(data) + elif self.cmd_retries > 0: + # The EC encountered an error. We'll have to retry again. + logging.warning('EC replied with error. Retrying.') + self.cmd_retries -= 1 + logging.warning('Retries remaining: %d', self.cmd_retries) + # Add the EC UART to the writers again. + self.outputs.append(self.ec_uart_pty) + else: + # We're out of retries, so just give up. + logging.error('Command failed. No retries left.') + # Clear the command in progress. + self.cmd_in_progress = '' + # Reset the retry count. + self.cmd_retries = COMMAND_RETRIES + + def SendCmdToEC(self): + """Sends a command to the EC.""" + # If we're retrying a command, just try to send it again. + if self.cmd_retries < COMMAND_RETRIES: + cmd = self.cmd_in_progress + else: + # If we're not retrying, we should not be writing to the EC if we have no + # items in our command queue. + assert not self.ec_cmd_queue.empty() + # Get the command to send. + cmd = self.ec_cmd_queue.get() + + # Send the command. + logging.debug('Sending command to EC.') + self.ec_uart_pty.write(cmd) + self.ec_uart_pty.flush() + + # Now, that we've sent the command we will need to make sure the EC + # received it without an error. Store the current command as in + # progress. We will clear this if the EC responds with a non-error. + self.cmd_in_progress = cmd + # Remove the EC UART from the writers while we wait for a response. + self.outputs.remove(self.ec_uart_pty) + + +def Crc8(data): + """Calculates the CRC8 of data. + + The generator polynomial used is: x^8 + x^2 + x + 1. + This is the same implementation that is used in the EC. + + Args: + data: A string of data that we wish to calculate the CRC8 on. + + Returns: + crc >> 8: An integer representing the CRC8 value. + """ + crc = 0 + for byte in data: + crc ^= (ord(byte) << 8) + for _ in range(8): + if crc & 0x8000: + crc ^= (0x1070 << 3) + crc <<= 1 + return crc >> 8 + + +def StartLoop(interp): + """Starts an infinite loop of servicing the user and the EC. + + StartLoop checks to see if there are any commands to process, processing them + if any, and forwards EC output to the user. + + When sending a command to the EC, we send the command once and check the + response to see if the EC encountered an error when receiving the command. An + error condition is reported to the interpreter by a string with at least one + '&' and 'E'. The full string is actually '&&EE', however it's possible that + the leading ampersand or trailing 'E' could be dropped. If an error is + encountered, the interpreter will retry up to the amount configured. + + Args: + interp: An Interpreter object that has been properly initialised. + """ + while True: + readable, writeable, _ = select.select(interp.inputs, interp.outputs, []) + + for obj in readable: + # Handle any debug prints from the EC. + if obj is interp.ec_uart_pty: + logging.debug('EC has data') + if interp.cmd_in_progress: + # A command was just sent to the EC. We need to check to see if the + # EC is telling us that it received a corrupted command. + logging.debug('Command in progress so checking response...') + interp.CheckECResponse() + + # Read what the EC sent us. + data = os.read(obj.fileno(), EC_MAX_READ) + logging.debug('got: \'%s\'', data) + # For now, just forward everything the EC sends us. + logging.debug('Forwarding to user...') + interp.dbg_pipe.send(data) + + # Handle any commands from the user. + elif obj is interp.cmd_pipe: + logging.debug('Command data available. Begin processing.') + data = interp.cmd_pipe.recv() + # Process the command. + interp.ProcessCommand(data) + + for obj in writeable: + # Send a command to the EC. + if obj is interp.ec_uart_pty: + interp.SendCmdToEC()