Source code for volatility3.framework.symbols.windows.extensions.pool

import contextlib
import functools
import logging
import struct
from typing import Dict, List, Optional, Tuple, Union

from volatility3.plugins.windows.poolscanner import PoolConstraint

from volatility3.framework import (
    constants,
    exceptions,
    interfaces,
    objects,
    renderers,
    symbols,
)
from volatility3.framework.renderers import conversion

# Module-level logger, named after this module via __name__.
vollog = logging.getLogger(__name__)


class POOL_HEADER(objects.StructType):
    """A kernel pool allocation header.

    Exists at the base of the allocation and provides a tag that we can
    scan for.
    """

    def get_object(
        self,
        constraint: PoolConstraint,
        use_top_down: bool,
        kernel_symbol_table: Optional[str] = None,
        native_layer_name: Optional[str] = None,
    ) -> Optional[interfaces.objects.ObjectInterface]:
        """Carve an object or data structure from a kernel pool allocation

        This is a generator: it yields zero or more candidate objects rather
        than returning a single value.
        NOTE(review): the return annotation says Optional[...] although the
        body only ever yields - confirm whether an iterator annotation is wanted.

        Args:
            constraint: a PoolConstraint object used to get the pool allocation header object
            use_top_down: for delineating how a windows version finds the size of the object body
            kernel_symbol_table: in case objects of a different symbol table are scanned for
            native_layer_name: the name of the layer where the data originally lived

        Yields:
            Objects as found from a POOL_HEADER. When the constraint has no
            object_type, a single structure placed directly after the pool
            header is yielded without validation; otherwise only candidates
            whose is_valid() returns True are yielded.
        """

        type_name = constraint.type_name
        # A constraint with an object_type describes an executive object (one preceded by an _OBJECT_HEADER)
        executive = constraint.object_type is not None

        # Default to the symbol table this pool header came from; a "table!type" constraint overrides it
        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
        if constants.BANG in type_name:
            symbol_table_name, type_name = type_name.split(constants.BANG)[0:2]

        # when checking for symbols from a table other than nt_symbols grab _OBJECT_HEADER from the kernel
        # because symbol_table_name will be different from kernel_symbol_table.
        if kernel_symbol_table:
            object_header_type = self._context.symbol_space.get_type(
                kernel_symbol_table + constants.BANG + "_OBJECT_HEADER"
            )
        else:
            # otherwise symbol_table_name *is* the kernel symbol table, so just use that.
            object_header_type = self._context.symbol_space.get_type(
                symbol_table_name + constants.BANG + "_OBJECT_HEADER"
            )

        pool_header_size = self.vol.size

        # if there is no object type, then just instantiate a structure
        if not executive:
            mem_object = self._context.object(
                symbol_table_name + constants.BANG + type_name,
                layer_name=self.vol.layer_name,
                offset=self.vol.offset + pool_header_size,
                native_layer_name=native_layer_name,
            )
            yield mem_object

        # otherwise we have an executive object in the pool
        else:
            # Candidate headers are probed at this stride, and BlockSize is scaled by the same value below
            if symbols.symbol_table_is_64bit(self._context, symbol_table_name):
                alignment = 16
            else:
                alignment = 8

            # use the top down approach for windows 8 and later
            if use_top_down:
                body_offset = object_header_type.relative_child_offset("Body")
                infomask_offset = object_header_type.relative_child_offset("InfoMask")
                pointercount_offset = object_header_type.relative_child_offset(
                    "PointerCount"
                )
                pointercount_size = object_header_type.members["PointerCount"][1].size
                (
                    optional_headers,
                    lengths_of_optional_headers,
                ) = self._calculate_optional_header_lengths(
                    self._context, symbol_table_name
                )
                # Index of PADDING_INFO within the optional header list, or None if this table lacks it
                padding_available = (
                    None
                    if "PADDING_INFO" not in optional_headers
                    else optional_headers.index("PADDING_INFO")
                )
                max_optional_headers_length = sum(lengths_of_optional_headers)

                # define the starting and ending bounds for the scan
                start_offset = self.vol.offset + pool_header_size
                addr_limit = min(
                    max_optional_headers_length, self.BlockSize * alignment
                )

                # A single read is better than lots of little one-byte reads.
                # We're ok padding this, because the byte we'd check would be 0 which would only be valid if there
                # were no optional headers in the first place (ie, if we read too much for headers that don't exist,
                # but the bit we could read were valid)
                infomask_data = self._context.layers[self.vol.layer_name].read(
                    start_offset, addr_limit + infomask_offset, pad=True
                )

                # Addr stores the offset to the potential start of the OBJECT_HEADER from just after the POOL_HEADER
                # It will always be aligned to a particular alignment
                for addr in range(0, addr_limit, alignment):
                    infomask_value = infomask_data[addr + infomask_offset]
                    pointercount_value = int.from_bytes(
                        infomask_data[
                            addr
                            + pointercount_offset : addr
                            + pointercount_offset
                            + pointercount_size
                        ],
                        byteorder="little",
                        signed=True,
                    )
                    # Cheap pre-filter: skip candidates whose PointerCount is negative or >= 0x1000000
                    if not 0x1000000 > pointercount_value >= 0:
                        continue

                    # Bit i of InfoMask set means the i-th known optional header is counted as present
                    padding_present = False
                    optional_headers_length = 0
                    for i in range(len(lengths_of_optional_headers)):
                        if infomask_value & (1 << i):
                            optional_headers_length += lengths_of_optional_headers[i]
                            if i == padding_available:
                                padding_present = True

                    # PADDING_INFO is a special case (4 bytes that contain the total padding length)
                    padding_length = 0
                    if padding_present:
                        # Read the four bytes from just before the next optional_headers_length minus the padding_info size
                        #
                        #  ---------------
                        #  POOL_HEADER
                        #  ---------------
                        #
                        #  start of PADDING_INFO
                        #  ---------------
                        #  End of other optional headers
                        #  ---------------
                        #  OBJECT_HEADER
                        #  ---------------
                        if addr - optional_headers_length < 0:
                            continue
                        (padding_length,) = struct.unpack(
                            "<I",
                            infomask_data[
                                addr
                                - optional_headers_length : addr
                                - optional_headers_length
                                + 4
                            ],
                        )
                        # padding_present is only set when padding_available matched an index, so "or 0" only
                        # matters for the case where that index is itself 0
                        padding_length -= lengths_of_optional_headers[
                            padding_available or 0
                        ]

                    # Certain versions of windows have PADDING_INFO lengths that are too long
                    # So we now check that the padding length is at a minimum the right length
                    # and that it doesn't go beyond the entirety of the data
                    # NOTE(review): as written this needs padding_length <= addr - optional_headers_length AND
                    # padding_length > addr at the same time; with a non-negative optional_headers_length that
                    # cannot hold, so this `continue` looks unreachable - confirm the intended bounds.
                    if addr - optional_headers_length >= padding_length > addr:
                        continue

                    # Errors raised while building or validating a candidate simply discard that candidate
                    with contextlib.suppress(
                        TypeError, exceptions.InvalidAddressException
                    ):
                        mem_object = self._context.object(
                            symbol_table_name + constants.BANG + type_name,
                            layer_name=self.vol.layer_name,
                            offset=addr + body_offset + start_offset,
                            native_layer_name=native_layer_name,
                        )

                        if mem_object.is_valid():
                            yield mem_object

            # use the bottom up approach for windows 7 and earlier
            else:
                # Total size of the target type plus any additional structures named by the constraint
                type_size = self._context.symbol_space.get_type(
                    symbol_table_name + constants.BANG + type_name
                ).size
                if constraint.additional_structures:
                    for additional_structure in constraint.additional_structures:
                        type_size += self._context.symbol_space.get_type(
                            symbol_table_name + constants.BANG + additional_structure
                        ).size

                rounded_size = conversion.round(type_size, alignment, up=True)

                # Place the object so that it ends where the allocation (BlockSize * alignment bytes) ends
                mem_object = self._context.object(
                    symbol_table_name + constants.BANG + type_name,
                    layer_name=self.vol.layer_name,
                    offset=self.vol.offset + self.BlockSize * alignment - rounded_size,
                    native_layer_name=native_layer_name,
                )

                with contextlib.suppress(TypeError, exceptions.InvalidAddressException):
                    if mem_object.is_valid():
                        yield mem_object

    @classmethod
    @functools.lru_cache()
    def _calculate_optional_header_lengths(
        cls, context: interfaces.context.ContextInterface, symbol_table_name: str
    ) -> Tuple[List[str], List[int]]:
        """Find which _OBJECT_HEADER_* optional header types exist and how big they are.

        The result is memoised with functools.lru_cache, so repeated calls with
        the same arguments reuse the first result.

        Args:
            context: the context whose symbol space is queried
            symbol_table_name: the symbol table expected to hold the _OBJECT_HEADER_* types

        Returns:
            A pair of parallel lists: the header names that were found (in the
            fixed order listed below) and the size of each corresponding type
        """
        headers = []
        sizes = []
        for header in [
            "CREATOR_INFO",
            "NAME_INFO",
            "HANDLE_INFO",
            "QUOTA_INFO",
            "PROCESS_INFO",
            "AUDIT_INFO",
            "EXTENDED_INFO",
            "HANDLE_REVOCATION_INFO",
            "PADDING_INFO",
        ]:
            # A header type missing from the symbol table is skipped rather than treated as an error
            with contextlib.suppress(AttributeError, exceptions.SymbolError):
                type_name = (
                    f"{symbol_table_name}{constants.BANG}_OBJECT_HEADER_{header}"
                )
                header_type = context.symbol_space.get_type(type_name)
                headers.append(header)
                sizes.append(header_type.size)

        # Some of these may not exist, for example:
        #   if build < 9200: PADDING_INFO else: AUDIT_INFO
        #   if build == 10586: HANDLE_REVOCATION_INFO else EXTENDED_INFO
        # based on what's present and what's not, this list should be the right order and the right length
        return headers, sizes

    def is_free_pool(self) -> bool:
        """Returns True when PoolType is 0"""
        return self.PoolType == 0

    def is_paged_pool(self) -> bool:
        """Returns True when PoolType is a non-zero even value"""
        return self.PoolType % 2 == 0 and self.PoolType > 0

    def is_nonpaged_pool(self) -> bool:
        """Returns True when PoolType is odd"""
        return self.PoolType % 2 == 1
class POOL_HEADER_VISTA(POOL_HEADER):
    """A kernel pool allocation header, updated for Vista and later.

    Exists at the base of the allocation and provides a tag that we can
    scan for.
    """

    def is_paged_pool(self):
        """Returns True when PoolType is odd (the parity test is the reverse of POOL_HEADER's)"""
        pool_type = self.PoolType
        return pool_type % 2 == 1

    def is_nonpaged_pool(self):
        """Returns True when PoolType is a non-zero even value"""
        pool_type = self.PoolType
        if pool_type % 2 != 0:
            return False
        return pool_type > 0
class POOL_TRACKER_BIG_PAGES(objects.StructType):
    """A kernel big page pool tracker."""

    # Maps a _POOL_TYPE numeric value to the first enumeration name found for it.
    # This dict lives on the class, so it is filled once and then shared by
    # every instance (it is mutated in place, never rebound per instance).
    pool_type_lookup: Dict[int, str] = {}

    def _generate_pool_type_lookup(self):
        """Populate pool_type_lookup from the _POOL_TYPE enumeration of this object's symbol table."""
        # Enumeration._generate_inverse_choices() raises ValueError because multiple enum names map to the same
        # value in the kernel _POOL_TYPE so create a custom mapping here and take the first match
        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
        pool_type_enum = self._context.symbol_space.get_enumeration(
            symbol_table_name + constants.BANG + "_POOL_TYPE"
        )
        for name, value in pool_type_enum.choices.items():
            # setdefault keeps the first name seen for a value and ignores later duplicates
            self.pool_type_lookup.setdefault(value, name)

    def is_valid(self) -> bool:
        """Returns True when the Key (pool tag) value is greater than zero"""
        return self.Key > 0

    def is_free(self) -> bool:
        """Returns if the allocation is freed (True) or in-use (False)"""
        # The lowest bit of Va is used as the freed marker
        return self.Va & 1 == 1

    def get_key(self) -> str:
        """Returns the printable characters of the Key value as a string

        The Key is serialized as 4 little-endian bytes; bytes outside the
        range 33..126 are dropped, so the result can be shorter than 4
        characters.
        """
        tag_bytes = objects.convert_value_to_data(
            self.Key, int, objects.DataFormatInfo(4, "little", False)
        )
        return "".join(chr(byte) for byte in tag_bytes if 32 < byte < 127)

    def get_pool_type(self) -> Union[str, interfaces.renderers.BaseAbsentValue]:
        """Returns the enum name for the PoolType value on applicable systems"""
        # Not applicable until Vista
        try:
            pool_type = self.PoolType
        except AttributeError:
            return renderers.NotApplicableValue()

        # Build the shared lookup lazily, the first time any instance needs it
        if not self.pool_type_lookup:
            self._generate_pool_type_lookup()

        return self.pool_type_lookup.get(pool_type, f"Unknown choice {pool_type}")

    def get_number_of_bytes(self) -> Union[int, interfaces.renderers.BaseAbsentValue]:
        """Returns the NumberOfBytes value on applicable systems"""
        # Not applicable until Vista
        try:
            return self.NumberOfBytes
        except AttributeError:
            return renderers.NotApplicableValue()
class ExecutiveObject(interfaces.objects.ObjectInterface):
    """This is used as a "mixin" that provides all kernel executive objects
    with a means of finding their own object header."""

    def get_object_header(self) -> "OBJECT_HEADER":
        """Build the _OBJECT_HEADER whose Body member starts at this object's offset.

        Returns:
            The _OBJECT_HEADER object, taken from the same symbol table and
            layers as this object

        Raises:
            ValueError: if this object's type name carries no symbol table prefix
        """
        own_type_name = self.vol.type_name
        if constants.BANG not in own_type_name:
            raise ValueError(
                f"Invalid symbol table name syntax (no {constants.BANG} found)"
            )

        table_name = own_type_name.split(constants.BANG)[0]
        header_type_name = table_name + constants.BANG + "_OBJECT_HEADER"

        # This object is the header's Body, so the header starts that many bytes earlier
        header_template = self._context.symbol_space.get_type(header_type_name)
        distance_to_body = header_template.relative_child_offset("Body")

        return self._context.object(
            header_type_name,
            layer_name=self.vol.layer_name,
            offset=self.vol.offset - distance_to_body,
            native_layer_name=self.vol.native_layer_name,
        )
class OBJECT_HEADER(objects.StructType):
    """A class for the headers for executive kernel objects, which contains
    quota information, ownership details, naming data, and ACLs."""

    def is_valid(self) -> bool:
        """Determine if the object is valid.

        Returns:
            True when PointerCount lies in the range 0..0x1000000 inclusive,
            False otherwise or when it cannot be read
        """
        # NOTE: an additional upper bound on InfoMask (> 0x48) was once
        # considered here but is not enforced.
        try:
            return 0 <= self.PointerCount <= 0x1000000
        except exceptions.InvalidAddressException:
            return False

    def get_object_type(
        self, type_map: Dict[int, str], cookie: Optional[int] = None
    ) -> Optional[str]:
        """Across all Windows versions, the _OBJECT_HEADER embeds details on
        the type of object (i.e. process, file) but the way its embedded
        differs between versions.

        This API abstracts away those details.

        Args:
            type_map: mapping from type index to object type name
            cookie: value XORed into the type index decoding; when it is None
                the plain TypeIndex is used

        Returns:
            The object type name, or None when the index is not in type_map.
            A non-None result is cached on the object and returned directly
            by later calls.
        """

        if self.vol.get("object_header_object_type", None) is not None:
            return self.vol.object_header_object_type

        try:
            # vista and earlier have a Type member
            self._vol["object_header_object_type"] = self.Type.Name.String
        except AttributeError:
            # windows 7 and later have a TypeIndex, but windows 10
            # further encodes the index value with nt1!ObHeaderCookie
            try:
                type_index = ((self.vol.offset >> 8) ^ cookie ^ self.TypeIndex) & 0xFF
            except (AttributeError, TypeError):
                # TypeError covers cookie=None: fall back to the raw index
                type_index = self.TypeIndex

            self._vol["object_header_object_type"] = type_map.get(type_index)

        return self.vol.object_header_object_type

    @property
    def NameInfo(self) -> interfaces.objects.ObjectInterface:
        """The _OBJECT_HEADER_NAME_INFO optional header belonging to this object header.

        Raises:
            ValueError: if the type name has no symbol table prefix, or if this
                header has no name information
            AttributeError: if a kernel module has to be constructed but the
                native layer's config has no kernel_virtual_offset
        """
        if constants.BANG not in self.vol.type_name:
            raise ValueError(
                f"Invalid symbol table name syntax (no {constants.BANG} found)"
            )

        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]

        if symbol_table_name in self._context.modules:
            ntkrnlmp = self._context.modules[symbol_table_name]
        else:
            layer = self._context.layers[self.vol.native_layer_name]
            kvo = layer.config.get("kernel_virtual_offset", None)

            if kvo is None:
                # Report the layer whose config was actually consulted
                raise AttributeError(
                    f"Could not find kernel_virtual_offset for layer: {self.vol.native_layer_name}"
                )

            # We know this symbol table name can't exist because we checked for it earlier
            ntkrnlmp = self._context.module(
                symbol_table_name, layer_name=self.vol.layer_name, offset=kvo
            )

        try:
            header_offset = self.NameInfoOffset
        except AttributeError:
            # http://codemachine.com/article_objectheader.html (Windows 7 and later)
            name_info_bit = 0x2
            info_mask = self.InfoMask

            if info_mask & name_info_bit:
                address = ntkrnlmp.get_symbol("ObpInfoMaskToOffset").address
                calculated_index = info_mask & (name_info_bit | (name_info_bit - 1))

                header_offset = ntkrnlmp.object(
                    "unsigned char",
                    layer_name=self.vol.native_layer_name,
                    offset=address + calculated_index,
                )
            else:
                # Without the name-info bit the table entry only describes the
                # lower-bit headers (e.g. creator info); using it would read some
                # other optional header as name info, so treat it as absent.
                header_offset = 0

        if header_offset == 0:
            raise ValueError(
                f"Could not find _OBJECT_HEADER_NAME_INFO for object at {self.vol.offset} of layer {self.vol.layer_name}"
            )

        header = ntkrnlmp.object(
            "_OBJECT_HEADER_NAME_INFO",
            layer_name=self.vol.layer_name,
            offset=self.vol.offset - header_offset,
            native_layer_name=self.vol.native_layer_name,
            absolute=True,
        )
        return header