ec: Add annotation feature to the stack analyzer.

Get stack analyzer supported to read annotation file and do basic
annotation on the callgraph.

The basic annotation includes:
1. Add missing calls to the callgraph
2. Ignore functions on the callgraph

BUG=chromium:648840
BRANCH=none
TEST=extra/stack_analyzer/stack_analyzer_unittest.py
     make BOARD=elm && extra/stack_analyzer/stack_analyzer.py \
         --objdump=arm-none-eabi-objdump \
         --addr2line=arm-none-eabi-addr2line \
         --export_taskinfo=./build/elm/util/export_taskinfo.so \
         --section=RW \
         --annotation=./extra/stack_analyzer/example_annotation.yaml \
         ./build/elm/RW/ec.RW.elf
     make BOARD=elm SECTION=RW \
         ANNOTATION=./extra/stack_analyzer/example_annotation.yaml \
         analyzestack

Change-Id: I4cc7c34f422655708a7312db3f6b4416e1af917f
Signed-off-by: Che-yu Wu <cheyuw@google.com>
Reviewed-on: https://chromium-review.googlesource.com/614825
Reviewed-by: Nicolas Boichat <drinkcat@chromium.org>
Reviewed-by: Vincent Palatin <vpalatin@chromium.org>
This commit is contained in:
Che-yu Wu
2017-08-09 12:33:40 +08:00
committed by chrome-bot
parent 20c3de1c36
commit ef09835e19
5 changed files with 413 additions and 27 deletions

View File

@@ -554,6 +554,7 @@ analyzestack: $(out)/util/export_taskinfo.so
fi; \
extra/stack_analyzer/stack_analyzer.py --objdump "$(OBJDUMP)" \
--addr2line "$(ADDR2LINE)" --section "$$SECTION" \
$(if $(ANNOTATION),--annotation $(ANNOTATION),) \
--export_taskinfo "$$EXPORT_TASKINFO" "$$ELF"
.SECONDARY:

View File

@@ -12,6 +12,7 @@ Make sure the firmware of your target board has been built.
In `src/platform/ec`, run
```
make BOARD=${BOARD} SECTION=${SECTION} analyzestack
make BOARD=${BOARD} SECTION=${SECTION} ANNOTATION=${ANNOTATION} analyzestack
```
The `${SECTION}` can be `RO` or `RW`.
The `${SECTION}` can be `RO` or `RW`. The `${ANNOTATION}` is a optional
annotation file, see the example_annotation.yaml.

View File

@@ -0,0 +1,18 @@
# Add some missing calls.
add:
# console_task also calls command_display_accel_info and command_accel_init.
console_task:
- command_display_accel_info
- command_accel_init
# Function name can be followed by [source code path] to indicate where is it
# declared (there may be several functions with the same name).
motion_lid_calc[common/motion_lid.c]:
- get_range[driver/accel_kionix.c]
# Remove some call paths.
remove:
# Remove all callsites pointing to panic_assert_fail.
- panic_assert_fail
- panic
- software_panic

View File

@@ -16,9 +16,12 @@
from __future__ import print_function
import argparse
import collections
import ctypes
import os
import re
import subprocess
import yaml
SECTION_RO = 'RO'
@@ -132,7 +135,7 @@ class Callsite(object):
"""Function callsite.
Attributes:
address: Address of callsite location.
address: Address of callsite location. None if it is unknown.
target: Callee address.
is_tail: A bool indicates that it is a tailing call.
callee: Resolved callee function. None if it hasn't been resolved.
@@ -359,17 +362,25 @@ class StackAnalyzer(object):
Analyze: Run the stack analysis.
"""
def __init__(self, options, symbols, tasklist):
# Errors of annotation resolving.
ANNOTATION_ERROR_INVALID = 'invalid signature'
ANNOTATION_ERROR_NOTFOUND = 'function is not found'
ANNOTATION_ERROR_AMBIGUOUS = 'signature is ambiguous'
def __init__(self, options, symbols, tasklist, annotation):
"""Constructor.
Args:
options: Namespace from argparse.parse_args().
symbols: Symbol list.
tasklist: Task list.
annotation: Annotation config.
"""
self.options = options
self.symbols = symbols
self.tasklist = tasklist
self.annotation = annotation
self.address_to_line_cache = {}
def AddressToLine(self, address):
"""Convert address to line.
@@ -383,6 +394,9 @@ class StackAnalyzer(object):
Raises:
StackAnalyzerError: If addr2line is failed.
"""
if address in self.address_to_line_cache:
return self.address_to_line_cache[address]
try:
line_text = subprocess.check_output([self.options.addr2line,
'-e',
@@ -393,7 +407,9 @@ class StackAnalyzer(object):
except OSError:
raise StackAnalyzerError('Failed to run addr2line.')
return line_text.strip()
line_text = line_text.strip()
self.address_to_line_cache[address] = line_text
return line_text
def AnalyzeDisassembly(self, disasm_text):
"""Parse the disassembly text, analyze, and build a map of all functions.
@@ -559,6 +575,186 @@ class StackAnalyzer(object):
return function_map
def MappingAnnotation(self, function_map, signature_set):
"""Map annotation signatures to functions.
Args:
function_map: Function map.
signature_set: Set of annotation signatures.
Returns:
Map of signatures to functions, set of signatures which can't be resolved.
"""
C_FUNCTION_NAME = r'_A-Za-z0-9'
ADDRTOLINE_FAILED_SYMBOL = '??'
# To eliminate the suffix appended by compilers, try to extract the
# C function name from the prefix of symbol name.
# Example: SHA256_transform.constprop.28
prefix_name_regex = re.compile(
r'^(?P<name>[{0}]+)([^{0}].*)?$'.format(C_FUNCTION_NAME))
# Example: get_range[driver/accel_kionix.c]
annotation_signature_regex = re.compile(
r'^(?P<name>[{}]+)(\[(?P<path>.+)\])?$'.format(C_FUNCTION_NAME))
# Example: driver/accel_kionix.c:321 and ??:0
addrtoline_regex = re.compile(r'^(?P<path>.+):\d+$')
# Build the symbol map indexed by symbol name. If there are multiple symbols
# with the same name, add them into a set. (e.g. symbols of static function
# with the same name)
symbol_map = collections.defaultdict(set)
for symbol in self.symbols:
if symbol.symtype == 'F':
# Function symbol.
result = prefix_name_regex.match(symbol.name)
if result is not None:
function = function_map.get(symbol.address)
# Ignore the symbol not in disassembly.
if function is not None:
# If there are multiple symbol with the same name and point to the
# same function, the set will deduplicate them.
symbol_map[result.group('name').strip()].add(function)
# Build the signature map indexed by annotation signature.
signature_map = {}
failed_sigs = set()
symbol_path_map = {}
for sig in signature_set:
result = annotation_signature_regex.match(sig)
if result is None:
failed_sigs.add((sig, self.ANNOTATION_ERROR_INVALID))
continue
name = result.group('name').strip()
path = result.group('path')
functions = symbol_map.get(name)
if functions is None:
failed_sigs.add((sig, self.ANNOTATION_ERROR_NOTFOUND))
continue
if name not in symbol_path_map:
# Lazy symbol path resolving. Since the addr2line isn't fast, only
# resolve needed symbol paths.
group_map = collections.defaultdict(list)
for function in functions:
result = addrtoline_regex.match(self.AddressToLine(function.address))
# Assume the output of addr2line is always well-formed.
assert result is not None
symbol_path = result.group('path').strip()
if symbol_path == ADDRTOLINE_FAILED_SYMBOL:
continue
# Group the functions with the same symbol signature (symbol name +
# symbol path). Assume they are the same copies and do the same
# annotation operations of them because we don't know which copy is
# indicated by the users.
group_map[os.path.realpath(symbol_path)].append(function)
symbol_path_map[name] = group_map
# Symbol matching.
function_group = None
group_map = symbol_path_map[name]
if len(group_map) > 0:
if path is None:
if len(group_map) > 1:
# There is ambiguity but the path isn't specified.
failed_sigs.add((sig, self.ANNOTATION_ERROR_AMBIGUOUS))
continue
# No path signature but all symbol signatures of functions are same.
# Assume they are the same functions, so there is no ambiguity.
(function_group,) = group_map.values()
else:
function_group = group_map.get(os.path.realpath(path.strip()))
if function_group is None:
failed_sigs.add((sig, self.ANNOTATION_ERROR_NOTFOUND))
continue
# The function_group is a list of all the same functions (according to
# our assumption) which should be annotated together.
signature_map[sig] = function_group
return (signature_map, failed_sigs)
def ResolveAnnotation(self, function_map):
"""Resolve annotation.
Args:
function_map: Function map.
Returns:
Set of added call edges, set of invalid paths, set of annotation
signatures which can't be resolved.
"""
# Collect annotation signatures.
annotation_add_map = self.annotation.get('add', {})
annotation_remove_list = self.annotation.get('remove', [])
signature_set = set(annotation_remove_list)
for src_sig, dst_sigs in annotation_add_map.items():
signature_set.add(src_sig)
signature_set.update(dst_sigs)
signature_set = {sig.strip() for sig in signature_set}
# Map signatures to functions.
(signature_map, failed_sigs) = self.MappingAnnotation(function_map,
signature_set)
# Generate the annotation sets.
add_set = set()
remove_set = set()
for src_sig, dst_sigs in annotation_add_map.items():
src_funcs = signature_map.get(src_sig)
if src_funcs is None:
continue
for dst_sig in dst_sigs:
dst_funcs = signature_map.get(dst_sig)
if dst_funcs is None:
continue
# Duplicate the call edge for all the same source and destination
# functions.
for src_func in src_funcs:
for dst_func in dst_funcs:
add_set.add((src_func, dst_func))
for remove_sig in annotation_remove_list:
remove_funcs = signature_map.get(remove_sig)
if remove_funcs is not None:
# Add all the same functions.
remove_set.update(remove_funcs)
return add_set, remove_set, failed_sigs
def PreprocessCallGraph(self, function_map, add_set, remove_set):
"""Preprocess the callgraph.
It will add the missing call edges, and remove simple invalid paths (the
paths only have one vertex) from the function_map.
Args:
function_map: Function map.
add_set: Set of missing call edges.
remove_set: Set of invalid paths.
"""
for src_func, dst_func in add_set:
# TODO(cheyuw): Support tailing call annotation.
src_func.callsites.append(
Callsite(None, dst_func.address, False, dst_func))
for function in function_map.values():
cleaned_callsites = []
for callsite in function.callsites:
if callsite.callee not in remove_set:
cleaned_callsites.append(callsite)
function.callsites = cleaned_callsites
def AnalyzeCallGraph(self, function_map):
"""Analyze call graph.
@@ -654,7 +850,11 @@ class StackAnalyzer(object):
return cycle_groups
def Analyze(self):
"""Run the stack analysis."""
"""Run the stack analysis.
Raises:
StackAnalyzerError: If disassembly fails.
"""
# Analyze disassembly.
try:
disasm_text = subprocess.check_output([self.options.objdump,
@@ -666,6 +866,8 @@ class StackAnalyzer(object):
raise StackAnalyzerError('Failed to run objdump.')
function_map = self.AnalyzeDisassembly(disasm_text)
(add_set, remove_set, failed_sigs) = self.ResolveAnnotation(function_map)
self.PreprocessCallGraph(function_map, add_set, remove_set)
cycle_groups = self.AnalyzeCallGraph(function_map)
# Print the results of task-aware stack analysis.
@@ -693,6 +895,11 @@ class StackAnalyzer(object):
print(output)
curr_func = curr_func.stack_successor
if len(failed_sigs) > 0:
print('Failed to resolve some annotation signatures:')
for sig, error in failed_sigs:
print('\t{}: {}'.format(sig, error))
def ParseArgs():
"""Parse commandline arguments.
@@ -710,6 +917,8 @@ def ParseArgs():
help='the path of objdump')
parser.add_argument('--addr2line', default='addr2line',
help='the path of addr2line')
parser.add_argument('--annotation', default=None,
help='the path of annotation file')
# TODO(cheyuw): Add an option for dumping stack usage of all functions.
@@ -799,6 +1008,22 @@ def main():
try:
options = ParseArgs()
# Load annotation config.
if options.annotation is None:
annotation = {}
else:
try:
with open(options.annotation, 'r') as annotation_file:
annotation = yaml.safe_load(annotation_file)
except yaml.YAMLError:
raise StackAnalyzerError('Failed to parse annotation file.')
except IOError:
raise StackAnalyzerError('Failed to open annotation file.')
if not isinstance(annotation, dict):
raise StackAnalyzerError('Invalid annotation file.')
# Generate and parse the symbols.
try:
symbol_text = subprocess.check_output([options.objdump,
@@ -819,7 +1044,7 @@ def main():
tasklist = LoadTasklist(options.section, export_taskinfo, symbols)
analyzer = StackAnalyzer(options, symbols, tasklist)
analyzer = StackAnalyzer(options, symbols, tasklist, annotation)
analyzer.Analyze()
except StackAnalyzerError as e:
print('Error: {}'.format(e))

View File

@@ -128,15 +128,21 @@ class StackAnalyzerTest(unittest.TestCase):
symbols = [sa.Symbol(0x1000, 'F', 0x15C, 'hook_task'),
sa.Symbol(0x2000, 'F', 0x51C, 'console_task'),
sa.Symbol(0x3200, 'O', 0x124, '__just_data'),
sa.Symbol(0x4000, 'F', 0x11C, 'touchpad_calc')]
sa.Symbol(0x4000, 'F', 0x11C, 'touchpad_calc'),
sa.Symbol(0x5000, 'F', 0x12C, 'touchpad_calc.constprop.42'),
sa.Symbol(0x12000, 'F', 0x13C, 'trackpad_range'),
sa.Symbol(0x13000, 'F', 0x200, 'inlined_mul'),
sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul'),
sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul_alias')]
tasklist = [sa.Task('HOOKS', 'hook_task', 2048, 0x1000),
sa.Task('CONSOLE', 'console_task', 460, 0x2000)]
options = mock.MagicMock(elf_path='./ec.RW.elf',
export_taskinfo='none',
export_taskinfo='fake',
section='RW',
objdump='objdump',
addr2line='addr2line')
self.analyzer = sa.StackAnalyzer(options, symbols, tasklist)
addr2line='addr2line',
annotation=None)
self.analyzer = sa.StackAnalyzer(options, symbols, tasklist, {})
def testParseSymbolText(self):
symbol_text = (
@@ -191,6 +197,112 @@ class StackAnalyzerTest(unittest.TestCase):
tasklist = sa.LoadTasklist('RW', export_taskinfo, self.analyzer.symbols)
self.assertEqual(tasklist, expect_rw_tasklist)
def testResolveAnnotation(self):
funcs = {
0x1000: sa.Function(0x1000, 'hook_task', 0, []),
0x2000: sa.Function(0x2000, 'console_task', 0, []),
0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
0x5000: sa.Function(0x5000, 'touchpad_calc.constprop.42', 0, []),
0x13000: sa.Function(0x13000, 'inlined_mul', 0, []),
0x13100: sa.Function(0x13100, 'inlined_mul', 0, []),
}
# Set address_to_line_cache to fake the results of addr2line.
self.analyzer.address_to_line_cache = {
0x1000: 'a.c:10',
0x2000: 'b.c:20',
0x4000: './a.c:30',
0x5000: 'b.c:40',
0x12000: 't.c:10',
0x13000: 'x.c:12',
0x13100: 'x.c:12',
}
self.analyzer.annotation = {
'add': {
'hook_task': ['touchpad_calc[a.c]', 'hook_task'],
'console_task': ['touchpad_calc[b.c]', 'inlined_mul_alias'],
'hook_task[q.c]': ['hook_task'],
'inlined_mul[x.c]': ['inlined_mul'],
},
'remove': {
'touchpad?calc',
'touchpad_calc',
'touchpad_calc[a.c]',
'task_unk[a.c]',
'touchpad_calc[../a.c]',
'trackpad_range',
'inlined_mul',
},
}
signature_set = set(self.analyzer.annotation['remove'])
for src_sig, dst_sigs in self.analyzer.annotation['add'].items():
signature_set.add(src_sig)
signature_set.update(dst_sigs)
(signature_map, failed_sigs) = self.analyzer.MappingAnnotation(
funcs, signature_set)
(add_set, remove_set, failed_sigs) = self.analyzer.ResolveAnnotation(funcs)
expect_signature_map = {
'hook_task': {funcs[0x1000]},
'touchpad_calc[a.c]': {funcs[0x4000]},
'touchpad_calc[b.c]': {funcs[0x5000]},
'console_task': {funcs[0x2000]},
'inlined_mul_alias': {funcs[0x13100]},
'inlined_mul[x.c]': {funcs[0x13000], funcs[0x13100]},
'inlined_mul': {funcs[0x13000], funcs[0x13100]},
}
self.assertEqual(len(signature_map), len(expect_signature_map))
for sig, funclist in signature_map.items():
self.assertEqual(set(funclist), expect_signature_map[sig])
self.assertEqual(add_set, {
(funcs[0x1000], funcs[0x4000]),
(funcs[0x1000], funcs[0x1000]),
(funcs[0x2000], funcs[0x5000]),
(funcs[0x2000], funcs[0x13100]),
(funcs[0x13000], funcs[0x13000]),
(funcs[0x13000], funcs[0x13100]),
(funcs[0x13100], funcs[0x13000]),
(funcs[0x13100], funcs[0x13100]),
})
self.assertEqual(remove_set, {
funcs[0x4000],
funcs[0x13000],
funcs[0x13100]
})
self.assertEqual(failed_sigs, {
('touchpad?calc', sa.StackAnalyzer.ANNOTATION_ERROR_INVALID),
('touchpad_calc', sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS),
('hook_task[q.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
('task_unk[a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
('touchpad_calc[../a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
('trackpad_range', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
})
def testPreprocessCallGraph(self):
funcs = {
0x1000: sa.Function(0x1000, 'hook_task', 0, []),
0x2000: sa.Function(0x2000, 'console_task', 0, []),
0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
}
funcs[0x1000].callsites = [
sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])]
funcs[0x2000].callsites = [
sa.Callsite(0x2002, 0x1000, False, funcs[0x1000])]
add_set = {(funcs[0x2000], funcs[0x4000]), (funcs[0x4000], funcs[0x1000])}
remove_set = {funcs[0x1000]}
self.analyzer.PreprocessCallGraph(funcs, add_set, remove_set)
expect_funcs = {
0x1000: sa.Function(0x1000, 'hook_task', 0, []),
0x2000: sa.Function(0x2000, 'console_task', 0, []),
0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
}
expect_funcs[0x2000].callsites = [
sa.Callsite(None, 0x4000, False, expect_funcs[0x4000])]
self.assertEqual(funcs, expect_funcs)
def testAnalyzeDisassembly(self):
disasm_text = (
'\n'
@@ -280,22 +392,23 @@ class StackAnalyzerTest(unittest.TestCase):
@mock.patch('subprocess.check_output')
def testAddressToLine(self, checkoutput_mock):
checkoutput_mock.return_value = 'test.c [1]'
self.assertEqual(self.analyzer.AddressToLine(0x1000), 'test.c [1]')
self.assertEqual(self.analyzer.AddressToLine(0x1234), 'test.c [1]')
checkoutput_mock.assert_called_once_with(
['addr2line', '-e', './ec.RW.elf', '1000'])
['addr2line', '-e', './ec.RW.elf', '1234'])
with self.assertRaisesRegexp(sa.StackAnalyzerError,
'addr2line failed to resolve lines.'):
checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '')
self.analyzer.AddressToLine(0x1000)
self.analyzer.AddressToLine(0x5678)
with self.assertRaisesRegexp(sa.StackAnalyzerError,
'Failed to run addr2line.'):
checkoutput_mock.side_effect = OSError()
self.analyzer.AddressToLine(0x1000)
self.analyzer.AddressToLine(0x9012)
@mock.patch('subprocess.check_output')
def testAnalyze(self, checkoutput_mock):
@mock.patch('stack_analyzer.StackAnalyzer.AddressToLine')
def testAnalyze(self, addrtoline_mock, checkoutput_mock):
disasm_text = (
'\n'
'Disassembly of section .text:\n'
@@ -311,29 +424,33 @@ class StackAnalyzerTest(unittest.TestCase):
' 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n'
)
addrtoline_mock.return_value = '??:0'
self.analyzer.annotation = {'remove': ['fake_func']}
with mock.patch('__builtin__.print') as print_mock:
checkoutput_mock.side_effect = [disasm_text, '?', '?', '?']
checkoutput_mock.return_value = disasm_text
self.analyzer.Analyze()
print_mock.assert_has_calls([
mock.call(
'Task: HOOKS, Max size: 224 (0 + 224), Allocated size: 2048'),
mock.call('Call Trace:'),
mock.call('\thook_task (0) 1000 [?]'),
mock.call('\thook_task (0) 1000 [??:0]'),
mock.call(
'Task: CONSOLE, Max size: 232 (8 + 224), Allocated size: 460'),
mock.call('Call Trace:'),
mock.call('\tconsole_task (8) 2000 [?]'),
mock.call('\tconsole_task (8) 2000 [??:0]'),
mock.call('Failed to resolve some annotation signatures:'),
mock.call('\tfake_func: function is not found'),
])
with self.assertRaisesRegexp(sa.StackAnalyzerError,
'Failed to run objdump.'):
checkoutput_mock.side_effect = [OSError(), '?', '?', '?']
checkoutput_mock.side_effect = OSError()
self.analyzer.Analyze()
with self.assertRaisesRegexp(sa.StackAnalyzerError,
'objdump failed to disassemble.'):
checkoutput_mock.side_effect = [subprocess.CalledProcessError(1, ''), '?',
'?', '?']
checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '')
self.analyzer.Analyze()
@mock.patch('subprocess.check_output')
@@ -342,11 +459,35 @@ class StackAnalyzerTest(unittest.TestCase):
symbol_text = ('1000 g F .text 0000015c .hidden hook_task\n'
'2000 g F .text 0000051c .hidden console_task\n')
parseargs_mock.return_value = mock.MagicMock(elf_path='./ec.RW.elf',
export_taskinfo='none',
section='RW',
objdump='objdump',
addr2line='addr2line')
args = mock.MagicMock(elf_path='./ec.RW.elf',
export_taskinfo='fake',
section='RW',
objdump='objdump',
addr2line='addr2line',
annotation='fake')
parseargs_mock.return_value = args
with mock.patch('__builtin__.print') as print_mock:
sa.main()
print_mock.assert_called_once_with(
'Error: Failed to open annotation file.')
with mock.patch('__builtin__.print') as print_mock:
with mock.patch('__builtin__.open', mock.mock_open()) as open_mock:
open_mock.return_value.read.side_effect = ['{', '']
sa.main()
open_mock.assert_called_once_with('fake', 'r')
print_mock.assert_called_once_with(
'Error: Failed to parse annotation file.')
with mock.patch('__builtin__.print') as print_mock:
with mock.patch('__builtin__.open',
mock.mock_open(read_data='')) as open_mock:
sa.main()
print_mock.assert_called_once_with(
'Error: Invalid annotation file.')
args.annotation = None
with mock.patch('__builtin__.print') as print_mock:
checkoutput_mock.return_value = symbol_text