From 6a9deffbfb5e364fcb320149feb09a64c5e23fb6 Mon Sep 17 00:00:00 2001 From: k4lizen <124312252+k4lizen@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:10:10 +0200 Subject: [PATCH] Add functions for retrieving process mappings (#2371) * add to process mapping function, and stack, heap, vdso, vvar, libc _mapping properties * better libc string mapping detection #2370 * return all mappings instead of just the first one * add single/multiple option to every _mapping function * undo @property tags * added function doc comments * added elf_mapping * add get_mapping and refactor helpers to use it * better doc * add _location functions * _location_from_mappings for libc and musl locations * _location_from_mappings doc * change comment * fix executable -> path * change comment * python2.7 support * changelog * add address_mapping() * get_mapping comment typo * mention the path is an exact match * add references to wrapped functions * add missing 'the's, make sentences one line, fix 'exact' path description in get_mapping_location * add private and shared fields to permissions object * maps doctest * get_mapping doctest * stack_mapping doctest * heap_mapping doctest * improve stack_mapping and heap_mapping with address example * vvar and vdso _mapping doctest * libc_mapping doctest * elf_mapping doctest * _location_from_mappings doctest * get_mapping_location doctest * heap and stack _location doctest * vdso vvar _location doctest * improved vdso and vvar doctest * elf_location doctest * libc_location doctest * address_mapping bugfix * address_mapping doctest * fix meow check * fix len(mappings) check * actually fix meow check * why dont docs support format string? * removed _location functions * removed the rest of the _location related functions * added lib_size() * add lib_size() example * replace f'' with correct syntax * remove _location() from other doctests * cleanup == in doctest * change " to ' in doctests --------- Co-authored-by: peace-maker --- CHANGELOG.md | 2 + pwnlib/tubes/process.py | 435 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 436 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 982554804..be1969f95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ The table below shows which release corresponds to each branch, and what date th ## 4.14.0 (`dev`) +- [#2371][2371] Add functions for retrieving process mappings - [#2360][2360] Add offline parameter for `search_by_hash` series function - [#2356][2356] Add local libc database provider for libcdb - [#2374][2374] libcdb.unstrip_libc: debug symbols are fetched only if not present @@ -90,6 +91,7 @@ The table below shows which release corresponds to each branch, and what date th - [#2382][2382] added optional port, gdb_args and gdbserver_args parameters to gdb.debug() - [#2435][2435] Speed up gdbserver handshake in gdb.debug() +[2371]: https://github.com/Gallopsled/pwntools/pull/2371 [2360]: https://github.com/Gallopsled/pwntools/pull/2360 [2356]: https://github.com/Gallopsled/pwntools/pull/2356 [2374]: https://github.com/Gallopsled/pwntools/pull/2374 diff --git a/pwnlib/tubes/process.py b/pwnlib/tubes/process.py index 4bbcf56e2..73c378771 100644 --- a/pwnlib/tubes/process.py +++ b/pwnlib/tubes/process.py @@ -12,6 +12,7 @@ import subprocess import sys import time +from collections import namedtuple IS_WINDOWS = sys.platform.startswith('win') @@ -883,6 +884,437 @@ def __pty_make_controlling_tty(self, tty_fd): else: os.close(fd) + def maps(self): + """maps() -> [mapping] + + Returns a list of process mappings. + A mapping object has the following fields: + addr, address (addr alias), start (addr alias), end, size, perms, path, rss, pss, shared_clean, shared_dirty, private_clean, private_dirty, referenced, anonymous, swap + perms is a permissions object, with the following fields: + read, write, execute, private, shared, string + + Example: + + >>> p = process(['cat']) + >>> p.sendline(b"meow") + >>> p.recvline() + b'meow\\n' + >>> proc_maps = open("/proc/" + str(p.pid) + "/maps", "r").readlines() + >>> pwn_maps = p.maps() + >>> len(proc_maps) == len(pwn_maps) + True + >>> checker_arr = [] + >>> for proc, pwn in zip(proc_maps, pwn_maps): + ... proc = proc.split(' ') + ... p_addrs = proc[0].split('-') + ... checker_arr.append(int(p_addrs[0], 16) == pwn.addr == pwn.address == pwn.start) + ... checker_arr.append(int(p_addrs[1], 16) == pwn.end) + ... checker_arr.append(pwn.size == pwn.end - pwn.start) + ... checker_arr.append(pwn.perms.string == proc[1]) + ... proc_path = proc[-1].strip() + ... checker_arr.append(pwn.path == proc_path or (pwn.path == '[anon]' and proc_path == '')) + ... + >>> checker_arr == [True] * len(proc_maps) * 5 + True + + """ + + """ + Useful information about this can be found at: https://man7.org/linux/man-pages/man5/proc.5.html + specifically the /proc/pid/maps section. + + memory_maps() returns a list of pmmap_ext objects + + The definition (from psutil/_pslinux.py) is: + pmmap_grouped = namedtuple( + 'pmmap_grouped', + ['path', 'rss', 'size', 'pss', 'shared_clean', 'shared_dirty', + 'private_clean', 'private_dirty', 'referenced', 'anonymous', 'swap']) + pmmap_ext = namedtuple( + 'pmmap_ext', 'addr perms ' + ' '.join(pmmap_grouped._fields)) + + + Here is an example of a pmmap_ext entry: + pmmap_ext(addr='15555551c000-155555520000', perms='r--p', path='[vvar]', rss=0, size=16384, pss=0, shared_clean=0, shared_dirty=0, private_clean=0, private_dirty=0, referenced=0, anonymous=0, swap=0) + """ + + permissions = namedtuple("permissions", "read write execute private shared string") + mapping = namedtuple("mapping", + "addr address start end size perms path rss pss shared_clean shared_dirty private_clean private_dirty referenced anonymous swap") + # addr = address (alias) = start (alias) + + from pwnlib.util.proc import memory_maps + raw_maps = memory_maps(self.pid) + + maps = [] + # raw_mapping + for r_m in raw_maps: + p_perms = permissions('r' in r_m.perms, 'w' in r_m.perms, 'x' in r_m.perms, 'p' in r_m.perms, 's' in r_m.perms, r_m.perms) + addr_split = r_m.addr.split('-') + p_addr = int(addr_split[0], 16) + p_mapping = mapping(p_addr, p_addr, p_addr, int(addr_split[1], 16), r_m.size, p_perms, r_m.path, r_m.rss, + r_m.pss, r_m.shared_clean, r_m.shared_dirty, r_m.private_clean, r_m.private_dirty, + r_m.referenced, r_m.anonymous, r_m.swap) + maps.append(p_mapping) + + return maps + + def get_mapping(self, path_value, single=True): + """get_mapping(path_value, single=True) -> mapping + get_mapping(path_value, False) -> [mapping] + + Arguments: + path_value(str): The exact path of the requested mapping, + valid values are also [stack], [heap], etc.. + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns found mapping(s) in process memory according to + path_value. + + Example: + + >>> p = process(['cat']) + >>> mapping = p.get_mapping('[stack]') + >>> mapping.path == '[stack]' + True + >>> mapping.perms.execute + False + >>> + >>> mapping = p.get_mapping('does not exist') + >>> print(mapping) + None + >>> + >>> mappings = p.get_mapping(which('cat'), single=False) + >>> len(mappings) > 1 + True + + """ + all_maps = self.maps() + + if single: + for mapping in all_maps: + if path_value == mapping.path: + return mapping + return None + + m_mappings = [] + for mapping in all_maps: + if path_value == mapping.path: + m_mappings.append(mapping) + return m_mappings + + def stack_mapping(self, single=True): + """stack_mapping(single=True) -> mapping + stack_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns :meth:`.process.get_mapping` with '[stack]' and single as arguments. + + Example: + + >>> p = process(['cat']) + >>> mapping = p.stack_mapping() + >>> mapping.path + '[stack]' + >>> mapping.perms.execute + False + >>> mapping.perms.write + True + >>> hex(mapping.address) # doctest: +SKIP + '0x7fffd99fe000' + >>> mappings = p.stack_mapping(single=False) + >>> len(mappings) + 1 + + """ + return self.get_mapping('[stack]', single) + + def heap_mapping(self, single=True): + """heap_mapping(single=True) -> mapping + heap_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns :meth:`.process.get_mapping` with '[heap]' and single as arguments. + + Example: + + >>> p = process(['cat']) + >>> p.sendline(b'meow') + >>> p.recvline() + b'meow\\n' + >>> mapping = p.heap_mapping() + >>> mapping.path + '[heap]' + >>> mapping.perms.execute + False + >>> mapping.perms.write + True + >>> hex(mapping.address) # doctest: +SKIP + '0x557650fae000' + >>> mappings = p.heap_mapping(single=False) + >>> len(mappings) + 1 + + """ + return self.get_mapping('[heap]', single) + + def vdso_mapping(self, single=True): + """vdso_mapping(single=True) -> mapping + vdso_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns :meth:`.process.get_mapping` with '[vdso]' and single as arguments. + + Example: + + >>> p = process(['cat']) + >>> mapping = p.vdso_mapping() + >>> mapping.path + '[vdso]' + >>> mapping.perms.execute + True + >>> mapping.perms.write + False + >>> hex(mapping.address) # doctest: +SKIP + '0x7ffcf13af000' + >>> mappings = p.vdso_mapping(single=False) + >>> len(mappings) + 1 + + """ + return self.get_mapping('[vdso]', single) + + def vvar_mapping(self, single=True): + """vvar_mapping(single=True) -> mapping + vvar_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns :meth:`.process.get_mapping` with '[vvar]' and single as arguments. + + Example: + + >>> p = process(['cat']) + >>> mapping = p.vvar_mapping() + >>> mapping.path + '[vvar]' + >>> mapping.perms.execute + False + >>> mapping.perms.write + False + >>> hex(mapping.address) # doctest: +SKIP + '0x7ffee5f60000' + >>> mappings = p.vvar_mapping(single=False) + >>> len(mappings) + 1 + + """ + return self.get_mapping('[vvar]', single) + + def libc_mapping(self, single=True): + """libc_mapping(single=True) -> mapping + libc_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns either the first libc mapping found in process memory, + or all libc mappings, depending on "single". + + Example: + + >>> p = process(['cat']) + >>> p.sendline(b'meow') + >>> p.recvline() + b'meow\\n' + >>> mapping = p.libc_mapping() + >>> mapping.path # doctest: +ELLIPSIS + '...libc...' + >>> mapping.perms.execute + False + >>> mapping.perms.write + False + >>> hex(mapping.address) # doctest: +SKIP + '0x7fbde7fd7000' + >>> + >>> mappings = p.libc_mapping(single=False) + >>> len(mappings) > 1 + True + >>> hex(mappings[1].address) # doctest: +SKIP + '0x7fbde7ffd000' + >>> mappings[0].end == mappings[1].start + True + >>> mappings[1].perms.execute + True + + """ + all_maps = self.maps() + + if single: + for mapping in all_maps: + lib_basename = os.path.basename(mapping.path) + if 'libc.so' in lib_basename or ('libc-' in lib_basename and '.so' in lib_basename): + return mapping + return None + + l_mappings = [] + for mapping in all_maps: + lib_basename = os.path.basename(mapping.path) + if 'libc.so' in lib_basename or ('libc-' in lib_basename and '.so' in lib_basename): + l_mappings.append(mapping) + return l_mappings + + def musl_mapping(self, single=True): + """musl_mapping(single=True) -> mapping + musl_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns either the first musl mapping found in process memory, + or all musl mappings, depending on "single". + """ + all_maps = self.maps() + + if single: + for mapping in all_maps: + lib_basename = os.path.basename(mapping.path) + if 'musl.so' in lib_basename or ('musl-' in lib_basename and '.so' in lib_basename): + return mapping + return None + + m_mappings = [] + for mapping in all_maps: + lib_basename = os.path.basename(mapping.path) + if 'musl.so' in lib_basename or ('musl-' in lib_basename and '.so' in lib_basename): + m_mappings.append(mapping) + return m_mappings + + def elf_mapping(self, single=True): + """elf_mapping(single=True) -> mapping + elf_mapping(False) -> [mapping] + + Arguments: + single(bool=True): Whether to only return the first + mapping matched, or all of them. + + Returns :meth:`.process.get_mapping` with the :meth:`.process.elf` path and single as arguments. + + Example: + + >>> p = process(['cat']) + >>> p.sendline(b'meow') + >>> p.recvline() + b'meow\\n' + >>> mapping = p.elf_mapping() + >>> mapping.path # doctest: +ELLIPSIS + '...cat...' + >>> mapping.perms.execute + False + >>> mapping.perms.write + False + >>> hex(mapping.address) # doctest: +SKIP + '0x55a2abba0000' + >>> mappings = p.elf_mapping(single=False) + >>> len(mappings) > 1 + True + >>> hex(mappings[1].address) # doctest: +SKIP + '0x55a2abba2000' + >>> mappings[0].end == mappings[1].start + True + >>> mappings[1].perms.execute + True + + """ + return self.get_mapping(self.elf.path, single) + + def lib_size(self, path_value): + """lib_size(path_value) -> int + + Arguments: + path_value(str): The exact path of the shared library + loaded by the process + + Returns the size of the shared library in process memory. + If the library is not found, zero is returned. + + Example: + + >>> from pwn import * + >>> p = process(['cat']) + >>> libc_size = p.lib_size(p.libc.path) + >>> hex(libc_size) # doctest: +SKIP + '0x1d5000' + >>> libc_mappings = p.libc_mapping(single=False) + >>> libc_size == (libc_mappings[-1].end - libc_mappings[0].start) + True + + """ + + # Expecting this to be sorted + lib_mappings = self.get_mapping(path_value, single=False) + + if len(lib_mappings) == 0: + return 0 + + is_contiguous = True + total_size = lib_mappings[0].size + for i in range(1, len(lib_mappings)): + total_size += lib_mappings[i].size + + if lib_mappings[i].start != lib_mappings[i - 1].end: + is_contiguous = False + + if not is_contiguous: + log.warn("lib_size(): %s mappings aren't contiguous" % path_value) + + return total_size + + def address_mapping(self, address): + """address_mapping(address) -> mapping + + Returns the mapping at the specified address. + + Example: + + >>> p = process(['cat']) + >>> p.sendline(b'meow') + >>> p.recvline() + b'meow\\n' + >>> libc = p.libc_mapping().address + >>> heap = p.heap_mapping().address + >>> elf = p.elf_mapping().address + >>> p.address_mapping(libc).path # doctest: +ELLIPSIS + '.../libc...' + >>> p.address_mapping(heap + 0x123).path + '[heap]' + >>> p.address_mapping(elf + 0x1234).path # doctest: +ELLIPSIS + '.../cat' + >>> p.address_mapping(elf - 0x1234) == None + True + + """ + + all_maps = self.maps() + for mapping in all_maps: + if mapping.addr <= address < mapping.end: + return mapping + return None + def libs(self): """libs() -> dict @@ -937,7 +1369,8 @@ def libc(self): from pwnlib.elf import ELF for lib, address in self.libs().items(): - if 'libc.so' in lib or 'libc-' in lib: + lib_basename = os.path.basename(lib) + if 'libc.so' in lib_basename or ('libc-' in lib_basename and '.so' in lib_basename): e = ELF(lib) e.address = address return e