# ruff: noqa: F401
from __future__ import annotations
import functools
import sys
import contextlib
from collections import defaultdict
from inspect import Signature
from typing import TYPE_CHECKING, TypeVar, Generic, cast
from collections.abc import Callable
from types import NoneType
from itertools import chain
from traceback import format_exception
import logging
import time
import typing
from rich import progress
from angr.misc.plugins import PluginVendor, VendorPreset
from angr.misc.ux import deprecated
from angr.misc import telemetry
if TYPE_CHECKING:
from angr.knowledge_base import KnowledgeBase
from angr.project import Project
from typing_extensions import ParamSpec
from .identifier import Identifier
from .callee_cleanup_finder import CalleeCleanupFinder
from .vsa_ddg import VSA_DDG
from .cdg import CDG
from .bindiff import BinDiff
from .cfg import CFGEmulated
from .cfg import CFBlanket
from .cfg import CFG
from .cfg import CFGFast
from .static_hooker import StaticHooker
from .ddg import DDG
from .congruency_check import CongruencyCheck
from .reassembler import Reassembler
from .backward_slice import BackwardSlice
from .binary_optimizer import BinaryOptimizer
from .vfg import VFG
from .loopfinder import LoopFinder
from .disassembly import Disassembly
from .veritesting import Veritesting
from .code_tagging import CodeTagging
from .boyscout import BoyScout
from .variable_recovery import VariableRecoveryFast
from .variable_recovery import VariableRecovery
from .reaching_definitions import ReachingDefinitionsAnalysis
from .complete_calling_conventions import CompleteCallingConventionsAnalysis
from .decompiler.clinic import Clinic
from .propagator import PropagatorAnalysis
from .calling_convention import CallingConventionAnalysis
from .decompiler.decompiler import Decompiler
from .xrefs import XRefsAnalysis
AnalysisParams = ParamSpec("AnalysisParams")
# Module-level logger, named after this module.
l = logging.getLogger(name=__name__)
# Telemetry tracer, named after this module; AnalysisFactory.prep() uses it to open a span around each analysis run.
t = telemetry.get_tracer(name=__name__)
[docs]
class AnalysisLogEntry:
    """
    One entry in an analysis' error log: a message, optionally paired with the exception that was being handled
    when the entry was created.

    A live entry holds the real exception type, value and traceback objects. Traceback objects cannot be pickled,
    so a pickled entry carries text only (see ``__getstate__``). ``format()`` and ``repr()`` accept both forms.
    """

    def __init__(self, message, exc_info=False):
        """
        :param message:     The log message.
        :param exc_info:    If True, capture the exception currently being handled, as reported by
                            ``sys.exc_info()``. Otherwise no exception information is recorded.
        """
        if exc_info:
            self.exc_type, self.exc_value, self.exc_traceback = sys.exc_info()
        else:
            self.exc_type = None
            self.exc_value = None
            self.exc_traceback = None
        self.message = message

    def format(self) -> str:
        """
        Render the entry as text.

        :return: The message alone when no traceback was recorded; otherwise the formatted traceback, an empty
                 line, and then the message.
        """
        if self.exc_traceback is None:
            return self.message
        if isinstance(self.exc_traceback, str):
            # Restored from a pickle: the traceback is already text and must not be handed to format_exception().
            return "\n".join((self.exc_traceback, "", self.message))
        return "\n".join((*format_exception(self.exc_type, self.exc_value, self.exc_traceback), "", self.message))

    def __getstate__(self):
        """
        Build a text-only, picklable state tuple ``(exc_type, exc_value, exc_traceback, message)``.

        Missing exception information stays ``None`` instead of being stringified to ``"None"``, so that the
        ``is None`` checks in ``format()`` and ``__repr__()`` still work after unpickling.
        """
        exc_type = self.__dict__.get("exc_type")
        exc_value = self.__dict__.get("exc_value")
        exc_traceback = self.__dict__.get("exc_traceback")

        if exc_traceback is not None and not isinstance(exc_traceback, str):
            # Store the rendered traceback, joined the same way format() joins it, so format() returns the same
            # text before and after a pickle round trip.
            exc_traceback = "\n".join(format_exception(exc_type, exc_value, exc_traceback))
        if exc_type is not None and not isinstance(exc_type, str):
            # Keep the name only; this is what __repr__ displays.
            exc_type = getattr(exc_type, "__name__", str(exc_type))
        if exc_value is not None:
            exc_value = str(exc_value)

        return (exc_type, exc_value, exc_traceback, self.message)

    def __setstate__(self, s):
        """
        Restore an entry from a state tuple produced by ``__getstate__``.

        Earlier versions of ``__getstate__`` wrote the literal string ``"None"`` for absent exception information;
        that is mapped back to ``None`` here.
        """
        exc_type, exc_value, exc_traceback, self.message = s
        if exc_type == "None":
            # A stringified real type never reads "None", so this can only be legacy "no exception" state.
            exc_type = None
            exc_value = None
        if exc_traceback == "None":
            exc_traceback = None
        self.exc_type = exc_type
        self.exc_value = exc_value
        self.exc_traceback = exc_traceback

    def _abbreviated_message(self, limit):
        """
        Return ``repr(self.message)``, shortened when it is longer than ``limit`` characters.

        A shortened message keeps its first ``limit - 4`` characters followed by ``...``, and the opening quote
        (if any) is repeated at the end so the result still looks like a quoted string.
        """
        msg_str = repr(self.message)
        if len(msg_str) > limit:
            msg_str = msg_str[: limit - 4] + "..."
            if msg_str[0] in ('"', "'"):
                msg_str += msg_str[0]
        return msg_str

    def __repr__(self):
        if self.exc_type is None:
            return f"<AnalysisLogEntry {self._abbreviated_message(70)}>"
        # exc_type is a class on a live entry and a plain string on an entry restored from a pickle.
        type_name = getattr(self.exc_type, "__name__", self.exc_type)
        return f"<AnalysisLogEntry {self._abbreviated_message(40)} with {type_name}: {self.exc_value}>"
# Type variable bound to Analysis; it parameterizes AnalysesHub and AnalysisFactory.
A = TypeVar("A", bound="Analysis")
[docs]
class AnalysesHub(PluginVendor[A]):
    """
    The hub through which all registered, runnable analyses of a project are reached.
    """

    def __init__(self, project):
        """
        :param project: The project that every analysis obtained from this hub will run on.
        """
        super().__init__()
        self.project = project

    @deprecated()
    def reload_analyses(self):  # pylint: disable=no-self-use
        """
        Deprecated no-op; does nothing and returns None.
        """
        return None

    def _init_plugin(self, plugin_cls: type[A]) -> AnalysisFactory[A]:
        """
        Wrap an analysis class in a factory bound to this hub's project.
        """
        return AnalysisFactory(project=self.project, analysis_cls=plugin_cls)

    def __getstate__(self):
        # The parent's state comes first, the project second; __setstate__ unpacks in the same order.
        return (super().__getstate__(), self.project)

    def __setstate__(self, sd):
        vendor_state, project = sd
        self.project = project
        super().__setstate__(vendor_state)

    def __getitem__(self, plugin_cls: type[A]) -> AnalysisFactory[A]:
        """
        Return a factory for ``plugin_cls``, looked up by class rather than by registered name.
        """
        return AnalysisFactory(project=self.project, analysis_cls=plugin_cls)
[docs]
class KnownAnalysesPlugin(typing.Protocol):
    """
    Structural type that declares, for each listed analysis, the attribute name and the analysis class it is
    typed as. It carries annotations only; AnalysesHubWithDefault inherits from it to get type hints.
    """

    Identifier: type[Identifier]
    CalleeCleanupFinder: type[CalleeCleanupFinder]
    VSA_DDG: type[VSA_DDG]
    CDG: type[CDG]
    BinDiff: type[BinDiff]
    CFGEmulated: type[CFGEmulated]
    # CFB and CFBlanket are both typed as the CFBlanket class.
    CFB: type[CFBlanket]
    CFBlanket: type[CFBlanket]
    CFG: type[CFG]
    CFGFast: type[CFGFast]
    StaticHooker: type[StaticHooker]
    DDG: type[DDG]
    CongruencyCheck: type[CongruencyCheck]
    Reassembler: type[Reassembler]
    BackwardSlice: type[BackwardSlice]
    BinaryOptimizer: type[BinaryOptimizer]
    VFG: type[VFG]
    LoopFinder: type[LoopFinder]
    Disassembly: type[Disassembly]
    Veritesting: type[Veritesting]
    CodeTagging: type[CodeTagging]
    BoyScout: type[BoyScout]
    VariableRecoveryFast: type[VariableRecoveryFast]
    VariableRecovery: type[VariableRecovery]
    # From here on, several attribute names drop the "Analysis" suffix of the class they refer to.
    ReachingDefinitions: type[ReachingDefinitionsAnalysis]
    CompleteCallingConventions: type[CompleteCallingConventionsAnalysis]
    Clinic: type[Clinic]
    Propagator: type[PropagatorAnalysis]
    CallingConvention: type[CallingConventionAnalysis]
    Decompiler: type[Decompiler]
    XRefs: type[XRefsAnalysis]
[docs]
class AnalysesHubWithDefault(AnalysesHub, KnownAnalysesPlugin):
    """
    An AnalysesHub that additionally carries the type hints declared in KnownAnalysesPlugin for the built-in
    analysis plugins. It adds no behavior of its own.
    """
[docs]
class AnalysisFactory(Generic[A]):
    """
    Creates instances of one analysis class for one project.

    Calling the factory builds the analysis object, fills in the bookkeeping attributes the Analysis base class
    relies on (project, kb, error lists, progress settings), and then runs the analysis' ``__init__`` inside a
    telemetry span.
    """

    # Scalar types that are recorded on a telemetry span as they are.
    _SPAN_SCALAR_TYPES = (str, bytes, bool, int, float)

    def __init__(self, project: Project, analysis_cls: type[A]):
        """
        :param project:         The project the created analyses will run on.
        :param analysis_cls:    The analysis class to instantiate.
        """
        self._project = project
        self._analysis_cls = analysis_cls
        self.__doc__ = ""
        self.__doc__ += analysis_cls.__doc__ or ""
        self.__doc__ += analysis_cls.__init__.__doc__ or ""
        # Keep this factory's own copy of the signature. The attribute set on __call__ below lives on the
        # class-level function shared by all factories, so it only reflects the most recently created factory
        # and must not be used to bind this factory's arguments.
        self._signature = Signature.from_callable(analysis_cls.__init__)
        # Still published on __call__ so interactive introspection of the factory keeps working.
        self.__call__.__func__.__signature__ = self._signature

    @classmethod
    def _is_uniform_scalar_list(cls, values: list) -> bool:
        """
        Tell whether ``values`` is empty, or starts with a supported scalar and holds items of exactly that
        one type.
        """
        if not values:
            return True
        first = values[0]
        return isinstance(first, cls._SPAN_SCALAR_TYPES) and all(type(item) is type(first) for item in values)

    @classmethod
    def _record_arguments(cls, span, bound) -> None:
        """
        Record the bound call arguments of an analysis as ``arg.*`` attributes on a telemetry span.

        :param span:    The span to annotate.
        :param bound:   The ``inspect.BoundArguments`` of the analysis ``__init__`` call.
        """
        arguments = bound.arguments
        # Walk the named arguments first, then whatever was collected into a parameter named "kwargs".
        for name, val in chain(arguments.items(), arguments.get("kwargs", {}).items()):
            if name in ("kwargs", "self"):
                continue
            if val is None:
                span.set_attribute(f"arg.{name}.is_none", True)
            elif isinstance(val, cls._SPAN_SCALAR_TYPES):
                span.set_attribute(f"arg.{name}", val)
            elif isinstance(val, (list, tuple, set, frozenset)):
                items = list(val)
                # Collections with mixed or non-scalar items are skipped without any marker.
                if cls._is_uniform_scalar_list(items):
                    span.set_attribute(f"arg.{name}", items)
            elif isinstance(val, dict):
                keys = list(val)
                values = list(val.values())
                # Keys and values are judged independently of each other.
                if cls._is_uniform_scalar_list(keys):
                    span.set_attribute(f"arg.{name}.keys", keys)
                if cls._is_uniform_scalar_list(values):
                    span.set_attribute(f"arg.{name}.values", values)
            else:
                span.set_attribute(f"arg.{name}.unrepresentable", True)

    def prep(
        self,
        fail_fast=False,
        kb: KnowledgeBase | None = None,
        progress_callback: Callable | None = None,
        show_progressbar: bool = False,
    ) -> type[A]:
        """
        Build a callable that creates and runs the analysis with the given framework options.

        :param fail_fast:           Stored on the analysis as ``_fail_fast``.
        :param kb:                  The knowledge base to attach; the project's own kb is used when this is falsy.
        :param progress_callback:   Stored on the analysis as ``_progress_callback``.
        :param show_progressbar:    Stored on the analysis as ``_show_progressbar``.
        :return:                    A wrapper that takes the arguments of the analysis' ``__init__`` (without
                                    ``self``) and returns the initialized analysis object.
        """

        @functools.wraps(self._analysis_cls.__init__)
        @t.start_as_current_span(self._analysis_cls.__name__)
        def wrapper(*args, **kwargs):
            span = telemetry.get_current_span()
            # None stands in for ``self``, which does not exist yet at this point.
            bound = self._signature.bind(None, *args, **kwargs)
            self._record_arguments(span, bound)
            if self._project.filename is not None:
                span.set_attribute("project.binary_name", self._project.filename)
            span.set_attribute("project.arch_name", self._project.arch.name)

            # Allocate without running __init__, so the bookkeeping attributes exist before the analysis
            # (whose work happens in __init__) starts.
            oself = object.__new__(self._analysis_cls)
            oself.named_errors = defaultdict(list)
            oself.errors = []
            oself.log = []
            oself._fail_fast = fail_fast
            oself._name = self._analysis_cls.__name__
            oself.project = self._project
            oself.kb = kb or self._project.kb
            oself._progress_callback = progress_callback
            oself._show_progressbar = show_progressbar
            oself.__init__(*args, **kwargs)
            return oself

        return wrapper  # type: ignore

    def __call__(self, *args, **kwargs) -> A:
        """
        Create and run the analysis.

        The keyword arguments ``fail_fast``, ``kb``, ``progress_callback`` and ``show_progressbar`` are consumed
        here and passed to ``prep()``; everything else is forwarded to the analysis' ``__init__``.
        """
        fail_fast = kwargs.pop("fail_fast", False)
        kb = kwargs.pop("kb", self._project.kb)
        progress_callback = kwargs.pop("progress_callback", None)
        show_progressbar = kwargs.pop("show_progressbar", False)

        w = self.prep(
            fail_fast=fail_fast, kb=kb, progress_callback=progress_callback, show_progressbar=show_progressbar
        )

        r = w(*args, **kwargs)
        # clean up so that it's always pickleable
        r._progressbar = None
        return r
[docs]
class Analysis:
    """
    Base class of every analysis that can be run on a program.

    :ivar project:              The project this analysis runs on.
    :type project:              angr.Project
    :ivar KnowledgeBase kb:     The knowledge base object.
    :ivar _progress_callback:   Callback that receives progress updates. It is called with the current progress as
                                a float between 0.0 and 100.0.
    :ivar bool _show_progressbar: Whether to display a progress bar while the analysis runs. Independent of
                                _progress_callback.
    :ivar progress.Progress _progressbar: The progress bar object, once one has been created.
    """

    project: Project
    kb: KnowledgeBase
    _fail_fast: bool
    _name: str
    # Class-level defaults; AnalysisFactory replaces both with fresh per-instance containers.
    errors: list[AnalysisLogEntry] = []
    named_errors: defaultdict[str, list[AnalysisLogEntry]] = defaultdict(list)

    _progress_callback = None
    _show_progressbar = False
    _progressbar = None
    _task = None

    # Columns of the progress bar, in display order.
    _PROGRESS_WIDGETS = [
        progress.TaskProgressColumn(),
        progress.BarColumn(),
        progress.TextColumn("Elapsed Time:"),
        progress.TimeElapsedColumn(),
        progress.TextColumn("Time:"),
        progress.TimeRemainingColumn(),
        progress.TextColumn("{task.description}"),
    ]

    @contextlib.contextmanager
    def _resilience(self, name=None, exception=Exception):
        """
        Context manager that logs exceptions raised in its body instead of propagating them.

        :param name:        If given, the error is filed under this key in ``named_errors``; otherwise it is
                            appended to ``errors``.
        :param exception:   The exception type (or tuple of types) to catch.
        :raises:            Re-raises the caught exception when ``_fail_fast`` is set.
        """
        try:
            yield
        except exception:  # pylint:disable=broad-except
            if self._fail_fast:
                raise
            entry = AnalysisLogEntry("exception occurred", exc_info=True)
            l.error("Caught and logged %s with resilience: %s", entry.exc_type.__name__, entry.exc_value)
            target = self.errors if name is None else self.named_errors[name]
            target.append(entry)

    def _initialize_progressbar(self):
        """
        Create the progress bar with a single task of 100 units and start it.

        :return: None
        """
        bar = progress.Progress(*self._PROGRESS_WIDGETS)
        self._progressbar = bar
        self._task = bar.add_task(total=100, description="")
        bar.start()

    def _update_progress(self, percentage, text=None, **kwargs):
        """
        Report progress: update the progress bar (if enabled) and invoke the progress callback (if any).

        :param float percentage:    Current progress, from 0.0 to 100.0.
        :param text:                Optional description; shown on the progress bar and passed to the callback.
        :param kwargs:              Extra keyword arguments forwarded to the progress callback.
        :return: None
        """
        if self._show_progressbar:
            # The bar is created lazily on the first update.
            if self._progressbar is None:
                self._initialize_progressbar()
            self._progressbar.update(self._task, completed=percentage)

        if text is not None and self._progressbar:
            self._progressbar.update(self._task, description=text)

        callback = self._progress_callback
        if callback is not None:
            callback(percentage, text=text, **kwargs)  # pylint:disable=not-callable

    def _finish_progress(self):
        """
        Mark the progress as complete: fill and stop the progress bar, and report 100.0 to the callback.

        :return: None
        """
        if self._show_progressbar:
            if self._progressbar is None:
                self._initialize_progressbar()
            bar = self._progressbar
            if bar is not None:
                bar.update(self._task, completed=100)
                bar.stop()
                self._progressbar = None

        callback = self._progress_callback
        if callback is not None:
            callback(100.0)  # pylint:disable=not-callable

    @staticmethod
    def _release_gil(ctr, freq, sleep_time=0.001):
        """
        Sleep briefly every ``freq`` steps so that the GIL is released and other threads (such as GUI threads) get
        a better chance to run and stay responsive.

        This is a stopgap until the computationally intensive parts are moved to native code.

        :param int ctr:     A counter supplied by the caller.
        :param int freq:    time.sleep() is called when ``ctr`` is non-zero and ``ctr % freq == 0``.
        :param sleep_time:  Number (or fraction) of seconds to sleep.
        :return: None
        """
        if ctr == 0 or ctr % freq != 0:
            return
        time.sleep(sleep_time)

    def __getstate__(self):
        # These attributes are left out of the pickled state.
        transient = ("_progressbar", "_progress_callback", "_statusbar")
        return {key: value for key, value in self.__dict__.items() if key not in transient}

    def __setstate__(self, state):
        vars(self).update(state)

    def __repr__(self):
        return f"<{self._name} Analysis Result at {id(self):#x}>"
# Create a plugin preset and register it on AnalysesHub under the name "default".
default_analyses = VendorPreset()
AnalysesHub.register_preset("default", default_analyses)