import itertools
import logging
import math
import os
import struct
import weakref
from collections import OrderedDict, deque
from itertools import chain
from typing import Optional, Generic, TypeVar, overload, TYPE_CHECKING, List, Iterable, Iterator, Tuple, NoReturn

    from .bool import Bool
    from .fp import FP
    from ..annotation import Annotation

    import cPickle as pickle
except ImportError:
    import pickle

    # Python's build-in MD5 is about 2x faster than hashlib.md5 on short bytestrings
    import _md5 as md5
except ImportError:
    import hashlib as md5

l = logging.getLogger("claripy.ast")

WORKER = bool(os.environ.get("WORKER", False))
md5_unpacker = struct.Struct("2Q")
from_iterable = chain.from_iterable

# pylint:enable=unused-argument
# pylint:disable=unidiomatic-typecheck

T = TypeVar("T", bound="Base")

[docs]class ASTCacheKey(Generic[T]):
[docs] def __init__(self, a: T): self.ast: T = a
def __hash__(self): return hash(self.ast) def __eq__(self, other): return type(self) is type(other) and self.ast._hash == other.ast._hash def __repr__(self): return f"<Key {self.ast._type_name()} {self.ast.__repr__(inner=True)}>"
# # AST variable naming # var_counter = itertools.count() _unique_names = True def _make_name(name: str, size: int, explicit_name: bool = False, prefix: str = "") -> str: if _unique_names and not explicit_name: return "%s%s_%d_%d" % (prefix, name, next(var_counter), size) else: return name def _d(h, cls, state): """ This function is the deserializer for ASTs. It exists to work around the fact that pickle will (normally) call __new__() with no arguments during deserialization. For ASTs, this does not work. """ op, args, length, variables, symbolic, annotations = state return cls.__new__( cls, op, args, length=length, variables=variables, symbolic=symbolic, annotations=annotations, hash=h )
[docs]class Base: """ This is the base class of all claripy ASTs. An AST tracks a tree of operations on arguments. This class should not be instanciated directly - instead, use one of the constructor functions (BVS, BVV, FPS, FPV...) to construct a leaf node and then build more complicated expressions using operations. AST objects have *hash identity*. This means that an AST that has the same hash as another AST will be the *same* object. This is critical for efficient memory usage. As an example, the following is true:: a, b = two different ASTs c = b + a d = b + a assert c is d :ivar op: The operation that is being done on the arguments :ivar args: The arguments that are being used """ __slots__ = [ "op", "args", "variables", "symbolic", "_hash", "_simplified", "_cached_encoded_name", "_cache_key", "_errored", "_eager_backends", "length", "_excavated", "_burrowed", "_uninitialized", "_uc_alloc_depth", "annotations", "simplifiable", "_uneliminatable_annotations", "_relocatable_annotations", "depth", "__weakref__", ] _hash_cache = weakref.WeakValueDictionary() _leaf_cache = weakref.WeakValueDictionary() FULL_SIMPLIFY = 1 LITE_SIMPLIFY = 2 UNSIMPLIFIED = 0 LITE_REPR = 0 MID_REPR = 1 FULL_REPR = 2 def __new__(cls, op, args, add_variables=None, hash=None, **kwargs): # pylint:disable=redefined-builtin """ This is called when you create a new Base object, whether directly or through an operation. It finalizes the arguments (see the _finalize function, above) and then computes a hash. If an AST of this hash already exists, it returns that AST. Otherwise, it creates, initializes, and returns the AST. :param op: The AST operation ('__add__', 'Or', etc) :param args: The arguments to the AST operation (i.e., the objects to add) :param variables: The symbolic variables present in the AST (default: empty set) :param symbolic: A flag saying whether or not the AST is symbolic (default: False) :param length: An integer specifying the bit length of this AST (default: None) :param simplified: A measure of how simplified this AST is. 0 means unsimplified, 1 means fast-simplified (basically, just undoing the Reverse op), and 2 means simplified through z3. :param errored: A set of backends that are known to be unable to handle this AST. :param eager_backends: A list of backends with which to attempt eager evaluation :param annotations: A frozenset of annotations applied onto this AST. """ # if any(isinstance(a, BackendObject) for a in args): # raise Exception('asdf') a_args = args if type(args) is tuple else tuple(args) # initialize the following properties: symbolic, variables and errored need_symbolic = "symbolic" not in kwargs need_variables = "variables" not in kwargs need_errored = "errored" not in kwargs args_have_annotations = None # Note that `args_have_annotations` may not be set if we don't need to set any of the above variables, in which # case it will stay as None, and will be passed to __a_init__() "as is". __a_init__() will properly handle it # there. arg_max_depth = 0 if need_symbolic or need_variables or need_errored: symbolic_flag = False variables_set = set() errored_set = set() for a in a_args: if not isinstance(a, Base): continue if need_symbolic and not symbolic_flag: symbolic_flag |= a.symbolic if need_variables: variables_set |= a.variables if need_errored: errored_set |= a._errored if args_have_annotations is not True: args_have_annotations = args_have_annotations or bool(a.annotations) if arg_max_depth < a.depth: arg_max_depth = a.depth if need_symbolic: kwargs["symbolic"] = symbolic_flag if need_variables: kwargs["variables"] = frozenset(variables_set) if need_errored: kwargs["errored"] = errored_set if type(kwargs["variables"]) is not frozenset: # pylint:disable=unidiomatic-typecheck kwargs["variables"] = frozenset(kwargs["variables"]) if add_variables: kwargs["variables"] = kwargs["variables"] | add_variables eager_backends = list(backends._eager_backends) if "eager_backends" not in kwargs else kwargs["eager_backends"] if not kwargs["symbolic"] and eager_backends is not None and op not in operations.leaf_operations: for eb in eager_backends: try: r = operations._handle_annotations(eb._abstract(, args)), args) if r is not None: return r else: eager_backends.remove(eb) except BackendError: eager_backends.remove(eb) # if we can't be eager anymore, null out the eagerness kwargs["eager_backends"] = None # whether this guy is initialized or not if "uninitialized" not in kwargs: kwargs["uninitialized"] = None if "uc_alloc_depth" not in kwargs: kwargs["uc_alloc_depth"] = None if "annotations" not in kwargs or kwargs["annotations"] is None: annotations = () else: annotations = kwargs["annotations"] # process annotations if "skip_child_annotations" in kwargs: skip_child_annotations = kwargs.pop("skip_child_annotations") else: skip_child_annotations = False if not annotations and not args_have_annotations: uneliminatable_annotations = frozenset() relocatable_annotations = frozenset() else: ast_args = tuple(a for a in a_args if isinstance(a, Base)) uneliminatable_annotations = frozenset( chain( ( from_iterable(a._uneliminatable_annotations for a in ast_args) if not skip_child_annotations else tuple() ), tuple(a for a in annotations if not a.eliminatable and not a.relocatable), ) ) relocatable_annotations = tuple( OrderedDict( (e, True) for e in tuple( chain( ( from_iterable(a._relocatable_annotations for a in ast_args) if not skip_child_annotations else tuple() ), tuple(a for a in annotations if not a.eliminatable and a.relocatable), ) ) ).keys() ) annotations = tuple( chain( ( from_iterable(a._relocatable_annotations for a in ast_args) if not skip_child_annotations else tuple() ), tuple(a for a in annotations), ) ) kwargs["annotations"] = annotations cache = cls._hash_cache if hash is not None: h = hash elif op in {"BVS", "BVV", "BoolS", "BoolV", "FPS", "FPV"} and not annotations: if op == "FPV" and a_args[0] == 0.0 and math.copysign(1, a_args[0]) < 0: # Python does not distinguish between +0.0 and -0.0 so we add sign to tuple to distinguish h = (op, kwargs.get("length", None), ("-",) + a_args) elif op == "FPV" and math.isnan(a_args[0]): # cannot compare nans h = (op, kwargs.get("length", None), ("nan",) + a_args[1:]) else: h = (op, kwargs.get("length", None), a_args) cache = cls._leaf_cache else: h = Base._calc_hash(op, a_args, kwargs) if hash is None else hash self = cache.get(h, None) if self is None: self = super().__new__(cls) depth = arg_max_depth + 1 self.__a_init__( op, a_args, depth=depth, uneliminatable_annotations=uneliminatable_annotations, relocatable_annotations=relocatable_annotations, **kwargs, ) self._hash = h cache[h] = self # else: # if self.args != a_args or self.op != op or self.variables != kwargs['variables']: # raise Exception("CRAP -- hash collision") return self @classmethod def __init_with_annotations__( cls, op, a_args, depth=None, uneliminatable_annotations=None, relocatable_annotations=None, **kwargs ): cache = cls._hash_cache h = Base._calc_hash(op, a_args, kwargs) self = cache.get(h, None) if self is not None: return self self = super().__new__(cls) self.__a_init__( op, a_args, depth=depth, uneliminatable_annotations=uneliminatable_annotations, relocatable_annotations=relocatable_annotations, **kwargs, ) self._hash = h cache[h] = self return self def __reduce__(self): # HASHCONS: these attributes key the cache # BEFORE CHANGING THIS, SEE ALL OTHER INSTANCES OF "HASHCONS" IN THIS FILE return _d, ( self._hash, self.__class__, (self.op, self.args, self.length, self.variables, self.symbolic, self.annotations), )
[docs] def __init__(self, *args, **kwargs): pass
@staticmethod def _calc_hash(op, args, keywords): """ Calculates the hash of an AST, given the operation, args, and kwargs. :param op: The operation. :param args: The arguments to the operation. :param keywords: A dict including the 'symbolic', 'variables', and 'length' items. :returns: a hash. We do it using md5 to avoid hash collisions. (hash(-1) == hash(-2), for example) """ args_tup = tuple(a if type(a) in (int, float) else getattr(a, "_hash", hash(a)) for a in args) # HASHCONS: these attributes key the cache # BEFORE CHANGING THIS, SEE ALL OTHER INSTANCES OF "HASHCONS" IN THIS FILE to_hash = Base._ast_serialize(op, args_tup, keywords) if to_hash is None: # fall back to pickle.dumps to_hash = ( op, args_tup, str(keywords.get("length", None)), hash(keywords["variables"]), keywords["symbolic"], hash(keywords.get("annotations", None)), ) to_hash = pickle.dumps(to_hash, -1) # Why do we use md5 when it's broken? Because speed is more important # than cryptographic integrity here. Then again, look at all those # allocations we're doing here... fast python is painful. hd = md5.md5(to_hash).digest() return md5_unpacker.unpack(hd)[0] # 64 bits @staticmethod def _arg_serialize(arg) -> Optional[bytes]: if arg is None: return b"\x0f" elif arg is True: return b"\x1f" elif arg is False: return b"\x2e" elif type(arg) is int: if arg < 0: if arg >= -0x7FFF: return b"-" + struct.pack("<h", arg) elif arg >= -0x7FFF_FFFF: return b"-" + struct.pack("<i", arg) elif arg >= -0x7FFF_FFFF_FFFF_FFFF: return b"-" + struct.pack("<q", arg) return None else: if arg <= 0xFFFF: return struct.pack("<H", arg) elif arg <= 0xFFFF_FFFF: return struct.pack("<I", arg) elif arg <= 0xFFFF_FFFF_FFFF_FFFF: return struct.pack("<Q", arg) return None elif type(arg) is str: return arg.encode() elif type(arg) is float: return struct.pack("f", arg) elif type(arg) is tuple: arr = [] for elem in arg: b = Base._arg_serialize(elem) if b is None: return None arr.append(b) return b"".join(arr) return None @staticmethod def _ast_serialize(op: str, args_tup, keywords) -> Optional[bytes]: """ Serialize the AST and get a bytestring for hashing. :param op: The operator. :param args_tup: A tuple of arguments. :param keywords: A dict of keywords. :return: The serialized bytestring. """ serialized_args = Base._arg_serialize(args_tup) if serialized_args is None: return None if "length" in keywords: length = Base._arg_serialize(keywords["length"]) if length is None: return None else: length = b"none" variables = struct.pack("<Q", hash(keywords["variables"]) & 0xFFFF_FFFF_FFFF_FFFF) symbolic = b"\x01" if keywords["symbolic"] else b"\x00" if "annotations" in keywords: annotations = struct.pack("<Q", hash(keywords["annotations"]) & 0xFFFF_FFFF_FFFF_FFFF) else: annotations = b"\xf9" return op.encode() + serialized_args + length + variables + symbolic + annotations # pylint:disable=attribute-defined-outside-init def __a_init__( self, op, args, variables=None, symbolic=None, length=None, simplified=0, errored=None, eager_backends=None, uninitialized=None, uc_alloc_depth=None, annotations=None, encoded_name=None, depth=None, uneliminatable_annotations=None, relocatable_annotations=None, ): # pylint:disable=unused-argument """ Initializes an AST. Takes the same arguments as ``Base.__new__()`` We use this instead of ``__init__`` due to python's undesirable behavior w.r.t. automatically calling it on return from ``__new__``. """ # HASHCONS: these attributes key the cache # BEFORE CHANGING THIS, SEE ALL OTHER INSTANCES OF "HASHCONS" IN THIS FILE self.op = op self.args = args if type(args) is tuple else tuple(args) self.length = length self.variables = frozenset(variables) if type(variables) is not frozenset else variables self.symbolic = symbolic self.annotations: Tuple[Annotation] = annotations self._uneliminatable_annotations = uneliminatable_annotations self._relocatable_annotations = relocatable_annotations self.depth = depth if depth is not None else 1 self._eager_backends = eager_backends self._cached_encoded_name = encoded_name self._errored = errored if errored is not None else set() self._simplified = simplified self._cache_key = ASTCacheKey(self) self._excavated = None self._burrowed = None self._uninitialized = uninitialized self._uc_alloc_depth = uc_alloc_depth if len(self.args) == 0: raise ClaripyOperationError("AST with no arguments!") # pylint:enable=attribute-defined-outside-init def __hash__(self): res = self._hash if type(self._hash) is not int: res = hash(self._hash) return res @property def cache_key(self: T) -> ASTCacheKey[T]: """ A key that refers to this AST - this value is appropriate for usage as a key in dictionaries. """ return self._cache_key @property def _encoded_name(self): if self._cached_encoded_name is None: self._cached_encoded_name = self.args[0].encode() # pylint: disable=attribute-defined-outside-init return self._cached_encoded_name # # Collapsing and simplification # # def _models_for(self, backend): # for a in self.args: # backend.convert_expr(a) # else: # yield backend.convert(a)
[docs] def make_like(self: T, op: str, args: Iterable, **kwargs) -> T: if kwargs.pop("simplify", False) is True: # Try to simplify the expression again simplified = simplifications.simpleton.simplify(op, args) else: simplified = None if simplified is not None: op = simplified.op if ( simplified is None and len(kwargs) == 3 and "annotations" in kwargs and kwargs["annotations"] and "skip_child_annotations" in kwargs and kwargs["skip_child_annotations"] is True and "length" in kwargs ): # fast path annotations = tuple(kwargs["annotations"]) uneliminatable_annotations = frozenset( anno for anno in annotations if not anno.eliminatable and not anno.relocatable ) relocatable_annotations = tuple(anno for anno in annotations if not anno.eliminatable and anno.relocatable) return type(self).__init_with_annotations__( op, args, uneliminatable_annotations=uneliminatable_annotations, relocatable_annotations=relocatable_annotations, annotations=annotations, variables=self.variables, uninitialized=self._uninitialized, symbolic=self.symbolic, length=kwargs["length"], depth=self.depth, eager_backends=self._eager_backends, uc_alloc_depth=self._uc_alloc_depth, ) all_operations = operations.leaf_operations_symbolic_with_union if "annotations" not in kwargs: # special case: if self is one of the args, we do not copy annotations over from self since child # annotations will be re-processed during AST creation. if not args or not any(self is arg for arg in args): kwargs["annotations"] = self.annotations if "variables" not in kwargs and op in all_operations: kwargs["variables"] = self.variables if "uninitialized" not in kwargs: kwargs["uninitialized"] = self._uninitialized if "symbolic" not in kwargs and op in all_operations: kwargs["symbolic"] = self.symbolic if simplified is None: # Cannot simplify the expression anymore return type(self)(op, args, **kwargs) else: # The expression is simplified r = type(self)(op, simplified.args, **kwargs) return r
def _rename(self, new_name): if self.op not in {"BVS", "BoolS", "FPS"}: raise ClaripyOperationError("rename is only supported on leaf nodes") new_args = (new_name,) + self.args[1:] return self.make_like(self.op, new_args, length=self.length, variables={new_name}) # # Annotations # def _apply_to_annotations(self, f): return self.make_like(self.op, self.args, annotations=f(self.annotations), skip_child_annotations=True)
[docs] def append_annotation(self: T, a: "Annotation") -> T: """ Appends an annotation to this AST. :param a: the annotation to append :returns: a new AST, with the annotation added """ return self._apply_to_annotations(lambda alist: alist + (a,))
[docs] def append_annotations(self: T, new_tuple: Tuple["Annotation", ...]) -> T: """ Appends several annotations to this AST. :param new_tuple: the tuple of annotations to append :returns: a new AST, with the annotations added """ return self._apply_to_annotations(lambda alist: alist + new_tuple)
[docs] def annotate(self: T, *args: "Annotation", remove_annotations: Optional[Iterable["Annotation"]] = None) -> T: """ Appends annotations to this AST. :param args: the tuple of annotations to append (variadic positional args) :param remove_annotations: annotations to remove :returns: a new AST, with the annotations added """ if not remove_annotations: return self._apply_to_annotations(lambda alist: alist + args) else: return self._apply_to_annotations( lambda alist: tuple(arg for arg in alist if arg not in remove_annotations) + args )
[docs] def insert_annotation(self: T, a: "Annotation") -> T: """ Inserts an annotation to this AST. :param a: the annotation to insert :returns: a new AST, with the annotation added """ return self._apply_to_annotations(lambda alist: (a,) + alist)
[docs] def insert_annotations(self: T, new_tuple: Tuple["Annotation", ...]) -> T: """ Inserts several annotations to this AST. :param new_tuple: the tuple of annotations to insert :returns: a new AST, with the annotations added """ return self._apply_to_annotations(lambda alist: new_tuple + alist)
[docs] def replace_annotations(self: T, new_tuple: Tuple["Annotation", ...]) -> T: """ Replaces annotations on this AST. :param new_tuple: the tuple of annotations to replace the old annotations with :returns: a new AST, with the annotations added """ return self._apply_to_annotations(lambda alist: new_tuple)
[docs] def remove_annotation(self: T, a: "Annotation") -> T: """ Removes an annotation from this AST. :param a: the annotation to remove :returns: a new AST, with the annotation removed """ return self._apply_to_annotations(lambda alist: tuple(oa for oa in alist if oa != a))
[docs] def remove_annotations(self: T, remove_sequence: Iterable["Annotation"]) -> T: """ Removes several annotations from this AST. :param remove_sequence: a sequence/set of the annotations to remove :returns: a new AST, with the annotations removed """ return self._apply_to_annotations(lambda alist: tuple(oa for oa in alist if oa not in remove_sequence))
# # Viewing and debugging #
[docs] def dbg_repr(self, prefix=None) -> str: # pylint:disable=unused-argument """ Returns a debug representation of this AST. """ return self.shallow_repr(max_depth=None, details=Base.FULL_REPR)
def _type_name(self): return self.__class__.__name__ def __repr__(self, inner=False, max_depth=None, explicit_length=False): if WORKER: return "<AST something>" else: return self.shallow_repr(max_depth=max_depth, explicit_length=explicit_length, inner=inner)
[docs] def shallow_repr( self, max_depth=8, explicit_length=False, details=LITE_REPR, inner=False, parent_prec=15, left=True ) -> str: """ Returns a string representation of this AST, but with a maximum depth to prevent floods of text being printed. :param max_depth: The maximum depth to print. :param explicit_length: Print lengths of BVV arguments. :param details: An integer value specifying how detailed the output should be: LITE_REPR - print short repr for both operations and BVs, MID_REPR - print full repr for operations and short for BVs, FULL_REPR - print full repr of both operations and BVs. :param inner: whether or not it is an inner AST :param parent_prec: parent operation precedence level :param left: whether or not it is a left AST :returns: A string representing the AST """ if max_depth is not None and max_depth <= 0: return "<...>" elif self.op in operations.reversed_ops: op = operations.reversed_ops[self.op] args = reversed(self.args) else: op = self.op args = self.args next_max_depth = max_depth - 1 if max_depth is not None else None length = self.length if explicit_length else None # if operation is not in op_precedence, assign the "least operation precedence" op_prec = operations.op_precedence[op] if op in operations.op_precedence else 15 args = [ ( arg.shallow_repr(next_max_depth, explicit_length, details, True, op_prec, idx == 0) if isinstance(arg, Base) else arg ) for idx, arg in enumerate(args) ] prec_diff = parent_prec - op_prec inner_infix_use_par = prec_diff < 0 or prec_diff == 0 and not left inner_repr = self._op_repr(op, args, inner, length, details, inner_infix_use_par) if not inner: return f"<{self._type_name()} {inner_repr}>" else: return inner_repr
@staticmethod def _op_repr(op, args, inner, length, details, inner_infix_use_par): if details < Base.FULL_REPR: if op == "BVS": extras = [] if args[1] is not None: fmt = "%#x" if type(args[1]) is int else "%s" extras.append("min=%s" % (fmt % args[1])) if args[2] is not None: fmt = "%#x" if type(args[2]) is int else "%s" extras.append("max=%s" % (fmt % args[2])) if args[3] is not None: fmt = "%#x" if type(args[3]) is int else "%s" extras.append("stride=%s" % (fmt % args[3])) if args[4] is True: extras.append("UNINITIALIZED") return "{}{}".format(args[0], "{%s}" % ", ".join(extras) if extras else "") elif op == "BoolV": return str(args[0]) elif op == "BVV": if args[0] is None: value = "!" elif args[1] < 10: value = format(args[0], "") else: value = format(args[0], "#x") return value + "#%d" % length if length is not None else value if details < Base.MID_REPR: if op == "If": value = f"if {args[0]} then {args[1]} else {args[2]}" return f"({value})" if inner else value elif op == "Not": return f"!{args[0]}" elif op == "Extract": return f"{args[2]}[{args[0]}:{args[1]}]" elif op == "ZeroExt": value = f"0#{args[0]} .. {args[1]}" return f"({value})" if inner else value elif op in operations.prefix: assert len(args) == 1 value = f"{operations.prefix[op]}{args[0]}" return f"({value})" if inner and inner_infix_use_par else value elif op in operations.infix: value = f" {operations.infix[op]} ".join(args) return f"({value})" if inner and inner_infix_use_par else value return "{}({})".format(op, ", ".join(str(arg) for arg in args))
[docs] def children_asts(self) -> Iterator["Base"]: """ Return an iterator over the nested children ASTs. """ ast_queue = deque([iter(self.args)]) while ast_queue: try: ast = next(ast_queue[-1]) except StopIteration: ast_queue.pop() continue if isinstance(ast, Base): ast_queue.append(iter(ast.args)) l.debug("Yielding AST %s with hash %s with %d children", ast, hash(ast), len(ast.args)) yield ast
[docs] def leaf_asts(self) -> Iterator["Base"]: """ Return an iterator over the leaf ASTs. """ seen = set() ast_queue = deque([self]) while ast_queue: ast = ast_queue.pop() if isinstance(ast, Base) and id(ast.cache_key) not in seen: seen.add(id(ast.cache_key)) if ast.depth == 1: yield ast continue ast_queue.extend(ast.args) continue
# TODO: Deprecate this property @property def recursive_children_asts(self): """ DEPRECATED: Use children_asts() instead. """ return self.children_asts() # TODO: Deprecate this property @property def recursive_leaf_asts(self): """ DEPRECATED: Use leaf_asts() instead. """ return self.leaf_asts()
[docs] def dbg_is_looped(self): l.debug("Checking AST with hash %s for looping", hash(self)) seen = set() for child_ast in self.children_asts(): if hash(child_ast) in seen: return child_ast seen.add(hash(child_ast)) return False
# # Various AST modifications (replacements) #
[docs] def swap_args(self: T, new_args, new_length=None, **kwargs) -> T: """ This returns the same AST, with the arguments swapped out for new_args. """ if len(self.args) == len(new_args) and all(a is b for a, b in zip(self.args, new_args)): return self # symbolic = any(a.symbolic for a in new_args if isinstance(a, Base)) # variables = frozenset.union(frozenset(), *(a.variables for a in new_args if isinstance(a, Base))) length = self.length if new_length is None else new_length a = self.make_like(self.op, new_args, length=length, **kwargs) # if a.op != self.op or a.symbolic != self.symbolic or a.variables != self.variables: # raise ClaripyOperationError("major bug in swap_args()") return a
# # Other helper functions #
[docs] def split(self, split_on: Iterable[str]) -> List: """ Splits the AST if its operation is `split_on` (i.e., return all the arguments). Otherwise, return a list with just the AST. """ if self.op in split_on: return list(self.args) else: return [self]
# we don't support iterating over Base objects def __iter__(self) -> NoReturn: """ This prevents people from iterating over ASTs. """ raise ClaripyOperationError("Please don't iterate over, or split, AST nodes!") def __bool__(self) -> NoReturn: """ This prevents people from accidentally using an AST as a condition. For example, the following was previously common:: a,b = two ASTs if a == b: do something The problem is that `a == b` would return an AST, because an AST can be symbolic and there could be no way to actually know the value of that without a constraint solve. This caused tons of issues. """ raise ClaripyOperationError( "testing Expressions for truthiness does not do what you want, as these expressions can be symbolic" )
[docs] def structurally_match(self: T, o: T) -> bool: """ Structurally compares two A objects, and check if their corresponding leaves are definitely the same A object (name-wise or hash-identity wise). :param o: the other claripy A object :returns: True/False """ # TODO: Convert a and b into canonical forms if self.op != o.op: return False if len(self.args) != len(o.args): return False for arg_a, arg_b in zip(self.args, o.args): if not isinstance(arg_a, Base): if type(arg_a) != type(arg_b): return False # They are not ASTs if arg_a != arg_b: return False else: continue if arg_a.op in operations.leaf_operations: if arg_a is not arg_b: return False else: if not arg_a.structurally_match(arg_b): return False return True
[docs] def replace_dict(self: T, replacements, variable_set=None, leaf_operation=None) -> T: """ Returns this AST with subexpressions replaced by those that can be found in `replacements` dict. :param variable_set: For optimization, ast's without these variables are not checked for replacing. :param replacements: A dictionary of hashes to their replacements. :param leaf_operation: An operation that should be applied to the leaf nodes. :returns: An AST with all instances of ast's in replacements. """ if variable_set is None: variable_set = set() if leaf_operation is None: leaf_operation = lambda x: x arg_queue = [iter([self])] rep_queue = [] ast_queue = [] while arg_queue: try: ast = next(arg_queue[-1]) repl = ast if not isinstance(ast, Base): rep_queue.append(repl) continue elif ast.cache_key in replacements: repl = replacements[ast.cache_key] elif ast.variables >= variable_set: if ast.op in operations.leaf_operations: repl = leaf_operation(ast) if repl is not ast: replacements[ast.cache_key] = repl elif ast.depth > 1: arg_queue.append(iter(ast.args)) ast_queue.append(ast) continue rep_queue.append(repl) continue except StopIteration: arg_queue.pop() if ast_queue: ast = ast_queue.pop() repl = ast args = rep_queue[-len(ast.args) :] del rep_queue[-len(ast.args) :] # Check if replacement occurred. if any((a is not b for a, b in zip(ast.args, args))): repl = ast.make_like(ast.op, tuple(args)) replacements[ast.cache_key] = repl rep_queue.append(repl) assert len(arg_queue) == 0, "arg_queue is not empty" assert len(ast_queue) == 0, "ast_queue is not empty" assert len(rep_queue) == 1, ("rep_queue has unexpected length", len(rep_queue)) return rep_queue.pop()
[docs] def replace(self: T, old, new, variable_set=None, leaf_operation=None) -> T: # pylint:disable=unused-argument """ Returns this AST but with the AST 'old' replaced with AST 'new' in its subexpressions. """ self._check_replaceability(old, new) replacements = {old.cache_key: new} return self.replace_dict(replacements, variable_set=old.variables)
@staticmethod def _check_replaceability(old, new): if not isinstance(old, Base) or not isinstance(new, Base): raise ClaripyReplacementError("replacements must be AST nodes") if type(old) is not type(new): raise ClaripyReplacementError(f"cannot replace type {type(old)} ast with type {type(new)} ast") def _identify_vars(self, all_vars, counter): if self.op == "BVS": if self.args not in all_vars: all_vars[self.args] = BV("BVS", self.args, length=self.length, explicit_name=True) elif self.op == "BoolS": if self.args not in all_vars: all_vars[self.args] = BoolS("var_" + str(next(counter))) else: for arg in self.args: if isinstance(arg, Base): arg._identify_vars(all_vars, counter)
[docs] def canonicalize(self: T, var_map=None, counter=None) -> T: counter = itertools.count() if counter is None else counter var_map = {} if var_map is None else var_map for v in self.leaf_asts(): if v.cache_key not in var_map and v.op in {"BVS", "BoolS", "FPS"}: new_name = "canonical_%d" % next(counter) var_map[v.cache_key] = v._rename(new_name) return var_map, counter, self.replace_dict(var_map)
# # This code handles burrowing ITEs deeper into the ast and excavating # them to shallower levels. # def _burrow_ite(self): if self.op != "If": # print("i'm not an if") return self.swap_args([(a.ite_burrowed if isinstance(a, Base) else a) for a in self.args]) if not all(isinstance(a, Base) for a in self.args): # print("not all my args are bases") return self old_true = self.args[1] old_false = self.args[2] if old_true.op != old_false.op or len(old_true.args) != len(old_false.args): return self if old_true.op == "If": # let's no go into this right now return self if any(a.op in operations.leaf_operations for a in self.args): # burrowing through these is pretty funny return self matches = [old_true.args[i] is old_false.args[i] for i in range(len(old_true.args))] if matches.count(True) != 1 or all(matches): # TODO: handle multiple differences for multi-arg ast nodes # print("wrong number of matches:",matches,old_true,old_false) return self different_idx = matches.index(False) inner_if = If(self.args[0], old_true.args[different_idx], old_false.args[different_idx]) new_args = list(old_true.args) new_args[different_idx] = inner_if.ite_burrowed # print("replaced the",different_idx,"arg:",new_args) return old_true.__class__(old_true.op, new_args, length=self.length) def _excavate_ite(self): ast_queue = [iter([self])] arg_queue = [] op_queue = [] while ast_queue: try: ast = next(ast_queue[-1]) if not isinstance(ast, Base): arg_queue.append(ast) continue if ast.op in operations.leaf_operations: arg_queue.append(ast) continue if ast.annotations: arg_queue.append(ast) continue op_queue.append(ast) ast_queue.append(iter(ast.args)) except StopIteration: ast_queue.pop() if op_queue: op = op_queue.pop() args = arg_queue[-len(op.args) :] del arg_queue[-len(op.args) :] ite_args = [isinstance(a, Base) and a.op == "If" for a in args] if op.op == "If": # if we are an If, call the If handler so that we can take advantage of its simplifiers excavated = If(*args) elif ite_args.count(True) == 0: # if there are no ifs that came to the surface, there's nothing more to do excavated = op.swap_args(args, simplify=True) else: # this gets called when we're *not* in an If, but there are Ifs in the args. # it pulls those Ifs out to the surface. cond = args[ite_args.index(True)].args[0] new_true_args = [] new_false_args = [] for a in args: if not isinstance(a, Base) or a.op != "If": new_true_args.append(a) new_false_args.append(a) elif a.args[0] is cond: new_true_args.append(a.args[1]) new_false_args.append(a.args[2]) elif a.args[0] is Not(cond): new_true_args.append(a.args[2]) new_false_args.append(a.args[1]) else: # weird conditions -- giving up! excavated = op.swap_args(args, simplify=True) break else: excavated = If( cond, op.swap_args(new_true_args, simplify=True), op.swap_args(new_false_args, simplify=True), ) # continue arg_queue.append(excavated) assert len(op_queue) == 0, "op_queue is not empty" assert len(ast_queue) == 0, "ast_queue is not empty" assert len(arg_queue) == 1, ("arg_queue has unexpected length", len(arg_queue)) return arg_queue.pop() @property def ite_burrowed(self: T) -> T: """ Returns an equivalent AST that "burrows" the ITE expressions as deep as possible into the ast, for simpler printing. """ if self._burrowed is None: self._burrowed = self._burrow_ite() # pylint:disable=attribute-defined-outside-init self._burrowed._burrowed = self._burrowed # pylint:disable=attribute-defined-outside-init return self._burrowed @property def ite_excavated(self: T) -> T: """ Returns an equivalent AST that "excavates" the ITE expressions out as far as possible toward the root of the AST, for processing in static analyses. """ if self._excavated is None: self._excavated = self._excavate_ite() # pylint:disable=attribute-defined-outside-init # we set the flag for the children so that we avoid re-excavating during # VSA backend evaluation (since the backend evaluation recursively works on # the excavated ASTs) self._excavated._excavated = self._excavated return self._excavated # # these are convenience operations # def _first_backend(self, what): for b in backends._all_backends: if b in self._errored or b.is_smt_backend: continue try: return getattr(b, what)(self) except BackendError: pass return None @property def concrete_value(self): return self._model_concrete.value # yan I'm gonna kill you @property def cv(self): return self._model_concrete.value @property def v(self): return self._model_concrete.value @property def singlevalued(self) -> bool: return self._first_backend("singlevalued") @property def multivalued(self) -> bool: return self._first_backend("multivalued") @property def cardinality(self) -> int: return self._first_backend("cardinality") @property def concrete(self) -> bool: # fast path if self.op in {"BVV", "BoolV", "FPV"}: return True if self.op in {"BVS", "BoolS", "FPS"}: return False if self.variables: return False return backends.concrete.handles(self) @property def uninitialized(self) -> bool: """ Whether this AST comes from an uninitialized dereference or not. It's only used in under-constrained symbolic execution mode. :returns: True/False/None (unspecified). """ # TODO: It should definitely be moved to the proposed Annotation backend. return self._uninitialized @property def uc_alloc_depth(self) -> int: """ The depth of allocation by lazy-initialization. It's only used in under-constrained symbolic execution mode. :returns: An integer indicating the allocation depth, or None if it's not from lazy-initialization. """ # TODO: It should definitely be moved to the proposed Annotation backend. return self._uc_alloc_depth
[docs] def to_claripy(self: T) -> T: """ Returns itself. Provides compatibility with other classes (such as SimActionObject) which provide a similar method to unwrap to an AST. """ return self
# # Backwards compatibility crap # def __getattr__(self, a): if not a.startswith("_model_"): raise AttributeError(a) model_name = a[7:] if not hasattr(backends, model_name): raise AttributeError(a) try: return getattr(backends, model_name).convert(self) except BackendError: return self
[docs]def simplify(e: T) -> T: if isinstance(e, Base) and e.op in operations.leaf_operations: return e s = e._first_backend("simplify") if s is None: l.debug("Unable to simplify expression") return e else: # Copy some parameters (that should really go to the Annotation backend) s._uninitialized = e.uninitialized s._uc_alloc_depth = e._uc_alloc_depth s._simplified = Base.FULL_SIMPLIFY # dealing with annotations if e.annotations: ast_args = tuple(a for a in e.args if isinstance(a, Base)) annotations = tuple( set(chain(from_iterable(a._relocatable_annotations for a in ast_args), tuple(a for a in e.annotations))) ) if annotations != s.annotations: s = s.remove_annotations(s.annotations) s = s.annotate(*annotations) return s
from ..errors import BackendError, ClaripyOperationError, ClaripyReplacementError from .. import operations from ..backend_manager import backends from ..ast.bool import If, Not, BoolS from import BV from .. import simplifications