Source code for volatility3.plugins.windows.registry.lsadump

# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from struct import unpack
from typing import Optional
import hashlib

try:
    from Crypto.Cipher import ARC4, DES, AES
except ImportError:
    # Debian/Ubuntu ship pycryptodome under Cryptodome namespace
    from Cryptodome.Cipher import ARC4, DES, AES

from volatility3.framework import interfaces, renderers, exceptions
from volatility3.framework.configuration import requirements

from volatility3.framework.layers import registry as registry_layer
from volatility3.framework.symbols.windows import versions
from volatility3.plugins.windows.registry import hashdump, hivelist
from volatility3.framework.renderers import format_hints

vollog = logging.getLogger(__name__)


class Lsadump(interfaces.plugins.PluginInterface):
    """Dumps lsa secrets from memory

    The plugin locates the SYSTEM and SECURITY registry hives, derives the
    boot key and the LSA key from them, and then decrypts the ``CurrVal``
    value of every subkey found under the SECURITY hive's Policy Secrets key.
    """

    _required_framework_version = (2, 0, 0)
    _version = (1, 0, 1)

    @classmethod
    def get_requirements(cls):
        """Returns the requirements of this plugin: the Windows kernel module
        plus minimum versions of the hashdump and hivelist plugins it reuses."""
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.VersionRequirement(
                name="hashdump", component=hashdump.Hashdump, version=(1, 1, 0)
            ),
            requirements.VersionRequirement(
                name="hivelist", component=hivelist.HiveList, version=(2, 0, 0)
            ),
        ]

    @classmethod
    def decrypt_aes(cls, secret: bytes, key: bytes) -> bytes:
        """Decrypts an AES-protected LSA blob.

        Based on code from http://lab.mediaservice.net/code/cachedump.rb

        The AES key is the SHA-256 digest of ``key`` followed by 1000
        repetitions of ``secret[28:60]``. The ciphertext starts at offset 60
        and is processed in 16-byte blocks; a short final block is padded
        with zero bytes before decryption.

        Args:
            secret: The raw encrypted blob, including its leading header
            key: The key material the AES key is derived from

        Returns:
            The concatenated decrypted blocks (empty if ``secret`` holds no
            data beyond offset 60)
        """
        salt = secret[28:60]
        sha = hashlib.sha256()
        sha.update(key)
        for _i in range(1, 1000 + 1):
            sha.update(salt)
        aeskey = sha.digest()

        zero_iv = b"\x00" * 16
        blocks = []
        for i in range(60, len(secret), 16):
            # A fresh cipher with an all-zero IV is created for every block,
            # so no chaining state carries over from one block to the next.
            aes = AES.new(aeskey, AES.MODE_CBC, zero_iv)
            # The padding must be bytes: appending a str to a bytes object
            # raises TypeError.
            buf = secret[i : i + 16].ljust(16, b"\x00")
            blocks.append(aes.decrypt(buf))

        return b"".join(blocks)

    @classmethod
    def get_lsa_key(
        cls, sechive: registry_layer.RegistryHive, bootkey: bytes, vista_or_later: bool
    ) -> Optional[bytes]:
        """Recovers the LSA key from the SECURITY hive.

        Args:
            sechive: The SECURITY registry hive
            bootkey: The boot key used to unlock the stored LSA key
            vista_or_later: Selects the ``PolEKList`` / AES scheme when True,
                otherwise the ``PolSecretEncryptionKey`` / MD5+RC4 scheme

        Returns:
            The LSA key, or None if the boot key is empty, the policy key or
            its first value cannot be found, or the value data is unreadable
            or empty
        """
        if not bootkey:
            return None

        policy_key = "PolEKList" if vista_or_later else "PolSecretEncryptionKey"

        enc_reg_key = hashdump.Hashdump.get_hive_key(sechive, "Policy\\" + policy_key)
        if not enc_reg_key:
            return None

        enc_reg_value = next(enc_reg_key.get_values(), None)
        if not enc_reg_value:
            return None

        try:
            # NOTE(review): the +4 presumably skips a 4-byte prefix in front
            # of the value data inside the hive - confirm against the layer.
            obf_lsa_key = sechive.read(enc_reg_value.Data + 4, enc_reg_value.DataLength)
        except exceptions.InvalidAddressException:
            return None

        if not obf_lsa_key:
            return None

        if vista_or_later:
            lsa_key = cls.decrypt_aes(obf_lsa_key, bootkey)
            return lsa_key[68:100]

        # Pre-Vista: the RC4 key is the MD5 of the boot key followed by 1000
        # repetitions of bytes 60..75 of the stored blob.
        salt = obf_lsa_key[60:76]
        md5 = hashlib.md5()
        md5.update(bootkey)
        for _i in range(1000):
            md5.update(salt)
        rc4key = md5.digest()

        rc4 = ARC4.new(rc4key)
        lsa_key = rc4.decrypt(
            obf_lsa_key[12:60]
        )  # lgtm [py/weak-cryptographic-algorithm]
        return lsa_key[0x10:0x20]

    @classmethod
    def get_secret_by_name(
        cls,
        sechive: registry_layer.RegistryHive,
        name: str,
        lsakey: bytes,
        is_vista_or_later: bool,
    ) -> Optional[bytes]:
        """Decrypts the current value of a single named LSA secret.

        Args:
            sechive: The SECURITY registry hive
            name: Name of the subkey below ``Policy\\\\Secrets`` to read
            lsakey: The LSA key, as returned by get_lsa_key
            is_vista_or_later: Selects AES decryption when True, DES otherwise

        Returns:
            The decrypted secret, or None if the key or its first value is
            missing, or the value data is unreadable or empty
        """
        enc_secret_key = hashdump.Hashdump.get_hive_key(
            sechive, "Policy\\Secrets\\" + name + "\\CurrVal"
        )
        if not enc_secret_key:
            return None

        try:
            enc_secret_value = next(enc_secret_key.get_values(), None)
        except (
            exceptions.InvalidAddressException,
            registry_layer.RegistryException,
        ):
            enc_secret_value = None
        if not enc_secret_value:
            return None

        try:
            enc_secret = sechive.read(
                enc_secret_value.Data + 4, enc_secret_value.DataLength
            )
        except exceptions.InvalidAddressException:
            # The exception name must match the one the framework defines; a
            # misspelt name here would raise AttributeError instead of
            # returning None.
            return None
        if not enc_secret:
            return None

        if is_vista_or_later:
            return cls.decrypt_aes(enc_secret, lsakey)
        # The DES scheme ignores the first 0xC bytes of the stored value.
        return cls.decrypt_secret(enc_secret[0xC:], lsakey)

    @classmethod
    def decrypt_secret(cls, secret: bytes, key: bytes) -> bytes:
        """Python implementation of SystemFunction005.

        Decrypts a block of data with DES using given key.
        Note that key can be longer than 7 bytes.

        The data is decrypted in 8-byte blocks, each with a DES key built
        from the next 7 bytes of ``key``. The first 4 decrypted bytes are read
        as a little-endian length, and that many bytes starting at offset 8
        are returned.

        Args:
            secret: The DES-encrypted data
            key: The key material, consumed 7 bytes per block

        Returns:
            The decrypted payload, or empty bytes if ``secret`` is empty
        """
        decrypted_data = b""
        j = 0  # key index

        for i in range(0, len(secret), 8):
            # Zero-pad a short trailing block up to the DES block size.
            enc_block = secret[i : i + 8].ljust(8, b"\x00")
            block_key = key[j : j + 7]
            des_key = hashdump.Hashdump.sidbytes_to_key(block_key)
            des = DES.new(des_key, DES.MODE_ECB)
            decrypted_data += des.decrypt(
                enc_block
            )  # lgtm [py/weak-cryptographic-algorithm]

            j += 7
            # When fewer than 7 key bytes remain, the index is reset to the
            # number of remaining bytes (not to zero).
            remaining = len(key[j : j + 7])
            if remaining < 7:
                j = remaining

        if len(decrypted_data) < 4:
            # Nothing was decrypted, so there is no length field to unpack.
            return b""

        (dec_data_len,) = unpack("<L", decrypted_data[:4])
        return decrypted_data[8 : 8 + dec_data_len]

    def _generator(
        self,
        syshive: registry_layer.RegistryHive,
        sechive: registry_layer.RegistryHive,
    ):
        """Yields one TreeGrid row per decrypted LSA secret.

        Args:
            syshive: The SYSTEM hive (source of the boot key), or None
            sechive: The SECURITY hive (source of the LSA key and secrets),
                or None

        Yields:
            Tuples of (0, (key name, secret as HexBytes, secret as bytes)).
            Nothing is yielded, and a warning is logged, when a hive, the
            boot key, the LSA key or the secrets key cannot be found.
        """
        if syshive is None or sechive is None:
            # run() passes None for any hive it did not locate; stop here
            # rather than handing None to the hashdump helpers.
            vollog.warning("Unable to locate SYSTEM and/or SECURITY hive")
            return None

        kernel = self.context.modules[self.config["kernel"]]

        vista_or_later = versions.is_vista_or_later(
            context=self.context, symbol_table=kernel.symbol_table_name
        )

        bootkey = hashdump.Hashdump.get_bootkey(syshive)
        if not bootkey:
            vollog.warning("Unable to find bootkey")
            return None

        lsakey = self.get_lsa_key(sechive, bootkey, vista_or_later)
        if not lsakey:
            vollog.warning("Unable to find lsa key")
            return None

        secrets_key = hashdump.Hashdump.get_hive_key(sechive, "Policy\\Secrets")
        if not secrets_key:
            vollog.warning("Unable to find secrets key")
            return None

        for key in secrets_key.get_subkeys():
            # The fourth backslash-separated component of the subkey's path
            # is used as the secret's name.
            sec_val_key = hashdump.Hashdump.get_hive_key(
                sechive,
                "Policy\\Secrets\\" + key.get_key_path().split("\\")[3] + "\\CurrVal",
            )
            if not sec_val_key:
                continue

            try:
                enc_secret_value = next(sec_val_key.get_values(), None)
            except (
                StopIteration,
                exceptions.InvalidAddressException,
                registry_layer.RegistryException,
            ):
                enc_secret_value = None
            if not enc_secret_value:
                continue

            try:
                enc_secret = sechive.read(
                    enc_secret_value.Data + 4, enc_secret_value.DataLength
                )
            except exceptions.InvalidAddressException:
                # Unreadable value data: skip this secret and carry on.
                continue

            if vista_or_later:
                secret = self.decrypt_aes(enc_secret, lsakey)
            else:
                secret = self.decrypt_secret(enc_secret[0xC:], lsakey)

            try:
                key_name = key.get_name()
            except (
                exceptions.InvalidAddressException,
                registry_layer.RegistryException,
            ):
                key_name = renderers.UnreadableValue()

            yield (0, (key_name, format_hints.HexBytes(secret), secret))

    def run(self):
        """Finds the SYSTEM and SECURITY hives and renders the secrets.

        Returns:
            A TreeGrid with the columns Key, Secret and Hex, filled by
            _generator
        """
        offset = self.config.get("offset", None)

        syshive = sechive = None

        for hive in hivelist.HiveList.list_hives(
            context=self.context,
            base_config_path=self.config_path,
            kernel_module_name=self.config["kernel"],
            hive_offsets=None if offset is None else [offset],
        ):
            # Hives are matched on the last path component, case-insensitively.
            hive_basename = hive.get_name().split("\\")[-1].upper()
            if hive_basename == "SYSTEM":
                syshive = hive
            if hive_basename == "SECURITY":
                sechive = hive

        return renderers.TreeGrid(
            [("Key", str), ("Secret", format_hints.HexBytes), ("Hex", bytes)],
            self._generator(syshive, sechive),
        )