# Source code for volatility3.plugins.windows.malware.malfind

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterable, Generator, Tuple

from volatility3.framework import interfaces, symbols, exceptions
from volatility3.framework import renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist, vadinfo

# Module-level logger, named after this module; the plugin emits its debug diagnostics through it.
vollog = logging.getLogger(__name__)


class Malfind(interfaces.plugins.PluginInterface):
    """Lists process memory ranges that potentially contain injected code."""

    _required_framework_version = (2, 22, 0)
    _version = (1, 1, 0)

    @classmethod
    def get_requirements(cls):
        """Return the configuration requirements of this plugin.

        Besides the kernel module and the optional ``pid`` / ``dump`` options,
        the plugin pins the versions of the ``pslist`` and ``vadinfo`` plugins
        whose class methods it calls.
        """
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.ListRequirement(
                name="pid",
                element_type=int,
                description="Process IDs to include (all other processes are excluded)",
                optional=True,
            ),
            requirements.BooleanRequirement(
                name="dump",
                description="Extract injected VADs",
                default=False,
                optional=True,
            ),
            requirements.VersionRequirement(
                name="pslist", component=pslist.PsList, version=(3, 0, 0)
            ),
            requirements.VersionRequirement(
                name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0)
            ),
        ]

    @classmethod
    def is_vad_empty(cls, proc_layer, vad) -> bool:
        """Check if a VAD region is either entirely unavailable due to paging,
        entirely consisting of zeros, or a combination of the two.

        This helps ignore false positives: regions whose VAD flags match the
        injection criteria but which hold no data are not worth reporting.

        Args:
            proc_layer: the process layer
            vad: the MMVAD structure to test

        Returns:
            A boolean indicating whether a VAD is empty or not
        """
        CHUNK_SIZE = 0x1000
        all_zero_page = b"\x00" * CHUNK_SIZE

        vad_length = vad.get_size()
        vad_start = vad.get_start()

        for offset in range(0, vad_length, CHUNK_SIZE):
            next_addr = vad_start + offset
            # A chunk that cannot be read is treated the same as a zeroed one;
            # only readable, non-zero content makes the VAD "non-empty".
            if (
                proc_layer.is_valid(next_addr, CHUNK_SIZE)
                and proc_layer.read(next_addr, CHUNK_SIZE) != all_zero_page
            ):
                return False

        return True

    @classmethod
    def list_injections(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_layer_name: str,
        symbol_table: str,
        proc: interfaces.objects.ObjectInterface,
    ) -> Iterable[Tuple[interfaces.objects.ObjectInterface, bytes]]:
        """Generate suspicious VADs of a process together with their leading bytes.

        This wraps :meth:`list_injection_sites` and resolves each yielded
        ``LayerData`` descriptor into the actual bytes read from its layer.

        Args:
            context: The context from which to retrieve required elements (layers, symbol tables)
            kernel_layer_name: The name of the kernel layer from which to read the VAD protections
            symbol_table: The name of the table containing the kernel symbols
            proc: an _EPROCESS instance

        Yields:
            Tuples of a VAD instance and the bytes read at the start of that region
        """
        for vad, data_object in cls.list_injection_sites(
            context, kernel_layer_name, symbol_table, proc
        ):
            layer = data_object.context.layers[data_object.layer_name]
            yield vad, layer.read(data_object.offset, data_object.length)

    @staticmethod
    def _first_dirty_page(proc_layer, vad):
        """Return the address of the first dirty page in ``vad``, or None.

        The scan walks the VAD in steps of the layer's page size and stops at
        the first dirty page, or at the first address that cannot be resolved.
        """
        for page in range(vad.get_start(), vad.get_end(), proc_layer.page_size):
            try:
                if proc_layer.is_dirty(page):
                    return page
            except exceptions.InvalidAddressException:
                # Abort as it is likely that other addresses in the same range will also fail.
                return None
        return None

    @classmethod
    def list_injection_sites(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_layer_name: str,
        symbol_table: str,
        proc: interfaces.objects.ObjectInterface,
    ) -> Generator[
        Tuple[interfaces.objects.ObjectInterface, renderers.LayerData],
        None,
        None,
    ]:
        """Generate memory regions for a process that may contain injected code.

        A VAD is reported when its protection is executable and either
        writable, or non-writable but containing a dirty page, provided it is
        a private "VadS" region or a non-private region that is not
        PAGE_EXECUTE_WRITECOPY, and it is not empty (see :meth:`is_vad_empty`).

        Args:
            context: The context from which to retrieve required elements (layers, symbol tables)
            kernel_layer_name: The name of the kernel layer from which to read the VAD protections
            symbol_table: The name of the table containing the kernel symbols
            proc: an _EPROCESS instance

        Yields:
            Tuples of a VAD instance and a LayerData descriptor covering the
            first 64 bytes of that region in the process layer
        """
        proc_id = "Unknown"
        try:
            proc_id = proc.UniqueProcessId
            proc_layer_name = proc.add_process_layer()
        except exceptions.InvalidAddressException as excp:
            vollog.debug(
                f"Process {proc_id}: invalid address {excp.invalid_address} in layer {excp.layer_name}"
            )
            return

        proc_layer = context.layers[proc_layer_name]

        # The protection lookup depends only on the kernel, not on the VAD,
        # so resolve it once instead of once per VAD.
        protect_values = vadinfo.VadInfo.protect_values(
            context, kernel_layer_name, symbol_table
        )

        for vad in proc.get_vad_root().traverse():
            protection_string = vad.get_protection(
                protect_values,
                vadinfo.winnt_protections,
            )

            # Non-executable regions are never reported.
            if "EXECUTE" not in protection_string:
                continue

            dirty_page = None
            if "WRITE" not in protection_string:
                # Inspect "PAGE_EXECUTE_READ" VAD pages to detect
                # non-writable memory regions having been injected
                # using elevated WriteProcessMemory().
                # If we have a dirty page in a non-writable "EXECUTE" region, it is suspicious.
                dirty_page = cls._first_dirty_page(proc_layer, vad)
                if dirty_page is None:
                    continue

            private_memory = vad.get_private_memory()
            is_candidate = (private_memory == 1 and vad.get_tag() == "VadS") or (
                private_memory == 0 and protection_string != "PAGE_EXECUTE_WRITECOPY"
            )
            if not is_candidate:
                continue

            if cls.is_vad_empty(proc_layer, vad):
                continue

            if dirty_page is not None:
                # Useful information to investigate the page content with volshell afterwards.
                vollog.debug(
                    f"[proc_id {proc_id}] Found suspicious DIRTY + {protection_string} page at {hex(dirty_page)}",
                )

            data = renderers.LayerData(
                context=context,
                layer_name=proc_layer_name,
                offset=vad.get_start(),
                length=64,
                no_surrounding=True,
            )
            yield (vad, data)

    def _generator(self, procs):
        """Produce one TreeGrid row per suspicious VAD found in ``procs``.

        Args:
            procs: an iterable of _EPROCESS instances to inspect

        Yields:
            ``(0, row)`` tuples whose ``row`` matches the columns declared in :meth:`run`
        """
        # Determine if we're on a 32 or 64 bit kernel
        kernel = self.context.modules[self.config["kernel"]]

        # Set refined criteria to know when to add to "Notes" column
        # NOTE(review): b"\x55\x8b" looks like a function prologue rather than a
        # PE header; the label is left untouched so the plugin output is unchanged.
        refined_criteria = {
            b"MZ": "MZ header",
            b"\x55\x8b": "PE header",
            b"\x55\x48": "Function prologue",
            b"\x55\x89": "Function prologue",
        }

        is_32bit_arch = not symbols.symbol_table_is_64bit(
            context=self.context, symbol_table_name=kernel.symbol_table_name
        )

        for proc in procs:
            process_name = utility.array_to_string(proc.ImageFileName)

            for vad, data_object in self.list_injection_sites(
                self.context, kernel.layer_name, kernel.symbol_table_name, proc
            ):
                # By default, "Notes" column will be set to N/A
                notes = renderers.NotApplicableValue()

                # Check for unique headers and update "Notes" column if criteria is met
                data = data_object.context.layers[data_object.layer_name].read(
                    data_object.offset, data_object.length, True
                )
                if data[:2] in refined_criteria:
                    notes = refined_criteria[data[:2]]

                # If we're on a 64 bit kernel, we may still need 32 bit disasm due to wow64
                if is_32bit_arch or proc.get_is_wow64():
                    architecture = "intel"
                else:
                    architecture = "intel64"

                disasm = renderers.Disassembly(data, vad.get_start(), architecture)

                file_output = "Disabled"
                if self.config["dump"]:
                    file_output = "Error outputting to file"
                    try:
                        file_handle = vadinfo.VadInfo.vad_dump(
                            self.context, proc, vad, self.open
                        )
                        # vad_dump may hand back no handle when it declines or
                        # fails to dump; keep the error marker instead of
                        # crashing on None.close().
                        if file_handle is not None:
                            file_handle.close()
                            file_output = file_handle.preferred_filename
                    except (exceptions.InvalidAddressException, OverflowError) as excp:
                        vollog.debug(
                            f"Unable to dump PE with pid {proc.UniqueProcessId}.{vad.get_start():#x}: {excp}"
                        )

                yield (
                    0,
                    (
                        proc.UniqueProcessId,
                        process_name,
                        format_hints.Hex(vad.get_start()),
                        format_hints.Hex(vad.get_end()),
                        vad.get_tag(),
                        vad.get_protection(
                            vadinfo.VadInfo.protect_values(
                                self.context,
                                kernel.layer_name,
                                kernel.symbol_table_name,
                            ),
                            vadinfo.winnt_protections,
                        ),
                        vad.get_commit_charge(),
                        vad.get_private_memory(),
                        file_output,
                        notes,
                        data_object,
                        disasm,
                    ),
                )

    def run(self):
        """Build the TreeGrid of suspicious memory regions.

        Processes are enumerated through ``pslist`` and restricted to the
        configured ``pid`` list when one is given.
        """
        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("Process", str),
                ("Start VPN", format_hints.Hex),
                ("End VPN", format_hints.Hex),
                ("Tag", str),
                ("Protection", str),
                ("CommitCharge", int),
                ("PrivateMemory", int),
                ("File output", str),
                ("Notes", str),
                ("Hexdump", renderers.LayerData),
                ("Disasm", renderers.Disassembly),
            ],
            self._generator(
                pslist.PsList.list_processes(
                    context=self.context,
                    kernel_module_name=self.config["kernel"],
                    filter_func=filter_func,
                )
            ),
        )