# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import abc
import collections.abc
import logging
import functools
import binascii
import stat
import datetime
import socket as socket_module
from typing import Generator, Iterable, Iterator, Optional, Tuple, List, Union, Dict
from volatility3.framework import constants, exceptions, objects, interfaces, symbols
from volatility3.framework.renderers import conversion
from volatility3.framework.constants import linux as linux_constants
from volatility3.framework.layers import linear
from volatility3.framework.objects import utility
from volatility3.framework.symbols import generic, linux, intermed
from volatility3.framework.symbols.linux.extensions import elf
vollog = logging.getLogger(__name__)
# Keep these in a basic module, to prevent import cycles when symbol providers require them
[docs]class module(generic.GenericIntelProcess):
[docs] def is_valid(self):
"""Determine whether it is a valid module object by verifying the self-referential
in module_kobject. This also confirms that the module is actively allocated and
not a remnant of freed memory or a failed module load attempt by verifying the
module memory section sizes.
"""
layer = self._context.layers[self.vol.layer_name]
# Make sure the entire module content is readable
if not layer.is_valid(self.vol.offset, self.vol.size):
return False
core_size = self.get_core_size()
core_text_size = self.get_core_text_size()
init_size = self.get_init_size()
if not (
0 < core_text_size <= linux_constants.MODULE_MAXIMUM_CORE_TEXT_SIZE
and 0 < core_size <= linux_constants.MODULE_MAXIMUM_CORE_SIZE
and core_size + init_size >= linux_constants.MODULE_MINIMUM_SIZE
):
return False
if not (
self.mkobj
and self.mkobj.mod
and self.mkobj.mod.is_readable()
and self.mkobj.mod == self.vol.offset
):
return False
return True
@functools.cached_property
def mod_mem_type(self) -> Dict:
"""Return the mod_mem_type enum choices if available or an empty dict if not"""
# mod_mem_type and module_memory were added in kernel 6.4 which replaces
# module_layout for storing the information around core_layout etc.
# see commit ac3b43283923440900b4f36ca5f9f0b1ca43b70e for more information
symbol_table_name = self.get_symbol_table_name()
mod_mem_type_symname = symbol_table_name + constants.BANG + "mod_mem_type"
symbol_space = self._context.symbol_space
try:
mod_mem_type = symbol_space.get_enumeration(mod_mem_type_symname).choices
except exceptions.SymbolError:
mod_mem_type = {}
vollog.debug(
"Unable to find mod_mem_type enum. This message can be ignored for kernels < 6.4"
)
return mod_mem_type
def _get_mem_type(self, mod_mem_type_name):
module_mem_index = self.mod_mem_type.get(mod_mem_type_name)
if module_mem_index is None:
raise AttributeError(f"Unknown module memory type '{mod_mem_type_name}'")
if not (0 <= module_mem_index < self.mem.count):
raise AttributeError(
f"Invalid module memory type index '{module_mem_index}'"
)
return self.mem[module_mem_index]
def _get_mem_size(self, mod_mem_type_name):
return self._get_mem_type(mod_mem_type_name).size
def _get_mem_base(self, mod_mem_type_name):
return self._get_mem_type(mod_mem_type_name).base
[docs] def get_module_base(self):
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_base("MOD_TEXT")
elif self.has_member("core_layout"):
return self.core_layout.base
elif self.has_member("module_core"):
return self.module_core
raise AttributeError("Unable to get module base")
[docs] def get_init_size(self):
if self.has_member("mem"): # kernels 6.4+
return (
self._get_mem_size("MOD_INIT_TEXT")
+ self._get_mem_size("MOD_INIT_DATA")
+ self._get_mem_size("MOD_INIT_RODATA")
)
elif self.has_member("init_layout"):
return self.init_layout.size
elif self.has_member("init_size"):
return self.init_size
raise AttributeError("Unable to determine .init section size of module")
[docs] def get_core_size(self):
if self.has_member("mem"): # kernels 6.4+
return (
self._get_mem_size("MOD_TEXT")
+ self._get_mem_size("MOD_DATA")
+ self._get_mem_size("MOD_RODATA")
+ self._get_mem_size("MOD_RO_AFTER_INIT")
)
elif self.has_member("core_layout"):
return self.core_layout.size
elif self.has_member("core_size"):
return self.core_size
raise AttributeError("Unable to determine core size of module")
[docs] def get_core_text_size(self):
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_size("MOD_TEXT")
elif self.has_member("core_layout"):
return self.core_layout.text_size
elif self.has_member("core_text_size"):
return self.core_text_size
raise AttributeError("Unable to determine core text size of module")
[docs] def get_module_core(self):
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_base("MOD_TEXT")
elif self.has_member("core_layout"):
return self.core_layout.base
elif self.has_member("module_core"):
return self.module_core
raise AttributeError("Unable to get module core")
[docs] def get_module_init(self):
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_base("MOD_INIT_TEXT")
elif self.has_member("init_layout"):
return self.init_layout.base
elif self.has_member("module_init"):
return self.module_init
raise AttributeError("Unable to get module init")
[docs] def get_name(self):
"""Get the name of the module as a string"""
return utility.array_to_string(self.name)
def _get_sect_count(self, grp):
"""Try to determine the number of valid sections"""
arr = self._context.object(
self.get_symbol_table_name() + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=grp.attrs,
subtype=self._context.symbol_space.get_type(
self.get_symbol_table_name() + constants.BANG + "pointer"
),
count=25,
)
idx = 0
while arr[idx]:
idx = idx + 1
return idx
[docs] def get_sections(self):
"""Get sections of the module"""
if self.sect_attrs.has_member("nsections"):
num_sects = self.sect_attrs.nsections
else:
num_sects = self._get_sect_count(self.sect_attrs.grp)
arr = self._context.object(
self.get_symbol_table_name() + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.sect_attrs.attrs.vol.offset,
subtype=self._context.symbol_space.get_type(
self.get_symbol_table_name() + constants.BANG + "module_sect_attr"
),
count=num_sects,
)
for attr in arr:
yield attr
[docs] def get_elf_table_name(self):
elf_table_name = intermed.IntermediateSymbolTable.create(
self._context,
"elf_symbol_table",
"linux",
"elf",
native_types=None,
class_types=elf.class_types,
)
return elf_table_name
[docs] def get_symbols(self):
"""Get symbols of the module
Yields:
A symbol object
"""
if not hasattr(self, "_elf_table_name"):
self._elf_table_name = self.get_elf_table_name()
if symbols.symbol_table_is_64bit(self._context, self.get_symbol_table_name()):
prefix = "Elf64_"
else:
prefix = "Elf32_"
syms = self._context.object(
self.get_symbol_table_name() + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.section_symtab,
subtype=self._context.symbol_space.get_type(
self._elf_table_name + constants.BANG + prefix + "Sym"
),
count=self.num_symtab + 1,
)
if self.section_strtab:
for sym in syms:
yield sym
[docs] def get_symbols_names_and_addresses(self) -> Iterable[Tuple[str, int]]:
"""Get names and addresses for each symbol of the module
Yields:
A tuple for each symbol containing the symbol name and its corresponding value
"""
for sym in self.get_symbols():
sym_arr = self._context.object(
self.get_symbol_table_name() + constants.BANG + "array",
layer_name=self.vol.native_layer_name,
offset=self.section_strtab + sym.st_name,
)
try:
sym_name = utility.array_to_string(
sym_arr, 512
) # 512 is the value of KSYM_NAME_LEN kernel constant
except exceptions.InvalidAddressException:
continue
if sym_name != "":
# Normalize sym.st_value offset, which is an address pointing to the symbol value
mask = self._context.layers[self.vol.layer_name].address_mask
sym_address = sym.st_value & mask
yield (sym_name, sym_address)
[docs] def get_symbol(self, wanted_sym_name):
"""Get symbol value for a given symbol name"""
for sym_name, sym_address in self.get_symbols_names_and_addresses():
if wanted_sym_name == sym_name:
return sym_address
return None
[docs] def get_symbol_by_address(self, wanted_sym_address):
"""Get symbol name for a given symbol address"""
for sym_name, sym_address in self.get_symbols_names_and_addresses():
if wanted_sym_address == sym_address:
return sym_name
return None
@property
def section_symtab(self):
if self.has_member("kallsyms"):
return self.kallsyms.symtab
elif self.has_member("symtab"):
return self.symtab
raise AttributeError("Unable to get symtab")
@property
def num_symtab(self):
if self.has_member("kallsyms"):
return int(self.kallsyms.num_symtab)
elif self.has_member("num_symtab"):
return int(self.member("num_symtab"))
raise AttributeError("Unable to determine number of symbols")
@property
def section_strtab(self):
# Newer kernels
if self.has_member("kallsyms"):
return self.kallsyms.strtab
# Older kernels
elif self.has_member("strtab"):
return self.strtab
raise AttributeError("Unable to get strtab")
[docs]class task_struct(generic.GenericIntelProcess):
[docs] def add_process_layer(
self, config_prefix: str = None, preferred_name: str = None
) -> Optional[str]:
"""Constructs a new layer based on the process's DTB.
Returns the name of the Layer or None.
"""
parent_layer = self._context.layers[self.vol.layer_name]
try:
pgd = self.mm.pgd
except exceptions.InvalidAddressException:
return None
if not isinstance(parent_layer, linear.LinearlyMappedLayer):
raise TypeError(
"Parent layer is not a translation layer, unable to construct process layer"
)
dtb, layer_name = parent_layer.translate(pgd)
if not dtb:
return None
if preferred_name is None:
preferred_name = self.vol.layer_name + f"_Process{self.pid}"
# Add the constructed layer and return the name
return self._add_process_layer(
self._context, dtb, config_prefix, preferred_name
)
[docs] def get_process_memory_sections(
self, heap_only: bool = False
) -> Generator[Tuple[int, int], None, None]:
"""Returns a list of sections based on the memory manager's view of
this task's virtual memory."""
for vma in self.mm.get_vma_iter():
start = int(vma.vm_start)
end = int(vma.vm_end)
if heap_only and not (start <= self.mm.brk and end >= self.mm.start_brk):
continue
else:
# FIXME: Check if this actually needs to be printed out or not
vollog.info(
f"adding vma: {start:x} {self.mm.brk:x} | {end:x} {self.mm.start_brk:x}"
)
yield (start, end - start)
@property
def is_kernel_thread(self) -> bool:
"""Checks if this task is a kernel thread.
Returns:
bool: True, if this task is a kernel thread. Otherwise, False.
"""
return (self.flags & linux_constants.PF_KTHREAD) != 0
@property
def is_thread_group_leader(self) -> bool:
"""Checks if this task is a thread group leader.
Returns:
bool: True, if this task is a thread group leader. Otherwise, False.
"""
return self.tgid == self.pid
@property
def is_user_thread(self) -> bool:
"""Checks if this task is a user thread.
Returns:
bool: True, if this task is a user thread. Otherwise, False.
"""
return not self.is_kernel_thread and self.tgid != self.pid
def _get_tasks_iterable(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns the respective iterable to obtain the threads in this process"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
task_struct_symname = f"{vmlinux.symbol_table_name}{constants.BANG}task_struct"
if vmlinux.get_type("task_struct").has_member("signal") and vmlinux.get_type(
"signal_struct"
).has_member("thread_head"):
# kernels >= 6.7 - via signals
return self.signal.thread_head.to_list(task_struct_symname, "thread_node")
elif vmlinux.get_type("task_struct").has_member("thread_group"):
# kernels < 6.7 - via thread_group
return self.thread_group.to_list(task_struct_symname, "thread_group")
raise AttributeError("Unable to find the root dentry")
[docs] def get_threads(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns each thread in this process"""
tasks_iterable = self._get_tasks_iterable()
threads_seen = set([self.vol.offset])
for task in tasks_iterable:
if task.vol.offset not in threads_seen:
threads_seen.add(task.vol.offset)
yield task
@property
def is_being_ptraced(self) -> bool:
"""Returns True if this task is being traced using ptrace"""
return self.ptrace != 0
@property
def is_ptracing(self) -> bool:
"""Returns True if this task is tracing other tasks using ptrace"""
is_tracing = (
self.ptraced.next.is_readable()
and self.ptraced.next.dereference().vol.offset != self.ptraced.vol.offset
)
return is_tracing
[docs] def get_ptrace_tracer_tid(self) -> Optional[int]:
"""Returns the tracer's TID tracing this task"""
return self.parent.pid if self.is_being_ptraced else None
[docs] def get_ptrace_tracee_tids(self) -> List[int]:
"""Returns the list of TIDs being traced by this task"""
task_symbol_table_name = self.get_symbol_table_name()
task_struct_symname = f"{task_symbol_table_name}{constants.BANG}task_struct"
tracing_tid_list = [
task_being_traced.pid
for task_being_traced in self.ptraced.to_list(
task_struct_symname, "ptrace_entry"
)
]
return tracing_tid_list
[docs] def get_ptrace_tracee_flags(self) -> Optional[str]:
"""Returns a string with the ptrace flags"""
return (
linux_constants.PT_FLAGS(self.ptrace).flags
if self.is_being_ptraced
else None
)
def _get_task_start_time(self) -> datetime.timedelta:
"""Returns the task's monotonic start_time as a timedelta.
Returns:
The task's start time as a timedelta object.
"""
for member_name in ("start_boottime", "real_start_time", "start_time"):
if self.has_member(member_name):
start_time_obj = self.member(member_name)
start_time_obj_type = start_time_obj.vol.type_name
start_time_obj_type_name = start_time_obj_type.split(constants.BANG)[1]
if start_time_obj_type_name != "timespec":
# kernels >= 3.17 real_start_time and start_time are u64
# kernels >= 5.5 uses start_boottime which is also a u64
start_time = Timespec64Concrete.new_from_nsec(start_time_obj)
else:
# kernels < 3.17 real_start_time and start_time are timespec
start_time = Timespec64Concrete.new_from_timespec(start_time_obj)
# This is relative to the boot time so it makes sense to be a timedelta.
return start_time.to_timedelta()
raise AttributeError("Unsupported task_struct start_time member")
[docs] def get_time_namespace(self) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns the task's time namespace"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
if not self.has_member("nsproxy"):
# kernels < 2.6.19: ab516013ad9ca47f1d3a936fa81303bfbf734d52
return None
if not vmlinux.get_type("nsproxy").has_member("time_ns"):
# kernels < 5.6 769071ac9f20b6a447410c7eaa55d1a5233ef40c
return None
return self.nsproxy.time_ns
[docs] def get_time_namespace_id(self) -> int:
"""Returns the task's time namespace ID."""
time_ns = self.get_time_namespace()
if not time_ns:
# kernels < 5.6
return None
# We are good. ns_common (ns) was introduced in kernels 3.19. So by the time the
# time namespace was added in kernels 5.6, it already included the ns member.
return time_ns.ns.inum
def _get_time_namespace_offsets(
self,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns the time offsets from the task's time namespace."""
time_ns = self.get_time_namespace()
if not time_ns:
# kernels < 5.6
return None
if not time_ns.has_member("offsets"):
# kernels < 5.6 af993f58d69ee9c1f421dfc87c3ed231c113989c
return None
return time_ns.offsets
[docs] def get_time_namespace_monotonic_offset(
self,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Gets task's time namespace monotonic offset
Returns:
a kernel's timespec64 object with the monotonic offset
"""
time_namespace_offsets = self._get_time_namespace_offsets()
if not time_namespace_offsets:
return None
return time_namespace_offsets.monotonic
def _get_time_namespace_boottime_offset(
self,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Gets task's time namespace boottime offset
Returns:
a kernel's timespec64 object with the boottime offset
"""
time_namespace_offsets = self._get_time_namespace_offsets()
if not time_namespace_offsets:
return None
return time_namespace_offsets.boottime
def _get_boottime_raw(self) -> "Timespec64Concrete":
"""Returns the boot time in a Timespec64Concrete object."""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
if vmlinux.has_symbol("tk_core"):
# kernels >= 3.17 | tk_core | 3fdb14fd1df70325e1e91e1203a699a4803ed741
tk_core = vmlinux.object_from_symbol("tk_core")
timekeeper = tk_core.timekeeper
if not timekeeper.offs_real.has_member("tv64"):
# kernels >= 4.10 - Tested on Ubuntu 6.8.0-41
boottime_nsec = timekeeper.offs_real - timekeeper.offs_boot
else:
# 3.17 <= kernels < 4.10 - Tested on Ubuntu 4.4.0-142
boottime_nsec = timekeeper.offs_real.tv64 - timekeeper.offs_boot.tv64
return Timespec64Concrete.new_from_nsec(boottime_nsec)
elif vmlinux.has_symbol("timekeeper") and vmlinux.get_type(
"timekeeper"
).has_member("wall_to_monotonic"):
# 3.4 <= kernels < 3.17 - Tested on Ubuntu 3.13.0-185
timekeeper = vmlinux.object_from_symbol("timekeeper")
# timekeeper.wall_to_monotonic is timespec
boottime = Timespec64Concrete.new_from_timespec(
timekeeper.wall_to_monotonic
)
boottime += timekeeper.total_sleep_time
return boottime.negate()
elif vmlinux.has_symbol("wall_to_monotonic"):
# kernels < 3.4 - Tested on Debian7 3.2.0-4 (3.2.57-3+deb7u2)
wall_to_monotonic = vmlinux.object_from_symbol("wall_to_monotonic")
boottime = Timespec64Concrete.new_from_timespec(wall_to_monotonic)
if vmlinux.has_symbol("total_sleep_time"):
# 2.6.23 <= kernels < 3.4 7c3f1a573237b90ef331267260358a0ec4ac9079
total_sleep_time = vmlinux.object_from_symbol("total_sleep_time")
full_type_name = total_sleep_time.vol.type_name
type_name = full_type_name.split(constants.BANG)[1]
if type_name == "timespec":
# kernels >= 2.6.32 total_sleep_time is a timespec
boottime += total_sleep_time
else:
# kernels < 2.6.32 total_sleep_time is an unsigned long as seconds
boottime.tv_sec += total_sleep_time
return boottime.negate()
raise exceptions.VolatilityException("Unsupported")
[docs] def get_boottime(self, root_time_namespace: bool = True) -> datetime.datetime:
"""Returns the boot time in UTC as a datetime.
Args:
root_time_namespace: If True, it returns the boot time as seen from the root
time namespace. Otherwise, it returns the boot time relative to the
task's time namespace.
Returns:
A datetime with the UTC boot time.
"""
boottime = self._get_boottime_raw()
if not boottime:
return None
if not root_time_namespace:
# Shift boot timestamp according to the task's time namespace offset
boottime_offset_timespec = self._get_time_namespace_boottime_offset()
if boottime_offset_timespec:
# Time namespace support is from kernels 5.6
boottime -= boottime_offset_timespec
return boottime.to_datetime()
[docs] def get_create_time(self) -> datetime.datetime:
"""Retrieves the task's start time from its time namespace.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
vmlinux_module_name: The name of the kernel module on which to operate
task: A reference task
Returns:
A datetime with task's start time
"""
# Typically, we want to see the creation time seen from the root time namespace
boottime = self.get_boottime(root_time_namespace=True)
# The kernel exports only tv_sec to procfs, see kernel's show_stat().
# This means user-space tools, like those in the procps package (e.g., ps, top, etc.),
# only use the boot time seconds to compute dates relatives to this.
boottime = boottime.replace(microsecond=0)
task_start_time_timedelta = self._get_task_start_time()
# NOTE: Do NOT apply the task's time namespace offsets here. While the kernel uses
# timens_add_boottime_ns(), it's not needed here since we're seeing it from the
# root time namespace, not within the task's own time namespace
return boottime + task_start_time_timedelta
[docs]class fs_struct(objects.StructType):
[docs] def get_root_dentry(self):
# < 2.6.26
if self.has_member("rootmnt"):
return self.root
elif self.root.has_member("dentry"):
return self.root.dentry
raise AttributeError("Unable to find the root dentry")
[docs] def get_root_mnt(self):
# < 2.6.26
if self.has_member("rootmnt"):
return self.rootmnt
elif self.root.has_member("mnt"):
return self.root.mnt
raise AttributeError("Unable to find the root mount")
[docs]class maple_tree(objects.StructType):
# include/linux/maple_tree.h
# Mask for Maple Tree Flags
MT_FLAGS_HEIGHT_MASK = 0x7C
MT_FLAGS_HEIGHT_OFFSET = 0x02
# Shift and mask to extract information from maple tree node pointers
MAPLE_NODE_TYPE_SHIFT = 0x03
MAPLE_NODE_TYPE_MASK = 0x0F
MAPLE_NODE_POINTER_MASK = 0xFF
# types of Maple Tree Nodes
MAPLE_DENSE = 0
MAPLE_LEAF_64 = 1
MAPLE_RANGE_64 = 2
MAPLE_ARANGE_64 = 3
[docs] def get_slot_iter(self):
"""Parse the Maple Tree and return every non zero slot."""
maple_tree_offset = self.vol.offset & ~(self.MAPLE_NODE_POINTER_MASK)
expected_maple_tree_depth = (
self.ma_flags & self.MT_FLAGS_HEIGHT_MASK
) >> self.MT_FLAGS_HEIGHT_OFFSET
yield from self._parse_maple_tree_node(
self.ma_root, maple_tree_offset, expected_maple_tree_depth
)
def _parse_maple_tree_node(
self,
maple_tree_entry,
parent,
expected_maple_tree_depth,
seen=None,
current_depth=1,
):
"""Recursively parse Maple Tree Nodes and yield all non empty slots"""
# Create seen set if it does not exist, e.g. on the first call into this recursive function. This
# must be None or an existing set of addresses for MTEs that have already been processed or that
# should otherwise be ignored. If parsing from the root node for example this should be None on the
# first call. If you needed to parse all nodes downwards from part of the tree this should still be
# None. If however you wanted to parse from a node, but ignore some parts of the tree below it then
# this could be populated with the addresses of the nodes you wish to ignore.
if seen is None:
seen = set()
# protect against unlikely loop
if maple_tree_entry in seen:
vollog.warning(
f"The mte {hex(maple_tree_entry)} has all ready been seen, no further results will be produced for this node."
)
return None
else:
seen.add(maple_tree_entry)
# check if we have exceeded the expected depth of this maple tree.
# e.g. when current_depth is larger than expected_maple_tree_depth there may be an issue.
# it is normal that expected_maple_tree_depth is equal to current_depth.
if expected_maple_tree_depth < current_depth:
vollog.warning(
f"The depth for the maple tree at {hex(self.vol.offset)} is {expected_maple_tree_depth}, however when parsing the nodes "
f"a depth of {current_depth} was reached. This is unexpected and may lead to incorrect results."
)
# parse the mte to extract the pointer value, node type, and leaf status
pointer = maple_tree_entry & ~(self.MAPLE_NODE_POINTER_MASK)
node_type = (
maple_tree_entry >> self.MAPLE_NODE_TYPE_SHIFT
) & self.MAPLE_NODE_TYPE_MASK
# create a pointer object for the node parent mte (note this will include flags in the low bits)
symbol_table_name = self.get_symbol_table_name()
node_parent_mte = self._context.object(
symbol_table_name + constants.BANG + "pointer",
layer_name=self.vol.native_layer_name,
offset=pointer,
)
# extract the actual pointer to the parent of this node
node_parent_pointer = node_parent_mte & ~(self.MAPLE_NODE_POINTER_MASK)
# verify that the node_parent_pointer correctly points to the parent
assert node_parent_pointer == parent
# create a node object
node = self._context.object(
symbol_table_name + constants.BANG + "maple_node",
layer_name=self.vol.layer_name,
offset=pointer,
)
# parse the slots based on the node type
if node_type == self.MAPLE_DENSE:
for slot in node.alloc.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield slot
elif node_type == self.MAPLE_LEAF_64:
for slot in node.mr64.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield slot
elif node_type == self.MAPLE_RANGE_64:
for slot in node.mr64.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield from self._parse_maple_tree_node(
slot,
pointer,
expected_maple_tree_depth,
seen,
current_depth + 1,
)
elif node_type == self.MAPLE_ARANGE_64:
for slot in node.ma64.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield from self._parse_maple_tree_node(
slot,
pointer,
expected_maple_tree_depth,
seen,
current_depth + 1,
)
else:
# unkown maple node type
raise AttributeError(
f"Unkown Maple Tree node type {node_type} at offset {hex(pointer)}."
)
[docs]class mm_struct(objects.StructType):
# TODO: As of version 3.0.0 this method should be removed
[docs] def get_mmap_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""
Deprecated: Use either get_vma_iter() or _get_mmap_iter().
"""
vollog.warning(
"This method has been deprecated in favour of using the get_vma_iter() method."
)
yield from self.get_vma_iter()
def _get_mmap_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the mmap list member of an mm_struct. Use this only if
required, get_vma_iter() will choose the correct _get_maple_tree_iter() or
_get_mmap_iter() automatically as required."""
if not self.has_member("mmap"):
raise AttributeError(
"_get_mmap_iter called on mm_struct where no mmap member exists."
)
if not self.mmap:
return None
yield self.mmap
seen = {self.mmap.vol.offset}
link = self.mmap.vm_next
while link != 0 and link.vol.offset not in seen:
yield link
seen.add(link.vol.offset)
link = link.vm_next
# TODO: As of version 3.0.0 this method should be removed
[docs] def get_maple_tree_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""
Deprecated: Use either get_vma_iter() or _get_maple_tree_iter().
"""
vollog.warning(
"This method has been deprecated in favour of using the get_vma_iter() method."
)
yield from self.get_vma_iter()
def _get_maple_tree_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the mm_mt member of an mm_struct. Use this only if
required, get_vma_iter() will choose the correct _get_maple_tree_iter() or
get_mmap_iter() automatically as required."""
if not self.has_member("mm_mt"):
raise AttributeError(
"_get_maple_tree_iter called on mm_struct where no mm_mt member exists."
)
symbol_table_name = self.get_symbol_table_name()
for vma_pointer in self.mm_mt.get_slot_iter():
# convert pointer to vm_area_struct and yield
vma = self._context.object(
symbol_table_name + constants.BANG + "vm_area_struct",
layer_name=self.vol.native_layer_name,
offset=vma_pointer,
)
yield vma
[docs] def get_vma_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the VMAs in an mm_struct. Automatically choosing the mmap or mm_mt as required."""
if self.has_member("mmap"):
yield from self._get_mmap_iter()
elif self.has_member("mm_mt"):
yield from self._get_maple_tree_iter()
else:
raise AttributeError("Unable to find mmap or mm_mt in mm_struct")
[docs]class super_block(objects.StructType):
# include/linux/kdev_t.h
MINORBITS = 20
# Superblock flags
SB_RDONLY = 1 # Mount read-only
SB_NOSUID = 2 # Ignore suid and sgid bits
SB_NODEV = 4 # Disallow access to device special files
SB_NOEXEC = 8 # Disallow program execution
SB_SYNCHRONOUS = 16 # Writes are synced at once
SB_MANDLOCK = 64 # Allow mandatory locks on an FS
SB_DIRSYNC = 128 # Directory modifications are synchronous
SB_NOATIME = 1024 # Do not update access times
SB_NODIRATIME = 2048 # Do not update directory access times
SB_SILENT = 32768
SB_POSIXACL = 1 << 16 # VFS does not apply the umask
SB_KERNMOUNT = 1 << 22 # this is a kern_mount call
SB_I_VERSION = 1 << 23 # Update inode I_version field
SB_LAZYTIME = 1 << 25 # Update the on-disk [acm]times lazily
SB_OPTS = {
SB_SYNCHRONOUS: "sync",
SB_DIRSYNC: "dirsync",
SB_MANDLOCK: "mand",
SB_LAZYTIME: "lazytime",
}
@property
def major(self) -> int:
return self.s_dev >> self.MINORBITS
@property
def minor(self) -> int:
return self.s_dev & ((1 << self.MINORBITS) - 1)
[docs] def get_flags_access(self) -> str:
return "ro" if self.s_flags & self.SB_RDONLY else "rw"
[docs] def get_flags_opts(self) -> Iterable[str]:
sb_opts = [
self.SB_OPTS[sb_opt] for sb_opt in self.SB_OPTS if sb_opt & self.s_flags
]
return sb_opts
[docs] def get_type(self) -> Optional[str]:
"""Gets the superblock filesystem type string"""
s_type_ptr = self.s_type
if not (s_type_ptr and s_type_ptr.is_readable()):
return None
s_type_name_ptr = s_type_ptr.name
if not (s_type_name_ptr and s_type_name_ptr.is_readable()):
return None
mnt_sb_type = utility.pointer_to_string(s_type_name_ptr, count=255)
s_subtype_ptr = self.s_subtype
if s_subtype_ptr and s_subtype_ptr.is_readable():
mnt_sb_subtype = utility.pointer_to_string(s_subtype_ptr, count=255)
mnt_sb_type += "." + mnt_sb_subtype
return mnt_sb_type
[docs]class vm_area_struct(objects.StructType):
perm_flags = {
0x00000001: "r",
0x00000002: "w",
0x00000004: "x",
}
extended_flags = {
0x00000001: "VM_READ",
0x00000002: "VM_WRITE",
0x00000004: "VM_EXEC",
0x00000008: "VM_SHARED",
0x00000010: "VM_MAYREAD",
0x00000020: "VM_MAYWRITE",
0x00000040: "VM_MAYEXEC",
0x00000080: "VM_MAYSHARE",
0x00000100: "VM_GROWSDOWN",
0x00000200: "VM_NOHUGEPAGE",
0x00000400: "VM_PFNMAP",
0x00000800: "VM_DENYWRITE",
0x00001000: "VM_EXECUTABLE",
0x00002000: "VM_LOCKED",
0x00004000: "VM_IO",
0x00008000: "VM_SEQ_READ",
0x00010000: "VM_RAND_READ",
0x00020000: "VM_DONTCOPY",
0x00040000: "VM_DONTEXPAND",
0x00080000: "VM_RESERVED",
0x00100000: "VM_ACCOUNT",
0x00200000: "VM_NORESERVE",
0x00400000: "VM_HUGETLB",
0x00800000: "VM_NONLINEAR",
0x01000000: "VM_MAPPED_COP__VM_HUGEPAGE",
0x02000000: "VM_INSERTPAGE",
0x04000000: "VM_ALWAYSDUMP",
0x08000000: "VM_CAN_NONLINEAR",
0x10000000: "VM_MIXEDMAP",
0x20000000: "VM_SAO",
0x40000000: "VM_PFN_AT_MMAP",
0x80000000: "VM_MERGEABLE",
}
def _parse_flags(self, vm_flags, parse_flags) -> str:
"""Returns an string representation of the flags in a
vm_area_struct."""
retval = ""
for mask, char in parse_flags.items():
if (vm_flags & mask) == mask:
retval = retval + char
else:
retval = retval + "-"
return retval
# only parse the rwx bits
[docs] def get_protection(self) -> str:
return self._parse_flags(self.vm_flags & 0b1111, vm_area_struct.perm_flags)
# used by malfind
[docs] def get_flags(self) -> str:
return self._parse_flags(self.vm_flags, self.extended_flags)
[docs] def get_page_offset(self) -> int:
if self.vm_file == 0:
return 0
parent_layer = self._context.layers[self.vol.layer_name]
return self.vm_pgoff << parent_layer.page_shift
[docs] def get_name(self, context, task):
if self.vm_file != 0:
fname = linux.LinuxUtilities.path_for_file(context, task, self.vm_file)
elif self.vm_start <= task.mm.start_brk and self.vm_end >= task.mm.brk:
fname = "[heap]"
elif self.vm_start <= task.mm.start_stack <= self.vm_end:
fname = "[stack]"
elif (
self.vm_mm.context.has_member("vdso")
and self.vm_start == self.vm_mm.context.vdso
):
fname = "[vdso]"
else:
fname = "Anonymous Mapping"
return fname
# used by malfind
[docs] def is_suspicious(self, proclayer=None):
ret = False
flags_str = self.get_protection()
if flags_str == "rwx":
ret = True
elif flags_str == "r-x" and self.vm_file.dereference().vol.offset == 0:
ret = True
elif proclayer and "x" in flags_str:
for i in range(self.vm_start, self.vm_end, proclayer.page_size):
try:
if proclayer.is_dirty(i):
vollog.warning(
f"Found malicious (dirty+exec) page at {hex(i)} !"
)
# We do not attempt to find other dirty+exec pages once we have found one
ret = True
break
except (
exceptions.PagedInvalidAddressException,
exceptions.InvalidAddressException,
) as excp:
vollog.debug(f"Unable to translate address {hex(i)} : {excp}")
# Abort as it is likely that other addresses in the same range will also fail
ret = False
break
return ret
[docs]class qstr(objects.StructType):
[docs] def name_as_str(self) -> str:
if self.has_member("len"):
str_length = self.len + 1 # Maximum length should include null terminator
else:
str_length = 255
try:
ret = objects.utility.pointer_to_string(self.name, str_length)
except (exceptions.InvalidAddressException, ValueError):
ret = ""
return ret
[docs]class dentry(objects.StructType):
[docs] def path(self) -> str:
"""Based on __dentry_path Linux kernel function"""
reversed_path = []
dentry_seen = set()
current_dentry = self
while (
not current_dentry.is_root()
and current_dentry.vol.offset not in dentry_seen
):
parent = current_dentry.d_parent
reversed_path.append(current_dentry.d_name.name_as_str())
dentry_seen.add(current_dentry.vol.offset)
current_dentry = parent
return "/" + "/".join(reversed(reversed_path))
[docs] def is_root(self) -> bool:
return self.vol.offset == self.d_parent
[docs] def is_subdir(self, old_dentry):
"""Is this dentry a subdirectory of old_dentry?
Returns true if this dentry is a subdirectory of the parent (at any depth).
Otherwise, it returns false.
"""
if self.vol.offset == old_dentry:
return True
return self.d_ancestor(old_dentry)
[docs] def d_ancestor(self, ancestor_dentry):
"""Search for an ancestor
Returns the ancestor dentry which is a child of "ancestor_dentry",
if "ancestor_dentry" is an ancestor of "child_dentry", else None.
"""
dentry_seen = set()
current_dentry = self
while (
not current_dentry.is_root()
and current_dentry.vol.offset not in dentry_seen
):
if current_dentry.d_parent == ancestor_dentry.vol.offset:
return current_dentry
dentry_seen.add(current_dentry.vol.offset)
current_dentry = current_dentry.d_parent
return None
[docs] def get_subdirs(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Walks dentry subdirs
Yields:
A dentry object
"""
if self.has_member("d_sib") and self.has_member("d_children"):
# kernels >= 6.8
walk_member = "d_sib"
list_head_member = self.d_children
elif self.has_member("d_child") and self.has_member("d_subdirs"):
# 2.5.0 <= kernels < 6.8
walk_member = "d_child"
list_head_member = self.d_subdirs
else:
raise exceptions.VolatilityException("Unsupported dentry type")
dentry_type_name = self.get_symbol_table_name() + constants.BANG + "dentry"
yield from list_head_member.to_list(dentry_type_name, walk_member)
[docs] def get_inode(self) -> interfaces.objects.ObjectInterface:
"""Returns the inode associated with this dentry"""
inode_ptr = self.d_inode
if not (inode_ptr and inode_ptr.is_readable() and inode_ptr.is_valid()):
return None
return inode_ptr.dereference()
[docs]class struct_file(objects.StructType):
[docs] def get_dentry(self) -> interfaces.objects.ObjectInterface:
"""Returns a pointer to the dentry associated with this file"""
if self.has_member("f_path"):
return self.f_path.dentry
elif self.has_member("f_dentry"):
return self.f_dentry
else:
raise AttributeError("Unable to find file -> dentry")
[docs] def get_vfsmnt(self) -> interfaces.objects.ObjectInterface:
"""Returns the fs (vfsmount) where this file is mounted"""
if self.has_member("f_path"):
return self.f_path.mnt
elif self.has_member("f_vfsmnt"):
return self.f_vfsmnt
else:
raise AttributeError("Unable to find file -> vfs mount")
[docs] def get_inode(self) -> interfaces.objects.ObjectInterface:
"""Returns an inode associated with this file"""
inode_ptr = None
if self.has_member("f_inode") and self.f_inode and self.f_inode.is_readable():
# Try first the cached value, kernels +3.9
inode_ptr = self.f_inode
if not (inode_ptr and inode_ptr.is_readable() and inode_ptr.is_valid()):
dentry_ptr = self.get_dentry()
if not (dentry_ptr and dentry_ptr.is_readable()):
return None
inode_ptr = dentry_ptr.d_inode
if not (inode_ptr and inode_ptr.is_readable() and inode_ptr.is_valid()):
return None
return inode_ptr.dereference()
[docs]class list_head(objects.StructType, collections.abc.Iterable):
[docs] def to_list(
self,
symbol_type: str,
member: str,
forward: bool = True,
sentinel: bool = True,
layer: Optional[str] = None,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Returns an iterator of the entries in the list.
Args:
symbol_type: Type of the list elements
member: Name of the list_head member in the list elements
forward: Set false to go backwards
sentinel: Whether self is a "sentinel node", meaning it is not embedded in a member of the list
Sentinel nodes are NOT yielded. See https://en.wikipedia.org/wiki/Sentinel_node for further reference
layer: Name of layer to read from
Yields:
Objects of the type specified via the "symbol_type" argument.
"""
layer = layer or self.vol.layer_name
relative_offset = self._context.symbol_space.get_type(
symbol_type
).relative_child_offset(member)
direction = "prev"
if forward:
direction = "next"
try:
link = getattr(self, direction).dereference()
except exceptions.InvalidAddressException:
return None
if not sentinel:
yield self._context.object(
symbol_type, layer, offset=self.vol.offset - relative_offset
)
seen = {self.vol.offset}
while link.vol.offset not in seen:
obj = self._context.object(
symbol_type, layer, offset=link.vol.offset - relative_offset
)
yield obj
seen.add(link.vol.offset)
try:
link = getattr(link, direction).dereference()
except exceptions.InvalidAddressException:
break
def __iter__(self) -> Iterator[interfaces.objects.ObjectInterface]:
return self.to_list(self.vol.parent.vol.type_name, self.vol.member_name)
[docs]class hlist_head(objects.StructType):
[docs] def to_list(
self,
symbol_type: str,
member: str,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Returns an iterator of the entries in the list.
This is a doubly linked list; however, it is not circular, so the 'forward' field
doesn't make sense. Also, the sentinel concept doesn't make sense here either;
unlike list_head, the head and nodes each have their own distinct types. A list_head
cannot be a node by itself.
- The 'pprev' of the first 'hlist_node' points to the 'hlist_head', not to the last node.
- The last element 'next' member is NULL
Args:
symbol_type: Type of the list elements
member: Name of the list_head member in the list elements
Yields:
Objects of the type specified via the "symbol_type" argument.
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
current = self.first
while current and current.is_readable():
yield linux.LinuxUtilities.container_of(
current, symbol_type, member, vmlinux
)
current = current.next
[docs]class files_struct(objects.StructType):
[docs] def get_fds(self) -> interfaces.objects.ObjectInterface:
if self.has_member("fdt"):
return self.fdt.fd.dereference()
elif self.has_member("fd"):
return self.fd.dereference()
else:
raise AttributeError("Unable to find files -> file descriptors")
[docs] def get_max_fds(self) -> interfaces.objects.ObjectInterface:
if self.has_member("fdt"):
return self.fdt.max_fds
elif self.has_member("max_fds"):
return self.max_fds
else:
raise AttributeError("Unable to find files -> maximum file descriptors")
[docs]class mount(objects.StructType):
MNT_NOSUID = 0x01
MNT_NODEV = 0x02
MNT_NOEXEC = 0x04
MNT_NOATIME = 0x08
MNT_NODIRATIME = 0x10
MNT_RELATIME = 0x20
MNT_READONLY = 0x40
MNT_SHRINKABLE = 0x100
MNT_WRITE_HOLD = 0x200
MNT_SHARED = 0x1000
MNT_UNBINDABLE = 0x2000
MNT_FLAGS = {
MNT_NOSUID: "nosuid",
MNT_NODEV: "nodev",
MNT_NOEXEC: "noexec",
MNT_NOATIME: "noatime",
MNT_NODIRATIME: "nodiratime",
MNT_RELATIME: "relatime",
}
[docs] def get_mnt_sb(self) -> int:
"""Returns a pointer to the super_block"""
if self.has_member("mnt"):
return self.mnt.mnt_sb
elif self.has_member("mnt_sb"):
return self.mnt_sb
else:
raise AttributeError("Unable to find mount -> super block")
[docs] def get_mnt_root(self):
if self.has_member("mnt"):
return self.mnt.mnt_root
elif self.has_member("mnt_root"):
return self.mnt_root
else:
raise AttributeError("Unable to find mount -> mount root")
[docs] def get_mnt_flags(self):
if self.has_member("mnt"):
return self.mnt.mnt_flags
elif self.has_member("mnt_flags"):
return self.mnt_flags
else:
raise AttributeError("Unable to find mount -> mount flags")
[docs] def get_mnt_parent(self):
"""Gets the fs where we are mounted on
Returns:
A mount pointer
"""
return self.mnt_parent
[docs] def get_mnt_mountpoint(self):
"""Gets the dentry of the mountpoint
Returns:
A dentry pointer
"""
return self.mnt_mountpoint
[docs] def get_parent_mount(self):
return self.mnt.get_parent_mount()
[docs] def has_parent(self) -> bool:
"""Checks if this mount has a parent
Returns:
bool: 'True' if this mount has a parent
"""
return self.mnt_parent != self.vol.offset
[docs] def get_vfsmnt_current(self):
"""Returns the fs where we are mounted on
Returns:
A 'vfsmount'
"""
return self.mnt
[docs] def get_vfsmnt_parent(self):
"""Gets the parent fs (vfsmount) to where it's mounted on
Returns:
A 'vfsmount'
"""
return self.get_mnt_parent().get_vfsmnt_current()
[docs] def get_dentry_current(self):
"""Returns the root of the mounted tree
Returns:
A dentry pointer
"""
vfsmnt = self.get_vfsmnt_current()
dentry = vfsmnt.mnt_root
return dentry
[docs] def get_dentry_parent(self):
"""Returns the parent root of the mounted tree
Returns:
A dentry pointer
"""
return self.get_mnt_parent().get_dentry_current()
[docs] def get_flags_access(self) -> str:
return "ro" if self.get_mnt_flags() & self.MNT_READONLY else "rw"
[docs] def get_flags_opts(self) -> Iterable[str]:
flags = [
self.MNT_FLAGS[mntflag]
for mntflag in self.MNT_FLAGS
if mntflag & self.get_mnt_flags()
]
return flags
[docs] def is_shared(self) -> bool:
return self.get_mnt_flags() & self.MNT_SHARED
[docs] def is_unbindable(self) -> bool:
return self.get_mnt_flags() & self.MNT_UNBINDABLE
[docs] def is_slave(self) -> bool:
return self.mnt_master and self.mnt_master.vol.offset != 0
[docs] def get_devname(self) -> str:
return utility.pointer_to_string(self.mnt_devname, count=255)
[docs] def get_dominating_id(self, root) -> int:
"""Get ID of closest dominating peer group having a representative under the given root."""
mnt_seen = set()
current_mnt = self.mnt_master
while (
current_mnt
and current_mnt.vol.offset != 0
and current_mnt.vol.offset not in mnt_seen
):
peer = current_mnt.get_peer_under_root(self.mnt_ns, root)
if peer and peer.vol.offset != 0:
return peer.mnt_group_id
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.mnt_master
return 0
[docs] def get_peer_under_root(self, ns, root):
"""Return true if path is reachable from root.
It mimics the kernel function is_path_reachable(), ref: fs/namespace.c
"""
mnt_seen = set()
current_mnt = self
while current_mnt.vol.offset not in mnt_seen:
if current_mnt.mnt_ns == ns and current_mnt.is_path_reachable(
current_mnt.mnt.mnt_root, root
):
return current_mnt
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.next_peer()
if current_mnt.vol.offset == self.vol.offset:
break
return None
[docs] def is_path_reachable(self, current_dentry, root):
"""Return true if path is reachable.
It mimics the kernel function with same name, ref fs/namespace.c:
"""
mnt_seen = set()
current_mnt = self
while (
current_mnt.mnt.vol.offset != root.mnt
and current_mnt.has_parent()
and current_mnt.vol.offset not in mnt_seen
):
current_dentry = current_mnt.mnt_mountpoint
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.mnt_parent
return current_mnt.mnt.vol.offset == root.mnt and current_dentry.is_subdir(
root.dentry
)
[docs] def next_peer(self):
table_name = self.vol.type_name.split(constants.BANG)[0]
mount_struct = "{0}{1}mount".format(table_name, constants.BANG)
offset = self._context.symbol_space.get_type(
mount_struct
).relative_child_offset("mnt_share")
return self._context.object(
mount_struct,
self.vol.layer_name,
offset=self.mnt_share.next.vol.offset - offset,
)
[docs]class vfsmount(objects.StructType):
[docs] def is_valid(self):
return (
self.get_mnt_sb() != 0
and self.get_mnt_root() != 0
and self.get_mnt_parent() != 0
)
def _is_kernel_prior_to_struct_mount(self) -> bool:
"""Helper to distinguish between kernels prior to version 3.3.8 that
lacked the 'mount' structure and later versions that have it.
The 'mnt_parent' member was moved from struct 'vfsmount' to struct
'mount' when the latter was introduced.
Alternatively, vmlinux.has_type('mount') can be used here but it is faster.
Returns:
bool: 'True' if the kernel
"""
return self.has_member("mnt_parent")
[docs] def is_equal(self, vfsmount_ptr) -> bool:
"""Helper to make sure it is comparing two pointers to 'vfsmount'.
Depending on the kernel version, the calling object (self) could be
a 'vfsmount \\*' (<3.3.8) or a 'vfsmount' (>=3.3.8). This way we trust
in the framework "auto" dereferencing ability to assure that when we
reach this point 'self' will be a 'vfsmount' already and self.vol.offset
a 'vfsmount \\*' and not a 'vfsmount \\*\\*'. The argument must be a 'vfsmount \\*'.
Typically, it's called from do_get_path().
Args:
vfsmount_ptr (vfsmount *): A pointer to a 'vfsmount'
Raises:
exceptions.VolatilityException: If vfsmount_ptr is not a 'vfsmount \\*'
Returns:
bool: 'True' if the given argument points to the the same 'vfsmount'
as 'self'.
"""
if isinstance(vfsmount_ptr, objects.Pointer):
return self.vol.offset == vfsmount_ptr
else:
raise exceptions.VolatilityException(
"Unexpected argument type. It has to be a 'vfsmount *'"
)
def _get_real_mnt(self):
"""Gets the struct 'mount' containing this 'vfsmount'.
It should be only called from kernels >= 3.3.8 when 'struct mount' was introduced.
Returns:
mount: the struct 'mount' containing this 'vfsmount'.
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
return linux.LinuxUtilities.container_of(
self.vol.offset, "mount", "mnt", vmlinux
)
[docs] def get_vfsmnt_current(self):
"""Returns the current fs where we are mounted on
Returns:
A vfsmount pointer
"""
return self.get_mnt_parent()
[docs] def get_vfsmnt_parent(self):
"""Gets the parent fs (vfsmount) to where it's mounted on
Returns:
For kernels < 3.3.8: A vfsmount pointer
For kernels >= 3.3.8: A vfsmount object
"""
if self._is_kernel_prior_to_struct_mount():
return self.get_mnt_parent()
else:
return self._get_real_mnt().get_vfsmnt_parent()
[docs] def get_dentry_current(self):
"""Returns the root of the mounted tree
Returns:
A dentry pointer
"""
if self._is_kernel_prior_to_struct_mount():
return self.get_mnt_mountpoint()
else:
return self._get_real_mnt().get_dentry_current()
[docs] def get_dentry_parent(self):
"""Returns the parent root of the mounted tree
Returns:
A dentry pointer
"""
if self._is_kernel_prior_to_struct_mount():
return self.get_mnt_mountpoint()
else:
return self._get_real_mnt().get_mnt_mountpoint()
[docs] def get_mnt_parent(self):
"""Gets the mnt_parent member.
Returns:
For kernels < 3.3.8: A vfsmount pointer
For kernels >= 3.3.8: A mount pointer
"""
if self._is_kernel_prior_to_struct_mount():
return self.mnt_parent
else:
return self._get_real_mnt().get_mnt_parent()
[docs] def get_mnt_mountpoint(self):
"""Gets the dentry of the mountpoint
Returns:
A dentry pointer
"""
if self.has_member("mnt_mountpoint"):
return self.mnt_mountpoint
else:
return self._get_real_mnt().mnt_mountpoint
[docs] def get_mnt_root(self):
return self.mnt_root
[docs] def has_parent(self) -> bool:
if self._is_kernel_prior_to_struct_mount():
return self.mnt_parent != self.vol.offset
else:
return self._get_real_mnt().has_parent()
[docs] def get_mnt_sb(self):
"""Returns a pointer to the super_block"""
return self.mnt_sb
[docs] def get_flags_access(self) -> str:
return "ro" if self.mnt_flags & mount.MNT_READONLY else "rw"
[docs] def get_flags_opts(self) -> Iterable[str]:
flags = [
mntflagtxt
for mntflag, mntflagtxt in mount.MNT_FLAGS.items()
if mntflag & self.mnt_flags != 0
]
return flags
[docs] def get_mnt_flags(self):
return self.mnt_flags
[docs] def is_shared(self) -> bool:
return self.get_mnt_flags() & mount.MNT_SHARED
[docs] def is_unbindable(self) -> bool:
return self.get_mnt_flags() & mount.MNT_UNBINDABLE
[docs] def is_slave(self) -> bool:
return self.mnt_master and self.mnt_master.vol.offset != 0
[docs] def get_devname(self) -> str:
return utility.pointer_to_string(self.mnt_devname, count=255)
[docs]class kobject(objects.StructType):
[docs] def reference_count(self):
refcnt = self.kref.refcount
if refcnt.has_member("counter"):
ret = refcnt.counter
else:
ret = refcnt.refs.counter
return ret
[docs]class mnt_namespace(objects.StructType):
[docs] def get_inode(self):
if self.has_member("proc_inum"):
return self.proc_inum
elif self.has_member("ns") and self.ns.has_member("inum"):
return self.ns.inum
else:
raise AttributeError("Unable to find mnt_namespace inode")
[docs] def get_mount_points(
self,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Yields the mount points for this mount namespace.
Yields:
mount struct instances
"""
table_name = self.vol.type_name.split(constants.BANG)[0]
if self.has_member("list"):
# kernels < 6.8
mnt_type = table_name + constants.BANG + "mount"
if not self._context.symbol_space.has_type(mnt_type):
# In kernels < 3.3, the 'mount' struct didn't exist, and the 'mnt_list'
# member was part of the 'vfsmount' struct.
mnt_type = table_name + constants.BANG + "vfsmount"
yield from self.list.to_list(mnt_type, "mnt_list")
elif (
self.has_member("mounts")
and self.mounts.vol.type_name == table_name + constants.BANG + "rb_root"
):
# kernels >= 6.8
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(
self._context, self
)
for node in self.mounts.get_nodes():
mnt = linux.LinuxUtilities.container_of(
node, "mount", "mnt_list", vmlinux
)
yield mnt
else:
raise exceptions.VolatilityException(
"Unsupported kernel mount namespace implementation"
)
[docs]class net(objects.StructType):
[docs] def get_inode(self):
if self.has_member("proc_inum"):
# 3.8.13 <= kernel < 3.19.8
return self.proc_inum
elif self.has_member("ns") and self.ns.has_member("inum"):
# kernel >= 3.19.8
return self.ns.inum
else:
# kernel < 3.8.13
raise AttributeError("Unable to find net_namespace inode")
[docs]class socket(objects.StructType):
def _get_vol_kernel(self):
symbol_table_arr = self.vol.type_name.split("!", 1)
symbol_table = symbol_table_arr[0] if len(symbol_table_arr) == 2 else None
module_names = list(
self._context.modules.get_modules_by_symbol_tables(symbol_table)
)
if not module_names:
raise ValueError(f"No module using the symbol table {symbol_table}")
kernel_module_name = module_names[0]
kernel = self._context.modules[kernel_module_name]
return kernel
[docs] def get_inode(self):
try:
kernel = self._get_vol_kernel()
except ValueError:
return 0
socket_alloc = linux.LinuxUtilities.container_of(
self.vol.offset, "socket_alloc", "socket", kernel
)
vfs_inode = socket_alloc.vfs_inode
return vfs_inode.i_ino
[docs] def get_state(self):
socket_state_idx = self.state
if 0 <= socket_state_idx < len(linux_constants.SOCKET_STATES):
return linux_constants.SOCKET_STATES[socket_state_idx]
[docs]class sock(objects.StructType):
[docs] def get_family(self):
family_idx = self.__sk_common.skc_family
if 0 <= family_idx < len(linux_constants.SOCK_FAMILY):
return linux_constants.SOCK_FAMILY[family_idx]
[docs] def get_type(self):
return linux_constants.SOCK_TYPES.get(self.sk_type, "")
[docs] def get_inode(self):
if not self.sk_socket:
return 0
return self.sk_socket.get_inode()
[docs] def get_protocol(self):
return None
[docs] def get_state(self):
# Return the generic socket state
if self.has_member("sk"):
return self.sk.sk_socket.get_state()
return self.sk_socket.get_state()
[docs]class unix_sock(objects.StructType):
[docs] def get_name(self):
if not self.addr:
return None
sockaddr_un = self.addr.name.cast("sockaddr_un")
saddr = str(utility.array_to_string(sockaddr_un.sun_path))
return saddr
[docs] def get_protocol(self):
return None
[docs] def get_state(self):
"""Return a string representing the sock state."""
# Unix socket states reuse (a subset) of the inet_sock states contants
if self.sk.get_type() == "STREAM":
state_idx = self.sk.__sk_common.skc_state
if 0 <= state_idx < len(linux_constants.TCP_STATES):
return linux_constants.TCP_STATES[state_idx]
else:
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs] def get_inode(self):
return self.sk.get_inode()
[docs]class inet_sock(objects.StructType):
[docs] def get_family(self):
family_idx = self.sk.__sk_common.skc_family
if 0 <= family_idx < len(linux_constants.SOCK_FAMILY):
return linux_constants.SOCK_FAMILY[family_idx]
[docs] def get_protocol(self):
# If INET6 family and a proto is defined, we use that specific IPv6 protocol.
# Otherwise, we use the standard IP protocol.
protocol = linux_constants.IP_PROTOCOLS.get(self.sk.sk_protocol)
if self.get_family() == "AF_INET6":
protocol = linux_constants.IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol)
return protocol
[docs] def get_state(self):
"""Return a string representing the sock state."""
if self.sk.get_type() == "STREAM":
state_idx = self.sk.__sk_common.skc_state
if 0 <= state_idx < len(linux_constants.TCP_STATES):
return linux_constants.TCP_STATES[state_idx]
else:
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs] def get_src_port(self):
sport_le = getattr(self, "sport", getattr(self, "inet_sport", None))
if sport_le is not None:
return socket_module.htons(sport_le)
[docs] def get_dst_port(self):
sk_common = self.sk.__sk_common
if hasattr(sk_common, "skc_portpair"):
dport_le = sk_common.skc_portpair & 0xFFFF
elif hasattr(self, "dport"):
dport_le = self.dport
elif hasattr(self, "inet_dport"):
dport_le = self.inet_dport
elif hasattr(sk_common, "skc_dport"):
dport_le = sk_common.skc_dport
else:
return None
return socket_module.htons(dport_le)
[docs] def get_src_addr(self):
sk_common = self.sk.__sk_common
family = sk_common.skc_family
if family == socket_module.AF_INET:
addr_size = 4
if hasattr(self, "rcv_saddr"):
saddr = self.rcv_saddr
elif hasattr(self, "inet_rcv_saddr"):
saddr = self.inet_rcv_saddr
else:
saddr = sk_common.skc_rcv_saddr
elif family == socket_module.AF_INET6:
addr_size = 16
saddr = self.pinet6.saddr
else:
return None
parent_layer = self._context.layers[self.vol.layer_name]
try:
addr_bytes = parent_layer.read(saddr.vol.offset, addr_size)
except exceptions.InvalidAddressException:
vollog.debug(
f"Unable to read socket src address from {saddr.vol.offset:#x}"
)
return None
return socket_module.inet_ntop(family, addr_bytes)
[docs] def get_dst_addr(self):
sk_common = self.sk.__sk_common
family = sk_common.skc_family
if family == socket_module.AF_INET:
if hasattr(self, "daddr") and self.daddr:
daddr = self.daddr
elif hasattr(self, "inet_daddr") and self.inet_daddr:
daddr = self.inet_daddr
else:
daddr = sk_common.skc_daddr
addr_size = 4
elif family == socket_module.AF_INET6:
if hasattr(self.pinet6, "daddr"):
daddr = self.pinet6.daddr
else:
daddr = sk_common.skc_v6_daddr
addr_size = 16
else:
return None
parent_layer = self._context.layers[self.vol.layer_name]
try:
addr_bytes = parent_layer.read(daddr.vol.offset, addr_size)
except exceptions.InvalidAddressException:
vollog.debug(
f"Unable to read socket dst address from {daddr.vol.offset:#x}"
)
return None
return socket_module.inet_ntop(family, addr_bytes)
[docs]class netlink_sock(objects.StructType):
[docs] def get_protocol(self):
protocol_idx = self.sk.sk_protocol
if 0 <= protocol_idx < len(linux_constants.NETLINK_PROTOCOLS):
return linux_constants.NETLINK_PROTOCOLS[protocol_idx]
[docs] def get_state(self):
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs] def get_portid(self):
if self.has_member("pid"):
# kernel < 3.7.10
return self.pid
if self.has_member("portid"):
# kernel >= 3.7.10
return self.portid
else:
raise AttributeError("Unable to find a source port id")
[docs] def get_dst_portid(self):
if self.has_member("dst_pid"):
# kernel < 3.7.10
return self.dst_pid
if self.has_member("dst_portid"):
# kernel >= 3.7.10
return self.dst_portid
else:
raise AttributeError("Unable to find a destination port id")
[docs]class vsock_sock(objects.StructType):
[docs] def get_protocol(self):
# The protocol should always be 0 for vsocks
return None
[docs] def get_state(self):
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs]class packet_sock(objects.StructType):
[docs] def get_protocol(self):
eth_proto = socket_module.htons(self.num)
if eth_proto == 0:
return None
elif eth_proto in linux_constants.ETH_PROTOCOLS:
return linux_constants.ETH_PROTOCOLS[eth_proto]
else:
return f"0x{eth_proto:x}"
[docs] def get_state(self):
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs]class bt_sock(objects.StructType):
[docs] def get_protocol(self):
type_idx = self.sk.sk_protocol
if 0 <= type_idx < len(linux_constants.BLUETOOTH_PROTOCOLS):
return linux_constants.BLUETOOTH_PROTOCOLS[type_idx]
[docs] def get_state(self):
state_idx = self.sk.__sk_common.skc_state
if 0 <= state_idx < len(linux_constants.BLUETOOTH_STATES):
return linux_constants.BLUETOOTH_STATES[state_idx]
[docs]class xdp_sock(objects.StructType):
[docs] def get_protocol(self):
# The protocol should always be 0 for xdp_sock
return None
[docs] def get_state(self):
# xdp_sock.state is an enum
return self.state.lookup()
[docs]class bpf_prog(objects.StructType):
[docs] def get_type(self) -> Union[str, None]:
"""Returns a string with the eBPF program type"""
# The program type was in `bpf_prog_aux::prog_type` from 3.18.140 to
# 4.1.52 before it was moved to `bpf_prog::type`
if self.has_member("type"):
# kernel >= 4.1.52
return self.type.description
if self.has_member("aux") and self.aux:
if self.aux.has_member("prog_type"):
# 3.18.140 <= kernel < 4.1.52
return self.aux.prog_type.description
# kernel < 3.18.140
return None
[docs] def get_tag(self) -> Union[str, None]:
"""Returns a string with the eBPF program tag"""
# 'tag' was added in kernels 4.10
if not self.has_member("tag"):
return None
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
prog_tag_addr = self.tag.vol.offset
prog_tag_size = self.tag.count
prog_tag_bytes = vmlinux_layer.read(prog_tag_addr, prog_tag_size)
prog_tag = binascii.hexlify(prog_tag_bytes).decode()
return prog_tag
[docs] def get_name(self) -> Union[str, None]:
"""Returns a string with the eBPF program name"""
if not self.has_member("aux"):
# 'prog_aux' was added in kernels 3.18
return None
return self.aux.get_name()
[docs]class bpf_prog_aux(objects.StructType):
[docs] def get_name(self) -> Union[str, None]:
"""Returns a string with the eBPF program name"""
if not self.has_member("name"):
# 'name' was added in kernels 4.15
return None
if not self.name:
return None
return utility.array_to_string(self.name)
[docs]class cred(objects.StructType):
# struct cred was added in kernels 2.6.29
def _get_cred_int_value(self, member: str) -> int:
"""Helper to obtain the right cred member value for the current kernel.
Args:
member (str): The requested cred member name to obtain its value
Raises:
AttributeError: When the requested cred member doesn't exist
AttributeError: When the cred implementation is not supported.
Returns:
int: The cred member value
"""
if not self.has_member(member):
raise AttributeError(f"struct cred doesn't have a '{member}' member")
cred_val = self.member(member)
if hasattr(cred_val, "val"):
# From kernels 3.5.7 on it is a 'kuid_t' type
value = cred_val.val
elif isinstance(cred_val, objects.Integer):
# From at least 2.6.30 and until 3.5.7 it was a 'uid_t' type which was an 'unsigned int'
value = cred_val
else:
raise AttributeError("Kernel struct cred is not supported")
return int(value)
@property
def euid(self):
"""Returns the effective user ID
Returns:
int: the effective user ID value
"""
return self._get_cred_int_value("euid")
[docs]class kernel_cap_struct(objects.StructType):
# struct kernel_cap_struct exists from 2.1.92 <= kernels < 6.3
[docs] @classmethod
def get_last_cap_value(cls) -> int:
"""Returns the latest capability ID supported by the framework.
Returns:
int: The latest capability ID supported by the framework.
"""
return len(linux_constants.CAPABILITIES) - 1
[docs] def get_kernel_cap_full(self) -> int:
"""Return the maximum value allowed for this kernel for a capability
Returns:
int: The capability full bitfield mask
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
try:
cap_last_cap = vmlinux.object_from_symbol(symbol_name="cap_last_cap")
except exceptions.SymbolError:
# It should be a kernel < 3.2, let's use our list of capabilities
cap_last_cap = self.get_last_cap_value()
return (1 << cap_last_cap + 1) - 1
[docs] @classmethod
def capabilities_to_string(cls, capabilities_bitfield: int) -> List[str]:
"""Translates a capability bitfield to a list of capability strings.
Args:
capabilities_bitfield (int): The capability bitfield value.
Returns:
List[str]: A list of capability strings.
"""
capabilities = []
for bit, name in enumerate(linux_constants.CAPABILITIES):
if capabilities_bitfield & (1 << bit) != 0:
capabilities.append(name)
return capabilities
[docs] def get_capabilities(self) -> int:
"""Returns the capability bitfield value
Returns:
int: The capability bitfield value.
"""
if not self.has_member("cap"):
raise exceptions.VolatilityException(
"Unsupported kernel capabilities implementation"
)
if isinstance(self.cap, objects.Array):
if len(self.cap) == 1:
# At least in the vanilla kernel, from 2.6.24 to 2.6.25
# kernel_cap_struct::cap become a two elements array.
# However, in some distros or custom kernel can technically
# be _KERNEL_CAPABILITY_U32S = _LINUX_CAPABILITY_U32S_1
# Leaving this code here for the sake of ensuring completeness.
cap_value = self.cap[0]
elif len(self.cap) == 2:
# In 2.6.25.x <= kernels < 6.3 kernel_cap_struct::cap is a two
# elements __u32 array that constitutes a 64bit bitfield.
cap_value = (self.cap[1] << 32) | self.cap[0]
else:
raise exceptions.VolatilityException(
"Unsupported kernel capabilities implementation"
)
else:
# In kernels < 2.6.25.x kernel_cap_struct::cap is a __u32
cap_value = self.cap
return cap_value & self.get_kernel_cap_full()
[docs] def enumerate_capabilities(self) -> List[str]:
"""Returns the list of capability strings.
Returns:
List[str]: The list of capability strings.
"""
capabilities_value = self.get_capabilities()
return self.capabilities_to_string(capabilities_value)
[docs] def has_capability(self, capability: str) -> bool:
"""Checks if the given capability string is enabled.
Args:
capability (str): A string representing the capability i.e. dac_read_search
Raises:
AttributeError: If the given capability is unknown to the framework.
Returns:
bool: "True" if the given capability is enabled.
"""
if capability not in linux_constants.CAPABILITIES:
raise AttributeError(f"Unknown capability with name '{capability}'")
cap_value = 1 << linux_constants.CAPABILITIES.index(capability)
return cap_value & self.get_capabilities() != 0
[docs]class kernel_cap_t(kernel_cap_struct):
# In kernels 6.3 kernel_cap_struct became the kernel_cap_t typedef
[docs] def get_capabilities(self) -> int:
"""Returns the capability bitfield value
Returns:
int: The capability bitfield value.
"""
if self.has_member("val"):
# In kernels >= 6.3 kernel_cap_t::val is a u64
cap_value = self.val
else:
raise exceptions.VolatilityException(
"Unsupported kernel capabilities implementation"
)
return cap_value & self.get_kernel_cap_full()
[docs]class Timespec64Abstract(abc.ABC):
"""Abstract class to handle all required timespec64 operations, convertions and
adjustments."""
[docs] @classmethod
def new_from_timespec(cls, other) -> "Timespec64Concrete":
"""Creates a new instance from an Timespec64Abstract subclass object"""
if not isinstance(other, Timespec64Abstract):
raise TypeError("Requires an object subclass of Timespec64Abstract")
tv_sec = int(other.tv_sec)
tv_nsec = int(other.tv_nsec)
return Timespec64Concrete(tv_sec=tv_sec, tv_nsec=tv_nsec)
[docs] @classmethod
def new_from_nsec(cls, nsec) -> "Timespec64Concrete":
"""Creates a new instance from an integer in nanoseconds"""
# Based on ns_to_timespec64()
if nsec > 0:
tv_sec = nsec // linux_constants.NSEC_PER_SEC
tv_nsec = nsec % linux_constants.NSEC_PER_SEC
elif nsec < 0:
tv_sec = -((-nsec - 1) // linux_constants.NSEC_PER_SEC) - 1
rem = (-nsec - 1) % linux_constants.NSEC_PER_SEC
tv_nsec = linux_constants.NSEC_PER_SEC - rem - 1
else:
tv_sec = tv_nsec = 0
return Timespec64Concrete(tv_sec=tv_sec, tv_nsec=tv_nsec)
[docs] def to_datetime(self) -> datetime.datetime:
"""Converts this Timespec64Abstract subclass object to a UTC aware datetime"""
# pylint: disable=E1101
return conversion.unixtime_to_datetime(
self.tv_sec + self.tv_nsec / linux_constants.NSEC_PER_SEC
)
[docs] def to_timedelta(self) -> datetime.timedelta:
"""Converts this Timespec64Abstract subclass object to timedelta"""
# pylint: disable=E1101
return datetime.timedelta(
seconds=self.tv_sec + self.tv_nsec / linux_constants.NSEC_PER_SEC
)
def __add__(self, other) -> "Timespec64Concrete":
"""Returns a new Timespec64Concrete object that sums the current values with those
in the timespec argument"""
if not isinstance(other, Timespec64Abstract):
raise TypeError("Requires an object subclass of Timespec64Abstract")
# pylint: disable=E1101
result = Timespec64Concrete(
tv_sec=self.tv_sec + other.tv_sec,
tv_nsec=self.tv_nsec + other.tv_nsec,
)
result.normalize()
return result
def __sub__(self, other) -> "Timespec64Concrete":
"""Returns a new Timespec64Abstract object that subtracts the values in the timespec
argument from the current object's values"""
if not isinstance(other, Timespec64Abstract):
raise TypeError("Requires an object subclass of Timespec64Abstract")
return self + other.negate()
[docs] def negate(self) -> "Timespec64Concrete":
"""Returns a new Timespec64Concrete object with the values of the current object negated"""
# pylint: disable=E1101
result = Timespec64Concrete(
tv_sec=-self.tv_sec,
tv_nsec=-self.tv_nsec,
)
result.normalize()
return result
[docs] def normalize(self):
"""Normalize any overflow in tv_sec and tv_nsec."""
# Based on kernel's set_normalized_timespec64()
# pylint: disable=E1101
while self.tv_nsec >= linux_constants.NSEC_PER_SEC:
self.tv_nsec -= linux_constants.NSEC_PER_SEC
self.tv_sec += 1
while self.tv_nsec < 0:
self.tv_nsec += linux_constants.NSEC_PER_SEC
self.tv_sec -= 1
[docs]class Timespec64Concrete(Timespec64Abstract):
"""Handle all required timespec64 operations, convertions and adjustments.
This is used to dynamically create timespec64-like objects, each with its own variables
and the same methods as a timespec64 object extension.
"""
def __init__(self, tv_sec=0, tv_nsec=0):
self.tv_sec = tv_sec
self.tv_nsec = tv_nsec
[docs]class timespec64(Timespec64Abstract, objects.StructType):
"""Handle all required timespec64 operations, convertions and adjustments.
This works as an extension of the timespec64 object while maintaining the same methods
as a Timespec64Concrete object.
"""
[docs]class inode(objects.StructType):
[docs] def is_valid(self) -> bool:
# i_count is a 'signed' counter (atomic_t). Smear, or essentially a wrong inode
# pointer, will easily cause an integer overflow here.
return self.i_ino > 0 and self.i_count.counter >= 0
@property
def is_dir(self) -> bool:
"""Returns True if the inode is a directory"""
return stat.S_ISDIR(self.i_mode) != 0
@property
def is_reg(self) -> bool:
"""Returns True if the inode is a regular file"""
return stat.S_ISREG(self.i_mode) != 0
@property
def is_link(self) -> bool:
"""Returns True if the inode is a symlink"""
return stat.S_ISLNK(self.i_mode) != 0
@property
def is_fifo(self) -> bool:
"""Returns True if the inode is a FIFO"""
return stat.S_ISFIFO(self.i_mode) != 0
@property
def is_sock(self) -> bool:
"""Returns True if the inode is a socket"""
return stat.S_ISSOCK(self.i_mode) != 0
@property
def is_block(self) -> bool:
"""Returns True if the inode is a block device"""
return stat.S_ISBLK(self.i_mode) != 0
@property
def is_char(self) -> bool:
"""Returns True if the inode is a char device"""
return stat.S_ISCHR(self.i_mode) != 0
@property
def is_sticky(self) -> bool:
"""Returns True if the sticky bit is set"""
return (self.i_mode & stat.S_ISVTX) != 0
[docs] def get_inode_type(self) -> Union[str, None]:
"""Returns inode type name
Returns:
The inode type name
"""
if self.is_dir:
return "DIR"
elif self.is_reg:
return "REG"
elif self.is_link:
return "LNK"
elif self.is_fifo:
return "FIFO"
elif self.is_sock:
return "SOCK"
elif self.is_char:
return "CHR"
elif self.is_block:
return "BLK"
else:
return None
def _time_member_to_datetime(self, member) -> datetime.datetime:
if self.has_member(f"{member}_sec") and self.has_member(f"{member}_nsec"):
# kernels >= 6.11 it's i_*_sec -> time64_t and i_*_nsec -> u32
# Ref Linux commit 3aa63a569c64e708df547a8913c84e64a06e7853
return conversion.unixtime_to_datetime(
self.member(f"{member}_sec") + self.has_member(f"{member}_nsec") / 1e9
)
elif self.has_member(f"__{member}"):
# 6.6 <= kernels < 6.11 it's a timespec64
# Ref Linux commit 13bc24457850583a2e7203ded05b7209ab4bc5ef / 12cd44023651666bd44baa36a5c999698890debb
return self.member(f"__{member}").to_datetime()
elif self.has_member(member):
# In kernels < 6.6 it's a timespec64 or timespec
return self.member(member).to_datetime()
else:
raise exceptions.VolatilityException(
"Unsupported kernel inode type implementation"
)
[docs] def get_access_time(self) -> datetime.datetime:
"""Returns the inode's last access time
This is updated when inode contents are read
Returns:
A datetime with the inode's last access time
"""
return self._time_member_to_datetime("i_atime")
[docs] def get_modification_time(self) -> datetime.datetime:
"""Returns the inode's last modification time
This is updated when the inode contents change
Returns:
A datetime with the inode's last data modification time
"""
return self._time_member_to_datetime("i_mtime")
[docs] def get_change_time(self) -> datetime.datetime:
"""Returns the inode's last change time
This is updated when the inode metadata changes
Returns:
A datetime with the inode's last change time
"""
return self._time_member_to_datetime("i_ctime")
[docs] def get_file_mode(self) -> str:
"""Returns the inode's file mode as string of the form '-rwxrwxrwx'.
Returns:
The inode's file mode string
"""
return stat.filemode(self.i_mode)
[docs] def get_pages(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Gets the inode's cached pages
Yields:
The inode's cached pages
"""
if not self.i_size:
return
elif not (self.i_mapping and self.i_mapping.nrpages > 0):
return
page_cache = linux.PageCache(
context=self._context,
kernel_module_name="kernel",
page_cache=self.i_mapping.dereference(),
)
yield from page_cache.get_cached_pages()
[docs] def get_contents(self):
"""Get the inode cached pages from the page cache
Yields:
page_index (int): The page index in the Tree. File offset is page_index * PAGE_SIZE.
page_content (str): The page content
"""
for page_obj in self.get_pages():
page_index = int(page_obj.index)
page_content = page_obj.get_content()
yield page_index, page_content
[docs]class address_space(objects.StructType):
@property
def i_pages(self):
"""Returns the appropriate member containing the page cache tree"""
if self.has_member("i_pages"):
# Kernel >= 4.17
return self.member("i_pages")
elif self.has_member("page_tree"):
# Kernel < 4.17
return self.member("page_tree")
raise exceptions.VolatilityException("Unsupported page cache tree")
[docs]class page(objects.StructType):
@property
@functools.lru_cache()
def pageflags_enum(self) -> Dict:
"""Returns 'pageflags' enumeration key/values
Returns:
A dictionary with the pageflags enumeration key/values
"""
# FIXME: It would be even better to use @functools.cached_property instead,
# however, this requires Python +3.8
try:
pageflags_enum = self._context.symbol_space.get_enumeration(
self.get_symbol_table_name() + constants.BANG + "pageflags"
).choices
except exceptions.SymbolError:
vollog.debug(
"Unable to find pageflags enum. This can happen in kernels < 2.6.26 or wrong ISF"
)
# set to empty dict to show that the enum was not found, and so shouldn't be searched for again
pageflags_enum = {}
return pageflags_enum
[docs] def get_flags_list(self) -> List[str]:
"""Returns a list of page flags
Returns:
List of page flags
"""
flags = []
for name, value in self.pageflags_enum.items():
if self.flags & (1 << value) != 0:
flags.append(name)
return flags
[docs] def to_paddr(self) -> int:
"""Converts a page's virtual address to its physical address using the current physical memory model.
Returns:
int: page physical address
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
vmemmap_start = None
if vmlinux.has_symbol("mem_section"):
# SPARSEMEM_VMEMMAP physical memory model: memmap is virtually contiguous
if vmlinux.has_symbol("vmemmap_base"):
# CONFIG_DYNAMIC_MEMORY_LAYOUT - KASLR kernels >= 4.9
vmemmap_start = vmlinux.object_from_symbol("vmemmap_base")
else:
# !CONFIG_DYNAMIC_MEMORY_LAYOUT
if vmlinux_layer._maxvirtaddr < 57:
# 4-Level paging -> VMEMMAP_START = __VMEMMAP_BASE_L4
vmemmap_base_l4 = 0xFFFFEA0000000000
vmemmap_start = vmemmap_base_l4
else:
# 5-Level paging -> VMEMMAP_START = __VMEMMAP_BASE_L5
# FIXME: Once 5-level paging is supported, uncomment the following lines and remove the exception
# vmemmap_base_l5 = 0xFFD4000000000000
# vmemmap_start = vmemmap_base_l5
raise exceptions.VolatilityException(
"5-level paging is not yet supported"
)
elif vmlinux.has_symbol("mem_map"):
# FLATMEM physical memory model, typically 32bit
vmemmap_start = vmlinux.object_from_symbol("mem_map")
elif vmlinux.has_symbol("node_data"):
raise exceptions.VolatilityException("NUMA systems are not yet supported")
else:
raise exceptions.VolatilityException("Unsupported Linux memory model")
if not vmemmap_start:
raise exceptions.VolatilityException(
"Something went wrong, we shouldn't be here"
)
page_type_size = vmlinux.get_type("page").size
pagec = vmlinux_layer.canonicalize(self.vol.offset)
pfn = (pagec - vmemmap_start) // page_type_size
page_paddr = pfn * vmlinux_layer.page_size
return page_paddr
[docs] def get_content(self) -> Union[str, None]:
"""Returns the page content
Returns:
The page content
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
physical_layer = vmlinux.context.layers["memory_layer"]
page_paddr = self.to_paddr()
if not page_paddr:
return None
page_data = physical_layer.read(page_paddr, vmlinux_layer.page_size)
return page_data
[docs]class IDR(objects.StructType):
IDR_BITS = 8
IDR_MASK = (1 << IDR_BITS) - 1
INT_SIZE = 4
MAX_IDR_SHIFT = INT_SIZE * 8 - 1
MAX_IDR_BIT = 1 << MAX_IDR_SHIFT
[docs] def idr_max(self, num_layers: int) -> int:
"""Returns the maximum ID which can be allocated given idr::layers
Args:
num_layers: Number of layers
Returns:
Maximum ID for a given number of layers
"""
# Kernel < 4.17
bits = min([self.INT_SIZE, num_layers * self.IDR_BITS, self.MAX_IDR_SHIFT])
return (1 << bits) - 1
[docs] def idr_find(self, idr_id: int) -> int:
"""Finds an ID within the IDR data structure.
Based on idr_find_slowpath(), 3.9 <= Kernel < 4.11
Args:
idr_id: The IDR lookup ID
Returns:
A pointer to the given ID element
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
if not vmlinux.get_type("idr_layer").has_member("layer"):
vollog.info(
"Unsupported IDR implementation, it should be a very very old kernel, probabably < 2.6"
)
return None
if idr_id < 0:
return None
idr_layer = self.top
if not idr_layer:
return None
n = (idr_layer.layer + 1) * self.IDR_BITS
if idr_id > self.idr_max(idr_layer.layer + 1):
return None
assert n != 0
while n > 0 and idr_layer:
n -= self.IDR_BITS
assert n == idr_layer.layer * self.IDR_BITS
idr_layer = idr_layer.ary[(idr_id >> n) & self.IDR_MASK]
return idr_layer
def _old_kernel_get_entries(self) -> Iterable[int]:
# Kernels < 4.11
cur = self.cur
total = next_id = 0
while next_id < cur:
entry = self.idr_find(next_id)
if entry:
yield entry
total += 1
next_id += 1
def _new_kernel_get_entries(self) -> Iterable[int]:
# Kernels >= 4.11
id_storage = linux.IDStorage.choose_id_storage(
self._context, kernel_module_name="kernel"
)
for page_addr in id_storage.get_entries(root=self.idr_rt):
yield page_addr
[docs] def get_entries(self) -> Iterable[int]:
"""Walks the IDR and yield a pointer associated with each element.
Args:
in_use (int, optional): _description_. Defaults to 0.
Yields:
A pointer associated with each element.
"""
if self.has_member("idr_rt"):
# Kernels >= 4.11
get_entries_func = self._new_kernel_get_entries
else:
# Kernels < 4.11
get_entries_func = self._old_kernel_get_entries
for page_addr in get_entries_func():
yield page_addr
[docs]class rb_root(objects.StructType):
def _walk_nodes(self, root_node) -> Iterator[int]:
"""Traverses the Red-Black tree from the root node and yields a pointer to each
node in this tree.
Args:
root_node: A Red-Black tree node from which to start descending
Yields:
A pointer to every node descending from the specified root node
"""
if not root_node:
return
yield root_node
yield from self._walk_nodes(root_node.rb_left)
yield from self._walk_nodes(root_node.rb_right)
[docs] def get_nodes(self) -> Iterator[int]:
"""Yields a pointer to each node in the Red-Black tree
Yields:
A pointer to every node in the Red-Black tree
"""
yield from self._walk_nodes(root_node=self.rb_node)