# Source code for volatility3.plugins.linux.malware.process_spoofing

# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from pathlib import PurePosixPath
from typing import Optional, Tuple, Iterator

from volatility3.framework import exceptions, interfaces, renderers
from volatility3.framework.constants import linux as linux_constants
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.objects import utility
from volatility3.framework.symbols import linux
from volatility3.plugins.linux import pslist

vollog = logging.getLogger(__name__)


class ProcessSpoofing(plugins.PluginInterface):
    """Detects process spoofing by comparing executable path to cmdline & comm fields.

    Examples of such behavior can be found here:
    https://github.com/SolitudePy/linux-mal
    """

    _required_framework_version = (2, 27, 0)
    _version = (1, 1, 0)

    # Suffix that marks an executable path as deleted: it is stripped from the
    # basename before comparison and drives the Exe_Deleted output column.
    _DELETED_SUFFIX = " (deleted)"

    # comm is truncated to 15 characters, so only that many characters of the
    # executable basename are compared against it.
    _COMM_MAX_LEN = 15

    @classmethod
    def get_requirements(cls):
        """Return the configuration requirements of this plugin.

        Requires a Linux kernel module (Intel32/Intel64), PsList >= 4.0.0 and
        LinuxUtilities >= 2.0.0, and accepts an optional list of PIDs to filter on.
        """
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Linux kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.VersionRequirement(
                name="pslist", component=pslist.PsList, version=(4, 0, 0)
            ),
            requirements.VersionRequirement(
                name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0)
            ),
            requirements.ListRequirement(
                name="pid",
                description="Filter on specific process IDs",
                element_type=int,
                optional=True,
            ),
        ]

    @classmethod
    def get_executable_path(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
    ) -> Optional[str]:
        """
        Extract the executable path from task_struct.mm.exe_file

        Args:
            context: The context to operate on
            task: task_struct object of the process

        Returns:
            Returns executable path or None if not available
        """
        try:
            mm = task.mm
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(f"Unable to access mm for task at {task.vol.offset:#x}: {e}")
            return None

        if not mm or not mm.is_readable():
            # Kernel threads don't have mm struct
            return None

        try:
            exe_file = mm.exe_file
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(
                f"Unable to access exe_file for task at {task.vol.offset:#x}: {e}"
            )
            return None

        if not exe_file or not exe_file.is_readable():
            return None

        try:
            exe_path = linux.LinuxUtilities.path_for_file(context, task, exe_file)
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(
                f"Unable to read exe_file path for task at {task.vol.offset:#x}: {e}"
            )
            return None

        return exe_path

    @classmethod
    def get_cmdline_basename(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
    ) -> Optional[str]:
        """
        Extract the command line arguments and return the basename of the first argument.

        Notes:
            The read length is capped at ``MAX_ARG_STRLEN`` (32 * 4096) per the
            kernel limit defined in ``include/uapi/linux/binfmts.h``
            (see linux.git commit f6031913338f1dad5bd8cb7286ff4e53644b6940).

        Args:
            context: The context to operate on
            task: task_struct object of the process

        Returns:
            Basename of the first command line argument or None if not available
        """
        # Guard the mm dereference the same way get_executable_path does, so an
        # unreadable mm yields "not available" instead of an exception.
        try:
            mm = task.mm
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(f"Unable to access mm for task at {task.vol.offset:#x}: {e}")
            return None

        if not mm or not mm.is_readable():
            return None

        proc_layer_name = task.add_process_layer()
        if proc_layer_name is None:
            return None

        try:
            start = mm.arg_start
            size_to_read = mm.arg_end - start
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(
                f"Unable to read cmdline bounds for task at {task.vol.offset:#x}: {e}"
            )
            return None

        if size_to_read <= 0:
            return None

        read_length = min(size_to_read, linux_constants.MAX_ARG_STRLEN)

        try:
            cmdline = utility.address_to_string(
                context=context,
                layer_name=proc_layer_name,
                address=start,
                count=read_length,
                errors="replace",
                encoding="utf-8",
            )
        except exceptions.InvalidAddressException as e:
            vollog.debug(
                f"Unable to read cmdline for task at {task.vol.offset:#x}: {e}"
            )
            return None

        if not cmdline:
            return None

        # An empty final path component is reported as "not available".
        basename = PurePosixPath(cmdline).name
        return basename if basename else None

    @classmethod
    def get_comm(cls, task: interfaces.objects.ObjectInterface) -> Optional[str]:
        """
        Extract the comm field from task_struct

        Args:
            task: task_struct object of the process

        Returns:
            Process name from comm field or None if not available
        """
        try:
            return utility.array_to_string(task.comm)
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(f"Unable to read comm for task at {task.vol.offset:#x}: {e}")
            return None

    @classmethod
    def extract_process_names(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
    ) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
        """
        Extract all process name sources for comparison

        Args:
            context: The context to operate on
            task: task_struct object of the process

        Returns:
            Tuple of (exe_path, exe_basename, cmdline_basename, comm). exe_path is
            returned unmodified, while exe_basename has any trailing " (deleted)"
            marker removed. Each element is None when it is not available.
        """
        exe_path = cls.get_executable_path(context, task)

        exe_basename = PurePosixPath(exe_path).name if exe_path else None
        if exe_basename and exe_basename.endswith(cls._DELETED_SUFFIX):
            exe_basename = exe_basename[: -len(cls._DELETED_SUFFIX)]

        cmdline_basename = cls.get_cmdline_basename(context, task)
        comm = cls.get_comm(task)

        return exe_path, exe_basename, cmdline_basename, comm

    def _detect_spoofing(
        self,
        exe_basename: Optional[str],
        cmdline_basename: Optional[str],
        comm: Optional[str],
    ) -> Tuple[bool, bool]:
        """
        Analyze the three name sources to detect potential spoofing

        Args:
            exe_basename: Basename from exe_file path
            cmdline_basename: Basename from command line
            comm: Name from comm field

        Returns:
            Tuple of (cmdline_spoofed, comm_spoofed) boolean flags. Both flags are
            False when exe_basename is missing; a flag is also False when the
            name it would be compared against is missing.
        """
        # Both comparisons are made against the executable basename, so without
        # it (e.g. kernel threads) there is nothing to compare.
        if not exe_basename:
            return False, False

        cmdline_spoofed = bool(cmdline_basename) and exe_basename != cmdline_basename

        # comm is truncated to 15 characters, so compare against the same prefix
        comm_spoofed = bool(comm) and exe_basename[: self._COMM_MAX_LEN] != comm

        return cmdline_spoofed, comm_spoofed

    def _generator(self, tasks) -> Iterator[Tuple[int, Tuple]]:
        """
        Generate process spoofing detection results

        Args:
            tasks: Iterator of task_struct objects

        Yields:
            Tuple of (tree depth 0, row), where row is (pid, ppid, exe_path,
            cmdline_basename, comm, cmdline_spoofed, comm_spoofed, is_deleted).
            Tasks that raise InvalidAddressException or AttributeError while
            being processed are logged and skipped.
        """
        for task in tasks:
            # Captured once so the error handler below never has to re-read
            # task.pid, which may be the very access that failed.
            pid = "unknown"
            try:
                pid = task.pid
                ppid = task.get_parent_pid()

                exe_path, exe_basename, cmdline_basename, comm = (
                    self.extract_process_names(self.context, task)
                )

                cmdline_spoofed, comm_spoofed = self._detect_spoofing(
                    exe_basename, cmdline_basename, comm
                )

                is_deleted = (
                    exe_path.endswith(self._DELETED_SUFFIX) if exe_path else False
                )

                # Convert None values to strings for TreeGrid compatibility
                exe_path_render = exe_path if exe_path else "N/A"
                cmdline_render = cmdline_basename if cmdline_basename else "N/A"
                comm_render = comm if comm else "N/A"

                yield (
                    0,
                    (
                        pid,
                        ppid,
                        exe_path_render,
                        cmdline_render,
                        comm_render,
                        cmdline_spoofed,
                        comm_spoofed,
                        is_deleted,
                    ),
                )

            except (exceptions.InvalidAddressException, AttributeError) as e:
                vollog.warning(
                    f"Unable to process task PID {pid} at {task.vol.offset:#x}: {e}"
                )
                continue

    def run(self):
        """Build the TreeGrid of spoofing results for the (optionally PID-filtered) tasks."""
        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("PPID", int),
                ("Exe_Path", str),
                ("Cmdline_Basename", str),
                ("Comm", str),
                ("Cmdline_Spoofed", bool),
                ("Comm_Spoofed", bool),
                ("Exe_Deleted", bool),
            ],
            self._generator(
                pslist.PsList.list_tasks(
                    self.context, self.config["kernel"], filter_func=filter_func
                )
            ),
        )