# Source code for volatility3.framework.layers.registry

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
import contextlib
import logging
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from volatility3.framework import constants, exceptions, interfaces, objects
from volatility3.framework.configuration import requirements
from volatility3.framework.configuration.requirements import (
    IntRequirement,
    TranslationLayerRequirement,
)
from volatility3.framework.exceptions import InvalidAddressException
from volatility3.framework.layers import linear
from volatility3.framework.symbols import intermed
from volatility3.plugins.windows import pslist

# Module-level logger used by the classes in this file for diagnostic output.
vollog = logging.getLogger(__name__)

class RegistryFormatException(exceptions.LayerException):
    """Thrown when an error occurs with the underlying Registry file format."""
class RegistryInvalidIndex(exceptions.LayerException):
    """Thrown when an index that doesn't exist or can't be found occurs."""
class RegistryHive(linear.LinearlyMappedLayer):
    """Layer that translates registry cell indexes into offsets in a base layer.

    The hive is located through a ``_CMHIVE`` structure read from the
    configured base layer at ``hive_offset``.  Bit 31 of a cell index selects
    the volatile or non-volatile storage; the remaining bits are split into a
    directory index, a table index and a sub-offset (see ``_translate``).

    NOTE(review): every ``self.name`` passed to an exception, a log message or
    ``context.object`` below was missing from the extracted source and has
    been restored as the layer-name argument -- confirm against upstream.
    """

    def __init__(
        self,
        context: interfaces.context.ContextInterface,
        config_path: str,
        name: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Reads the hive header and works out the address limits of the layer.

        Args:
            context: The context the layer is constructed in
            config_path: Configuration path holding this layer's settings
            name: Name of the layer being constructed
            metadata: Optional metadata passed through to the parent class

        Raises:
            RegistryFormatException: If the hive signature is not 0xBEE0BEE0
        """
        super().__init__(
            context=context, config_path=config_path, name=name, metadata=metadata
        )

        self._base_layer = self.config["base_layer"]
        self._hive_offset = self.config["hive_offset"]
        self._table_name = self.config["nt_symbols"]

        self._page_size = 1 << 12
        self._reg_table_name = intermed.IntermediateSymbolTable.create(
            context, self._config_path, "windows", "registry"
        )

        cmhive = self.context.object(
            self._table_name + constants.BANG + "_CMHIVE",
            self._base_layer,
            self._hive_offset,
        )
        self._cmhive_name = cmhive.get_name()
        self.hive = cmhive.Hive

        # TODO: Check the checksum
        if self.hive.Signature != 0xBEE0BEE0:
            raise RegistryFormatException(
                self.name,
                f"Registry hive at {self._hive_offset} does not have a valid signature",
            )

        # Win10 17063 introduced the Registry process to map most hives.  Check
        # if it exists and update RegistryHive._base_layer
        for proc in pslist.PsList.list_processes(
            self.context, self.config["base_layer"], self.config["nt_symbols"]
        ):
            proc_name = proc.ImageFileName.cast(
                "string", max_length=proc.ImageFileName.vol.count, errors="replace"
            )
            if proc_name == "Registry" and proc.InheritedFromUniqueProcessId == 4:
                proc_layer_name = proc.add_process_layer()
                self._base_layer = proc_layer_name
                break

        self._base_block = self.hive.BaseBlock.dereference()

        self._minaddr = 0
        try:
            self._hive_maxaddr_non_volatile = self.hive.Storage[0].Length
            self._hive_maxaddr_volatile = self.hive.Storage[1].Length
            self._maxaddr = 0x80000000 | self._hive_maxaddr_volatile
            vollog.log(
                constants.LOGLEVEL_VVVV,
                f"Setting hive {self.name} max address to {hex(self._maxaddr)}",
            )
        except exceptions.InvalidAddressException:
            # The storage lengths could not be read, so fall back to the
            # largest value the 31 non-volatile-bit address space allows
            self._hive_maxaddr_non_volatile = 0x7FFFFFFF
            self._hive_maxaddr_volatile = 0x7FFFFFFF
            self._maxaddr = 0x80000000 | self._hive_maxaddr_volatile
            vollog.log(
                constants.LOGLEVEL_VVVV,
                f"Exception when setting hive {self.name} max address, using {hex(self._maxaddr)}",
            )

    def _get_hive_maxaddr(self, volatile: int) -> int:
        """Returns the maximum address of the volatile (truthy) or non-volatile (falsy) storage."""
        return (
            self._hive_maxaddr_volatile if volatile else self._hive_maxaddr_non_volatile
        )

    def get_name(self) -> str:
        """Returns the name read from the _CMHIVE, or "[NONAME]" if it is empty."""
        return self._cmhive_name or "[NONAME]"

    @property
    def hive_offset(self) -> int:
        """Returns the configured offset of the hive within the base layer."""
        return self._hive_offset

    @property
    def address_mask(self) -> int:
        """Return a mask that allows for the volatile bit to be set."""
        return super().address_mask | 0x80000000

    @property
    def root_cell_offset(self) -> int:
        """Returns the offset for the root cell in this hive.

        Falls back to 0x20 when the base block cannot be read or does not
        carry the "regf" signature.
        """
        with contextlib.suppress(InvalidAddressException):
            signature = self._base_block.Signature.cast(
                "string", max_length=4, encoding="latin-1"
            )
            if signature == "regf":
                return self._base_block.RootCell
        return 0x20

    def get_cell(self, cell_offset: int) -> "objects.StructType":
        """Returns the appropriate Cell value for a cell offset."""
        # This would be an _HCELL containing CELL_DATA, but to save time we skip the size of the HCELL
        cell = self._context.object(
            object_type=self._table_name + constants.BANG + "_CELL_DATA",
            offset=cell_offset + 4,
            layer_name=self.name,
        )
        return cell

    def get_node(self, cell_offset: int) -> "objects.StructType":
        """Returns the appropriate Node, interpreted from the Cell based on its
        Signature.

        Cells with an unrecognised two-character signature are logged and
        returned uninterpreted.
        """
        cell = self.get_cell(cell_offset)
        signature = cell.cast("string", max_length=2, encoding="latin-1")
        if signature == "nk":
            return cell.u.KeyNode
        if signature == "sk":
            return cell.u.KeySecurity
        if signature == "vk":
            return cell.u.KeyValue
        if signature == "db":
            # Big Data
            return cell.u.ValueData
        if signature in ("lf", "lh", "ri"):
            # Fast Leaf, Hash Leaf, Index Root
            return cell.u.KeyIndex

        # It doesn't matter that we use KeyNode, we're just after the first two bytes
        vollog.debug(
            "Unknown Signature {} (0x{:x}) at offset {}".format(
                signature, cell.u.KeyNode.Signature, cell_offset
            )
        )
        return cell

    def get_key(
        self, key: str, return_list: bool = False
    ) -> Union[List[objects.StructType], objects.StructType]:
        """Gets a specific registry key by key path.

        return_list specifies whether the return result will be a single
        node (default) or a list of nodes from root to the current node
        (if return_list is true).

        Raises:
            RegistryFormatException: If the root cell is not a _CM_KEY_NODE
            KeyError: If a component of the key path cannot be found
        """
        root_node = self.get_node(self.root_cell_offset)
        if not root_node.vol.type_name.endswith(constants.BANG + "_CM_KEY_NODE"):
            raise RegistryFormatException(
                self.name,
                "Encountered {} instead of _CM_KEY_NODE".format(
                    root_node.vol.type_name
                ),
            )

        # A single trailing separator is ignored
        if key.endswith("\\"):
            key = key[:-1]

        node_path = [root_node]
        found_key: List[str] = []
        for component in key.split("\\"):
            # registry keys are not case sensitive so compare lowercase
            wanted = component.lower()
            for subkey in node_path[-1].get_subkeys():
                if subkey.get_name().lower() == wanted:
                    node_path.append(subkey)
                    found_key.append(component)
                    break
            else:
                # No subkey matched this component of the path
                raise KeyError(
                    "Key {} not found under {}".format(component, "\\".join(found_key))
                )

        if return_list:
            return node_path
        return node_path[-1]

    def visit_nodes(
        self,
        visitor: Callable[[objects.StructType], None],
        node: Optional[objects.StructType] = None,
    ) -> None:
        """Applies a callable (visitor) to all nodes within the registry tree
        from a given node.

        When no node is given the walk starts from the root cell.  Each node
        is visited before its subkeys.
        """
        if not node:
            node = self.get_node(self.root_cell_offset)
        visitor(node)
        for subkey in node.get_subkeys():
            self.visit_nodes(visitor, subkey)

    @staticmethod
    def _mask(value: int, high_bit: int, low_bit: int) -> int:
        """Returns the bits of a value between highbit and lowbit inclusive."""
        high_mask = (1 << (high_bit + 1)) - 1
        low_mask = (1 << low_bit) - 1
        return value & (high_mask ^ low_mask)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the configuration requirements needed to build this layer."""
        return [
            IntRequirement(
                name="hive_offset",
                description="Offset within the base layer at which the hive lives",
                default=0,
                optional=False,
            ),
            requirements.SymbolTableRequirement(
                name="nt_symbols", description="Windows kernel symbols"
            ),
            TranslationLayerRequirement(
                name="base_layer",
                description="Layer in which the registry hive lives",
                optional=False,
            ),
        ]

    def _translate(self, offset: int) -> int:
        """Translates a single cell index to a cell memory offset and the
        suboffset within it.

        Raises:
            RegistryInvalidIndex: If the index (ignoring the volatile bit) is
                beyond the maximum address of the selected storage
        """
        # Ignore the volatile bit when determining maxaddr validity
        volatile = self._mask(offset, 31, 31) >> 31
        if offset & 0x7FFFFFFF > self._get_hive_maxaddr(volatile):
            vollog.log(
                constants.LOGLEVEL_VVV,
                "Layer {} couldn't translate offset {}, greater than {} in {} store of {}".format(
                    self.name,
                    hex(offset & 0x7FFFFFFF),
                    hex(self._get_hive_maxaddr(volatile)),
                    "volatile" if volatile else "non-volatile",
                    self.get_name(),
                ),
            )
            raise RegistryInvalidIndex(
                self.name, "Mapping request for value greater than maxaddr"
            )

        storage = self.hive.Storage[volatile]
        # Bits 30-21 index the directory, bits 20-12 the table, bits 11-0 the block
        dir_index = self._mask(offset, 30, 21) >> 21
        table_index = self._mask(offset, 20, 12) >> 12
        suboffset = self._mask(offset, 11, 0) >> 0

        table = storage.Map.Directory[dir_index]
        entry = table.Table[table_index]
        return entry.get_block_offset() + suboffset

    def mapping(
        self, offset: int, length: int, ignore_errors: bool = False
    ) -> Iterable[Tuple[int, int, int, int, str]]:
        """Maps a range of this layer onto the base layer, one page-sized chunk at a time.

        Each returned tuple is (offset, chunk size, translated offset,
        chunk size, base layer name).  Chunks that fail to translate are
        skipped when ignore_errors is set, otherwise the exception propagates.

        Raises:
            ValueError: If length is negative
        """
        if length < 0:
            raise ValueError("Mapping length of RegistryHive must be positive or zero")

        # Return the translated offset without checking bounds within the HBIN.  The check runs into
        # issues when pages are swapped on large HBINs, and did not seem to find any errors on single page
        # HBINs while dramatically slowing performance.
        response = []
        current_offset = offset
        remaining_length = length
        # The first chunk only runs up to the end of the page that offset falls in
        chunk_size = self._page_size - (offset & (self._page_size - 1))
        while remaining_length > 0:
            chunk_size = min(chunk_size, remaining_length, self._page_size)
            try:
                translated_offset = self._translate(current_offset)
                response.append(
                    (
                        current_offset,
                        chunk_size,
                        translated_offset,
                        chunk_size,
                        self._base_layer,
                    )
                )
            except exceptions.LayerException:
                if not ignore_errors:
                    raise
            current_offset += chunk_size
            remaining_length -= chunk_size
            chunk_size = self._page_size
        return response

    @property
    def dependencies(self) -> List[str]:
        """Returns a list of layer names that this layer translates onto."""
        return [self.config["base_layer"]]

    def is_valid(self, offset: int, length: int = 1) -> bool:
        """Returns a boolean based on whether the offset is valid or not."""
        with contextlib.suppress(exceptions.InvalidAddressException):
            # Pass this to the lower layers for now
            return all(
                self.context.layers[layer].is_valid(mapped_offset, mapped_length)
                for (_, _, mapped_offset, mapped_length, layer) in self.mapping(
                    offset, length
                )
            )
        return False

    @property
    def minimum_address(self) -> int:
        """Returns the lowest address of this layer."""
        return self._minaddr

    @property
    def maximum_address(self) -> int:
        """Returns the highest address of this layer, with the volatile bit set."""
        return self._maxaddr