# Source code for volatility3.plugins.windows.vadinfo

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0

import logging
from typing import Callable, List, Generator, Iterable, Type, Optional

from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from import pslist

vollog = logging.getLogger(__name__)

# Memory-protection constants from WinNT.h, keyed by name.
# The first eight are the base page protections (one bit each, 0x01-0x80);
# the remaining entries are modifier flags that are OR-ed onto a base value.
winnt_protections = {
    "PAGE_NOACCESS": 0x01,
    "PAGE_READONLY": 0x02,
    "PAGE_READWRITE": 0x04,
    "PAGE_WRITECOPY": 0x08,
    "PAGE_EXECUTE": 0x10,
    "PAGE_EXECUTE_READ": 0x20,
    "PAGE_EXECUTE_READWRITE": 0x40,
    "PAGE_EXECUTE_WRITECOPY": 0x80,
    "PAGE_GUARD": 0x100,
    "PAGE_NOCACHE": 0x200,
    "PAGE_WRITECOMBINE": 0x400,
    "PAGE_TARGETS_INVALID": 0x40000000,
}

class VadInfo(interfaces.plugins.PluginInterface):
    """Lists process memory ranges."""

    _required_framework_version = (2, 4, 0)
    _version = (2, 0, 0)
    MAXSIZE_DEFAULT = 1024 * 1024 * 1024  # 1 Gb

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Not read anywhere in this class; kept so the instance layout is unchanged.
        self._protect_values = None

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the configuration requirements of this plugin.

        Returns:
            The kernel module, the optional address / pid filters, the pslist
            plugin dependency and the dump / maxsize options.
        """
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            # TODO: Convert this to a ListRequirement so that people can filter on sets of ranges
            requirements.IntRequirement(
                name="address",
                description="Process virtual memory address to include "
                "(all other address ranges are excluded).",
                optional=True,
            ),
            requirements.ListRequirement(
                name="pid",
                description="Filter on specific process IDs",
                element_type=int,
                optional=True,
            ),
            requirements.PluginRequirement(
                name="pslist", plugin=pslist.PsList, version=(2, 0, 0)
            ),
            requirements.BooleanRequirement(
                name="dump",
                description="Extract listed memory ranges",
                default=False,
                optional=True,
            ),
            requirements.IntRequirement(
                name="maxsize",
                description="Maximum size for dumped VAD sections "
                "(all the bigger sections will be ignored)",
                default=cls.MAXSIZE_DEFAULT,
                optional=True,
            ),
        ]

    @classmethod
    def protect_values(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        symbol_table: str,
    ) -> Iterable[int]:
        """Look up the array of memory protection constants from the memory
        sample. These don't change often, but if they do in the future, then
        finding them dynamically versus hard-coding here will ensure we parse
        them properly.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            symbol_table: The name of the table containing the kernel symbols

        Returns:
            An array object of 32 "int" values located at the kernel's
            MmProtectToValue symbol
        """

        kvo = context.layers[layer_name].config["kernel_virtual_offset"]
        ntkrnlmp = context.module(symbol_table, layer_name=layer_name, offset=kvo)
        addr = ntkrnlmp.get_symbol("MmProtectToValue").address
        values = ntkrnlmp.object(
            object_type="array", offset=addr, subtype=ntkrnlmp.get_type("int"), count=32
        )
        return values  # type: ignore

    @classmethod
    def list_vads(
        cls,
        proc: interfaces.objects.ObjectInterface,
        filter_func: Callable[
            [interfaces.objects.ObjectInterface], bool
        ] = lambda _: False,
    ) -> Generator[interfaces.objects.ObjectInterface, None, None]:
        """Lists the Virtual Address Descriptors of a specific process.

        Args:
            proc: _EPROCESS object from which to list the VADs
            filter_func: Function to take a virtual address descriptor value and return True if it should be filtered out

        Returns:
            A list of virtual address descriptors based on the process and filtered based on the filter function
        """
        for vad in proc.get_vad_root().traverse():
            if not filter_func(vad):
                yield vad

    @classmethod
    def vad_dump(
        cls,
        context: interfaces.context.ContextInterface,
        proc: interfaces.objects.ObjectInterface,
        vad: interfaces.objects.ObjectInterface,
        open_method: Type[interfaces.plugins.FileHandlerInterface],
        maxsize: int = MAXSIZE_DEFAULT,
    ) -> Optional[interfaces.plugins.FileHandlerInterface]:
        """Extracts the complete data for Vad as a FileInterface.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            proc: an _EPROCESS instance
            vad: The suspected VAD to extract (ObjectInterface)
            open_method: class to provide context manager for opening the file
            maxsize: Max size of VAD section (default MAXSIZE_DEFAULT); a value
                of 0 or less disables the size limit

        Returns:
            An open FileInterface object containing the complete data for the process or None in the case of failure
        """

        try:
            vad_start = vad.get_start()
            vad_end = vad.get_end()
        except AttributeError:
            vollog.debug("Unable to find the starting/ending VPN member")
            return None

        # Queried once and reused for both the size limit and the copy loop
        vad_size = vad.get_size()

        if 0 < maxsize < vad_size:
            vollog.debug(
                f"Skip VAD dump {vad_start:#x}-{vad_end:#x} due to maxsize limit"
            )
            return None

        # Placeholder so the debug message below is still meaningful if reading
        # UniqueProcessId itself is what raises
        proc_id = "Unknown"
        try:
            proc_id = proc.UniqueProcessId
            proc_layer_name = proc.add_process_layer()
        except exceptions.InvalidAddressException as excp:
            vollog.debug(
                "Process {}: invalid address {} in layer {}".format(
                    proc_id, excp.invalid_address, excp.layer_name
                )
            )
            return None

        proc_layer = context.layers[proc_layer_name]
        file_name = f"pid.{proc_id}.vad.{vad_start:#x}-{vad_end:#x}.dmp"
        try:
            file_handle = open_method(file_name)
            # Copy in 10 MiB pieces so a large VAD is never held in memory at once
            chunk_size = 1024 * 1024 * 10
            offset = vad_start
            vad_limit = vad_start + vad_size
            while offset < vad_limit:
                to_read = min(chunk_size, vad_limit - offset)
                # pad=True is forwarded to the layer's read()
                # (presumably pads unreadable ranges instead of raising - TODO confirm)
                data = proc_layer.read(offset, to_read, pad=True)
                if not data:
                    break
                file_handle.write(data)
                offset += to_read
        except Exception as excp:
            # NOTE(review): a handle opened before the failure is neither closed
            # nor returned here - confirm whether it should be discarded explicitly
            vollog.debug(f"Unable to dump VAD {file_name}: {excp}")
            return None

        return file_handle

    def _generator(self, procs: Iterable[interfaces.objects.ObjectInterface]):
        """Yields one TreeGrid row per VAD of every process in procs.

        Args:
            procs: The processes (_EPROCESS objects) whose VADs are listed

        Yields:
            (0, row) tuples whose fields match the columns declared in run()
        """
        kernel = self.context.modules[self.config["kernel"]]
        kernel_layer = self.context.layers[kernel.layer_name]

        def passthrough(_: interfaces.objects.ObjectInterface) -> bool:
            return False

        filter_func = passthrough
        if self.config.get("address", None) is not None:

            def filter_function(x: interfaces.objects.ObjectInterface) -> bool:
                # True means "filter out": drop VADs that do not contain the address
                return not (x.get_start() <= self.config["address"] <= x.get_end())

            filter_func = filter_function

        # The protection table is the same for every VAD, so it is looked up once,
        # on first use; nothing is read from the kernel when there are no rows
        protect_values = None

        for proc in procs:
            process_name = utility.array_to_string(proc.ImageFileName)

            for vad in self.list_vads(proc, filter_func=filter_func):
                file_output = "Disabled"
                if self.config["dump"]:
                    file_handle = self.vad_dump(
                        self.context, proc, vad, self.open, self.config["maxsize"]
                    )
                    file_output = "Error outputting file"
                    if file_handle:
                        file_handle.close()
                        file_output = file_handle.preferred_filename

                if protect_values is None:
                    protect_values = self.protect_values(
                        self.context,
                        kernel.layer_name,
                        kernel.symbol_table_name,
                    )

                yield (
                    0,
                    (
                        proc.UniqueProcessId,
                        process_name,
                        format_hints.Hex(kernel_layer.canonicalize(vad.vol.offset)),
                        format_hints.Hex(vad.get_start()),
                        format_hints.Hex(vad.get_end()),
                        vad.get_tag(),
                        vad.get_protection(protect_values, winnt_protections),
                        vad.get_commit_charge(),
                        vad.get_private_memory(),
                        format_hints.Hex(vad.get_parent()),
                        vad.get_file_name(),
                        file_output,
                    ),
                )

    def run(self):
        """Builds the TreeGrid of VAD rows for the processes selected by "pid".

        Returns:
            A renderers.TreeGrid whose rows are produced by _generator()
        """
        kernel = self.context.modules[self.config["kernel"]]

        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("Process", str),
                ("Offset", format_hints.Hex),
                ("Start VPN", format_hints.Hex),
                ("End VPN", format_hints.Hex),
                ("Tag", str),
                ("Protection", str),
                ("CommitCharge", int),
                ("PrivateMemory", int),
                ("Parent", format_hints.Hex),
                ("File", str),
                ("File output", str),
            ],
            self._generator(
                pslist.PsList.list_processes(
                    context=self.context,
                    layer_name=kernel.layer_name,
                    symbol_table=kernel.symbol_table_name,
                    filter_func=filter_func,
                )
            ),
        )