Source code for volatility3.framework.symbols.mac

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
from typing import Iterator, Any, Iterable, List, Tuple, Set

from volatility3.framework import interfaces, objects, exceptions, constants
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.mac import extensions


[docs]class MacKernelIntermedSymbols(intermed.IntermediateSymbolTable): provides = {"type": "interface"} def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.set_type_class("proc", extensions.proc) self.set_type_class("fileglob", extensions.fileglob) self.set_type_class("vnode", extensions.vnode) self.set_type_class("vm_map_entry", extensions.vm_map_entry) self.set_type_class("vm_map_object", extensions.vm_map_object) self.set_type_class("socket", extensions.socket) self.set_type_class("inpcb", extensions.inpcb) self.set_type_class("ifnet", extensions.ifnet) self.set_type_class("sockaddr_dl", extensions.sockaddr_dl) self.set_type_class("sockaddr", extensions.sockaddr) self.set_type_class("sysctl_oid", extensions.sysctl_oid) self.set_type_class("kauth_scope", extensions.kauth_scope) # https://developer.apple.com/documentation/kernel/queue_head_t self.set_type_class("queue_entry", extensions.queue_entry) self.optional_set_type_class("queue_head_t", extensions.queue_entry)
[docs]class MacUtilities(interfaces.configuration.VersionableInterface): """Class with multiple useful mac functions.""" """ Version History: 1.1.0 -> added walk_list_head API 1.2.0 -> added walk_slist API 1.3.0 -> add parameter to lookup_module_address to pass kernel module name """ _version = (1, 3, 0) _required_framework_version = (2, 0, 0)
[docs] @classmethod def mask_mods_list( cls, context: interfaces.context.ContextInterface, layer_name: str, mods: Iterator[Any], ) -> List[Tuple[interfaces.objects.ObjectInterface, Any, Any]]: """ A helper function to mask the starting and end address of kernel modules """ mask = context.layers[layer_name].address_mask return [ ( objects.utility.array_to_string(mod.name), mod.address & mask, (mod.address & mask) + mod.size, ) for mod in mods ]
[docs] @classmethod def generate_kernel_handler_info( cls, context: interfaces.context.ContextInterface, layer_name: str, kernel, # ikelos - how to type this?? mods_list: Iterator[Any], ): try: start_addr = kernel.object_from_symbol("vm_kernel_stext") except exceptions.SymbolError: start_addr = kernel.object_from_symbol("stext") try: end_addr = kernel.object_from_symbol("vm_kernel_etext") except exceptions.SymbolError: end_addr = kernel.object_from_symbol("etext") mask = context.layers[layer_name].address_mask start_addr = start_addr & mask end_addr = end_addr & mask return [("__kernel__", start_addr, end_addr)] + MacUtilities.mask_mods_list( context, layer_name, mods_list )
[docs] @classmethod def lookup_module_address( cls, context: interfaces.context.ContextInterface, handlers: Iterator[Any], target_address, kernel_module_name: str = None, ): mod_name = "UNKNOWN" symbol_name = "N/A" module_shift = 0 if kernel_module_name: module = context.modules[kernel_module_name] module_shift = module.offset for name, start, end in handlers: if start <= target_address <= end: mod_name = name if name == "__kernel__": symbols = list( context.symbol_space.get_symbols_by_location( target_address - module_shift ) ) if len(symbols) > 0: symbol_name = ( str(symbols[0].split(constants.BANG)[1]) if constants.BANG in symbols[0] else str(symbols[0]) ) break return mod_name, symbol_name
[docs] @classmethod def files_descriptors_for_process( cls, context: interfaces.context.ContextInterface, symbol_table_name: str, task: interfaces.objects.ObjectInterface, ): """Creates a generator for the file descriptors of a process Args: symbol_table_name: The name of the symbol table associated with the process context: task: The process structure to enumerate file descriptors from Return: A 3 element tuple is yielded for each file descriptor: 1) The file's object 2) The path referenced by the descriptor. The path is either empty, the full path of the file in the file system, or the formatted name for sockets, pipes, etc. 3) The file descriptor number """ try: num_fds = task.p_fd.fd_lastfile except exceptions.InvalidAddressException: num_fds = 1024 try: nfiles = task.p_fd.fd_nfiles except exceptions.InvalidAddressException: nfiles = 1024 if nfiles > num_fds: num_fds = nfiles if num_fds > 4096: num_fds = 1024 file_type = symbol_table_name + constants.BANG + "fileproc" try: table_addr = task.p_fd.fd_ofiles.dereference() except exceptions.InvalidAddressException: return None fds = objects.utility.array_of_pointers( table_addr, count=num_fds, subtype=file_type, context=context ) for fd_num, f in enumerate(fds): if f != 0: try: ftype = f.f_fglob.get_fg_type() except exceptions.InvalidAddressException: continue if ftype == "VNODE": vnode = f.f_fglob.fg_data.dereference().cast("vnode") path = vnode.full_path() elif ftype: path = f"<{ftype.lower()}>" yield f, path, fd_num
@classmethod def _walk_iterable( cls, queue: interfaces.objects.ObjectInterface, list_head_member: str, list_next_member: str, next_member: str, max_elements: int = 4096, ) -> Iterable[interfaces.objects.ObjectInterface]: seen: Set[int] = set() try: current = queue.member(attr=list_head_member) except exceptions.InvalidAddressException: return None while current: if current.vol.offset in seen: break seen.add(current.vol.offset) if len(seen) == max_elements: break if current.is_readable(): yield current try: current = current.member(attr=next_member).member(attr=list_next_member) except exceptions.InvalidAddressException: break
[docs] @classmethod def walk_tailq( cls, queue: interfaces.objects.ObjectInterface, next_member: str, max_elements: int = 4096, ) -> Iterable[interfaces.objects.ObjectInterface]: for element in cls._walk_iterable( queue, "tqh_first", "tqe_next", next_member, max_elements ): yield element
[docs] @classmethod def walk_list_head( cls, queue: interfaces.objects.ObjectInterface, next_member: str, max_elements: int = 4096, ) -> Iterable[interfaces.objects.ObjectInterface]: for element in cls._walk_iterable( queue, "lh_first", "le_next", next_member, max_elements ): yield element
[docs] @classmethod def walk_slist( cls, queue: interfaces.objects.ObjectInterface, next_member: str, max_elements: int = 4096, ) -> Iterable[interfaces.objects.ObjectInterface]: for element in cls._walk_iterable( queue, "slh_first", "sle_next", next_member, max_elements ): yield element