Source code for volatility3.plugins.vmscan

# This file is Copyright 2023 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import enum
import logging
import os
import struct
from typing import Dict, List

from volatility3.framework import constants, exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import configuration, plugins
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed

vollog = logging.getLogger(__name__)


[docs] class VMCSTest(enum.IntFlag): VMCS_ABORT_INVALID = enum.auto() VMCS_LINK_PTR_IS_NOT_FS = enum.auto() VMCS_HOST_CR4_NO_VTX = enum.auto() VMCS_CR3_IS_ZERO = enum.auto() VMCS_GUEST_CR4_RESERVED = enum.auto()
[docs] class PageStartScanner(interfaces.layers.ScannerInterface): _version = (1, 0, 0) def __init__(self, signatures: List[bytes], page_size: int = 0x1000): super().__init__() if not len(signatures): raise ValueError("No signatures passed to constructor") self._siglen = len(signatures[0]) for item in signatures: if len(item) != self._siglen: raise ValueError( "Signatures of different lengths passed to PageStartScanner" ) self._signatures = signatures self._page_size = page_size def __call__(self, data: bytes, data_offset: int): """Scans only the start of every page, to see whether a signature is present or not""" for page_start in range( data_offset % self._page_size, len(data), self._page_size ): if data[page_start : page_start + self._siglen] in self._signatures: yield ( page_start + data_offset, data[page_start : page_start + self._siglen], )
[docs] class Vmscan(plugins.PluginInterface): """Scans for Intel VT-d structures and generates VM volatility configs for them""" _required_framework_version = (2, 2, 0) _version = (1, 0, 0) STRICTLY_REQUIRED_TESTS = { VMCSTest.VMCS_ABORT_INVALID, VMCSTest.VMCS_LINK_PTR_IS_NOT_FS, VMCSTest.VMCS_HOST_CR4_NO_VTX, }
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.TranslationLayerRequirement( name="primary", description="Physical base memory layer" ), requirements.VersionRequirement( name="page_start_scanner", component=PageStartScanner, version=(1, 0, 0), ), requirements.IntRequirement( name="log-threshold", description="Number of criteria failed to log to debug output", default=2, optional=True, ), ]
# Scan for VMCS structures based on the known VMCS structures # found in symbols/vmcs directory def _gather_vmcs_structures( self, context: interfaces.context.ContextInterface, config_path: str ) -> Dict[bytes, str]: """Enumerate all JSON files containing VMCS information and return the structures Signatures can be generated using data extracted using the vmcs_layout tool at https://github.com/google/rekall/tree/master/tools/linux/vmcs_layout Args: context: The volatility context to work against config_path: The location to store symbol table configurations under Returns: A dictionary of pattern bytes to the string representation of the architecture """ filenames = intermed.IntermediateSymbolTable.file_symbol_url( os.path.join("generic", "vmcs") ) table_names = [] for filename in filenames: base_name = os.path.basename(filename).split(".")[0] table_name = intermed.IntermediateSymbolTable.create( context, configuration.path_join(config_path, "vmcs"), os.path.join("generic", "vmcs"), filename=base_name, ) table_names.append(table_name) result = {} for table_name in table_names: symbol_table = context.symbol_space[table_name] revision_id = struct.pack( "<I", int(symbol_table.get_symbol("revision_id").constant_data) ) result[revision_id] = table_name return result @classmethod def _verify_vmcs_page( cls, context: interfaces.context.ContextInterface, vmcs: interfaces.objects.ObjectInterface, ) -> List[str]: """Runs tests to verify whether a block of data is a VMCS page Some tests based on the Hypervisor Memory Forensics paper by Mariano Graziano, Andrea Lanzi and Davide Balzarotti Args: context: The volatility context to be used for this call vmcs: The instantiated VMCS object to verify Returns: The list of failed criteria that the VMCS did not meet """ # The VMCS should have been constructed on the physical layer (even a nested VMCS) physical_layer_name = vmcs.vol.layer_name failed_tests: VMCSTest = VMCSTest(0) # The abort field must be valid (generally 0, although other abort codes may exist) if context.layers[physical_layer_name].read(vmcs.vol.offset + 4, 4) not in [ b"\x00\x00\x00\x00" ]: failed_tests |= VMCSTest.VMCS_ABORT_INVALID # The vmcs link pointer is supposed to always be set if vmcs.vmcs_link_ptr != 0xFFFFFFFFFFFFFFFF: failed_tests |= VMCSTest.VMCS_LINK_PTR_IS_NOT_FS # To have a VMCS the host needs the VTx bit set in CR4, this can false positive often when all bits are set if (vmcs.host_cr4 & 1 << 13) == 0: failed_tests |= VMCSTest.VMCS_HOST_CR4_NO_VTX # The guest CR3 is *exceptionally* unlikely to be 0 and the guest cr4 is likely to have some bits unset if (vmcs.guest_cr3 == 0) or (vmcs.host_cr3 == 0): failed_tests |= VMCSTest.VMCS_CR3_IS_ZERO # CR4 registers have certain bits reserved that should not be set if vmcs.guest_cr4 & 0xFFFFFFFFFF889000: failed_tests |= VMCSTest.VMCS_GUEST_CR4_RESERVED if failed_tests and failed_tests.name: failed_list = failed_tests.name.split("|") return failed_list return [] def _generator(self): # Gather VMCS structures structures = self._gather_vmcs_structures(self.context, self.config_path) # Scan memory for them layer = self.context.layers[self.config["primary"]] # Try to move down to the highest physical layer if layer.config.get("memory_layer"): layer = self.context.layers[layer.config["memory_layer"]] # Run the scan for offset, match in layer.scan( self.context, PageStartScanner(list(structures.keys())), self._progress_callback, ): try: vmcs = self.context.object( structures[match] + constants.BANG + "_VMCS", layer.name, offset=offset, ) failed_list = self._verify_vmcs_page(self.context, vmcs) if not failed_list: yield ( 0, ( structures[match], format_hints.Hex(vmcs.vol.offset), format_hints.Hex(vmcs.ept), format_hints.Hex(vmcs.guest_cr3), ), ) if len(failed_list) <= self.config["log-threshold"]: vollog.debug( f"Potential {structures[match]} VMCS found at {vmcs.vol.offset:x} with failed criteria: {failed_list}" ) except (exceptions.InvalidAddressException, AttributeError): # Not what we're looking for continue
[docs] def run(self): return renderers.TreeGrid( [ ("Architecture", str), ("VMCS Physical offset", format_hints.Hex), ("EPT", format_hints.Hex), ("Guest CR3", format_hints.Hex), ], self._generator(), )