Source code for volatility3.plugins.windows.malware.pebmasquerade

import logging
from typing import List, Union, Tuple

from volatility3.framework import interfaces, renderers, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.plugins.windows import pslist

vollog = logging.getLogger(__name__)


# https://www.ired.team/offensive-security/defense-evasion/masquerading-processes-in-userland-through-_peb
# https://github.com/FuzzySecurity/PowerShell-Suite/blob/master/Masquerade-PEB.ps1
[docs] class PebMasquerade(interfaces.plugins.PluginInterface): """Detects potential process name spoofing by comparing EPROCESS and PEB data.""" _version = (1, 0, 0) _required_framework_version = (2, 27, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="pslist", component=pslist.PsList, version=(3, 0, 0) ), requirements.ListRequirement( name="pid", element_type=int, description="Process ID to include (all other processes are excluded)", optional=True, ), ]
[docs] @classmethod def get_process_names( cls, proc: interfaces.objects.ObjectInterface ) -> Tuple[ Union[str, renderers.NotAvailableValue], Union[str, renderers.NotAvailableValue], Union[str, renderers.NotAvailableValue], Union[str, renderers.NotAvailableValue], ]: """Extract process names and related information from various sources (EPROCESS and PEB). Args: proc: The process object Returns: tuple: (eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline) """ eprocess_imagefilename = renderers.NotAvailableValue() eprocess_seaudit_imagefilename = renderers.NotAvailableValue() peb_imagefilepath = renderers.NotAvailableValue() peb_cmdline = renderers.NotAvailableValue() try: eprocess_imagefilename = utility.array_to_string(proc.ImageFileName) except (AttributeError, exceptions.InvalidAddressException): vollog.debug( "Unable to read EPROCESS.ImageFileName for PID %d", proc.UniqueProcessId ) except Exception as e: vollog.warning( "Error reading EPROCESS.ImageFileName for PID %d: %s", proc.UniqueProcessId, str(e), ) try: audit = proc.SeAuditProcessCreationInfo.ImageFileName.Name audit_string = audit.get_string() if audit_string: eprocess_seaudit_imagefilename = audit_string except exceptions.InvalidAddressException: vollog.debug( "Unable to read SeAuditProcessCreationInfo.ImageFileName for PID %d", proc.UniqueProcessId, ) except AttributeError: vollog.debug( "SeAuditProcessCreationInfo structure not available for PID %d", proc.UniqueProcessId, ) except Exception as e: vollog.warning( "Error reading SeAuditProcessCreationInfo for PID %d: %s", proc.UniqueProcessId, str(e), ) try: peb = proc.get_peb() if peb and peb.ProcessParameters: # Get ImagePathName try: image_path_str = peb.ProcessParameters.ImagePathName.get_string() if image_path_str: peb_imagefilepath = image_path_str except (AttributeError, exceptions.InvalidAddressException): vollog.debug( "Unable to read PEB.ImagePathName for PID %d", proc.UniqueProcessId, ) except Exception as e: vollog.warning( "Error reading PEB.ImagePathName for PID %d: %s", proc.UniqueProcessId, str(e), ) try: cmdline_str = peb.ProcessParameters.CommandLine.get_string() if cmdline_str: peb_cmdline = cmdline_str except (AttributeError, exceptions.InvalidAddressException): vollog.debug( "Unable to read PEB.ProcessParameters.CommandLine for PID %d", proc.UniqueProcessId, ) except Exception as e: vollog.warning( "Error reading PEB.ProcessParameters.CommandLine for PID %d: %s", proc.UniqueProcessId, str(e), ) except (AttributeError, exceptions.InvalidAddressException): # Important for cases where PEB does not exist or is inaccessible (e.g SYSTEM process) vollog.debug("Unable to access PEB for PID %d", proc.UniqueProcessId) except Exception as e: vollog.warning( "Error accessing PEB for PID %d: %s", proc.UniqueProcessId, str(e) ) return ( eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline, )
def _generator(self, pids, context, kernel_module_name): pid_filter = pslist.PsList.create_pid_filter(pids) for proc in pslist.PsList.list_processes( context=context, kernel_module_name=kernel_module_name, filter_func=pid_filter, ): proc_id = proc.UniqueProcessId try: peb = proc.get_peb() except (exceptions.InvalidAddressException, AttributeError): vollog.debug( "Unable to access PEB for PID %d, skipping process", proc_id ) peb_imagefilepath_length_check = False peb_cmdline_length_check = False ( eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline, ) = PebMasquerade.get_process_names(proc) if isinstance(peb_imagefilepath, str) and peb: try: # Length values are of type USHORT peb_imagefilepath_length = ( peb.ProcessParameters.ImagePathName.Length // 2 ) peb_imagefilepath_maxlength = ( peb.ProcessParameters.ImagePathName.MaximumLength // 2 - 1 ) if (peb_imagefilepath_length != len(peb_imagefilepath)) or ( peb_imagefilepath_maxlength != len(peb_imagefilepath) ): peb_imagefilepath_length_check = True except Exception as e: vollog.warning( "PEB.ImagePathName Length comparison error for PID %d: %s", proc_id, str(e), ) if isinstance(peb_cmdline, str) and peb: try: # Length values are of type USHORT peb_cmdline_length = peb.ProcessParameters.CommandLine.Length // 2 peb_cmdline_maxlength = ( peb.ProcessParameters.CommandLine.MaximumLength // 2 - 1 ) if (peb_cmdline_length != len(peb_cmdline)) or ( peb_cmdline_maxlength != len(peb_cmdline) ): peb_cmdline_length_check = True except Exception as e: vollog.warning( "PEB.CommandLine Length comparison error for PID %d: %s", proc_id, str(e), ) yield ( 0, ( proc_id, eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline_length_check, peb_imagefilepath_length_check, ), )
[docs] def run(self): pids = self.config.get("pid", None) context = self.context kernel_module_name = self.config["kernel"] return renderers.TreeGrid( [ ("PID", int), ("EPROCESS_ImageFileName", str), ("EPROCESS_SeAudit_ImageFileName", str), ("PEB_ImageFilePath", str), ("PEB_ImageFilePath_Spoofed", bool), ("PEB_CommandLine_Spoofed", bool), ], self._generator(pids, context, kernel_module_name), )