# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import collections.abc
import contextlib
import datetime
import functools
import logging
import math
from typing import Generator, Iterable, Iterator, List, Optional, Tuple, Union
from volatility3.framework import (
constants,
exceptions,
interfaces,
objects,
renderers,
symbols,
)
from volatility3.framework.interfaces.objects import ObjectInterface
from volatility3.framework.layers import intel
from volatility3.framework.renderers import conversion
from volatility3.framework.symbols import generic
from volatility3.framework.symbols.windows.extensions import kdbg, pe, pool
vollog = logging.getLogger(__name__)
# Keep these in a basic module, to prevent import cycles when symbol providers require them
[docs]class KSYSTEM_TIME(objects.StructType):
"""A system time structure that stores a high and low part."""
[docs] def get_time(self):
wintime = (self.High1Time << 32) | self.LowPart
return conversion.wintime_to_datetime(wintime)
[docs]class MMVAD_SHORT(objects.StructType):
"""A class that represents process virtual memory ranges.
Each instance is a node in a binary tree structure and is pointed to
by VadRoot.
"""
[docs] @functools.lru_cache(maxsize=None)
def get_tag(self):
vad_address = self.vol.offset
# the offset is different on 32 and 64 bits
symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
if not symbols.symbol_table_is_64bit(self._context, symbol_table_name):
vad_address -= 4
else:
vad_address -= 12
try:
# TODO: instantiate a _POOL_HEADER and return PoolTag
bytesobj = self._context.object(
symbol_table_name + constants.BANG + "bytes",
layer_name=self.vol.layer_name,
offset=vad_address,
native_layer_name=self.vol.native_layer_name,
length=4,
)
return bytesobj.decode()
except exceptions.InvalidAddressException:
return None
except UnicodeDecodeError:
return None
[docs] def traverse(self, visited=None, depth=0):
"""Traverse the VAD tree, determining each underlying VAD node type by
looking up the pool tag for the structure and then casting into a new
object."""
# TODO: this is an arbitrary limit chosen based on past observations
if depth > 100:
vollog.log(
constants.LOGLEVEL_VVV, "Vad tree is too deep, something went wrong!"
)
raise RuntimeError("Vad tree is too deep")
if visited is None:
visited = set()
vad_address = self.vol.offset
if vad_address in visited:
vollog.log(constants.LOGLEVEL_VVV, "VAD node already seen!")
return None
visited.add(vad_address)
tag = self.get_tag()
if tag in ["VadS", "VadF"]:
target = "_MMVAD_SHORT"
elif tag is not None and tag.startswith("Vad"):
target = "_MMVAD"
elif depth == 0:
# the root node at depth 0 is allowed to not have a tag
# but we still want to continue and access its right & left child
target = None
else:
# any node other than the root that doesn't have a recognized tag
# is just garbage and we skip the node entirely
vollog.log(
constants.LOGLEVEL_VVV,
f"Skipping VAD at {self.vol.offset} depth {depth} with tag {tag}",
)
return None
if target:
vad_object = self.cast(target)
yield vad_object
try:
for vad_node in (
self.get_left_child().dereference().traverse(visited, depth + 1)
):
yield vad_node
except exceptions.InvalidAddressException as excp:
vollog.log(
constants.LOGLEVEL_VVV,
f"Invalid address on LeftChild: {excp.invalid_address:#x}",
)
try:
for vad_node in (
self.get_right_child().dereference().traverse(visited, depth + 1)
):
yield vad_node
except exceptions.InvalidAddressException as excp:
vollog.log(
constants.LOGLEVEL_VVV,
f"Invalid address on RightChild: {excp.invalid_address:#x}",
)
[docs] def get_right_child(self):
"""Get the right child member."""
if self.has_member("RightChild"):
return self.RightChild
elif self.has_member("Right"):
return self.Right
# this is for windows 8 and 10
elif self.has_member("VadNode"):
if self.VadNode.has_member("RightChild"):
return self.VadNode.RightChild
if self.VadNode.has_member("Right"):
return self.VadNode.Right
# also for windows 8 and 10
elif self.has_member("Core"):
if self.Core.has_member("VadNode"):
if self.Core.VadNode.has_member("RightChild"):
return self.Core.VadNode.RightChild
if self.Core.VadNode.has_member("Right"):
return self.Core.VadNode.Right
raise AttributeError("Unable to find the right child member")
[docs] def get_left_child(self):
"""Get the left child member."""
if self.has_member("LeftChild"):
return self.LeftChild
elif self.has_member("Left"):
return self.Left
# this is for windows 8 and 10
elif self.has_member("VadNode"):
if self.VadNode.has_member("LeftChild"):
return self.VadNode.LeftChild
if self.VadNode.has_member("Left"):
return self.VadNode.Left
# also for windows 8 and 10
elif self.has_member("Core"):
if self.Core.has_member("VadNode"):
if self.Core.VadNode.has_member("LeftChild"):
return self.Core.VadNode.LeftChild
if self.Core.VadNode.has_member("Left"):
return self.Core.VadNode.Left
raise AttributeError("Unable to find the left child member")
[docs] def get_parent(self):
"""Get the VAD's parent member."""
# this is for xp and 2003
if self.has_member("Parent"):
return self.Parent
# this is for vista through windows 7
elif self.has_member("u1") and self.u1.has_member("Parent"):
return self.u1.Parent & ~0x3
# this is for windows 8 and 10
elif self.has_member("VadNode"):
if self.VadNode.has_member("u1"):
return self.VadNode.u1.Parent & ~0x3
elif self.VadNode.has_member("ParentValue"):
return self.VadNode.ParentValue & ~0x3
# also for windows 8 and 10
elif self.has_member("Core"):
if self.Core.VadNode.has_member("u1"):
return self.Core.VadNode.u1.Parent & ~0x3
elif self.Core.VadNode.has_member("ParentValue"):
return self.Core.VadNode.ParentValue & ~0x3
raise AttributeError("Unable to find the parent member")
[docs] def get_start(self) -> int:
"""Get the VAD's starting virtual address. This is the first accessible byte in the range."""
if self.has_member("StartingVpn"):
if self.has_member("StartingVpnHigh"):
return (self.StartingVpn << 12) | (self.StartingVpnHigh << 44)
else:
return self.StartingVpn << 12
elif self.has_member("Core"):
if self.Core.has_member("StartingVpnHigh"):
return (self.Core.StartingVpn << 12) | (self.Core.StartingVpnHigh << 44)
else:
return self.Core.StartingVpn << 12
raise AttributeError("Unable to find the starting VPN member")
[docs] def get_end(self) -> int:
"""Get the VAD's ending virtual address. This is the last accessible byte in the range."""
if self.has_member("EndingVpn"):
if self.has_member("EndingVpnHigh"):
return (((self.EndingVpn + 1) << 12) | (self.EndingVpnHigh << 44)) - 1
else:
return ((self.EndingVpn + 1) << 12) - 1
elif self.has_member("Core"):
if self.Core.has_member("EndingVpnHigh"):
return (
((self.Core.EndingVpn + 1) << 12) | (self.Core.EndingVpnHigh << 44)
) - 1
else:
return ((self.Core.EndingVpn + 1) << 12) - 1
raise AttributeError("Unable to find the ending VPN member")
[docs] def get_size(self) -> int:
"""Get the size of the VAD region. The OS ensures page granularity."""
return (self.get_end() - self.get_start()) + 1
[docs] def get_commit_charge(self):
"""Get the VAD's commit charge (number of committed pages)"""
if self.has_member("u1") and self.u1.has_member("VadFlags1"):
return self.u1.VadFlags1.CommitCharge
elif self.has_member("u") and self.u.has_member("VadFlags"):
return self.u.VadFlags.CommitCharge
elif self.has_member("Core"):
return self.Core.u1.VadFlags1.CommitCharge
raise AttributeError("Unable to find the commit charge member")
[docs] def get_private_memory(self):
"""Get the VAD's private memory setting."""
if (
self.has_member("u1")
and self.u1.has_member("VadFlags1")
and self.u1.VadFlags1.has_member("PrivateMemory")
):
return self.u1.VadFlags1.PrivateMemory
elif (
self.has_member("u")
and self.u.has_member("VadFlags")
and self.u.VadFlags.has_member("PrivateMemory")
):
return self.u.VadFlags.PrivateMemory
elif self.has_member("Core"):
if (
self.Core.has_member("u1")
and self.Core.u1.has_member("VadFlags1")
and self.Core.u1.VadFlags1.has_member("PrivateMemory")
):
return self.Core.u1.VadFlags1.PrivateMemory
elif (
self.Core.has_member("u")
and self.Core.u.has_member("VadFlags")
and self.Core.u.VadFlags.has_member("PrivateMemory")
):
return self.Core.u.VadFlags.PrivateMemory
raise AttributeError("Unable to find the private memory member")
[docs] def get_protection(self, protect_values, winnt_protections):
"""Get the VAD's protection constants as a string."""
protect = None
if self.has_member("u"):
protect = self.u.VadFlags.Protection
elif self.has_member("Core"):
protect = self.Core.u.VadFlags.Protection
try:
value = protect_values[protect]
except IndexError:
value = 0
names = []
for name, mask in winnt_protections.items():
if value & mask != 0:
names.append(name)
return "|".join(names)
[docs] def get_file_name(self):
"""Only long(er) vads have mapped files."""
return renderers.NotApplicableValue()
[docs]class MMVAD(MMVAD_SHORT):
"""A version of the process virtual memory range structure that contains
additional fields necessary to map files from disk."""
[docs] def get_file_name(self):
"""Get the name of the file mapped into the memory range (if any)"""
file_name = renderers.NotApplicableValue()
with contextlib.suppress(exceptions.InvalidAddressException):
# this is for xp and 2003
if self.has_member("ControlArea"):
filename_obj = self.ControlArea.FilePointer.FileName
# this is for vista through windows 7
else:
filename_obj = (
self.Subsection.ControlArea.FilePointer.dereference()
.cast("_FILE_OBJECT")
.FileName
)
if filename_obj.Length > 0:
file_name = filename_obj.get_string()
return file_name
[docs]class EX_FAST_REF(objects.StructType):
"""This is a standard Windows structure that stores a pointer to an object
but also leverages the least significant bits to encode additional details.
When dereferencing the pointer, we need to strip off the extra bits.
"""
[docs] def dereference(self) -> interfaces.objects.ObjectInterface:
if constants.BANG not in self.vol.type_name:
raise ValueError(
f"Invalid symbol table name syntax (no {constants.BANG} found)"
)
# the mask value is different on 32 and 64 bits
symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
if not symbols.symbol_table_is_64bit(self._context, symbol_table_name):
max_fast_ref = 7
else:
max_fast_ref = 15
return self._context.object(
symbol_table_name + constants.BANG + "pointer",
layer_name=self.vol.layer_name,
offset=self.Object & ~max_fast_ref,
native_layer_name=self.vol.native_layer_name,
)
[docs]class DEVICE_OBJECT(objects.StructType, pool.ExecutiveObject):
"""A class for kernel device objects."""
[docs] def get_device_name(self) -> str:
"""Get device's name from the object header."""
header = self.get_object_header()
return header.NameInfo.Name.String # type: ignore
[docs] def get_attached_devices(self) -> Generator[ObjectInterface, None, None]:
"""Enumerate the attached device's objects"""
device = self.AttachedDevice.dereference()
while device:
yield device
device = device.AttachedDevice.dereference()
[docs]class DRIVER_OBJECT(objects.StructType, pool.ExecutiveObject):
"""A class for kernel driver objects."""
[docs] def get_driver_name(self) -> str:
"""Get driver's name from the object header."""
header = self.get_object_header()
return header.NameInfo.Name.String # type: ignore
[docs] def get_devices(self) -> Generator[ObjectInterface, None, None]:
"""Enumerate the driver's device objects"""
device = self.DeviceObject.dereference()
while device:
yield device
device = device.NextDevice.dereference()
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
return True
[docs]class OBJECT_SYMBOLIC_LINK(objects.StructType, pool.ExecutiveObject):
"""A class for kernel link objects."""
[docs] def get_link_name(self) -> str:
header = self.get_object_header()
return header.NameInfo.Name.String # type: ignore
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
return True
[docs] def get_create_time(self):
return conversion.wintime_to_datetime(self.CreationTime.QuadPart)
[docs]class FILE_OBJECT(objects.StructType, pool.ExecutiveObject):
"""A class for windows file objects."""
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
return self.FileName.Length > 0 and self._context.layers[
self.FileName.Buffer.vol.native_layer_name
].is_valid(self.FileName.Buffer)
[docs] def file_name_with_device(self) -> Union[str, interfaces.renderers.BaseAbsentValue]:
name: Union[str, interfaces.renderers.BaseAbsentValue] = (
renderers.UnreadableValue()
)
# this pointer needs to be checked against native_layer_name because the object may
# be instantiated from a primary (virtual) layer or a memory (physical) layer.
if self._context.layers[self.vol.native_layer_name].is_valid(self.DeviceObject):
with contextlib.suppress(ValueError):
name = f"\\Device\\{self.DeviceObject.get_device_name()}"
with contextlib.suppress(TypeError, exceptions.InvalidAddressException):
name += self.FileName.String
return name
[docs] def access_string(self):
## Make a nicely formatted ACL string
return (
("R" if self.ReadAccess else "-")
+ ("W" if self.WriteAccess else "-")
+ ("D" if self.DeleteAccess else "-")
+ ("r" if self.SharedRead else "-")
+ ("w" if self.SharedWrite else "-")
+ ("d" if self.SharedDelete else "-")
)
[docs]class KMUTANT(objects.StructType, pool.ExecutiveObject):
"""A class for windows mutant objects."""
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
return True
[docs] def get_name(self) -> str:
"""Get the object's name from the object header."""
header = self.get_object_header()
return header.NameInfo.Name.String # type: ignore
[docs]class ETHREAD(objects.StructType, pool.ExecutiveObject):
"""A class for executive thread objects."""
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
try:
# validation by TID:
if self.Cid.UniqueThread % 4 != 0: # NT tids are divisible by 4
return False
# validation by PID of parent process:
if self.Cid.UniqueProcess % 4 != 0:
return False
# validation by thread creation time:
if (
self.Cid.UniqueProcess != 4
): # The System process (PID 4) has no create time
ctime = self.get_create_time()
if not isinstance(ctime, datetime.datetime):
return False
if not (1998 < ctime.year < 2030):
return False
except exceptions.InvalidAddressException:
return False
# passed all validations
return True
[docs] def get_create_time(self):
# For Windows XPs
if self.has_member("ThreadsProcess"):
return conversion.wintime_to_datetime(self.CreateTime.QuadPart >> 3)
return conversion.wintime_to_datetime(self.CreateTime.QuadPart)
[docs] def get_exit_time(self):
return conversion.wintime_to_datetime(self.ExitTime.QuadPart)
[docs] def owning_process(self) -> interfaces.objects.ObjectInterface:
"""Return the EPROCESS that owns this thread."""
# For Windows XPs
if self.has_member("ThreadsProcess"):
return self.ThreadsProcess.dereference().cast("_EPROCESS")
# For Windows Vista and later versions
elif self.has_member("Tcb") and self.Tcb.has_member("Process"):
return self.Tcb.Process.dereference().cast("_EPROCESS")
else:
raise AttributeError("Unable to find the owning process of ethread")
[docs] def get_cross_thread_flags(self) -> str:
dictCrossThreadFlags = {
"PS_CROSS_THREAD_FLAGS_TERMINATED": 0,
"PS_CROSS_THREAD_FLAGS_DEADTHREAD": 1,
"PS_CROSS_THREAD_FLAGS_HIDEFROMDBG": 2,
"PS_CROSS_THREAD_FLAGS_IMPERSONATING": 3,
"PS_CROSS_THREAD_FLAGS_SYSTEM": 4,
"PS_CROSS_THREAD_FLAGS_HARD_ERRORS_DISABLED": 5,
"PS_CROSS_THREAD_FLAGS_BREAK_ON_TERMINATION": 6,
"PS_CROSS_THREAD_FLAGS_SKIP_CREATION_MSG": 7,
"PS_CROSS_THREAD_FLAGS_SKIP_TERMINATION_MSG": 8,
}
flags = self.CrossThreadFlags
stringCrossThreadFlags = ""
for flag in dictCrossThreadFlags:
if flags & 2 ** dictCrossThreadFlags[flag]:
stringCrossThreadFlags += f"{flag} "
return (
stringCrossThreadFlags[:-1]
if stringCrossThreadFlags
else stringCrossThreadFlags
)
[docs]class UNICODE_STRING(objects.StructType):
"""A class for Windows unicode string structures."""
[docs] def get_string(self) -> interfaces.objects.ObjectInterface:
# We explicitly do *not* catch errors here, we allow an exception to be thrown
# (otherwise there's no way to determine anything went wrong)
# It's up to the user of this method to catch exceptions
# We manually construct an object rather than casting a dereferenced pointer in case
# the buffer length is 0 and the pointer is a NULL pointer
return self._context.object(
self.vol.type_name.split(constants.BANG)[0] + constants.BANG + "string",
layer_name=self.Buffer.vol.native_layer_name,
offset=self.Buffer,
max_length=self.Length,
errors="replace",
encoding="utf16",
)
String = property(get_string)
[docs]class EPROCESS(generic.GenericIntelProcess, pool.ExecutiveObject):
"""A class for executive kernel processes objects."""
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
try:
name = objects.utility.array_to_string(self.ImageFileName)
if not name or len(name) == 0 or name[0] == "\x00":
return False
# The System/PID 4 process has no create time
if not (str(name) == "System" and self.UniqueProcessId == 4):
if self.CreateTime.QuadPart == 0:
return False
ctime = self.get_create_time()
if not isinstance(ctime, datetime.datetime):
return False
if not (1998 < ctime.year < 2030):
return False
# NT pids are divisible by 4
if self.UniqueProcessId % 4 != 0:
return False
# check for all 0s besides the PCID entries
if isinstance(self.Pcb.DirectoryTableBase, objects.Array):
dtb = self.Pcb.DirectoryTableBase.cast("pointer")
else:
dtb = self.Pcb.DirectoryTableBase
if dtb == 0:
return False
# check for all 0s besides the PCID entries
if dtb & ~0xFFF == 0:
return False
## TODO: we can also add the thread Flink and Blink tests if necessary
except exceptions.InvalidAddressException:
return False
return True
[docs] def add_process_layer(self, config_prefix: str = None, preferred_name: str = None):
"""Constructs a new layer based on the process's DirectoryTableBase."""
parent_layer = self._context.layers[self.vol.layer_name]
if not isinstance(parent_layer, intel.Intel):
# We can't get bits_per_register unless we're an intel space (since that's not defined at the higher layer)
raise TypeError(
"Parent layer is not a translation layer, unable to construct process layer"
)
# Presumably for 64-bit systems, the DTB is defined as an array, rather than an unsigned long long
dtb: int = 0
if isinstance(self.Pcb.DirectoryTableBase, objects.Array):
dtb = self.Pcb.DirectoryTableBase.cast("unsigned long long")
else:
dtb = self.Pcb.DirectoryTableBase
dtb = dtb & ((1 << parent_layer.bits_per_register) - 1)
if preferred_name is None:
preferred_name = self.vol.layer_name + f"_Process{self.UniqueProcessId}"
# Add the constructed layer and return the name
return self._add_process_layer(
self._context, dtb, config_prefix, preferred_name
)
[docs] def get_peb(self) -> interfaces.objects.ObjectInterface:
"""Constructs a PEB object"""
if constants.BANG not in self.vol.type_name:
raise ValueError(
f"Invalid symbol table name syntax (no {constants.BANG} found)"
)
# add_process_layer can raise InvalidAddressException.
# if that happens, we let the exception propagate upwards
proc_layer_name = self.add_process_layer()
proc_layer = self._context.layers[proc_layer_name]
if not proc_layer.is_valid(self.Peb):
raise exceptions.InvalidAddressException(
proc_layer_name, self.Peb, f"Invalid Peb address at {self.Peb:0x}"
)
sym_table = self.get_symbol_table_name()
peb = self._context.object(
f"{sym_table}{constants.BANG}_PEB",
layer_name=proc_layer_name,
offset=self.Peb,
)
return peb
[docs] def load_order_modules(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Generator for DLLs in the order that they were loaded."""
try:
peb = self.get_peb()
for entry in peb.Ldr.InLoadOrderModuleList.to_list(
f"{self.get_symbol_table_name()}{constants.BANG}_LDR_DATA_TABLE_ENTRY",
"InLoadOrderLinks",
):
yield entry
except exceptions.InvalidAddressException:
return None
[docs] def init_order_modules(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Generator for DLLs in the order that they were initialized"""
try:
peb = self.get_peb()
for entry in peb.Ldr.InInitializationOrderModuleList.to_list(
f"{self.get_symbol_table_name()}{constants.BANG}_LDR_DATA_TABLE_ENTRY",
"InInitializationOrderLinks",
):
yield entry
except exceptions.InvalidAddressException:
return None
[docs] def mem_order_modules(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Generator for DLLs in the order that they appear in memory"""
try:
peb = self.get_peb()
for entry in peb.Ldr.InMemoryOrderModuleList.to_list(
f"{self.get_symbol_table_name()}{constants.BANG}_LDR_DATA_TABLE_ENTRY",
"InMemoryOrderLinks",
):
yield entry
except exceptions.InvalidAddressException:
return None
[docs] def get_handle_count(self):
try:
if self.has_member("ObjectTable"):
if self.ObjectTable.has_member("HandleCount"):
return self.ObjectTable.HandleCount
except exceptions.InvalidAddressException:
vollog.log(
constants.LOGLEVEL_VVV,
f"Cannot access _EPROCESS.ObjectTable.HandleCount at {self.vol.offset:#x}",
)
return renderers.UnreadableValue()
[docs] def get_session_id(self):
try:
if self.has_member("Session"):
if self.Session == 0:
return renderers.NotApplicableValue()
symbol_table_name = self.get_symbol_table_name()
kvo = self._context.layers[self.vol.native_layer_name].config[
"kernel_virtual_offset"
]
ntkrnlmp = self._context.module(
symbol_table_name,
layer_name=self.vol.native_layer_name,
offset=kvo,
native_layer_name=self.vol.native_layer_name,
)
session = ntkrnlmp.object(
object_type="_MM_SESSION_SPACE", offset=self.Session, absolute=True
)
if session.has_member("SessionId"):
return session.SessionId
except exceptions.InvalidAddressException:
vollog.log(
constants.LOGLEVEL_VVV,
f"Cannot access _EPROCESS.Session.SessionId at {self.vol.offset:#x}",
)
return renderers.UnreadableValue()
[docs] def get_create_time(self):
return conversion.wintime_to_datetime(self.CreateTime.QuadPart)
[docs] def get_exit_time(self):
return conversion.wintime_to_datetime(self.ExitTime.QuadPart)
[docs] def get_wow_64_process(self):
if self.has_member("Wow64Process"):
return self.Wow64Process
elif self.has_member("WoW64Process"):
return self.WoW64Process
raise AttributeError("Unable to find Wow64Process")
[docs] def get_is_wow64(self):
try:
value = self.get_wow_64_process()
except AttributeError:
return False
if value:
return True
return False
[docs] def get_vad_root(self):
# windows 8 and 2012 (_MM_AVL_TABLE)
if self.VadRoot.has_member("BalancedRoot"):
return self.VadRoot.BalancedRoot
# windows 8.1 and windows 10 (_RTL_AVL_TREE)
elif self.VadRoot.has_member("Root"):
return self.VadRoot.Root.dereference() # .cast("_MMVAD")
else:
# windows xp and 2003
return self.VadRoot.dereference().cast("_MMVAD")
[docs] def environment_variables(self):
"""Generator for environment variables.
The PEB points to our env block - a series of null-terminated
unicode strings. Each string cannot be more than 0x7FFF chars.
End of the list is a quad-null.
"""
context = self._context
process_space = self.add_process_layer()
try:
block = self.get_peb().ProcessParameters.Environment
try:
block_size = self.get_peb().ProcessParameters.EnvironmentSize
except AttributeError: # Windows XP
block_size = self.get_peb().ProcessParameters.Length
envars = (
context.layers[process_space]
.read(block, block_size)
.decode("utf-16-le", errors="replace")
.split("\x00")[:-1]
)
except exceptions.InvalidAddressException:
return # Generation finished
for envar in envars:
split_index = envar.find("=")
env = envar[:split_index]
var = envar[split_index + 1 :]
# Exclude parse problem with some types of env
if env and var:
yield env, var
return # Generation finished
[docs]class LIST_ENTRY(objects.StructType, collections.abc.Iterable):
"""A class for double-linked lists on Windows."""
[docs] def to_list(
self,
symbol_type: str,
member: str,
forward: bool = True,
sentinel: bool = True,
layer: Optional[str] = None,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Returns an iterator of the entries in the list."""
layer = layer or self.vol.layer_name
relative_offset = self._context.symbol_space.get_type(
symbol_type
).relative_child_offset(member)
direction = "Blink"
if forward:
direction = "Flink"
trans_layer = self._context.layers[layer]
try:
is_valid = trans_layer.is_valid(self.vol.offset)
if not is_valid:
return None
link = getattr(self, direction).dereference()
except exceptions.InvalidAddressException:
return None
if not sentinel:
yield self._context.object(
symbol_type,
layer,
offset=self.vol.offset - relative_offset,
native_layer_name=layer or self.vol.native_layer_name,
)
seen = {self.vol.offset}
while link.vol.offset not in seen:
obj_offset = link.vol.offset - relative_offset
if not trans_layer.is_valid(obj_offset):
return None
obj = self._context.object(
symbol_type,
layer,
offset=obj_offset,
native_layer_name=layer or self.vol.native_layer_name,
)
yield obj
seen.add(link.vol.offset)
try:
link = getattr(link, direction).dereference()
except exceptions.InvalidAddressException:
return None
def __iter__(self) -> Iterator[interfaces.objects.ObjectInterface]:
return self.to_list(self.vol.parent.vol.type_name, self.vol.member_name)
[docs]class TOKEN(objects.StructType):
"""A class for process etoken object."""
[docs] def get_sids(self) -> Iterable[str]:
"""Yield a sid for the current token object."""
if self.UserAndGroupCount < 0xFFFF:
layer_name = self.vol.layer_name
kvo = self._context.layers[layer_name].config["kernel_virtual_offset"]
symbol_table = self.get_symbol_table_name()
ntkrnlmp = self._context.module(
symbol_table, layer_name=layer_name, offset=kvo
)
UserAndGroups = ntkrnlmp.object(
object_type="array",
offset=self.UserAndGroups.dereference().vol.get("offset") - kvo,
subtype=ntkrnlmp.get_type("_SID_AND_ATTRIBUTES"),
count=self.UserAndGroupCount,
)
for sid_and_attr in UserAndGroups:
try:
sid = sid_and_attr.Sid.dereference().cast("_SID")
# catch invalid pointers (UserAndGroupCount is too high)
if sid is None:
return None
# this mimics the windows API IsValidSid
if sid.Revision & 0xF != 1 or sid.SubAuthorityCount > 15:
return None
id_auth = ""
for i in sid.IdentifierAuthority.Value:
id_auth = i
SubAuthority = ntkrnlmp.object(
object_type="array",
offset=sid.SubAuthority.vol.offset - kvo,
subtype=ntkrnlmp.get_type("unsigned long"),
count=int(sid.SubAuthorityCount),
)
yield "S-" + "-".join(
str(i) for i in (sid.Revision, id_auth) + tuple(SubAuthority)
)
except exceptions.InvalidAddressException:
vollog.log(
constants.LOGLEVEL_VVVV,
"InvalidAddressException while parsing for token sid",
)
[docs] def privileges(self):
"""Return a list of privileges for the current token object."""
try:
for priv_index in range(64):
yield (
priv_index,
bool(self.Privileges.Present & (2**priv_index)),
bool(self.Privileges.Enabled & (2**priv_index)),
bool(self.Privileges.EnabledByDefault & (2**priv_index)),
)
except AttributeError: # Windows XP
if self.PrivilegeCount < 1024:
# This is a pointer to an array of _LUID_AND_ATTRIBUTES
for luid in self.Privileges.dereference().cast(
"array",
count=self.PrivilegeCount,
subtype=self._context.symbol_space[
self.get_symbol_table_name()
].get_type("_LUID_AND_ATTRIBUTES"),
):
# The Attributes member is a flag
enabled = luid.Attributes & 2 != 0
default = luid.Attributes & 1 != 0
yield luid.Luid.LowPart, True, enabled, default
else:
vollog.log(constants.LOGLEVEL_VVVV, "Broken Token Privileges.")
[docs]class KTHREAD(objects.StructType):
"""A class for thread control block objects."""
[docs] def get_state(self) -> str:
dictState = {
0: "Initialized",
1: "Ready",
2: "Running",
3: "Standby",
4: "Terminated",
5: "Waiting",
6: "Transition",
7: "DeferredReady",
8: "GateWait",
}
return dictState.get(self.State, renderers.NotApplicableValue())
[docs] def get_wait_reason(self) -> str:
dictWaitReason = {
0: "Executive",
1: "FreePage",
2: "PageIn",
3: "PoolAllocation",
4: "DelayExecution",
5: "Suspended",
6: "UserRequest",
7: "WrExecutive",
8: "WrFreePage",
9: "WrPageIn",
10: "WrPoolAllocation",
11: "WrDelayExecution",
12: "WrSuspended",
13: "WrUserRequest",
14: "WrEventPair",
15: "WrQueue",
16: "WrLpcReceive",
17: "WrLpcReply",
18: "WrVirtualMemory",
19: "WrPageOut",
20: "WrRendezvous",
21: "Spare2",
22: "Spare3",
23: "Spare4",
24: "Spare5",
25: "Spare6",
26: "WrKernel",
27: "WrResource",
28: "WrPushLock",
29: "WrMutex",
30: "WrQuantumEnd",
31: "WrDispatchInt",
32: "WrPreempted",
33: "WrYieldExecution",
34: "WrFastMutex",
35: "WrGuardedMutex",
36: "WrRundown",
37: "MaximumWaitReason",
}
return dictWaitReason.get(self.WaitReason, renderers.NotApplicableValue())
[docs]class CONTROL_AREA(objects.StructType):
"""A class for _CONTROL_AREA structures"""
PAGE_SIZE = 0x1000
PAGE_MASK = PAGE_SIZE - 1
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
try:
# The Segment.ControlArea should point back to this object
if self.Segment.ControlArea != self.vol.offset:
return False
# The SizeOfSegment should match the total PTEs multiplied by a default page size
if self.Segment.SizeOfSegment != (
self.Segment.TotalNumberOfPtes * self.PAGE_SIZE
):
return False
# The first SubsectionBase should not be page aligned
# subsection = self.get_subsection()
# if subsection.SubsectionBase & self.PAGE_MASK == 0:
# return False
except exceptions.InvalidAddressException:
return False
# True if everything else passes
return True
[docs] def get_subsection(self) -> interfaces.objects.ObjectInterface:
"""Get the Subsection object, which is found immediately after the _CONTROL_AREA."""
return self._context.object(
self.get_symbol_table_name() + constants.BANG + "_SUBSECTION",
layer_name=self.vol.layer_name,
offset=self.vol.offset + self.vol.size,
native_layer_name=self.vol.native_layer_name,
)
[docs] def get_pte(self, offset: int) -> interfaces.objects.ObjectInterface:
"""Get a PTE object at the requested offset"""
return self._context.object(
self.get_symbol_table_name() + constants.BANG + "_MMPTE",
layer_name=self.vol.layer_name,
offset=offset,
native_layer_name=self.vol.native_layer_name,
)
[docs] def get_available_pages(self) -> Iterable[Tuple[int, int, int]]:
"""Get the available pages that correspond to a cached file.
The tuples generated are (physical_offset, file_offset, page_size).
"""
symbol_table_name = self.get_symbol_table_name()
mmpte_type = self._context.symbol_space.get_type(
symbol_table_name + constants.BANG + "_MMPTE"
)
mmpte_size = mmpte_type.size
subsection = self.get_subsection()
is_64bit = symbols.symbol_table_is_64bit(self._context, symbol_table_name)
is_pae = self._context.layers[self.vol.layer_name].metadata.get("pae", False)
# This is a null-terminated single-linked list.
while subsection != 0:
try:
if subsection.ControlArea != self.vol.offset:
break
except exceptions.InvalidAddressException:
break
# The offset into the file is stored implicitly based on the PTE location within the Subsection.
starting_sector = subsection.StartingSector
subsection_offset = starting_sector * 0x200
# Similar to the check in is_valid(), make sure the SubsectionBase is not page aligned.
# if subsection.SubsectionBase & self.PAGE_MASK == 0:
# break
ptecount = 0
while ptecount < subsection.PtesInSubsection:
pte_offset = subsection.SubsectionBase + (mmpte_size * ptecount)
file_offset = subsection_offset + ptecount * 0x1000
try:
mmpte = self.get_pte(pte_offset)
except exceptions.InvalidAddressException:
ptecount += 1
continue
# First we check if the entry is valid. If so, then we get the physical offset.
# The valid entries are actually handled by the hardware.
if mmpte.u.Hard.Valid == 1:
physoffset = mmpte.u.Hard.PageFrameNumber << 12
yield physoffset, file_offset, self.PAGE_SIZE
elif mmpte.u.Soft.Prototype == 1:
if not is_64bit and not is_pae:
subsection_offset = (
mmpte.u.Subsect.SubsectionAddressHigh << 7
) | (mmpte.u.Subsect.SubsectionAddressLow << 3)
# If the entry is not a valid physical address then see if it is in transition.
elif mmpte.u.Trans.Transition == 1:
# TODO: Fix appropriately in a future release.
# Currently just a temporary workaround to deal with custom bit flag
# in the PFN field for pages in transition state.
# See https://github.com/volatilityfoundation/volatility3/pull/475
physoffset = (mmpte.u.Trans.PageFrameNumber & ((1 << 33) - 1)) << 12
yield physoffset, file_offset, self.PAGE_SIZE
# Go to the next PTE entry
ptecount += 1
# Go to the next Subsection in the single-linked list
subsection = subsection.NextSubsection
[docs]class VACB(objects.StructType):
"""A class for _VACB structures"""
FILEOFFSET_MASK = 0xFFFFFFFFFFFF0000
[docs] def get_file_offset(self) -> int:
# The FileOffset member of VACB is used to denote the offset within the file where the
# view begins. Since all views are 256 KB in size, the bottom 16 bits are used to
# store the number of references to the view.
return self.Overlay.FileOffset.QuadPart & self.FILEOFFSET_MASK
[docs]class SHARED_CACHE_MAP(objects.StructType):
"""A class for _SHARED_CACHE_MAP structures"""
VACB_BLOCK = 0x40000
VACB_OFFSET_SHIFT = 18
VACB_LEVEL_SHIFT = 7
VACB_SIZE_OF_FIRST_LEVEL = 1 << (VACB_OFFSET_SHIFT + VACB_LEVEL_SHIFT)
VACB_ARRAY = 0x80
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
if self.FileSize.QuadPart <= 0 or self.ValidDataLength.QuadPart <= 0:
return False
if self.SectionSize.QuadPart < 0 or (
(self.FileSize.QuadPart < self.ValidDataLength.QuadPart)
and (self.ValidDataLength.QuadPart != 0x7FFFFFFFFFFFFFFF)
):
return False
return True
[docs] def process_index_array(
self,
array_pointer: interfaces.objects.ObjectInterface,
level: int,
limit: int,
vacb_list: Optional[List] = None,
) -> List:
"""Recursively process the sparse multilevel VACB index array.
:param array_pointer: The address of a possible index array
:param level: The current level
:param limit: The level where we abandon all hope. Ideally this is 7
:param vacb_list: An array of collected VACBs
:return: Collected VACBs
"""
if vacb_list is None:
vacb_list = []
if level > limit:
return []
symbol_table_name = self.get_symbol_table_name()
pointer_type = self._context.symbol_space.get_type(
symbol_table_name + constants.BANG + "pointer"
)
# Create an array of 128 entries for the VACB index array
vacb_array = self._context.object(
object_type=symbol_table_name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=array_pointer,
count=self.VACB_ARRAY,
subtype=pointer_type,
)
# Iterate through the entries
for counter in range(0, self.VACB_ARRAY):
# Check if the VACB entry is in use
if not vacb_array[counter]:
continue
vacb_obj = (
vacb_array[counter]
.dereference()
.cast(symbol_table_name + constants.BANG + "_VACB")
)
if vacb_obj.SharedCacheMap == self.vol.offset:
self.save_vacb(vacb_obj, vacb_list)
else:
# Process the next level of the multi-level array
vacb_list = self.process_index_array(
vacb_array[counter], level + 1, limit, vacb_list
)
return vacb_list
[docs] def save_vacb(self, vacb_obj: interfaces.objects.ObjectInterface, vacb_list: List):
data = (
int(vacb_obj.BaseAddress),
int(vacb_obj.get_file_offset()),
self.VACB_BLOCK,
)
vacb_list.append(data)
[docs] def get_available_pages(self) -> List:
"""Get the available pages that correspond to a cached file.
The lists generated are (virtual_offset, file_offset, page_size).
"""
vacb_list = []
section_size = self.SectionSize.QuadPart
# Determine the number of VACBs within the cache (nonpaged). each VACB
# represents a 256-KB view in the system cache.
full_blocks = section_size // self.VACB_BLOCK
left_over = section_size % self.VACB_BLOCK
# As an optimization, the shared cache map object contains a VACB index array of four entries.
# The VACB index arrays are arrays of pointers to VACBs, that track which views of a given file
# are mapped in the cache. For example, the first entry in the VACB index array refers to the first
# 256 KB of the file. The InitialVacbs can describe a file up to 1 MB (4xVACB).
iterval = 0
while (iterval < full_blocks) and (full_blocks <= 4):
vacb_obj = self.InitialVacbs[iterval]
with contextlib.suppress(exceptions.InvalidAddressException):
# Make sure that the SharedCacheMap member of the VACB points back to the parent object.
if vacb_obj.SharedCacheMap == self.vol.offset:
self.save_vacb(vacb_obj, vacb_list)
iterval += 1
# We also have to account for the spill over data that is not found in the full blocks.
# The first case to consider is when the spill over is still in InitialVacbs.
if (left_over > 0) and (full_blocks < 4):
vacb_obj = self.InitialVacbs[iterval]
if vacb_obj.SharedCacheMap == self.vol.offset:
self.save_vacb(vacb_obj, vacb_list)
# If the file is larger than 1 MB, a separate VACB index array needs to be allocated.
# This is based on how many 256 KB blocks would be required for the size of the file.
# This newly allocated VACB index array is found through the Vacbs member of SHARED_CACHE_MAP.
vacb_obj = self.Vacbs
# Note: avoid calling is_valid() here, since self.Vacbs is a pointer to a pointer
if not vacb_obj:
return vacb_list
# There are a number of instances where the initial value in InitialVacb will also be the fist
# entry in Vacbs. Thus we ignore, since it was already processed. It is possible to just
# process again as the file offset is specified for each VACB.
if self.InitialVacbs[0].vol.offset == vacb_obj:
return vacb_list
# If the file is less than 32 MB than it can be found in a single level VACB index array.
symbol_table_name = self.get_symbol_table_name()
pointer_type = self._context.symbol_space.get_type(
symbol_table_name + constants.BANG + "pointer"
)
size_of_pointer = pointer_type.size
if not section_size > self.VACB_SIZE_OF_FIRST_LEVEL:
array_head = vacb_obj
for counter in range(0, full_blocks):
vacb_entry = self._context.object(
symbol_table_name + constants.BANG + "pointer",
layer_name=self.vol.layer_name,
offset=array_head + (counter * size_of_pointer),
)
# If we find a zero entry, then we proceed to the next one. If the entry is zero,
# then the view is not mapped and we skip. We do not pad because we use the
# FileOffset to seek to the correct offset in the file.
if not vacb_entry:
continue
vacb = vacb_entry.dereference().cast(
symbol_table_name + constants.BANG + "_VACB"
)
if vacb.SharedCacheMap == self.vol.offset:
self.save_vacb(vacb, vacb_list)
if left_over > 0:
vacb_entry = self._context.object(
symbol_table_name + constants.BANG + "pointer",
layer_name=self.vol.layer_name,
offset=array_head + ((counter + 1) * size_of_pointer),
)
if not vacb_entry:
return vacb_list
vacb = vacb_entry.dereference().cast(
symbol_table_name + constants.BANG + "_VACB"
)
if vacb.SharedCacheMap == self.vol.offset:
self.save_vacb(vacb, vacb_list)
# The file is less than 32 MB, so we can stop processing.
return vacb_list
# If we get to this point, then we know that the SectionSize is greater than
# VACB_SIZE_OF_FIRST_LEVEL (32 MB). Then we have a "sparse" multilevel index
# array where each VACB index array is made up of 128 entries. We no
# longer assume the data is sequential. (Log2 (32 MB) - 18)/7
level_depth = math.ceil(math.log(section_size, 2))
level_depth = (level_depth - self.VACB_OFFSET_SHIFT) / self.VACB_LEVEL_SHIFT
level_depth = math.ceil(level_depth)
limit_depth = level_depth
if section_size > self.VACB_SIZE_OF_FIRST_LEVEL:
# Create an array of 128 entries for the VACB index array.
vacb_array = self._context.object(
object_type=symbol_table_name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=vacb_obj,
count=self.VACB_ARRAY,
subtype=pointer_type,
)
# Walk the array and if any entry points to the shared cache map object then we extract it.
# Otherwise, if it is non-zero, then traverse to the next level.
for counter in range(0, self.VACB_ARRAY):
if not vacb_array[counter]:
continue
vacb = (
vacb_array[counter]
.dereference()
.cast(symbol_table_name + constants.BANG + "_VACB")
)
if vacb.SharedCacheMap == self.vol.offset:
self.save_vacb(vacb, vacb_list)
else:
# Process the next level of the multi-level array. We set the limit_depth to be
# the depth of the tree as determined from the size and we initialize the
# current level to 2.
vacb_list = self.process_index_array(
vacb_array[counter], 2, limit_depth, vacb_list
)
return vacb_list