Source code for volatility3.plugins.linux.malware.malfind

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from typing import Iterable, List, Tuple, Optional
from enum import IntEnum
from volatility3.framework import exceptions, interfaces, renderers, symbols
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.linux import pslist, proc

vollog = logging.getLogger(__name__)


[docs] class MaliciousFlags(IntEnum): RWX = 0 RX = 1 X_DIRTY = 2
[docs] class Malfind(interfaces.plugins.PluginInterface): """Lists process memory ranges that potentially contain injected code.""" _required_framework_version = (2, 0, 0) _version = (1, 1, 0) MAX_DUMPSIZE_DEFAULT = 1024 * 1024 * 1024 # 1 Gb
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Linux kernel", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="pslist", component=pslist.PsList, version=(4, 0, 0) ), requirements.VersionRequirement( name="proc", component=proc.Maps, version=(1, 0, 3) ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), requirements.IntRequirement( name="hexdump-size", description="Amount of bytes to show for each region/page found - Default 64 bytes", optional=True, default=64, ), requirements.BooleanRequirement( name="show-all-dirty-pages", description="Show all dirty pages in a VMA if at least one dirty page is found - Default off", optional=True, default=False, ), requirements.BooleanRequirement( name="dump-regions", description="Dump each suspicious memory region in output. All dirty pages will be dumped if --show-all-dirty-pages is enabled.", optional=True, default=False, ), requirements.IntRequirement( name="dump-maxsize", description="Maximum size for dumped memory regions " "(all the bigger regions will be ignored) - Default 1 GB", default=cls.MAX_DUMPSIZE_DEFAULT, optional=True, ), ]
def _get_dirty_pages(self, proc_layer, vma) -> Iterable[Tuple[int, int]]: """Get dirty pages inside the specified VMA. Yields: page address and page size """ page_addr = vma.vm_start while page_addr < vma.vm_end: try: # We don't want to use the layer's page size by default to handle # large pages (PUD, PMD...) correctly. _, page_size, _ = proc_layer._translate(page_addr) if proc_layer.is_dirty(page_addr): yield page_addr, page_size except ( AttributeError, exceptions.PagedInvalidAddressException, exceptions.InvalidAddressException, ): page_size = proc_layer.page_size page_addr += page_size def _is_suspicious(self, proc_layer, vma) -> Optional[Tuple[int, MaliciousFlags]]: """Determine if a VMA is suspicious based on any of the following criterias: - RWX - RX - X + DIRTY Returns: (suspicious page address, suspicious page size, malicious flag) or None """ flags_str = vma.get_protection() if flags_str == "rwx": return vma.vm_start, vma.vm_end - vma.vm_start, MaliciousFlags.RWX elif flags_str == "r-x" and vma.vm_file.dereference().vol.offset == 0: return vma.vm_start, vma.vm_end - vma.vm_start, MaliciousFlags.RX elif "x" in flags_str: for page_addr, page_size in self._get_dirty_pages(proc_layer, vma): vollog.warning( f"Found dirty page at {page_addr:#x} inside executable region {vma.vm_start:#x}-{vma.vm_end:#x}!" ) # We do not attempt to find other dirty+exec pages once we have found one return page_addr, page_size, MaliciousFlags.X_DIRTY return None def _list_injections( self, task ) -> Iterable[ Tuple[interfaces.objects.ObjectInterface, Optional[str], bytes, int, int] ]: """Generate memory regions for a process that may contain injected code.""" proc_layer_name = task.add_process_layer() if not proc_layer_name: return proc_layer = self.context.layers[proc_layer_name] data_size = self.config["hexdump-size"] # Dumping page defaults to off, as in case a whole r-xp region is dirty # this would likely dump 1000's of pages which might not always be wise nor necessary for vma in task.mm.get_vma_iter(): vma_name = vma.get_name(self.context, task) vollog.debug( f"Injections : processing PID {task.pid} : VMA {vma_name} : {hex(vma.vm_start)}-{hex(vma.vm_end)}" ) if vma_name == "[vdso]": continue suspicious_result = self._is_suspicious(proc_layer, vma) if suspicious_result is None: continue region_start, region_size, suspicious_flag = suspicious_result # If _is_suspicious returns MaliciousFlags.X_DIRTY, this means at least one page # in the region is dirty. If --show-all-dirty-pages is set, then we show # all the dirty pages. if ( suspicious_flag == MaliciousFlags.X_DIRTY and self.config["show-all-dirty-pages"] ): # Dump each dirty page for dirty_page_addr, dirty_page_size in self._get_dirty_pages( proc_layer, vma ): name = f"{vma_name}, dirty page address: {dirty_page_addr:#x}" data = proc_layer.read(dirty_page_addr, data_size, pad=True) yield vma, name, data, dirty_page_addr, dirty_page_size continue name = vma_name if suspicious_flag == MaliciousFlags.X_DIRTY: name = f"{vma_name}, dirty page address: {region_start:#x}" data = proc_layer.read(vma.vm_start, data_size, pad=True) yield vma, name, data, region_start, region_size def _generator(self, tasks): # determine if we're on a 32 or 64 bit kernel vmlinux = self.context.modules[self.config["kernel"]] is_32bit_arch = not symbols.symbol_table_is_64bit( context=self.context, symbol_table_name=vmlinux.symbol_table_name ) for task in tasks: process_name = utility.array_to_string(task.comm) for vma, vma_name, data, region_start, region_size in self._list_injections( task ): if is_32bit_arch: architecture = "intel" else: architecture = "intel64" disasm = renderers.Disassembly(data, region_start, architecture) file_output = "Disabled" if self.config["dump-regions"]: file_handle = proc.Maps.vma_dump( self.context, task, region_start, region_start + region_size, self.open, self.config["dump-maxsize"], ) if file_handle: file_handle.close() file_output = file_handle.preferred_filename yield ( 0, ( task.pid, process_name, format_hints.Hex(vma.vm_start), format_hints.Hex(vma.vm_end), vma_name or renderers.NotAvailableValue(), vma.get_protection(), format_hints.HexBytes(data), disasm, file_output, ), )
[docs] def run(self): filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) return renderers.TreeGrid( [ ("PID", int), ("Process", str), ("Start", format_hints.Hex), ("End", format_hints.Hex), ("Path", str), ("Protection", str), ("Hexdump", format_hints.HexBytes), ("Disasm", renderers.Disassembly), ("File output", str), ], self._generator( pslist.PsList.list_tasks( self.context, self.config["kernel"], filter_func=filter_func ) ), )