Source code for volatility3.plugins.linux.pslist

# This file is Copyright 2021 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at
import datetime
from typing import Any, Callable, Iterable, List, Tuple

from volatility3.framework import interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.linux.extensions import elf
from volatility3.plugins import timeliner
from volatility3.plugins.linux import elfs

[docs]class PsList(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface): """Lists the processes present in a particular linux memory image.""" _required_framework_version = (2, 0, 0) _version = (2, 3, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Linux kernel", architectures=["Intel32", "Intel64"], ), requirements.PluginRequirement( name="elfs", plugin=elfs.Elfs, version=(2, 0, 0) ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), requirements.BooleanRequirement( name="threads", description="Include user threads", optional=True, default=False, ), requirements.BooleanRequirement( name="decorate_comm", description="Show `user threads` comm in curly brackets, and `kernel threads` comm in square brackets", optional=True, default=False, ), requirements.BooleanRequirement( name="dump", description="Extract listed processes", optional=True, default=False, ), ]
[docs] @classmethod def create_pid_filter(cls, pid_list: List[int] = None) -> Callable[[Any], bool]: """Constructs a filter function for process IDs. Args: pid_list: List of process IDs that are acceptable (or None if all are acceptable) Returns: Function which, when provided a process object, returns True if the process is to be filtered out of the list """ # FIXME: mypy #4973 or #2608 pid_list = pid_list or [] filter_list = [x for x in pid_list if x is not None] if filter_list: def filter_func(x): return not in filter_list return filter_func else: return lambda _: False
[docs] @classmethod def get_task_fields( cls, task: interfaces.objects.ObjectInterface, decorate_comm: bool = False ) -> Tuple[int, int, int, int, str, datetime.datetime]: """Extract the fields needed for the final output Args: task: A task object from where to get the fields. decorate_comm: If True, it decorates the comm string of user threads in curly brackets, and of Kernel threads in square brackets. Defaults to False. Returns: A tuple with the fields to show in the plugin output. """ pid = task.tgid tid = ppid = task.parent.tgid if task.parent else 0 name = utility.array_to_string(task.comm) start_time = task.get_create_time() if decorate_comm: if task.is_kernel_thread: name = f"[{name}]" elif task.is_user_thread: name = f"{{{name}}}" task_fields = (task.vol.offset, pid, tid, ppid, name, start_time) return task_fields
def _get_file_output(self, task: interfaces.objects.ObjectInterface) -> str: """Extract the elf for the process if requested Args: task: A task object to extract from. Returns: A string showing the results of the extraction, either the filename used or an error. """ elf_table_name = intermed.IntermediateSymbolTable.create( self.context, self.config_path, "linux", "elf", class_types=elf.class_types, ) proc_layer_name = task.add_process_layer() if not proc_layer_name: # if we can't build a proc layer we can't # extract the elf return renderers.NotApplicableValue() else: # Find the vma that belongs to the main ELF of the process file_output = "Error outputting file" for v in if v.vm_start == file_handle = elfs.Elfs.elf_dump( self.context, proc_layer_name, elf_table_name, v, task,, ) if file_handle: file_output = str(file_handle.preferred_filename) file_handle.close() break else: file_output = "VMA start matching task start_code not found" return file_output def _generator( self, pid_filter: Callable[[Any], bool], include_threads: bool = False, decorate_comm: bool = False, dump: bool = False, ): """Generates the tasks list. Args: pid_filter: A function which takes a process object and returns True if the process should be ignored/filtered include_threads: If True, the output will also show the user threads If False, only the thread group leaders will be shown Defaults to False. decorate_comm: If True, it decorates the comm string of - User threads: in curly brackets, - Kernel threads: in square brackets Defaults to False. dump: If True, the main executable of the process is written to a file Defaults to False. Yields: Each rows """ for task in self.list_tasks( self.context, self.config["kernel"], pid_filter, include_threads ): if dump: file_output = self._get_file_output(task) else: file_output = "Disabled" offset, pid, tid, ppid, name, creation_time = self.get_task_fields( task, decorate_comm ) yield 0, ( format_hints.Hex(offset), pid, tid, ppid, name, creation_time or renderers.NotAvailableValue(), file_output, )
[docs] @classmethod def list_tasks( cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, include_threads: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the tasks in the primary layer. Args: context: The context to retrieve required elements (layers, symbol tables) from vmlinux_module_name: The name of the kernel module on which to operate filter_func: A function which takes a process object and returns True if the process should be ignored/filtered include_threads: If True, it will also return user threads. Yields: Task objects """ vmlinux = context.modules[vmlinux_module_name] init_task = vmlinux.object_from_symbol(symbol_name="init_task") # Note that the init_task itself is not yielded, since "ps" also never shows it. for task in init_task.tasks: if filter_func(task): continue yield task if include_threads: yield from task.get_threads()
[docs] def run(self): pids = self.config.get("pid") include_threads = self.config.get("threads") decorate_comm = self.config.get("decorate_comm") dump = self.config.get("dump") filter_func = self.create_pid_filter(pids) columns = [ ("OFFSET (V)", format_hints.Hex), ("PID", int), ("TID", int), ("PPID", int), ("COMM", str), ("CREATION TIME", datetime.datetime), ("File output", str), ] return renderers.TreeGrid( columns, self._generator(filter_func, include_threads, decorate_comm, dump) )
[docs] def generate_timeline(self): pids = self.config.get("pid") filter_func = self.create_pid_filter(pids) for task in self.list_tasks( self.context, self.config["kernel"], filter_func, include_threads=True ): offset, user_pid, user_tid, _user_ppid, name, creation_time = ( self.get_task_fields(task) ) description = f"Process {user_pid}/{user_tid} {name} ({offset})" yield (description, timeliner.TimeLinerType.CREATED, creation_time)