# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import math
import contextlib
import functools
import logging
from abc import ABC, abstractmethod
from typing import Iterator, List, Tuple, Optional, Union
import volatility3.framework.symbols.linux.utilities.modules as linux_utilities_modules
from volatility3 import framework
from volatility3.framework import (
constants,
exceptions,
interfaces,
objects,
Deprecation,
)
from volatility3.framework.objects import utility
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.linux import extensions
# Module-level logger, named after this module through __name__.
vollog = logging.getLogger(__name__)
class LinuxKernelIntermedSymbols(intermed.IntermediateSymbolTable):
    """Intermediate symbol table that binds Linux kernel types to their object extensions."""

    provides = {"type": "interface"}

    def __init__(self, *args, **kwargs) -> None:
        """Builds the symbol table and then attaches the Linux specific extension classes.

        All positional and keyword arguments are handed unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        # 'required' entries go through set_type_class(), 'optional' entries go through
        # optional_set_type_class(). The table is applied strictly top to bottom.
        required = self.set_type_class
        optional = self.optional_set_type_class

        registrations = (
            # Set-up Linux specific types
            (required, "file", extensions.struct_file),
            (required, "list_head", extensions.list_head),
            (required, "hlist_head", extensions.hlist_head),
            (required, "mm_struct", extensions.mm_struct),
            (required, "super_block", extensions.super_block),
            (required, "task_struct", extensions.task_struct),
            (required, "vm_area_struct", extensions.vm_area_struct),
            (required, "qstr", extensions.qstr),
            (required, "dentry", extensions.dentry),
            (required, "fs_struct", extensions.fs_struct),
            (required, "files_struct", extensions.files_struct),
            (required, "kobject", extensions.kobject),
            (required, "cred", extensions.cred),
            (required, "inode", extensions.inode),
            (required, "idr", extensions.IDR),
            (required, "address_space", extensions.address_space),
            (required, "page", extensions.page),
            # Might not exist in the current symbols
            (optional, "module", extensions.module),
            (optional, "bpf_prog", extensions.bpf_prog),
            (optional, "bpf_prog_aux", extensions.bpf_prog_aux),
            (optional, "kernel_cap_struct", extensions.kernel_cap_struct),
            (optional, "kernel_cap_t", extensions.kernel_cap_t),
            (optional, "scatterlist", extensions.scatterlist),
            # kernels >= 4.18
            (optional, "timespec64", extensions.timespec64),
            # kernels < 4.18. Reuses the timespec64 object extension, since both
            # have the same members
            (optional, "timespec", extensions.timespec64),
            # Mount
            (required, "vfsmount", extensions.vfsmount),
            # Might not exist in older kernels or the current symbols
            (optional, "mount", extensions.mount),
            (optional, "mnt_namespace", extensions.mnt_namespace),
            (optional, "rb_root", extensions.rb_root),
            # Network
            (required, "net", extensions.net),
            (required, "socket", extensions.socket),
            (required, "sock", extensions.sock),
            (required, "inet_sock", extensions.inet_sock),
            (required, "unix_sock", extensions.unix_sock),
            # Might not exist in older kernels or the current symbols
            (optional, "netlink_sock", extensions.netlink_sock),
            (optional, "vsock_sock", extensions.vsock_sock),
            (optional, "packet_sock", extensions.packet_sock),
            (optional, "bt_sock", extensions.bt_sock),
            (optional, "xdp_sock", extensions.xdp_sock),
            # Only found in 6.1+ kernels
            (optional, "maple_tree", extensions.maple_tree),
        )

        for register, type_name, extension_class in registrations:
            register(type_name, extension_class)
class LinuxUtilities(interfaces.configuration.VersionableInterface):
    """Class with multiple useful linux functions."""

    _version = (2, 2, 1)
    _required_framework_version = (2, 0, 0)

    # Runs once, while the class body is being executed.
    framework.require_interface_version(*_required_framework_version)

    @classmethod
    def _get_path_file(cls, task, filp) -> str:
        """Returns the file pathname relative to the task's root directory.

        Args:
            task (task_struct): A reference task
            filp (file *): A pointer to an open file

        Returns:
            str: File pathname relative to the task's root directory.
        """
        rdentry = task.fs.get_root_dentry()
        rmnt = task.fs.get_root_mnt()
        vfsmnt = filp.get_vfsmnt()
        dentry = filp.get_dentry()

        return cls.do_get_path(rdentry, rmnt, dentry, vfsmnt)

    @classmethod
    def get_path_mnt(cls, task, mnt) -> str:
        """Returns the mount point pathname relative to the task's root directory.

        Args:
            task (task_struct): A reference task
            mnt (vfsmount or mount): A mounted filesystem or a mount point.
                - kernels < 3.3.8 type is 'vfsmount'
                - kernels >= 3.3.8 type is 'mount'

        Returns:
            str: Pathname of the mount point relative to the task's root directory.
        """
        rdentry = task.fs.get_root_dentry()
        rmnt = task.fs.get_root_mnt()

        vfsmnt = mnt.get_vfsmnt_current()
        dentry = mnt.get_dentry_current()

        return cls.do_get_path(rdentry, rmnt, dentry, vfsmnt)

    @classmethod
    def do_get_path(cls, rdentry, rmnt, dentry, vfsmnt) -> str:
        """Returns a pathname of the mount point or file

        It mimics the Linux kernel prepend_path function.

        Args:
            rdentry (dentry *): A pointer to the root dentry
            rmnt (vfsmount *): A pointer to the root vfsmount
            dentry (dentry *): A pointer to the dentry
            vfsmnt (vfsmount *): A pointer to the vfsmount

        Returns:
            str: Pathname of the mount point or file. It always starts with "/".
        """
        # Path components are collected leaf first and reversed at the end
        path_reversed = []

        while dentry != rdentry or not vfsmnt.is_equal(rmnt):
            if dentry == vfsmnt.get_mnt_root() or dentry.is_root():
                # Escaped?
                if dentry != vfsmnt.get_mnt_root():
                    break

                # Global root?
                if not vfsmnt.has_parent():
                    break

                # Cross the mount point and continue from the parent mount
                dentry = vfsmnt.get_dentry_parent()
                vfsmnt = vfsmnt.get_vfsmnt_parent()

                continue

            parent = dentry.d_parent
            dname = dentry.d_name.name_as_str()
            path_reversed.append(dname.strip("/"))
            dentry = parent

        path = "/" + "/".join(reversed(path_reversed))
        return path

    @classmethod
    def _get_new_sock_pipe_path(cls, context, task, filp) -> str:
        """Returns the sock pipe pathname relative to the task's root directory.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            task (task_struct): A reference task. It is not used by this implementation.
            filp (file *): A pointer to a sock pipe open file

        Returns:
            str: Sock pipe pathname relative to the task's root directory, or a
                "<invalid ...>" placeholder when one of the pointers cannot be read.
        """
        # FIXME: This function must be moved to the 'dentry' object extension
        # Also, the scope of this function went beyond the sock pipe path, so we need to rename this.
        # Once https://github.com/volatilityfoundation/volatility3/pull/1263 is merged, replace the
        # dentry inode getters

        if not (filp and filp.is_readable()):
            return f"<invalid file pointer> {filp:x}"

        dentry = filp.get_dentry()
        if not (dentry and dentry.is_readable()):
            return f"<invalid dentry pointer> {dentry:x}"

        kernel_module = cls.get_module_from_volobj_type(context, dentry)

        sym_addr = dentry.d_op.d_dname
        if not (sym_addr and sym_addr.is_readable()):
            return f"<invalid d_dname pointer> {sym_addr:x}"

        symbs = list(kernel_module.get_symbols_by_absolute_location(sym_addr))

        inode = dentry.d_inode
        if not (inode and inode.is_readable() and inode.is_valid()):
            return f"<invalid dentry inode> {inode:x}"

        if len(symbs) == 1:
            # Keep only the symbol name, dropping the symbol table prefix
            sym = symbs[0].split(constants.BANG)[1]

            if sym == "sockfs_dname":
                pre_name = "socket"
            elif sym == "anon_inodefs_dname":
                pre_name = "anon_inode"
            elif sym == "pipefs_dname":
                pre_name = "pipe"
            elif sym == "simple_dname":
                name = dentry.d_name.name
                if name:
                    pre_name = name.dereference().cast(
                        "string", max_length=255, errors="replace"
                    )
                    # This branch has its own output format, without the inode number
                    return "/" + pre_name + " (deleted)"
                else:
                    pre_name = ""
            elif sym == "ns_dname":
                # From Kernels 3.19

                # In Kernels >= 6.9, see Linux kernel commit 1fa08aece42512be072351f482096d5796edf7ca
                # ns_common->stashed change from 'atomic64_t' to 'dentry*'
                try:
                    ns_common_type = kernel_module.get_type("ns_common")
                    stashed_template = ns_common_type.child_template("stashed")
                    stashed_type_full_name = stashed_template.vol.type_name
                    stashed_type_name = stashed_type_full_name.split(constants.BANG)[1]
                    if stashed_type_name == "atomic64_t":
                        # 3.19 <= Kernels < 6.9
                        fsdata_ptr = dentry.d_fsdata
                        if not (fsdata_ptr and fsdata_ptr.is_readable()):
                            raise IndexError

                        ns_ops = fsdata_ptr.dereference().cast("proc_ns_operations")
                    else:
                        # Kernels >= 6.9
                        private_ptr = inode.i_private
                        if not (private_ptr and private_ptr.is_readable()):
                            raise IndexError

                        ns_common = private_ptr.dereference().cast("ns_common")
                        ns_ops = ns_common.ops

                    pre_name = utility.pointer_to_string(ns_ops.name, 255)
                except IndexError:
                    pre_name = "<unsupported ns_dname implementation>"
            else:
                pre_name = f"<unsupported d_op symbol> {sym}"
        else:
            pre_name = f"<unknown d_dname pointer> {sym_addr:x}"

        return f"{pre_name}:[{inode.i_ino:d}]"

    @classmethod
    def path_for_file(cls, context, task, filp) -> str:
        """Returns a file (or sock pipe) pathname relative to the task's root directory.

        A 'file' structure doesn't have enough information to properly restore its
        full path we need the root mount information from task_struct to determine this

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            task (task_struct): A reference task
            filp (file *): A pointer to an open file

        Returns:
            str: A file (or sock pipe) pathname relative to the task's root directory.
                An empty string is returned when the dentry is null or cannot be read.
        """
        # Memory smear protection: Check that both the file and dentry pointers are valid.
        try:
            dentry = filp.get_dentry()
            dentry.is_root()
        except exceptions.InvalidAddressException:
            return ""

        if dentry == 0:
            return ""

        dname_is_valid = False

        # TODO COMPARE THIS IN LSOF OUTPUT TO VOL2
        try:
            if (
                dentry.d_op
                and dentry.d_op.has_member("d_dname")
                and dentry.d_op.d_dname
            ):
                dname_is_valid = True
        except exceptions.InvalidAddressException:
            dname_is_valid = False

        # Dispatch through cls, like the sibling helpers, so that subclass overrides are used
        if dname_is_valid:
            ret = cls._get_new_sock_pipe_path(context, task, filp)
        else:
            ret = cls._get_path_file(task, filp)

        return ret

    @classmethod
    def files_descriptors_for_process(
        cls,
        context: interfaces.context.ContextInterface,
        symbol_table: str,
        task: interfaces.objects.ObjectInterface,
    ) -> Iterator[Tuple[int, interfaces.objects.ObjectInterface, str]]:
        """Walks the file descriptor table of a task.

        Nothing is yielded when task.files is null or unreadable, when the fd table
        pointer is null, or when the reported number of descriptors is above 500000.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            symbol_table: Name of the symbol table used to build the 'file' type name
            task: The task whose file descriptors are listed

        Yields:
            Tuples of (fd number, file pointer, full path) for every non-null and
            readable entry of the fd table.
        """
        # task.files can be null
        if not (task.files and task.files.is_readable()):
            return

        fd_table = task.files.get_fds()
        if fd_table == 0:
            return

        max_fds = task.files.get_max_fds()

        # corruption check
        if max_fds > 500000:
            return

        file_type = symbol_table + constants.BANG + "file"

        fds = objects.utility.array_of_pointers(
            fd_table, count=max_fds, subtype=file_type, context=context
        )

        for fd_num, filp in enumerate(fds):
            if filp and filp.is_readable():
                full_path = cls.path_for_file(context, task, filp)

                yield fd_num, filp, full_path

    @classmethod
    @Deprecation.deprecated_method(
        replacement=linux_utilities_modules.Modules.mask_mods_list,
        replacement_version=(1, 0, 0),
    )
    def mask_mods_list(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        mods: Iterator[interfaces.objects.ObjectInterface],
    ) -> List[Tuple[str, int, int]]:
        """
        DEPRECATED: use "volatility3.framework.symbols.linux.utilities.modules.Modules.mask_mods_list" instead.

        A helper function to mask the starting and end address of kernel modules
        """
        return linux_utilities_modules.Modules.mask_mods_list(context, layer_name, mods)

    @classmethod
    def generate_kernel_handler_info(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        mods_list: Iterator[interfaces.objects.ObjectInterface],
    ) -> List[Tuple[str, int, int]]:
        """
        A helper function that gets the beginning and end address of the kernel module

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the kernel module on which to operate
            mods_list: The kernel modules handed over to Modules.mask_mods_list()

        Returns:
            A list whose first entry is the kernel itself, as a tuple of
            (constants.linux.KERNEL_NAME, masked '_text' offset, masked '_etext' offset),
            followed by the entries returned by Modules.mask_mods_list().
        """
        kernel = context.modules[kernel_module_name]

        mask = context.layers[kernel.layer_name].address_mask

        start_addr = kernel.object_from_symbol("_text")
        start_addr = start_addr.vol.offset & mask

        end_addr = kernel.object_from_symbol("_etext")
        end_addr = end_addr.vol.offset & mask

        return [
            (constants.linux.KERNEL_NAME, start_addr, end_addr)
        ] + linux_utilities_modules.Modules.mask_mods_list(
            context, kernel.layer_name, mods_list
        )

    @classmethod
    @Deprecation.deprecated_method(
        replacement=linux_utilities_modules.Modules.lookup_module_address,
        replacement_version=(1, 0, 0),
    )
    def lookup_module_address(
        cls,
        kernel_module: interfaces.context.ModuleInterface,
        handlers: List[Tuple[str, int, int]],
        target_address: int,
    ) -> Tuple[str, str]:
        """
        DEPRECATED: use "volatility3.framework.symbols.linux.utilities.modules.Modules.lookup_module_address" instead.

        Searches between the start and end address of the kernel module using target_address.
        Returns the module and symbol name of the address provided.
        """
        return linux_utilities_modules.Modules.lookup_module_address(
            kernel_module.context, kernel_module.name, handlers, target_address
        )

    @classmethod
    def walk_internal_list(
        cls, vmlinux, struct_name, list_member, list_start
    ) -> Iterator[interfaces.objects.ObjectInterface]:
        """Walks a list of structures chained through one of their own members.

        Args:
            vmlinux: The kernel module used to instantiate each element
            struct_name: The type name of the list elements
            list_member: The name of the member that leads to the next element
            list_start: The first link of the list. The walk stops once a link is falsy.

        Yields:
            One 'struct_name' object per element, built at the link's vol.offset
        """
        while list_start:
            list_struct = vmlinux.object(
                object_type=struct_name, offset=list_start.vol.offset
            )
            yield list_struct
            list_start = getattr(list_struct, list_member)

    @classmethod
    def container_of(
        cls,
        addr: int,
        type_name: str,
        member_name: str,
        vmlinux: interfaces.context.ModuleInterface,
    ) -> Optional[interfaces.objects.ObjectInterface]:
        """Cast a member of a structure out to the containing structure.

        It mimics the Linux kernel macro container_of(), see include/linux/kernel.h
        (include/linux/container_of.h in recent kernels).

        Args:
            addr: The pointer to the member.
            type_name: The type of the container struct this is embedded in.
            member_name: The name of the member within the struct.
            vmlinux: The kernel symbols object

        Returns:
            The constructed object or None
        """
        if not addr:
            return None

        type_dec = vmlinux.get_type(type_name)
        member_offset = type_dec.relative_child_offset(member_name)
        # The container starts 'member_offset' bytes before the member itself
        container_addr = addr - member_offset
        return vmlinux.object(
            object_type=type_name, offset=container_addr, absolute=True
        )

    @classmethod
    def get_module_from_volobj_type(
        cls,
        context: interfaces.context.ContextInterface,
        volobj: interfaces.objects.ObjectInterface,
    ) -> interfaces.context.ModuleInterface:
        """Get the vmlinux from a vol obj

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            volobj (vol object): A vol object

        Raises:
            ValueError: If it cannot obtain any module from the symbol table

        Returns:
            A kernel object (vmlinux)
        """
        symbol_table = volobj.get_symbol_table_name()
        module_names = context.modules.get_modules_by_symbol_tables(symbol_table)
        module_names = list(module_names)

        if not module_names:
            raise ValueError(f"No module using the symbol table '{symbol_table}'")

        # When several modules share the symbol table, the first one is used
        kernel_module_name = module_names[0]
        kernel = context.modules[kernel_module_name]

        return kernel

    @classmethod
    def convert_fourcc_code(cls, code: int) -> str:
        """Convert a fourcc integer back to its fourcc string representation.

        Args:
            code: the numerical representation of the fourcc

        Returns:
            The fourcc code string.
        """
        # Number of bytes needed to hold the value; the least significant byte
        # becomes the first character
        code_bytes_length = (code.bit_length() + 7) // 8

        return "".join(
            [chr((code >> (i * 8)) & 0xFF) for i in range(code_bytes_length)]
        )
class IDStorage(ABC):
    """Abstraction to support both XArray and RadixTree"""

    # Dynamic values, these will be initialized later
    CHUNK_SHIFT = None
    CHUNK_SIZE = None
    CHUNK_MASK = None

    def __init__(
        self,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
    ) -> None:
        """
        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the kernel module on which to operate
        """
        self.vmlinux = context.modules[kernel_module_name]
        self.vmlinux_layer = self.vmlinux.context.layers[self.vmlinux.layer_name]

        self.pointer_size = self.vmlinux.get_type("pointer").size
        # Dynamically work out the (XA_CHUNK|RADIX_TREE_MAP)_SHIFT values based on
        # the node.slots[] array size
        node_type = self.vmlinux.get_type(self.node_type_name)
        slots_array_size = node_type.child_template("slots").count

        # Calculate the LSB index - 1
        self.CHUNK_SHIFT = slots_array_size.bit_length() - 1
        self.CHUNK_SIZE = 1 << self.CHUNK_SHIFT
        self.CHUNK_MASK = self.CHUNK_SIZE - 1

    @classmethod
    def choose_id_storage(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
    ) -> "IDStorage":
        """Returns the appropriate ID storage data structure instance for the current kernel implementation.
        This is used by the IDR and the PageCache to choose between the XArray and RadixTree.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the kernel module on which to operate

        Returns:
            The appropriate ID storage instance for the current kernel
        """
        vmlinux = context.modules[kernel_module_name]
        address_space_type = vmlinux.get_type("address_space")
        address_space_has_i_pages = address_space_type.has_member("i_pages")
        i_pages_type_name = (
            address_space_type.child_template("i_pages").vol.type_name
            if address_space_has_i_pages
            else ""
        )
        i_pages_is_xarray = i_pages_type_name.endswith(constants.BANG + "xarray")
        # A 'radix_tree_root' that carries an 'xa_head' member is walked as an XArray too
        i_pages_is_radix_tree_root = i_pages_type_name.endswith(
            constants.BANG + "radix_tree_root"
        ) and vmlinux.get_type("radix_tree_root").has_member("xa_head")

        if i_pages_is_xarray or i_pages_is_radix_tree_root:
            return XArray(context, kernel_module_name)
        else:
            return RadixTree(context, kernel_module_name)

    @property
    @abstractmethod
    def node_type_name(self) -> str:
        """Returns the Tree implementation node type name

        Returns:
            A string with the node type name
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def tag_internal_value(self) -> int:
        """Returns the internal node flag for the tree"""
        # Abstract like the other hooks: _slot_to_nodep() needs it, so a subclass
        # without it is rejected at instantiation rather than in the middle of a walk.
        raise NotImplementedError()

    @abstractmethod
    def node_is_internal(self, nodep) -> bool:
        """Checks if the node is internal"""
        raise NotImplementedError

    @abstractmethod
    def is_node_tagged(self, nodep) -> bool:
        """Checks if the node pointer is tagged"""
        raise NotImplementedError

    @abstractmethod
    def untag_node(self, nodep) -> int:
        """Untags a node pointer"""
        raise NotImplementedError

    @abstractmethod
    def get_tree_height(self, treep) -> int:
        """Returns the tree height"""
        raise NotImplementedError

    @abstractmethod
    def get_node_height(self, nodep) -> int:
        """Returns the node height"""
        raise NotImplementedError

    @abstractmethod
    def get_head_node(self, tree) -> int:
        """Returns a pointer to the tree's head"""
        raise NotImplementedError

    @abstractmethod
    def is_valid_node(self, nodep) -> bool:
        """Validates a node pointer"""
        raise NotImplementedError

    def nodep_to_node(self, nodep) -> interfaces.objects.ObjectInterface:
        """Instantiates a tree node from its pointer

        Args:
            nodep: Pointer to the XArray/RadixTree node

        Returns:
            A XArray/RadixTree node instance
        """
        node = self.vmlinux.object(self.node_type_name, offset=nodep, absolute=True)
        return node

    def _slot_to_nodep(self, slot) -> int:
        """Clears the internal flag from a slot value when the slot is an internal node.

        Args:
            slot: A raw value read from a node's slots[] array

        Returns:
            The slot without the internal flag bits, or the slot itself when it is
            not an internal node
        """
        if self.node_is_internal(slot):
            nodep = slot & ~self.tag_internal_value
        else:
            nodep = slot

        return nodep

    def _iter_node(self, nodep, height) -> Iterator[int]:
        """Recursively walks the slots of a node.

        Args:
            nodep: Untagged pointer to the node to walk
            height: Height of that node. At height 1 the slots are the entries themselves.

        Yields:
            The entries found below the node that pass is_valid_node()
        """
        node = self.nodep_to_node(nodep)
        node_slots = node.slots
        for off in range(self.CHUNK_SIZE):
            slot = node_slots[off]
            if slot == 0:
                # Empty slot
                continue

            nodep = self._slot_to_nodep(slot)

            if height == 1:
                if self.is_valid_node(nodep):
                    yield nodep
            else:
                yield from self._iter_node(nodep, height - 1)

    def get_entries(self, root: interfaces.objects.ObjectInterface) -> Iterator[int]:
        """Walks the tree data structure

        Args:
            root: The tree root object

        Yields:
            A tree node pointer
        """
        height = self.get_tree_height(root.vol.offset)

        nodep = self.get_head_node(root)
        if not (nodep and nodep.is_readable()):
            return

        # Keep the internal flag before untagging it
        is_internal = self.node_is_internal(nodep)
        if self.is_node_tagged(nodep):
            nodep = self.untag_node(nodep)

        if is_internal:
            height = self.get_node_height(nodep)

        if height == 0:
            # The head is the only entry of the tree
            if self.is_valid_node(nodep):
                yield nodep
        else:
            yield from self._iter_node(nodep, height)
class XArray(IDStorage):
    """ID storage implementation for trees made of 'xa_node' nodes."""

    # The two lowest bits of an entry carry its tag
    XARRAY_TAG_MASK = 3
    XARRAY_TAG_INTERNAL = 2

    @property
    def node_type_name(self) -> str:
        """Returns the node type name, 'xa_node'"""
        return "xa_node"

    @property
    def tag_internal_value(self) -> int:
        """Returns the tag value that marks an internal node"""
        return self.XARRAY_TAG_INTERNAL

    def get_tree_height(self, treep) -> int:
        """Returns 0 for any tree; the argument is not used"""
        return 0

    def get_head_node(self, tree) -> int:
        """Returns the 'xa_head' member of the tree"""
        return tree.xa_head

    def get_node_height(self, nodep) -> int:
        """Returns the node height, derived from the node's 'shift' member"""
        xa_node = self.nodep_to_node(nodep)
        levels_below = xa_node.shift // self.CHUNK_SHIFT
        return levels_below + 1

    def node_is_internal(self, nodep) -> bool:
        """Checks if the tag bits are exactly the internal node tag"""
        tag_bits = nodep & self.XARRAY_TAG_MASK
        return tag_bits == self.XARRAY_TAG_INTERNAL

    def is_node_tagged(self, nodep) -> bool:
        """Checks if any of the tag bits is set"""
        return bool(nodep & self.XARRAY_TAG_MASK)

    def untag_node(self, nodep) -> int:
        """Returns the pointer with the tag bits cleared"""
        return nodep & ~self.XARRAY_TAG_MASK

    def is_valid_node(self, nodep) -> bool:
        """Checks that the pointer carries no tag bits"""
        # It should have the tag mask clear
        tagged = self.is_node_tagged(nodep)
        return not tagged
class RadixTree(IDStorage):
    """ID storage implementation for trees made of 'radix_tree_node' nodes."""

    RADIX_TREE_INTERNAL_NODE = 1
    RADIX_TREE_EXCEPTIONAL_ENTRY = 2
    RADIX_TREE_ENTRY_MASK = 3
    RADIX_TREE_MAP_SHIFT = 6  # CONFIG_BASE_FULL

    # Dynamic values. These will be initialized later
    RADIX_TREE_INDEX_BITS = None
    RADIX_TREE_MAX_PATH = None
    RADIX_TREE_HEIGHT_SHIFT = None
    RADIX_TREE_HEIGHT_MASK = None

    def __init__(self, *args, **kwargs) -> None:
        """Runs the base initialization and then derives the height related constants."""
        super().__init__(*args, **kwargs)

        bits_per_char = 8
        index_bits = bits_per_char * self.pointer_size
        self.RADIX_TREE_INDEX_BITS = index_bits
        self.RADIX_TREE_MAX_PATH = math.ceil(index_bits / self.CHUNK_SHIFT)
        self.RADIX_TREE_HEIGHT_SHIFT = self.RADIX_TREE_MAX_PATH + 1
        self.RADIX_TREE_HEIGHT_MASK = (1 << self.RADIX_TREE_HEIGHT_SHIFT) - 1

        radix_tree_root_exists = self.vmlinux.has_type("radix_tree_root")
        if not radix_tree_root_exists:
            # In kernels 4.20, RADIX_TREE_INTERNAL_NODE flag took RADIX_TREE_EXCEPTIONAL_ENTRY's
            # value. RADIX_TREE_EXCEPTIONAL_ENTRY was removed but that's managed in is_valid_node()
            # Note that the Radix Tree is still in use for IDR, even after kernels 4.20 when XArray
            # mostly replace it
            self.RADIX_TREE_INTERNAL_NODE = 2

    @property
    def node_type_name(self) -> str:
        """Returns the node type name, 'radix_tree_node'"""
        return "radix_tree_node"

    @property
    def tag_internal_value(self) -> int:
        """Returns the flag value that marks an internal node"""
        return self.RADIX_TREE_INTERNAL_NODE

    def get_tree_height(self, treep) -> int:
        """Returns the 'height' member of the tree root when the type has one, else 0.

        A SymbolError raised during the lookup is suppressed and also results in 0.
        """
        with contextlib.suppress(exceptions.SymbolError):
            root_type = self.vmlinux.get_type("radix_tree_root")
            if root_type.has_member("height"):
                # kernels < 4.7 d0891265bbc988dc91ed8580b38eb3dac128581b
                tree_root = self.vmlinux.object(
                    "radix_tree_root", offset=treep, absolute=True
                )
                return tree_root.height

        # kernels >= 4.7
        return 0

    @functools.cached_property
    def _max_height_array(self):
        """Returns the kernel's per-height limits array, or None if no such symbol exists."""
        # Candidates, in lookup order:
        # - height_to_maxindex: 2.6.24 26fb1589cb0aaec3a0b4418c54f30c1a2b1781f6 <= Kernels < 4.7 d0891265bbc988dc91ed8580b38eb3dac128581b
        # - height_to_maxnodes: 4.8 c78c66d1ddfdbd2353f3fcfeba0268524537b096 <= kernels < 4.20 8cf2f98411e3a0865026a1061af637161b16d32b
        for symbol_name in ("height_to_maxindex", "height_to_maxnodes"):
            if self.vmlinux.has_symbol(symbol_name):
                return self.vmlinux.object_from_symbol(symbol_name)

        return None

    def _radix_tree_maxindex(self, node, height) -> int:
        """Return the maximum key which can be stored in a radix tree with this height."""
        limits = self._max_height_array
        if not limits:
            # No limits array available (kernels >= 4.20), compute it from the node shift
            return (self.CHUNK_SIZE << node.shift) - 1

        # 2.6.24 <= kernels < 4.20. See _max_height_array()
        return limits[height]

    def get_node_height(self, nodep) -> int:
        """Returns the height of a node, read from whichever member the node type provides.

        Raises:
            exceptions.VolatilityException: If the node has no 'shift', 'path' or 'height'
            exceptions.LinuxPageCacheException: If the height is outside the bounds of the
                per-height limits array, when that array is available
        """
        node = self.nodep_to_node(nodep)

        if hasattr(node, "shift"):
            # 4.7 <= Kernels < 4.20
            height = (node.shift // self.CHUNK_SHIFT) + 1
        elif hasattr(node, "path"):
            # 3.15 <= Kernels < 4.7
            height = node.path & self.RADIX_TREE_HEIGHT_MASK
        elif hasattr(node, "height"):
            # Kernels < 3.15
            height = node.height
        else:
            raise exceptions.VolatilityException("Cannot find radix-tree node height")

        limits = self._max_height_array
        if limits and (height < 0 or height >= limits.count):
            error_msg = f"Radix Tree node {node.vol.offset:#x} height {height} exceeds max height of {self._max_height_array.count}"
            vollog.error(error_msg)
            raise exceptions.LinuxPageCacheException(error_msg)

        return height

    def get_head_node(self, tree) -> int:
        """Returns the 'rnode' member of the tree"""
        return tree.rnode

    def node_is_internal(self, nodep) -> bool:
        """Checks if the internal node flag is set in the pointer"""
        return bool(nodep & self.RADIX_TREE_INTERNAL_NODE)

    def is_node_tagged(self, nodep) -> bool:
        """Checks if the pointer is tagged, which here means it is an internal node"""
        return self.node_is_internal(nodep)

    def untag_node(self, nodep) -> int:
        """Returns the pointer with the entry mask bits cleared"""
        return nodep & ~self.RADIX_TREE_ENTRY_MASK

    def _is_exceptional_node(self, nodep) -> bool:
        """Checks if the pointer is flagged as an exceptional entry"""
        # In kernels 4.20, exceptional nodes were removed and internal entries took their bitmask
        if not self.vmlinux.has_type("radix_tree_root"):
            return False

        entry_bits = nodep & self.RADIX_TREE_ENTRY_MASK
        return entry_bits == self.RADIX_TREE_EXCEPTIONAL_ENTRY

    def is_valid_node(self, nodep) -> bool:
        """Checks that the pointer is not an exceptional entry"""
        exceptional = self._is_exceptional_node(nodep)
        return not exceptional
class PageCache:
    """Linux Page Cache abstraction"""

    def __init__(
        self,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        page_cache: interfaces.objects.ObjectInterface,
    ):
        """
        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the kernel module on which to operate
            page_cache: Page cache address space
        """
        self._idstorage = IDStorage.choose_id_storage(context, kernel_module_name)
        self._page_cache = page_cache
        self.vmlinux = context.modules[kernel_module_name]

    def get_cached_pages(self) -> Iterator[interfaces.objects.ObjectInterface]:
        """Returns all page cache contents

        Yields:
            Page objects

        Raises:
            exceptions.LinuxPageCacheException: If an entry address is not valid in the
                kernel layer, or if the page built from it reports itself as invalid
        """
        kernel_layer = self.vmlinux.context.layers[self.vmlinux.layer_name]

        for page_addr in self._idstorage.get_entries(self._page_cache.i_pages):
            page = None
            if layer_ok := kernel_layer.is_valid(page_addr):
                page = self.vmlinux.object("page", offset=page_addr, absolute=True)

            # Work out which of the two failures applies, if any
            if not layer_ok:
                error_msg = f"Invalid cached page address at {page_addr:#x}, aborting"
            elif not page.is_valid():
                error_msg = f"Invalid cached page at {page_addr:#x}, aborting"
            else:
                error_msg = None

            # Single exit for both failures: log first, then abort the walk
            if error_msg is not None:
                vollog.error(error_msg)
                raise exceptions.LinuxPageCacheException(error_msg)

            yield page