# Source code for volatility3.plugins.windows.registry.hivelist

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterator, List, Tuple, Iterable, Optional

from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import registry
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows.registry import hivescan

vollog = logging.getLogger(__name__)


class HiveGenerator:
    """Walks the registry HiveList linked list in a given direction and stores
    an invalid offset if it's unable to fully walk the list."""

    _required_framework_version = (1, 0, 0)

    def __init__(self, cmhive, forward = True):
        # The _CMHIVE whose HiveList entry anchors the walk, and the
        # direction in which to traverse the doubly-linked list.
        self._cmhive = cmhive
        self._forward = forward
        # Offset of the first invalid hive encountered, if any.
        self._invalid = None

    def __iter__(self):
        entries = self._cmhive.HiveList.to_list(self._cmhive.vol.type_name,
                                                "HiveList",
                                                forward = self._forward)
        for entry in entries:
            if entry.is_valid():
                yield entry
            else:
                # Remember where the walk broke off and stop iterating;
                # callers can inspect `invalid` to decide how to recover.
                self._invalid = entry.vol.offset
                return

    @property
    def invalid(self) -> Optional[int]:
        """Offset at which list traversal failed, or None if the full list was walked."""
        return self._invalid
class HiveList(interfaces.plugins.PluginInterface):
    """Lists the registry hives present in a particular memory image."""

    _version = (1, 0, 0)
    _required_framework_version = (1, 0, 0)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        return [
            requirements.TranslationLayerRequirement(name = 'primary',
                                                     description = 'Memory layer for the kernel',
                                                     architectures = ["Intel32", "Intel64"]),
            requirements.SymbolTableRequirement(name = "nt_symbols", description = "Windows kernel symbols"),
            requirements.StringRequirement(name = 'filter',
                                           description = "String to filter hive names returned",
                                           optional = True,
                                           default = None),
            requirements.PluginRequirement(name = 'hivescan', plugin = hivescan.HiveScan, version = (1, 0, 0)),
            requirements.BooleanRequirement(name = 'dump',
                                            description = "Extract listed registry hives",
                                            default = False,
                                            optional = True)
        ]

    def _sanitize_hive_name(self, name: str) -> str:
        """Reduces a full hive path to a filesystem-safe basename for output files."""
        return name.split('\\')[-1].replace(' ', '_').replace('.', '').replace('[', '').replace(']', '')

    def _generator(self) -> Iterator[Tuple[int, Tuple[int, str]]]:
        chunk_size = 0x500000
        for hive_object in self.list_hive_objects(context = self.context,
                                                  layer_name = self.config["primary"],
                                                  symbol_table = self.config["nt_symbols"],
                                                  filter_string = self.config.get('filter', None)):

            file_output = "Disabled"
            if self.config['dump']:
                # Construct the hive layer for this single offset so its data can be read out
                hive = next(
                    self.list_hives(self.context,
                                    self.config_path,
                                    layer_name = self.config["primary"],
                                    symbol_table = self.config["nt_symbols"],
                                    hive_offsets = [hive_object.vol.offset]))

                maxaddr = hive.hive.Storage[0].Length
                hive_name = self._sanitize_hive_name(hive.get_name())

                file_handle = self.open('registry.{}.{}.hive'.format(hive_name, hex(hive.hive_offset)))
                with file_handle as file_data:
                    if hive._base_block:
                        hive_data = self.context.layers[hive.dependencies[0]].read(hive.hive.BaseBlock, 1 << 12)
                    else:
                        # FIX: file handles are binary; the original wrote a str
                        # ('\x00' * ...) here, which would raise TypeError when
                        # the hive has no readable base block
                        hive_data = b'\x00' * (1 << 12)
                    file_data.write(hive_data)

                    for i in range(0, maxaddr, chunk_size):
                        current_chunk_size = min(chunk_size, maxaddr - i)
                        # pad = True so unreadable pages become zeroes rather than raising
                        data = hive.read(i, current_chunk_size, pad = True)
                        file_data.write(data)
                file_output = file_handle.preferred_filename

            yield (0, (format_hints.Hex(hive_object.vol.offset), hive_object.get_name() or "", file_output))

    @classmethod
    def list_hives(cls,
                   context: interfaces.context.ContextInterface,
                   base_config_path: str,
                   layer_name: str,
                   symbol_table: str,
                   filter_string: Optional[str] = None,
                   hive_offsets: List[int] = None) -> Iterable[registry.RegistryHive]:
        """Walks through a registry, hive by hive returning the constructed
        registry layer name.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            base_config_path: The configuration path for any settings required by the new table
            layer_name: The name of the layer on which to operate
            symbol_table: The name of the table containing the kernel symbols
            filter_string: An optional string which must be present in the hive name if specified
            hive_offsets: An optional list of offsets of specific hives to iterate over
                (takes precedence over filter_string)

        Yields:
            A registry hive layer name
        """
        if hive_offsets is None:
            try:
                hive_offsets = [
                    hive.vol.offset
                    for hive in cls.list_hive_objects(context, layer_name, symbol_table, filter_string)
                ]
            except ImportError:
                vollog.warning("Unable to import windows.hivelist plugin, please provide a hive offset")
                raise ValueError("Unable to import windows.hivelist plugin, please provide a hive offset")

        for hive_offset in hive_offsets:
            # Construct the hive
            reg_config_path = cls.make_subconfig(context = context,
                                                 base_config_path = base_config_path,
                                                 hive_offset = hive_offset,
                                                 base_layer = layer_name,
                                                 nt_symbols = symbol_table)

            try:
                hive = registry.RegistryHive(context, reg_config_path, name = 'hive' + hex(hive_offset))
            except exceptions.InvalidAddressException:
                vollog.warning("Couldn't create RegistryHive layer at offset {}, skipping".format(hex(hive_offset)))
                continue
            context.layers.add_layer(hive)
            yield hive

    @classmethod
    def list_hive_objects(cls,
                          context: interfaces.context.ContextInterface,
                          layer_name: str,
                          symbol_table: str,
                          filter_string: str = None) -> Iterator[interfaces.objects.ObjectInterface]:
        """Lists all the hives in the primary layer.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            symbol_table: The name of the table containing the kernel symbols
            filter_string: A string which must be present in the hive name if specified

        Returns:
            The list of registry hives from the `layer_name` layer as filtered against using the `filter_string`
        """

        # We only use the object factory to demonstrate how to use one
        kvo = context.layers[layer_name].config['kernel_virtual_offset']
        ntkrnlmp = context.module(symbol_table, layer_name = layer_name, offset = kvo)

        list_head = ntkrnlmp.get_symbol("CmpHiveListHead").address
        list_entry = ntkrnlmp.object(object_type = "_LIST_ENTRY", offset = list_head)
        reloff = ntkrnlmp.get_type("_CMHIVE").relative_child_offset("HiveList")
        cmhive = ntkrnlmp.object(object_type = "_CMHIVE", offset = list_entry.vol.offset - reloff, absolute = True)

        # Run through the list forwards
        seen = set()

        hg = HiveGenerator(cmhive, forward = True)
        for hive in hg:
            if hive.vol.offset in seen:
                vollog.debug("Hivelist found an already seen offset {} while " \
                             "traversing forwards, this should not occur".format(hex(hive.vol.offset)))
                break
            seen.add(hive.vol.offset)
            if filter_string is None or filter_string.lower() in str(hive.get_name() or "").lower():
                if context.layers[layer_name].is_valid(hive.vol.offset):
                    yield hive
        forward_invalid = hg.invalid
        if forward_invalid:
            # walking forwards failed part-way; walk backwards from the list head
            # to pick up the hives on the far side of the invalid entry
            vollog.debug("Hivelist failed traversing the list forwards at {}, traversing backwards".format(
                hex(forward_invalid)))
            hg = HiveGenerator(cmhive, forward = False)
            for hive in hg:
                if hive.vol.offset in seen:
                    # FIX: this string literal was broken across a line break in the
                    # source; rejoined via implicit concatenation
                    vollog.debug("Hivelist found an already seen offset {} while " \
                                 "traversing backwards, list walking met in the middle".format(hex(hive.vol.offset)))
                    break
                seen.add(hive.vol.offset)
                if filter_string is None or filter_string.lower() in str(hive.get_name() or "").lower():
                    if context.layers[layer_name].is_valid(hive.vol.offset):
                        yield hive
            backward_invalid = hg.invalid

            if backward_invalid and forward_invalid != backward_invalid:
                # walking forward and backward did not stop at the same offset. they should if:
                #   1) there are no invalid hives, walking forwards would reach the end and backwards is not necessary
                #   2) there is one invalid hive, walking backwards would stop at the same place as forwards
                # therefore, there must be 2 or more invalid hives, so the middle of the list is not reachable
                # by walking the list, so revert to scanning, and walk the list forwards and backwards from each
                # found hive
                vollog.debug("Hivelist failed traversing backwards at {}, a different " \
                             "location from forwards, revert to scanning".format(hex(backward_invalid)))
                for hive in hivescan.HiveScan.scan_hives(context, layer_name, symbol_table):
                    try:
                        if hive.HiveList.Flink:
                            start_hive_offset = hive.HiveList.Flink - reloff

                            ## Now instantiate the first hive in virtual address space as normal
                            start_hive = ntkrnlmp.object(object_type = "_CMHIVE",
                                                         offset = start_hive_offset,
                                                         absolute = True)
                            for forward in (True, False):
                                for linked_hive in start_hive.HiveList.to_list(hive.vol.type_name, "HiveList", forward):
                                    if not linked_hive.is_valid() or linked_hive.vol.offset in seen:
                                        continue
                                    seen.add(linked_hive.vol.offset)
                                    if filter_string is None or filter_string.lower() in str(
                                            linked_hive.get_name() or "").lower():
                                        if context.layers[layer_name].is_valid(linked_hive.vol.offset):
                                            yield linked_hive
                    except exceptions.InvalidAddressException:
                        vollog.debug("InvalidAddressException when traversing hive {} found from scan, skipping".format(
                            hex(hive.vol.offset)))

    def run(self) -> renderers.TreeGrid:
        return renderers.TreeGrid([("Offset", format_hints.Hex), ("FileFullPath", str), ("File output", str)],
                                  self._generator())