Source code for volatility3.framework.symbols.windows.extensions.pool
import functools
import logging
import struct
from typing import Optional, Tuple, List, Dict, Union
from volatility3.framework import objects, interfaces, constants, symbols, exceptions, renderers
from volatility3.framework.renderers import conversion
vollog = logging.getLogger(__name__)
[docs]class POOL_HEADER(objects.StructType):
"""A kernel pool allocation header.
Exists at the base of the allocation and provides a tag that we can
scan for.
"""
[docs] def get_object(self,
type_name: str,
use_top_down: bool,
executive: bool = False,
kernel_symbol_table: Optional[str] = None,
native_layer_name: Optional[str] = None) -> Optional[interfaces.objects.ObjectInterface]:
"""Carve an object or data structure from a kernel pool allocation
Args:
type_name: the data structure type name
native_layer_name: the name of the layer where the data originally lived
object_type: the object type (executive kernel objects only)
kernel_symbol_table: in case objects of a different symbol table are scanned for
Returns:
An object as found from a POOL_HEADER
"""
symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
if constants.BANG in type_name:
symbol_table_name, type_name = type_name.split(constants.BANG)[0:2]
# when checking for symbols from a table other than nt_symbols grab _OBJECT_HEADER from the kernel
# because symbol_table_name will be different from kernel_symbol_table.
if kernel_symbol_table:
object_header_type = self._context.symbol_space.get_type(kernel_symbol_table + constants.BANG +
"_OBJECT_HEADER")
else:
# otherwise symbol_table_name *is* the kernel symbol table, so just use that.
object_header_type = self._context.symbol_space.get_type(symbol_table_name + constants.BANG +
"_OBJECT_HEADER")
pool_header_size = self.vol.size
# if there is no object type, then just instantiate a structure
if not executive:
mem_object = self._context.object(symbol_table_name + constants.BANG + type_name,
layer_name = self.vol.layer_name,
offset = self.vol.offset + pool_header_size,
native_layer_name = native_layer_name)
return mem_object
# otherwise we have an executive object in the pool
else:
if symbols.symbol_table_is_64bit(self._context, symbol_table_name):
alignment = 16
else:
alignment = 8
# use the top down approach for windows 8 and later
if use_top_down:
body_offset = object_header_type.relative_child_offset('Body')
infomask_offset = object_header_type.relative_child_offset('InfoMask')
pointercount_offset = object_header_type.relative_child_offset('PointerCount')
pointercount_size = object_header_type.members['PointerCount'][1].size
optional_headers, lengths_of_optional_headers = self._calculate_optional_header_lengths(
self._context, symbol_table_name)
padding_available = None if 'PADDING_INFO' not in optional_headers else optional_headers.index(
'PADDING_INFO')
max_optional_headers_length = sum(lengths_of_optional_headers)
# define the starting and ending bounds for the scan
start_offset = self.vol.offset + pool_header_size
addr_limit = min(max_optional_headers_length, self.BlockSize * alignment)
# A single read is better than lots of little one-byte reads.
# We're ok padding this, because the byte we'd check would be 0 which would only be valid if there
# were no optional headers in the first place (ie, if we read too much for headers that don't exist,
# but the bit we could read were valid)
infomask_data = self._context.layers[self.vol.layer_name].read(start_offset,
addr_limit + infomask_offset,
pad = True)
# Addr stores the offset to the potential start of the OBJECT_HEADER from just after the POOL_HEADER
# It will always be aligned to a particular alignment
for addr in range(0, addr_limit, alignment):
infomask_value = infomask_data[addr + infomask_offset]
pointercount_value = int.from_bytes(
infomask_data[addr + pointercount_offset:addr + pointercount_offset + pointercount_size],
byteorder = 'little',
signed = True)
if not 0x1000000 > pointercount_value >= 0:
continue
padding_present = False
optional_headers_length = 0
for i in range(len(lengths_of_optional_headers)):
if infomask_value & (1 << i):
optional_headers_length += lengths_of_optional_headers[i]
if i == padding_available:
padding_present = True
# PADDING_INFO is a special case (4 bytes that contain the total padding length)
padding_length = 0
if padding_present:
# Read the four bytes from just before the next optional_headers_length minus the padding_info size
#
# ---------------
# POOL_HEADER
# ---------------
#
# start of PADDING_INFO
# ---------------
# End of other optional headers
# ---------------
# OBJECT_HEADER
# ---------------
if addr - optional_headers_length < 0:
continue
padding_length = struct.unpack(
"<I", infomask_data[addr - optional_headers_length:addr - optional_headers_length + 4])[0]
padding_length -= lengths_of_optional_headers[padding_available or 0]
# Certain versions of windows have PADDING_INFO lengths that are too long
# So we now check that the padding length is at a minimum the right length
# and that it doesn't go beyond the entirety of the data
if addr - optional_headers_length >= padding_length > addr:
continue
try:
mem_object = self._context.object(symbol_table_name + constants.BANG + type_name,
layer_name = self.vol.layer_name,
offset = addr + body_offset + start_offset,
native_layer_name = native_layer_name)
if mem_object.is_valid():
return mem_object
except (TypeError, exceptions.InvalidAddressException):
pass
# use the bottom up approach for windows 7 and earlier
else:
type_size = self._context.symbol_space.get_type(symbol_table_name + constants.BANG + type_name).size
rounded_size = conversion.round(type_size, alignment, up = True)
mem_object = self._context.object(symbol_table_name + constants.BANG + type_name,
layer_name = self.vol.layer_name,
offset = self.vol.offset + self.BlockSize * alignment - rounded_size,
native_layer_name = native_layer_name)
try:
if mem_object.is_valid():
return mem_object
except (TypeError, exceptions.InvalidAddressException):
return None
return None
@classmethod
@functools.lru_cache()
def _calculate_optional_header_lengths(cls, context: interfaces.context.ContextInterface,
symbol_table_name: str) -> Tuple[List[str], List[int]]:
headers = []
sizes = []
for header in [
'CREATOR_INFO', 'NAME_INFO', 'HANDLE_INFO', 'QUOTA_INFO', 'PROCESS_INFO', 'AUDIT_INFO', 'EXTENDED_INFO',
'HANDLE_REVOCATION_INFO', 'PADDING_INFO'
]:
try:
type_name = "{}{}_OBJECT_HEADER_{}".format(symbol_table_name, constants.BANG, header)
header_type = context.symbol_space.get_type(type_name)
headers.append(header)
sizes.append(header_type.size)
except (AttributeError, exceptions.SymbolError):
# Some of these may not exist, for example:
# if build < 9200: PADDING_INFO else: AUDIT_INFO
# if build == 10586: HANDLE_REVOCATION_INFO else EXTENDED_INFO
# based on what's present and what's not, this list should be the right order and the right length
pass
return headers, sizes
[docs]class POOL_HEADER_VISTA(POOL_HEADER):
"""A kernel pool allocation header, updated for Vista and later.
Exists at the base of the allocation and provides a tag that we can
scan for.
"""
[docs]class POOL_TRACKER_BIG_PAGES(objects.StructType):
"""A kernel big page pool tracker."""
pool_type_lookup = {} # type: Dict[str, str]
def _generate_pool_type_lookup(self):
# Enumeration._generate_inverse_choices() raises ValueError because multiple enum names map to the same
# value in the kernel _POOL_TYPE so create a custom mapping here and take the first match
symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
pool_type_enum = self._context.symbol_space.get_enumeration(symbol_table_name + constants.BANG + "_POOL_TYPE")
for k, v in pool_type_enum.choices.items():
if v not in self.pool_type_lookup:
self.pool_type_lookup[v] = k
# return self.Va > 0x1
[docs] def get_key(self) -> str:
"""Returns the Key value as a 4 character string"""
tag_bytes = objects.convert_value_to_data(self.Key, int, objects.DataFormatInfo(4, "little", False))
return "".join([chr(x) if 32 < x < 127 else '' for x in tag_bytes])
[docs] def get_pool_type(self) -> Union[str, interfaces.renderers.BaseAbsentValue]:
"""Returns the enum name for the PoolType value on applicable systems"""
# Not applicable until Vista
if hasattr(self, 'PoolType'):
if not self.pool_type_lookup:
self._generate_pool_type_lookup()
return self.pool_type_lookup.get(self.PoolType, "Unknown choice {}".format(self.PoolType))
else:
return renderers.NotApplicableValue()
[docs] def get_number_of_bytes(self) -> Union[int, interfaces.renderers.BaseAbsentValue]:
"""Returns the NumberOfBytes value on applicable systems"""
# Not applicable until Vista
try:
return self.NumberOfBytes
except AttributeError:
return renderers.NotApplicableValue()
[docs]class ExecutiveObject(interfaces.objects.ObjectInterface):
"""This is used as a "mixin" that provides all kernel executive objects
with a means of finding their own object header."""
[docs] def get_object_header(self) -> 'OBJECT_HEADER':
if constants.BANG not in self.vol.type_name:
raise ValueError("Invalid symbol table name syntax (no {} found)".format(constants.BANG))
symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
body_offset = self._context.symbol_space.get_type(symbol_table_name + constants.BANG +
"_OBJECT_HEADER").relative_child_offset("Body")
return self._context.object(symbol_table_name + constants.BANG + "_OBJECT_HEADER",
layer_name = self.vol.layer_name,
offset = self.vol.offset - body_offset,
native_layer_name = self.vol.native_layer_name)
[docs]class OBJECT_HEADER(objects.StructType):
"""A class for the headers for executive kernel objects, which contains
quota information, ownership details, naming data, and ACLs."""
[docs] def is_valid(self) -> bool:
"""Determine if the object is valid."""
# if self.InfoMask > 0x48:
# return False
try:
if self.PointerCount > 0x1000000 or self.PointerCount < 0:
return False
except exceptions.InvalidAddressException:
return False
return True
[docs] def get_object_type(self, type_map: Dict[int, str], cookie: int = None) -> Optional[str]:
"""Across all Windows versions, the _OBJECT_HEADER embeds details on
the type of object (i.e. process, file) but the way its embedded
differs between versions.
This API abstracts away those details.
"""
if self.vol.get('object_header_object_type', None) is not None:
return self.vol.object_header_object_type
try:
# vista and earlier have a Type member
self._vol['object_header_object_type'] = self.Type.Name.String
except AttributeError:
# windows 7 and later have a TypeIndex, but windows 10
# further encodes the index value with nt1!ObHeaderCookie
try:
type_index = ((self.vol.offset >> 8) ^ cookie ^ self.TypeIndex) & 0xFF
except (AttributeError, TypeError):
type_index = self.TypeIndex
self._vol['object_header_object_type'] = type_map.get(type_index)
return self.vol.object_header_object_type
@property
def NameInfo(self) -> interfaces.objects.ObjectInterface:
if constants.BANG not in self.vol.type_name:
raise ValueError("Invalid symbol table name syntax (no {} found)".format(constants.BANG))
symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
try:
header_offset = self.NameInfoOffset
except AttributeError:
# http://codemachine.com/article_objectheader.html (Windows 7 and later)
name_info_bit = 0x2
layer = self._context.layers[self.vol.native_layer_name]
kvo = layer.config.get("kernel_virtual_offset", None)
if kvo is None:
raise AttributeError("Could not find kernel_virtual_offset for layer: {}".format(self.vol.layer_name))
ntkrnlmp = self._context.module(symbol_table_name, layer_name = self.vol.layer_name, offset = kvo)
address = ntkrnlmp.get_symbol("ObpInfoMaskToOffset").address
calculated_index = self.InfoMask & (name_info_bit | (name_info_bit - 1))
header_offset = self._context.object(symbol_table_name + constants.BANG + "unsigned char",
layer_name = self.vol.native_layer_name,
offset = kvo + address + calculated_index)
if header_offset == 0:
raise ValueError("Could not find _OBJECT_HEADER_NAME_INFO for object at {} of layer {}".format(
self.vol.offset, self.vol.layer_name))
header = self._context.object(symbol_table_name + constants.BANG + "_OBJECT_HEADER_NAME_INFO",
layer_name = self.vol.layer_name,
offset = self.vol.offset - header_offset,
native_layer_name = self.vol.native_layer_name)
return header