Source code for volatility3.plugins.mac.kevents

# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

from typing import Iterable, Callable, Tuple

from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.symbols import mac
from volatility3.plugins.mac import pslist


[docs]class Kevents(interfaces.plugins.PluginInterface): """Lists event handlers registered by processes""" _required_framework_version = (2, 0, 0) _version = (1, 0, 0) event_types = { 1: "EVFILT_READ", 2: "EVFILT_WRITE", 3: "EVFILT_AIO", 4: "EVFILT_VNODE", 5: "EVFILT_PROC", 6: "EVFILT_SIGNAL", 7: "EVFILT_TIMER", 8: "EVFILT_MACHPORT", 9: "EVFILT_FS", 10: "EVFILT_USER", 12: "EVFILT_VM", } vnode_filters = [ ("NOTE_DELETE", 1), ("NOTE_WRITE", 2), ("NOTE_EXTEND", 4), ("NOTE_ATTRIB", 8), ("NOTE_LINK", 0x10), ("NOTE_RENAME", 0x20), ("NOTE_REVOKE", 0x40), ] proc_filters = [ ("NOTE_EXIT", 0x80000000), ("NOTE_EXITSTATUS", 0x04000000), ("NOTE_FORK", 0x40000000), ("NOTE_EXEC", 0x20000000), ("NOTE_SIGNAL", 0x08000000), ("NOTE_REAP", 0x10000000), ] timer_filters = [ ("NOTE_SECONDS", 1), ("NOTE_USECONDS", 2), ("NOTE_NSECONDS", 4), ("NOTE_ABSOLUTE", 8), ] all_filters = { 4: vnode_filters, # EVFILT_VNODE 5: proc_filters, # EVFILT_PROC 7: timer_filters, # EVFILT_TIMER }
[docs] @classmethod def get_requirements(cls): return [ requirements.ModuleRequirement( name="kernel", description="Kernel module for the OS", architectures=["Intel32", "Intel64"], ), requirements.PluginRequirement( name="pslist", plugin=pslist.PsList, version=(3, 0, 0) ), requirements.VersionRequirement( name="macutils", component=mac.MacUtilities, version=(1, 2, 0) ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), ]
def _parse_flags(self, filter_index, filter_flags): if filter_flags == 0 or filter_index not in self.all_filters: return "" context = [] filters = self.all_filters[filter_index] for flag, index in filters: if filter_flags & index == index: context.append(flag) return ",".join(context) @classmethod def _walk_klist_array(cls, kernel, fdp, array_pointer_member, array_size_member): """ Convenience wrapper for walking an array of lists of kernel events Handles invalid address references """ try: klist_array_pointer = getattr(fdp, array_pointer_member) array_size = getattr(fdp, array_size_member) klist_array = kernel.object( object_type="array", offset=klist_array_pointer, count=array_size + 1, subtype=kernel.get_type("klist"), ) except exceptions.InvalidAddressException: return None for klist in klist_array: for kn in mac.MacUtilities.walk_slist(klist, "kn_link"): yield kn @classmethod def _get_task_kevents(cls, kernel, task): """ Enumerates event filters per task. Uses smear-safe APIs throughout as these data structures see a significant amount of smear """ fdp = task.p_fd for kn in cls._walk_klist_array(kernel, fdp, "fd_knlist", "fd_knlistsize"): yield kn for kn in cls._walk_klist_array(kernel, fdp, "fd_knhash", "fd_knhashmask"): yield kn try: p_klist = task.p_klist except exceptions.InvalidAddressException: return None for kn in mac.MacUtilities.walk_slist(p_klist, "kn_link"): yield kn
[docs] @classmethod def list_kernel_events( cls, context: interfaces.context.ContextInterface, kernel_module_name: str, filter_func: Callable[[int], bool] = lambda _: False, ) -> Iterable[ Tuple[ interfaces.objects.ObjectInterface, interfaces.objects.ObjectInterface, interfaces.objects.ObjectInterface, ] ]: """ Returns the kernel event filters registered Return values: A tuple of 3 elements: 1) The name of the process that registered the filter 2) The process ID of the process that registered the filter 3) The object of the associated kernel event filter """ kernel = context.modules[kernel_module_name] list_tasks = pslist.PsList.get_list_tasks(pslist.PsList.pslist_methods[0]) for task in list_tasks(context, kernel_module_name, filter_func): task_name = utility.array_to_string(task.p_comm) pid = task.p_pid for kn in cls._get_task_kevents(kernel, task): yield task_name, pid, kn
def _generator(self): filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) for task_name, pid, kn in self.list_kernel_events( self.context, self.config["kernel"], filter_func=filter_func ): filter_index = kn.kn_kevent.filter * -1 if filter_index in self.event_types: filter_name = self.event_types[filter_index] else: continue try: ident = kn.kn_kevent.ident except exceptions.InvalidAddressException: continue context = self._parse_flags(filter_index, kn.kn_sfflags) yield (0, (pid, task_name, ident, filter_name, context))
[docs] def run(self): return renderers.TreeGrid( [ ("PID", int), ("Process", str), ("Ident", int), ("Filter", str), ("Context", str), ], self._generator(), )