# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterable, Generator, Tuple
from volatility3.framework import interfaces, symbols, exceptions
from volatility3.framework import renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist, vadinfo
vollog = logging.getLogger(__name__)
class Malfind(interfaces.plugins.PluginInterface):
    """Lists process memory ranges that potentially contain injected code."""

    _required_framework_version = (2, 22, 0)
    _version = (1, 1, 0)

    @classmethod
    def get_requirements(cls):
        """Describe the configuration this plugin needs before it can run.

        Returns:
            A list of requirements: the Windows kernel module, an optional
            list of process IDs to restrict the scan to, an optional flag
            that enables dumping of the reported VADs, and the minimum
            versions of the pslist and vadinfo plugins used by this plugin.
        """
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.ListRequirement(
                name="pid",
                element_type=int,
                description="Process IDs to include (all other processes are excluded)",
                optional=True,
            ),
            requirements.BooleanRequirement(
                name="dump",
                description="Extract injected VADs",
                default=False,
                optional=True,
            ),
            requirements.VersionRequirement(
                name="pslist", component=pslist.PsList, version=(3, 0, 0)
            ),
            requirements.VersionRequirement(
                name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0)
            ),
        ]

    @classmethod
    def is_vad_empty(cls, proc_layer, vad) -> bool:
        """Check if a VAD region is either entirely unavailable due to paging,
        entirely consisting of zeros, or a combination of the two. This helps
        ignore false positives whose VAD flags match the injection criteria
        but which hold no data and are thus not worth reporting.

        Args:
            proc_layer: the process layer
            vad: the MMVAD structure to test

        Returns:
            A boolean indicating whether a VAD is empty or not
        """
        CHUNK_SIZE = 0x1000
        all_zero_page = b"\x00" * CHUNK_SIZE

        vad_start = vad.get_start()
        for next_addr in range(vad_start, vad_start + vad.get_size(), CHUNK_SIZE):
            # A chunk that cannot be read counts as "empty"; only a readable
            # chunk holding at least one non-zero byte makes the VAD non-empty.
            if (
                proc_layer.is_valid(next_addr, CHUNK_SIZE)
                and proc_layer.read(next_addr, CHUNK_SIZE) != all_zero_page
            ):
                return False

        return True

    @classmethod
    def list_injections(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_layer_name: str,
        symbol_table: str,
        proc: interfaces.objects.ObjectInterface,
    ) -> Iterable[Tuple[interfaces.objects.ObjectInterface, bytes]]:
        """Generate the suspicious memory regions of a process together with
        the bytes found at the start of each region.

        This wraps list_injection_sites() and reads the data each returned
        LayerData object describes.

        Args:
            context: The context from which to retrieve required elements (layers, symbol tables)
            kernel_layer_name: The name of the kernel layer from which to read the VAD protections
            symbol_table: The name of the table containing the kernel symbols
            proc: an _EPROCESS instance

        Returns:
            An iterable of (VAD instance, bytes read from the start of that region) tuples
        """
        for vad, data_object in cls.list_injection_sites(
            context, kernel_layer_name, symbol_table, proc
        ):
            layer = data_object.context.layers[data_object.layer_name]
            yield (vad, layer.read(data_object.offset, data_object.length))

    @classmethod
    def list_injection_sites(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_layer_name: str,
        symbol_table: str,
        proc: interfaces.objects.ObjectInterface,
    ) -> Generator[
        Tuple[interfaces.objects.ObjectInterface, renderers.LayerData],
        None,
        None,
    ]:
        """Generate memory regions for a process that may contain injected
        code.

        A VAD is considered when its protection is executable and either
        writable, or non-writable but containing at least one dirty page.
        It is reported when it is private memory tagged "VadS", or non-private
        memory whose protection is not PAGE_EXECUTE_WRITECOPY, and it is not
        empty (see is_vad_empty()).

        Args:
            context: The context from which to retrieve required elements (layers, symbol tables)
            kernel_layer_name: The name of the kernel layer from which to read the VAD protections
            symbol_table: The name of the table containing the kernel symbols
            proc: an _EPROCESS instance

        Returns:
            An iterable of VAD instances and a LayerData object describing
            the first 64 bytes of data contained in that region
        """
        proc_id = "Unknown"
        try:
            proc_id = proc.UniqueProcessId
            proc_layer_name = proc.add_process_layer()
        except exceptions.InvalidAddressException as excp:
            vollog.debug(
                f"Process {proc_id}: invalid address {excp.invalid_address} in layer {excp.layer_name}"
            )
            return

        proc_layer = context.layers[proc_layer_name]

        for vad in proc.get_vad_root().traverse():
            protection_string = vad.get_protection(
                vadinfo.VadInfo.protect_values(
                    context, kernel_layer_name, symbol_table
                ),
                vadinfo.winnt_protections,
            )

            # Regions that are not executable are never of interest.
            if "EXECUTE" not in protection_string:
                continue

            dirty_page = None
            if "WRITE" not in protection_string:
                # Inspect "PAGE_EXECUTE_READ" VAD pages to detect
                # non-writable memory regions having been injected
                # using elevated WriteProcessMemory().
                for page in range(
                    vad.get_start(), vad.get_end(), proc_layer.page_size
                ):
                    try:
                        # If we have a dirty page in a non-writable "EXECUTE" region, it is suspicious.
                        if proc_layer.is_dirty(page):
                            dirty_page = page
                            break
                    except exceptions.InvalidAddressException:
                        # Abort as it is likely that other addresses in the same range will also fail.
                        break

                if dirty_page is None:
                    continue

            if (vad.get_private_memory() == 1 and vad.get_tag() == "VadS") or (
                vad.get_private_memory() == 0
                and protection_string != "PAGE_EXECUTE_WRITECOPY"
            ):
                if cls.is_vad_empty(proc_layer, vad):
                    continue

                if dirty_page is not None:
                    # Useful information to investigate the page content with volshell afterwards.
                    vollog.debug(
                        f"[proc_id {proc_id}] Found suspicious DIRTY + {protection_string} page at {hex(dirty_page)}",
                    )

                # Only describe the data here; the bytes are read by the consumer.
                data = renderers.LayerData(
                    context=context,
                    layer_name=proc_layer_name,
                    offset=vad.get_start(),
                    length=64,
                    no_surrounding=True,
                )
                yield (vad, data)

    def _generator(self, procs):
        """Produce one TreeGrid row per suspicious memory region.

        Args:
            procs: an iterable of _EPROCESS instances to inspect

        Yields:
            (0, row) tuples whose row values match the columns declared in
            run(): PID, process name, VAD start and end, tag, protection,
            commit charge, private-memory flag, dump file name (or status),
            notes, hexdump data and disassembly.
        """
        kernel = self.context.modules[self.config["kernel"]]

        # Set refined criteria to know when to add to "Notes" column
        # NOTE(review): 0x55 0x8b looks like the start of "push ebp; mov ebp, ..."
        # rather than a PE header; the label is kept unchanged so the plugin
        # output stays the same - confirm the intended wording.
        refined_criteria = {
            b"MZ": "MZ header",
            b"\x55\x8b": "PE header",
            b"\x55\x48": "Function prologue",
            b"\x55\x89": "Function prologue",
        }

        # Determine if we're on a 32 or 64 bit kernel
        is_32bit_arch = not symbols.symbol_table_is_64bit(
            context=self.context, symbol_table_name=kernel.symbol_table_name
        )

        for proc in procs:
            process_name = utility.array_to_string(proc.ImageFileName)

            for vad, data_object in self.list_injection_sites(
                self.context, kernel.layer_name, kernel.symbol_table_name, proc
            ):
                # By default, "Notes" column will be set to N/A
                notes = renderers.NotApplicableValue()

                # Check for unique headers and update "Notes" column if criteria is met
                data = data_object.context.layers[data_object.layer_name].read(
                    data_object.offset, data_object.length, True
                )
                if data[:2] in refined_criteria:
                    notes = refined_criteria[data[:2]]

                # If we're on a 64 bit kernel, we may still need 32 bit disasm due to wow64
                if is_32bit_arch or proc.get_is_wow64():
                    architecture = "intel"
                else:
                    architecture = "intel64"

                disasm = renderers.Disassembly(data, vad.get_start(), architecture)

                file_output = "Disabled"
                if self.config["dump"]:
                    file_output = "Error outputting to file"
                    try:
                        file_handle = vadinfo.VadInfo.vad_dump(
                            self.context, proc, vad, self.open
                        )
                        # vad_dump is declared upstream as returning an optional
                        # handle; without this guard a missing handle raises an
                        # AttributeError that the except clause below does not
                        # catch, aborting the whole run. Keep the error text
                        # in the "File output" column instead.
                        if file_handle is not None:
                            file_handle.close()
                            file_output = file_handle.preferred_filename
                    except (exceptions.InvalidAddressException, OverflowError) as excp:
                        vollog.debug(
                            f"Unable to dump PE with pid {proc.UniqueProcessId}.{vad.get_start():#x}: {excp}"
                        )

                yield (
                    0,
                    (
                        proc.UniqueProcessId,
                        process_name,
                        format_hints.Hex(vad.get_start()),
                        format_hints.Hex(vad.get_end()),
                        vad.get_tag(),
                        vad.get_protection(
                            vadinfo.VadInfo.protect_values(
                                self.context,
                                kernel.layer_name,
                                kernel.symbol_table_name,
                            ),
                            vadinfo.winnt_protections,
                        ),
                        vad.get_commit_charge(),
                        vad.get_private_memory(),
                        file_output,
                        notes,
                        data_object,
                        disasm,
                    ),
                )

    def run(self):
        """Build the result grid for the processes selected by the "pid" option.

        Returns:
            A TreeGrid whose rows are produced by _generator() from the
            processes listed by pslist, filtered by the configured PIDs.
        """
        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("Process", str),
                ("Start VPN", format_hints.Hex),
                ("End VPN", format_hints.Hex),
                ("Tag", str),
                ("Protection", str),
                ("CommitCharge", int),
                ("PrivateMemory", int),
                ("File output", str),
                ("Notes", str),
                ("Hexdump", renderers.LayerData),
                ("Disasm", renderers.Disassembly),
            ],
            self._generator(
                pslist.PsList.list_processes(
                    context=self.context,
                    kernel_module_name=self.config["kernel"],
                    filter_func=filter_func,
                )
            ),
        )