# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
import ntpath
from typing import List, Type, Optional, Iterator, Tuple
from volatility3.framework import constants, exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows.extensions import pe
from volatility3.plugins.windows import pslist, modules
vollog = logging.getLogger(__name__)
[docs]
class PEDump(interfaces.plugins.PluginInterface):
"""Allows extracting PE Files from a specific address in a specific address space"""
_required_framework_version = (2, 0, 0)
# 2.0.0 - changed the signature of `dump_kernel_pe_at_base`
_version = (2, 0, 0)
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
# Since we're calling the plugin, make sure we have the plugin's requirements
return [
requirements.ModuleRequirement(
name="kernel",
description="Windows kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="pslist", component=pslist.PsList, version=(3, 0, 0)
),
requirements.VersionRequirement(
name="modules", component=modules.Modules, version=(3, 0, 0)
),
requirements.ListRequirement(
name="pid",
element_type=int,
description="Process IDs to include (all other processes are excluded)",
optional=True,
),
requirements.IntRequirement(
name="base",
description="Base address to reconstruct a PE file",
optional=False,
),
requirements.BooleanRequirement(
name="kernel_module",
description="Extract from kernel address space.",
default=False,
optional=True,
),
]
[docs]
@classmethod
def dump_pe(
cls,
context: interfaces.context.ContextInterface,
pe_table_name: str,
layer_name: str,
open_method: Type[interfaces.plugins.FileHandlerInterface],
file_name: str,
base: int,
) -> Optional[str]:
"""
Returns the filename of the dump file or None
"""
with open_method(file_name) as file_handle:
try:
dos_header = context.object(
pe_table_name + constants.BANG + "_IMAGE_DOS_HEADER",
offset=base,
layer_name=layer_name,
)
for offset, data in dos_header.reconstruct():
file_handle.seek(offset)
file_handle.write(data)
except (
OSError,
exceptions.VolatilityException,
OverflowError,
ValueError,
) as excp:
vollog.debug(f"Unable to dump PE file at offset {base}: {excp}")
return None
return file_handle.preferred_filename
[docs]
@classmethod
def dump_ldr_entry(
cls,
context: interfaces.context.ContextInterface,
pe_table_name: str,
ldr_entry: interfaces.objects.ObjectInterface,
open_method: Type[interfaces.plugins.FileHandlerInterface],
layer_name: Optional[str] = None,
prefix: str = "",
) -> Optional[str]:
"""Extracts the PE file referenced an LDR_DATA_TABLE_ENTRY (DLL, kernel module) instance
Args:
context: the context to operate upon
pe_table_name: the name for the symbol table containing the PE format symbols
ldr_entry: the object representing the module
open_method: class for constructing output files
layer_name: the layer that the DLL lives within
prefix: optional string to prepend to filename
Returns:
The output file name or None in the case of failure
"""
try:
name = ldr_entry.FullDllName.get_string()
except exceptions.InvalidAddressException:
name = "UnreadableDLLName"
if layer_name is None:
layer_name = ldr_entry.vol.layer_name
file_name = f"{prefix}{ntpath.basename(name)}.{ldr_entry.vol.offset:#x}.{ldr_entry.DllBase:#x}.dmp"
return cls.dump_pe(
context,
pe_table_name,
layer_name,
open_method,
file_name,
ldr_entry.DllBase,
)
[docs]
@classmethod
def dump_pe_at_base(
cls,
context: interfaces.context.ContextInterface,
pe_table_name: str,
layer_name: str,
open_method: Type[interfaces.plugins.FileHandlerInterface],
proc_offset: int,
pid: int,
base: int,
) -> Optional[str]:
file_name = f"PE.{proc_offset:#x}.{pid:d}.{base:#x}.dmp"
return PEDump.dump_pe(
context, pe_table_name, layer_name, open_method, file_name, base
)
[docs]
@classmethod
def dump_kernel_pe_at_base(
cls,
context: interfaces.context.ContextInterface,
kernel_module_name: str,
pe_table_name: str,
open_method: Type[interfaces.plugins.FileHandlerInterface],
base: int,
) -> Iterator[Tuple[int, str, str]]:
"""
Extracts a PE file from kernel memory at the given base address
"""
session_layers = modules.Modules.get_session_layers(
context=context, kernel_module_name=kernel_module_name
)
session_layer_name = modules.Modules.find_session_layer(
context, session_layers, base
)
if session_layer_name:
system_pid = 4
file_output = PEDump.dump_pe_at_base(
context,
pe_table_name,
session_layer_name,
open_method,
0,
system_pid,
base,
)
if file_output:
yield system_pid, "Kernel", file_output
else:
vollog.warning(
"Unable to find a session layer with the provided base address mapped in the kernel."
)
[docs]
@classmethod
def dump_processes(
cls, context, kernel, pe_table_name, open_method, filter_func, base
):
""" """
for proc in pslist.PsList.list_processes(
context=context,
kernel_module_name=kernel.name,
filter_func=filter_func,
):
pid = proc.UniqueProcessId
proc_name = proc.ImageFileName.cast(
"string",
max_length=proc.ImageFileName.vol.count,
errors="replace",
)
proc_layer_name = proc.add_process_layer()
file_output = PEDump.dump_pe_at_base(
context,
pe_table_name,
proc_layer_name,
open_method,
proc.vol.offset,
pid,
base,
)
if file_output:
yield pid, proc_name, file_output
def _generator(self):
kernel = self.context.modules[self.config["kernel"]]
pe_table_name = intermed.IntermediateSymbolTable.create(
self.context, self.config_path, "windows", "pe", class_types=pe.class_types
)
if self.config["kernel_module"] and self.config["pid"]:
vollog.error("Only 'kernel-module' or 'pid' should be set, not both")
return
if not self.config["kernel_module"] and not self.config["pid"]:
vollog.error("Either 'kernel-module' or 'pid' argument must be set")
return
if self.config["kernel_module"]:
pe_files = self.dump_kernel_pe_at_base(
context=self.context,
kernel_module_name=self.config["kernel"],
pe_table_name=pe_table_name,
open_method=self.open,
base=self.config["base"],
)
else:
filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))
pe_files = self.dump_processes(
self.context,
kernel,
pe_table_name,
self.open,
filter_func,
self.config["base"],
)
for pid, proc_name, file_output in pe_files:
yield (
0,
(
pid,
proc_name,
file_output,
),
)
[docs]
def run(self):
return renderers.TreeGrid(
[
("PID", int),
("Process", str),
("File output", str),
],
self._generator(),
)