import logging
from typing import List, Union, Tuple
from volatility3.framework import interfaces, renderers, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.plugins.windows import pslist
vollog = logging.getLogger(__name__)
# https://www.ired.team/offensive-security/defense-evasion/masquerading-processes-in-userland-through-_peb
# https://github.com/FuzzySecurity/PowerShell-Suite/blob/master/Masquerade-PEB.ps1
[docs]
class PebMasquerade(interfaces.plugins.PluginInterface):
"""Detects potential process name spoofing by comparing EPROCESS and PEB data."""
_version = (1, 0, 0)
_required_framework_version = (2, 27, 0)
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(
name="kernel",
description="Windows kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="pslist", component=pslist.PsList, version=(3, 0, 0)
),
requirements.ListRequirement(
name="pid",
element_type=int,
description="Process ID to include (all other processes are excluded)",
optional=True,
),
]
[docs]
@classmethod
def get_process_names(
cls, proc: interfaces.objects.ObjectInterface
) -> Tuple[
Union[str, renderers.NotAvailableValue],
Union[str, renderers.NotAvailableValue],
Union[str, renderers.NotAvailableValue],
Union[str, renderers.NotAvailableValue],
]:
"""Extract process names and related information from various sources (EPROCESS and PEB).
Args:
proc: The process object
Returns:
tuple: (eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline)
"""
eprocess_imagefilename = renderers.NotAvailableValue()
eprocess_seaudit_imagefilename = renderers.NotAvailableValue()
peb_imagefilepath = renderers.NotAvailableValue()
peb_cmdline = renderers.NotAvailableValue()
try:
eprocess_imagefilename = utility.array_to_string(proc.ImageFileName)
except (AttributeError, exceptions.InvalidAddressException):
vollog.debug(
"Unable to read EPROCESS.ImageFileName for PID %d", proc.UniqueProcessId
)
except Exception as e:
vollog.warning(
"Error reading EPROCESS.ImageFileName for PID %d: %s",
proc.UniqueProcessId,
str(e),
)
try:
audit = proc.SeAuditProcessCreationInfo.ImageFileName.Name
audit_string = audit.get_string()
if audit_string:
eprocess_seaudit_imagefilename = audit_string
except exceptions.InvalidAddressException:
vollog.debug(
"Unable to read SeAuditProcessCreationInfo.ImageFileName for PID %d",
proc.UniqueProcessId,
)
except AttributeError:
vollog.debug(
"SeAuditProcessCreationInfo structure not available for PID %d",
proc.UniqueProcessId,
)
except Exception as e:
vollog.warning(
"Error reading SeAuditProcessCreationInfo for PID %d: %s",
proc.UniqueProcessId,
str(e),
)
try:
peb = proc.get_peb()
if peb and peb.ProcessParameters:
# Get ImagePathName
try:
image_path_str = peb.ProcessParameters.ImagePathName.get_string()
if image_path_str:
peb_imagefilepath = image_path_str
except (AttributeError, exceptions.InvalidAddressException):
vollog.debug(
"Unable to read PEB.ImagePathName for PID %d",
proc.UniqueProcessId,
)
except Exception as e:
vollog.warning(
"Error reading PEB.ImagePathName for PID %d: %s",
proc.UniqueProcessId,
str(e),
)
try:
cmdline_str = peb.ProcessParameters.CommandLine.get_string()
if cmdline_str:
peb_cmdline = cmdline_str
except (AttributeError, exceptions.InvalidAddressException):
vollog.debug(
"Unable to read PEB.ProcessParameters.CommandLine for PID %d",
proc.UniqueProcessId,
)
except Exception as e:
vollog.warning(
"Error reading PEB.ProcessParameters.CommandLine for PID %d: %s",
proc.UniqueProcessId,
str(e),
)
except (AttributeError, exceptions.InvalidAddressException):
# Important for cases where PEB does not exist or is inaccessible (e.g SYSTEM process)
vollog.debug("Unable to access PEB for PID %d", proc.UniqueProcessId)
except Exception as e:
vollog.warning(
"Error accessing PEB for PID %d: %s", proc.UniqueProcessId, str(e)
)
return (
eprocess_imagefilename,
eprocess_seaudit_imagefilename,
peb_imagefilepath,
peb_cmdline,
)
def _generator(self, pids, context, kernel_module_name):
pid_filter = pslist.PsList.create_pid_filter(pids)
for proc in pslist.PsList.list_processes(
context=context,
kernel_module_name=kernel_module_name,
filter_func=pid_filter,
):
proc_id = proc.UniqueProcessId
try:
peb = proc.get_peb()
except (exceptions.InvalidAddressException, AttributeError):
vollog.debug(
"Unable to access PEB for PID %d, skipping process", proc_id
)
peb_imagefilepath_length_check = False
peb_cmdline_length_check = False
(
eprocess_imagefilename,
eprocess_seaudit_imagefilename,
peb_imagefilepath,
peb_cmdline,
) = PebMasquerade.get_process_names(proc)
if isinstance(peb_imagefilepath, str) and peb:
try:
# Length values are of type USHORT
peb_imagefilepath_length = (
peb.ProcessParameters.ImagePathName.Length // 2
)
peb_imagefilepath_maxlength = (
peb.ProcessParameters.ImagePathName.MaximumLength // 2 - 1
)
if (peb_imagefilepath_length != len(peb_imagefilepath)) or (
peb_imagefilepath_maxlength != len(peb_imagefilepath)
):
peb_imagefilepath_length_check = True
except Exception as e:
vollog.warning(
"PEB.ImagePathName Length comparison error for PID %d: %s",
proc_id,
str(e),
)
if isinstance(peb_cmdline, str) and peb:
try:
# Length values are of type USHORT
peb_cmdline_length = peb.ProcessParameters.CommandLine.Length // 2
peb_cmdline_maxlength = (
peb.ProcessParameters.CommandLine.MaximumLength // 2 - 1
)
if (peb_cmdline_length != len(peb_cmdline)) or (
peb_cmdline_maxlength != len(peb_cmdline)
):
peb_cmdline_length_check = True
except Exception as e:
vollog.warning(
"PEB.CommandLine Length comparison error for PID %d: %s",
proc_id,
str(e),
)
yield (
0,
(
proc_id,
eprocess_imagefilename,
eprocess_seaudit_imagefilename,
peb_imagefilepath,
peb_cmdline_length_check,
peb_imagefilepath_length_check,
),
)
[docs]
def run(self):
pids = self.config.get("pid", None)
context = self.context
kernel_module_name = self.config["kernel"]
return renderers.TreeGrid(
[
("PID", int),
("EPROCESS_ImageFileName", str),
("EPROCESS_SeAudit_ImageFileName", str),
("PEB_ImageFilePath", str),
("PEB_ImageFilePath_Spoofed", bool),
("PEB_CommandLine_Spoofed", bool),
],
self._generator(pids, context, kernel_module_name),
)