# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from pathlib import PurePosixPath
from typing import Optional, Tuple, Iterator
from volatility3.framework import exceptions, interfaces, renderers
from volatility3.framework.constants import linux as linux_constants
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.objects import utility
from volatility3.framework.symbols import linux
from volatility3.plugins.linux import pslist
# Module-level logger, named after this module, shared by every helper in the plugin.
vollog = logging.getLogger(__name__)


class ProcessSpoofing(plugins.PluginInterface):
    """Detects process spoofing by comparing executable path to cmdline & comm fields.

    For every task three name sources are collected: the path behind
    ``mm.exe_file``, the basename of the first command line argument and the
    ``comm`` field. A mismatch between the executable basename and either of
    the other two is reported.

    Examples of such behavior can be found here: https://github.com/SolitudePy/linux-mal
    """

    _required_framework_version = (2, 27, 0)
    _version = (1, 1, 0)

    # Suffix that marks the executable path as deleted; it drives the
    # Exe_Deleted column and is stripped before the basename is compared.
    _DELETED_SUFFIX = " (deleted)"
    # comm is truncated to 15 characters, so only that many characters of the
    # executable basename take part in the comm comparison.
    _COMM_MAX_LENGTH = 15

    @classmethod
    def get_requirements(cls):
        """Declare the kernel module, component versions and optional PID filter."""
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Linux kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.VersionRequirement(
                name="pslist", component=pslist.PsList, version=(4, 0, 0)
            ),
            requirements.VersionRequirement(
                name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0)
            ),
            requirements.ListRequirement(
                name="pid",
                description="Filter on specific process IDs",
                element_type=int,
                optional=True,
            ),
        ]

    @classmethod
    def get_executable_path(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
    ) -> Optional[str]:
        """
        Extract the executable path from task_struct.mm.exe_file

        Args:
            context: The context to operate on
            task: task_struct object of the process

        Returns:
            Returns executable path or None if not available
        """
        try:
            mm = task.mm
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(f"Unable to access mm for task at {task.vol.offset:#x}: {e}")
            return None

        if not mm or not mm.is_readable():
            # Kernel threads don't have mm struct
            return None

        try:
            exe_file = mm.exe_file
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(
                f"Unable to access exe_file for task at {task.vol.offset:#x}: {e}"
            )
            return None

        if not exe_file or not exe_file.is_readable():
            return None

        try:
            return linux.LinuxUtilities.path_for_file(context, task, exe_file)
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(
                f"Unable to read exe_file path for task at {task.vol.offset:#x}: {e}"
            )
            return None

    @classmethod
    def get_cmdline_basename(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
    ) -> Optional[str]:
        """
        Extract the command line arguments and return the basename of the first argument.

        Notes:
            The read length is capped at ``MAX_ARG_STRLEN`` (32 * 4096) per the
            kernel limit defined in ``include/uapi/linux/binfmts.h`` (see
            linux.git commit f6031913338f1dad5bd8cb7286ff4e53644b6940).

        Args:
            context: The context to operate on
            task: task_struct object of the process

        Returns:
            Basename of the first command line argument or None if not available
        """
        # Guard the structure reads the same way get_executable_path does, so
        # an unreadable mm yields "no cmdline" instead of an exception that
        # makes the caller drop the whole task.
        try:
            mm = task.mm
            if not mm or not mm.is_readable():
                return None

            proc_layer_name = task.add_process_layer()
            if proc_layer_name is None:
                return None

            start = mm.arg_start
            size_to_read = mm.arg_end - start
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(
                f"Unable to locate cmdline for task at {task.vol.offset:#x}: {e}"
            )
            return None

        if size_to_read <= 0:
            return None

        read_length = min(size_to_read, linux_constants.MAX_ARG_STRLEN)
        try:
            # NOTE(review): presumably address_to_string stops at the first
            # NUL so only the first argument comes back - confirm against the
            # utility implementation.
            cmdline = utility.address_to_string(
                context=context,
                layer_name=proc_layer_name,
                address=start,
                count=read_length,
                errors="replace",
                encoding="utf-8",
            )
        except exceptions.InvalidAddressException as e:
            vollog.debug(
                f"Unable to read cmdline for task at {task.vol.offset:#x}: {e}"
            )
            return None

        if not cmdline:
            return None

        # An empty basename (e.g. the string was just "/") counts as unavailable.
        return PurePosixPath(cmdline).name or None

    @classmethod
    def get_comm(cls, task: interfaces.objects.ObjectInterface) -> Optional[str]:
        """
        Extract the comm field from task_struct

        Args:
            task: task_struct object of the process

        Returns:
            Process name from comm field or None if not available
        """
        try:
            return utility.array_to_string(task.comm)
        except (exceptions.InvalidAddressException, AttributeError) as e:
            vollog.debug(f"Unable to read comm for task at {task.vol.offset:#x}: {e}")
            return None

    @classmethod
    def extract_process_names(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
    ) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
        """
        Collect every name source used for the spoofing comparison

        Args:
            context: The context to operate on
            task: task_struct object of the process

        Returns:
            Tuple of (exe_path, exe_basename, cmdline_basename, comm); each
            element is None when the corresponding source is not available
        """
        exe_path = cls.get_executable_path(context, task)

        exe_basename = None
        if exe_path:
            clean_path = exe_path
            # The deleted marker is not part of the file name; comparing with
            # it attached would flag every deleted executable as spoofed.
            if clean_path.endswith(cls._DELETED_SUFFIX):
                clean_path = clean_path[: -len(cls._DELETED_SUFFIX)]
            exe_basename = PurePosixPath(clean_path).name or None

        cmdline_basename = cls.get_cmdline_basename(context, task)
        comm = cls.get_comm(task)
        return exe_path, exe_basename, cmdline_basename, comm

    def _detect_spoofing(
        self,
        exe_basename: Optional[str],
        cmdline_basename: Optional[str],
        comm: Optional[str],
    ) -> Tuple[bool, bool]:
        """
        Analyze the three name sources to detect potential spoofing

        Args:
            exe_basename: Basename from exe_file path
            cmdline_basename: Basename from command line
            comm: Name from comm field

        Returns:
            Tuple of (cmdline_spoofed, comm_spoofed) boolean flags
        """
        # Both checks compare against the executable basename, so without it
        # (e.g. kernel threads) nothing can be flagged.
        if not exe_basename:
            return False, False

        cmdline_spoofed = bool(cmdline_basename) and exe_basename != cmdline_basename
        # comm is truncated to 15 characters, so compare the same prefix
        comm_spoofed = bool(comm) and exe_basename[: self._COMM_MAX_LENGTH] != comm
        return cmdline_spoofed, comm_spoofed

    def _generator(self, tasks) -> Iterator[Tuple[int, Tuple]]:
        """
        Generate process spoofing detection results

        Args:
            tasks: Iterator of task_struct objects

        Yields:
            Tuple containing process information and spoofing analysis
        """
        for task in tasks:
            # Captured once so the error path never has to read task.pid
            # again; a second failing read there would escape the handler.
            pid_for_log = "unknown"
            try:
                pid = task.pid
                pid_for_log = pid
                ppid = task.get_parent_pid()

                exe_path, exe_basename, cmdline_basename, comm = (
                    self.extract_process_names(self.context, task)
                )
                cmdline_spoofed, comm_spoofed = self._detect_spoofing(
                    exe_basename, cmdline_basename, comm
                )
                is_deleted = bool(exe_path) and exe_path.endswith(
                    self._DELETED_SUFFIX
                )

                # Convert None values to strings for TreeGrid compatibility
                row = (
                    pid,
                    ppid,
                    exe_path if exe_path else "N/A",
                    cmdline_basename if cmdline_basename else "N/A",
                    comm if comm else "N/A",
                    cmdline_spoofed,
                    comm_spoofed,
                    is_deleted,
                )
            except (exceptions.InvalidAddressException, AttributeError) as e:
                vollog.warning(
                    f"Unable to process task PID {pid_for_log} at {task.vol.offset:#x}: {e}"
                )
                continue

            yield (0, row)

    def run(self):
        """Build the result TreeGrid for all tasks matching the optional PID filter."""
        filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

        return renderers.TreeGrid(
            [
                ("PID", int),
                ("PPID", int),
                ("Exe_Path", str),
                ("Cmdline_Basename", str),
                ("Comm", str),
                ("Cmdline_Spoofed", bool),
                ("Comm_Spoofed", bool),
                ("Exe_Deleted", bool),
            ],
            self._generator(
                pslist.PsList.list_tasks(
                    self.context, self.config["kernel"], filter_func=filter_func
                )
            ),
        )