Source code for volatility3.plugins.windows.registry.hivelist

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterator, List, Tuple, Iterable, Optional

from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import registry
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows.registry import hivescan

vollog = logging.getLogger(__name__)


[docs]class HiveGenerator: """Walks the registry HiveList linked list in a given direction and stores an invalid offset if it's unable to fully walk the list""" _required_framework_version = (2, 0, 0) def __init__(self, cmhive, forward=True): self._cmhive = cmhive self._forward = forward self._invalid = None def __iter__(self): for hive in self._cmhive.HiveList.to_list( self._cmhive.vol.type_name, "HiveList", forward=self._forward ): if not hive.is_valid(): self._invalid = hive.vol.offset return None yield hive @property def invalid(self) -> Optional[int]: return self._invalid
[docs]class HiveList(interfaces.plugins.PluginInterface): """Lists the registry hives present in a particular memory image.""" _version = (1, 0, 0) _required_framework_version = (2, 0, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"], ), requirements.StringRequirement( name="filter", description="String to filter hive names returned", optional=True, default=None, ), requirements.PluginRequirement( name="hivescan", plugin=hivescan.HiveScan, version=(1, 0, 0) ), requirements.BooleanRequirement( name="dump", description="Extract listed registry hives", default=False, optional=True, ), ]
def _sanitize_hive_name(self, name: str) -> str: return ( name.split("\\")[-1] .replace(" ", "_") .replace(".", "") .replace("[", "") .replace("]", "") ) def _generator(self) -> Iterator[Tuple[int, Tuple[int, str]]]: chunk_size = 0x500000 kernel = self.context.modules[self.config["kernel"]] for hive_object in self.list_hive_objects( context=self.context, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name, filter_string=self.config.get("filter", None), ): file_output = "Disabled" if self.config["dump"]: # Construct the hive hive = next( self.list_hives( self.context, self.config_path, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name, hive_offsets=[hive_object.vol.offset], ) ) maxaddr = hive.hive.Storage[0].Length hive_name = self._sanitize_hive_name(hive.get_name()) file_handle = self.open( f"registry.{hive_name}.{hex(hive.hive_offset)}.hive" ) with file_handle as file_data: if hive._base_block: hive_data = self.context.layers[hive.dependencies[0]].read( hive.hive.BaseBlock, 1 << 12 ) else: hive_data = "\x00" * (1 << 12) file_data.write(hive_data) for i in range(0, maxaddr, chunk_size): current_chunk_size = min(chunk_size, maxaddr - i) data = hive.read(i, current_chunk_size, pad=True) file_data.write(data) # if self._progress_callback: # self._progress_callback((i / maxaddr) * 100, 'Writing layer {}'.format(hive_name)) file_output = file_handle.preferred_filename yield ( 0, ( format_hints.Hex(hive_object.vol.offset), hive_object.get_name() or "", file_output, ), )
[docs] @classmethod def list_hives( cls, context: interfaces.context.ContextInterface, base_config_path: str, layer_name: str, symbol_table: str, filter_string: Optional[str] = None, hive_offsets: List[int] = None, ) -> Iterable[registry.RegistryHive]: """Walks through a registry, hive by hive returning the constructed registry layer name. Args: context: The context to retrieve required elements (layers, symbol tables) from base_config_path: The configuration path for any settings required by the new table layer_name: The name of the layer on which to operate symbol_table: The name of the table containing the kernel symbols filter_string: An optional string which must be present in the hive name if specified offset: An optional offset to specify a specific hive to iterate over (takes precedence over filter_string) Yields: A registry hive layer name """ if hive_offsets is None: try: hive_offsets = [ hive.vol.offset for hive in cls.list_hive_objects( context, layer_name, symbol_table, filter_string ) ] except ImportError: vollog.warning( "Unable to import windows.hivelist plugin, please provide a hive offset" ) raise ValueError( "Unable to import windows.hivelist plugin, please provide a hive offset" ) for hive_offset in hive_offsets: # Construct the hive reg_config_path = cls.make_subconfig( context=context, base_config_path=base_config_path, hive_offset=hive_offset, base_layer=layer_name, nt_symbols=symbol_table, ) try: hive = registry.RegistryHive( context, reg_config_path, name="hive" + hex(hive_offset) ) except exceptions.InvalidAddressException: vollog.warning( f"Couldn't create RegistryHive layer at offset {hex(hive_offset)}, skipping" ) continue context.layers.add_layer(hive) yield hive
[docs] @classmethod def list_hive_objects( cls, context: interfaces.context.ContextInterface, layer_name: str, symbol_table: str, filter_string: str = None, ) -> Iterator[interfaces.objects.ObjectInterface]: """Lists all the hives in the primary layer. Args: context: The context to retrieve required elements (layers, symbol tables) from layer_name: The name of the layer on which to operate symbol_table: The name of the table containing the kernel symbols filter_string: A string which must be present in the hive name if specified Returns: The list of registry hives from the `layer_name` layer as filtered against using the `filter_string` """ # We only use the object factory to demonstrate how to use one kvo = context.layers[layer_name].config["kernel_virtual_offset"] ntkrnlmp = context.module(symbol_table, layer_name=layer_name, offset=kvo) list_head = ntkrnlmp.get_symbol("CmpHiveListHead").address list_entry = ntkrnlmp.object(object_type="_LIST_ENTRY", offset=list_head) reloff = ntkrnlmp.get_type("_CMHIVE").relative_child_offset("HiveList") cmhive = ntkrnlmp.object( object_type="_CMHIVE", offset=list_entry.vol.offset - reloff, absolute=True ) # Run through the list forwards seen = set() hg = HiveGenerator(cmhive, forward=True) for hive in hg: if hive.vol.offset in seen: vollog.debug( "Hivelist found an already seen offset {} while " "traversing forwards, this should not occur".format( hex(hive.vol.offset) ) ) break seen.add(hive.vol.offset) if ( filter_string is None or filter_string.lower() in str(hive.get_name() or "").lower() ): if context.layers[layer_name].is_valid(hive.vol.offset): yield hive forward_invalid = hg.invalid if forward_invalid: vollog.debug( "Hivelist failed traversing the list forwards at {}, traversing backwards".format( hex(forward_invalid) ) ) hg = HiveGenerator(cmhive, forward=False) for hive in hg: if hive.vol.offset in seen: vollog.debug( "Hivelist found an already seen offset {} while " "traversing backwards, list walking met in the middle".format( hex(hive.vol.offset) ) ) break seen.add(hive.vol.offset) if ( filter_string is None or filter_string.lower() in str(hive.get_name() or "").lower() ): if context.layers[layer_name].is_valid(hive.vol.offset): yield hive backward_invalid = hg.invalid if backward_invalid and forward_invalid != backward_invalid: # walking forward and backward did not stop at the same offset. they should if: # 1) there are no invalid hives, walking forwards would reach the end and backwards is not necessary # 2) there is one invalid hive, walking backwards would stop at the same place as forwards # therefore, there must be more 2 or more invalid hives, so the middle of the list is not reachable # by walking the list, so revert to scanning, and walk the list forwards and backwards from each # found hive vollog.debug( "Hivelist failed traversing backwards at {}, a different " "location from forwards, revert to scanning".format( hex(backward_invalid) ) ) for hive in hivescan.HiveScan.scan_hives( context, layer_name, symbol_table ): try: if hive.HiveList.Flink: start_hive_offset = hive.HiveList.Flink - reloff ## Now instantiate the first hive in virtual address space as normal start_hive = ntkrnlmp.object( object_type="_CMHIVE", offset=start_hive_offset, absolute=True, ) for forward in (True, False): for linked_hive in start_hive.HiveList.to_list( hive.vol.type_name, "HiveList", forward ): if ( not linked_hive.is_valid() or linked_hive.vol.offset in seen ): continue seen.add(linked_hive.vol.offset) if ( filter_string is None or filter_string.lower() in str(linked_hive.get_name() or "").lower() ): if context.layers[layer_name].is_valid( linked_hive.vol.offset ): yield linked_hive except exceptions.InvalidAddressException: vollog.debug( "InvalidAddressException when traversing hive {} found from scan, skipping".format( hex(hive.vol.offset) ) )
[docs] def run(self) -> renderers.TreeGrid: return renderers.TreeGrid( [("Offset", format_hints.Hex), ("FileFullPath", str), ("File output", str)], self._generator(), )