# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterable, List, Tuple, Optional
from enum import IntEnum
from volatility3.framework import exceptions, interfaces, renderers, symbols
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.linux import pslist, proc
vollog = logging.getLogger(__name__)
[docs]
class MaliciousFlags(IntEnum):
RWX = 0
RX = 1
X_DIRTY = 2
[docs]
class Malfind(interfaces.plugins.PluginInterface):
"""Lists process memory ranges that potentially contain injected code."""
_required_framework_version = (2, 0, 0)
_version = (1, 1, 0)
MAX_DUMPSIZE_DEFAULT = 1024 * 1024 * 1024 # 1 Gb
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="pslist", component=pslist.PsList, version=(4, 0, 0)
),
requirements.VersionRequirement(
name="proc", component=proc.Maps, version=(1, 0, 3)
),
requirements.ListRequirement(
name="pid",
description="Filter on specific process IDs",
element_type=int,
optional=True,
),
requirements.IntRequirement(
name="hexdump-size",
description="Amount of bytes to show for each region/page found - Default 64 bytes",
optional=True,
default=64,
),
requirements.BooleanRequirement(
name="show-all-dirty-pages",
description="Show all dirty pages in a VMA if at least one dirty page is found - Default off",
optional=True,
default=False,
),
requirements.BooleanRequirement(
name="dump-regions",
description="Dump each suspicious memory region in output. All dirty pages will be dumped if --show-all-dirty-pages is enabled.",
optional=True,
default=False,
),
requirements.IntRequirement(
name="dump-maxsize",
description="Maximum size for dumped memory regions "
"(all the bigger regions will be ignored) - Default 1 GB",
default=cls.MAX_DUMPSIZE_DEFAULT,
optional=True,
),
]
def _get_dirty_pages(self, proc_layer, vma) -> Iterable[Tuple[int, int]]:
"""Get dirty pages inside the specified VMA.
Yields:
page address and page size
"""
page_addr = vma.vm_start
while page_addr < vma.vm_end:
try:
# We don't want to use the layer's page size by default to handle
# large pages (PUD, PMD...) correctly.
_, page_size, _ = proc_layer._translate(page_addr)
if proc_layer.is_dirty(page_addr):
yield page_addr, page_size
except (
AttributeError,
exceptions.PagedInvalidAddressException,
exceptions.InvalidAddressException,
):
page_size = proc_layer.page_size
page_addr += page_size
def _is_suspicious(self, proc_layer, vma) -> Optional[Tuple[int, MaliciousFlags]]:
"""Determine if a VMA is suspicious based on any of the following criterias:
- RWX
- RX
- X + DIRTY
Returns:
(suspicious page address, suspicious page size, malicious flag) or None
"""
flags_str = vma.get_protection()
if flags_str == "rwx":
return vma.vm_start, vma.vm_end - vma.vm_start, MaliciousFlags.RWX
elif flags_str == "r-x" and vma.vm_file.dereference().vol.offset == 0:
return vma.vm_start, vma.vm_end - vma.vm_start, MaliciousFlags.RX
elif "x" in flags_str:
for page_addr, page_size in self._get_dirty_pages(proc_layer, vma):
vollog.warning(
f"Found dirty page at {page_addr:#x} inside executable region {vma.vm_start:#x}-{vma.vm_end:#x}!"
)
# We do not attempt to find other dirty+exec pages once we have found one
return page_addr, page_size, MaliciousFlags.X_DIRTY
return None
def _list_injections(
self, task
) -> Iterable[
Tuple[interfaces.objects.ObjectInterface, Optional[str], bytes, int, int]
]:
"""Generate memory regions for a process that may contain injected
code."""
proc_layer_name = task.add_process_layer()
if not proc_layer_name:
return
proc_layer = self.context.layers[proc_layer_name]
data_size = self.config["hexdump-size"]
# Dumping page defaults to off, as in case a whole r-xp region is dirty
# this would likely dump 1000's of pages which might not always be wise nor necessary
for vma in task.mm.get_vma_iter():
vma_name = vma.get_name(self.context, task)
vollog.debug(
f"Injections : processing PID {task.pid} : VMA {vma_name} : {hex(vma.vm_start)}-{hex(vma.vm_end)}"
)
if vma_name == "[vdso]":
continue
suspicious_result = self._is_suspicious(proc_layer, vma)
if suspicious_result is None:
continue
region_start, region_size, suspicious_flag = suspicious_result
# If _is_suspicious returns MaliciousFlags.X_DIRTY, this means at least one page
# in the region is dirty. If --show-all-dirty-pages is set, then we show
# all the dirty pages.
if (
suspicious_flag == MaliciousFlags.X_DIRTY
and self.config["show-all-dirty-pages"]
):
# Dump each dirty page
for dirty_page_addr, dirty_page_size in self._get_dirty_pages(
proc_layer, vma
):
name = f"{vma_name}, dirty page address: {dirty_page_addr:#x}"
data = proc_layer.read(dirty_page_addr, data_size, pad=True)
yield vma, name, data, dirty_page_addr, dirty_page_size
continue
name = vma_name
if suspicious_flag == MaliciousFlags.X_DIRTY:
name = f"{vma_name}, dirty page address: {region_start:#x}"
data = proc_layer.read(vma.vm_start, data_size, pad=True)
yield vma, name, data, region_start, region_size
def _generator(self, tasks):
# determine if we're on a 32 or 64 bit kernel
vmlinux = self.context.modules[self.config["kernel"]]
is_32bit_arch = not symbols.symbol_table_is_64bit(
context=self.context, symbol_table_name=vmlinux.symbol_table_name
)
for task in tasks:
process_name = utility.array_to_string(task.comm)
for vma, vma_name, data, region_start, region_size in self._list_injections(
task
):
if is_32bit_arch:
architecture = "intel"
else:
architecture = "intel64"
disasm = renderers.Disassembly(data, region_start, architecture)
file_output = "Disabled"
if self.config["dump-regions"]:
file_handle = proc.Maps.vma_dump(
self.context,
task,
region_start,
region_start + region_size,
self.open,
self.config["dump-maxsize"],
)
if file_handle:
file_handle.close()
file_output = file_handle.preferred_filename
yield (
0,
(
task.pid,
process_name,
format_hints.Hex(vma.vm_start),
format_hints.Hex(vma.vm_end),
vma_name or renderers.NotAvailableValue(),
vma.get_protection(),
format_hints.HexBytes(data),
disasm,
file_output,
),
)
[docs]
def run(self):
filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))
return renderers.TreeGrid(
[
("PID", int),
("Process", str),
("Start", format_hints.Hex),
("End", format_hints.Hex),
("Path", str),
("Protection", str),
("Hexdump", format_hints.HexBytes),
("Disasm", renderers.Disassembly),
("File output", str),
],
self._generator(
pslist.PsList.list_tasks(
self.context, self.config["kernel"], filter_func=filter_func
)
),
)