mirror of
https://github.com/Telecominfraproject/OpenCellular.git
synced 2026-01-08 16:41:55 +00:00
This test checks we cannot overwrite current running system image. BUG=chrome-os-partner:10262 TEST=Test passed Change-Id: I72be277c9de2114e72000a102d8b885e842ef15a Reviewed-on: https://gerrit.chromium.org/gerrit/27006 Commit-Ready: Vic Yang <victoryang@chromium.org> Reviewed-by: Vic Yang <victoryang@chromium.org> Tested-by: Vic Yang <victoryang@chromium.org>
266 lines
8.5 KiB
Python
Executable File
266 lines
8.5 KiB
Python
Executable File
#!/usr/bin/python
|
|
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
#
|
|
# Python wrapper script for running tests under QEMU
|
|
#
|
|
|
|
import errno
|
|
import imp
|
|
import json
|
|
import os
|
|
import optparse
|
|
import re
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
QEMU_BINARY="qemu-system-arm"
|
|
QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"]
|
|
|
|
def trace(msg):
|
|
sys.stdout.write(msg)
|
|
|
|
class QEMUError(Exception):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def __str__(self):
|
|
return "QEMU Error:" + repr(self.value)
|
|
|
|
class QEMUInstance:
|
|
PORT=3456
|
|
QMP_ADDR=("127.0.0.1", PORT)
|
|
|
|
def __run_qemu(self, cmdline, redirect_stdio=False):
|
|
trace("Starting QEMU binary ...\n")
|
|
if redirect_stdio:
|
|
stdin = subprocess.PIPE
|
|
stdout = subprocess.PIPE
|
|
else:
|
|
stdin = None
|
|
stdout = None
|
|
self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384,
|
|
stdin=stdin, stdout=stdout, close_fds=True)
|
|
trace("QEMU started pid:%d\n" % (self.__qemu.pid))
|
|
self.__qemu.wait()
|
|
trace("QEMU has terminated\n")
|
|
|
|
def __init__(self, qemu_bin, firmware, romcode = None, testmode = False):
|
|
self.__events = []
|
|
cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR]
|
|
if romcode:
|
|
cmdline += ["-bios",romcode]
|
|
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.__sock.bind(self.QMP_ADDR)
|
|
self.__sock.listen(1)
|
|
|
|
self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode))
|
|
self.__thr.start()
|
|
try:
|
|
trace("Waiting for QEMU connection ...\n")
|
|
self.__sock, _ = self.__sock.accept()
|
|
self.__sockfd = self.__sock.makefile()
|
|
except socket.error:
|
|
raise QEMUError('Cannot connect to QMP server')
|
|
|
|
version = self.__json_recv()
|
|
if version is None or not version.has_key('QMP'):
|
|
raise QEMUError('Not QMP support')
|
|
# Test basic communication with QMP
|
|
resp = self.send_qmp('qmp_capabilities')
|
|
if not "return" in resp:
|
|
raise QEMUError('QMP not working properly')
|
|
trace("QMP connected\n")
|
|
|
|
def __json_recv(self, only_event=False):
|
|
while True:
|
|
data = self.__sockfd.readline()
|
|
if not data:
|
|
return
|
|
return json.loads(data)
|
|
|
|
def send_qmp(self, name, args=None):
|
|
qmp_cmd = { 'execute': name }
|
|
if args:
|
|
qmp_cmd['arguments'] = args
|
|
try:
|
|
self.__sock.sendall(json.dumps(qmp_cmd))
|
|
except socket.error, err:
|
|
if err[0] == errno.EPIPE:
|
|
return
|
|
raise QEMUError("Error on QMP socket:" + err)
|
|
return self.__json_recv()
|
|
|
|
def serial_readline(self):
|
|
return self.__qemu.stdout.readline()
|
|
|
|
def serial_write(self, string):
|
|
self.__qemu.stdin.write(string)
|
|
self.__qemu.stdin.flush()
|
|
|
|
def get_event(self, blocking=True):
|
|
if not blocking:
|
|
self.__sock.setblocking(0)
|
|
try:
|
|
val = self.__json_recv()
|
|
except socket.error, err:
|
|
if err[0] == errno.EAGAIN:
|
|
# Nothing available
|
|
return None
|
|
if not blocking:
|
|
self.__sock.setblocking(1)
|
|
return val
|
|
|
|
def close(self):
|
|
# Try to terminate QEMU gracefully
|
|
if self.__qemu.poll() == None:
|
|
self.send_qmp("quit")
|
|
time.sleep(0.1)
|
|
# Force termination if the process is still here :
|
|
if self.__qemu.poll() == None:
|
|
self.__qemu.terminate()
|
|
self.__thr.join()
|
|
self.__sock.close()
|
|
self.__sockfd.close()
|
|
|
|
class TestFailure(Exception):
|
|
def __init__(self, reason):
|
|
self.value = reason
|
|
|
|
def __str__(self):
|
|
return "reason:" + repr(self.value)
|
|
|
|
class EcTest:
|
|
def __init__(self, qemu_bin, firmware, romcode, test):
|
|
self.__qemu_bin = qemu_bin
|
|
self.__firmware = firmware
|
|
self.__romcode = romcode
|
|
self.__test = test
|
|
|
|
def timeout_handler(self, signum, frame):
|
|
raise TestFailure("Timeout waiting for %s" % self.__timeout_reason)
|
|
|
|
def wait_output(self, string, use_re = False, timeout = 5):
|
|
self.__timeout_reason = string
|
|
old_handler = signal.signal(signal.SIGALRM, lambda
|
|
s,f:self.timeout_handler(s,f))
|
|
if use_re:
|
|
regexp = re.compile(string)
|
|
signal.alarm(timeout)
|
|
while True:
|
|
ln = self.__qemu.serial_readline()
|
|
trace("[EC]%s" % ln)
|
|
if use_re:
|
|
res = regexp.search(ln)
|
|
if res:
|
|
signal.alarm(0)
|
|
signal.signal(signal.SIGALRM, old_handler)
|
|
return res.groupdict()
|
|
else:
|
|
if string in ln:
|
|
signal.alarm(0)
|
|
signal.signal(signal.SIGALRM, old_handler)
|
|
return
|
|
|
|
def check_no_output(self, string, use_re = False, timeout = 1):
|
|
success = False
|
|
try:
|
|
self.wait_output(string, use_re=use_re, timeout=timeout)
|
|
except:
|
|
success = True
|
|
return success
|
|
|
|
|
|
def wait_prompt(self):
|
|
self.wait_output("> ")
|
|
|
|
def ec_command(self, cmd):
|
|
self.__qemu.serial_write(cmd + '\r\n')
|
|
|
|
def trace(self, msg):
|
|
trace(msg)
|
|
|
|
def report(self, msg):
|
|
sys.stderr.write(" === TEST %s ===\n" % msg)
|
|
|
|
def fail(self, msg):
|
|
raise TestFailure(msg)
|
|
|
|
def run_test(self):
|
|
try:
|
|
self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware,
|
|
self.__romcode, True)
|
|
except QEMUError as e:
|
|
self.report("QEMU FATAL ERROR: " + e.value)
|
|
return 1
|
|
|
|
# Set up import path so each test can import other modules inside 'test'
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(self.__test)))
|
|
|
|
testmod = imp.load_module("testmodule", file(self.__test,"r"),
|
|
self.__test, (".py","r",imp.PY_SOURCE))
|
|
self.report("RUN: %s" % os.path.basename(self.__test))
|
|
try:
|
|
res = testmod.test(self)
|
|
except TestFailure as e:
|
|
res = False
|
|
self.report("FAIL: %s" % e.value)
|
|
self.__qemu.close()
|
|
if res:
|
|
self.report("PASS")
|
|
return 0
|
|
return 1
|
|
|
|
def run_interactive(qemu_bin, firmware, romcode):
|
|
try:
|
|
qemu = QEMUInstance(qemu_bin, firmware, romcode, False)
|
|
except QEMUError as e:
|
|
sys.stderr.write('FATAL: %s\n' % e.value)
|
|
return 1
|
|
|
|
# Dummy testing code : TODO remove
|
|
#print qemu.send_qmp("query-commands")
|
|
#print qemu.send_qmp("human-monitor-command",
|
|
# { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 })
|
|
while True:
|
|
msg = qemu.get_event()
|
|
trace("[EVENT]%s\n" % msg)
|
|
if msg.has_key("event") and msg["event"] == "RESET":
|
|
break
|
|
qemu.close()
|
|
return 0
|
|
|
|
def parse_cmdline(basedir):
|
|
parser = optparse.OptionParser("usage: %prog [options] [testname]")
|
|
parser.add_option("-b", "--board", dest="board", default="bds",
|
|
help="board to use")
|
|
parser.add_option("-i", "--image", dest="image",
|
|
help="firmware image filename")
|
|
parser.add_option("-r", "--rom", dest="romcode",
|
|
default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"),
|
|
help="ROM code image filename")
|
|
parser.add_option("-q", "--qemu", dest="qemu_bin",
|
|
default=os.path.join(basedir,"util",QEMU_BINARY),
|
|
help="Qemu binary path")
|
|
(options, args) = parser.parse_args()
|
|
if options.image:
|
|
image = options.image
|
|
else:
|
|
image = os.path.join(basedir,"build",options.board,"ec.bin")
|
|
|
|
return options.qemu_bin, image,options.romcode, args
|
|
|
|
if __name__ == '__main__':
|
|
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),".."))
|
|
qemu_bin, image, romcode, tests = parse_cmdline(basedir)
|
|
if len(tests) > 0:
|
|
res = EcTest(qemu_bin, image, romcode, tests[0]).run_test()
|
|
else:
|
|
res = run_interactive(qemu_bin, image, romcode)
|
|
sys.exit(res)
|