import logging
from collections import defaultdict
from typing import Tuple
import claripy
from angr.analyses import ForwardAnalysis, visitors
from angr.analyses import AnalysesHub
from ...errors import SimMemoryMissingError
from ...storage.memory_mixins.paged_memory.pages.multi_values import MultiValues
from ... import BP, BP_AFTER
from ...sim_variable import SimRegisterVariable, SimStackVariable
from ...code_location import CodeLocation
from .variable_recovery_base import VariableRecoveryBase, VariableRecoveryStateBase
from .annotations import StackLocationAnnotation
l = logging.getLogger(name=__name__)
[docs]class VariableRecoveryState(VariableRecoveryStateBase):
"""
The abstract state of variable recovery analysis.
:ivar angr.knowledge.variable_manager.VariableManager variable_manager: The variable manager.
"""
[docs] def __init__(self, block_addr, analysis, arch, func, concrete_states, stack_region=None, register_region=None):
super().__init__(block_addr, analysis, arch, func, stack_region=stack_region, register_region=register_region)
self._concrete_states = concrete_states
# register callbacks
self.register_callbacks(self.concrete_states)
def __repr__(self):
return "<VRAbstractState>"
@property
def concrete_states(self):
return self._concrete_states
@concrete_states.setter
def concrete_states(self, v):
self._concrete_states = v
[docs] def get_concrete_state(self, addr):
"""
:param addr:
:return:
"""
for s in self.concrete_states:
if s.ip.concrete_value == addr:
return s
return None
[docs] def copy(self):
state = VariableRecoveryState(
self.block_addr,
self._analysis,
self.arch,
self.function,
self._concrete_states,
stack_region=self.stack_region.copy(),
register_region=self.register_region.copy(),
)
return state
[docs] def register_callbacks(self, concrete_states):
"""
:param concrete_states:
:return:
"""
for concrete_state in concrete_states:
# clear existing breakpoints
# TODO: all breakpoints are removed. Fix this later by only removing breakpoints that we added
for bp_type in ("reg_read", "reg_write", "mem_read", "mem_write", "instruction"):
concrete_state.inspect._breakpoints[bp_type] = []
concrete_state.inspect.add_breakpoint(
"reg_read", BP(when=BP_AFTER, enabled=True, action=self._hook_register_read)
)
concrete_state.inspect.add_breakpoint("reg_write", BP(enabled=True, action=self._hook_register_write))
concrete_state.inspect.add_breakpoint(
"mem_read", BP(when=BP_AFTER, enabled=True, action=self._hook_memory_read)
)
concrete_state.inspect.add_breakpoint("mem_write", BP(enabled=True, action=self._hook_memory_write))
[docs] def merge(self, others: Tuple["VariableRecoveryState"], successor=None) -> Tuple["VariableRecoveryState", bool]:
"""
Merge two abstract states.
:param others: Other abstract states to merge.
:return: The merged abstract state.
:rtype: VariableRecoveryState, and a boolean that indicates if any merge has happened.
"""
self.phi_variables = {}
self.successor_block_addr = successor
merged_concrete_states = [self._concrete_states[0]] # self._merge_concrete_states(other)
new_stack_region = self.stack_region.copy()
new_stack_region.set_state(self)
merge_occurred = new_stack_region.merge([other.stack_region for other in others], None)
new_register_region = self.register_region.copy()
new_register_region.set_state(self)
merge_occurred |= new_register_region.merge([other.register_region for other in others], None)
self.phi_variables = {}
self.successor_block_addr = None
return (
VariableRecoveryState(
successor,
self._analysis,
self.arch,
self.function,
merged_concrete_states,
stack_region=new_stack_region,
register_region=new_register_region,
),
merge_occurred,
)
def _merge_concrete_states(self, other):
"""
:param VariableRecoveryState other:
:return:
:rtype: list
"""
merged = []
for s in self.concrete_states:
other_state = other.get_concrete_state(s.ip.concrete_value)
if other_state is not None:
s = s.merge(other_state)
merged.append(s)
return merged
#
# SimInspect callbacks
#
def _hook_register_read(self, state):
reg_read_offset = state.inspect.reg_read_offset
if isinstance(reg_read_offset, claripy.ast.BV):
if reg_read_offset.multivalued:
# Multi-valued register offsets are not supported
l.warning("Multi-valued register offsets are not supported.")
return
reg_read_offset = state.solver.eval(reg_read_offset)
reg_read_length = state.inspect.reg_read_length
reg_read_expr = state.inspect.reg_read_expr
if reg_read_offset == state.arch.sp_offset and reg_read_length == state.arch.bytes:
# TODO: make sure the sp is not overwritten by something that we are not tracking
return
var_offset = reg_read_offset
try:
_: MultiValues = self.register_region.load(reg_read_offset, reg_read_length)
except SimMemoryMissingError:
# the variable being read doesn't exist before
variable = SimRegisterVariable(
reg_read_offset,
reg_read_length,
ident=self.variable_manager[self.func_addr].next_variable_ident("register"),
region=self.func_addr,
)
data = self.annotate_with_variables(reg_read_expr, [(0, variable)])
self.register_region.store(var_offset, data)
# record this variable in variable manager
self.variable_manager[self.func_addr].add_variable("register", var_offset, variable)
def _hook_register_write(self, state):
reg_write_offset = state.inspect.reg_write_offset
if isinstance(reg_write_offset, claripy.ast.BV):
if reg_write_offset.multivalued:
# Multi-valued register offsets are not supported
l.warning("Multi-valued register offsets are not supported.")
return
reg_write_offset = state.solver.eval(reg_write_offset)
if reg_write_offset == state.arch.sp_offset:
# it's updating stack pointer. skip
return
reg_write_expr = state.inspect.reg_write_expr
reg_write_length = len(reg_write_expr) // 8
# annotate it
# reg_write_expr = reg_write_expr.annotate(VariableSourceAnnotation.from_state(state))
state.inspect.reg_write_expr = reg_write_expr
existing_vars = self.variable_manager[self.func_addr].find_variables_by_stmt(
state.scratch.bbl_addr, state.scratch.stmt_idx, "register"
)
if not existing_vars:
# create the variable
variable = SimRegisterVariable(
reg_write_offset,
reg_write_length,
ident=self.variable_manager[self.func_addr].next_variable_ident("register"),
region=self.func_addr,
)
var_offset = reg_write_offset
data = self.top(reg_write_length * self.arch.byte_width)
data = self.annotate_with_variables(data, [(0, variable)])
self.register_region.store(var_offset, data)
# record this variable in variable manager
self.variable_manager[self.func_addr].set_variable("register", var_offset, variable)
self.variable_manager[self.func_addr].write_to(variable, 0, self._codeloc_from_state(state))
# is it writing a pointer to a stack variable into the register?
# e.g. lea eax, [ebp-0x40]
stack_offset = self._addr_to_stack_offset(reg_write_expr)
if stack_offset is not None:
# it is!
# unfortunately we don't know the size. We use size None for now.
if stack_offset not in self.stack_region:
lea_size = 1
new_var = SimStackVariable(
stack_offset,
lea_size,
base="bp",
ident=self.variable_manager[self.func_addr].next_variable_ident("stack"),
region=self.func_addr,
)
reg_write_expr = self.annotate_with_variables(reg_write_expr, [(0, new_var)])
self.stack_region.store(
self.stack_addr_from_offset(stack_offset), reg_write_expr, endness=self.arch.memory_endness
)
# record this variable in variable manager
self.variable_manager[self.func_addr].add_variable("stack", stack_offset, new_var)
existing_vars = set()
try:
vs: MultiValues = self.stack_region.load(self.stack_addr_from_offset(stack_offset), size=1)
for values in vs.values():
for v in values:
for offset, var in self.extract_variables(v):
existing_vars.add((offset, var))
except SimMemoryMissingError:
pass
for offset, var in existing_vars:
self.variable_manager[self.func_addr].reference_at(var, offset, self._codeloc_from_state(state))
def _hook_memory_read(self, state):
mem_read_address = state.inspect.mem_read_address
mem_read_expr = state.inspect.mem_read_expr
mem_read_length = state.inspect.mem_read_length
endness = state.inspect.mem_read_endness
stack_offset = self._addr_to_stack_offset(mem_read_address)
if stack_offset is None:
# it's not a stack access
# TODO:
pass
else:
if stack_offset not in self.stack_region:
# this stack offset is not covered by any existing stack variable
ident_sort = "argument" if stack_offset > 0 else "stack"
variable = SimStackVariable(
stack_offset,
mem_read_length,
base="bp",
ident=self.variable_manager[self.func_addr].next_variable_ident(ident_sort),
region=self.func_addr,
)
mem_read_expr = self.annotate_with_variables(mem_read_expr, [(0, variable)])
self.stack_region.store(self.stack_addr_from_offset(stack_offset), mem_read_expr, endness=endness)
# record this variable in variable manager
self.variable_manager[self.func_addr].add_variable("stack", stack_offset, variable)
# load existing variables
existing_variables = set()
try:
vs: MultiValues = self.stack_region.load(
self.stack_addr_from_offset(stack_offset), size=mem_read_length, endness=endness
)
for values in vs.values():
for v in values:
for offset, var in self.extract_variables(v):
existing_variables.add((offset, var))
except SimMemoryMissingError:
pass
if len(existing_variables) > 1:
# create a phi node for all other variables
l.warning(
"Reading memory with overlapping variables: %s. Ignoring all but the first one.", existing_variables
)
if existing_variables:
offset, variable = next(iter(existing_variables))
self.variable_manager[self.func_addr].read_from(variable, offset, self._codeloc_from_state(state))
def _hook_memory_write(self, state):
mem_write_address = state.inspect.mem_write_address
mem_write_expr = state.inspect.mem_write_expr
mem_write_length = len(mem_write_expr) // 8
endness = state.inspect.mem_write_endness
stack_offset = self._addr_to_stack_offset(mem_write_address)
if stack_offset is None:
# it's not a stack access
# TODO:
pass
else:
# we always add a new variable to keep it SSA
variable = SimStackVariable(
stack_offset,
mem_write_length,
base="bp",
ident=self.variable_manager[self.func_addr].next_variable_ident("stack"),
region=self.func_addr,
)
mem_write_expr = self.annotate_with_variables(mem_write_expr, [(0, variable)])
self.stack_region.store(self.stack_addr_from_offset(stack_offset), mem_write_expr, endness=endness)
# record this variable in variable manager
self.variable_manager[self.func_addr].add_variable("stack", stack_offset, variable)
self.variable_manager[self.func_addr].write_to(variable, 0, self._codeloc_from_state(state))
#
# Util methods
#
def _normalize_register_offset(self, offset): # pylint:disable=no-self-use
# TODO:
return offset
@staticmethod
def _codeloc_from_state(state):
return CodeLocation(state.scratch.bbl_addr, state.scratch.stmt_idx, ins_addr=state.scratch.ins_addr)
def _to_signed(self, n):
if n >= 2 ** (self.arch.bits - 1):
# convert it to a negative number
return n - 2**self.arch.bits
return n
def _addr_to_stack_offset(self, addr):
"""
Convert an address to a stack offset.
:param claripy.ast.Base addr: The address to convert from.
:return: A stack offset if the addr comes from the stack pointer, or None if the address
does not come from the stack pointer.
"""
def _parse(addr):
if addr.op == "__add__":
# __add__ might have multiple arguments
parsed = [_parse(arg) for arg in addr.args]
annotated = [True for annotated, _ in parsed if annotated is True]
if len(annotated) != 1:
# either nothing is annotated, or more than one element is annotated
raise ValueError()
return True, sum(off for _, off in parsed)
elif addr.op == "__sub__":
# __sub__ might have multiple arguments
parsed = [_parse(arg) for arg in addr.args]
first_annotated, first_offset = parsed[0]
if first_annotated is False:
# the first argument is not annotated. we don't support it.
raise ValueError()
if any(annotated for annotated, _ in parsed[1:]):
# more than one argument is annotated. we don't support it.
raise ValueError()
return True, first_offset - sum(off for _, off in parsed[1:])
else:
anno = next(iter(anno for anno in addr.annotations if isinstance(anno, StackLocationAnnotation)), None)
if anno is None:
if addr.op == "BVV":
return False, addr.concrete_value
raise ValueError()
return True, anno.offset
# find the annotated AST
try:
annotated, offset = _parse(addr)
except ValueError:
return None
if not annotated:
return None
return self._to_signed(offset)
[docs]class VariableRecovery(ForwardAnalysis, VariableRecoveryBase): # pylint:disable=abstract-method
"""
Recover "variables" from a function using forced execution.
While variables play a very important role in programming, it does not really exist after compiling. However, we can
still identify and recovery their counterparts in binaries. It is worth noting that not every variable in source
code can be identified in binaries, and not every recognized variable in binaries have a corresponding variable in
the original source code. In short, there is no guarantee that the variables we identified/recognized in a binary
are the same variables in its source code.
This analysis uses heuristics to identify and recovers the following types of variables:
- Register variables.
- Stack variables.
- Heap variables. (not implemented yet)
- Global variables. (not implemented yet)
This analysis takes a function as input, and performs a data-flow analysis on nodes. It runs concrete execution on
every statement and hooks all register/memory accesses to discover all places that are accessing variables. It is
slow, but has a more accurate analysis result. For a fast but inaccurate variable recovery, you may consider using
VariableRecoveryFast.
This analysis follows SSA, which means every write creates a new variable in registers or memory (statck, heap,
etc.). Things may get tricky when overlapping variable (in memory, as you cannot really have overlapping accesses
to registers) accesses exist, and in such cases, a new variable will be created, and this new variable will overlap
with one or more existing varaibles. A decision procedure (which is pretty much TODO) is required at the end of this
analysis to resolve the conflicts between overlapping variables.
"""
[docs] def __init__(self, func, max_iterations=20, store_live_variables=False):
"""
:param knowledge.Function func: The function to analyze.
"""
function_graph_visitor = visitors.FunctionGraphVisitor(func)
VariableRecoveryBase.__init__(self, func, max_iterations, store_live_variables)
ForwardAnalysis.__init__(
self, order_jobs=True, allow_merging=True, allow_widening=False, graph_visitor=function_graph_visitor
)
self._node_iterations = defaultdict(int)
self._analyze()
#
# Main analysis routines
#
def _pre_analysis(self):
self.initialize_dominance_frontiers()
def _pre_job_handling(self, job):
pass
def _initial_abstract_state(self, node):
concrete_state = self.project.factory.blank_state(
addr=node.addr, mode="fastpath" # we don't want to do any solving
)
# annotate the stack pointer
concrete_state.regs.sp = concrete_state.regs.sp.annotate(StackLocationAnnotation(8))
# give it enough stack space
concrete_state.regs.bp = concrete_state.regs.sp + 0x100000
return VariableRecoveryState(node.addr, self, self.project.arch, self.function, [concrete_state])
def _merge_states(self, node, *states: VariableRecoveryState):
if len(states) == 1:
return states[0], True
merged_state, merge_occurred = states[0].merge(states[1:], successor=node.addr)
return merged_state, not merge_occurred
def _run_on_node(self, node, state):
"""
Take an input abstract state, execute the node, and derive an output state.
:param angr.Block node: The node to work on.
:param VariableRecoveryState state: The input state.
:return: A tuple of (changed, new output state).
:rtype: tuple
"""
l.debug("Analyzing block %#x, iteration %d.", node.addr, self._node_iterations[node])
concrete_state = state.get_concrete_state(node.addr)
if concrete_state is None:
# didn't find any state going to here
l.error("_run_on_node(): cannot find any state for address %#x.", node.addr)
return False, state
state = state.copy()
self._instates[node.addr] = state
if self._node_iterations[node] >= self._max_iterations:
l.debug("Skip node %s as we have iterated %d times on it.", node, self._node_iterations[node])
return False, state
state.register_callbacks([concrete_state])
successors = self.project.factory.successors(
concrete_state,
addr=node.addr,
size=node.size,
opt_level=1,
cross_insn_opt=False,
)
output_states = successors.all_successors
state.concrete_states = [state for state in output_states if not state.ip.symbolic]
self._outstates[node.addr] = state
self._node_iterations[node] += 1
return True, state
def _intra_analysis(self):
pass
def _post_analysis(self):
# TODO: only re-assign variable names to those that are newly changed
self.variable_manager.initialize_variable_names()
if self._store_live_variables:
for addr, state in self._outstates.items():
self.variable_manager[self.function.addr].set_live_variables(
addr,
state.downsize_region(state.register_region),
state.downsize_region(state.stack_region),
)
AnalysesHub.register_default("VariableRecovery", VariableRecovery)