Source code for volatility3.plugins.linux.pscallstack

# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
import dataclasses
from typing import List, Iterator

from volatility3.framework import interfaces, renderers, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.renderers import format_hints
from volatility3.framework.constants import architectures
from volatility3.framework.objects import utility
from volatility3.framework.symbols.linux import kallsyms
from volatility3.plugins.linux import pslist

vollog = logging.getLogger(__name__)


[docs] @dataclasses.dataclass class StackEntry: position: int address: int value: int name: str = renderers.NotAvailableValue() type: str = renderers.NotAvailableValue() module: str = renderers.NotAvailableValue()
[docs] class PsCallStack(plugins.PluginInterface): """Enumerates the call stack of each task""" _required_framework_version = (2, 19, 0) _version = (1, 0, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Linux kernel", architectures=architectures.LINUX_ARCHS, ), requirements.VersionRequirement( name="Kallsyms", component=kallsyms.Kallsyms, version=(1, 0, 0) ), requirements.VersionRequirement( name="pslist", component=pslist.PsList, version=(4, 0, 0) ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), requirements.BooleanRequirement( name="unresolved", description="Include unresolved stack values", default=False, optional=True, ), ]
[docs] @classmethod def get_task_callstack( cls, context: interfaces.context.ContextInterface, module_name: str, task: interfaces.objects.ObjectInterface, kas: kallsyms.Kallsyms = None, include_unresolved=False, ) -> Iterator[StackEntry]: """Retrieves the call stack for a given task Args: context: The context used to access memory layers and symbols module_name: The name of the kernel module on which to operate task: The task object whose stack is being retrieved kas: Kallsyms instance for symbol resolution. If not provided or None, a new instance will be created each time include_unresolved: If True, includes stack values that could not be resolved to known symbols. Defaults to False. Yields: StackEntry objects """ task_layer = task.get_address_space_layer() if not task_layer: return None vmlinux = context.modules[module_name] vmlinux_layer = context.layers[vmlinux.layer_name] if not kas: kas = kallsyms.Kallsyms( context=context, layer_name=vmlinux.layer_name, module_name=module_name, ) pointer_size = vmlinux.get_type("pointer").size thread_size_order = 2 # Safe since kernel 3.15 # thread_size_order +=1 # If CONFIG_KASAN is enabled in kernels >= 4.0, default: DISABLED # thread_size_order +=1 # If CONFIG_KASAN_EXTRA is enabled in kernels >= 4.19, default: DISABLED thread_size = vmlinux_layer.page_size << thread_size_order task_base_of_stack = vmlinux_layer.canonicalize(task.stack) task_top_of_stack = task_base_of_stack + thread_size byte_order = task.files.vol.data_format.byteorder rsp_start = task.thread.sp if not (task_base_of_stack <= rsp_start < task_top_of_stack): raise exceptions.VolatilityException( f"Invalid stack pointer {rsp_start:#x} for task {task.pid}" ) current_sp = rsp_start idx = 0 while current_sp < task_top_of_stack: try: stack_value_bytes = task_layer.read(current_sp, pointer_size) except exceptions.InvalidAddressException: break stack_value = int.from_bytes(stack_value_bytes, byteorder=byte_order) if not stack_value: idx += 1 current_sp += pointer_size continue kassymbol = kas.lookup_address(stack_value) sp_address = current_sp & vmlinux_layer.address_mask stack_value &= vmlinux_layer.address_mask if kassymbol: module_name = kassymbol.module_name or renderers.NotAvailableValue() yield StackEntry( position=idx, address=sp_address, value=stack_value, name=kassymbol.name, type=kassymbol.type, module=module_name, ) elif include_unresolved: yield StackEntry( position=idx, address=sp_address, value=stack_value, ) idx += 1 current_sp += pointer_size
def _generator(self): module_name = self.config["kernel"] vmlinux = self.context.modules[module_name] kas = kallsyms.Kallsyms( context=self.context, layer_name=vmlinux.layer_name, module_name=self.config["kernel"], ) include_unresolved = self.config.get("unresolved", False) pids = self.config.get("pid", None) filter_func = pslist.PsList.create_pid_filter(pids) for task in pslist.PsList.list_tasks( self.context, vmlinux.name, filter_func=filter_func, include_threads=True ): task_name = utility.array_to_string(task.comm) for stack_entry in self.get_task_callstack( context=self.context, module_name=vmlinux.name, task=task, kas=kas, include_unresolved=include_unresolved, ): fields = ( task.pid, task_name, stack_entry.position, format_hints.Hex(stack_entry.address), format_hints.Hex(stack_entry.value), stack_entry.name, stack_entry.type, stack_entry.module, ) yield 0, fields
[docs] def run(self): return renderers.TreeGrid( [ ("TID", int), ("Comm", str), ("Position", int), ("Address", format_hints.Hex), ("Value", format_hints.Hex), ("Name", str), ("Type", str), ("Module", str), ], self._generator(), )