# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import abc
import collections.abc
import logging
import functools
import binascii
import stat
import datetime
import uuid
from typing import (
Generator,
Iterable,
Iterator,
Optional,
Tuple,
List,
Union,
Dict,
Callable,
)
from volatility3.framework import constants, exceptions, objects, interfaces, symbols
from volatility3.framework.renderers import conversion
from volatility3.framework.constants import linux as linux_constants
from volatility3.framework.layers import linear, intel
from volatility3.framework.objects import utility
from volatility3.framework.symbols import generic, linux, intermed
from volatility3.framework.symbols.linux.extensions import elf
vollog = logging.getLogger(__name__)
# Keep these in a basic module, to prevent import cycles when symbol providers require them
[docs]
class module(generic.GenericIntelProcess):
[docs]
def is_valid(self):
"""Determine whether it is a valid module object by verifying the self-referential
in module_kobject. This also confirms that the module is actively allocated and
not a remnant of freed memory or a failed module load attempt by verifying the
module memory section sizes.
"""
layer = self._context.layers[self.vol.layer_name]
# Make sure the entire module content is readable
if not layer.is_valid(self.vol.offset, self.vol.size):
return False
core_size = self.get_core_size()
core_text_size = self.get_core_text_size()
init_size = self.get_init_size()
if not (
0 < core_text_size <= linux_constants.MODULE_MAXIMUM_CORE_TEXT_SIZE
and 0 < core_size <= linux_constants.MODULE_MAXIMUM_CORE_SIZE
and core_size + init_size >= linux_constants.MODULE_MINIMUM_SIZE
):
return False
if not (
self.mkobj
and self.mkobj.mod
and self.mkobj.mod.is_readable()
and self.mkobj.mod == self.vol.offset
):
return False
return True
@functools.cached_property
def mod_mem_type(self) -> Dict:
"""Return the mod_mem_type enum choices if available or an empty dict if not"""
# mod_mem_type and module_memory were added in kernel 6.4 which replaces
# module_layout for storing the information around core_layout etc.
# see commit ac3b43283923440900b4f36ca5f9f0b1ca43b70e for more information
symbol_table_name = self.get_symbol_table_name()
mod_mem_type_symname = symbol_table_name + constants.BANG + "mod_mem_type"
symbol_space = self._context.symbol_space
try:
mod_mem_type = symbol_space.get_enumeration(mod_mem_type_symname).choices
except exceptions.SymbolError:
mod_mem_type = {}
vollog.debug(
"Unable to find mod_mem_type enum. This message can be ignored for kernels < 6.4"
)
return mod_mem_type
def _get_mem_type(self, mod_mem_type_name):
module_mem_index = self.mod_mem_type.get(mod_mem_type_name)
if module_mem_index is None:
raise AttributeError(f"Unknown module memory type '{mod_mem_type_name}'")
if not (0 <= module_mem_index < self.mem.count):
raise AttributeError(
f"Invalid module memory type index '{module_mem_index}'"
)
return self.mem[module_mem_index]
def _get_mem_size(self, mod_mem_type_name) -> int:
return self._get_mem_type(mod_mem_type_name).size
def _get_mem_base(self, mod_mem_type_name) -> int:
return self._get_mem_type(mod_mem_type_name).base
[docs]
def get_module_base(self) -> int:
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_base("MOD_TEXT")
elif self.has_member("core_layout"):
return self.core_layout.base
elif self.has_member("module_core"):
return self.module_core
raise AttributeError("Unable to get module base")
[docs]
def get_init_size(self) -> int:
if self.has_member("mem"): # kernels 6.4+
return (
self._get_mem_size("MOD_INIT_TEXT")
+ self._get_mem_size("MOD_INIT_DATA")
+ self._get_mem_size("MOD_INIT_RODATA")
)
elif self.has_member("init_layout"):
return self.init_layout.size
elif self.has_member("init_size"):
return self.init_size
raise AttributeError("Unable to determine .init section size of module")
[docs]
def get_core_size(self) -> int:
if self.has_member("mem"): # kernels 6.4+
return (
self._get_mem_size("MOD_TEXT")
+ self._get_mem_size("MOD_DATA")
+ self._get_mem_size("MOD_RODATA")
+ self._get_mem_size("MOD_RO_AFTER_INIT")
)
elif self.has_member("core_layout"):
return self.core_layout.size
elif self.has_member("core_size"):
return self.core_size
raise AttributeError("Unable to determine core size of module")
[docs]
def get_core_text_size(self) -> int:
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_size("MOD_TEXT")
elif self.has_member("core_layout"):
return self.core_layout.text_size
elif self.has_member("core_text_size"):
return self.core_text_size
raise AttributeError("Unable to determine core text size of module")
[docs]
def get_module_core(self) -> objects.Pointer:
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_base("MOD_TEXT")
elif self.has_member("core_layout"):
return self.core_layout.base
elif self.has_member("module_core"):
return self.module_core
raise AttributeError("Unable to get module core")
[docs]
def get_module_init(self) -> objects.Pointer:
if self.has_member("mem"): # kernels 6.4+
return self._get_mem_base("MOD_INIT_TEXT")
elif self.has_member("init_layout"):
return self.init_layout.base
elif self.has_member("module_init"):
return self.module_init
raise AttributeError("Unable to get module init")
[docs]
def get_name(self) -> Optional[str]:
"""Get the name of the module as a string"""
try:
return utility.array_to_string(self.name)
except exceptions.InvalidAddressException:
return None
def _get_sect_count(self, grp: interfaces.objects.ObjectInterface) -> int:
"""Try to determine the number of valid sections. Support for kernels > 6.14-rc1.
Resources:
- https://github.com/torvalds/linux/commit/d8959b947a8dfab1047c6fd5e982808f65717bfe
- https://github.com/torvalds/linux/commit/e0349c46cb4fbbb507fa34476bd70f9c82bad359
"""
if grp.has_member("bin_attrs"):
arr_offset_ptr = grp.bin_attrs
arr_subtype = "bin_attribute"
else:
arr_offset_ptr = grp.attrs
arr_subtype = "attribute"
if not arr_offset_ptr.is_readable():
vollog.log(
constants.LOGLEVEL_V,
f"Cannot dereference the pointer to the NULL-terminated list of binary attributes for module at offset {self.vol.offset:#x}",
)
return 0
# We chose 100 as an arbitrary guard value to prevent
# looping forever in extreme cases, and because 100 is not expected
# to be a valid number of sections. If that still happens,
# Vol3 module processing will indicate that it is missing information
# with the following message:
# "Unable to reconstruct the ELF for module struct at"
# See PR #1773 for more information.
bin_attrs_list = utility.dynamically_sized_array_of_pointers(
context=self._context,
array=arr_offset_ptr.dereference(),
subtype=self.get_symbol_table_name() + constants.BANG + arr_subtype,
iterator_guard_value=100,
)
return len(bin_attrs_list)
@functools.cached_property
def number_of_sections(self) -> int:
# Dropped in 6.14-rc1: d8959b947a8dfab1047c6fd5e982808f65717bfe
if self.sect_attrs.has_member("nsections"):
return self.sect_attrs.nsections
return self._get_sect_count(self.sect_attrs.grp)
[docs]
def get_sections(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Get a list of section attributes for the given module."""
if self.number_of_sections == 0:
vollog.debug(
f"Invalid number of sections ({self.number_of_sections}) for module at offset {self.vol.offset:#x}"
)
return []
symbol_table_name = self.get_symbol_table_name()
arr = self._context.object(
symbol_table_name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.sect_attrs.attrs.vol.offset,
subtype=self.sect_attrs.attrs.vol.subtype,
count=self.number_of_sections,
)
yield from arr
[docs]
def get_elf_table_name(self):
elf_table_name = intermed.IntermediateSymbolTable.create(
self._context,
"elf_symbol_table",
"linux",
"elf",
native_types=None,
class_types=elf.class_types,
)
return elf_table_name
[docs]
def get_symbols(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Get ELF symbol objects for this module"""
if not self.section_strtab or self.num_symtab < 1:
return None
elf_table_name = self.get_elf_table_name()
symbol_table_name = self.get_symbol_table_name()
is_64bit = symbols.symbol_table_is_64bit(
context=self._context, symbol_table_name=symbol_table_name
)
sym_name = "Elf64_Sym" if is_64bit else "Elf32_Sym"
sym_type = self._context.symbol_space.get_type(
elf_table_name + constants.BANG + sym_name
)
elf_syms = self._context.object(
symbol_table_name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.section_symtab,
subtype=sym_type,
count=self.num_symtab,
)
for elf_sym_obj in elf_syms:
# Prepare the symbol object for methods like get_name()
elf_sym_obj.cached_strtab = self.section_strtab
yield elf_sym_obj
[docs]
def get_symbols_names_and_addresses(
self, max_symbols: int = 4096
) -> Iterable[Tuple[str, int]]:
"""Get names and addresses for each symbol of the module
Yields:
A tuple for each symbol containing the symbol name and its corresponding value
"""
layer = self._context.layers[self.vol.layer_name]
for iteration_counter, elf_sym_obj in enumerate(self.get_symbols()):
if iteration_counter > max_symbols:
vollog.debug(
f"Hit maximum symbols ({max_symbols}) for ELF at {self.vol.offset:#x} in layer {self.vol.layer_name}"
)
return
sym_name = elf_sym_obj.get_name()
if not sym_name:
continue
# Normalize sym.st_value offset, which is an address pointing to the symbol value
sym_address = elf_sym_obj.st_value & layer.address_mask
yield (sym_name, sym_address)
[docs]
@functools.lru_cache
def get_module_address_boundaries(self) -> Optional[Tuple[int, int]]:
"""Return the module address boundaries based on its symbol addresses"""
if not self.section_strtab or self.num_symtab < 1:
return None
elf_table_name = self.get_elf_table_name()
symbol_table_name = self.get_symbol_table_name()
is_64bit = symbols.symbol_table_is_64bit(
context=self._context, symbol_table_name=symbol_table_name
)
sym_name = "Elf64_Sym" if is_64bit else "Elf32_Sym"
sym_type = self._context.symbol_space.get_type(
elf_table_name + constants.BANG + sym_name
)
elf_syms = self._context.object(
symbol_table_name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.section_symtab,
subtype=sym_type,
count=self.num_symtab,
)
# They should be sorted, but just in case
elf_syms_sorted = sorted(elf_syms, key=lambda x: x.st_value)
layer = self._context.layers[self.vol.layer_name]
# The first elf_sym is null
first_symbol = elf_syms_sorted[1]
last_symbol = elf_syms_sorted[-1]
minimum_address = first_symbol.st_value & layer.address_mask
maximum_address = (
last_symbol.st_value & layer.address_mask + last_symbol.st_size
)
return minimum_address, maximum_address
[docs]
def get_symbol(self, wanted_sym_name) -> Optional[int]:
"""Get symbol address for a given symbol name"""
for sym_name, sym_address in self.get_symbols_names_and_addresses():
if wanted_sym_name == sym_name:
return sym_address
return None
[docs]
def get_symbol_by_address(self, wanted_sym_address) -> Optional[str]:
"""Get symbol name for a given symbol address"""
for sym_name, sym_address in self.get_symbols_names_and_addresses():
if wanted_sym_address == sym_address:
return sym_name
return None
@property
def section_symtab(self) -> Optional[interfaces.objects.ObjectInterface]:
try:
if self.has_member("kallsyms"):
return self.kallsyms.symtab
elif self.has_member("symtab"):
return self.symtab
except exceptions.InvalidAddressException:
vollog.debug(
f"Page fault encountered when accessing symtab of ELF at {self.vol.offset:#x} in {self.vol.layer_name}"
)
return None
raise AttributeError("Unable to get symtab")
@property
def num_symtab(self) -> Optional[int]:
try:
if self.has_member("kallsyms"):
return int(self.kallsyms.num_symtab)
elif self.has_member("num_symtab"):
return int(self.member("num_symtab"))
except exceptions.InvalidAddressException:
vollog.debug(
f"Page fault encountered when accessing num_symtab of ELF at {self.vol.offset:#x} in {self.vol.layer_name}"
)
return None
raise AttributeError("Unable to determine number of symbols")
@property
def section_strtab(self) -> Optional[interfaces.objects.ObjectInterface]:
try:
# Newer kernels
if self.has_member("kallsyms"):
return self.kallsyms.strtab
# Older kernels
elif self.has_member("strtab"):
return self.strtab
except exceptions.InvalidAddressException:
vollog.debug(
f"Page fault encountered when accessing strtab of ELF at {self.vol.offset:#x} in {self.vol.layer_name}"
)
return None
raise AttributeError("Unable to get strtab")
@property
def section_typetab(self) -> Optional[interfaces.objects.ObjectInterface]:
try:
if self.has_member("kallsyms") and self.kallsyms.has_member("typetab"):
# kernels >= 4.5 8244062ef1e54502ef55f54cced659913f244c3e: kallsyms was added
# kernels >= 5.2 1c7651f43777cdd59c1aaa82c87324d3e7438c7b: types have its own array
return self.kallsyms.typetab
except exceptions.InvalidAddressException:
vollog.debug(
f"Page fault encountered when accessing typetab of ELF at {self.vol.offset:#x} in {self.vol.layer_name}"
)
return None
raise AttributeError("Unable to get typetab section, it needs a kernel >= 5.2")
[docs]
def get_symbol_type(
self, symbol: interfaces.objects.ObjectInterface, symbol_index: int
) -> Optional[str]:
"""Determines the type of a given ELF symbol.
Args:
symbol: The ELF symbol object (elf_sym)
symbol_index: The index of the symbol within the type table
Returns:
A single-character string representing the symbol type
"""
try:
if self.has_member("kallsyms") and self.kallsyms.has_member("typetab"):
# kernels >= 5.2 1c7651f43777cdd59c1aaa82c87324d3e7438c7b types have its own array
layer = self._context.layers[self.vol.layer_name]
sym_type = layer.read(self.section_typetab + symbol_index, 1)
sym_type = sym_type.decode("utf-8", errors="ignore")
else:
# kernels < 5.2 the type was stored in the st_info
sym_type = chr(symbol.st_info)
except exceptions.InvalidAddressException:
vollog.debug(
f"Page fault encountered when accessing symbol type of index {symbol_index} of ELF at {self.vol.offset:#x} in {self.vol.layer_name}"
)
return None
return sym_type
[docs]
class task_struct(generic.GenericIntelProcess):
[docs]
def is_valid(self) -> bool:
layer = self._context.layers[self.vol.layer_name]
# Make sure the entire task content is readable
if not layer.is_valid(self.vol.offset, self.vol.size):
return False
if self.pid < 0 or self.tgid < 0:
return False
if self.has_member("signal") and not (
self.signal and self.signal.is_readable()
):
return False
if self.has_member("nsproxy") and not (
self.nsproxy and self.nsproxy.is_readable()
):
return False
if self.has_member("real_parent") and not (
self.real_parent and self.real_parent.is_readable()
):
return False
if (
self.has_member("active_mm")
and self.active_mm
and not self.active_mm.is_readable()
):
return False
if self.mm:
if not self.mm.is_readable():
return False
if self.mm != self.active_mm:
return False
return True
[docs]
@functools.lru_cache
def add_process_layer(
self, config_prefix: Optional[str] = None, preferred_name: Optional[str] = None
) -> Optional[str]:
"""Constructs a new layer based on the process's DTB.
Returns the name of the Layer or None.
"""
parent_layer = self._context.layers[self.vol.layer_name]
try:
pgd = self.mm.pgd
except exceptions.InvalidAddressException:
return None
if not isinstance(parent_layer, linear.LinearlyMappedLayer):
raise TypeError(
"Parent layer is not a translation layer, unable to construct process layer"
)
try:
dtb, layer_name = parent_layer.translate(pgd)
except exceptions.InvalidAddressException:
return None
if preferred_name is None:
preferred_name = self.vol.layer_name + f"_Process{self.pid}"
# Add the constructed layer and return the name
return self._add_process_layer(
self._context, dtb, config_prefix, preferred_name
)
[docs]
def get_address_space_layer(
self,
) -> Optional[interfaces.layers.TranslationLayerInterface]:
"""Returns the task layer for this task's address space."""
task_layer_name = (
self.vol.layer_name if self.is_kernel_thread else self.add_process_layer()
)
if not task_layer_name:
return None
return self._context.layers[task_layer_name]
[docs]
def get_process_memory_sections(
self, heap_only: bool = False
) -> Generator[Tuple[int, int], None, None]:
"""Returns a list of sections based on the memory manager's view of
this task's virtual memory."""
for vma in self.mm.get_vma_iter():
start = int(vma.vm_start)
end = int(vma.vm_end)
if heap_only and not (start <= self.mm.brk and end >= self.mm.start_brk):
continue
else:
# FIXME: Check if this actually needs to be printed out or not
vollog.info(
f"adding vma: {start:x} {self.mm.brk:x} | {end:x} {self.mm.start_brk:x}"
)
yield (start, end - start)
@property
def is_kernel_thread(self) -> bool:
"""Checks if this task is a kernel thread.
Returns:
bool: True, if this task is a kernel thread. Otherwise, False.
"""
return (self.flags & linux_constants.PF_KTHREAD) != 0
@property
def is_thread_group_leader(self) -> bool:
"""Checks if this task is a thread group leader.
Returns:
bool: True, if this task is a thread group leader. Otherwise, False.
"""
return self.tgid == self.pid
@property
def is_user_thread(self) -> bool:
"""Checks if this task is a user thread.
Returns:
bool: True, if this task is a user thread. Otherwise, False.
"""
return not self.is_kernel_thread and self.tgid != self.pid
def _get_tasks_iterable(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns the respective iterable to obtain the threads in this process"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
task_struct_symname = f"{vmlinux.symbol_table_name}{constants.BANG}task_struct"
if vmlinux.get_type("task_struct").has_member("signal") and vmlinux.get_type(
"signal_struct"
).has_member("thread_head"):
# kernels >= 6.7 - via signals
return self.signal.thread_head.to_list(task_struct_symname, "thread_node")
elif vmlinux.get_type("task_struct").has_member("thread_group"):
# kernels < 6.7 - via thread_group
return self.thread_group.to_list(task_struct_symname, "thread_group")
raise AttributeError("Unable to find the root dentry")
[docs]
def get_threads(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns each thread in this process"""
tasks_iterable = self._get_tasks_iterable()
threads_seen = set([self.vol.offset])
for task in tasks_iterable:
if not task.is_valid():
continue
if task.vol.offset not in threads_seen:
threads_seen.add(task.vol.offset)
yield task
@property
def is_being_ptraced(self) -> bool:
"""Returns True if this task is being traced using ptrace"""
return self.ptrace != 0
@property
def is_ptracing(self) -> bool:
"""Returns True if this task is tracing other tasks using ptrace"""
is_tracing = (
self.ptraced.next.is_readable()
and self.ptraced.next.dereference().vol.offset != self.ptraced.vol.offset
)
return is_tracing
[docs]
def get_ptrace_tracer_tid(self) -> Optional[int]:
"""Returns the tracer's TID tracing this task"""
return self.parent.pid if self.is_being_ptraced else None
[docs]
def get_ptrace_tracee_tids(self) -> List[int]:
"""Returns the list of TIDs being traced by this task"""
task_symbol_table_name = self.get_symbol_table_name()
task_struct_symname = f"{task_symbol_table_name}{constants.BANG}task_struct"
tracing_tid_list = [
task_being_traced.pid
for task_being_traced in self.ptraced.to_list(
task_struct_symname, "ptrace_entry"
)
]
return tracing_tid_list
[docs]
def get_ptrace_tracee_flags(self) -> Optional[str]:
"""Returns a string with the ptrace flags"""
return (
linux_constants.PT_FLAGS(self.ptrace).flags
if self.is_being_ptraced
else None
)
@property
def state(self):
if self.has_member("__state"):
return self.member("__state")
elif self.has_member("state"):
return self.member("state")
else:
raise AttributeError("Unsupported task_struct: Cannot find state")
def _get_task_start_time(self) -> datetime.timedelta:
"""Returns the task's monotonic start_time as a timedelta.
Returns:
The task's start time as a timedelta object.
"""
for member_name in ("start_boottime", "real_start_time", "start_time"):
if self.has_member(member_name):
start_time_obj = self.member(member_name)
start_time_obj_type = start_time_obj.vol.type_name
start_time_obj_type_name = start_time_obj_type.split(constants.BANG)[1]
if start_time_obj_type_name != "timespec":
# kernels >= 3.17 real_start_time and start_time are u64
# kernels >= 5.5 uses start_boottime which is also a u64
start_time = Timespec64Concrete.new_from_nsec(start_time_obj)
else:
# kernels < 3.17 real_start_time and start_time are timespec
start_time = Timespec64Concrete.new_from_timespec(start_time_obj)
# This is relative to the boot time so it makes sense to be a timedelta.
return start_time.to_timedelta()
raise AttributeError("Unsupported task_struct start_time member")
[docs]
def get_time_namespace(self) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns the task's time namespace"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
if not self.has_member("nsproxy"):
# kernels < 2.6.19: ab516013ad9ca47f1d3a936fa81303bfbf734d52
return None
if not vmlinux.get_type("nsproxy").has_member("time_ns"):
# kernels < 5.6 769071ac9f20b6a447410c7eaa55d1a5233ef40c
return None
return self.nsproxy.time_ns
[docs]
def get_time_namespace_id(self) -> int:
"""Returns the task's time namespace ID."""
time_ns = self.get_time_namespace()
if not time_ns:
# kernels < 5.6
return None
# We are good. ns_common (ns) was introduced in kernels 3.19. So by the time the
# time namespace was added in kernels 5.6, it already included the ns member.
return time_ns.ns.inum
def _get_time_namespace_offsets(
self,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns the time offsets from the task's time namespace."""
time_ns = self.get_time_namespace()
if not time_ns:
# kernels < 5.6
return None
if not time_ns.has_member("offsets"):
# kernels < 5.6 af993f58d69ee9c1f421dfc87c3ed231c113989c
return None
return time_ns.offsets
[docs]
def get_time_namespace_monotonic_offset(
self,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Gets task's time namespace monotonic offset
Returns:
a kernel's timespec64 object with the monotonic offset
"""
time_namespace_offsets = self._get_time_namespace_offsets()
if not time_namespace_offsets:
return None
return time_namespace_offsets.monotonic
def _get_time_namespace_boottime_offset(
self,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Gets task's time namespace boottime offset
Returns:
a kernel's timespec64 object with the boottime offset
"""
time_namespace_offsets = self._get_time_namespace_offsets()
if not time_namespace_offsets:
return None
return time_namespace_offsets.boottime
def _get_boottime_raw(self) -> "Timespec64Concrete":
"""Returns the boot time in a Timespec64Concrete object."""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
if vmlinux.has_symbol("tk_core"):
# kernels >= 3.17 | tk_core | 3fdb14fd1df70325e1e91e1203a699a4803ed741
tk_core = vmlinux.object_from_symbol("tk_core")
timekeeper = tk_core.timekeeper
if not timekeeper.offs_real.has_member("tv64"):
# kernels >= 4.10 - Tested on Ubuntu 6.8.0-41
boottime_nsec = timekeeper.offs_real - timekeeper.offs_boot
else:
# 3.17 <= kernels < 4.10 - Tested on Ubuntu 4.4.0-142
boottime_nsec = timekeeper.offs_real.tv64 - timekeeper.offs_boot.tv64
return Timespec64Concrete.new_from_nsec(boottime_nsec)
elif vmlinux.has_symbol("timekeeper") and vmlinux.get_type(
"timekeeper"
).has_member("wall_to_monotonic"):
# 3.4 <= kernels < 3.17 - Tested on Ubuntu 3.13.0-185
timekeeper = vmlinux.object_from_symbol("timekeeper")
# timekeeper.wall_to_monotonic is timespec
boottime = Timespec64Concrete.new_from_timespec(
timekeeper.wall_to_monotonic
)
boottime += timekeeper.total_sleep_time
return boottime.negate()
elif vmlinux.has_symbol("wall_to_monotonic"):
# kernels < 3.4 - Tested on Debian7 3.2.0-4 (3.2.57-3+deb7u2)
wall_to_monotonic = vmlinux.object_from_symbol("wall_to_monotonic")
boottime = Timespec64Concrete.new_from_timespec(wall_to_monotonic)
if vmlinux.has_symbol("total_sleep_time"):
# 2.6.23 <= kernels < 3.4 7c3f1a573237b90ef331267260358a0ec4ac9079
total_sleep_time = vmlinux.object_from_symbol("total_sleep_time")
full_type_name = total_sleep_time.vol.type_name
type_name = full_type_name.split(constants.BANG)[1]
if type_name == "timespec":
# kernels >= 2.6.32 total_sleep_time is a timespec
boottime += total_sleep_time
else:
# kernels < 2.6.32 total_sleep_time is an unsigned long as seconds
boottime.tv_sec += total_sleep_time
return boottime.negate()
raise exceptions.VolatilityException("Unsupported")
[docs]
def get_boottime(
self, root_time_namespace: bool = True
) -> Optional[datetime.datetime]:
"""Returns the boot time in UTC as a datetime.
Args:
root_time_namespace: If True, it returns the boot time as seen from the root
time namespace. Otherwise, it returns the boot time relative to the
task's time namespace.
Returns:
A datetime with the UTC boot time.
"""
boottime = self._get_boottime_raw()
if not boottime:
return None
if not root_time_namespace:
# Shift boot timestamp according to the task's time namespace offset
boottime_offset_timespec = self._get_time_namespace_boottime_offset()
if boottime_offset_timespec:
# Time namespace support is from kernels 5.6
boottime -= boottime_offset_timespec
return boottime.to_datetime()
[docs]
def get_create_time(self) -> Optional[datetime.datetime]:
"""Retrieves the task's start time from its time namespace.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
vmlinux_module_name: The name of the kernel module on which to operate
task: A reference task
Returns:
A datetime with task's start time
"""
# Typically, we want to see the creation time seen from the root time namespace
boottime = self.get_boottime(root_time_namespace=True)
# The kernel exports only tv_sec to procfs, see kernel's show_stat().
# This means user-space tools, like those in the procps package (e.g., ps, top, etc.),
# only use the boot time seconds to compute dates relatives to this.
if boottime is None:
return None
boottime = boottime.replace(microsecond=0)
task_start_time_timedelta = self._get_task_start_time()
# NOTE: Do NOT apply the task's time namespace offsets here. While the kernel uses
# timens_add_boottime_ns(), it's not needed here since we're seeing it from the
# root time namespace, not within the task's own time namespace
return boottime + task_start_time_timedelta
[docs]
def get_parent_pid(self) -> int:
"""Returns the parent process ID (PPID)
This method replicates the Linux kernel's `getppid` syscall behavior.
Avoid using `task.parent`; instead, use this function for accurate results.
"""
if self.real_parent and self.real_parent.is_readable():
ppid = self.real_parent.tgid
else:
ppid = 0
return ppid
[docs]
class fs_struct(objects.StructType):
[docs]
def get_root_dentry(self):
# < 2.6.26
if self.has_member("rootmnt"):
return self.root
elif self.root.has_member("dentry"):
return self.root.dentry
raise AttributeError("Unable to find the root dentry")
[docs]
def get_root_mnt(self):
# < 2.6.26
if self.has_member("rootmnt"):
return self.rootmnt
elif self.root.has_member("mnt"):
return self.root.mnt
raise AttributeError("Unable to find the root mount")
[docs]
class maple_tree(objects.StructType):
# include/linux/maple_tree.h
# Mask for Maple Tree Flags
MT_FLAGS_HEIGHT_MASK = 0x7C
MT_FLAGS_HEIGHT_OFFSET = 0x02
# Shift and mask to extract information from maple tree node pointers
MAPLE_NODE_TYPE_SHIFT = 0x03
MAPLE_NODE_TYPE_MASK = 0x0F
MAPLE_NODE_POINTER_MASK = 0xFF
# types of Maple Tree Nodes
MAPLE_DENSE = 0
MAPLE_LEAF_64 = 1
MAPLE_RANGE_64 = 2
MAPLE_ARANGE_64 = 3
[docs]
def get_slot_iter(self):
"""Parse the Maple Tree and return every non zero slot."""
maple_tree_offset = self.vol.offset & ~(self.MAPLE_NODE_POINTER_MASK)
expected_maple_tree_depth = (
self.ma_flags & self.MT_FLAGS_HEIGHT_MASK
) >> self.MT_FLAGS_HEIGHT_OFFSET
yield from self._parse_maple_tree_node(
self.ma_root, maple_tree_offset, expected_maple_tree_depth
)
def _parse_maple_tree_node(
self,
maple_tree_entry,
parent,
expected_maple_tree_depth,
seen=None,
current_depth=1,
) -> Optional[int]:
"""Recursively parse Maple Tree Nodes and yield all non empty slots"""
# Create seen set if it does not exist, e.g. on the first call into this recursive function. This
# must be None or an existing set of addresses for MTEs that have already been processed or that
# should otherwise be ignored. If parsing from the root node for example this should be None on the
# first call. If you needed to parse all nodes downwards from part of the tree this should still be
# None. If however you wanted to parse from a node, but ignore some parts of the tree below it then
# this could be populated with the addresses of the nodes you wish to ignore.
if seen is None:
seen = set()
# protect against unlikely loop
if maple_tree_entry in seen:
vollog.warning(
f"The mte {hex(maple_tree_entry)} has all ready been seen, no further results will be produced for this node."
)
return None
else:
seen.add(maple_tree_entry)
# check if we have exceeded the expected depth of this maple tree.
# e.g. when current_depth is larger than expected_maple_tree_depth there may be an issue.
# it is normal that expected_maple_tree_depth is equal to current_depth.
if expected_maple_tree_depth < current_depth:
vollog.warning(
f"The depth for the maple tree at {hex(self.vol.offset)} is {expected_maple_tree_depth}, however when parsing the nodes "
f"a depth of {current_depth} was reached. This is unexpected and may lead to incorrect results."
)
# parse the mte to extract the pointer value, node type, and leaf status
pointer = maple_tree_entry & ~(self.MAPLE_NODE_POINTER_MASK)
node_type = (
maple_tree_entry >> self.MAPLE_NODE_TYPE_SHIFT
) & self.MAPLE_NODE_TYPE_MASK
# create a pointer object for the node parent mte (note this will include flags in the low bits)
symbol_table_name = self.get_symbol_table_name()
node_parent_mte = self._context.object(
symbol_table_name + constants.BANG + "pointer",
layer_name=self.vol.native_layer_name,
offset=pointer,
)
# extract the actual pointer to the parent of this node
node_parent_pointer = node_parent_mte & ~(self.MAPLE_NODE_POINTER_MASK)
# verify that the node_parent_pointer correctly points to the parent
if node_parent_pointer != parent:
return None
# create a node object
node = self._context.object(
symbol_table_name + constants.BANG + "maple_node",
layer_name=self.vol.layer_name,
offset=pointer,
)
# parse the slots based on the node type
if node_type == self.MAPLE_DENSE:
for slot in node.alloc.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield slot
elif node_type == self.MAPLE_LEAF_64:
for slot in node.mr64.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield slot
elif node_type == self.MAPLE_RANGE_64:
for slot in node.mr64.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield from self._parse_maple_tree_node(
slot,
pointer,
expected_maple_tree_depth,
seen,
current_depth + 1,
)
elif node_type == self.MAPLE_ARANGE_64:
for slot in node.ma64.slot:
if (slot & ~(self.MAPLE_NODE_TYPE_MASK)) != 0:
yield from self._parse_maple_tree_node(
slot,
pointer,
expected_maple_tree_depth,
seen,
current_depth + 1,
)
else:
# unknown maple node type
raise AttributeError(
f"Unknown Maple Tree node type {node_type} at offset {hex(pointer)}."
)
[docs]
class mm_struct(objects.StructType):
# TODO: As of version 3.0.0 this method should be removed
[docs]
def get_mmap_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""
Deprecated: Use either get_vma_iter() or _get_mmap_iter().
"""
vollog.warning(
"This method has been deprecated in favour of using the get_vma_iter() method."
)
yield from self.get_vma_iter()
def _get_mmap_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the mmap list member of an mm_struct. Use this only if
required, get_vma_iter() will choose the correct _get_maple_tree_iter() or
_get_mmap_iter() automatically as required.
Yields:
vm_area_struct objects
"""
if not self.has_member("mmap"):
raise AttributeError(
"_get_mmap_iter called on mm_struct where no mmap member exists."
)
vma_pointer = self.mmap
if not (vma_pointer and vma_pointer.is_readable()):
return None
vma_object = vma_pointer.dereference()
yield vma_object
seen = {vma_pointer}
vma_pointer = vma_pointer.vm_next
while vma_pointer and vma_pointer.is_readable() and vma_pointer not in seen:
vma_object = vma_pointer.dereference()
yield vma_object
seen.add(vma_pointer)
vma_pointer = vma_pointer.vm_next
# TODO: As of version 3.0.0 this method should be removed
[docs]
def get_maple_tree_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""
Deprecated: Use either get_vma_iter() or _get_maple_tree_iter().
"""
vollog.warning(
"This method has been deprecated in favour of using the get_vma_iter() method."
)
yield from self.get_vma_iter()
def _get_maple_tree_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the mm_mt member of an mm_struct. Use this only if
required, get_vma_iter() will choose the correct _get_maple_tree_iter() or
get_mmap_iter() automatically as required.
Yields:
vm_area_struct objects
"""
if not self.has_member("mm_mt"):
raise AttributeError(
"_get_maple_tree_iter called on mm_struct where no mm_mt member exists."
)
symbol_table_name = self.get_symbol_table_name()
for vma_pointer in self.mm_mt.get_slot_iter():
try:
vma_object = vma_pointer.dereference().cast(
symbol_table_name + constants.BANG + "vm_area_struct"
)
except exceptions.InvalidAddressException:
continue
# The slots will hold values related to their slot if they are invalid
# Before this check, this function was returning objects on the first page of memory...
if vma_object.vol.offset < 0x1000:
continue
yield vma_object
def _do_get_vma_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the VMAs in an mm_struct.
Automatically choosing the mmap or mm_mt as required.
Yields:
vm_area_struct objects
"""
if self.has_member("mmap"):
# kernels < 6.1
yield from self._get_mmap_iter()
elif self.has_member("mm_mt"):
# kernels >= 6.1 d4af56c5c7c6781ca6ca8075e2cf5bc119ed33d1
yield from self._get_maple_tree_iter()
else:
raise AttributeError("Unable to find mmap or mm_mt in mm_struct")
[docs]
def get_vma_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the VMAs in an mm_struct.
Automatically choosing the mmap or mm_mt as required.
Yields:
vm_area_struct objects
"""
for vma in self._do_get_vma_iter():
if not vma.is_valid():
vollog.debug(f"Skipping invalid vm_area_struct at {vma.vol.offset:#x}")
continue
yield vma
[docs]
class super_block(objects.StructType):
# include/linux/kdev_t.h
MINORBITS = 20
# Superblock flags
SB_RDONLY = 1 # Mount read-only
SB_NOSUID = 2 # Ignore suid and sgid bits
SB_NODEV = 4 # Disallow access to device special files
SB_NOEXEC = 8 # Disallow program execution
SB_SYNCHRONOUS = 16 # Writes are synced at once
SB_MANDLOCK = 64 # Allow mandatory locks on an FS
SB_DIRSYNC = 128 # Directory modifications are synchronous
SB_NOATIME = 1024 # Do not update access times
SB_NODIRATIME = 2048 # Do not update directory access times
SB_SILENT = 32768
SB_POSIXACL = 1 << 16 # VFS does not apply the umask
SB_KERNMOUNT = 1 << 22 # this is a kern_mount call
SB_I_VERSION = 1 << 23 # Update inode I_version field
SB_LAZYTIME = 1 << 25 # Update the on-disk [acm]times lazily
SB_OPTS = {
SB_SYNCHRONOUS: "sync",
SB_DIRSYNC: "dirsync",
SB_MANDLOCK: "mand",
SB_LAZYTIME: "lazytime",
}
@functools.cached_property
def major(self) -> int:
return self.s_dev >> self.MINORBITS
@functools.cached_property
def minor(self) -> int:
return self.s_dev & ((1 << self.MINORBITS) - 1)
@functools.cached_property
def uuid(self) -> str:
if not self.has_member("s_uuid"):
raise AttributeError(
"super_block struct does not support s_uuid direct attribute access, probably indicating a kernel version < 2.6.39-rc1."
)
if self.s_uuid.has_member("b"):
uuid_as_ints = self.s_uuid.b
else:
uuid_as_ints = self.s_uuid
return str(uuid.UUID(bytes=bytes(uuid_as_ints)))
[docs]
def get_flags_access(self) -> str:
return "ro" if self.s_flags & self.SB_RDONLY else "rw"
[docs]
def get_flags_opts(self) -> Iterable[str]:
sb_opts = [
self.SB_OPTS[sb_opt] for sb_opt in self.SB_OPTS if sb_opt & self.s_flags
]
return sb_opts
[docs]
def get_type(self) -> Optional[str]:
"""Gets the superblock filesystem type string"""
s_type_ptr = self.s_type
if not (s_type_ptr and s_type_ptr.is_readable()):
return None
s_type_name_ptr = s_type_ptr.name
if not (s_type_name_ptr and s_type_name_ptr.is_readable()):
return None
mnt_sb_type = utility.pointer_to_string(s_type_name_ptr, count=255)
s_subtype_ptr = self.s_subtype
if s_subtype_ptr and s_subtype_ptr.is_readable():
mnt_sb_subtype = utility.pointer_to_string(s_subtype_ptr, count=255)
mnt_sb_type += "." + mnt_sb_subtype
return mnt_sb_type
[docs]
class vm_area_struct(objects.StructType):
perm_flags = {
0x00000001: "r",
0x00000002: "w",
0x00000004: "x",
}
extended_flags = {
0x00000001: "VM_READ",
0x00000002: "VM_WRITE",
0x00000004: "VM_EXEC",
0x00000008: "VM_SHARED",
0x00000010: "VM_MAYREAD",
0x00000020: "VM_MAYWRITE",
0x00000040: "VM_MAYEXEC",
0x00000080: "VM_MAYSHARE",
0x00000100: "VM_GROWSDOWN",
0x00000200: "VM_NOHUGEPAGE",
0x00000400: "VM_PFNMAP",
0x00000800: "VM_DENYWRITE",
0x00001000: "VM_EXECUTABLE",
0x00002000: "VM_LOCKED",
0x00004000: "VM_IO",
0x00008000: "VM_SEQ_READ",
0x00010000: "VM_RAND_READ",
0x00020000: "VM_DONTCOPY",
0x00040000: "VM_DONTEXPAND",
0x00080000: "VM_RESERVED",
0x00100000: "VM_ACCOUNT",
0x00200000: "VM_NORESERVE",
0x00400000: "VM_HUGETLB",
0x00800000: "VM_NONLINEAR",
0x01000000: "VM_MAPPED_COP__VM_HUGEPAGE",
0x02000000: "VM_INSERTPAGE",
0x04000000: "VM_ALWAYSDUMP",
0x08000000: "VM_CAN_NONLINEAR",
0x10000000: "VM_MIXEDMAP",
0x20000000: "VM_SAO",
0x40000000: "VM_PFN_AT_MMAP",
0x80000000: "VM_MERGEABLE",
}
def _parse_flags(self, vm_flags, parse_flags) -> str:
"""Returns an string representation of the flags in a
vm_area_struct."""
retval = ""
for mask, char in parse_flags.items():
if (vm_flags & mask) == mask:
retval = retval + char
else:
retval = retval + "-"
return retval
[docs]
def is_valid(self) -> bool:
"""Validate a VMA struct to prevent processing smeared entries."""
try:
start = self.vm_start
end = self.vm_end
self.get_protection()
except exceptions.InvalidAddressException:
return False
layer = self._context.layers[self.vol.layer_name]
length = end - start
if (
(start > end)
or (start == 0 and length == 0)
or (length % layer.page_size != 0)
):
return False
if self.vm_file != 0:
try:
inode = self.vm_file.get_inode()
except exceptions.InvalidAddressException:
return False
# Verify that a file-backed VMA's page offset
# is not greater than the size of the file's inode.
# Check only inode sizes greater than 0 to account for
# special devices (e.g. "/dev/dri/card0") and prevent false negatives.
if inode.i_size > 0 and self.get_page_offset() > inode.i_size:
return False
return True
# only parse the rwx bits
[docs]
def get_protection(self) -> str:
return self._parse_flags(self.vm_flags & 0b1111, vm_area_struct.perm_flags)
# used by malfind
[docs]
def get_flags(self) -> str:
return self._parse_flags(self.vm_flags, self.extended_flags)
[docs]
def get_page_offset(self) -> int:
if self.vm_file == 0:
return 0
parent_layer = self._context.layers[self.vol.layer_name]
return self.vm_pgoff << parent_layer.page_shift
def _do_get_name(self, context, task) -> str:
if self.vm_file != 0:
fname = linux.LinuxUtilities.path_for_file(context, task, self.vm_file)
elif self.vm_start <= task.mm.start_brk and self.vm_end >= task.mm.brk:
fname = "[heap]"
elif self.vm_start <= task.mm.start_stack <= self.vm_end:
fname = "[stack]"
elif (
self.vm_mm.context.has_member("vdso")
and self.vm_start == self.vm_mm.context.vdso
):
fname = "[vdso]"
else:
fname = "Anonymous Mapping"
return fname
[docs]
def get_name(self, context, task) -> Optional[str]:
try:
return self._do_get_name(context, task)
except exceptions.InvalidAddressException:
return None
[docs]
def get_malicious_pages(self, proclayer) -> List[int]:
"""Identifies and returns a list of potentially malicious memory pages.
A page is considered malicious if it is:
- Executable (protection flags match 'r-x')
- Dirty (modified since process start, according to proclayer.is_dirty())
Args:
proclayer: The process's memory layer
Returns:
List[int]: A list of virtual addresses for pages flagged as potentially malicious.
"""
malicious_pages = []
flags_str = self.get_protection()
if (
proclayer
and "r-x" in flags_str
and self.vm_file.dereference().vol.offset != 0
):
for i in range(self.vm_start, self.vm_end, proclayer.page_size):
try:
if proclayer.is_dirty(i):
vollog.debug(f"Found malicious (dirty+exec) page at {hex(i)} !")
malicious_pages.append(i)
except (
exceptions.PagedInvalidAddressException,
exceptions.InvalidAddressException,
) as excp:
vollog.debug(f"Unable to translate address {hex(i)} : {excp}")
# Abort as it is likely that other addresses in the same range will also fail
break
return malicious_pages
# previously used by malfind
[docs]
def is_suspicious(self, proclayer=None):
ret = False
flags_str = self.get_protection()
if flags_str == "rwx":
ret = True
elif flags_str == "r-x" and self.vm_file.dereference().vol.offset == 0:
ret = True
elif proclayer and "x" in flags_str:
for i in range(self.vm_start, self.vm_end, proclayer.page_size):
try:
if proclayer.is_dirty(i):
vollog.warning(
f"Found malicious page(s) inside (dirty+exec) region {hex(self.vm_start)} !"
)
# We do not attempt to find other dirty+exec pages once we have found one
ret = True
break
except (
exceptions.PagedInvalidAddressException,
exceptions.InvalidAddressException,
) as excp:
vollog.debug(f"Unable to translate address {hex(i)} : {excp}")
# Abort as it is likely that other addresses in the same range will also fail
ret = False
break
return ret
[docs]
class qstr(objects.StructType):
[docs]
def name_as_str(self) -> str:
if self.has_member("len"):
str_length = self.len + 1 # Maximum length should include null terminator
else:
str_length = 255
try:
ret = utility.pointer_to_string(self.name, str_length)
except (exceptions.InvalidAddressException, ValueError):
ret = ""
return ret
[docs]
class dentry(objects.StructType):
[docs]
def path(self) -> str:
"""Based on __dentry_path Linux kernel function"""
reversed_path = []
dentry_seen = set()
current_dentry = self
while (
not current_dentry.is_root()
and current_dentry.vol.offset not in dentry_seen
):
parent = current_dentry.d_parent
reversed_path.append(current_dentry.d_name.name_as_str())
dentry_seen.add(current_dentry.vol.offset)
current_dentry = parent
return "/" + "/".join(reversed(reversed_path))
[docs]
def is_root(self) -> bool:
return self.vol.offset == self.d_parent
[docs]
def is_subdir(self, old_dentry):
"""Is this dentry a subdirectory of old_dentry?
Returns true if this dentry is a subdirectory of the parent (at any depth).
Otherwise, it returns false.
"""
if self.vol.offset == old_dentry:
return True
return self.d_ancestor(old_dentry)
[docs]
def d_ancestor(self, ancestor_dentry):
"""Search for an ancestor
Returns the ancestor dentry which is a child of "ancestor_dentry",
if "ancestor_dentry" is an ancestor of "child_dentry", else None.
"""
dentry_seen = set()
current_dentry = self
while (
not current_dentry.is_root()
and current_dentry.vol.offset not in dentry_seen
):
if current_dentry.d_parent == ancestor_dentry.vol.offset:
return current_dentry
dentry_seen.add(current_dentry.vol.offset)
current_dentry = current_dentry.d_parent
return None
[docs]
def get_subdirs(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Walks dentry subdirs
Yields:
A dentry object
"""
if self.has_member("d_sib") and self.has_member("d_children"):
# kernels >= 6.8
walk_member = "d_sib"
list_head_member = self.d_children
elif self.has_member("d_child") and self.has_member("d_subdirs"):
# 3.19.0 <= kernels < 6.8
walk_member = "d_child"
list_head_member = self.d_subdirs
elif self.has_member("d_u") and self.has_member("d_subdirs"):
# kernels < 3.19
# Actually, 'd_u.d_child' but to_list() doesn't support something like that.
# Since, it's an union, everything is at the same offset than 'd_u'.
walk_member = "d_u"
list_head_member = self.d_subdirs
else:
raise exceptions.VolatilityException("Unsupported dentry type")
dentry_type_name = self.get_symbol_table_name() + constants.BANG + "dentry"
yield from list_head_member.to_list(dentry_type_name, walk_member)
[docs]
def get_inode(self) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns the inode associated with this dentry"""
inode_ptr = self.d_inode
if not (inode_ptr and inode_ptr.is_readable() and inode_ptr.is_valid()):
return None
return inode_ptr.dereference()
[docs]
class struct_file(objects.StructType):
[docs]
def get_dentry(self) -> interfaces.objects.ObjectInterface:
"""Returns a pointer to the dentry associated with this file"""
if self.has_member("f_path"):
return self.f_path.dentry
raise AttributeError("Unable to find file -> dentry")
[docs]
def get_vfsmnt(self) -> interfaces.objects.ObjectInterface:
"""Returns the fs (vfsmount) where this file is mounted"""
if self.has_member("f_path"):
return self.f_path.mnt
raise AttributeError("Unable to find file -> vfs mount")
[docs]
def get_inode(self) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns an inode associated with this file"""
inode_ptr = None
if self.has_member("f_inode") and self.f_inode and self.f_inode.is_readable():
# Try first the cached value, kernels +3.9
inode_ptr = self.f_inode
if not (inode_ptr and inode_ptr.is_readable() and inode_ptr.is_valid()):
dentry_ptr = self.get_dentry()
if not (dentry_ptr and dentry_ptr.is_readable()):
return None
inode_ptr = dentry_ptr.d_inode
if not (inode_ptr and inode_ptr.is_readable() and inode_ptr.is_valid()):
return None
return inode_ptr.dereference()
[docs]
class list_head(objects.StructType, collections.abc.Iterable):
[docs]
def to_list(
self,
symbol_type: str,
member: str,
forward: bool = True,
sentinel: bool = True,
layer: Optional[str] = None,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Returns an iterator of the entries in the list.
Args:
symbol_type: Type of the list elements
member: Name of the list_head member in the list elements
forward: Set false to go backwards
sentinel: Whether self is a "sentinel node", meaning it is not embedded in a member of the list
Sentinel nodes are NOT yielded. See https://en.wikipedia.org/wiki/Sentinel_node for further reference
layer: Name of layer to read from
Yields:
Objects of the type specified via the "symbol_type" argument.
"""
layer_name = layer or self.vol.layer_name
trans_layer = self._context.layers[layer_name]
if not trans_layer.is_valid(self.vol.offset):
return None
relative_offset = self._context.symbol_space.get_type(
symbol_type
).relative_child_offset(member)
direction = "next" if forward else "prev"
link_ptr = getattr(self, direction)
if not (link_ptr and link_ptr.is_readable()):
return None
link = link_ptr.dereference()
if not sentinel:
obj_offset = self.vol.offset - relative_offset
if not trans_layer.is_valid(obj_offset):
return None
yield self._context.object(symbol_type, layer_name, offset=obj_offset)
seen = {self.vol.offset}
while link.vol.offset not in seen:
obj_offset = link.vol.offset - relative_offset
if not trans_layer.is_valid(obj_offset):
return None
yield self._context.object(symbol_type, layer_name, offset=obj_offset)
seen.add(link.vol.offset)
link_ptr = getattr(link, direction)
if not (link_ptr and link_ptr.is_readable()):
break
link = link_ptr.dereference()
def __iter__(self) -> Iterator[interfaces.objects.ObjectInterface]:
return self.to_list(self.vol.parent.vol.type_name, self.vol.member_name)
[docs]
class hlist_head(objects.StructType):
[docs]
def to_list(
self,
symbol_type: str,
member: str,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Returns an iterator of the entries in the list.
This is a doubly linked list; however, it is not circular, so the 'forward' field
doesn't make sense. Also, the sentinel concept doesn't make sense here either;
unlike list_head, the head and nodes each have their own distinct types. A list_head
cannot be a node by itself.
- The 'pprev' of the first 'hlist_node' points to the 'hlist_head', not to the last node.
- The last element 'next' member is NULL
Args:
symbol_type: Type of the list elements
member: Name of the list_head member in the list elements
Yields:
Objects of the type specified via the "symbol_type" argument.
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
current = self.first
while current and current.is_readable():
yield linux.LinuxUtilities.container_of(
current, symbol_type, member, vmlinux
)
current = current.next
[docs]
class files_struct(objects.StructType):
[docs]
def get_fds(self) -> interfaces.objects.ObjectInterface:
if self.has_member("fdt"):
return self.fdt.fd.dereference()
elif self.has_member("fd"):
return self.fd.dereference()
else:
raise AttributeError("Unable to find files -> file descriptors")
[docs]
def get_max_fds(self) -> interfaces.objects.ObjectInterface:
if self.has_member("fdt"):
return self.fdt.max_fds
elif self.has_member("max_fds"):
return self.max_fds
else:
raise AttributeError("Unable to find files -> maximum file descriptors")
[docs]
class mount(objects.StructType):
MNT_NOSUID = 0x01
MNT_NODEV = 0x02
MNT_NOEXEC = 0x04
MNT_NOATIME = 0x08
MNT_NODIRATIME = 0x10
MNT_RELATIME = 0x20
MNT_READONLY = 0x40
MNT_SHRINKABLE = 0x100
MNT_WRITE_HOLD = 0x200
MNT_SHARED = 0x1000
MNT_UNBINDABLE = 0x2000
MNT_FLAGS = {
MNT_NOSUID: "nosuid",
MNT_NODEV: "nodev",
MNT_NOEXEC: "noexec",
MNT_NOATIME: "noatime",
MNT_NODIRATIME: "nodiratime",
MNT_RELATIME: "relatime",
}
[docs]
def get_mnt_sb(self) -> int:
"""Returns a pointer to the super_block"""
if self.has_member("mnt"):
return self.mnt.mnt_sb
elif self.has_member("mnt_sb"):
return self.mnt_sb
else:
raise AttributeError("Unable to find mount -> super block")
[docs]
def get_mnt_root(self):
if self.has_member("mnt"):
return self.mnt.mnt_root
elif self.has_member("mnt_root"):
return self.mnt_root
else:
raise AttributeError("Unable to find mount -> mount root")
[docs]
def get_mnt_flags(self):
if self.has_member("mnt"):
return self.mnt.mnt_flags
elif self.has_member("mnt_flags"):
return self.mnt_flags
else:
raise AttributeError("Unable to find mount -> mount flags")
[docs]
def get_mnt_parent(self):
"""Gets the fs where we are mounted on
Returns:
A mount pointer
"""
return self.mnt_parent
[docs]
def get_mnt_mountpoint(self):
"""Gets the dentry of the mountpoint
Returns:
A dentry pointer
"""
return self.mnt_mountpoint
[docs]
def get_parent_mount(self):
return self.mnt.get_parent_mount()
[docs]
def has_parent(self) -> bool:
"""Checks if this mount has a parent
Returns:
bool: 'True' if this mount has a parent
"""
return self.mnt_parent != self.vol.offset
[docs]
def get_vfsmnt_current(self):
"""Returns the fs where we are mounted on
Returns:
A 'vfsmount'
"""
return self.mnt
[docs]
def get_vfsmnt_parent(self):
"""Gets the parent fs (vfsmount) to where it's mounted on
Returns:
A 'vfsmount'
"""
return self.get_mnt_parent().get_vfsmnt_current()
[docs]
def get_dentry_current(self):
"""Returns the root of the mounted tree
Returns:
A dentry pointer
"""
vfsmnt = self.get_vfsmnt_current()
dentry_pointer = vfsmnt.mnt_root
return dentry_pointer
[docs]
def get_dentry_parent(self):
"""Returns the parent root of the mounted tree
Returns:
A dentry pointer
"""
return self.get_mnt_parent().get_dentry_current()
[docs]
def get_flags_access(self) -> str:
return "ro" if self.get_mnt_flags() & self.MNT_READONLY else "rw"
[docs]
def get_flags_opts(self) -> Iterable[str]:
flags = [
self.MNT_FLAGS[mntflag]
for mntflag in self.MNT_FLAGS
if mntflag & self.get_mnt_flags()
]
return flags
[docs]
def is_shared(self) -> bool:
return self.get_mnt_flags() & self.MNT_SHARED
[docs]
def is_unbindable(self) -> bool:
return self.get_mnt_flags() & self.MNT_UNBINDABLE
[docs]
def is_slave(self) -> bool:
return self.mnt_master and self.mnt_master.vol.offset != 0
[docs]
def get_devname(self) -> str:
return utility.pointer_to_string(self.mnt_devname, count=255)
[docs]
def get_dominating_id(self, root) -> int:
"""Get ID of closest dominating peer group having a representative under the given root."""
mnt_seen = set()
current_mnt = self.mnt_master
while (
current_mnt
and current_mnt.vol.offset != 0
and current_mnt.vol.offset not in mnt_seen
):
peer = current_mnt.get_peer_under_root(self.mnt_ns, root)
if peer and peer.vol.offset != 0:
return peer.mnt_group_id
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.mnt_master
return 0
[docs]
def get_peer_under_root(self, ns, root):
"""Return true if path is reachable from root.
It mimics the kernel function is_path_reachable(), ref: fs/namespace.c
"""
mnt_seen = set()
current_mnt = self
while current_mnt.vol.offset not in mnt_seen:
if current_mnt.mnt_ns == ns and current_mnt.is_path_reachable(
current_mnt.mnt.mnt_root, root
):
return current_mnt
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.next_peer()
if current_mnt.vol.offset == self.vol.offset:
break
return None
[docs]
def is_path_reachable(self, current_dentry, root):
"""Return true if path is reachable.
It mimics the kernel function with same name, ref fs/namespace.c:
"""
mnt_seen = set()
current_mnt = self
while (
current_mnt.mnt.vol.offset != root.mnt
and current_mnt.has_parent()
and current_mnt.vol.offset not in mnt_seen
):
current_dentry = current_mnt.mnt_mountpoint
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.mnt_parent
return current_mnt.mnt.vol.offset == root.mnt and current_dentry.is_subdir(
root.dentry
)
[docs]
def next_peer(self):
table_name = self.vol.type_name.split(constants.BANG)[0]
mount_struct = f"{table_name}{constants.BANG}mount"
offset = self._context.symbol_space.get_type(
mount_struct
).relative_child_offset("mnt_share")
return self._context.object(
mount_struct,
self.vol.layer_name,
offset=self.mnt_share.next.vol.offset - offset,
)
[docs]
class vfsmount(objects.StructType):
[docs]
def is_valid(self):
return (
self.get_mnt_sb() != 0
and self.get_mnt_root() != 0
and self.get_mnt_parent() != 0
)
def _is_kernel_prior_to_struct_mount(self) -> bool:
"""Helper to distinguish between kernels prior to version 3.3 which lacked the
'mount' struct, versus later versions that include it.
See 7d6fec45a5131918b51dcd76da52f2ec86a85be6.
# Following that commit, also in kernel version 3.3 (3376f34fff5be9954fd9a9c4fd68f4a0a36d480e),
# the 'mnt_parent' member was relocated from the 'vfsmount' struct to the newly
# introduced 'mount' struct.
Returns:
'True' if the kernel lacks the 'mount' struct, typically indicating kernel < 3.3.
"""
return self.has_member("mnt_parent")
[docs]
def is_equal(self, vfsmount_ptr) -> bool:
"""Helper to make sure it is comparing two pointers to 'vfsmount'.
Depending on the kernel version, see 3376f34fff5be9954fd9a9c4fd68f4a0a36d480e,
the calling object (self) could be a 'vfsmount \\*' (<3.3) or a 'vfsmount' (>=3.3).
This way we trust in the framework "auto" dereferencing ability to assure that
when we reach this point 'self' will be a 'vfsmount' already and self.vol.offset
a 'vfsmount \\*' and not a 'vfsmount \\*\\*'. The argument must be a 'vfsmount \\*'.
Typically, it's called from do_get_path().
Args:
vfsmount_ptr: A pointer to a 'vfsmount'
Raises:
exceptions.VolatilityException: If vfsmount_ptr is not a 'vfsmount \\*'
Returns:
'True' if the given argument points to the same 'vfsmount' as 'self'.
"""
if isinstance(vfsmount_ptr, objects.Pointer):
return self.vol.offset == vfsmount_ptr
else:
raise exceptions.VolatilityException(
"Unexpected argument type. It has to be a 'vfsmount *'"
)
def _get_real_mnt(self) -> interfaces.objects.ObjectInterface:
"""Gets the struct 'mount' containing this 'vfsmount'.
It should be only called from kernels >= 3.3 when 'struct mount' was introduced.
See 7d6fec45a5131918b51dcd76da52f2ec86a85be6
Returns:
The 'mount' object containing this 'vfsmount'.
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
return linux.LinuxUtilities.container_of(
self.vol.offset, "mount", "mnt", vmlinux
)
[docs]
def get_vfsmnt_current(self):
"""Returns the current fs where we are mounted on
Returns:
A vfsmount pointer
"""
return self.get_mnt_parent()
[docs]
def get_vfsmnt_parent(self):
"""Gets the parent fs (vfsmount) to where it's mounted on
Returns:
For kernels < 3.3: A vfsmount pointer
For kernels >= 3.3: A vfsmount object
"""
if self._is_kernel_prior_to_struct_mount():
return self.get_mnt_parent()
else:
return self._get_real_mnt().get_vfsmnt_parent()
[docs]
def get_dentry_current(self):
"""Returns the root of the mounted tree
Returns:
A dentry pointer
"""
if self._is_kernel_prior_to_struct_mount():
return self.get_mnt_mountpoint()
else:
return self._get_real_mnt().get_dentry_current()
[docs]
def get_dentry_parent(self):
"""Returns the parent root of the mounted tree
Returns:
A dentry pointer
"""
if self._is_kernel_prior_to_struct_mount():
return self.get_mnt_mountpoint()
else:
return self._get_real_mnt().get_mnt_mountpoint()
[docs]
def get_mnt_parent(self):
"""Gets the mnt_parent member.
Returns:
For kernels < 3.3: A vfsmount pointer
For kernels >= 3.3: A mount pointer
"""
if self._is_kernel_prior_to_struct_mount():
return self.mnt_parent
else:
return self._get_real_mnt().get_mnt_parent()
[docs]
def get_mnt_mountpoint(self):
"""Gets the dentry of the mountpoint
Returns:
A dentry pointer
"""
if self.has_member("mnt_mountpoint"):
return self.mnt_mountpoint
else:
return self._get_real_mnt().mnt_mountpoint
[docs]
def get_mnt_root(self):
return self.mnt_root
[docs]
def has_parent(self) -> bool:
if self._is_kernel_prior_to_struct_mount():
return self.mnt_parent != self.vol.offset
else:
return self._get_real_mnt().has_parent()
[docs]
def get_mnt_sb(self):
"""Returns a pointer to the super_block"""
return self.mnt_sb
[docs]
def get_flags_access(self) -> str:
return "ro" if self.mnt_flags & mount.MNT_READONLY else "rw"
[docs]
def get_flags_opts(self) -> Iterable[str]:
flags = [
mntflagtxt
for mntflag, mntflagtxt in mount.MNT_FLAGS.items()
if mntflag & self.mnt_flags != 0
]
return flags
[docs]
def get_mnt_flags(self):
return self.mnt_flags
[docs]
def is_shared(self) -> bool:
return self.get_mnt_flags() & mount.MNT_SHARED
[docs]
def is_unbindable(self) -> bool:
return self.get_mnt_flags() & mount.MNT_UNBINDABLE
[docs]
def is_slave(self) -> bool:
return self.mnt_master and self.mnt_master.vol.offset != 0
[docs]
def get_devname(self) -> str:
return utility.pointer_to_string(self.mnt_devname, count=255)
[docs]
class kobject(objects.StructType):
[docs]
def reference_count(self):
refcnt = self.kref.refcount
if refcnt.has_member("counter"):
ret = refcnt.counter
else:
ret = refcnt.refs.counter
return ret
[docs]
class mnt_namespace(objects.StructType):
[docs]
def get_inode(self):
if self.has_member("proc_inum"):
# 98f842e675f96ffac96e6c50315790912b2812be 3.8 <= kernels < 3.19
return self.proc_inum
elif self.has_member("ns") and self.ns.has_member("inum"):
# kernels >= 3.19 435d5f4bb2ccba3b791d9ef61d2590e30b8e806e
return self.ns.inum
else:
raise AttributeError("Unable to find mnt_namespace inode")
[docs]
def get_mount_points(
self,
) -> Iterator[Optional[interfaces.objects.ObjectInterface]]:
"""Yields the mount points for this mount namespace.
Yields:
mount struct instances
"""
table_name = self.vol.type_name.split(constants.BANG)[0]
if self.has_member("list"):
# kernels < 6.8
mnt_type = table_name + constants.BANG + "mount"
if not self._context.symbol_space.has_type(mnt_type):
# In kernels < 3.3, the 'mount' struct didn't exist, and the 'mnt_list'
# member was part of the 'vfsmount' struct.
mnt_type = table_name + constants.BANG + "vfsmount"
yield from self.list.to_list(mnt_type, "mnt_list")
elif (
self.has_member("mounts")
and self.mounts.vol.type_name == table_name + constants.BANG + "rb_root"
):
# kernels >= 6.8
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(
self._context, self
)
for node in self.mounts.get_nodes():
# See kernel's node_to_mount()
mnt = linux.LinuxUtilities.container_of(
node, "mount", "mnt_node", vmlinux
)
yield mnt
else:
raise exceptions.VolatilityException(
"Unsupported kernel mount namespace implementation"
)
[docs]
class bpf_prog(objects.StructType):
_BPF_PROG_CHUNK_SHIFT = 6
_BPF_PROG_CHUNK_SIZE = 1 << _BPF_PROG_CHUNK_SHIFT
_BPF_PROG_CHUNK_MASK = ~(_BPF_PROG_CHUNK_SIZE - 1)
[docs]
def get_type(self) -> Union[str, None]:
"""Returns a string with the eBPF program type"""
# The program type was in `bpf_prog_aux::prog_type` from 3.18.140 to
# 4.1.52 before it was moved to `bpf_prog::type`
if self.has_member("type"):
# kernel >= 4.1.52
return self.type.description
if self.has_member("aux") and self.aux:
if self.aux.has_member("prog_type"):
# 3.18.140 <= kernel < 4.1.52
return self.aux.prog_type.description
# kernel < 3.18.140
return None
[docs]
def get_tag(self) -> Union[str, None]:
"""Returns a string with the eBPF program tag"""
# 'tag' was added in kernels 4.10
if not self.has_member("tag"):
return None
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
prog_tag_addr = self.tag.vol.offset
prog_tag_size = self.tag.count
if not vmlinux_layer.is_valid(prog_tag_addr, prog_tag_size):
vollog.debug("Unable to read bpf tag string from 0x%x", prog_tag_addr)
return None
prog_tag_bytes = vmlinux_layer.read(prog_tag_addr, prog_tag_size)
prog_tag = binascii.hexlify(prog_tag_bytes).decode()
return prog_tag
[docs]
def get_name(self) -> Union[str, None]:
"""Returns a string with the eBPF program name"""
if not self.has_member("aux"):
# 'prog_aux' was added in kernels 3.18
return None
try:
return self.aux.get_name()
except exceptions.InvalidAddressException:
return None
[docs]
def bpf_jit_binary_hdr_address(self) -> int:
"""Return the jitted BPF program start address
Based on bpf_jit_binary_hdr()
Returns:
The BPF program address
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
# In 5.18 (33c9805860e584b194199cab1a1e81f4e6395408) <= kernels < 6.0 (1d5f82d9dd477d5c66e0214a68c3e4f308eadd6d)
# 'bpf_prog_aux' has a 'use_bpf_prog_pack' member
bpf_prog_aux_has_use_bpf_prog_pack = vmlinux.get_type(
"bpf_prog_aux"
).has_member("use_bpf_prog_pack")
if bpf_prog_aux_has_use_bpf_prog_pack and self.aux.use_bpf_prog_pack:
long_mask = (1 << vmlinux_layer.bits_per_register) - 1
addr_mask = self._BPF_PROG_CHUNK_MASK & long_mask
else:
addr_mask = vmlinux_layer.page_mask
real_start = self.bpf_func
return real_start & addr_mask
[docs]
def get_address_region(self) -> Tuple[int, int]:
"""Returns the start and end memory addresses of the BPF program.
Based on bpf_get_prog_addr_region()
Returns:
A tuple with the addresses representing the memory range (start, end) of the BPF program.
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
# Based on bpf_get_prog_addr_region()
bpf_start_address = self.bpf_jit_binary_hdr_address()
if vmlinux.has_type("bpf_binary_header"):
# kernels >= 3.11 314beb9bcabfd6b4542ccbced2402af2c6f6142a
bpf_binary_header = vmlinux.object(
object_type="bpf_binary_header", offset=bpf_start_address, absolute=True
)
pages = bpf_binary_header.pages
else:
# kernels < 3.11 The first member is always the size
pages = vmlinux.object(
object_type="unsigned int", offset=bpf_start_address, absolute=True
)
bpf_end_address = bpf_start_address + pages * vmlinux_layer.page_size
return bpf_start_address, bpf_end_address
[docs]
class bpf_prog_aux(objects.StructType):
[docs]
def get_name(self) -> Union[str, None]:
"""Returns a string with the eBPF program name"""
if not self.has_member("name"):
# 'name' was added in kernels 4.15
return None
try:
if not self.name:
return None
return utility.array_to_string(self.name)
except exceptions.InvalidAddressException:
return None
[docs]
class cred(objects.StructType):
# struct cred was added in kernels 2.6.29
def _get_cred_int_value(self, member: str) -> int:
"""Helper to obtain the right cred member value for the current kernel.
Args:
member (str): The requested cred member name to obtain its value
Raises:
AttributeError: When the requested cred member doesn't exist
AttributeError: When the cred implementation is not supported.
Returns:
int: The cred member value
"""
if not self.has_member(member):
raise AttributeError(f"struct cred doesn't have a '{member}' member")
cred_val = self.member(member)
if hasattr(cred_val, "val"):
# From kernels 3.5.7 on it is a 'kuid_t' type
value = cred_val.val
elif isinstance(cred_val, objects.Integer):
# From at least 2.6.30 and until 3.5.7 it was a 'uid_t' type which was an 'unsigned int'
value = cred_val
else:
raise AttributeError("Kernel struct cred is not supported")
return int(value)
@property
def uid(self) -> int:
"""Returns the real user ID
Returns:
The real user ID value
"""
return self._get_cred_int_value("uid")
@property
def gid(self) -> int:
"""Returns the real user ID
Returns:
The real user ID value
"""
return self._get_cred_int_value("gid")
@property
def euid(self) -> int:
"""Returns the effective user ID
Returns:
The effective user ID value
"""
return self._get_cred_int_value("euid")
@property
def egid(self) -> int:
"""Returns the effective group ID
Returns:
int: the effective user ID value
"""
return self._get_cred_int_value("egid")
[docs]
class kernel_cap_struct(objects.StructType):
# struct kernel_cap_struct exists from 2.1.92 <= kernels < 6.3
[docs]
@classmethod
def get_last_cap_value(cls) -> int:
"""Returns the latest capability ID supported by the framework.
Returns:
int: The latest capability ID supported by the framework.
"""
return len(linux_constants.CAPABILITIES) - 1
[docs]
def get_kernel_cap_full(self) -> int:
"""Return the maximum value allowed for this kernel for a capability
Returns:
int: The capability full bitfield mask
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
try:
cap_last_cap = vmlinux.object_from_symbol(symbol_name="cap_last_cap")
except exceptions.SymbolError:
# It should be a kernel < 3.2, let's use our list of capabilities
cap_last_cap = self.get_last_cap_value()
return (1 << cap_last_cap + 1) - 1
[docs]
@classmethod
def capabilities_to_string(cls, capabilities_bitfield: int) -> List[str]:
"""Translates a capability bitfield to a list of capability strings.
Args:
capabilities_bitfield (int): The capability bitfield value.
Returns:
List[str]: A list of capability strings.
"""
capabilities = []
for bit, name in enumerate(linux_constants.CAPABILITIES):
if capabilities_bitfield & (1 << bit) != 0:
capabilities.append(name)
return capabilities
[docs]
def get_capabilities(self) -> int:
"""Returns the capability bitfield value
Returns:
int: The capability bitfield value.
"""
if not self.has_member("cap"):
raise exceptions.VolatilityException(
"Unsupported kernel capabilities implementation"
)
if isinstance(self.cap, objects.Array):
if len(self.cap) == 1:
# At least in the vanilla kernel, from 2.6.24 to 2.6.25
# kernel_cap_struct::cap become a two elements array.
# However, in some distros or custom kernel can technically
# be _KERNEL_CAPABILITY_U32S = _LINUX_CAPABILITY_U32S_1
# Leaving this code here for the sake of ensuring completeness.
cap_value = self.cap[0]
elif len(self.cap) == 2:
# In 2.6.25.x <= kernels < 6.3 kernel_cap_struct::cap is a two
# elements __u32 array that constitutes a 64bit bitfield.
cap_value = (self.cap[1] << 32) | self.cap[0]
else:
raise exceptions.VolatilityException(
"Unsupported kernel capabilities implementation"
)
else:
# In kernels < 2.6.25.x kernel_cap_struct::cap is a __u32
cap_value = self.cap
return cap_value & self.get_kernel_cap_full()
[docs]
def enumerate_capabilities(self) -> List[str]:
"""Returns the list of capability strings.
Returns:
List[str]: The list of capability strings.
"""
capabilities_value = self.get_capabilities()
return self.capabilities_to_string(capabilities_value)
[docs]
def has_capability(self, capability: str) -> bool:
"""Checks if the given capability string is enabled.
Args:
capability (str): A string representing the capability i.e. dac_read_search
Raises:
AttributeError: If the given capability is unknown to the framework.
Returns:
bool: "True" if the given capability is enabled.
"""
if capability not in linux_constants.CAPABILITIES:
raise AttributeError(f"Unknown capability with name '{capability}'")
cap_value = 1 << linux_constants.CAPABILITIES.index(capability)
return cap_value & self.get_capabilities() != 0
[docs]
class kernel_cap_t(kernel_cap_struct):
# In kernels 6.3 kernel_cap_struct became the kernel_cap_t typedef
[docs]
def get_capabilities(self) -> int:
"""Returns the capability bitfield value
Returns:
int: The capability bitfield value.
"""
if self.has_member("val"):
# In kernels >= 6.3 kernel_cap_t::val is a u64
cap_value = self.val
else:
raise exceptions.VolatilityException(
"Unsupported kernel capabilities implementation"
)
return cap_value & self.get_kernel_cap_full()
[docs]
class Timespec64Abstract(abc.ABC):
"""Abstract class to handle all required timespec64 operations, conversions and
adjustments."""
[docs]
@classmethod
def new_from_timespec(cls, other) -> "Timespec64Concrete":
"""Creates a new instance from an Timespec64Abstract subclass object"""
if not isinstance(other, Timespec64Abstract):
raise TypeError("Requires an object subclass of Timespec64Abstract")
tv_sec = int(other.tv_sec)
tv_nsec = int(other.tv_nsec)
return Timespec64Concrete(tv_sec=tv_sec, tv_nsec=tv_nsec)
[docs]
@classmethod
def new_from_nsec(cls, nsec) -> "Timespec64Concrete":
"""Creates a new instance from an integer in nanoseconds"""
# Based on ns_to_timespec64()
if nsec > 0:
tv_sec = nsec // linux_constants.NSEC_PER_SEC
tv_nsec = nsec % linux_constants.NSEC_PER_SEC
elif nsec < 0:
tv_sec = -((-nsec - 1) // linux_constants.NSEC_PER_SEC) - 1
rem = (-nsec - 1) % linux_constants.NSEC_PER_SEC
tv_nsec = linux_constants.NSEC_PER_SEC - rem - 1
else:
tv_sec = tv_nsec = 0
return Timespec64Concrete(tv_sec=tv_sec, tv_nsec=tv_nsec)
[docs]
def to_datetime(self) -> datetime.datetime:
"""Converts this Timespec64Abstract subclass object to a UTC aware datetime"""
# pylint: disable=E1101
return conversion.unixtime_to_datetime(
self.tv_sec + self.tv_nsec / linux_constants.NSEC_PER_SEC
)
[docs]
def to_timedelta(self) -> datetime.timedelta:
"""Converts this Timespec64Abstract subclass object to timedelta"""
# pylint: disable=E1101
return datetime.timedelta(
seconds=self.tv_sec + self.tv_nsec / linux_constants.NSEC_PER_SEC
)
def __add__(self, other) -> "Timespec64Concrete":
"""Returns a new Timespec64Concrete object that sums the current values with those
in the timespec argument"""
if not isinstance(other, Timespec64Abstract):
raise TypeError("Requires an object subclass of Timespec64Abstract")
# pylint: disable=E1101
result = Timespec64Concrete(
tv_sec=self.tv_sec + other.tv_sec,
tv_nsec=self.tv_nsec + other.tv_nsec,
)
result.normalize()
return result
def __sub__(self, other) -> "Timespec64Concrete":
"""Returns a new Timespec64Abstract object that subtracts the values in the timespec
argument from the current object's values"""
if not isinstance(other, Timespec64Abstract):
raise TypeError("Requires an object subclass of Timespec64Abstract")
return self + other.negate()
[docs]
def negate(self) -> "Timespec64Concrete":
"""Returns a new Timespec64Concrete object with the values of the current object negated"""
# pylint: disable=E1101
result = Timespec64Concrete(
tv_sec=-self.tv_sec,
tv_nsec=-self.tv_nsec,
)
result.normalize()
return result
[docs]
def normalize(self):
"""Normalize any overflow in tv_sec and tv_nsec."""
# Based on kernel's set_normalized_timespec64()
# pylint: disable=E1101
while self.tv_nsec >= linux_constants.NSEC_PER_SEC:
self.tv_nsec -= linux_constants.NSEC_PER_SEC
self.tv_sec += 1
while self.tv_nsec < 0:
self.tv_nsec += linux_constants.NSEC_PER_SEC
self.tv_sec -= 1
[docs]
class Timespec64Concrete(Timespec64Abstract):
"""Handle all required timespec64 operations, conversions and adjustments.
This is used to dynamically create timespec64-like objects, each with its own variables
and the same methods as a timespec64 object extension.
"""
def __init__(self, tv_sec=0, tv_nsec=0):
self.tv_sec = tv_sec
self.tv_nsec = tv_nsec
[docs]
class timespec64(Timespec64Abstract, objects.StructType):
"""Handle all required timespec64 operations, conversions and adjustments.
This works as an extension of the timespec64 object while maintaining the same methods
as a Timespec64Concrete object.
"""
[docs]
class inode(objects.StructType):
[docs]
def is_valid(self) -> bool:
# i_count is a 'signed' counter (atomic_t). Smear, or essentially a wrong inode
# pointer, will easily cause an integer overflow here.
return self.i_ino > 0 and self.i_count.counter >= 0
@property
def is_dir(self) -> bool:
"""Returns True if the inode is a directory"""
return stat.S_ISDIR(self.i_mode) != 0
@property
def is_reg(self) -> bool:
"""Returns True if the inode is a regular file"""
return stat.S_ISREG(self.i_mode) != 0
@property
def is_link(self) -> bool:
"""Returns True if the inode is a symlink"""
return stat.S_ISLNK(self.i_mode) != 0
@property
def is_fifo(self) -> bool:
"""Returns True if the inode is a FIFO"""
return stat.S_ISFIFO(self.i_mode) != 0
@property
def is_sock(self) -> bool:
"""Returns True if the inode is a socket"""
return stat.S_ISSOCK(self.i_mode) != 0
@property
def is_block(self) -> bool:
"""Returns True if the inode is a block device"""
return stat.S_ISBLK(self.i_mode) != 0
@property
def is_char(self) -> bool:
"""Returns True if the inode is a char device"""
return stat.S_ISCHR(self.i_mode) != 0
@property
def is_sticky(self) -> bool:
"""Returns True if the sticky bit is set"""
return (self.i_mode & stat.S_ISVTX) != 0
[docs]
def get_inode_type(self) -> Union[str, None]:
"""Returns inode type name
Returns:
The inode type name
"""
if self.is_dir:
return "DIR"
elif self.is_reg:
return "REG"
elif self.is_link:
return "LNK"
elif self.is_fifo:
return "FIFO"
elif self.is_sock:
return "SOCK"
elif self.is_char:
return "CHR"
elif self.is_block:
return "BLK"
else:
return None
def _time_member_to_datetime(
self, member
) -> Union[datetime.datetime, interfaces.renderers.BaseAbsentValue]:
if self.has_member(f"{member}_sec") and self.has_member(f"{member}_nsec"):
# kernels >= 6.11 it's i_*_sec -> time64_t and i_*_nsec -> u32
# Ref Linux commit 3aa63a569c64e708df547a8913c84e64a06e7853
return conversion.unixtime_to_datetime(
self.member(f"{member}_sec") + self.has_member(f"{member}_nsec") / 1e9
)
elif self.has_member(f"__{member}"):
# 6.6 <= kernels < 6.11 it's a timespec64
# Ref Linux commit 13bc24457850583a2e7203ded05b7209ab4bc5ef / 12cd44023651666bd44baa36a5c999698890debb
return self.member(f"__{member}").to_datetime()
elif self.has_member(member):
# In kernels < 6.6 it's a timespec64 or timespec
return self.member(member).to_datetime()
else:
raise exceptions.VolatilityException(
"Unsupported kernel inode type implementation"
)
[docs]
def get_access_time(self) -> datetime.datetime:
"""Returns the inode's last access time
This is updated when inode contents are read
Returns:
A datetime with the inode's last access time
"""
return self._time_member_to_datetime("i_atime")
[docs]
def get_modification_time(self) -> datetime.datetime:
"""Returns the inode's last modification time
This is updated when the inode contents change
Returns:
A datetime with the inode's last data modification time
"""
return self._time_member_to_datetime("i_mtime")
[docs]
def get_change_time(self) -> datetime.datetime:
"""Returns the inode's last change time
This is updated when the inode metadata changes
Returns:
A datetime with the inode's last change time
"""
return self._time_member_to_datetime("i_ctime")
[docs]
def get_file_mode(self) -> str:
"""Returns the inode's file mode as string of the form '-rwxrwxrwx'.
Returns:
The inode's file mode string
"""
return stat.filemode(self.i_mode)
[docs]
def get_pages(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Gets the inode's cached pages
Yields:
The inode's cached pages
"""
if not self.i_size:
return
if not (
self.i_mapping
and self.i_mapping.is_readable()
and self.i_mapping.nrpages > 0
):
return
page_cache = linux.PageCache(
context=self._context,
kernel_module_name="kernel",
page_cache=self.i_mapping.dereference(),
)
yield from page_cache.get_cached_pages()
[docs]
def get_contents(self) -> Iterable[Tuple[int, bytes]]:
"""Get the inode cached pages from the page cache
Yields:
page_index (int): The page index in the Tree. File offset is page_index * PAGE_SIZE.
page_content (bytes): The page content
"""
for page_obj in self.get_pages():
if page_obj.mapping != self.i_mapping:
vollog.warning(
f"Cached page at {page_obj.vol.offset:#x} has a mismatched address space with the inode. Skipping page"
)
continue
page_index = int(page_obj.index)
page_content = page_obj.get_content()
if page_content:
yield page_index, page_content
[docs]
class address_space(objects.StructType):
@property
def i_pages(self):
"""Returns the appropriate member containing the page cache tree"""
if self.has_member("i_pages"):
# Kernel >= 4.17 b93b016313b3ba8003c3b8bb71f569af91f19fc7
return self.member("i_pages")
elif self.has_member("page_tree"):
# Kernel < 4.17
return self.member("page_tree")
raise exceptions.VolatilityException("Unsupported page cache tree")
[docs]
class page(objects.StructType):
[docs]
def is_valid(self) -> bool:
if self.mapping and not self.mapping.is_readable():
return False
if self.to_paddr() < 0:
return False
return True
@functools.cached_property
def pageflags_enum(self) -> Dict:
"""Returns 'pageflags' enumeration key/values
Returns:
A dictionary with the pageflags enumeration key/values
"""
try:
pageflags_enum = self._context.symbol_space.get_enumeration(
self.get_symbol_table_name() + constants.BANG + "pageflags"
).choices
except exceptions.SymbolError:
vollog.debug(
"Unable to find pageflags enum. This can happen in kernels < 2.6.26 or wrong ISF"
)
# set to empty dict to show that the enum was not found, and so shouldn't be searched for again
pageflags_enum = {}
return pageflags_enum
@functools.cached_property
def _intel_vmemmap_start(self) -> int:
"""Determine the start of the struct page array, for Intel systems.
Returns:
int: vmemmap_start address
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
vmemmap_start = None
if vmlinux.has_symbol("mem_section"):
# SPARSEMEM_VMEMMAP physical memory model: memmap is virtually contiguous
if vmlinux.has_symbol("vmemmap_base"):
# CONFIG_DYNAMIC_MEMORY_LAYOUT - KASLR kernels >= 4.9
vmemmap_start = vmlinux.object_from_symbol("vmemmap_base")
else:
# !CONFIG_DYNAMIC_MEMORY_LAYOUT
if vmlinux_layer._maxvirtaddr < 57:
# 4-Level paging -> VMEMMAP_START = __VMEMMAP_BASE_L4
vmemmap_base_l4 = 0xFFFFEA0000000000
vmemmap_start = vmemmap_base_l4
else:
# 5-Level paging -> VMEMMAP_START = __VMEMMAP_BASE_L5
# FIXME: Once 5-level paging is supported, uncomment the following lines and remove the exception
# vmemmap_base_l5 = 0xFFD4000000000000
# vmemmap_start = vmemmap_base_l5
raise exceptions.VolatilityException(
"5-level paging is not yet supported"
)
elif vmlinux.has_symbol("mem_map"):
# FLATMEM physical memory model, typically 32bit
vmemmap_start = vmlinux.object_from_symbol("mem_map")
elif vmlinux.has_symbol("node_data"):
raise exceptions.VolatilityException("NUMA systems are not yet supported")
else:
raise exceptions.VolatilityException("Unsupported Linux memory model")
if not vmemmap_start:
raise exceptions.VolatilityException(
"Something went wrong, we shouldn't be here"
)
return vmemmap_start
def _intel_to_paddr(self) -> int:
"""Converts a page's virtual address to its physical address using the current Intel memory model.
Returns:
int: page physical address
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
pagec = vmlinux_layer.canonicalize(self.vol.offset)
pfn = (pagec - self._intel_vmemmap_start) // vmlinux.get_type("page").size
page_paddr = pfn * vmlinux_layer.page_size
return page_paddr
[docs]
def to_paddr(self) -> int:
"""Converts a page's virtual address to its physical address using the current CPU memory model.
Returns:
int: page physical address
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
if isinstance(vmlinux_layer, intel.Intel):
page_paddr = self._intel_to_paddr()
else:
raise exceptions.LayerException(
f"Architecture {type(vmlinux_layer)} vmemmap_start calculation isn't currently supported."
)
return page_paddr
[docs]
def get_content(self) -> Union[bytes, None]:
"""Returns the page content
Returns:
The page content
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
vmlinux_layer = vmlinux.context.layers[vmlinux.layer_name]
physical_layer_name = self._context.layers[self.vol.layer_name].config.get(
"memory_layer", self.vol.layer_name
)
physical_layer = self._context.layers[physical_layer_name]
page_paddr = self.to_paddr()
if not page_paddr:
return None
if not physical_layer.is_valid(page_paddr, length=vmlinux_layer.page_size):
vollog.debug(
"Unable to read page 0x%x content at 0x%x", self.vol.offset, page_paddr
)
return None
return physical_layer.read(page_paddr, vmlinux_layer.page_size)
[docs]
def get_flags_list(self) -> List[str]:
"""Returns a list of page flags
Returns:
List of page flags
"""
flags = []
for name, value in self.pageflags_enum.items():
if self.flags & (1 << value) != 0:
flags.append(name)
return flags
[docs]
class IDR(objects.StructType):
IDR_BITS = 8
IDR_MASK = (1 << IDR_BITS) - 1
INT_SIZE = 4
MAX_IDR_SHIFT = INT_SIZE * 8 - 1
MAX_IDR_BIT = 1 << MAX_IDR_SHIFT
[docs]
def idr_max(self, num_layers: int) -> int:
"""Returns the maximum ID which can be allocated given idr::layers
Args:
num_layers: Number of layers
Returns:
Maximum ID for a given number of layers
"""
# Kernel < 4.17
bits = min([self.INT_SIZE, num_layers * self.IDR_BITS, self.MAX_IDR_SHIFT])
return (1 << bits) - 1
[docs]
def idr_find(self, idr_id: int) -> Optional[int]:
"""Finds an ID within the IDR data structure.
Based on idr_find_slowpath(), 3.9 <= Kernel < 4.11
Args:
idr_id: The IDR lookup ID
Returns:
A pointer to the given ID element
"""
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
if not vmlinux.get_type("idr_layer").has_member("layer"):
vollog.info(
"Unsupported IDR implementation, it should be a very very old kernel, probably < 2.6"
)
return None
if idr_id < 0:
return None
idr_layer = self.top
if not idr_layer:
return None
n = (idr_layer.layer + 1) * self.IDR_BITS
if idr_id > self.idr_max(idr_layer.layer + 1):
return None
assert n != 0
while n > 0 and idr_layer:
n -= self.IDR_BITS
assert n == idr_layer.layer * self.IDR_BITS
idr_layer = idr_layer.ary[(idr_id >> n) & self.IDR_MASK]
return idr_layer
def _old_kernel_get_entries(self) -> Iterable[int]:
# Kernels < 4.11
cur = self.cur
total = next_id = 0
while next_id < cur:
entry = self.idr_find(next_id)
if entry:
yield entry
total += 1
next_id += 1
def _new_kernel_get_entries(self) -> Iterable[int]:
# Kernels >= 4.11
id_storage = linux.IDStorage.choose_id_storage(
self._context, kernel_module_name="kernel"
)
yield from id_storage.get_entries(root=self.idr_rt)
[docs]
def get_entries(self) -> Iterable[int]:
"""Walks the IDR and yield a pointer associated with each element.
Args:
in_use (int, optional): _description_. Defaults to 0.
Yields:
A pointer associated with each element.
"""
if self.has_member("idr_rt"):
# Kernels >= 4.11
get_entries_func = self._new_kernel_get_entries
else:
# Kernels < 4.11
get_entries_func = self._old_kernel_get_entries
yield from get_entries_func()
[docs]
class rb_root(objects.StructType):
def _walk_nodes(
self, root_node: interfaces.objects.ObjectInterface
) -> Iterator[int]:
"""Traverses the Red-Black tree from the root node and yields a pointer to each
node in this tree.
Args:
root_node: A Red-Black tree node pointer from which to start descending
Yields:
A pointer to every node descending from the specified root node
"""
if not (root_node and root_node.is_readable()):
return
yield root_node
yield from self._walk_nodes(root_node.rb_left)
yield from self._walk_nodes(root_node.rb_right)
[docs]
def get_nodes(self) -> Iterator[int]:
"""Yields a pointer to each node in the Red-Black tree
Yields:
A pointer to every node in the Red-Black tree
"""
yield from self._walk_nodes(root_node=self.rb_node)
[docs]
class scatterlist(objects.StructType):
SG_CHAIN = 0x01
SG_END = 0x02
SG_PAGE_LINK_MASK = SG_CHAIN | SG_END
def _sg_flags(self) -> int:
return self.page_link & self.SG_PAGE_LINK_MASK
def _sg_is_chain(self) -> int:
return self._sg_flags() & self.SG_CHAIN
def _sg_is_last(self) -> int:
return self._sg_flags() & self.SG_END
def _sg_chain_ptr(self) -> int:
"""Clears the last two bits basically."""
return self.page_link & ~self.SG_PAGE_LINK_MASK
def _sg_dma_len(self) -> int:
# Depends on CONFIG_NEED_SG_DMA_LENGTH
if self.has_member("dma_length"):
return self.dma_length
return self.length
def _get_sg_max_single_alloc(self) -> int:
"""Based on kernel's SG_MAX_SINGLE_ALLOC.
Doc. from kernel source :
* Maximum number of entries that will be allocated in one piece, if
* a list larger than this is required then chaining will be utilized.
"""
return self._context.layers[self.vol.layer_name].page_size // self.vol.size
def _sg_next(self) -> Optional[interfaces.objects.ObjectInterface]:
"""Get the next scatterlist struct from the list.
Based on kernel's sg_next.
Doc. from kernel source :
* Notes on SG table design.
*
* We use the unsigned long page_link field in the scatterlist struct to place
* the page pointer AND encode information about the sg table as well. The two
* lower bits are reserved for this information.
*
* If bit 0 is set, then the page_link contains a pointer to the next sg
* table list. Otherwise the next entry is at sg + 1.
*
* If bit 1 is set, then this sg entry is the last element in a list.
"""
if self._sg_is_last():
return None
if self._sg_is_chain():
next_address = self._sg_chain_ptr()
else:
next_address = self.vol.offset + self.vol.size
sg = self._context.object(
self.get_symbol_table_name() + constants.BANG + "scatterlist",
self.vol.layer_name,
next_address,
)
return sg
[docs]
def for_each_sg(self) -> Optional[Iterator[interfaces.objects.ObjectInterface]]:
"""Iterate over each struct in the scatterlist."""
sg = self
sg_max_single_alloc = self._get_sg_max_single_alloc()
# Empty scatterlists protection
if sg.page_link == 0 and sg._sg_dma_len() == 0 and sg.dma_address == 0:
return None
else:
# Yield itself first
yield sg
entries_count = 1
# entries_count <= sg_max_single_alloc should always be true if the
# scatterlists were correctly chained.
while entries_count <= sg_max_single_alloc:
sg = sg._sg_next()
if sg is None:
break
# Points to a new scatterlist
elif sg._sg_is_chain():
entries_count = 0
else:
entries_count += 1
yield sg
[docs]
def get_content(
self,
) -> Optional[Iterator[bytes]]:
"""Traverse a scatterlist to gather content located at each
dma_address position.
Returns:
An iterator of bytes
"""
# Either "physical" is layer-1 because this is a module layer, or "physical" is the current layer
physical_layer_name = self._context.layers[self.vol.layer_name].config.get(
"memory_layer", self.vol.layer_name
)
physical_layer = self._context.layers[physical_layer_name]
for sg in self.for_each_sg():
yield from physical_layer.read(sg.dma_address, sg._sg_dma_len())
[docs]
class latch_tree_root(objects.StructType):
"""Latched RB-trees implementation"""
@functools.cached_property
def _vmlinux(self):
return linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
@functools.lru_cache
def _get_type_cached(self, name):
return self._vmlinux.get_type(name)
def _get_lt_node_from_rb_node(
self, rb_node, index
) -> Optional[interfaces.objects.ObjectInterface]:
"""Gets the latch tree node from the RBTree node.
Based on __lt_from_rb()
"""
# Unfortunately, we cannot use our LinuxUtilities.container_of() here, since the
# member is indexed by the 'index' variable:
# ltn = container_of(node, struct latch_tree_node, node[idx])
pointer_size = self._get_type_cached("pointer").size
type_dec = self._get_type_cached("latch_tree_node")
member_offset = type_dec.relative_child_offset("node") + index * pointer_size
container_addr = rb_node.vol.offset - member_offset
return self._vmlinux.object(
object_type="latch_tree_node", offset=container_addr, absolute=True
)
[docs]
def find(
self, key: int, comp_function: Callable
) -> Optional[interfaces.objects.ObjectInterface]:
"""Returns a pointer to the node matching key or None.
Based on latch_tree_find() and __lt_find()
Args:
key (int): Typically an address
comp_function: Callback comparison function to provide the order between the
search key and an element. It's works like the kernel's latch_tree_ops::comp
i.e.: comp_function(key, latch_tree_node)
Returns:
latch_tree_node: A pointer to the node matching key or None.
"""
# latch_tree_root >= 4.2 ade3f510f93a5613b672febe88eff8ea7f1c63b7
# Use the lowest sequence bit as an index for picking which data copy to read
if self.seq.has_member("seqcount"):
# kernels >= 5.10 0c9794c8b6781eb7dad8e19b78c5d4557790597a
sequence = self.seq.seqcount.sequence
elif self.seq.has_member("sequence"):
# 4.2 <= kernel < 5.10
sequence = self.seq.sequence
else:
raise AttributeError("Unsupported sequence type implementation")
idx = sequence & 1
rb_node_ptr = self.tree[idx].rb_node
while rb_node_ptr and rb_node_ptr.is_readable():
rb_node = rb_node_ptr.dereference()
lt_node = self._get_lt_node_from_rb_node(rb_node, idx)
c = comp_function(key, lt_node)
if c is None:
return None
elif c < 0:
rb_node_ptr = rb_node.rb_left
elif c > 0:
rb_node_ptr = rb_node.rb_right
else:
return lt_node
return None
[docs]
class kernel_symbol(objects.StructType):
def _offset_to_ptr(self, off) -> int:
layer = self._context.layers[self.vol.layer_name]
long_mask = (1 << layer.bits_per_register) - 1
return (self.vol.offset + off) & long_mask
def _do_get_name(self) -> str:
if self.has_member("name_offset"):
# kernel >= 4.19 and CONFIG_HAVE_ARCH_PREL32_RELOCATIONS=y
# See 7290d58095712a89f845e1bca05334796dd49ed2
name_offset = self._offset_to_ptr(self.name_offset)
elif self.has_member("name"):
# kernel < 4.19 or CONFIG_HAVE_ARCH_PREL32_RELOCATIONS=n
name_offset = self.member("name")
else:
raise AttributeError("Unsupported kernel_symbol type implementation")
return utility.pointer_to_string(
name_offset, linux_constants.KSYM_NAME_LEN, errors="ignore"
)
[docs]
def get_name(self) -> Optional[str]:
try:
return self._do_get_name()
except exceptions.InvalidAddressException:
return None
def _do_get_value(self) -> int:
if self.has_member("value_offset"):
# kernel >= 4.19 and CONFIG_HAVE_ARCH_PREL32_RELOCATIONS=y
# See 7290d58095712a89f845e1bca05334796dd49ed2
return self._offset_to_ptr(self.value_offset)
elif self.has_member("value"):
# kernel < 4.19 or CONFIG_HAVE_ARCH_PREL32_RELOCATIONS=n
return self.member("value")
raise AttributeError("Unsupported kernel_symbol type implementation")
[docs]
def get_value(self) -> Optional[int]:
try:
return self._do_get_value()
except exceptions.InvalidAddressException:
return None
def _do_get_namespace(self) -> str:
if self.has_member("namespace_offset"):
# kernel >= 4.19 and CONFIG_HAVE_ARCH_PREL32_RELOCATIONS=y
# See 7290d58095712a89f845e1bca05334796dd49ed2
namespace_offset = self._offset_to_ptr(self.namespace_offset)
elif self.has_member("namespace"):
# kernel < 4.19 or CONFIG_HAVE_ARCH_PREL32_RELOCATIONS=n
namespace_offset = self.member("namespace")
else:
raise AttributeError("Unsupported kernel_symbol type implementation")
return utility.pointer_to_string(
namespace_offset, linux_constants.KSYM_NAME_LEN, errors="ignore"
)
[docs]
def get_namespace(self) -> Optional[str]:
try:
return self._do_get_namespace()
except exceptions.InvalidAddressException:
return None
[docs]
class module_sect_attr(objects.StructType):
[docs]
def get_name(self) -> Optional[str]:
"""
Performs careful extraction of the section name
The `name` member has changed type and meaning over time
It also was present even in cases with `mattr` present, which
holds the name the kernel uses
"""
if hasattr(self, "battr"):
try:
return utility.pointer_to_string(
self.battr.attr.name, count=linux_constants.ATTRIBUTE_NAME_MAX_SIZE
)
except exceptions.InvalidAddressException:
# if battr is present then its name attribute needs to be valid
vollog.debug(f"Invalid battr name for section at {self.vol.offset:#x}")
return None
elif self.name.vol.type_name == "array":
try:
return utility.array_to_string(
self.name, count=linux_constants.ATTRIBUTE_NAME_MAX_SIZE
)
except exceptions.InvalidAddressException:
# specifically do not return here to give `mattr` a chance
vollog.debug(f"Invalid direct name for section at {self.vol.offset:#x}")
elif self.name.vol.type_name == "pointer":
try:
return utility.pointer_to_string(
self.name, count=linux_constants.ATTRIBUTE_NAME_MAX_SIZE
)
except exceptions.InvalidAddressException:
# specifically do not return here to give `mattr` a chance
vollog.debug(
f"Invalid pointer name for section at {self.vol.offset:#x}"
)
# if everything else failed...
if hasattr(self, "mattr"):
try:
return utility.pointer_to_string(
self.mattr.attr.name, count=linux_constants.ATTRIBUTE_NAME_MAX_SIZE
)
except exceptions.InvalidAddressException:
vollog.debug(
f"Unresolvable name for for section at {self.vol.offset:#x}"
)
return None
[docs]
class bin_attribute(objects.StructType):
[docs]
def get_name(self) -> Optional[str]:
"""
Performs extraction of the bin_attribute name
"""
try:
return utility.pointer_to_string(
self.attr.name, count=linux_constants.ATTRIBUTE_NAME_MAX_SIZE
)
except exceptions.InvalidAddressException:
vollog.debug(f"Invalid attr name for bin_attribute at {self.vol.offset:#x}")
return None
@property
def address(self) -> int:
"""Equivalent to module_sect_attr.address:
- https://github.com/torvalds/linux/commit/4b2c11e4aaf7e3d7fd9ce8e5995a32ff5e27d74f
"""
return self.private