# Source code for volatility3.plugins.windows.malfind

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterable, Tuple

from volatility3.framework import interfaces, symbols, exceptions
from volatility3.framework import renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist, vadinfo

vollog = logging.getLogger(__name__)


class Malfind(interfaces.plugins.PluginInterface):
    """Lists process memory ranges that potentially contain injected code."""

    _required_framework_version = (2, 4, 0)

    @classmethod
    def get_requirements(cls):
        """Return the configuration requirements of this plugin.

        Besides the kernel module and the optional ``pid`` / ``dump`` options,
        the versions of the ``pslist`` and ``vadinfo`` plugins are pinned
        because their classmethods are called directly from this plugin.
        """
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.ListRequirement(
                name="pid",
                element_type=int,
                description="Process IDs to include (all other processes are excluded)",
                optional=True,
            ),
            requirements.BooleanRequirement(
                name="dump",
                description="Extract injected VADs",
                default=False,
                optional=True,
            ),
            requirements.VersionRequirement(
                name="pslist", component=pslist.PsList, version=(2, 0, 0)
            ),
            requirements.VersionRequirement(
                name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0)
            ),
        ]

    @classmethod
    def is_vad_empty(cls, proc_layer, vad):
        """Check if a VAD region is either entirely unavailable due to paging,
        entirely consisting of zeros, or a combination of the two.

        This helps ignore false positives whose VAD flags match the injection
        criteria but which hold no data and are thus not worth reporting.

        Args:
            proc_layer: the process layer
            vad: the MMVAD structure to test

        Returns:
            A boolean indicating whether a vad is empty or not
        """
        CHUNK_SIZE = 0x1000
        all_zero_page = b"\x00" * CHUNK_SIZE

        offset = 0
        vad_length = vad.get_size()

        while offset < vad_length:
            next_addr = vad.get_start() + offset
            # A chunk that cannot be read is skipped (treated as empty); the
            # first readable chunk holding a non-zero byte settles the answer.
            if (
                proc_layer.is_valid(next_addr, CHUNK_SIZE)
                and proc_layer.read(next_addr, CHUNK_SIZE) != all_zero_page
            ):
                return False
            offset += CHUNK_SIZE

        return True

    @classmethod
    def list_injections(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_layer_name: str,
        symbol_table: str,
        proc: interfaces.objects.ObjectInterface,
    ) -> Iterable[Tuple[interfaces.objects.ObjectInterface, bytes]]:
        """Generate memory regions for a process that may contain injected
        code.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            kernel_layer_name: The name of the kernel layer from which to read the VAD protections
            symbol_table: The name of the table containing the kernel symbols
            proc: an _EPROCESS instance

        Returns:
            An iterable of VAD instances and the first 64 bytes of data contained in that region
        """
        proc_id = "Unknown"
        try:
            proc_id = proc.UniqueProcessId
            proc_layer_name = proc.add_process_layer()
        except exceptions.InvalidAddressException as excp:
            vollog.debug(
                "Process {}: invalid address {} in layer {}".format(
                    proc_id, excp.invalid_address, excp.layer_name
                )
            )
            # Without a process layer there is nothing to scan: end the generator.
            return

        proc_layer = context.layers[proc_layer_name]

        for vad in proc.get_vad_root().traverse():
            protection_string = vad.get_protection(
                vadinfo.VadInfo.protect_values(
                    context, kernel_layer_name, symbol_table
                ),
                vadinfo.winnt_protections,
            )
            write_exec = "EXECUTE" in protection_string and "WRITE" in protection_string

            # the write/exec check applies to everything
            if not write_exec:
                continue

            # Report private "VadS" regions, and non-private regions unless
            # their protection is exactly PAGE_EXECUTE_WRITECOPY.
            if (vad.get_private_memory() == 1 and vad.get_tag() == "VadS") or (
                vad.get_private_memory() == 0
                and protection_string != "PAGE_EXECUTE_WRITECOPY"
            ):
                if cls.is_vad_empty(proc_layer, vad):
                    continue

                data = proc_layer.read(vad.get_start(), 64, pad=True)
                yield vad, data

    def _generator(self, procs):
        """Yield one TreeGrid row for every suspicious VAD of every process.

        Args:
            procs: an iterable of _EPROCESS instances to inspect

        Yields:
            ``(0, columns)`` tuples whose columns match the headers declared
            in :meth:`run`.
        """
        # determine if we're on a 32 or 64 bit kernel
        kernel = self.context.modules[self.config["kernel"]]

        # set refined criteria to know when to add to "Notes" column
        # NOTE(review): 0x55 0x8B looks like an x86 ``push ebp; mov ...``
        # function prologue rather than a PE header; the label is kept as-is
        # so existing output does not change -- confirm before renaming.
        refined_criteria = {
            b"MZ": "MZ header",
            b"\x55\x8B": "PE header",
            b"\x55\x48": "Function prologue",
            b"\x55\x89": "Function prologue",
        }

        is_32bit_arch = not symbols.symbol_table_is_64bit(
            self.context, kernel.symbol_table_name
        )

        for proc in procs:
            process_name = utility.array_to_string(proc.ImageFileName)

            for vad, data in self.list_injections(
                self.context, kernel.layer_name, kernel.symbol_table_name, proc
            ):
                # by default, "Notes" column will be set to N/A; this is reset
                # for every VAD so that a match on an earlier region of the
                # same process does not leak into later rows
                notes = renderers.NotApplicableValue()

                # Check for unique headers and update "Notes" column if criteria is met
                if data[0:2] in refined_criteria:
                    notes = refined_criteria[data[0:2]]

                # if we're on a 64 bit kernel, we may still need 32 bit disasm due to wow64
                if is_32bit_arch or proc.get_is_wow64():
                    architecture = "intel"
                else:
                    architecture = "intel64"

                disasm = interfaces.renderers.Disassembly(
                    data, vad.get_start(), architecture
                )

                file_output = "Disabled"
                if self.config["dump"]:
                    file_output = "Error outputting to file"
                    try:
                        file_handle = vadinfo.VadInfo.vad_dump(
                            self.context, proc, vad, self.open
                        )
                        # Presumably vad_dump may hand back None when the
                        # region cannot be dumped (verify against VadInfo);
                        # guard so a failed dump is reported in the row
                        # instead of raising AttributeError.
                        if file_handle is not None:
                            file_handle.close()
                            file_output = file_handle.preferred_filename
                    except (exceptions.InvalidAddressException, OverflowError) as excp:
                        vollog.debug(
                            "Unable to dump PE with pid {0}.{1:#x}: {2}".format(
                                proc.UniqueProcessId, vad.get_start(), excp
                            )
                        )

                yield (
                    0,
                    (
                        proc.UniqueProcessId,
                        process_name,
                        format_hints.Hex(vad.get_start()),
                        format_hints.Hex(vad.get_end()),
                        vad.get_tag(),
                        vad.get_protection(
                            vadinfo.VadInfo.protect_values(
                                self.context,
                                kernel.layer_name,
                                kernel.symbol_table_name,
                            ),
                            vadinfo.winnt_protections,
                        ),
                        vad.get_commit_charge(),
                        vad.get_private_memory(),
                        file_output,
                        notes,
                        format_hints.HexBytes(data),
                        disasm,
                    ),
                )

    def run(self):
        """Build the TreeGrid of potentially injected regions.

        Processes are listed through ``pslist.PsList.list_processes``,
        restricted by the optional ``pid`` configuration value, and fed to
        :meth:`_generator`.
        """
        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))
        kernel = self.context.modules[self.config["kernel"]]

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("Process", str),
                ("Start VPN", format_hints.Hex),
                ("End VPN", format_hints.Hex),
                ("Tag", str),
                ("Protection", str),
                ("CommitCharge", int),
                ("PrivateMemory", int),
                ("File output", str),
                ("Notes", str),
                ("Hexdump", format_hints.HexBytes),
                ("Disasm", interfaces.renderers.Disassembly),
            ],
            self._generator(
                pslist.PsList.list_processes(
                    context=self.context,
                    layer_name=kernel.layer_name,
                    symbol_table=kernel.symbol_table_name,
                    filter_func=filter_func,
                )
            ),
        )