Source code for

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at
import logging
from typing import Iterable, Tuple

from volatility3.framework import interfaces, symbols, exceptions
from volatility3.framework import renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from import pslist, vadinfo

vollog = logging.getLogger(__name__)

[docs]class Malfind(interfaces.plugins.PluginInterface): """Lists process memory ranges that potentially contain injected code.""" _required_framework_version = (2, 4, 0)
[docs] @classmethod def get_requirements(cls): # Since we're calling the plugin, make sure we have the plugin's requirements return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"], ), requirements.ListRequirement( name="pid", element_type=int, description="Process IDs to include (all other processes are excluded)", optional=True, ), requirements.BooleanRequirement( name="dump", description="Extract injected VADs", default=False, optional=True, ), requirements.VersionRequirement( name="pslist", component=pslist.PsList, version=(2, 0, 0) ), requirements.VersionRequirement( name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0) ), ]
[docs] @classmethod def is_vad_empty(cls, proc_layer, vad): """Check if a VAD region is either entirely unavailable due to paging, entirely consisting of zeros, or a combination of the two. This helps ignore false positives whose VAD flags match task._injection_filter requirements but there's no data and thus not worth reporting it. Args: proc_layer: the process layer vad: the MMVAD structure to test Returns: A boolean indicating whether a vad is empty or not """ CHUNK_SIZE = 0x1000 all_zero_page = b"\x00" * CHUNK_SIZE offset = 0 vad_length = vad.get_size() while offset < vad_length: next_addr = vad.get_start() + offset if ( proc_layer.is_valid(next_addr, CHUNK_SIZE) and, CHUNK_SIZE) != all_zero_page ): return False offset += CHUNK_SIZE return True
[docs] @classmethod def list_injections( cls, context: interfaces.context.ContextInterface, kernel_layer_name: str, symbol_table: str, proc: interfaces.objects.ObjectInterface, ) -> Iterable[Tuple[interfaces.objects.ObjectInterface, bytes]]: """Generate memory regions for a process that may contain injected code. Args: context: The context to retrieve required elements (layers, symbol tables) from kernel_layer_name: The name of the kernel layer from which to read the VAD protections symbol_table: The name of the table containing the kernel symbols proc: an _EPROCESS instance Returns: An iterable of VAD instances and the first 64 bytes of data containing in that region """ proc_id = "Unknown" try: proc_id = proc.UniqueProcessId proc_layer_name = proc.add_process_layer() except exceptions.InvalidAddressException as excp: vollog.debug( "Process {}: invalid address {} in layer {}".format( proc_id, excp.invalid_address, excp.layer_name ) ) return None proc_layer = context.layers[proc_layer_name] for vad in proc.get_vad_root().traverse(): protection_string = vad.get_protection( vadinfo.VadInfo.protect_values( context, kernel_layer_name, symbol_table ), vadinfo.winnt_protections, ) write_exec = "EXECUTE" in protection_string and "WRITE" in protection_string # the write/exec check applies to everything if not write_exec: continue if (vad.get_private_memory() == 1 and vad.get_tag() == "VadS") or ( vad.get_private_memory() == 0 and protection_string != "PAGE_EXECUTE_WRITECOPY" ): if cls.is_vad_empty(proc_layer, vad): continue data =, 64, pad=True) yield vad, data
def _generator(self, procs): # determine if we're on a 32 or 64 bit kernel kernel = self.context.modules[self.config["kernel"]] # set refined criteria to know when to add to "Notes" column refined_criteria = { b"MZ": "MZ header", b"\x55\x8B": "PE header", b"\x55\x48": "Function prologue", b"\x55\x89": "Function prologue", } is_32bit_arch = not symbols.symbol_table_is_64bit( self.context, kernel.symbol_table_name ) for proc in procs: # by default, "Notes" column will be set to N/A notes = renderers.NotApplicableValue() process_name = utility.array_to_string(proc.ImageFileName) for vad, data in self.list_injections( self.context, kernel.layer_name, kernel.symbol_table_name, proc ): # Check for unique headers and update "Notes" column if criteria is met if data[0:2] in refined_criteria: notes = refined_criteria[data[0:2]] # if we're on a 64 bit kernel, we may still need 32 bit disasm due to wow64 if is_32bit_arch or proc.get_is_wow64(): architecture = "intel" else: architecture = "intel64" disasm = interfaces.renderers.Disassembly( data, vad.get_start(), architecture ) file_output = "Disabled" if self.config["dump"]: file_output = "Error outputting to file" try: file_handle = vadinfo.VadInfo.vad_dump( self.context, proc, vad, ) file_handle.close() file_output = file_handle.preferred_filename except (exceptions.InvalidAddressException, OverflowError) as excp: vollog.debug( "Unable to dump PE with pid {0}.{1:#x}: {2}".format( proc.UniqueProcessId, vad.get_start(), excp ) ) yield ( 0, ( proc.UniqueProcessId, process_name, format_hints.Hex(vad.get_start()), format_hints.Hex(vad.get_end()), vad.get_tag(), vad.get_protection( vadinfo.VadInfo.protect_values( self.context, kernel.layer_name, kernel.symbol_table_name, ), vadinfo.winnt_protections, ), vad.get_commit_charge(), vad.get_private_memory(), file_output, notes, format_hints.HexBytes(data), disasm, ), )
[docs] def run(self): filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) kernel = self.context.modules[self.config["kernel"]] return renderers.TreeGrid( [ ("PID", int), ("Process", str), ("Start VPN", format_hints.Hex), ("End VPN", format_hints.Hex), ("Tag", str), ("Protection", str), ("CommitCharge", int), ("PrivateMemory", int), ("File output", str), ("Notes", str), ("Hexdump", format_hints.HexBytes), ("Disasm", interfaces.renderers.Disassembly), ], self._generator( pslist.PsList.list_processes( context=self.context, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name, filter_func=filter_func, ) ), )