Source code for volatility3.framework.symbols.windows.extensions.pool

import functools
import logging
import struct
from typing import Optional, Tuple, List, Dict, Union

from volatility3.framework import objects, interfaces, constants, symbols, exceptions, renderers
from volatility3.framework.renderers import conversion

vollog = logging.getLogger(__name__)


class POOL_HEADER(objects.StructType):
    """A kernel pool allocation header.

    Exists at the base of the allocation and provides a tag that we can
    scan for.
    """

    def get_object(self,
                   type_name: str,
                   use_top_down: bool,
                   executive: bool = False,
                   kernel_symbol_table: Optional[str] = None,
                   native_layer_name: Optional[str] = None) -> Optional[interfaces.objects.ObjectInterface]:
        """Carve an object or data structure from a kernel pool allocation

        Args:
            type_name: the data structure type name; if it contains the symbol table separator, the part
                before it is used as the symbol table for the carved object
            use_top_down: for executive objects, scan forward from the end of the POOL_HEADER for a plausible
                _OBJECT_HEADER (windows 8 and later) instead of computing the position back from the end of
                the allocation (windows 7 and earlier)
            executive: whether the allocation holds an executive object (one preceded by an _OBJECT_HEADER);
                when False the structure is instantiated directly after the POOL_HEADER
            kernel_symbol_table: in case objects of a different symbol table are scanned for
            native_layer_name: the name of the layer where the data originally lived

        Returns:
            An object as found from a POOL_HEADER.  For executive objects, None is returned when no candidate
            passes its own is_valid() check.
        """
        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]
        if constants.BANG in type_name:
            symbol_table_name, type_name = type_name.split(constants.BANG)[0:2]
        full_type_name = symbol_table_name + constants.BANG + type_name

        # when checking for symbols from a table other than nt_symbols grab _OBJECT_HEADER from the kernel
        # because symbol_table_name will be different from kernel_symbol_table.
        # otherwise symbol_table_name *is* the kernel symbol table, so just use that.
        header_table_name = kernel_symbol_table if kernel_symbol_table else symbol_table_name
        object_header_type = self._context.symbol_space.get_type(header_table_name + constants.BANG +
                                                                 "_OBJECT_HEADER")

        pool_header_size = self.vol.size

        # if there is no object type, then just instantiate a structure
        if not executive:
            return self._context.object(full_type_name,
                                        layer_name = self.vol.layer_name,
                                        offset = self.vol.offset + pool_header_size,
                                        native_layer_name = native_layer_name)

        # otherwise we have an executive object in the pool
        if symbols.symbol_table_is_64bit(self._context, symbol_table_name):
            alignment = 16
        else:
            alignment = 8

        # use the top down approach for windows 8 and later
        if use_top_down:
            body_offset = object_header_type.relative_child_offset('Body')
            infomask_offset = object_header_type.relative_child_offset('InfoMask')
            pointercount_offset = object_header_type.relative_child_offset('PointerCount')
            pointercount_size = object_header_type.members['PointerCount'][1].size
            optional_headers, lengths_of_optional_headers = self._calculate_optional_header_lengths(
                self._context, symbol_table_name)
            padding_available = None if 'PADDING_INFO' not in optional_headers else optional_headers.index(
                'PADDING_INFO')
            max_optional_headers_length = sum(lengths_of_optional_headers)

            # define the starting and ending bounds for the scan
            start_offset = self.vol.offset + pool_header_size
            addr_limit = min(max_optional_headers_length, self.BlockSize * alignment)

            # A single read is better than lots of little one-byte reads.
            # We're ok padding this, because the byte we'd check would be 0 which would only be valid if there
            # were no optional headers in the first place (ie, if we read too much for headers that don't exist,
            # but the bit we could read were valid)
            infomask_data = self._context.layers[self.vol.layer_name].read(start_offset,
                                                                           addr_limit + infomask_offset,
                                                                           pad = True)

            # Addr stores the offset to the potential start of the OBJECT_HEADER from just after the POOL_HEADER
            # It will always be aligned to a particular alignment
            for addr in range(0, addr_limit, alignment):
                infomask_value = infomask_data[addr + infomask_offset]
                pointercount_start = addr + pointercount_offset
                pointercount_value = int.from_bytes(
                    infomask_data[pointercount_start:pointercount_start + pointercount_size],
                    byteorder = 'little',
                    signed = True)
                # Skip candidates whose PointerCount is implausible for a live object header
                if not 0x1000000 > pointercount_value >= 0:
                    continue

                # Each set bit in InfoMask selects the optional header at the same index
                padding_present = False
                optional_headers_length = 0
                for bit, header_length in enumerate(lengths_of_optional_headers):
                    if infomask_value & (1 << bit):
                        optional_headers_length += header_length
                        if bit == padding_available:
                            padding_present = True

                # PADDING_INFO is a special case (4 bytes that contain the total padding length)
                padding_length = 0
                if padding_present:
                    # Read the four bytes from just before the next optional_headers_length minus the
                    # padding_info size
                    #
                    #  ---------------
                    #  POOL_HEADER
                    #  ---------------
                    #
                    #  start of PADDING_INFO
                    #  ---------------
                    #  End of other optional headers
                    #  ---------------
                    #  OBJECT_HEADER
                    #  ---------------
                    optional_headers_start = addr - optional_headers_length
                    if optional_headers_start < 0:
                        continue
                    padding_length = struct.unpack(
                        "<I", infomask_data[optional_headers_start:optional_headers_start + 4])[0]
                    padding_length -= lengths_of_optional_headers[padding_available or 0]

                # Certain versions of windows have PADDING_INFO lengths that are too long
                # So we now check that the padding length is at a minimum the right length
                # and that it doesn't go beyond the entirety of the data
                # NOTE(review): as written this needs padding_length > addr and, at the same time,
                # padding_length <= addr - optional_headers_length.  optional_headers_length is never
                # negative, so this branch looks unreachable.  Left unchanged until the intended bounds
                # are confirmed.
                if addr - optional_headers_length >= padding_length > addr:
                    continue

                try:
                    mem_object = self._context.object(full_type_name,
                                                      layer_name = self.vol.layer_name,
                                                      offset = addr + body_offset + start_offset,
                                                      native_layer_name = native_layer_name)

                    if mem_object.is_valid():
                        return mem_object

                except (TypeError, exceptions.InvalidAddressException):
                    # This candidate could not be read or validated; try the next aligned offset
                    pass

        # use the bottom up approach for windows 7 and earlier
        else:
            type_size = self._context.symbol_space.get_type(full_type_name).size
            rounded_size = conversion.round(type_size, alignment, up = True)

            # The object is expected to occupy the tail of the allocation
            mem_object = self._context.object(full_type_name,
                                              layer_name = self.vol.layer_name,
                                              offset = self.vol.offset + self.BlockSize * alignment - rounded_size,
                                              native_layer_name = native_layer_name)

            try:
                if mem_object.is_valid():
                    return mem_object
            except (TypeError, exceptions.InvalidAddressException):
                return None
        return None

    @classmethod
    @functools.lru_cache()
    def _calculate_optional_header_lengths(cls, context: interfaces.context.ContextInterface,
                                           symbol_table_name: str) -> Tuple[List[str], List[int]]:
        """Determine which optional object headers the symbol table defines, and their sizes.

        Args:
            context: the context whose symbol space is queried
            symbol_table_name: the symbol table to look the `_OBJECT_HEADER_*` types up in

        Returns:
            Two parallel lists: the optional header names that exist in the symbol table (in InfoMask bit
            order) and the size of each.  The result is memoized by lru_cache, so the same list objects
            are handed back on repeated calls and must not be mutated by callers.
        """
        headers = []
        sizes = []
        for header in [
                'CREATOR_INFO', 'NAME_INFO', 'HANDLE_INFO', 'QUOTA_INFO', 'PROCESS_INFO', 'AUDIT_INFO',
                'EXTENDED_INFO', 'HANDLE_REVOCATION_INFO', 'PADDING_INFO'
        ]:
            try:
                type_name = "{}{}_OBJECT_HEADER_{}".format(symbol_table_name, constants.BANG, header)
                header_type = context.symbol_space.get_type(type_name)
                headers.append(header)
                sizes.append(header_type.size)
            except (AttributeError, exceptions.SymbolError):
                # Some of these may not exist, for example:
                #   if build < 9200: PADDING_INFO else: AUDIT_INFO
                #   if build == 10586: HANDLE_REVOCATION_INFO else EXTENDED_INFO
                # based on what's present and what's not, this list should be the right order and the right length
                pass
        return headers, sizes

    def is_free_pool(self) -> bool:
        """Returns True when PoolType is zero."""
        return self.PoolType == 0

    def is_paged_pool(self) -> bool:
        """Returns True when PoolType is even and greater than zero."""
        return self.PoolType % 2 == 0 and self.PoolType > 0

    def is_nonpaged_pool(self) -> bool:
        """Returns True when PoolType is odd."""
        return self.PoolType % 2 == 1
class POOL_HEADER_VISTA(POOL_HEADER):
    """A kernel pool allocation header, updated for Vista and later.

    Exists at the base of the allocation and provides a tag that we can
    scan for.  The paged/non-paged parity tests are swapped relative to
    the base POOL_HEADER class.
    """

    def is_paged_pool(self):
        """Returns True when PoolType is odd."""
        pool_type = self.PoolType
        return pool_type % 2 == 1

    def is_nonpaged_pool(self):
        """Returns True when PoolType is even and greater than zero."""
        pool_type = self.PoolType
        if pool_type % 2 != 0:
            return False
        return pool_type > 0
class POOL_TRACKER_BIG_PAGES(objects.StructType):
    """A kernel big page pool tracker."""

    # Maps a _POOL_TYPE enumeration value to the first enumeration name found for it.
    # This is a class attribute, so one dictionary is shared by every instance and is only
    # populated the first time get_pool_type() finds it empty.
    pool_type_lookup = {}  # type: Dict[int, str]

    def _generate_pool_type_lookup(self):
        """Fill pool_type_lookup from the _POOL_TYPE enumeration of this object's symbol table."""
        # Enumeration._generate_inverse_choices() raises ValueError because multiple enum names map to the same
        # value in the kernel _POOL_TYPE so create a custom mapping here and take the first match
        table_name = self.vol.type_name.split(constants.BANG)[0]
        enumeration = self._context.symbol_space.get_enumeration(table_name + constants.BANG + "_POOL_TYPE")
        for name, value in enumeration.choices.items():
            # setdefault keeps whichever name was stored first for a given value
            self.pool_type_lookup.setdefault(value, name)

    def is_valid(self) -> bool:
        """Returns True when the Key (pool tag) value is greater than zero."""
        # The original carried a disabled alternative check here: `self.Va > 0x1`
        return self.Key > 0

    def get_key(self) -> str:
        """Returns the Key value as a string of its printable characters.

        The Key is serialized as 4 little-endian bytes; bytes outside the range 33..126 (which
        includes the space character) are dropped, so the result may be shorter than 4 characters.
        """
        tag_bytes = objects.convert_value_to_data(self.Key, int, objects.DataFormatInfo(4, "little", False))
        return "".join(chr(byte) for byte in tag_bytes if 32 < byte < 127)

    def get_pool_type(self) -> Union[str, interfaces.renderers.BaseAbsentValue]:
        """Returns the enum name for the PoolType value on applicable systems"""
        # Not applicable until Vista
        if not hasattr(self, 'PoolType'):
            return renderers.NotApplicableValue()

        if not self.pool_type_lookup:
            self._generate_pool_type_lookup()

        fallback = "Unknown choice {}".format(self.PoolType)
        return self.pool_type_lookup.get(self.PoolType, fallback)

    def get_number_of_bytes(self) -> Union[int, interfaces.renderers.BaseAbsentValue]:
        """Returns the NumberOfBytes value on applicable systems"""
        # Not applicable until Vista
        try:
            number_of_bytes = self.NumberOfBytes
        except AttributeError:
            return renderers.NotApplicableValue()
        return number_of_bytes
class ExecutiveObject(interfaces.objects.ObjectInterface):
    """This is used as a "mixin" that provides all kernel executive objects
    with a means of finding their own object header."""

    def get_object_header(self) -> 'OBJECT_HEADER':
        """Build the _OBJECT_HEADER that sits immediately before this object.

        The header is placed so that its `Body` member starts at this object's offset.

        Returns:
            The _OBJECT_HEADER object from the same symbol table and layer as this object.

        Raises:
            ValueError: if this object's type name has no symbol table prefix.
        """
        own_type_name = self.vol.type_name
        if constants.BANG not in own_type_name:
            raise ValueError("Invalid symbol table name syntax (no {} found)".format(constants.BANG))

        header_type_name = own_type_name.split(constants.BANG)[0] + constants.BANG + "_OBJECT_HEADER"
        header_type = self._context.symbol_space.get_type(header_type_name)
        body_offset = header_type.relative_child_offset("Body")

        # Step back from the body to where the enclosing header begins
        return self._context.object(header_type_name,
                                    layer_name = self.vol.layer_name,
                                    offset = self.vol.offset - body_offset,
                                    native_layer_name = self.vol.native_layer_name)
class OBJECT_HEADER(objects.StructType):
    """A class for the headers for executive kernel objects, which contains
    quota information, ownership details, naming data, and ACLs."""

    def is_valid(self) -> bool:
        """Determine if the object is valid.

        Returns:
            True when PointerCount can be read and lies between 0 and 0x1000000 inclusive.
        """
        # A disabled extra check was left in the original: reject when self.InfoMask > 0x48
        try:
            if self.PointerCount > 0x1000000 or self.PointerCount < 0:
                return False
        except exceptions.InvalidAddressException:
            return False
        return True

    def get_object_type(self, type_map: Dict[int, str], cookie: Optional[int] = None) -> Optional[str]:
        """Across all Windows versions, the _OBJECT_HEADER embeds details on
        the type of object (i.e. process, file) but the way its embedded
        differs between versions.

        This API abstracts away those details.

        Args:
            type_map: mapping from type index to object type name
            cookie: the value used to decode TypeIndex (windows 10); when None the raw TypeIndex is used

        Returns:
            The object type name, or None if the type index is not in type_map.  A non-None result is
            remembered on this object and returned directly by later calls.
        """
        if self.vol.get('object_header_object_type', None) is not None:
            return self.vol.object_header_object_type

        try:
            # vista and earlier have a Type member
            self._vol['object_header_object_type'] = self.Type.Name.String
        except AttributeError:
            # windows 7 and later have a TypeIndex, but windows 10
            # further encodes the index value with nt1!ObHeaderCookie
            if cookie is None:
                type_index = self.TypeIndex
            else:
                try:
                    type_index = ((self.vol.offset >> 8) ^ cookie ^ self.TypeIndex) & 0xFF
                except (AttributeError, TypeError):
                    # An unusable cookie falls back to the undecoded index, as before
                    type_index = self.TypeIndex

            self._vol['object_header_object_type'] = type_map.get(type_index)

        return self.vol.object_header_object_type

    @property
    def NameInfo(self) -> interfaces.objects.ObjectInterface:
        """The _OBJECT_HEADER_NAME_INFO optional header that precedes this object header.

        Raises:
            ValueError: if the type name has no symbol table prefix, or the computed offset to the
                name info header is zero.
            AttributeError: if the native layer's configuration has no kernel_virtual_offset.
        """
        if constants.BANG not in self.vol.type_name:
            raise ValueError("Invalid symbol table name syntax (no {} found)".format(constants.BANG))

        symbol_table_name = self.vol.type_name.split(constants.BANG)[0]

        try:
            header_offset = self.NameInfoOffset
        except AttributeError:
            # http://codemachine.com/article_objectheader.html (Windows 7 and later)
            name_info_bit = 0x2

            layer = self._context.layers[self.vol.native_layer_name]
            kvo = layer.config.get("kernel_virtual_offset", None)

            if kvo is None:
                # Report the layer whose configuration was actually consulted
                raise AttributeError("Could not find kernel_virtual_offset for layer: {}".format(
                    self.vol.native_layer_name))

            ntkrnlmp = self._context.module(symbol_table_name, layer_name = self.vol.layer_name, offset = kvo)
            address = ntkrnlmp.get_symbol("ObpInfoMaskToOffset").address
            # Keep the name-info bit and every InfoMask bit below it to index ObpInfoMaskToOffset
            calculated_index = self.InfoMask & (name_info_bit | (name_info_bit - 1))

            header_offset = self._context.object(symbol_table_name + constants.BANG + "unsigned char",
                                                 layer_name = self.vol.native_layer_name,
                                                 offset = kvo + address + calculated_index)

        if header_offset == 0:
            raise ValueError("Could not find _OBJECT_HEADER_NAME_INFO for object at {} of layer {}".format(
                self.vol.offset, self.vol.layer_name))

        header = self._context.object(symbol_table_name + constants.BANG + "_OBJECT_HEADER_NAME_INFO",
                                      layer_name = self.vol.layer_name,
                                      offset = self.vol.offset - header_offset,
                                      native_layer_name = self.vol.native_layer_name)
        return header