Source code for cle.backends.elf.metaelf
from __future__ import annotations
import logging
import os
from collections import OrderedDict
from enum import Enum
import elftools
import pyvex
from elftools.elf.descriptions import describe_ei_osabi
from elftools.elf.dynamic import DynamicSection
from elftools.elf.enums import ENUM_DT_FLAGS
from cle.address_translator import AT
from cle.backends.backend import Backend
from cle.backends.symbol import SymbolType
from cle.utils import stream_or_path
__all__ = ("MetaELF", "Relro", "maybedecode")
log = logging.getLogger(name=__name__)
def maybedecode(string):
    """
    Return ``string`` as a ``str``.

    pyelftools may hand back either ``str`` or ``bytes`` for the same kind of field, so callers funnel such
    values through this helper.

    :param string: A ``str``, or an object with a ``decode()`` method (e.g. ``bytes``).
    :return: ``string`` itself when it is already a ``str``; otherwise the result of ``string.decode()``.
    """
    if isinstance(string, str):
        return string
    return string.decode()
def _get_relro(elf):
    """
    Classify the RELRO protection level of a parsed ELF file.

    The tests for partial and full RELRO follow checksec.sh v1.5 (https://www.trapkit.de/tools/checksec/):

    - Partial RELRO has a 'GNU_RELRO' segment
    - Full RELRO also has a 'BIND_NOW' flag in the dynamic section

    :param elf: A pyelftools ELF file object.
    :return: ``Relro.NONE``, ``Relro.PARTIAL`` or ``Relro.FULL``.
    """
    has_relro_segment = False
    for segment in elf.iter_segments():
        if segment.header.p_type == "PT_GNU_RELRO":
            has_relro_segment = True
            break
    if not has_relro_segment:
        return Relro.NONE

    # A missing .dynamic section (None) fails the isinstance check as well.
    dynamic = elf.get_section_by_name(".dynamic")
    if not isinstance(dynamic, DynamicSection):
        return Relro.PARTIAL

    # Only an unambiguous, single DT_FLAGS entry is trusted.
    flag_tags = [tag for tag in dynamic.iter_tags() if tag.entry.d_tag == "DT_FLAGS"]
    if len(flag_tags) != 1:
        return Relro.PARTIAL

    if flag_tags[0].entry.d_val & ENUM_DT_FLAGS["DF_BIND_NOW"] == ENUM_DT_FLAGS["DF_BIND_NOW"]:
        return Relro.FULL
    return Relro.PARTIAL
[docs]
class MetaELF(Backend):
"""
A base class that implements functions used by all backends that can load an ELF.
"""
[docs]
def __init__(self, *args, **kwargs):
    """
    Initialise the backend and record ELF-header facts shared by all ELF-based loaders.

    All positional and keyword arguments are forwarded unchanged to the parent initialiser.
    """
    super().__init__(*args, **kwargs)

    # Temporary parser: only used here to read header-level information.
    elf_reader = elftools.elf.elffile.ELFFile(self._binary_stream)
    elf_header = elf_reader.header
    self.os = describe_ei_osabi(elf_header.e_ident.EI_OSABI)
    self.elfflags = elf_header.e_flags
    self.relro = _get_relro(elf_reader)

    # PLT bookkeeping: name -> RVA of the stub, plus lazily built caches for the public views.
    self._plt = {}
    self._ppc64_abiv1_initial_rtoc = None
    self._cached_plt = None
    self._cached_reverse_plt = None
supported_filetypes = ["elf"]
def _block(self, addr, skip_stmts=False):
    """
    Lift a small IR block at ``addr``; used by the PLT heuristics for sanity checking.

    At most 40 bytes are fetched. On architectures whose name starts with "ARM", an odd ``addr`` is treated as
    a thumb address: bytes are fetched from ``addr - 1`` and lifting starts at byte offset 1.

    :param addr:        Address to lift at (passed through ``_block_bytes``).
    :param skip_stmts:  Forwarded to ``pyvex.IRSB``.
    :return:            The lifted ``pyvex.IRSB``.
    """
    is_thumb = self.arch.name.startswith("ARM") and addr % 2 == 1
    fetch_addr = addr - 1 if is_thumb else addr
    code = self._block_bytes(fetch_addr, 40)
    return pyvex.IRSB(
        code,
        addr,
        self.arch,
        bytes_offset=1 if is_thumb else 0,
        opt_level=1,
        skip_stmts=skip_stmts,
    )
def _block_bytes(self, addr, size):
    """
    Load ``size`` bytes of this object's memory, starting at the LVA ``addr``.

    The address is converted from LVA to RVA before indexing ``self.memory``.
    """
    rva = AT.from_lva(addr, self).to_rva()
    return self.memory.load(rva, size)
def _block_references_addr(self, block, addr):
    """
    Decide whether a lifted block refers to ``addr``.

    The block matches when ``addr`` appears among its constants. On X86 only, it also matches when the block
    reads ``ebx`` into a temporary and later adds the constant ``addr - <GOT section vaddr>`` to that temporary
    with ``Iop_Add32``.

    :param block:   A lifted IR block exposing ``all_constants`` and ``statements``.
    :param addr:    The address that should be referenced.
    :return:        True if a reference was found, False otherwise.
    """
    if any(const.value == addr for const in block.all_constants):
        return True
    if self.arch.name != "X86":
        return False

    # search for tX = GET(ebx) -> Add32(tX, got_addr - addr)
    if ".got.plt" in self.sections_map:
        got = self.sections_map[".got.plt"]
    elif self.relro is Relro.FULL:
        got = self.sections_map[".got"]
    else:
        return False

    ebx_offset = self.arch.registers["ebx"][0]
    wanted_delta = addr - got.vaddr
    ebx_tmp = None
    for stmt in block.statements:
        if stmt.tag != "Ist_WrTmp":
            continue
        expr = stmt.data
        if expr.tag == "Iex_Get" and expr.offset == ebx_offset:
            # remember the temporary that currently holds ebx
            ebx_tmp = stmt.tmp
        if ebx_tmp is None or expr.tag != "Iex_Binop" or expr.op != "Iop_Add32":
            continue
        # sort by string form so the operand order of the addition does not matter
        operands = sorted(expr.args, key=str)
        first, second = operands[0], operands[1]
        if (
            first.tag == "Iex_Const"
            and first.con.value == wanted_delta
            and second.tag == "Iex_RdTmp"
            and second.tmp == ebx_tmp
        ):
            return True
    return False
def _add_plt_stub(self, name, addr, sanity_check=True):
    """
    Record ``addr`` as the PLT stub for ``name``, optionally after a sanity check.

    :param name:            Key into ``self.jmprel``; a missing key raises ``KeyError`` to the caller.
    :param addr:            Candidate stub address, as an LVA. Non-positive values are rejected.
    :param sanity_check:    When true, the block lifted at ``addr`` must reference the relocation's
                            ``linked_addr``; lifting errors and ``KeyError`` during the check count as failure.
    :return:                True if the stub was stored in ``self._plt`` (as an RVA), False otherwise.
    """
    if addr <= 0:
        return False

    got_slot = self.jmprel[name].linked_addr
    if sanity_check:
        try:
            references_slot = self._block_references_addr(self._block(addr), got_slot)
        except (pyvex.PyVEXError, KeyError):
            return False
        if not references_slot:
            return False

    self._plt[name] = AT.from_lva(addr, self).to_rva()
    return True
def _load_plt(self):
    """
    Populate ``self._plt`` (name -> RVA of the PLT stub) using a sequence of heuristics.

    As a side effect, ``self.jmprel`` is replaced by a copy ordered by ``linked_addr``. Each attempt below is
    tried in turn; the method returns early once every function relocation has a stub, or when there is
    nothing left to work with.
    """
    # The main problem here is that there's literally no good way to do this.
    # like, I read through the binutils source and they have a hacked up solution for each arch
    # that performs actual comparisons against the machine code in the plt section.
    # it's pretty bad.
    # we sanity-check all our attempts by requiring that the block lifted at the given address
    # references the GOT slot for the symbol.
    plt_secs = []
    if ".plt" in self.sections_map:
        plt_secs = [self.sections_map[".plt"]]
    # NOTE(review): this assignment replaces (rather than extends) the list built from ".plt" above, so when
    # both sections exist only ".plt.got" is kept at this point -- confirm that this is intended.
    if ".plt.got" in self.sections_map:
        plt_secs = [self.sections_map[".plt.got"]]
    if ".MIPS.stubs" in self.sections_map:
        plt_secs.append(self.sections_map[".MIPS.stubs"])
    if ".plt.sec" in self.sections_map:
        plt_secs.append(self.sections_map[".plt.sec"])
    # order the relocations by address of their slot
    self.jmprel = OrderedDict(sorted(self.jmprel.items(), key=lambda x: x[1].linked_addr))
    # only relocations whose symbol is not an object/section/other symbol are considered
    func_jmprel = OrderedDict(
        (k, v)
        for k, v in self.jmprel.items()
        if v.symbol.type not in (SymbolType.TYPE_OBJECT, SymbolType.TYPE_SECTION, SymbolType.TYPE_OTHER)
    )
    # ATTEMPT 1: some arches will just leave the plt stub addr in the import symbol
    if self.arch.name in ("ARM", "ARMEL", "ARMHF", "ARMCortexM", "AARCH64", "MIPS32", "MIPS64"):
        for name, reloc in func_jmprel.items():
            # without any known PLT section every symbol address is accepted, and the sanity check is
            # switched off (sanity_check=bool(plt_secs))
            if not plt_secs or any(plt_sec.contains_addr(reloc.symbol.linked_addr) for plt_sec in plt_secs):
                self._add_plt_stub(name, reloc.symbol.linked_addr, sanity_check=bool(plt_secs))
    # ATTEMPT 2: on intel chips the data in the got slot pre-relocation points to a lazy-resolver
    # stub immediately after the plt stub
    if self.arch.name in ("X86", "AMD64"):
        for name, reloc in func_jmprel.items():
            try:
                # presumably 6 is the size of the indirect jump that starts the stub -- TODO confirm
                self._add_plt_stub(name, self.memory.unpack_word(reloc.relative_addr) - 6, sanity_check=True)
            except KeyError:
                pass
        # do another sanity check: two names mapping to the same stub address means the guesses are
        # unreliable, so all of them are discarded
        if len(set(self._plt.values())) != len(self._plt):
            self._plt = {}
    # ATTEMPT 3: one ppc scheme I've seen is that there are 16-byte stubs packed together
    # right before the resolution stubs.
    if self.arch.name in ("PPC32",):
        resolver_stubs = sorted(
            (self.memory.unpack_word(reloc.relative_addr), name) for name, reloc in func_jmprel.items()
        )
        if resolver_stubs:
            # the table of 16-byte stubs ends where the lowest resolver stub begins
            stubs_table = resolver_stubs[0][0] - 16 * len(resolver_stubs)
            for i, (_, name) in enumerate(resolver_stubs):
                self._add_plt_stub(name, stubs_table + i * 16)
    if len(self._plt) == len(func_jmprel):
        # every function relocation already has a stub: stop before the expensive scanning below
        return
    # ATTEMPT 4:
    # ok. time to go in on this.
    # try to find a single plt stub, anywhere. if we already have one, use that, otherwise
    # try to scan forward from _start to __libc_start_main to find that one.
    # then, scan forward and backward from that stub to find the rest of them. yikes!
    # keep a timer so we don't get stuck. keep this short and sweet.
    def tick():
        """Count down the shared budget stored on ``tick.bailout_timer``; raise TimeoutError when it runs out."""
        tick.bailout_timer -= 1
        if tick.bailout_timer <= 0:
            raise TimeoutError()

    tick.bailout_timer = 5

    def scan_forward(addr, name, push=False):
        """
        Starting at ``addr``, walk forward until a block references the GOT slot of ``name`` (or of any name
        when ``name`` is a list/tuple), then register that location through ``_add_plt_stub``.

        Returns the result of ``_add_plt_stub``, or False on timeout, lifting failure, ValueError or KeyError.
        """
        names = [name] if not isinstance(name, (list, tuple)) else name

        def block_is_good(blk):
            """True if ``blk`` has the GOT slot of one of ``names`` among its constants.

            The matching name is stored on ``block_is_good.name``.
            """
            all_constants = {c.value for c in blk.all_constants}
            for name in names:
                gotslot = func_jmprel[name].linked_addr
                if gotslot in all_constants:
                    block_is_good.name = name
                    return True
            return False

        block_is_good.name = None

        def is_endbr(addr):
            """True if, on X86/AMD64, the four bytes at ``addr`` are ``f3 0f 1e fa`` or ``f3 0f 1e fb``."""
            if self.arch.name not in ("X86", "AMD64"):
                return False
            return self._block_bytes(addr, 4) in (b"\xf3\x0f\x1e\xfa", b"\xf3\x0f\x1e\xfb")

        instruction_alignment = self.arch.instruction_alignment
        if self.arch.name in ("ARMEL", "ARMHF"):
            # hard code alignment for ARM code
            instruction_alignment = 4
        try:
            # Phase 1: advance until a block that references a wanted GOT slot is found.
            while True:
                tick()
                bb = self._block(addr, skip_stmts=False)
                step_forward = False
                # the block shouldn't touch any cc_* registers
                if self.arch.name in ("X86", "AMD64", "ARMEL", "ARMHF", "ARMCortexM"):
                    cc_regs = {
                        self.arch.registers["cc_op"][0],
                        self.arch.registers["cc_ndep"][0],
                        self.arch.registers["cc_dep1"][0],
                        self.arch.registers["cc_dep2"][0],
                    }
                    # a write to any cc_* register disqualifies the block ...
                    if any(
                        [isinstance(stmt, pyvex.IRStmt.Put) and stmt.offset in cc_regs for stmt in bb.statements]
                    ):
                        step_forward = True
                    # ... and so does a read of one
                    elif any(
                        [
                            isinstance(stmt, pyvex.IRStmt.WrTmp)
                            and isinstance(stmt.data, pyvex.IRExpr.Get)
                            and stmt.data.offset in cc_regs
                            for stmt in bb.statements
                        ]
                    ):
                        step_forward = True
                    if step_forward:
                        # only steps one instruction forward
                        addr += instruction_alignment
                        continue
                if block_is_good(bb):
                    break
                if bb.jumpkind == "Ijk_NoDecode":
                    # nothing could be decoded here: move by the alignment instead of the block size
                    addr += instruction_alignment
                else:
                    addr += bb.size
            # Phase 2: tighten the start address of the stub.
            # "push" means try to increase the address as far as we can without regard for semantics
            # the alternative is to only try to lop off nop instructions
            # make sure we don't push through endbr
            if push:
                if block_is_good.name is None:
                    raise ValueError("block_is_good.name cannot be None.")
                old_name = block_is_good.name
                block = self._block(addr, skip_stmts=True)
                if len(block.instruction_addresses) > 1 and not is_endbr(block.instruction_addresses[0]):
                    for instruction in block.instruction_addresses[1:]:
                        candidate_block = self._block(instruction, skip_stmts=False)
                        # keep moving only while the shorter block still references the same slot
                        if block_is_good(candidate_block) and block_is_good.name == old_name:
                            addr = candidate_block.addr
                            if is_endbr(instruction):
                                break
                        else:
                            break
                # block_is_good() may have overwritten the name while probing candidates
                block_is_good.name = old_name
            else:
                cont = True
                while cont:
                    cont = False
                    seen_imark = False
                    # we need to access bb.statements
                    if bb.statements is None:
                        # relift without skipping statements
                        bb = self._block(bb.addr, skip_stmts=False)
                    for stmt in bb.statements:
                        if stmt.tag == "Ist_IMark":
                            if seen_imark:
                                # second instruction reached with no other behaviour before it: try to
                                # start the stub here instead
                                bb = self._block(stmt.addr, skip_stmts=False)
                                if block_is_good(bb):
                                    addr = stmt.addr
                                    cont = True
                                    break
                            else:
                                seen_imark = True
                        elif stmt.tag == "Ist_Put" and stmt.offset == bb.offsIP:
                            # writes to the instruction pointer are not counted as behaviour
                            continue
                        else:
                            # there's some behavior, not good
                            break
            return self._add_plt_stub(block_is_good.name, addr)
        except (TimeoutError, ValueError, KeyError, pyvex.PyVEXError):
            return False

    if not self._plt and "__libc_start_main" in func_jmprel and self.entry != 0:
        # try to scan forward through control flow to find __libc_start_main!
        try:
            last_jk = None
            addr = self.entry
            bb = self._block(addr, skip_stmts=True)
            target = bb.default_exit_target
            # follow default exits until a block has no statically known target
            while target is not None:
                tick()
                last_jk = bb.jumpkind
                addr = target
                bb = self._block(addr, skip_stmts=True)
                target = bb.default_exit_target
            # the final block was reached through a call: take it as the __libc_start_main stub
            if last_jk == "Ijk_Call":
                self._add_plt_stub("__libc_start_main", addr)
        except (TimeoutError, KeyError, pyvex.PyVEXError):
            pass
    # if func_jmprel.keys()[0] not in self._plt:
    if not set(func_jmprel.keys()).intersection(self._plt.keys()):
        # Check if we have a .plt section
        if not plt_secs:
            # no stub known and no PLT section to scan: give up
            return

    # some binaries have a bunch of CET stubs before the PLTs, and
    # in the worst case we might have to skip over each one of
    # these... so we set the bailout timer accordingly
    def initial_bailout_timer(func_jmprel):
        """Return the scan budget: one tick per function relocation plus five."""
        return len(func_jmprel) + 5

    if plt_secs:
        # LAST TRY: Find the first block that references ANY GOT slot
        tick.bailout_timer = initial_bailout_timer(func_jmprel)
        scan_forward(min(plt_sec.vaddr for plt_sec in plt_secs), list(func_jmprel.keys()), push=True)
    if not self._plt:
        # still nothing found: give up
        return
    # if we've gotten this far there is at least one plt slot address known, guaranteed.
    # (name, LVA of the stub or None) for every function relocation, in GOT-slot order
    plt_hitlist = [
        (name, AT.from_rva(self._plt[name], self).to_lva() if name in self._plt else None) for name in func_jmprel
    ]
    name, addr = plt_hitlist[0]
    if addr is None and plt_secs:
        # try to resolve the very first entry
        tick.bailout_timer = initial_bailout_timer(func_jmprel)
        guessed_addr = min(plt_sec.vaddr for plt_sec in plt_secs)
        scan_forward(guessed_addr, name, push=True)
        if name in self._plt:
            # resolved :-)
            plt_hitlist[0] = (name, AT.from_rva(self._plt[name], self).to_lva())
    # Walk the list in order; each resolved stub tells us where to start looking for the next one.
    next_addr = None
    for i, (name, addr) in enumerate(plt_hitlist):
        if addr is None:
            if next_addr is None:
                # no earlier stub to anchor the search on
                continue
            tick.bailout_timer = 5
            scan_forward(next_addr, name, push=True)
            if name in self._plt:
                addr = AT.from_rva(self._plt[name], self).to_lva()
        if addr is not None:
            b0 = self._block(addr, skip_stmts=True)
            stub_size = b0.size
            # a first block that just falls through to the next address is counted together with the
            # block that follows it
            if isinstance(b0.next, pyvex.expr.Const) and b0.next.con.value == addr + b0.size:
                b1 = self._block(addr + b0.size, skip_stmts=True)
                stub_size += b1.size
            next_addr = addr + stub_size
@property
def plt(self):
    """
    Maps names to addresses.

    The stored RVAs from ``self._plt`` are converted to MVAs; the resulting dict is built on first access and
    cached in ``self._cached_plt``.
    """
    cached = self._cached_plt
    if cached is None:
        cached = {name: AT.from_rva(rva, self).to_mva() for name, rva in self._plt.items()}
        self._cached_plt = cached
    return cached
@property
def reverse_plt(self):
    """
    Maps addresses to names.

    The inverse view of ``self._plt`` with RVAs converted to MVAs; built on first access and cached in
    ``self._cached_reverse_plt``.
    """
    cached = self._cached_reverse_plt
    if cached is None:
        cached = {AT.from_rva(rva, self).to_mva(): name for name, rva in self._plt.items()}
        self._cached_reverse_plt = cached
    return cached
@property
def is_ppc64_abiv1(self):
    """
    Returns whether the arch is PowerPC64 ABIv1.

    The ABI is read from the lowest two bits of the ELF header flags; values below 2 count as ABIv1.

    :return: True if PowerPC64 ABIv1, False otherwise.
    """
    if self.arch.name != "PPC64":
        return False
    return (self.elfflags & 3) < 2
@property
def is_ppc64_abiv2(self):
    """
    Returns whether the arch is PowerPC64 ABIv2.

    The ABI is read from the lowest two bits of the ELF header flags; the value 2 means ABIv2.

    :return: True if PowerPC64 ABIv2, False otherwise.
    """
    if self.arch.name != "PPC64":
        return False
    return (self.elfflags & 3) == 2
@property
def ppc64_initial_rtoc(self):
    """
    Get initial rtoc value for PowerPC64 architecture.

    :return: The value stored by ``_ppc64_abiv1_entry_fix`` for ABIv1, the ``.got``-based guess for ABIv2,
             or None when the object is neither.
    """
    if self.is_ppc64_abiv1:
        return self._ppc64_abiv1_initial_rtoc
    if self.is_ppc64_abiv2:
        return self._ppc64_abiv2_get_initial_rtoc()
    return None
def _ppc64_abiv1_entry_fix(self):
    """
    Resolve the real entry point of a PowerPC64 ABIv1 object.

    On PowerPC64, the lowest two bits of the e_flags ELF header entry determine the ABI type. In ABIv1, the
    entry point given in the ELF headers is not the entry point itself but the address in memory of a pointer
    to it. This method replaces ``self._entry`` with the pointed-to value and stores the word located 8 bytes
    after that pointer as the initial rtoc. It does nothing for any other ABI or architecture.
    """
    if not self.is_ppc64_abiv1:
        return

    pointer_lva = self._entry
    entry_rva = AT.from_lva(pointer_lva, self).to_rva()
    self._entry = self.memory.unpack_word(entry_rva)

    rtoc_rva = AT.from_lva(pointer_lva + 8, self).to_rva()
    self._ppc64_abiv1_initial_rtoc = self.memory.unpack_word(rtoc_rva)
def _ppc64_abiv2_get_initial_rtoc(self):
    """
    Guess the initial table-of-contents (rtoc) value for PPC64 ABIv2 from the .got section.

    According to the PPC64 ABIv2 Specification (Section 3.3): "the TOC pointer register typically points to
    the beginning of the .got section + 0x8000." The guess follows that rule to handle the typical case.

    :return: ``.got`` vaddr + 0x8000, or None (after logging a warning) when there is no ``.got`` section.
    """
    got = self.sections_map.get(".got")
    if got is not None:
        return got.vaddr + 0x8000

    log.warning("Failed to guess initial rtoc value due to missing .got")
    return None
[docs]
@staticmethod
def extract_soname(path):
    """
    Look up the shared-object name (DT_SONAME) of an ELF file.

    :param path: Whatever ``stream_or_path`` accepts; only a ``str`` path can serve as a fallback name.
    :return:     The decoded DT_SONAME value if the dynamic segment has one; otherwise the basename of
                 ``path`` when a dynamic segment exists and ``path`` is a ``str``; otherwise None. None is
                 also returned when pyelftools raises an ``ELFError`` while parsing.
    """
    with stream_or_path(path) as f:
        try:
            e = elftools.elf.elffile.ELFFile(f)
            for seg in e.iter_segments():
                if seg.header.p_type == "PT_NULL":
                    # stop scanning at the first PT_NULL program header
                    break
                elif seg.header.p_type == "PT_DYNAMIC":
                    for tag in seg.iter_tags():
                        if tag.entry.d_tag == "DT_SONAME":
                            return maybedecode(tag.soname)
                    # dynamic segment without DT_SONAME: fall back to the file name, if there is one
                    if isinstance(path, str):
                        return os.path.basename(path)
        except elftools.common.exceptions.ELFError:
            # not parseable as ELF: treated the same as "no soname"
            pass
    return None