Source code for volatility3.plugins.windows.skeleton_key_check

# This file is Copyright 2021 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

# This module attempts to locate skeleton-key like function hooks.
# It does this by locating the CSystems array through a variety of methods,
# and then validating the entry for RC4 HMAC (0x17 / 23)
#
# For a thorough walkthrough on how the R&D was performed to develop this plugin,
# please see our blogpost here:
#
# https://volatility-labs.blogspot.com/2021/10/memory-forensics-r-illustrated.html

import io
import logging
from typing import Iterable, Tuple, List, Optional

import pefile

from volatility3.framework import interfaces, symbols, exceptions
from volatility3.framework import renderers, constants
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import scanners
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows import pdbutil
from volatility3.framework.symbols.windows.extensions import pe
from volatility3.plugins.windows import pslist, vadinfo

try:
    import capstone

    has_capstone = True
except ImportError:
    has_capstone = False

vollog = logging.getLogger(__name__)


[docs]class Skeleton_Key_Check(interfaces.plugins.PluginInterface): """Looks for signs of Skeleton Key malware""" _required_framework_version = (2, 4, 0)
[docs] @classmethod def get_requirements(cls): # Since we're calling the plugin, make sure we have the plugin's requirements return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="pslist", component=pslist.PsList, version=(2, 0, 0) ), requirements.VersionRequirement( name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0) ), requirements.VersionRequirement( name="pdbutil", component=pdbutil.PDBUtility, version=(1, 0, 0) ), ]
def _get_pefile_obj( self, pe_table_name: str, layer_name: str, base_address: int ) -> pefile.PE: """ Attempts to pefile object from the bytes of the PE file Args: pe_table_name: name of the pe types table layer_name: name of the lsass.exe process layer base_address: base address of cryptdll.dll in lsass.exe Returns: the constructed pefile object """ pe_data = io.BytesIO() try: dos_header = self.context.object( pe_table_name + constants.BANG + "_IMAGE_DOS_HEADER", offset=base_address, layer_name=layer_name, ) for offset, data in dos_header.reconstruct(): pe_data.seek(offset) pe_data.write(data) pe_ret = pefile.PE(data=pe_data.getvalue(), fast_load=True) except exceptions.InvalidAddressException: vollog.debug("Unable to reconstruct cryptdll.dll in memory") pe_ret = None return pe_ret def _check_for_skeleton_key_vad( self, csystem: interfaces.objects.ObjectInterface, cryptdll_base: int, cryptdll_size: int, ) -> bool: """ Checks if Initialize and/or Decrypt is hooked by determining if these function pointers reference addresses inside of the cryptdll VAD Args: csystem: The RC4HMAC KERB_ECRYPT instance cryptdll_base: Base address of the cryptdll.dll VAD cryptdll_size: Size of the VAD Returns: bool: if a skeleton key hook is present """ return not ( (cryptdll_base <= csystem.Initialize <= cryptdll_base + cryptdll_size) and (cryptdll_base <= csystem.Decrypt <= cryptdll_base + cryptdll_size) ) def _check_for_skeleton_key_symbols( self, csystem: interfaces.objects.ObjectInterface, rc4HmacInitialize: int, rc4HmacDecrypt: int, ) -> bool: """ Uses the PDB information to specifically check if the csystem for RC4HMAC has an initialization pointer to rc4HmacInitialize and a decryption pointer to rc4HmacDecrypt. Args: csystem: The RC4HMAC KERB_ECRYPT instance rc4HmacInitialize: The expected address of csystem Initialization function rc4HmacDecrypt: The expected address of the csystem Decryption function Returns: bool: if a skeleton key hook was found """ return ( csystem.Initialize != rc4HmacInitialize or csystem.Decrypt != rc4HmacDecrypt ) def _construct_ecrypt_array( self, array_start: int, count: int, cryptdll_types: interfaces.context.ModuleInterface, ) -> interfaces.context.ModuleInterface: """ Attempts to construct an array of _KERB_ECRYPT structures Args: array_start: starting virtual address of the array count: how many elements are in the array cryptdll_types: the reverse engineered types Returns: The instantiated array """ try: array = cryptdll_types.object( object_type="array", offset=array_start, subtype=cryptdll_types.get_type("_KERB_ECRYPT"), count=count, absolute=True, ) except exceptions.InvalidAddressException: vollog.debug( "Unable to construct cSystems array at given offset: {:x}".format( array_start ) ) array = None return array def _find_array_with_pdb_symbols( self, cryptdll_symbols: str, cryptdll_types: interfaces.context.ModuleInterface, proc_layer_name: str, cryptdll_base: int, ) -> Tuple[interfaces.objects.ObjectInterface, int, int, int]: """ Finds the CSystems array through use of PDB symbols Args: cryptdll_symbols: The symbols table from the PDB file cryptdll_types: The types from cryptdll binary analysis proc_layer_name: The lsass.exe process layer name cryptdll_base: Base address of cryptdll.dll inside of lsass.exe Returns: Tuple of: array: The cSystems array rc4HmacInitialize: The runtime address of the expected initialization function rc4HmacDecrypt: The runtime address of the expected decryption function """ cryptdll_module = self.context.module( cryptdll_symbols, layer_name=proc_layer_name, offset=cryptdll_base ) rc4HmacInitialize = cryptdll_module.get_absolute_symbol_address( "rc4HmacInitialize" ) rc4HmacDecrypt = cryptdll_module.get_absolute_symbol_address("rc4HmacDecrypt") count_address = cryptdll_module.get_symbol("cCSystems").address # we do not want to fail just because the count is not in memory # 16 was the size on samples I tested, so I chose it as the default try: count = cryptdll_types.object( object_type="unsigned long", offset=count_address ) except exceptions.InvalidAddressException: count = 16 array_start = cryptdll_module.get_absolute_symbol_address("CSystems") array = self._construct_ecrypt_array(array_start, count, cryptdll_types) if array is None: vollog.debug( "The CSystem array is not present in memory. Stopping PDB based analysis." ) return array, rc4HmacInitialize, rc4HmacDecrypt def _get_cryptdll_types( self, context: interfaces.context.ContextInterface, config, config_path: str, proc_layer_name: str, cryptdll_base: int, ): """ Builds a symbol table from the cryptdll types generated after binary analysis Args: context: the context to operate upon config: config_path: proc_layer_name: name of the lsass.exe process layer cryptdll_base: base address of cryptdll.dll inside of lsass.exe """ kernel = self.context.modules[self.config["kernel"]] table_mapping = {"nt_symbols": kernel.symbol_table_name} cryptdll_symbol_table = intermed.IntermediateSymbolTable.create( context=context, config_path=config_path, sub_path="windows", filename="kerb_ecrypt", table_mapping=table_mapping, ) return context.module( cryptdll_symbol_table, proc_layer_name, offset=cryptdll_base ) def _find_lsass_proc( self, proc_list: Iterable ) -> Tuple[interfaces.context.ContextInterface, str]: """ Walks the process list and returns the first valid lsass instances. There should be only one lsass process, but malware will often use the process name to try and blend in. Args: proc_list: The process list generator Return: The process object for lsass """ for proc in proc_list: try: proc_id = proc.UniqueProcessId proc_layer_name = proc.add_process_layer() return proc, proc_layer_name except exceptions.InvalidAddressException as excp: vollog.debug( "Process {}: invalid address {} in layer {}".format( proc_id, excp.invalid_address, excp.layer_name ) ) return None, None def _find_cryptdll( self, lsass_proc: interfaces.context.ContextInterface ) -> Tuple[int, int]: """ Finds the base address of cryptdll.dll inside of lsass.exe Args: lsass_proc: the process object for lsass.exe Returns: A tuple of: cryptdll_base: the base address of cryptdll.dll crytpdll_size: the size of the VAD for cryptdll.dll """ for vad in lsass_proc.get_vad_root().traverse(): filename = vad.get_file_name() if isinstance(filename, str) and filename.lower().endswith("cryptdll.dll"): base = vad.get_start() return base, vad.get_size() return None, None def _find_csystems_with_symbols( self, proc_layer_name: str, cryptdll_types: interfaces.context.ModuleInterface, cryptdll_base: int, cryptdll_size: int, ) -> Tuple[interfaces.objects.ObjectInterface, int, int]: """ Attempts to find CSystems and the expected address of the handlers. Relies on downloading and parsing of the cryptdll PDB file. Args: proc_layer_name: the name of the lsass.exe process layer cryptdll_types: The types from cryptdll binary analysis cryptdll_base: the base address of cryptdll.dll crytpdll_size: the size of the VAD for cryptdll.dll Returns: A tuple of: array: An initialized Volatility array of _KERB_ECRYPT structures rc4HmacInitialize: The expected address of csystem Initialization function rc4HmacDecrypt: The expected address of the csystem Decryption function """ try: cryptdll_symbols = pdbutil.PDBUtility.symbol_table_from_pdb( self.context, interfaces.configuration.path_join(self.config_path, "cryptdll"), proc_layer_name, "cryptdll.pdb", cryptdll_base, cryptdll_size, ) except exceptions.VolatilityException: vollog.debug( "Unable to use the cryptdll PDB. Stopping PDB symbols based analysis." ) return None, None, None array, rc4HmacInitialize, rc4HmacDecrypt = self._find_array_with_pdb_symbols( cryptdll_symbols, cryptdll_types, proc_layer_name, cryptdll_base ) if array is None: vollog.debug( "The CSystem array is not present in memory. Stopping PDB symbols based analysis." ) return array, rc4HmacInitialize, rc4HmacDecrypt def _get_rip_relative_target(self, inst) -> int: """ Returns the target address of a RIP-relative instruction. These instructions contain the offset of a target address relative to the current instruction pointer. Args: inst: A capstone instruction instance Returns: None or the target address of the instruction """ try: opnd = inst.operands[1] except capstone.CsError: return None if opnd.type != capstone.x86.X86_OP_MEM: return None if inst.reg_name(opnd.mem.base) != "rip": return None return inst.address + inst.size + opnd.mem.disp def _analyze_cdlocatecsystem( self, function_bytes: bytes, function_start: int, cryptdll_types: interfaces.context.ModuleInterface, proc_layer_name: str, ) -> Optional[interfaces.objects.ObjectInterface]: """ Performs static analysis on CDLocateCSystem to find the instructions that reference CSystems as well as cCsystems Args: function_bytes: the instruction bytes of CDLocateCSystem function_start: the address of CDLocateCSystem proc_layer_name: the name of the lsass.exe process layer Return: The cSystems array of ecrypt instances """ found_count = False array_start = None count = None ## we only support 64bit disassembly analysis md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64) md.detail = True for inst in md.disasm(function_bytes, function_start): # we should not reach debug traps if inst.mnemonic == "int3": break # cCsystems is referenced by a mov instruction elif inst.mnemonic == "mov": if not found_count: target_address = self._get_rip_relative_target(inst) # we do not want to fail just because the count is not in memory # 16 was the size on samples I tested, so I chose it as the default if target_address: count = int.from_bytes( self.context.layers[proc_layer_name].read( target_address, 4 ), "little", ) else: count = 16 found_count = True elif inst.mnemonic == "lea": target_address = self._get_rip_relative_target(inst) if target_address: array_start = target_address # we find the count before, so we can terminate the static analysis here break if array_start and count: array = self._construct_ecrypt_array(array_start, count, cryptdll_types) else: array = None return array def _find_csystems_with_export( self, proc_layer_name: str, cryptdll_types: interfaces.context.ModuleInterface, cryptdll_base: int, _, ) -> Optional[interfaces.objects.ObjectInterface]: """ Uses export table analysis to locate CDLocateCsystem This function references CSystems and cCsystems Args: proc_layer_name: The lsass.exe process layer name cryptdll_types: The types from cryptdll binary analysis cryptdll_base: Base address of cryptdll.dll inside of lsass.exe _: unused in this source Returns: The cSystems array """ if not has_capstone: vollog.debug( "capstone is not installed so cannot fall back to export table analysis." ) return None vollog.debug( "Unable to perform analysis using PDB symbols, falling back to export table analysis." ) pe_table_name = intermed.IntermediateSymbolTable.create( self.context, self.config_path, "windows", "pe", class_types=pe.class_types ) cryptdll = self._get_pefile_obj(pe_table_name, proc_layer_name, cryptdll_base) if not cryptdll: return None cryptdll.parse_data_directories( directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] ) if not hasattr(cryptdll, "DIRECTORY_ENTRY_EXPORT"): return None # find the location of CDLocateCSystem and then perform static analysis for export in cryptdll.DIRECTORY_ENTRY_EXPORT.symbols: if export.name != b"CDLocateCSystem": continue function_start = cryptdll_base + export.address try: function_bytes = self.context.layers[proc_layer_name].read( function_start, 0x50 ) except exceptions.InvalidAddressException: vollog.debug( "The CDLocateCSystem function is not present in the lsass address space. Stopping export based analysis." ) break array = self._analyze_cdlocatecsystem( function_bytes, function_start, cryptdll_types, proc_layer_name ) if array is None: vollog.debug( "The CSystem array is not present in memory. Stopping export based analysis." ) return array return None def _find_csystems_with_scanning( self, proc_layer_name: str, cryptdll_types: interfaces.context.ModuleInterface, cryptdll_base: int, cryptdll_size: int, ) -> List[interfaces.context.ModuleInterface]: """ Performs scanning to find potential RC4 HMAC csystem instances This function may return several values as it cannot validate which is the active one Args: proc_layer_name: the lsass.exe process layer name cryptdll_types: the types from cryptdll binary analysis cryptdll_base: base address of cryptdll.dll inside of lsass.exe cryptdll_size: size of the VAD Returns: A list of csystem instances """ csystems = [] cryptdll_end = cryptdll_base + cryptdll_size proc_layer = self.context.layers[proc_layer_name] ecrypt_size = cryptdll_types.get_type("_KERB_ECRYPT").size # scan for potential instances of RC4 HMAC # the signature is based on the type being 0x17 # and the block size member being 1 in all test samples for address in proc_layer.scan( self.context, scanners.BytesScanner(b"\x17\x00\x00\x00\x01\x00\x00\x00"), sections=[(cryptdll_base, cryptdll_size)], ): # this occurs across page boundaries if not proc_layer.is_valid(address, ecrypt_size): continue kerb = cryptdll_types.object("_KERB_ECRYPT", offset=address, absolute=True) # ensure the Encrypt and Finish pointers are inside the VAD # these are not manipulated in the attack if (cryptdll_base < kerb.Encrypt < cryptdll_end) and ( cryptdll_base < kerb.Finish < cryptdll_end ): csystems.append(kerb) return csystems def _generator(self, procs): """ Finds instances of the RC4 HMAC CSystem structure Returns whether the instances are hooked as well as the function handler addresses Args: procs: the process list filtered to lsass.exe instances """ kernel = self.context.modules[self.config["kernel"]] if not symbols.symbol_table_is_64bit(self.context, kernel.symbol_table_name): vollog.info("This plugin only supports 64bit Windows memory samples") return None lsass_proc, proc_layer_name = self._find_lsass_proc(procs) if not lsass_proc: vollog.info( "Unable to find a valid lsass.exe process in the process list. This should never happen. Analysis cannot proceed." ) return None cryptdll_base, cryptdll_size = self._find_cryptdll(lsass_proc) if not cryptdll_base: vollog.info( "Unable to find the location of cryptdll.dll inside of lsass.exe. Analysis cannot proceed." ) return None # the custom type information from binary analysis cryptdll_types = self._get_cryptdll_types( self.context, self.config, self.config_path, proc_layer_name, cryptdll_base ) # attempt to find the array and symbols directly from the PDB csystems, rc4HmacInitialize, rc4HmacDecrypt = self._find_csystems_with_symbols( proc_layer_name, cryptdll_types, cryptdll_base, cryptdll_size ) # if we can't find cSystems through the PDB then # we fall back to export analysis and scanning # we keep the address of the rc4 functions from the PDB # though as its our only source to get them if csystems is None: fallback_sources = [ self._find_csystems_with_export, self._find_csystems_with_scanning, ] for source in fallback_sources: csystems = source( proc_layer_name, cryptdll_types, cryptdll_base, cryptdll_size ) if csystems is not None: break if csystems is None: vollog.info( "Unable to find CSystems inside of cryptdll.dll. Analysis cannot proceed." ) return None for csystem in csystems: if not self.context.layers[proc_layer_name].is_valid( csystem.vol.offset, csystem.vol.size ): continue # filter for RC4 HMAC if csystem.EncryptionType != 0x17: continue # use the specific symbols if present, otherwise use the vad start and size if rc4HmacInitialize and rc4HmacDecrypt: skeleton_key_present = self._check_for_skeleton_key_symbols( csystem, rc4HmacInitialize, rc4HmacDecrypt ) else: skeleton_key_present = self._check_for_skeleton_key_vad( csystem, cryptdll_base, cryptdll_size ) yield 0, ( lsass_proc.UniqueProcessId, "lsass.exe", skeleton_key_present, format_hints.Hex(csystem.Initialize), format_hints.Hex(csystem.Decrypt), ) def _lsass_proc_filter(self, proc): """ Used to filter to only lsass.exe processes There should only be one of these, but malware can/does make lsass.exe named processes to blend in or uses lsass.exe as a process hollowing target """ process_name = utility.array_to_string(proc.ImageFileName) return process_name != "lsass.exe"
[docs] def run(self): kernel = self.context.modules[self.config["kernel"]] return renderers.TreeGrid( [ ("PID", int), ("Process", str), ("Skeleton Key Found", bool), ("rc4HmacInitialize", format_hints.Hex), ("rc4HmacDecrypt", format_hints.Hex), ], self._generator( pslist.PsList.list_processes( context=self.context, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name, filter_func=self._lsass_proc_filter, ) ), )