Source code for volatility3.framework.symbols.linux

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
from typing import Iterator, List, Tuple, Optional

from volatility3 import framework
from volatility3.framework import constants, exceptions, interfaces, objects
from volatility3.framework.objects import utility
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.linux import extensions


class LinuxKernelIntermedSymbols(intermed.IntermediateSymbolTable):
    """Intermediate symbol table that attaches the Linux extension classes
    to the kernel types they specialise."""

    provides = {"type": "interface"}

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # Set-up Linux specific types.
        # Each row is (type name, extension class, must_exist), listed in
        # registration order. Rows with must_exist=False might not exist in
        # older kernels or the current symbols, so they are registered
        # through optional_set_type_class instead of set_type_class.
        type_bindings = (
            ("file", extensions.struct_file, True),
            ("list_head", extensions.list_head, True),
            ("mm_struct", extensions.mm_struct, True),
            ("super_block", extensions.super_block, True),
            ("task_struct", extensions.task_struct, True),
            ("vm_area_struct", extensions.vm_area_struct, True),
            ("qstr", extensions.qstr, True),
            ("dentry", extensions.dentry, True),
            ("fs_struct", extensions.fs_struct, True),
            ("files_struct", extensions.files_struct, True),
            ("kobject", extensions.kobject, True),
            ("module", extensions.module, False),
            # Mount
            ("vfsmount", extensions.vfsmount, True),
            ("mount", extensions.mount, False),
            ("mnt_namespace", extensions.mnt_namespace, False),
            # Network
            ("net", extensions.net, True),
            ("socket", extensions.socket, True),
            ("sock", extensions.sock, True),
            ("inet_sock", extensions.inet_sock, True),
            ("unix_sock", extensions.unix_sock, True),
            ("netlink_sock", extensions.netlink_sock, False),
            ("vsock_sock", extensions.vsock_sock, False),
            ("packet_sock", extensions.packet_sock, False),
            ("bt_sock", extensions.bt_sock, False),
            ("xdp_sock", extensions.xdp_sock, False),
        )

        for type_name, extension_class, must_exist in type_bindings:
            if must_exist:
                self.set_type_class(type_name, extension_class)
            else:
                self.optional_set_type_class(type_name, extension_class)
class LinuxUtilities(interfaces.configuration.VersionableInterface):
    """Class with multiple useful linux functions."""

    _version = (2, 0, 0)
    _required_framework_version = (2, 0, 0)

    framework.require_interface_version(*_required_framework_version)

    # based on __d_path from the Linux kernel
    @classmethod
    def _do_get_path(cls, rdentry, rmnt, dentry, vfsmnt) -> str:
        """Build the path of `dentry` by walking up towards the root.

        Args:
            rdentry: The root dentry at which the walk stops.
            rmnt: The root mount at which the walk stops.
            dentry: The dentry to resolve.
            vfsmnt: The mount that `dentry` belongs to.

        Returns:
            The reconstructed path, or an empty string when no path
            component could be gathered.
        """
        components: List[str] = []

        while dentry != rdentry or vfsmnt != rmnt:
            dname = dentry.path()
            if dname == "":
                break

            components.insert(0, dname.strip("/"))

            at_mount_boundary = (
                dentry == vfsmnt.get_mnt_root() or dentry == dentry.d_parent
            )
            if not at_mount_boundary:
                # Ordinary step: move to the parent directory entry.
                dentry = dentry.d_parent
                continue

            # A mount that is its own parent is the top of the mount tree.
            if vfsmnt.get_mnt_parent() == vfsmnt:
                break

            # Hop over the mount point into the parent mount.
            dentry = vfsmnt.get_mnt_mountpoint()
            vfsmnt = vfsmnt.get_mnt_parent()

        # if we did not gather any valid dentrys in the path, then the entire file is
        # either 1) smeared out of memory or 2) de-allocated and corresponding structures overwritten
        # we return an empty string in this case to avoid confusion with something like a handle to the root
        # directory (e.g., "/")
        if not components:
            return ""

        ret_val = "/".join([str(part) for part in components if part != ""])

        if ret_val.startswith(("socket:", "pipe:")):
            if "]" in ret_val:
                return ret_val.replace("/", "")

            # No inode number in the name yet: append the one from the dentry,
            # falling back to 0 when it cannot be read.
            try:
                ino = dentry.d_inode.i_ino
            except exceptions.InvalidAddressException:
                ino = 0
            return ret_val[:-1] + f":[{ino}]"

        if ret_val != "inotify":
            ret_val = "/" + ret_val

        return ret_val

    # method used by 'older' kernels
    # TODO: lookup when dentry_operations->d_name was merged into the mainline kernel for exact version
    @classmethod
    def _get_path_file(cls, task, filp) -> str:
        """Resolve the path of `filp` relative to the root of `task`.

        Args:
            task: The task whose `fs` supplies the root dentry and root mount.
            filp: The file whose dentry and mount are resolved.

        Returns:
            The path computed by `_do_get_path`.
        """
        return LinuxUtilities._do_get_path(
            rdentry=task.fs.get_root_dentry(),
            rmnt=task.fs.get_root_mnt(),
            dentry=filp.get_dentry(),
            vfsmnt=filp.get_vfsmnt(),
        )

    @classmethod
    def _get_new_sock_pipe_path(cls, context, task, filp) -> str:
        """Name a file from the kernel symbol its dentry's `d_op.d_dname` points to.

        Args:
            context: The context used to find the module owning the symbol table.
            task: The task owning the file (used for `simple_dname` files).
            filp: The file to name.

        Returns:
            A string of the form "<prefix>:[<inode number>]", or an
            "<invalid d_dname pointer>" marker when the pointer does not
            resolve to exactly one symbol.

        Raises:
            ValueError: If no module uses the symbol table of the pointer.
        """
        dentry = filp.get_dentry()
        sym_addr = dentry.d_op.d_dname

        # The type name is expected to look like "<symbol table>!<type>".
        table_parts = sym_addr.vol.type_name.split("!")
        symbol_table = table_parts[0] if len(table_parts) == 2 else None

        # Only the first module using that symbol table is needed.
        module_name = next(
            iter(context.modules.get_modules_by_symbol_tables(symbol_table)), None
        )
        if module_name is None:
            raise ValueError(f"No module using the symbol table {symbol_table}")
        kernel_module = context.modules[module_name]

        symbs = list(kernel_module.get_symbols_by_absolute_location(sym_addr))
        if len(symbs) != 1:
            return f"<invalid d_dname pointer> {sym_addr:x}"

        sym = symbs[0].split(constants.BANG)[1]

        fixed_prefixes = {
            "sockfs_dname": "socket",
            "anon_inodefs_dname": "anon_inode",
            "pipefs_dname": "pipe",
        }
        if sym in fixed_prefixes:
            pre_name = fixed_prefixes[sym]
        elif sym == "simple_dname":
            pre_name = cls._get_path_file(task, filp)
        else:
            pre_name = f"<unsupported d_op symbol: {sym}>"

        return f"{pre_name}:[{dentry.d_inode.i_ino:d}]"

    # a 'file' structure doesn't have enough information to properly restore its full path
    # we need the root mount information from task_struct to determine this
    @classmethod
    def path_for_file(cls, context, task, filp) -> str:
        """Return the path (or pseudo-file name) of an open file.

        Args:
            context: The context used when the name comes from `d_dname`.
            task: The task that owns the file.
            filp: The file to resolve.

        Returns:
            The resolved name, or an empty string when the dentry cannot be
            read or is 0.
        """
        try:
            dentry = filp.get_dentry()
        except exceptions.InvalidAddressException:
            return ""

        if dentry == 0:
            return ""

        # TODO COMPARE THIS IN LSOF OUTPUT TO VOL2
        try:
            dname_is_valid = bool(
                dentry.d_op
                and dentry.d_op.has_member("d_dname")
                and dentry.d_op.d_dname
            )
        except exceptions.InvalidAddressException:
            dname_is_valid = False

        if dname_is_valid:
            return LinuxUtilities._get_new_sock_pipe_path(context, task, filp)

        return LinuxUtilities._get_path_file(task, filp)

    @classmethod
    def files_descriptors_for_process(
        cls,
        context: interfaces.context.ContextInterface,
        symbol_table: str,
        task: interfaces.objects.ObjectInterface,
    ):
        """Generate the open file descriptors of a task.

        Args:
            context: The context the file pointers are created in.
            symbol_table: Name of the symbol table providing the "file" type.
            task: The task whose descriptor table is walked.

        Yields:
            Tuples of (descriptor number, file pointer, path) for every
            non-null entry. Nothing is yielded when the task has no files,
            the descriptor table is 0, or the table size fails the
            corruption check.
        """
        # task.files can be null
        if not task.files:
            return

        fd_table = task.files.get_fds()
        if fd_table == 0:
            return

        max_fds = task.files.get_max_fds()

        # corruption check
        if max_fds > 500000:
            return

        file_type = symbol_table + constants.BANG + "file"

        fds = objects.utility.array_of_pointers(
            fd_table, count=max_fds, subtype=file_type, context=context
        )

        for fd_num, filp in enumerate(fds):
            if filp != 0:
                yield fd_num, filp, LinuxUtilities.path_for_file(context, task, filp)

    @classmethod
    def mask_mods_list(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        mods: Iterator[interfaces.objects.ObjectInterface],
    ) -> List[Tuple[str, int, int]]:
        """
        A helper function to mask the starting and end address of kernel modules

        Args:
            context: The context holding the layer.
            layer_name: Name of the layer whose address mask is applied.
            mods: The kernel modules to describe.

        Returns:
            One (name, masked base, masked base + core size) tuple per module.
        """
        address_mask = context.layers[layer_name].address_mask

        return [
            (
                utility.array_to_string(module.name),
                module.get_module_base() & address_mask,
                (module.get_module_base() & address_mask) + module.get_core_size(),
            )
            for module in mods
        ]

    @classmethod
    def generate_kernel_handler_info(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        mods_list: Iterator[interfaces.objects.ObjectInterface],
    ) -> List[Tuple[str, int, int]]:
        """
        A helper function that gets the beginning and end address of the kernel module

        Args:
            context: The context holding the kernel module and its layer.
            kernel_module_name: Name of the kernel module in the context.
            mods_list: The loaded kernel modules to append.

        Returns:
            The kernel entry (from the "_text" and "_etext" symbols) followed
            by the entries produced by `mask_mods_list`.
        """
        kernel = context.modules[kernel_module_name]
        mask = context.layers[kernel.layer_name].address_mask

        start_addr = kernel.object_from_symbol("_text").vol.offset & mask
        end_addr = kernel.object_from_symbol("_etext").vol.offset & mask

        handlers = [(constants.linux.KERNEL_NAME, start_addr, end_addr)]
        handlers.extend(
            LinuxUtilities.mask_mods_list(context, kernel.layer_name, mods_list)
        )
        return handlers

    @classmethod
    def lookup_module_address(
        cls,
        kernel_module: interfaces.context.ModuleInterface,
        handlers: List[Tuple[str, int, int]],
        target_address: int,
    ):
        """
        Searches between the start and end address of the kernel module using target_address.
        Returns the module and symbol name of the address provided.

        Args:
            kernel_module: The module used to resolve kernel symbols.
            handlers: (name, start, end) tuples to search, in order.
            target_address: The address to look up.

        Returns:
            A (module name, symbol name) tuple; ("UNKNOWN", "N/A") when no
            handler contains the address. The symbol name is only resolved
            for the kernel itself.
        """
        mod_name = "UNKNOWN"
        symbol_name = "N/A"

        for name, start, end in handlers:
            if not (start <= target_address <= end):
                continue

            mod_name = name

            if name == constants.linux.KERNEL_NAME:
                symbols = list(
                    kernel_module.get_symbols_by_absolute_location(target_address)
                )
                if symbols:
                    first_symbol = symbols[0]
                    # Drop the "<table>!" prefix when the name carries one.
                    if constants.BANG in first_symbol:
                        symbol_name = first_symbol.split(constants.BANG)[1]
                    else:
                        symbol_name = first_symbol

            # The first handler containing the address wins.
            break

        return mod_name, symbol_name

    @classmethod
    def walk_internal_list(cls, vmlinux, struct_name, list_member, list_start):
        """Generate the structures of a list chained through `list_member`.

        Args:
            vmlinux: The module used to construct each structure.
            struct_name: The type name of the structures in the list.
            list_member: The member holding the link to the next element.
            list_start: The first link; iteration stops at a falsy link.

        Yields:
            Each `struct_name` object, in list order.
        """
        link = list_start
        while link:
            entry = vmlinux.object(object_type=struct_name, offset=link.vol.offset)
            yield entry
            link = getattr(entry, list_member)

    @classmethod
    def container_of(
        cls,
        addr: int,
        type_name: str,
        member_name: str,
        vmlinux: interfaces.context.ModuleInterface,
    ) -> Optional[interfaces.objects.ObjectInterface]:
        """Cast a member of a structure out to the containing structure.
        It mimics the Linux kernel macro container_of() see include/linux/kernel.h

        Args:
            addr: The pointer to the member.
            type_name: The type of the container struct this is embedded in.
            member_name: The name of the member within the struct.
            vmlinux: The kernel symbols object

        Returns:
            The constructed object or None
        """
        if not addr:
            return None

        member_offset = vmlinux.get_type(type_name).relative_child_offset(member_name)

        return vmlinux.object(
            object_type=type_name, offset=addr - member_offset, absolute=True
        )