Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libcdb: improve the search speed of search_by_symbol_offsets #2413

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ The table below shows which release corresponds to each branch, and what date th
- [#2376][2376] Return buffered data on first EOF in tube.readline()
- [#2387][2387] Convert apport_corefile() output from bytes-like object to string
- [#2388][2388] libcdb: add `offline_only` to `search_by_symbol_offsets`
- [#2413][2413] libcdb: improve the search speed of `search_by_symbol_offsets`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebase on latest dev please and move this to the 4.15.0 changelog

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how to rebase just the CHANGELOG.md. Do I need to open a new PR?


[2360]: https://github.com/Gallopsled/pwntools/pull/2360
[2356]: https://github.com/Gallopsled/pwntools/pull/2356
Expand All @@ -95,6 +96,7 @@ The table below shows which release corresponds to each branch, and what date th
[2376]: https://github.com/Gallopsled/pwntools/pull/2376
[2387]: https://github.com/Gallopsled/pwntools/pull/2387
[2388]: https://github.com/Gallopsled/pwntools/pull/2388
[2413]: https://github.com/Gallopsled/pwntools/pull/2413

## 4.13.0 (`beta`)

Expand Down
161 changes: 128 additions & 33 deletions pwnlib/libcdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import six
import tempfile
import struct

from pwnlib.context import context
from pwnlib.elf import ELF
Expand All @@ -23,8 +24,35 @@

log = getLogger(__name__)

HASHES = {
'build_id': lambda path: enhex(ELF(path, checksec=False).buildid or b''),

def _turbofast_extract_build_id(path):
    """
    Extract the GNU Build ID of an ELF file without fully parsing it.

    Only the first 0x1000 bytes of the file are read and scanned for the
    ``NT_GNU_BUILD_ID`` note. The note is laid out as an Elf_External_Note:

        0x00  +--------+
              | namesz |  <- Size of entry's owner string
        0x04  +--------+
              | descsz |  <- Size of the note descriptor
        0x08  +--------+
              |  type  |  <- Interpretation of the descriptor
        0x0c  +--------+
              |  name  |  <- Start of the name+desc data
        ...   +--------+
              |  desc  |
        ...   +--------+

    Arguments:
        path(str): Path of the file to inspect.

    Returns:
        The hex-encoded Build ID, or an empty string when no build-id note
        is found within the bytes that were read.
    """
    data = read(path, 0x1000)

    # The note header is stored in the byte order of the file, which is
    # given by EI_DATA (e_ident[5]): 2 is big-endian, everything else is
    # handled as little-endian.
    if data[5:6] == b'\x02':
        endian = ">"
        marker = b"\x00\x00\x00\x03GNU\x00"
    else:
        endian = "<"
        marker = b"\x03\x00\x00\x00GNU\x00"

    # The marker is NT_GNU_BUILD_ID (type) followed by b"GNU\x00" (name).
    # Bytes literals are used instead of bytes.fromhex(), which does not
    # exist on Python 2. Starting the search at offset 4 guarantees that
    # the descsz field preceding the marker lies inside the buffer.
    idx = data.find(marker, 4)
    if idx == -1:
        return enhex(b'')

    descsz, = struct.unpack_from(endian + "L", data, idx - 4)
    return enhex(data[idx + 8: idx + 8 + descsz])


TYPES = {
'libs_id': None,
'build_id': _turbofast_extract_build_id,
'sha1': sha1filehex,
'sha256': sha256filehex,
'md5': md5filehex,
Expand All @@ -42,13 +70,16 @@

# https://gitlab.com/libcdb/libcdb wasn't updated after 2019,
# but still is a massive database of older libc binaries.
def provider_libcdb(hex_encoded_id, hash_type):
def provider_libcdb(hex_encoded_id, search_type):
if search_type == 'libs_id':
return None

# Deferred import because it's slow
import requests
from six.moves import urllib

# Build the URL using the requested hash type
url_base = "https://gitlab.com/libcdb/libcdb/raw/master/hashes/%s/" % hash_type
url_base = "https://gitlab.com/libcdb/libcdb/raw/master/hashes/%s/" % search_type
url = urllib.parse.urljoin(url_base, hex_encoded_id)

data = b""
Expand All @@ -58,15 +89,15 @@ def provider_libcdb(hex_encoded_id, hash_type):
data = wget(url, timeout=20)

if not data:
log.warn_once("Could not fetch libc for %s %s from libcdb", hash_type, hex_encoded_id)
log.warn_once("Could not fetch libc for %s %s from libcdb", search_type, hex_encoded_id)
break

# GitLab serves up symlinks with
if data.startswith(b'..'):
url = os.path.dirname(url) + '/'
url = urllib.parse.urljoin(url.encode('utf-8'), data)
except requests.RequestException as e:
log.warn_once("Failed to fetch libc for %s %s from libcdb: %s", hash_type, hex_encoded_id, e)
log.warn_once("Failed to fetch libc for %s %s from libcdb: %s", search_type, hex_encoded_id, e)
return data

def query_libc_rip(params):
Expand All @@ -86,16 +117,16 @@ def query_libc_rip(params):
return None

# https://libc.rip/
def provider_libc_rip(hex_encoded_id, hash_type):
def provider_libc_rip(search_target, search_type):
# Build the request for the hash type
# https://github.com/niklasb/libc-database/blob/master/searchengine/api.yml
if hash_type == 'build_id':
hash_type = 'buildid'
params = {hash_type: hex_encoded_id}
if search_type == 'build_id':
search_type = 'buildid'
params = {search_type: search_target}
the-soloist marked this conversation as resolved.
Show resolved Hide resolved

libc_match = query_libc_rip(params)
if not libc_match:
log.warn_once("Could not find libc info for %s %s on libc.rip", hash_type, hex_encoded_id)
log.warn_once("Could not find libc info for %s %s on libc.rip", search_type, search_target)
return None

if len(libc_match) > 1:
Expand All @@ -107,13 +138,13 @@ def provider_libc_rip(hex_encoded_id, hash_type):
data = wget(url, timeout=20)

if not data:
log.warn_once("Could not fetch libc binary for %s %s from libc.rip", hash_type, hex_encoded_id)
log.warn_once("Could not fetch libc binary for %s %s from libc.rip", search_type, search_target)
return None
return data

# Check if the local system libc matches the requested hash.
def provider_local_system(hex_encoded_id, hash_type):
if hash_type == 'id':
def provider_local_system(hex_encoded_id, search_type):
if search_type == 'libs_id':
return None
shell_path = os.environ.get('SHELL', None) or '/bin/sh'
if not os.path.exists(shell_path):
Expand All @@ -123,22 +154,29 @@ def provider_local_system(hex_encoded_id, hash_type):
if not local_libc:
log.debug('Cannot lookup libc from shell %r. Skipping local system libc matching.', shell_path)
return None
if HASHES[hash_type](local_libc.path) == hex_encoded_id:
if TYPES[search_type](local_libc.path) == hex_encoded_id:
return local_libc.data
return None

# Offline search https://github.com/niklasb/libc-database for hash type
def provider_local_database(search_target, search_type):
    """
    Look up a libc in the local libc-database checkout.

    Arguments:
        search_target(str):
            Value to look for: a library name when ``search_type`` is
            ``'libs_id'``, otherwise the value produced by the matching
            entry of ``TYPES``.
        search_type(str):
            One of the keys of ``TYPES``.

    Returns:
        The contents of the first matching ``.so`` file, or :const:`None`
        when ``context.local_libcdb`` is unset, is not a directory, or no
        file matches.
    """
    if not context.local_libcdb:
        return None

    localdb = Path(context.local_libcdb)
    if not localdb.is_dir():
        return None

    log.debug("Searching local libc database, %s: %s", search_type, search_target)

    # 'libs_id' is matched against the file name, so no file has to be
    # opened. Only the first hit is needed; stop walking the tree there
    # instead of collecting every match into a list.
    if search_type == 'libs_id':
        libc_path = next(iter(localdb.rglob("%s.so" % search_target)), None)
        if libc_path is None:
            return None
        return read(libc_path)

    # Every other search type compares a value computed from the file.
    extract = TYPES[search_type]
    for libc_path in localdb.rglob("*.so"):
        if search_target == extract(libc_path):
            return read(libc_path)

    return None
Expand Down Expand Up @@ -185,11 +223,28 @@ def query_local_database(params):
"online": [provider_libcdb, provider_libc_rip]
}

def search_by_hash(hex_encoded_id, hash_type='build_id', unstrip=True, offline_only=False):
assert hash_type in HASHES, hash_type
def search_by_hash(search_target, search_type='build_id', unstrip=True, offline_only=False):
"""search_by_hash(str, str, bool, bool) -> str
Arguments:
search_target(str):
Use for searching the libc. This could be a hex encoded ID (`hex_encoded_id`) or a library
name (`libs_id`). Depending on `search_type`, this can represent different types of encoded
values or names.
search_type(str):
The type of the search to be performed, it should be one of the keys in the `TYPES` dictionary.
unstrip(bool):
Try to fetch debug info for the libc and apply it to the downloaded file.
offline_only(bool):
If True, restricts the search to offline providers only (local database). If False, it will also
search online providers. Default is False.

Returns:
The path to the cached directory containing the downloaded libraries.
"""
assert search_type in TYPES, search_type

# Ensure that the libcdb cache directory exists
cache, cache_valid = _check_elf_cache('libcdb', hex_encoded_id, hash_type)
cache, cache_valid = _check_elf_cache('libcdb', search_target, search_type)
if cache_valid:
return cache

Expand All @@ -203,12 +258,12 @@ def search_by_hash(hex_encoded_id, hash_type='build_id', unstrip=True, offline_o

# Run through all available libc database providers to see if we have a match.
for provider in providers:
data = provider(hex_encoded_id, hash_type)
data = provider(search_target, search_type)
if data and data.startswith(b'\x7FELF'):
break

if not data:
log.warn_once("Could not find libc for %s %s anywhere", hash_type, hex_encoded_id)
log.warn_once("Could not find libc for %s %s anywhere", search_type, search_target)

# Save whatever we got to the cache
write(cache, data or b'')
Expand Down Expand Up @@ -257,7 +312,7 @@ def _search_debuginfo_by_hash(base_url, hex_encoded_id):

return cache

def _check_elf_cache(cache_type, hex_encoded_id, hash_type):
def _check_elf_cache(cache_type, search_target, search_type):
"""
Check if there already is an ELF file for this hash in the cache.

Expand All @@ -270,14 +325,14 @@ def _check_elf_cache(cache_type, hex_encoded_id, hash_type):
True
"""
# Ensure that the cache directory exists
cache_dir = os.path.join(context.cache_dir, cache_type, hash_type)
cache_dir = os.path.join(context.cache_dir, cache_type, search_type)

if not os.path.isdir(cache_dir):
os.makedirs(cache_dir)

# If we already downloaded the file, and it looks even passingly like
# a valid ELF file, return it.
cache = os.path.join(cache_dir, hex_encoded_id)
cache = os.path.join(cache_dir, search_target)

if not os.path.exists(cache):
return cache, False
Expand All @@ -289,7 +344,7 @@ def _check_elf_cache(cache_type, hex_encoded_id, hash_type):
# Retry failed lookups after some time
if time.time() > os.path.getmtime(cache) + NEGATIVE_CACHE_EXPIRY:
return cache, False
log.info_once("Skipping invalid cached ELF %s", hex_encoded_id)
log.info_once("Skipping invalid cached ELF %s", search_target)
return None, False

log.info_once("Using cached data from %r", cache)
Expand Down Expand Up @@ -583,7 +638,7 @@ def _handle_multiple_matching_libcs(matching_libcs):
selected_index = options("Select the libc version to use:", [libc['id'] for libc in matching_libcs])
return matching_libcs[selected_index]

def search_by_symbol_offsets(symbols, select_index=None, unstrip=True, return_as_list=False, offline_only=False):
def search_by_symbol_offsets(symbols, select_index=None, unstrip=True, return_as_list=False, offline_only=False, search_type='build_id'):
"""
Lookup possible matching libc versions based on leaked function addresses.

Expand All @@ -608,6 +663,8 @@ def search_by_symbol_offsets(symbols, select_index=None, unstrip=True, return_as
offline_only(bool):
When pass `offline_only=True`, restricts search mode to offline sources only,
disable online lookup. Defaults to `False`, and enable both offline and online providers.
search_type(str):
An option to select searched hash.

Returns:
Path to the downloaded library on disk, or :const:`None`.
Expand All @@ -626,6 +683,8 @@ def search_by_symbol_offsets(symbols, select_index=None, unstrip=True, return_as
>>> for buildid in matched_libcs: # doctest +SKIP
... libc = ELF(search_by_build_id(buildid)) # doctest +SKIP
"""
assert search_type in TYPES, search_type

for symbol, address in symbols.items():
if isinstance(address, int):
symbols[symbol] = hex(address)
Expand Down Expand Up @@ -661,21 +720,50 @@ def search_by_symbol_offsets(symbols, select_index=None, unstrip=True, return_as
if return_as_list:
return [libc['buildid'] for libc in matching_list]

# replace 'build_id' to 'buildid'
match_type = search_type.replace("_", "")

# If there's only one match, return it directly
if len(matching_list) == 1:
return search_by_build_id(matching_list[0]['buildid'], unstrip=unstrip, offline_only=offline_only)
return search_by_hash(matching_list[0][match_type], search_type=search_type, unstrip=unstrip, offline_only=offline_only)

# If a specific index is provided, validate it and return the selected libc
if select_index is not None:
if select_index > 0 and select_index <= len(matching_list):
return search_by_build_id(matching_list[select_index - 1]['buildid'], unstrip=unstrip, offline_only=offline_only)
return search_by_hash(matching_list[select_index - 1][match_type], search_type=search_type, unstrip=unstrip, offline_only=offline_only)
else:
log.error('Invalid selected libc index. %d is not in the range of 1-%d.', select_index, len(matching_list))
return None

# Handle multiple matches interactively if no index is specified
selected_libc = _handle_multiple_matching_libcs(matching_list)
return search_by_build_id(selected_libc['buildid'], unstrip=unstrip, offline_only=offline_only)
return search_by_hash(selected_libc[match_type], search_type=search_type, unstrip=unstrip, offline_only=offline_only)

def search_by_libs_id(libs_id, unstrip=True, offline_only=False):
    """
    Given a libs ID, attempt to find a matching libc.

    This is a thin wrapper that calls :func:`search_by_hash` with the
    search type ``'libs_id'``.

    Arguments:
        libs_id(str):
            Libs ID (e.g. 'libc6_...') of the library
        unstrip(bool):
            Try to fetch debug info for the libc and apply it to the downloaded file.
        offline_only(bool):
            When pass `offline_only=True`, restricts search mode to offline sources only,
            disable online lookup. Defaults to `False`, and enable both offline and online providers.

    Returns:
        Path to the downloaded library on disk, or :const:`None`.

    Examples:

        >>> None == search_by_libs_id('XX')
        True
        >>> filename = search_by_libs_id('libc6_2.31-3_amd64')
        >>> hex(ELF(filename).symbols.read)
        '0xeef40'
    """
    return search_by_hash(libs_id, 'libs_id', unstrip=unstrip, offline_only=offline_only)

def search_by_build_id(hex_encoded_id, unstrip=True, offline_only=False):
"""
Expand Down Expand Up @@ -819,9 +907,16 @@ def _pack_libs_info(path, libs_id, libs_url, syms):
info["libs_url"] = libs_url
info["download_url"] = ""

for hash_type, hash_func in HASHES.items():
for search_type, hash_func in TYPES.items():
# pass libs_id
if search_type == 'libs_id':
continue

# replace 'build_id' to 'buildid'
info[hash_type.replace("_", "")] = hash_func(path)
if search_type == 'build_id':
search_type = search_type.replace("_", "")

info[search_type] = hash_func(path)

default_symbol_list = [
"__libc_start_main_ret", "dup2", "printf", "puts", "read", "system", "str_bin_sh"
Expand Down