Source code for volatility3.framework.symbols.windows.extensions.pool
import contextlib
import functools
import logging
import struct
from typing import Dict, Iterator, List, Optional, Tuple, Union

from volatility3.plugins.windows.poolscanner import PoolConstraint
from volatility3.framework import constants, exceptions, interfaces, objects, renderers, symbols
from volatility3.framework.renderers import conversion
vollog = logging.getLogger(__name__)
class POOL_HEADER(objects.StructType):
    """A kernel pool allocation header.

    Exists at the base of the allocation and provides a tag that we can
    scan for.
    """

    def get_object(self,
                   constraint: PoolConstraint,
                   use_top_down: bool,
                   kernel_symbol_table: Optional[str] = None,
                   native_layer_name: Optional[str] = None) -> Iterator[interfaces.objects.ObjectInterface]:
        """Carve an object or data structure from a kernel pool allocation.

        This is a generator. When the constraint has no object type, exactly
        one structure placed directly after the pool header is yielded, without
        any validity check. Otherwise the allocation is treated as holding an
        executive object and only candidates whose ``is_valid()`` returns true
        are yielded (possibly none, possibly several in top-down mode).

        Args:
            constraint: a PoolConstraint object used to get the pool allocation header object
            use_top_down: for delineating how a windows version finds the size of the object body
            kernel_symbol_table: in case objects of a different symbol table are scanned for
            native_layer_name: the name of the layer where the data originally lived

        Yields:
            Objects as found from a POOL_HEADER
        """
        type_name = constraint.type_name
        executive = constraint.object_type is not None

        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
        if constants.BANG in type_name:
            symbol_table_name, type_name = type_name.split(constants.BANG)[0:2]

        # When checking for symbols from a table other than the kernel's, _OBJECT_HEADER must still be
        # taken from the kernel table, because symbol_table_name will differ from kernel_symbol_table.
        # Without an explicit kernel table, symbol_table_name *is* the kernel symbol table.
        header_table_name = kernel_symbol_table if kernel_symbol_table else symbol_table_name
        object_header_type = self._context.symbol_space.get_type(header_table_name + constants.BANG +
                                                                 "_OBJECT_HEADER")

        full_type_name = symbol_table_name + constants.BANG + type_name
        pool_header_size = self.vol.size

        # if there is no object type, then just instantiate a structure
        if not executive:
            yield self._context.object(full_type_name,
                                       layer_name=self.vol.layer_name,
                                       offset=self.vol.offset + pool_header_size,
                                       native_layer_name=native_layer_name)
            return

        # otherwise we have an executive object in the pool
        alignment = 16 if symbols.symbol_table_is_64bit(self._context, symbol_table_name) else 8

        # use the top down approach for windows 8 and later
        if use_top_down:
            body_offset = object_header_type.relative_child_offset('Body')
            infomask_offset = object_header_type.relative_child_offset('InfoMask')
            pointercount_offset = object_header_type.relative_child_offset('PointerCount')
            pointercount_size = object_header_type.members['PointerCount'][1].size

            optional_headers, lengths_of_optional_headers = self._calculate_optional_header_lengths(
                self._context, symbol_table_name)
            padding_available = None
            if 'PADDING_INFO' in optional_headers:
                padding_available = optional_headers.index('PADDING_INFO')
            max_optional_headers_length = sum(lengths_of_optional_headers)

            # define the starting and ending bounds for the scan
            start_offset = self.vol.offset + pool_header_size
            addr_limit = min(max_optional_headers_length, self.BlockSize * alignment)

            # A single read is better than lots of little one-byte reads.
            # We're ok padding this, because the byte we'd check would be 0 which would only be valid if there
            # were no optional headers in the first place (ie, if we read too much for headers that don't exist,
            # but the bit we could read were valid)
            infomask_data = self._context.layers[self.vol.layer_name].read(start_offset,
                                                                           addr_limit + infomask_offset,
                                                                           pad=True)

            # addr stores the offset to the potential start of the OBJECT_HEADER from just after the POOL_HEADER
            # It will always be aligned to a particular alignment
            for addr in range(0, addr_limit, alignment):
                infomask_value = infomask_data[addr + infomask_offset]
                pointercount_start = addr + pointercount_offset
                pointercount_value = int.from_bytes(
                    infomask_data[pointercount_start:pointercount_start + pointercount_size],
                    byteorder='little',
                    signed=True)
                # Cheap plausibility filter on the candidate header before doing anything else
                if not 0x1000000 > pointercount_value >= 0:
                    continue

                # Each set bit in InfoMask selects the optional header at the same position in the list
                padding_present = False
                optional_headers_length = 0
                for bit, header_length in enumerate(lengths_of_optional_headers):
                    if infomask_value & (1 << bit):
                        optional_headers_length += header_length
                        if bit == padding_available:
                            padding_present = True

                # PADDING_INFO is a special case (4 bytes that contain the total padding length)
                padding_length = 0
                if padding_present:
                    # Read the four bytes from just before the next optional_headers_length minus the
                    # padding_info size
                    #
                    #  ---------------
                    #  POOL_HEADER
                    #  ---------------
                    #
                    #  start of PADDING_INFO
                    #  ---------------
                    #  End of other optional headers
                    #  ---------------
                    #  OBJECT_HEADER
                    #  ---------------
                    headers_start = addr - optional_headers_length
                    if headers_start < 0:
                        continue
                    padding_length, = struct.unpack("<I", infomask_data[headers_start:headers_start + 4])
                    padding_length -= lengths_of_optional_headers[padding_available or 0]

                # Certain versions of windows have PADDING_INFO lengths that are too long
                # So we now check that the padding length is at a minimum the right length
                # and that it doesn't go beyond the entirety of the data
                # NOTE(review): as written this chained comparison needs
                # addr - optional_headers_length > addr, i.e. a negative optional_headers_length, so it
                # never skips a candidate. Left unchanged until the intended bounds are confirmed.
                if addr - optional_headers_length >= padding_length > addr:
                    continue

                with contextlib.suppress(TypeError, exceptions.InvalidAddressException):
                    mem_object = self._context.object(full_type_name,
                                                      layer_name=self.vol.layer_name,
                                                      offset=addr + body_offset + start_offset,
                                                      native_layer_name=native_layer_name)
                    if mem_object.is_valid():
                        yield mem_object

        # use the bottom up approach for windows 7 and earlier
        else:
            type_size = self._context.symbol_space.get_type(full_type_name).size
            if constraint.additional_structures:
                for additional_structure in constraint.additional_structures:
                    type_size += self._context.symbol_space.get_type(symbol_table_name + constants.BANG +
                                                                     additional_structure).size

            # The object is located by counting back from the end of the allocation
            rounded_size = conversion.round(type_size, alignment, up=True)
            mem_object = self._context.object(full_type_name,
                                              layer_name=self.vol.layer_name,
                                              offset=self.vol.offset + self.BlockSize * alignment - rounded_size,
                                              native_layer_name=native_layer_name)
            with contextlib.suppress(TypeError, exceptions.InvalidAddressException):
                if mem_object.is_valid():
                    yield mem_object

    @classmethod
    @functools.lru_cache()
    def _calculate_optional_header_lengths(cls, context: interfaces.context.ContextInterface,
                                           symbol_table_name: str) -> Tuple[List[str], List[int]]:
        """Lists the optional object headers known to a symbol table, with their sizes.

        Results are memoised by ``functools.lru_cache``, so the very same list
        objects are handed to every caller and must be treated as read-only.

        Args:
            context: the context whose symbol space is queried
            symbol_table_name: the symbol table holding the _OBJECT_HEADER_* types

        Returns:
            Two parallel lists: the header names that exist in the symbol table
            (in InfoMask bit order) and the size of each of those headers
        """
        headers: List[str] = []
        sizes: List[int] = []
        for header in ('CREATOR_INFO', 'NAME_INFO', 'HANDLE_INFO', 'QUOTA_INFO', 'PROCESS_INFO', 'AUDIT_INFO',
                       'EXTENDED_INFO', 'HANDLE_REVOCATION_INFO', 'PADDING_INFO'):
            with contextlib.suppress(AttributeError, exceptions.SymbolError):
                type_name = f"{symbol_table_name}{constants.BANG}_OBJECT_HEADER_{header}"
                header_size = context.symbol_space.get_type(type_name).size
                # Only record the header once its size is known, so the two lists can never get out of step
                headers.append(header)
                sizes.append(header_size)

        # Some of these may not exist, for example:
        #   if build < 9200: PADDING_INFO else: AUDIT_INFO
        #   if build == 10586: HANDLE_REVOCATION_INFO else EXTENDED_INFO
        # based on what's present and what's not, this list should be the right order and the right length
        return headers, sizes
class POOL_HEADER_VISTA(POOL_HEADER):
    """The kernel pool allocation header as used by Vista and later.

    Like its parent, it sits at the base of the allocation and carries the
    tag that pool scanning looks for. It adds no behaviour of its own.
    """
class POOL_TRACKER_BIG_PAGES(objects.StructType):
    """A kernel big page pool tracker."""

    # Maps each _POOL_TYPE enumeration value to the first enumeration name seen for it.
    # Defined on the class, so it is filled on first use and then shared by every instance.
    pool_type_lookup: Dict[int, str] = {}

    def _generate_pool_type_lookup(self) -> None:
        """Fills ``pool_type_lookup`` from the _POOL_TYPE enumeration of this object's symbol table."""
        # Enumeration._generate_inverse_choices() raises ValueError because multiple enum names map to the same
        # value in the kernel _POOL_TYPE so create a custom mapping here and take the first match
        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
        pool_type_enum = self._context.symbol_space.get_enumeration(symbol_table_name + constants.BANG + "_POOL_TYPE")
        for name, value in pool_type_enum.choices.items():
            self.pool_type_lookup.setdefault(value, name)

    def is_free(self) -> bool:
        """Returns if the allocation is freed (True) or in-use (False)"""
        # The lowest bit of Va is used as the "freed" flag
        return (self.Va & 1) == 1

    def get_key(self) -> str:
        """Returns the Key value as a string of its printable characters.

        The key is unpacked as 4 little-endian bytes; bytes outside the range
        33..126 are dropped, so the result may be shorter than 4 characters.
        """
        tag_bytes = objects.convert_value_to_data(self.Key, int, objects.DataFormatInfo(4, "little", False))
        return "".join(chr(byte) for byte in tag_bytes if 32 < byte < 127)

    def get_pool_type(self) -> Union[str, interfaces.renderers.BaseAbsentValue]:
        """Returns the enum name for the PoolType value on applicable systems"""
        # Not applicable until Vista
        if not hasattr(self, 'PoolType'):
            return renderers.NotApplicableValue()

        if not self.pool_type_lookup:
            self._generate_pool_type_lookup()
        return self.pool_type_lookup.get(self.PoolType, f"Unknown choice {self.PoolType}")

    def get_number_of_bytes(self) -> Union[int, interfaces.renderers.BaseAbsentValue]:
        """Returns the NumberOfBytes value on applicable systems"""
        # Not applicable until Vista
        try:
            return self.NumberOfBytes
        except AttributeError:
            return renderers.NotApplicableValue()
class ExecutiveObject(interfaces.objects.ObjectInterface):
    """A "mixin" giving every kernel executive object a way to locate the
    _OBJECT_HEADER that precedes it."""

    def get_object_header(self) -> 'OBJECT_HEADER':
        """Returns the _OBJECT_HEADER whose Body member is this object.

        Raises:
            ValueError: if this object's type name carries no symbol table prefix
        """
        own_type_name = self.vol.type_name
        if constants.BANG not in own_type_name:
            raise ValueError(f"Invalid symbol table name syntax (no {constants.BANG} found)")

        table_name = own_type_name.split(constants.BANG)[0]
        header_type_name = table_name + constants.BANG + "_OBJECT_HEADER"

        # The executive object is the header's Body, so step back by Body's offset to reach the header
        header_type = self._context.symbol_space.get_type(header_type_name)
        distance_to_body = header_type.relative_child_offset("Body")

        return self._context.object(header_type_name,
                                    layer_name=self.vol.layer_name,
                                    offset=self.vol.offset - distance_to_body,
                                    native_layer_name=self.vol.native_layer_name)
class OBJECT_HEADER(objects.StructType):
    """A class for the headers for executive kernel objects, which contains
    quota information, ownership details, naming data, and ACLs."""

    def is_valid(self) -> bool:
        """Determine if the object is valid.

        The header is accepted when PointerCount can be read and lies in the
        range 0..0x1000000 inclusive.
        """
        try:
            return 0 <= self.PointerCount <= 0x1000000
        except exceptions.InvalidAddressException:
            return False

    def get_object_type(self, type_map: Dict[int, str], cookie: Optional[int] = None) -> Optional[str]:
        """Across all Windows versions, the _OBJECT_HEADER embeds details on
        the type of object (i.e. process, file) but the way its embedded
        differs between versions.

        This API abstracts away those details. The result is cached on the
        object, so later calls return the first answer regardless of arguments.

        Args:
            type_map: maps a type index to the name of the object type
            cookie: value used to decode TypeIndex; when None the raw TypeIndex is used

        Returns:
            The object type name, or None if the index is not in type_map
        """
        cached = self.vol.get('object_header_object_type', None)
        if cached is not None:
            return cached

        try:
            # vista and earlier have a Type member
            object_type = self.Type.Name.String
        except AttributeError:
            # windows 7 and later have a TypeIndex, but windows 10
            # further encodes the index value with the ObHeaderCookie
            try:
                type_index = ((self.vol.offset >> 8) ^ cookie ^ self.TypeIndex) & 0xFF
            except (AttributeError, TypeError):
                # No usable cookie (e.g. None), so the index is stored unencoded
                type_index = self.TypeIndex
            object_type = type_map.get(type_index)

        self._vol['object_header_object_type'] = object_type
        return self.vol.object_header_object_type

    @property
    def NameInfo(self) -> interfaces.objects.ObjectInterface:
        """The _OBJECT_HEADER_NAME_INFO optional header belonging to this object header.

        Raises:
            ValueError: if the type name has no symbol table prefix, or the
                computed offset to the name info header is 0
            AttributeError: if the native layer has no kernel_virtual_offset configured
        """
        if constants.BANG not in self.vol.type_name:
            raise ValueError(f"Invalid symbol table name syntax (no {constants.BANG} found)")

        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]

        try:
            header_offset = self.NameInfoOffset
        except AttributeError:
            # http://codemachine.com/article_objectheader.html (Windows 7 and later)
            name_info_bit = 0x2

            layer = self._context.layers[self.vol.native_layer_name]
            kvo = layer.config.get("kernel_virtual_offset", None)
            if kvo is None:
                # Report the layer that was actually queried for the offset
                raise AttributeError(
                    f"Could not find kernel_virtual_offset for layer: {self.vol.native_layer_name}")

            ntkrnlmp = self._context.module(symbol_table_name, layer_name=self.vol.layer_name, offset=kvo)
            address = ntkrnlmp.get_symbol("ObpInfoMaskToOffset").address

            # Index the ObpInfoMaskToOffset byte table with the InfoMask bits up to and including NAME_INFO
            calculated_index = self.InfoMask & (name_info_bit | (name_info_bit - 1))
            header_offset = self._context.object(symbol_table_name + constants.BANG + "unsigned char",
                                                 layer_name=self.vol.native_layer_name,
                                                 offset=kvo + address + calculated_index)

        if header_offset == 0:
            raise ValueError(
                f"Could not find _OBJECT_HEADER_NAME_INFO for object at {self.vol.offset} "
                f"of layer {self.vol.layer_name}")

        # Optional headers precede the object header, hence the subtraction
        return self._context.object(symbol_table_name + constants.BANG + "_OBJECT_HEADER_NAME_INFO",
                                    layer_name=self.vol.layer_name,
                                    offset=self.vol.offset - header_offset,
                                    native_layer_name=self.vol.native_layer_name)