Source code for

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at
import datetime
import logging
from typing import Callable, Dict, Set, Tuple

from volatility3.framework import objects, interfaces, renderers, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from import pslist

vollog = logging.getLogger(__name__)

[docs]class PsTree(interfaces.plugins.PluginInterface): """Plugin for listing processes in a tree based on their parent process ID.""" _required_framework_version = (2, 0, 0) def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._processes: Dict[int, Tuple[interfaces.objects.ObjectInterface, int]] = {} self._levels: Dict[int, int] = {} self._children: Dict[int, Set[int]] = {} self._ancestors: Set[int] = set([])
[docs] @classmethod def get_requirements(cls): return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"], ), requirements.BooleanRequirement( name="physical", description="Display physical offsets instead of virtual", default=pslist.PsList.PHYSICAL_DEFAULT, optional=True, ), requirements.VersionRequirement( name="pslist", component=pslist.PsList, version=(2, 0, 0) ), requirements.ListRequirement( name="pid", element_type=int, description="Process ID to include (with ancestors and descendants, all other processes are excluded)", optional=True, ), ]
[docs] def find_level( self, pid: objects.Pointer, filter_func: Callable[ [interfaces.objects.ObjectInterface], bool ] = lambda _: False, ) -> None: """Finds how deep the pid is in the processes list.""" seen = {pid} level = 0 proc, _ = self._processes.get(pid, None) filtered = not filter_func(proc) while proc is not None and proc.InheritedFromUniqueProcessId not in seen: if filtered: self._ancestors.add(proc.UniqueProcessId) child_list = self._children.get(proc.InheritedFromUniqueProcessId, set([])) child_list.add(proc.UniqueProcessId) self._children[proc.InheritedFromUniqueProcessId] = child_list seen.add(proc.InheritedFromUniqueProcessId) proc, _ = self._processes.get( proc.InheritedFromUniqueProcessId, (None, None) ) level += 1 self._levels[pid] = level
def _generator( self, filter_func: Callable[ [interfaces.objects.ObjectInterface], bool ] = lambda _: False, ): """Generates the Tree of processes.""" kernel = self.context.modules[self.config["kernel"]] for proc in pslist.PsList.list_processes( self.context, kernel.layer_name, kernel.symbol_table_name ): if not self.config.get("physical", pslist.PsList.PHYSICAL_DEFAULT): offset = proc.vol.offset else: layer_name = kernel.layer_name memory = self.context.layers[layer_name] (_, _, offset, _, _) = list( memory.mapping(offset=proc.vol.offset, length=0) )[0] self._processes[proc.UniqueProcessId] = proc, offset # Build the child/level maps for pid in self._processes: self.find_level(pid, filter_func) process_pids = set([]) def yield_processes(pid, descendant: bool = False): if pid in process_pids: vollog.debug(f"Pid cycle: already processed pid {pid}") return None process_pids.add(pid) if pid not in self._ancestors and not descendant: vollog.debug(f"Pid cycle: pid {pid} not in filtered tree") return None proc, offset = self._processes[pid] row = ( proc.UniqueProcessId, proc.InheritedFromUniqueProcessId, proc.ImageFileName.cast( "string", max_length=proc.ImageFileName.vol.count, errors="replace" ), format_hints.Hex(offset), proc.ActiveThreads, proc.get_handle_count(), proc.get_session_id(), proc.get_is_wow64(), proc.get_create_time(), proc.get_exit_time(), ) try: audit = proc.SeAuditProcessCreationInfo.ImageFileName.Name # If 'audit' is set to the empty string, display NotAvailableValue row += (audit.get_string() or renderers.NotAvailableValue(),) except exceptions.InvalidAddressException: row += (renderers.NotAvailableValue(),) try: process_params = proc.get_peb().ProcessParameters row += ( process_params.CommandLine.get_string(), process_params.ImagePathName.get_string(), ) except exceptions.InvalidAddressException: row += ( renderers.NotAvailableValue(), renderers.NotAvailableValue(), ) yield (self._levels[pid] - 1, row) for child_pid in self._children.get(pid, []): yield from yield_processes( child_pid, descendant or not filter_func(proc) ) for pid in self._levels: if self._levels[pid] == 1: yield from yield_processes(pid)
[docs] def run(self): offsettype = ( "(V)" if not self.config.get("physical", pslist.PsList.PHYSICAL_DEFAULT) else "(P)" ) return renderers.TreeGrid( [ ("PID", int), ("PPID", int), ("ImageFileName", str), (f"Offset{offsettype}", format_hints.Hex), ("Threads", int), ("Handles", int), ("SessionId", int), ("Wow64", bool), ("CreateTime", datetime.datetime), ("ExitTime", datetime.datetime), ("Audit", str), ("Cmd", str), ("Path", str), ], self._generator( filter_func=pslist.PsList.create_pid_filter( self.config.get("pid", None) ), ), )