# Source code for volatility3.plugins.windows.callbacks

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import contextlib
import logging
from typing import Dict, Iterable, List, Optional, Tuple, Union, cast

from volatility3.framework import (
    constants,
    exceptions,
    interfaces,
    objects,
    renderers,
    symbols,
)
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows import versions
from volatility3.framework.symbols.windows.extensions import callbacks
from volatility3.plugins.windows import driverirp, handles, poolscanner, ssdt

# Module-level logger, named after this module via __name__.
vollog = logging.getLogger(__name__)


class Callbacks(interfaces.plugins.PluginInterface):
    """Lists kernel callbacks and notification routines."""

    _required_framework_version = (2, 0, 0)
    _version = (3, 0, 0)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the requirements of this plugin.

        Returns:
            The kernel module requirement plus version requirements on the
            SSDT, PoolScanner, DriverIrp and Handles plugins.
        """
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.VersionRequirement(
                name="ssdt", component=ssdt.SSDT, version=(2, 0, 0)
            ),
            requirements.VersionRequirement(
                name="poolscanner", component=poolscanner.PoolScanner, version=(3, 0, 0)
            ),
            requirements.VersionRequirement(
                name="driverirp", component=driverirp.DriverIrp, version=(1, 0, 0)
            ),
            requirements.VersionRequirement(
                name="handles", component=handles.Handles, version=(4, 0, 0)
            ),
        ]

    @classmethod
    def create_callback_scan_constraints(
        cls,
        context: interfaces.context.ContextInterface,
        symbol_table: str,
        is_vista_or_above: bool,
    ) -> List[poolscanner.PoolConstraint]:
        """Creates a list of Pool Tag Constraints for callback objects.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            symbol_table: The name of an existing symbol table containing the symbols / types
            is_vista_or_above: A boolean indicating whether the OS version is Vista or newer.

        Returns:
            The list containing the built constraints.
        """
        constraints = cls._create_default_scan_constraints(context, symbol_table)
        if is_vista_or_above:
            constraints += cls._create_scan_constraints_vista_and_above(symbol_table)
        return constraints

    @staticmethod
    def _create_default_scan_constraints(
        context: interfaces.context.ContextInterface, symbol_table: str
    ) -> List[poolscanner.PoolConstraint]:
        """Creates the Pool Tag Constraints that are used regardless of OS version.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            symbol_table: The name of an existing symbol table containing the symbols / types

        Returns:
            Constraints for the IoFs, IoSh and Cbrb pool tags, each with a
            minimum size equal to the size of the structure it maps to.
        """
        shutdown_packet_size = context.symbol_space.get_type(
            symbol_table + constants.BANG + "_SHUTDOWN_PACKET"
        ).size
        generic_callback_size = context.symbol_space.get_type(
            symbol_table + constants.BANG + "_GENERIC_CALLBACK"
        ).size
        notification_packet_size = context.symbol_space.get_type(
            symbol_table + constants.BANG + "_NOTIFICATION_PACKET"
        ).size

        return [
            poolscanner.PoolConstraint(
                b"IoFs",
                type_name=symbol_table + constants.BANG + "_NOTIFICATION_PACKET",
                size=(notification_packet_size, None),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
            ),
            poolscanner.PoolConstraint(
                b"IoSh",
                type_name=symbol_table + constants.BANG + "_SHUTDOWN_PACKET",
                size=(shutdown_packet_size, None),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
                index=(0, 0),
            ),
            poolscanner.PoolConstraint(
                b"Cbrb",
                type_name=symbol_table + constants.BANG + "_GENERIC_CALLBACK",
                size=(generic_callback_size, None),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
            ),
        ]

    @staticmethod
    def _create_scan_constraints_vista_and_above(
        symbol_table: str,
    ) -> List[poolscanner.PoolConstraint]:
        """Creates the additional Pool Tag Constraints used on Vista and newer.

        Args:
            symbol_table: The name of an existing symbol table containing the symbols / types

        Returns:
            The list containing the built constraints.
        """
        return [
            poolscanner.PoolConstraint(
                b"DbCb",
                type_name=symbol_table + constants.BANG + "_DBGPRINT_CALLBACK",
                size=(0x20, 0x40),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
            ),
            poolscanner.PoolConstraint(
                b"Pnp9",
                type_name=symbol_table + constants.BANG + "_NOTIFY_ENTRY_HEADER",
                size=(0x30, None),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
                index=(1, 1),
            ),
            poolscanner.PoolConstraint(
                b"PnpD",
                type_name=symbol_table + constants.BANG + "_NOTIFY_ENTRY_HEADER",
                size=(0x40, None),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
                index=(1, 1),
            ),
            poolscanner.PoolConstraint(
                b"PnpC",
                type_name=symbol_table + constants.BANG + "_NOTIFY_ENTRY_HEADER",
                size=(0x38, None),
                page_type=poolscanner.PoolType.NONPAGED
                | poolscanner.PoolType.PAGED
                | poolscanner.PoolType.FREE,
                index=(1, 1),
            ),
        ]

    @classmethod
    def create_callback_symbol_table(
        cls,
        context: interfaces.context.ContextInterface,
        nt_symbol_table: str,
        config_path: str,
    ) -> str:
        """Creates a symbol table for kernel callback objects.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            nt_symbol_table: The name of the table containing the kernel symbols
            config_path: The config path where to find symbol files

        Returns:
            The name of the constructed symbol table
        """
        native_types = context.symbol_space[nt_symbol_table].natives
        is_64bit = symbols.symbol_table_is_64bit(
            context=context, symbol_table_name=nt_symbol_table
        )
        table_mapping = {"nt_symbols": nt_symbol_table}

        if is_64bit:
            symbol_filename = "callbacks-x64"
        else:
            symbol_filename = "callbacks-x86"

        return intermed.IntermediateSymbolTable.create(
            context,
            config_path,
            "windows",
            symbol_filename,
            # Class overrides are only supplied for the 32-bit symbol file
            class_types=callbacks.class_types_x86 if not is_64bit else None,
            native_types=native_types,
            table_mapping=table_mapping,
        )

    @classmethod
    def scan(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_symbol_table: str,
    ) -> Iterable[
        Tuple[
            Union[str, interfaces.renderers.BaseAbsentValue],
            int,
            Union[str, interfaces.renderers.BaseAbsentValue],
        ]
    ]:
        """Scans for callback objects using the poolscanner module and constraints.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: Name of the module for the kernel
            callback_symbol_table: The name of the table containing the callback object symbols (_SHUTDOWN_PACKET etc.)

        Yields:
            A (callback type, callback address, detail) tuple for each callback
            object found by the pool scan that passes its validity check
        """
        kernel = context.modules[kernel_module_name]

        is_vista_or_later = versions.is_vista_or_later(
            context=context, symbol_table=kernel.symbol_table_name
        )

        type_map = handles.Handles.get_type_map(
            context=context, kernel_module_name=kernel_module_name
        )

        constraints = cls.create_callback_scan_constraints(
            context, callback_symbol_table, is_vista_or_later
        )

        for (
            _constraint,
            mem_object,
            _header,
        ) in poolscanner.PoolScanner.generate_pool_scan(
            context, kernel_module_name, constraints
        ):
            try:
                # Shutdown packets get a dedicated check; any other object is
                # only validated when it actually offers an is_valid() method
                if isinstance(mem_object, callbacks._SHUTDOWN_PACKET):
                    if not mem_object.is_parseable(type_map):
                        continue
                elif hasattr(mem_object, "is_valid"):
                    if not mem_object.is_valid():
                        continue

                yield cls._process_scanned_callback(mem_object, type_map)
            except exceptions.InvalidAddressException:
                # Unreadable memory while validating or parsing: skip the hit
                continue

    @classmethod
    def _process_scanned_callback(
        cls, memory_object: objects.StructType, type_map: Dict[int, str]
    ) -> Tuple[
        Union[str, interfaces.renderers.BaseAbsentValue],
        int,
        Union[str, interfaces.renderers.BaseAbsentValue],
    ]:
        """Converts a scanned callback object into a result tuple.

        Args:
            memory_object: The callback structure produced by the pool scan
            type_map: The object type map, used to identify driver objects

        Returns:
            A (callback type, callback address, detail) tuple

        Raises:
            ValueError: If the type of `memory_object` is not one of the
                callback structures handled here
        """
        symbol_table = memory_object.get_symbol_table_name()
        type_name = memory_object.vol.type_name

        if isinstance(memory_object, callbacks._SHUTDOWN_PACKET) or (
            type_name == symbol_table + constants.BANG + "_SHUTDOWN_PACKET"
        ):
            callback_type = "IoRegisterShutdownNotification"

            try:
                driver = memory_object.DeviceObject.DriverObject
                index = driverirp.MAJOR_FUNCTIONS.index("IRP_MJ_SHUTDOWN")
                callback_address = driver.MajorFunction[index]
                details = driver.DriverName.String or renderers.UnparsableValue()
            except exceptions.InvalidAddressException:
                # Fall back to reporting the packet's own offset
                callback_address = memory_object.vol.offset
                details = renderers.NotApplicableValue()

        elif type_name == symbol_table + constants.BANG + "_NOTIFICATION_PACKET":
            callback_type = "IoRegisterFsRegistrationChange"
            callback_address = memory_object.NotificationRoutine
            details = renderers.NotApplicableValue()

        elif type_name == symbol_table + constants.BANG + "_NOTIFY_ENTRY_HEADER":
            driver = (
                memory_object.DriverObject.dereference()
                if memory_object.DriverObject.is_readable()
                else None
            )

            if driver:
                # Instantiate an object header for the driver name
                header = driver.get_object_header()
                try:
                    if header.get_object_type(type_map) == "Driver":
                        # Grab the object name
                        details = header.NameInfo.Name.String
                    else:
                        details = renderers.NotApplicableValue()
                except exceptions.InvalidAddressException:
                    details = renderers.UnreadableValue()
            else:
                details = renderers.UnreadableValue()

            callback_type = (
                memory_object.EventCategory.description
                if memory_object.EventCategory.is_valid_choice
                else renderers.UnparsableValue()
            )

            callback_address = memory_object.CallbackRoutine

        elif type_name == symbol_table + constants.BANG + "_GENERIC_CALLBACK":
            callback_type = "GenericKernelCallback"
            callback_address = memory_object.Callback
            details = renderers.NotApplicableValue()

        elif type_name == symbol_table + constants.BANG + "_DBGPRINT_CALLBACK":
            callback_type = "DbgSetDebugPrintCallback"
            callback_address = memory_object.Function
            details = renderers.NotApplicableValue()

        else:
            raise ValueError(f"Unexpected object type {type_name}")

        return (
            callback_type,
            callback_address,
            (
                # Absent values are passed through untouched; anything else is
                # normalised to a plain str to match the declared return type
                str(details)
                if not isinstance(details, interfaces.renderers.BaseAbsentValue)
                else details
            ),
        )

    @classmethod
    def list_notify_routines(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_table_name: str,
    ) -> Iterable[Tuple[str, int, Optional[str]]]:
        """Lists all kernel notification routines.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the module of the kernel
            callback_table_name: The name of the table containing the callback symbols

        Yields:
            A name, location and optional detail string
        """
        ntkrnlmp = context.modules[kernel_module_name]

        is_vista_or_later = versions.is_vista_or_later(
            context=context, symbol_table=ntkrnlmp.symbol_table_name
        )

        full_type_name = callback_table_name + constants.BANG + "_GENERIC_CALLBACK"

        # (symbol name, whether the array uses the extended length on Vista+)
        symbol_names = [
            ("PspLoadImageNotifyRoutine", False),
            ("PspCreateThreadNotifyRoutine", True),
            ("PspCreateProcessNotifyRoutine", True),
        ]

        for symbol_name, extended_list in symbol_names:
            try:
                symbol_offset = ntkrnlmp.get_symbol(symbol_name).address
            except exceptions.SymbolError:
                vollog.debug(f"Cannot find {symbol_name}")
                continue

            if is_vista_or_later and extended_list:
                count = 64
            else:
                count = 8

            fast_refs = cast(
                List[objects.Pointer],
                ntkrnlmp.object(
                    object_type="array",
                    offset=symbol_offset,
                    subtype=ntkrnlmp.get_type("_EX_FAST_REF"),
                    count=count,
                ),
            )

            for fast_ref in fast_refs:
                try:
                    callback = fast_ref.dereference().cast(full_type_name)
                except exceptions.InvalidAddressException:
                    continue

                # Entries with a zero callback pointer are not reported
                if callback.Callback != 0:
                    yield symbol_name, callback.Callback, None

    @classmethod
    def _list_registry_callbacks_legacy(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_table_name: str,
    ) -> Iterable[Tuple[str, int, None]]:
        """
        Lists all registry callbacks from the old format via the CmpCallBackVector.
        """
        ntkrnlmp = context.modules[kernel_module_name]
        full_type_name = (
            callback_table_name + constants.BANG + "_EX_CALLBACK_ROUTINE_BLOCK"
        )

        symbol_offset = ntkrnlmp.get_symbol("CmpCallBackVector").address
        symbol_count_offset = ntkrnlmp.get_symbol("CmpCallBackCount").address

        callback_count = ntkrnlmp.object(
            object_type="unsigned int", offset=symbol_count_offset
        )

        if callback_count == 0:
            return None

        fast_refs = cast(
            List[objects.Pointer],
            ntkrnlmp.object(
                object_type="array",
                offset=symbol_offset,
                subtype=ntkrnlmp.get_type("_EX_FAST_REF"),
                count=callback_count,
            ),
        )

        for fast_ref in fast_refs:
            try:
                callback = fast_ref.dereference().cast(full_type_name)
            except exceptions.InvalidAddressException:
                continue

            if callback.Function != 0:
                yield "CmRegisterCallback", callback.Function, None

    @classmethod
    def _list_registry_callbacks_new(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_table_name: str,
    ) -> Iterable[Tuple[str, int, Optional[str]]]:
        """
        Lists all registry callbacks via the CallbackListHead.
        """
        ntkrnlmp = context.modules[kernel_module_name]
        full_type_name = callback_table_name + constants.BANG + "_CM_CALLBACK_ENTRY"

        symbol_offset = ntkrnlmp.get_symbol("CallbackListHead").address
        symbol_count_offset = ntkrnlmp.get_symbol("CmpCallBackCount").address

        callback_count = ntkrnlmp.object(
            object_type="unsigned int", offset=symbol_count_offset
        )

        if callback_count == 0:
            return None

        callback_list = ntkrnlmp.object(object_type="_LIST_ENTRY", offset=symbol_offset)
        for callback in callback_list.to_list(full_type_name, "Link"):
            # The altitude stays None when its string cannot be read
            altitude = None
            with contextlib.suppress(exceptions.InvalidAddressException):
                altitude = callback.Altitude.String
            yield "CmRegisterCallbackEx", callback.Function, f"Altitude: {altitude}"

    @classmethod
    def list_registry_callbacks(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_table_name: str,
    ) -> Iterable[Tuple[str, int, Optional[str]]]:
        """Lists all registry callbacks.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the module of the kernel
            callback_table_name: The name of the table containing the callback symbols

        Yields:
            A name, location and optional detail string
        """
        ntkrnlmp = context.modules[kernel_module_name]

        # The available kernel symbols decide which layout is parsed
        if ntkrnlmp.has_symbol("CmpCallBackVector") and ntkrnlmp.has_symbol(
            "CmpCallBackCount"
        ):
            yield from cls._list_registry_callbacks_legacy(
                context, kernel_module_name, callback_table_name
            )
        elif ntkrnlmp.has_symbol("CallbackListHead") and ntkrnlmp.has_symbol(
            "CmpCallBackCount"
        ):
            yield from cls._list_registry_callbacks_new(
                context, kernel_module_name, callback_table_name
            )
        else:
            # Neither layout is usable: log which symbols are present/missing
            symbols_to_check = [
                "CmpCallBackVector",
                "CmpCallBackCount",
                "CallbackListHead",
            ]
            vollog.debug("Failed to get registry callbacks!")
            for symbol_name in symbols_to_check:
                symbol_status = "does not exist"
                if ntkrnlmp.has_symbol(symbol_name):
                    symbol_status = "exists"
                vollog.debug(f"symbol {symbol_name} {symbol_status}.")

    @classmethod
    def list_bugcheck_reason_callbacks(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_table_name: str,
    ) -> Iterable[
        Tuple[
            str,
            int,
            interfaces.renderers.BaseAbsentValue,
        ]
    ]:
        """Lists all kernel bugcheck reason callbacks.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the module of the kernel
            callback_table_name: The name of the table containing the callback symbols

        Yields:
            A name, location and optional detail string
        """
        ntkrnlmp = context.modules[kernel_module_name]

        try:
            list_offset = ntkrnlmp.get_symbol(
                "KeBugCheckReasonCallbackListHead"
            ).address
        except exceptions.SymbolError:
            vollog.debug("Cannot find KeBugCheckReasonCallbackListHead")
            return None

        full_type_name = (
            callback_table_name + constants.BANG + "_KBUGCHECK_REASON_CALLBACK_RECORD"
        )
        callback_record = context.object(
            object_type=full_type_name,
            offset=ntkrnlmp.offset + list_offset,
            layer_name=ntkrnlmp.layer_name,
        )

        for callback in callback_record.Entry:
            # Skip records whose routine address is not valid in the layer
            if not context.layers[ntkrnlmp.layer_name].is_valid(
                callback.CallbackRoutine, 64
            ):
                continue

            try:
                component: Union[
                    interfaces.renderers.BaseAbsentValue,
                    interfaces.objects.ObjectInterface,
                ] = ntkrnlmp.object(
                    "string",
                    absolute=True,
                    offset=callback.Component,
                    max_length=64,
                    errors="replace",
                )
            except exceptions.InvalidAddressException:
                component = renderers.UnreadableValue()

            yield (
                "KeBugCheckReasonCallbackListHead",
                callback.CallbackRoutine,
                component,
            )

    @classmethod
    def list_bugcheck_callbacks(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        callback_table_name: str,
    ) -> Iterable[
        Tuple[
            str,
            int,
            Union[interfaces.objects.ObjectInterface, renderers.UnreadableValue],
        ]
    ]:
        """Lists all kernel bugcheck callbacks.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_module_name: The name of the module of the kernel
            callback_table_name: The name of the table containing the callback symbols

        Yields:
            A name, location and optional detail string
        """
        ntkrnlmp = context.modules[kernel_module_name]

        try:
            list_offset = ntkrnlmp.get_symbol("KeBugCheckCallbackListHead").address
        except exceptions.SymbolError:
            vollog.debug("Cannot find KeBugCheckCallbackListHead")
            return None

        full_type_name = (
            callback_table_name + constants.BANG + "_KBUGCHECK_CALLBACK_RECORD"
        )
        callback_record = context.object(
            full_type_name,
            offset=ntkrnlmp.offset + list_offset,
            layer_name=ntkrnlmp.layer_name,
        )

        for callback in callback_record.Entry:
            # Skip records whose routine address is not valid in the layer
            if not context.layers[ntkrnlmp.layer_name].is_valid(
                callback.CallbackRoutine, 64
            ):
                continue

            try:
                # Component is used as an absolute address, exactly as in
                # list_bugcheck_reason_callbacks, so it is read with
                # absolute=True
                component = ntkrnlmp.object(
                    "string",
                    absolute=True,
                    offset=callback.Component,
                    max_length=64,
                    errors="replace",
                )
            except exceptions.InvalidAddressException:
                component = renderers.UnreadableValue()

            yield "KeBugCheckCallbackListHead", callback.CallbackRoutine, component

    def _generator(self):
        """Runs every callback listing method and yields the TreeGrid rows.

        Each callback address is looked up in the module collection built by
        the SSDT plugin. One row is yielded per symbol found at the address;
        when a module has no symbol there, or no module covers the address,
        a single row with the missing columns marked as not available is
        yielded instead.
        """
        kernel = self.context.modules[self.config["kernel"]]

        callback_symbol_table = self.create_callback_symbol_table(
            self.context, kernel.symbol_table_name, self.config_path
        )

        collection = ssdt.SSDT.build_module_collection(
            context=self.context,
            kernel_module_name=self.config["kernel"],
        )

        callback_methods = (
            self.list_notify_routines,
            self.list_bugcheck_callbacks,
            self.list_bugcheck_reason_callbacks,
            self.list_registry_callbacks,
            self.scan,
        )

        for callback_method in callback_methods:
            for callback_type, callback_address, callback_detail in callback_method(
                self.context,
                self.config["kernel"],
                callback_symbol_table,
            ):
                if callback_detail is None:
                    detail = renderers.NotApplicableValue()
                else:
                    detail = callback_detail

                module_symbols = list(
                    collection.get_module_symbols_by_absolute_location(callback_address)
                )

                if module_symbols:
                    for module_name, symbol_generator in module_symbols:
                        symbols_found = False

                        # we might have multiple symbols pointing to the same location
                        for symbol in symbol_generator:
                            symbols_found = True
                            yield (
                                0,
                                (
                                    callback_type,
                                    format_hints.Hex(callback_address),
                                    module_name,
                                    symbol.split(constants.BANG)[1],
                                    detail,
                                ),
                            )

                        # no symbols, but we at least can report the module name
                        if not symbols_found:
                            yield (
                                0,
                                (
                                    callback_type,
                                    format_hints.Hex(callback_address),
                                    module_name,
                                    renderers.NotAvailableValue(),
                                    detail,
                                ),
                            )
                else:
                    # no module was found at the absolute location
                    yield (
                        0,
                        (
                            callback_type,
                            format_hints.Hex(callback_address),
                            renderers.NotAvailableValue(),
                            renderers.NotAvailableValue(),
                            detail,
                        ),
                    )

    def run(self):
        """Builds the TreeGrid (Type, Callback, Module, Symbol, Detail) for the plugin output."""
        return renderers.TreeGrid(
            [
                ("Type", str),
                ("Callback", format_hints.Hex),
                ("Module", str),
                ("Symbol", str),
                ("Detail", str),
            ],
            self._generator(),
        )