Source code for volatility3.plugins.windows.vadinfo

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from typing import Callable, List, Generator, Iterable, Type, Optional

from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist

vollog = logging.getLogger(__name__)

# Memory-protection constants, as named and valued in WinNT.h.
# Built from an explicit pair sequence so the declaration order (ascending
# bit value) is preserved as the mapping's iteration order.
winnt_protections = dict(
    (
        ("PAGE_NOACCESS", 0x01),
        ("PAGE_READONLY", 0x02),
        ("PAGE_READWRITE", 0x04),
        ("PAGE_WRITECOPY", 0x08),
        ("PAGE_EXECUTE", 0x10),
        ("PAGE_EXECUTE_READ", 0x20),
        ("PAGE_EXECUTE_READWRITE", 0x40),
        ("PAGE_EXECUTE_WRITECOPY", 0x80),
        ("PAGE_GUARD", 0x100),
        ("PAGE_NOCACHE", 0x200),
        ("PAGE_WRITECOMBINE", 0x400),
        ("PAGE_TARGETS_INVALID", 0x40000000),
    )
)


class VadInfo(interfaces.plugins.PluginInterface):
    """Lists process memory ranges."""

    _required_framework_version = (2, 4, 0)
    _version = (2, 0, 0)
    MAXSIZE_DEFAULT = 1024 * 1024 * 1024  # 1 Gb

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cache for the protection-constant array returned by protect_values();
        # filled lazily by _get_protect_values() the first time a row needs it.
        self._protect_values = None

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Declares the configuration options this plugin accepts.

        Returns:
            The kernel module requirement, the optional `address` and `pid`
            filters, the PsList plugin dependency, and the `dump` / `maxsize`
            options controlling extraction of the listed ranges.
        """
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            # TODO: Convert this to a ListRequirement so that people can filter on sets of ranges
            requirements.IntRequirement(
                name="address",
                description="Process virtual memory address to include "
                "(all other address ranges are excluded).",
                optional=True,
            ),
            requirements.ListRequirement(
                name="pid",
                description="Filter on specific process IDs",
                element_type=int,
                optional=True,
            ),
            requirements.PluginRequirement(
                name="pslist", plugin=pslist.PsList, version=(2, 0, 0)
            ),
            requirements.BooleanRequirement(
                name="dump",
                description="Extract listed memory ranges",
                default=False,
                optional=True,
            ),
            requirements.IntRequirement(
                name="maxsize",
                description="Maximum size for dumped VAD sections "
                "(all the bigger sections will be ignored)",
                default=cls.MAXSIZE_DEFAULT,
                optional=True,
            ),
        ]

    @classmethod
    def protect_values(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        symbol_table: str,
    ) -> Iterable[int]:
        """Look up the array of memory protection constants from the memory
        sample. These don't change often, but if they do in the future, then
        finding them dynamically versus hard-coding here will ensure we parse
        them properly.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            symbol_table: The name of the table containing the kernel symbols

        Returns:
            An array object of 32 `int` entries located at the kernel's
            `MmProtectToValue` symbol
        """
        kvo = context.layers[layer_name].config["kernel_virtual_offset"]
        ntkrnlmp = context.module(symbol_table, layer_name=layer_name, offset=kvo)
        addr = ntkrnlmp.get_symbol("MmProtectToValue").address
        values = ntkrnlmp.object(
            object_type="array", offset=addr, subtype=ntkrnlmp.get_type("int"), count=32
        )
        return values  # type: ignore

    @classmethod
    def list_vads(
        cls,
        proc: interfaces.objects.ObjectInterface,
        filter_func: Callable[
            [interfaces.objects.ObjectInterface], bool
        ] = lambda _: False,
    ) -> Generator[interfaces.objects.ObjectInterface, None, None]:
        """Lists the Virtual Address Descriptors of a specific process.

        Args:
            proc: _EPROCESS object from which to list the VADs
            filter_func: Function to take a virtual address descriptor value and return True if it should be filtered out

        Yields:
            Each virtual address descriptor reached from the process's VAD
            root for which the filter function returned a false value
        """
        for vad in proc.get_vad_root().traverse():
            if not filter_func(vad):
                yield vad

    @classmethod
    def vad_dump(
        cls,
        context: interfaces.context.ContextInterface,
        proc: interfaces.objects.ObjectInterface,
        vad: interfaces.objects.ObjectInterface,
        open_method: Type[interfaces.plugins.FileHandlerInterface],
        maxsize: int = MAXSIZE_DEFAULT,
    ) -> Optional[interfaces.plugins.FileHandlerInterface]:
        """Extracts the complete data for Vad as a FileInterface.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            proc: an _EPROCESS instance
            vad: The suspected VAD to extract (ObjectInterface)
            open_method: class to provide context manager for opening the file
            maxsize: Max size of VAD section (default MAXSIZE_DEFAULT)

        Returns:
            An open FileInterface object containing the complete data for the process or None in the case of failure
        """
        try:
            vad_start = vad.get_start()
            vad_end = vad.get_end()
        except AttributeError:
            vollog.debug("Unable to find the starting/ending VPN member")
            return None

        # A non-positive maxsize disables the size limit entirely.
        if 0 < maxsize < vad.get_size():
            vollog.debug(
                f"Skip VAD dump {vad_start:#x}-{vad_end:#x} due to maxsize limit"
            )
            return None

        # Placeholder so the debug message below is well-formed even when
        # reading UniqueProcessId itself is what raised.
        proc_id = "Unknown"
        try:
            proc_id = proc.UniqueProcessId
            proc_layer_name = proc.add_process_layer()
        except exceptions.InvalidAddressException as excp:
            vollog.debug(
                "Process {}: invalid address {} in layer {}".format(
                    proc_id, excp.invalid_address, excp.layer_name
                )
            )
            return None

        proc_layer = context.layers[proc_layer_name]
        file_name = f"pid.{proc_id}.vad.{vad_start:#x}-{vad_end:#x}.dmp"
        try:
            file_handle = open_method(file_name)
            # Copy in 10 MiB chunks so a large VAD is never read in one call.
            chunk_size = 1024 * 1024 * 10
            offset = vad_start
            vad_size = vad.get_size()
            dump_end = vad_start + vad_size
            while offset < dump_end:
                to_read = min(chunk_size, dump_end - offset)
                data = proc_layer.read(offset, to_read, pad=True)
                if not data:
                    break
                file_handle.write(data)
                offset += to_read
        except Exception as excp:
            # NOTE(review): if the failure happens after open_method() succeeded,
            # the handle is dropped here without being closed. Confirm how the
            # concrete FileHandlerInterface implementations treat close() on a
            # partially written file before adding explicit cleanup.
            vollog.debug(f"Unable to dump VAD {file_name}: {excp}")
            return None

        return file_handle

    def _get_protect_values(self, kernel) -> Iterable[int]:
        """Returns the protection-constant array, looking it up only once.

        Args:
            kernel: The kernel module object taken from the plugin's context

        Returns:
            The cached result of protect_values() for this plugin instance
        """
        if self._protect_values is None:
            self._protect_values = self.protect_values(
                self.context,
                kernel.layer_name,
                kernel.symbol_table_name,
            )
        return self._protect_values

    def _generator(self, procs):
        """Produces one output row per VAD of every process in `procs`.

        Args:
            procs: Iterable of process objects whose VADs should be listed

        Yields:
            `(0, row)` tuples whose row fields line up with the columns
            declared in run()
        """
        kernel = self.context.modules[self.config["kernel"]]
        kernel_layer = self.context.layers[kernel.layer_name]

        def passthrough(_: interfaces.objects.ObjectInterface) -> bool:
            return False

        filter_func = passthrough
        # Read the optional address once rather than on every VAD visited.
        address = self.config.get("address", None)
        if address is not None:

            def filter_function(x: interfaces.objects.ObjectInterface) -> bool:
                # Filter out every VAD whose range does not contain the address.
                return not (x.get_start() <= address <= x.get_end())

            filter_func = filter_function

        for proc in procs:
            process_name = utility.array_to_string(proc.ImageFileName)

            for vad in self.list_vads(proc, filter_func=filter_func):
                file_output = "Disabled"
                if self.config["dump"]:
                    file_handle = self.vad_dump(
                        self.context, proc, vad, self.open, self.config["maxsize"]
                    )
                    file_output = "Error outputting file"
                    if file_handle:
                        file_handle.close()
                        file_output = file_handle.preferred_filename

                yield (
                    0,
                    (
                        proc.UniqueProcessId,
                        process_name,
                        format_hints.Hex(kernel_layer.canonicalize(vad.vol.offset)),
                        format_hints.Hex(vad.get_start()),
                        format_hints.Hex(vad.get_end()),
                        vad.get_tag(),
                        vad.get_protection(
                            # Looked up lazily on the first row, then reused.
                            self._get_protect_values(kernel),
                            winnt_protections,
                        ),
                        vad.get_commit_charge(),
                        vad.get_private_memory(),
                        format_hints.Hex(vad.get_parent()),
                        vad.get_file_name(),
                        file_output,
                    ),
                )

    def run(self):
        """Builds the result grid for the processes selected by the `pid` option.

        Returns:
            A TreeGrid with one row per VAD, fed by _generator() over the
            processes returned by PsList.list_processes()
        """
        kernel = self.context.modules[self.config["kernel"]]

        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("Process", str),
                ("Offset", format_hints.Hex),
                ("Start VPN", format_hints.Hex),
                ("End VPN", format_hints.Hex),
                ("Tag", str),
                ("Protection", str),
                ("CommitCharge", int),
                ("PrivateMemory", int),
                ("Parent", format_hints.Hex),
                ("File", str),
                ("File output", str),
            ],
            self._generator(
                pslist.PsList.list_processes(
                    context=self.context,
                    layer_name=kernel.layer_name,
                    symbol_table=kernel.symbol_table_name,
                    filter_func=filter_func,
                )
            ),
        )