Source code for volatility3.plugins.mac.pslist

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import datetime
import logging
from typing import Callable, Dict, Iterable, List

from volatility3.framework import exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import mac

vollog = logging.getLogger(__name__)


[docs]class PsList(interfaces.plugins.PluginInterface): """Lists the processes present in a particular mac memory image.""" _required_framework_version = (2, 0, 0) _version = (3, 0, 0) pslist_methods = ["tasks", "allproc", "process_group", "sessions", "pid_hash_table"]
[docs] @classmethod def get_requirements(cls): return [ requirements.ModuleRequirement( name="kernel", description="Kernel module for the OS", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="macutils", component=mac.MacUtilities, version=(1, 1, 0) ), requirements.ChoiceRequirement( name="pslist_method", description="Method to determine for processes", choices=cls.pslist_methods, default=cls.pslist_methods[0], optional=True, ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), ]
[docs] @classmethod def get_list_tasks(cls, method: str) -> Callable[ [interfaces.context.ContextInterface, str, Callable[[int], bool]], Iterable[interfaces.objects.ObjectInterface], ]: """Returns the list_tasks method based on the selector Args: method: Must be one fo the available methods in get_task_choices Returns: list_tasks method for listing tasks """ # Ensure method is one of the suitable choices if method not in cls.pslist_methods: method = cls.pslist_methods[0] if method == "allproc": list_tasks = cls.list_tasks_allproc elif method == "tasks": list_tasks = cls.list_tasks_tasks elif method == "process_group": list_tasks = cls.list_tasks_process_group elif method == "sessions": list_tasks = cls.list_tasks_sessions elif method == "pid_hash_table": list_tasks = cls.list_tasks_pid_hash_table else: raise ValueError("Impossible method choice chosen") vollog.debug(f"Using method {method}") return list_tasks
[docs] @classmethod def create_pid_filter(cls, pid_list: List[int] = None) -> Callable[[int], bool]: filter_func = lambda _: False # FIXME: mypy #4973 or #2608 pid_list = pid_list or [] filter_list = [x for x in pid_list if x is not None] if filter_list: def list_filter(x): return x.p_pid not in filter_list filter_func = list_filter return filter_func
def _generator(self): list_tasks = self.get_list_tasks( self.config.get("pslist_method", self.pslist_methods[0]) ) for task in list_tasks( self.context, self.config["kernel"], filter_func=self.create_pid_filter(self.config.get("pid", None)), ): offset = format_hints.Hex(task.vol.offset) name = utility.array_to_string(task.p_comm) pid = task.p_pid uid = task.p_uid gid = task.p_gid start_time_seconds = task.p_start.tv_sec start_time_microseconds = task.p_start.tv_usec start_time = datetime.datetime.fromtimestamp( start_time_seconds + start_time_microseconds / 1e6 ) ppid = task.p_ppid yield (0, (offset, name, pid, uid, gid, start_time, ppid))
[docs] @classmethod def list_tasks_allproc( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the processes in the primary layer based on the allproc method Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_module_name: The name of the the kernel module on which to operate filter_func: A function which takes a process object and returns True if the process should be ignored/filtered Returns: The list of process objects from the processes linked list after filtering """ kernel = context.modules[kernel_module_name] kernel_layer = context.layers[kernel.layer_name] proc = kernel.object_from_symbol(symbol_name="allproc").lh_first seen: Dict[int, int] = {} while proc is not None and proc.vol.offset != 0: if proc.vol.offset in seen: vollog.log( logging.INFO, "Recursive process list detected (a result of non-atomic acquisition).", ) break else: seen[proc.vol.offset] = 1 if kernel_layer.is_valid( proc.vol.offset, proc.vol.size ) and not filter_func(proc): yield proc try: proc = proc.p_list.le_next.dereference() except exceptions.InvalidAddressException: break
[docs] @classmethod def list_tasks_tasks( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the tasks in the primary layer based on the tasks queue Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_module_name: The name of the the kernel module on which to operate filter_func: A function which takes a task object and returns True if the task should be ignored/filtered Returns: The list of task objects from the `layer_name` layer's `tasks` list after filtering """ kernel = context.modules[kernel_module_name] kernel_layer = context.layers[kernel.layer_name] queue_entry = kernel.object_from_symbol(symbol_name="tasks") seen: Dict[int, int] = {} for task in queue_entry.walk_list(queue_entry, "tasks", "task"): if task.vol.offset in seen: vollog.log( logging.INFO, "Recursive process list detected (a result of non-atomic acquisition).", ) break else: seen[task.vol.offset] = 1 try: proc = task.bsd_info.dereference().cast("proc") except exceptions.InvalidAddressException: continue if kernel_layer.is_valid( proc.vol.offset, proc.vol.size ) and not filter_func(proc): yield proc
[docs] @classmethod def list_tasks_sessions( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the tasks in the primary layer using sessions Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_module_name: The name of the the kernel module on which to operate filter_func: A function which takes a task object and returns True if the task should be ignored/filtered Returns: The list of task objects from the `layer_name` layer's `tasks` list after filtering """ kernel = context.modules[kernel_module_name] table_size = kernel.object_from_symbol(symbol_name="sesshash") sesshashtbl = kernel.object_from_symbol(symbol_name="sesshashtbl") proc_array = kernel.object( object_type="array", offset=sesshashtbl, count=table_size + 1, subtype=kernel.get_type("sesshashhead"), ) for proc_list in proc_array: for proc in mac.MacUtilities.walk_list_head(proc_list, "s_hash"): if proc.s_leader.is_readable() and not filter_func(proc.s_leader): yield proc.s_leader
[docs] @classmethod def list_tasks_process_group( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the tasks in the primary layer using process groups Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_module_name: The name of the the kernel module on which to operate filter_func: A function which takes a task object and returns True if the task should be ignored/filtered Returns: The list of task objects from the `layer_name` layer's `tasks` list after filtering """ kernel = context.modules[kernel_module_name] table_size = kernel.object_from_symbol(symbol_name="pgrphash") pgrphashtbl = kernel.object_from_symbol(symbol_name="pgrphashtbl") proc_array = kernel.object( object_type="array", offset=pgrphashtbl, count=table_size + 1, subtype=kernel.get_type("pgrphashhead"), ) for proc_list in proc_array: for pgrp in mac.MacUtilities.walk_list_head(proc_list, "pg_hash"): for proc in mac.MacUtilities.walk_list_head( pgrp.pg_members, "p_pglist" ): if not filter_func(proc): yield proc
[docs] @classmethod def list_tasks_pid_hash_table( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Lists all the tasks in the primary layer using the pid hash table Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_module_name: The name of the the kernel module on which to operate filter_func: A function which takes a task object and returns True if the task should be ignored/filtered Returns: The list of task objects from the `layer_name` layer's `tasks` list after filtering """ kernel = context.modules[kernel_module_name] table_size = kernel.object_from_symbol(symbol_name="pidhash") pidhashtbl = kernel.object_from_symbol(symbol_name="pidhashtbl") proc_array = kernel.object( object_type="array", offset=pidhashtbl, count=table_size + 1, subtype=kernel.get_type("pidhashhead"), ) for proc_list in proc_array: for proc in mac.MacUtilities.walk_list_head(proc_list, "p_hash"): if not filter_func(proc): yield proc
[docs] def run(self): return renderers.TreeGrid( [ ("OFFSET", format_hints.Hex), ("NAME", str), ("PID", int), ("UID", int), ("GID", int), ("Start Time", datetime.datetime), ("PPID", int), ], self._generator(), )