Source code for volatility3.framework.symbols.linux

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import math
import contextlib
from abc import ABC, abstractmethod
from typing import Iterator, List, Tuple, Optional, Union

from volatility3 import framework
from volatility3.framework import constants, exceptions, interfaces, objects
from volatility3.framework.objects import utility
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.linux import extensions


[docs]class LinuxKernelIntermedSymbols(intermed.IntermediateSymbolTable): provides = {"type": "interface"} def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) # Set-up Linux specific types self.set_type_class("file", extensions.struct_file) self.set_type_class("list_head", extensions.list_head) self.set_type_class("hlist_head", extensions.hlist_head) self.set_type_class("mm_struct", extensions.mm_struct) self.set_type_class("super_block", extensions.super_block) self.set_type_class("task_struct", extensions.task_struct) self.set_type_class("vm_area_struct", extensions.vm_area_struct) self.set_type_class("qstr", extensions.qstr) self.set_type_class("dentry", extensions.dentry) self.set_type_class("fs_struct", extensions.fs_struct) self.set_type_class("files_struct", extensions.files_struct) self.set_type_class("kobject", extensions.kobject) self.set_type_class("cred", extensions.cred) self.set_type_class("inode", extensions.inode) self.set_type_class("idr", extensions.IDR) self.set_type_class("address_space", extensions.address_space) self.set_type_class("page", extensions.page) # Might not exist in the current symbols self.optional_set_type_class("module", extensions.module) self.optional_set_type_class("bpf_prog", extensions.bpf_prog) self.optional_set_type_class("bpf_prog_aux", extensions.bpf_prog_aux) self.optional_set_type_class("kernel_cap_struct", extensions.kernel_cap_struct) self.optional_set_type_class("kernel_cap_t", extensions.kernel_cap_t) # kernels >= 4.18 self.optional_set_type_class("timespec64", extensions.timespec64) # kernels < 4.18. Reuses timespec64 obj extension, since both has the same members self.optional_set_type_class("timespec", extensions.timespec64) # Mount self.set_type_class("vfsmount", extensions.vfsmount) # Might not exist in older kernels or the current symbols self.optional_set_type_class("mount", extensions.mount) self.optional_set_type_class("mnt_namespace", extensions.mnt_namespace) self.optional_set_type_class("rb_root", extensions.rb_root) # Network self.set_type_class("net", extensions.net) self.set_type_class("socket", extensions.socket) self.set_type_class("sock", extensions.sock) self.set_type_class("inet_sock", extensions.inet_sock) self.set_type_class("unix_sock", extensions.unix_sock) # Might not exist in older kernels or the current symbols self.optional_set_type_class("netlink_sock", extensions.netlink_sock) self.optional_set_type_class("vsock_sock", extensions.vsock_sock) self.optional_set_type_class("packet_sock", extensions.packet_sock) self.optional_set_type_class("bt_sock", extensions.bt_sock) self.optional_set_type_class("xdp_sock", extensions.xdp_sock) # Only found in 6.1+ kernels self.optional_set_type_class("maple_tree", extensions.maple_tree)
[docs]class LinuxUtilities(interfaces.configuration.VersionableInterface): """Class with multiple useful linux functions.""" _version = (2, 1, 1) _required_framework_version = (2, 0, 0) framework.require_interface_version(*_required_framework_version) @classmethod def _get_path_file(cls, task, filp) -> str: """Returns the file pathname relative to the task's root directory. Args: task (task_struct): A reference task filp (file *): A pointer to an open file Returns: str: File pathname relative to the task's root directory. """ rdentry = task.fs.get_root_dentry() rmnt = task.fs.get_root_mnt() vfsmnt = filp.get_vfsmnt() dentry = filp.get_dentry() return cls.do_get_path(rdentry, rmnt, dentry, vfsmnt)
[docs] @classmethod def get_path_mnt(cls, task, mnt) -> str: """Returns the mount point pathname relative to the task's root directory. Args: task (task_struct): A reference task mnt (vfsmount or mount): A mounted filesystem or a mount point. - kernels < 3.3.8 type is 'vfsmount' - kernels >= 3.3.8 type is 'mount' Returns: str: Pathname of the mount point relative to the task's root directory. """ rdentry = task.fs.get_root_dentry() rmnt = task.fs.get_root_mnt() vfsmnt = mnt.get_vfsmnt_current() dentry = mnt.get_dentry_current() return cls.do_get_path(rdentry, rmnt, dentry, vfsmnt)
[docs] @classmethod def do_get_path(cls, rdentry, rmnt, dentry, vfsmnt) -> Union[None, str]: """Returns a pathname of the mount point or file It mimics the Linux kernel prepend_path function. Args: rdentry (dentry *): A pointer to the root dentry rmnt (vfsmount *): A pointer to the root vfsmount dentry (dentry *): A pointer to the dentry vfsmnt (vfsmount *): A pointer to the vfsmount Returns: str: Pathname of the mount point or file """ path_reversed = [] while dentry != rdentry or not vfsmnt.is_equal(rmnt): if dentry == vfsmnt.get_mnt_root() or dentry.is_root(): # Escaped? if dentry != vfsmnt.get_mnt_root(): break # Global root? if not vfsmnt.has_parent(): break dentry = vfsmnt.get_dentry_parent() vfsmnt = vfsmnt.get_vfsmnt_parent() continue parent = dentry.d_parent dname = dentry.d_name.name_as_str() path_reversed.append(dname.strip("/")) dentry = parent path = "/" + "/".join(reversed(path_reversed)) return path
@classmethod def _get_new_sock_pipe_path(cls, context, task, filp) -> str: """Returns the sock pipe pathname relative to the task's root directory. Args: context: The context to retrieve required elements (layers, symbol tables) from task (task_struct): A reference task filp (file *): A pointer to a sock pipe open file Returns: str: Sock pipe pathname relative to the task's root directory. """ # FIXME: This function must be moved to the 'dentry' object extension # Also, the scope of this function went beyond the sock pipe path, so we need to rename this. # Once https://github.com/volatilityfoundation/volatility3/pull/1263 is merged, replace the # dentry inode getters if not (filp and filp.is_readable()): return f"<invalid file pointer> {filp:x}" dentry = filp.get_dentry() if not (dentry and dentry.is_readable()): return f"<invalid dentry pointer> {dentry:x}" kernel_module = cls.get_module_from_volobj_type(context, dentry) sym_addr = dentry.d_op.d_dname if not (sym_addr and sym_addr.is_readable()): return f"<invalid d_dname pointer> {sym_addr:x}" symbs = list(kernel_module.get_symbols_by_absolute_location(sym_addr)) inode = dentry.d_inode if not (inode and inode.is_readable() and inode.is_valid()): return f"<invalid dentry inode> {inode:x}" if len(symbs) == 1: sym = symbs[0].split(constants.BANG)[1] if sym == "sockfs_dname": pre_name = "socket" elif sym == "anon_inodefs_dname": pre_name = "anon_inode" elif sym == "pipefs_dname": pre_name = "pipe" elif sym == "simple_dname": name = dentry.d_name.name if name: pre_name = name.dereference().cast( "string", max_length=255, errors="replace" ) return "/" + pre_name + " (deleted)" else: pre_name = "" elif sym == "ns_dname": # From Kernels 3.19 # In Kernels >= 6.9, see Linux kernel commit 1fa08aece42512be072351f482096d5796edf7ca # ns_common->stashed change from 'atomic64_t' to 'dentry*' try: ns_common_type = kernel_module.get_type("ns_common") stashed_template = ns_common_type.child_template("stashed") stashed_type_full_name = stashed_template.vol.type_name stashed_type_name = stashed_type_full_name.split(constants.BANG)[1] if stashed_type_name == "atomic64_t": # 3.19 <= Kernels < 6.9 fsdata_ptr = dentry.d_fsdata if not (fsdata_ptr and fsdata_ptr.is_readable()): raise IndexError ns_ops = fsdata_ptr.dereference().cast("proc_ns_operations") else: # Kernels >= 6.9 private_ptr = inode.i_private if not (private_ptr and private_ptr.is_readable()): raise IndexError ns_common = private_ptr.dereference().cast("ns_common") ns_ops = ns_common.ops pre_name = utility.pointer_to_string(ns_ops.name, 255) except IndexError: pre_name = "<unsupported ns_dname implementation>" else: pre_name = f"<unsupported d_op symbol> {sym}" else: pre_name = f"<unknown d_dname pointer> {sym_addr:x}" return f"{pre_name}:[{inode.i_ino:d}]"
[docs] @classmethod def path_for_file(cls, context, task, filp) -> str: """Returns a file (or sock pipe) pathname relative to the task's root directory. A 'file' structure doesn't have enough information to properly restore its full path we need the root mount information from task_struct to determine this Args: context: The context to retrieve required elements (layers, symbol tables) from task (task_struct): A reference task filp (file *): A pointer to an open file Returns: str: A file (or sock pipe) pathname relative to the task's root directory. """ # Memory smear protection: Check that both the file and dentry pointers are valid. try: dentry = filp.get_dentry() dentry.is_root() except exceptions.InvalidAddressException: return "" if dentry == 0: return "" dname_is_valid = False # TODO COMPARE THIS IN LSOF OUTPUT TO VOL2 try: if ( dentry.d_op and dentry.d_op.has_member("d_dname") and dentry.d_op.d_dname ): dname_is_valid = True except exceptions.InvalidAddressException: dname_is_valid = False if dname_is_valid: ret = LinuxUtilities._get_new_sock_pipe_path(context, task, filp) else: ret = LinuxUtilities._get_path_file(task, filp) return ret
[docs] @classmethod def files_descriptors_for_process( cls, context: interfaces.context.ContextInterface, symbol_table: str, task: interfaces.objects.ObjectInterface, ): # task.files can be null if not (task.files and task.files.is_readable()): return None fd_table = task.files.get_fds() if fd_table == 0: return None max_fds = task.files.get_max_fds() # corruption check if max_fds > 500000: return None file_type = symbol_table + constants.BANG + "file" fds = objects.utility.array_of_pointers( fd_table, count=max_fds, subtype=file_type, context=context ) for fd_num, filp in enumerate(fds): if filp and filp.is_readable(): full_path = LinuxUtilities.path_for_file(context, task, filp) yield fd_num, filp, full_path
[docs] @classmethod def mask_mods_list( cls, context: interfaces.context.ContextInterface, layer_name: str, mods: Iterator[interfaces.objects.ObjectInterface], ) -> List[Tuple[str, int, int]]: """ A helper function to mask the starting and end address of kernel modules """ mask = context.layers[layer_name].address_mask return [ ( utility.array_to_string(mod.name), mod.get_module_base() & mask, (mod.get_module_base() & mask) + mod.get_core_size(), ) for mod in mods ]
[docs] @classmethod def generate_kernel_handler_info( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, mods_list: Iterator[interfaces.objects.ObjectInterface], ) -> List[Tuple[str, int, int]]: """ A helper function that gets the beginning and end address of the kernel module """ kernel = context.modules[kernel_module_name] mask = context.layers[kernel.layer_name].address_mask start_addr = kernel.object_from_symbol("_text") start_addr = start_addr.vol.offset & mask end_addr = kernel.object_from_symbol("_etext") end_addr = end_addr.vol.offset & mask return [ (constants.linux.KERNEL_NAME, start_addr, end_addr) ] + LinuxUtilities.mask_mods_list(context, kernel.layer_name, mods_list)
[docs] @classmethod def lookup_module_address( cls, kernel_module: interfaces.context.ModuleInterface, handlers: List[Tuple[str, int, int]], target_address: int, ): """ Searches between the start and end address of the kernel module using target_address. Returns the module and symbol name of the address provided. """ mod_name = "UNKNOWN" symbol_name = "N/A" for name, start, end in handlers: if start <= target_address <= end: mod_name = name if name == constants.linux.KERNEL_NAME: symbols = list( kernel_module.get_symbols_by_absolute_location(target_address) ) if len(symbols): symbol_name = ( symbols[0].split(constants.BANG)[1] if constants.BANG in symbols[0] else symbols[0] ) break return mod_name, symbol_name
[docs] @classmethod def walk_internal_list(cls, vmlinux, struct_name, list_member, list_start): while list_start: list_struct = vmlinux.object( object_type=struct_name, offset=list_start.vol.offset ) yield list_struct list_start = getattr(list_struct, list_member)
[docs] @classmethod def container_of( cls, addr: int, type_name: str, member_name: str, vmlinux: interfaces.context.ModuleInterface, ) -> Optional[interfaces.objects.ObjectInterface]: """Cast a member of a structure out to the containing structure. It mimicks the Linux kernel macro container_of() see include/linux.kernel.h Args: addr: The pointer to the member. type_name: The type of the container struct this is embedded in. member_name: The name of the member within the struct. vmlinux: The kernel symbols object Returns: The constructed object or None """ if not addr: return None type_dec = vmlinux.get_type(type_name) member_offset = type_dec.relative_child_offset(member_name) container_addr = addr - member_offset return vmlinux.object( object_type=type_name, offset=container_addr, absolute=True )
[docs] @classmethod def get_module_from_volobj_type( cls, context: interfaces.context.ContextInterface, volobj: interfaces.objects.ObjectInterface, ) -> interfaces.context.ModuleInterface: """Get the vmlinux from a vol obj Args: context: The context to retrieve required elements (layers, symbol tables) from volobj (vol object): A vol object Raises: ValueError: If it cannot obtain any module from the symbol table Returns: A kernel object (vmlinux) """ symbol_table = volobj.get_symbol_table_name() module_names = context.modules.get_modules_by_symbol_tables(symbol_table) module_names = list(module_names) if not module_names: raise ValueError(f"No module using the symbol table '{symbol_table}'") kernel_module_name = module_names[0] kernel = context.modules[kernel_module_name] return kernel
[docs]class IDStorage(ABC): """Abstraction to support both XArray and RadixTree""" # Dynamic values, these will be initialized later CHUNK_SHIFT = None CHUNK_SIZE = None CHUNK_MASK = None def __init__( self, context: interfaces.context.ContextInterface, kernel_module_name: str, ): self.vmlinux = context.modules[kernel_module_name] self.vmlinux_layer = self.vmlinux.context.layers[self.vmlinux.layer_name] self.pointer_size = self.vmlinux.get_type("pointer").size # Dynamically work out the (XA_CHUNK|RADIX_TREE_MAP)_SHIFT values based on # the node.slots[] array size node_type = self.vmlinux.get_type(self.node_type_name) slots_array_size = node_type.child_template("slots").count # Calculate the LSB index - 1 self.CHUNK_SHIFT = slots_array_size.bit_length() - 1 self.CHUNK_SIZE = 1 << self.CHUNK_SHIFT self.CHUNK_MASK = self.CHUNK_SIZE - 1
[docs] @classmethod def choose_id_storage( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, ) -> "IDStorage": """Returns the appropriate ID storage data structure instance for the current kernel implementation. This is used by the IDR and the PageCache to choose between the XArray and RadixTree. Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_module_name: The name of the kernel module on which to operate Returns: The appropriate ID storage instance for the current kernel """ vmlinux = context.modules[kernel_module_name] address_space_type = vmlinux.get_type("address_space") address_space_has_i_pages = address_space_type.has_member("i_pages") i_pages_type_name = ( address_space_type.child_template("i_pages").vol.type_name if address_space_has_i_pages else "" ) i_pages_is_xarray = i_pages_type_name.endswith(constants.BANG + "xarray") i_pages_is_radix_tree_root = i_pages_type_name.endswith( constants.BANG + "radix_tree_root" ) and vmlinux.get_type("radix_tree_root").has_member("xa_head") if i_pages_is_xarray or i_pages_is_radix_tree_root: return XArray(context, kernel_module_name) else: return RadixTree(context, kernel_module_name)
@property @abstractmethod def node_type_name(self) -> str: """Returns the Tree implementation node type name Returns: A string with the node type name """ raise NotImplementedError() @property def tag_internal_value(self) -> int: """Returns the internal node flag for the tree""" raise NotImplementedError()
[docs] @abstractmethod def node_is_internal(self, nodep) -> bool: """Checks if the node is internal""" raise NotImplementedError
[docs] @abstractmethod def is_node_tagged(self, nodep) -> bool: """Checks if the node pointer is tagged""" raise NotImplementedError
[docs] @abstractmethod def untag_node(self, nodep) -> int: """Untags a node pointer""" raise NotImplementedError
[docs] @abstractmethod def get_tree_height(self, treep) -> int: """Returns the tree height""" raise NotImplementedError
[docs] @abstractmethod def get_node_height(self, nodep) -> int: """Returns the node height""" raise NotImplementedError
[docs] @abstractmethod def get_head_node(self, tree) -> int: """Returns a pointer to the tree's head""" raise NotImplementedError
[docs] @abstractmethod def is_valid_node(self, nodep) -> bool: """Validates a node pointer""" raise NotImplementedError
[docs] def nodep_to_node(self, nodep) -> interfaces.objects.ObjectInterface: """Instanciates a tree node from its pointer Args: nodep: Pointer to the XArray/RadixTree node Returns: A XArray/RadixTree node instance """ node = self.vmlinux.object(self.node_type_name, offset=nodep, absolute=True) return node
def _slot_to_nodep(self, slot) -> int: if self.node_is_internal(slot): nodep = slot & ~self.tag_internal_value else: nodep = slot return nodep def _iter_node(self, nodep, height) -> Iterator[int]: node = self.nodep_to_node(nodep) node_slots = node.slots for off in range(self.CHUNK_SIZE): slot = node_slots[off] if slot == 0: continue nodep = self._slot_to_nodep(slot) if height == 1: if self.is_valid_node(nodep): yield nodep else: for child_node in self._iter_node(nodep, height - 1): yield child_node
[docs] def get_entries(self, root: interfaces.objects.ObjectInterface) -> Iterator[int]: """Walks the tree data structure Args: root: The tree root object Yields: A tree node pointer """ height = self.get_tree_height(root.vol.offset) nodep = self.get_head_node(root) if not nodep: return # Keep the internal flag before untagging it is_internal = self.node_is_internal(nodep) if self.is_node_tagged(nodep): nodep = self.untag_node(nodep) if is_internal: height = self.get_node_height(nodep) if height == 0: if self.is_valid_node(nodep): yield nodep else: for child_node in self._iter_node(nodep, height): yield child_node
[docs]class XArray(IDStorage): XARRAY_TAG_MASK = 3 XARRAY_TAG_INTERNAL = 2
[docs] def get_tree_height(self, treep) -> int: return 0
@property def node_type_name(self) -> str: return "xa_node" @property def tag_internal_value(self) -> int: return self.XARRAY_TAG_INTERNAL
[docs] def get_node_height(self, nodep) -> int: node = self.nodep_to_node(nodep) return (node.shift / self.CHUNK_SHIFT) + 1
[docs] def get_head_node(self, tree) -> int: return tree.xa_head
[docs] def node_is_internal(self, nodep) -> bool: return (nodep & self.XARRAY_TAG_MASK) == self.XARRAY_TAG_INTERNAL
[docs] def is_node_tagged(self, nodep) -> bool: return (nodep & self.XARRAY_TAG_MASK) != 0
[docs] def untag_node(self, nodep) -> int: return nodep & (~self.XARRAY_TAG_MASK)
[docs] def is_valid_node(self, nodep) -> bool: # It should have the tag mask clear return not self.is_node_tagged(nodep)
[docs]class RadixTree(IDStorage): RADIX_TREE_INTERNAL_NODE = 1 RADIX_TREE_EXCEPTIONAL_ENTRY = 2 RADIX_TREE_ENTRY_MASK = 3 # Dynamic values. These will be initialized later RADIX_TREE_INDEX_BITS = None RADIX_TREE_MAX_PATH = None RADIX_TREE_HEIGHT_SHIFT = None RADIX_TREE_HEIGHT_MASK = None def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) char_bits = 8 self.RADIX_TREE_INDEX_BITS = char_bits * self.pointer_size self.RADIX_TREE_MAX_PATH = int( math.ceil(self.RADIX_TREE_INDEX_BITS / float(self.CHUNK_SHIFT)) ) self.RADIX_TREE_HEIGHT_SHIFT = self.RADIX_TREE_MAX_PATH + 1 self.RADIX_TREE_HEIGHT_MASK = (1 << self.RADIX_TREE_HEIGHT_SHIFT) - 1 if not self.vmlinux.has_type("radix_tree_root"): # In kernels 4.20, RADIX_TREE_INTERNAL_NODE flag took RADIX_TREE_EXCEPTIONAL_ENTRY's # value. RADIX_TREE_EXCEPTIONAL_ENTRY was removed but that's managed in is_valid_node() # Note that the Radix Tree is still in use for IDR, even after kernels 4.20 when XArray # mostly replace it self.RADIX_TREE_INTERNAL_NODE = 2 @property def node_type_name(self) -> str: return "radix_tree_node" @property def tag_internal_value(self) -> int: return self.RADIX_TREE_INTERNAL_NODE
[docs] def get_tree_height(self, treep) -> int: with contextlib.suppress(exceptions.SymbolError): if self.vmlinux.get_type("radix_tree_root").has_member("height"): # kernels < 4.7.10 radix_tree_root = self.vmlinux.object( "radix_tree_root", offset=treep, absolute=True ) return radix_tree_root.height # kernels >= 4.7.10 return 0
def _radix_tree_maxindex(self, node, height) -> int: """Return the maximum key which can be store into a radix tree with this height.""" if not self.vmlinux.has_symbol("height_to_maxindex"): # Kernels >= 4.7 return (self.CHUNK_SIZE << node.shift) - 1 else: # Kernels < 4.7 height_to_maxindex_array = self.vmlinux.object_from_symbol( "height_to_maxindex" ) maxindex = height_to_maxindex_array[height] return maxindex
[docs] def get_node_height(self, nodep) -> int: node = self.nodep_to_node(nodep) if hasattr(node, "shift"): # 4.7 <= Kernels < 4.20 return (node.shift / self.CHUNK_SHIFT) + 1 elif hasattr(node, "path"): # 3.15 <= Kernels < 4.7 return node.path & self.RADIX_TREE_HEIGHT_MASK elif hasattr(node, "height"): # Kernels < 3.15 return node.height else: raise exceptions.VolatilityException("Cannot find radix-tree node height")
[docs] def get_head_node(self, tree) -> int: return tree.rnode
[docs] def node_is_internal(self, nodep) -> bool: return (nodep & self.RADIX_TREE_INTERNAL_NODE) != 0
[docs] def is_node_tagged(self, nodep) -> bool: return self.node_is_internal(nodep)
[docs] def untag_node(self, nodep) -> int: return nodep & (~self.RADIX_TREE_ENTRY_MASK)
[docs] def is_valid_node(self, nodep) -> bool: # In kernels 4.20, exceptional nodes were removed and internal entries took their bitmask if self.vmlinux.has_type("radix_tree_root"): return ( nodep & self.RADIX_TREE_ENTRY_MASK ) != self.RADIX_TREE_EXCEPTIONAL_ENTRY return True
[docs]class PageCache(object): """Linux Page Cache abstraction""" def __init__( self, context: interfaces.context.ContextInterface, kernel_module_name: str, page_cache: interfaces.objects.ObjectInterface, ): """ Args: context: interfaces.context.ContextInterface, kernel_module_name: The name of the kernel module on which to operate page_cache: Page cache address space """ self.vmlinux = context.modules[kernel_module_name] self._page_cache = page_cache self._idstorage = IDStorage.choose_id_storage(context, kernel_module_name)
[docs] def get_cached_pages(self) -> Iterator[interfaces.objects.ObjectInterface]: """Returns all page cache contents Yields: Page objects """ for page_addr in self._idstorage.get_entries(self._page_cache.i_pages): if not page_addr: continue page = self.vmlinux.object("page", offset=page_addr, absolute=True) if page: yield page