Source code for volatility3.plugins.linux.pslist

# This file is Copyright 2021 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import datetime
from typing import Any, Callable, Iterable, List, Tuple

from volatility3.framework import interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.linux.extensions import elf
from volatility3.plugins import timeliner
from volatility3.plugins.linux import elfs


[docs]class PsList(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface): """Lists the processes present in a particular linux memory image.""" _required_framework_version = (2, 0, 0) _version = (2, 3, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Linux kernel", architectures=["Intel32", "Intel64"], ), requirements.PluginRequirement( name="elfs", plugin=elfs.Elfs, version=(2, 0, 0) ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), requirements.BooleanRequirement( name="threads", description="Include user threads", optional=True, default=False, ), requirements.BooleanRequirement( name="decorate_comm", description="Show `user threads` comm in curly brackets, and `kernel threads` comm in square brackets", optional=True, default=False, ), requirements.BooleanRequirement( name="dump", description="Extract listed processes", optional=True, default=False, ), ]
[docs] @classmethod def create_pid_filter(cls, pid_list: List[int] = None) -> Callable[[Any], bool]: """Constructs a filter function for process IDs. Args: pid_list: List of process IDs that are acceptable (or None if all are acceptable) Returns: Function which, when provided a process object, returns True if the process is to be filtered out of the list """ # FIXME: mypy #4973 or #2608 pid_list = pid_list or [] filter_list = [x for x in pid_list if x is not None] if filter_list: def filter_func(x): return x.pid not in filter_list return filter_func else: return lambda _: False
[docs] @classmethod def get_task_fields( cls, task: interfaces.objects.ObjectInterface, decorate_comm: bool = False ) -> Tuple[int, int, int, int, str, datetime.datetime]: """Extract the fields needed for the final output Args: task: A task object from where to get the fields. decorate_comm: If True, it decorates the comm string of user threads in curly brackets, and of Kernel threads in square brackets. Defaults to False. Returns: A tuple with the fields to show in the plugin output. """ pid = task.tgid tid = task.pid ppid = task.parent.tgid if task.parent else 0 name = utility.array_to_string(task.comm) start_time = task.get_create_time() if decorate_comm: if task.is_kernel_thread: name = f"[{name}]" elif task.is_user_thread: name = f"{{{name}}}" task_fields = (task.vol.offset, pid, tid, ppid, name, start_time) return task_fields
def _get_file_output(self, task: interfaces.objects.ObjectInterface) -> str: """Extract the elf for the process if requested Args: task: A task object to extract from. Returns: A string showing the results of the extraction, either the filename used or an error. """ elf_table_name = intermed.IntermediateSymbolTable.create( self.context, self.config_path, "linux", "elf", class_types=elf.class_types, ) proc_layer_name = task.add_process_layer() if not proc_layer_name: # if we can't build a proc layer we can't # extract the elf return renderers.NotApplicableValue() else: # Find the vma that belongs to the main ELF of the process file_output = "Error outputting file" for v in task.mm.get_vma_iter(): if v.vm_start == task.mm.start_code: file_handle = elfs.Elfs.elf_dump( self.context, proc_layer_name, elf_table_name, v, task, self.open, ) if file_handle: file_output = str(file_handle.preferred_filename) file_handle.close() break else: file_output = "VMA start matching task start_code not found" return file_output def _generator( self, pid_filter: Callable[[Any], bool], include_threads: bool = False, decorate_comm: bool = False, dump: bool = False, ): """Generates the tasks list. Args: pid_filter: A function which takes a process object and returns True if the process should be ignored/filtered include_threads: If True, the output will also show the user threads If False, only the thread group leaders will be shown Defaults to False. decorate_comm: If True, it decorates the comm string of - User threads: in curly brackets, - Kernel threads: in square brackets Defaults to False. dump: If True, the main executable of the process is written to a file Defaults to False. Yields: Each rows """ for task in self.list_tasks( self.context, self.config["kernel"], pid_filter, include_threads ): if dump: file_output = self._get_file_output(task) else: file_output = "Disabled" offset, pid, tid, ppid, name, creation_time = self.get_task_fields( task, decorate_comm ) yield 0, ( format_hints.Hex(offset), pid, tid, ppid, name, creation_time or renderers.NotAvailableValue(), file_output, )
[docs] @classmethod def list_tasks( cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, include_threads: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the tasks in the primary layer. Args: context: The context to retrieve required elements (layers, symbol tables) from vmlinux_module_name: The name of the kernel module on which to operate filter_func: A function which takes a process object and returns True if the process should be ignored/filtered include_threads: If True, it will also return user threads. Yields: Task objects """ vmlinux = context.modules[vmlinux_module_name] init_task = vmlinux.object_from_symbol(symbol_name="init_task") # Note that the init_task itself is not yielded, since "ps" also never shows it. for task in init_task.tasks: if filter_func(task): continue yield task if include_threads: yield from task.get_threads()
[docs] def run(self): pids = self.config.get("pid") include_threads = self.config.get("threads") decorate_comm = self.config.get("decorate_comm") dump = self.config.get("dump") filter_func = self.create_pid_filter(pids) columns = [ ("OFFSET (V)", format_hints.Hex), ("PID", int), ("TID", int), ("PPID", int), ("COMM", str), ("CREATION TIME", datetime.datetime), ("File output", str), ] return renderers.TreeGrid( columns, self._generator(filter_func, include_threads, decorate_comm, dump) )
[docs] def generate_timeline(self): pids = self.config.get("pid") filter_func = self.create_pid_filter(pids) for task in self.list_tasks( self.context, self.config["kernel"], filter_func, include_threads=True ): offset, user_pid, user_tid, _user_ppid, name, creation_time = ( self.get_task_fields(task) ) description = f"Process {user_pid}/{user_tid} {name} ({offset})" yield (description, timeliner.TimeLinerType.CREATED, creation_time)