# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import collections
import functools
import logging
import math
import struct
from typing import Any, Dict, Iterable, List, Optional, Tuple
from volatility3 import classproperty
from volatility3.framework import constants, exceptions, interfaces
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import linear
vollog = logging.getLogger(__name__)
INTEL_TRANSLATION_DEBUGGING = False
[docs]
class Intel(linear.LinearlyMappedLayer):
"""Translation Layer for the Intel IA32 memory mapping."""
_PAGE_BIT_PRESENT = 0
_PAGE_BIT_PSE = 7 # Page Size Extension: 4 MB (or 2MB) page
_PAGE_BIT_PROTNONE = 8
_PAGE_BIT_PAT_LARGE = 12 # 2MB or 1GB pages
_PAGE_PRESENT = 1 << _PAGE_BIT_PRESENT
_PAGE_PSE = 1 << _PAGE_BIT_PSE
_PAGE_PROTNONE = 1 << _PAGE_BIT_PROTNONE
_PAGE_PAT_LARGE = 1 << _PAGE_BIT_PAT_LARGE
_entry_format = "<I"
_page_size_in_bits = 12
_bits_per_register = 32
# NOTE: _maxphyaddr is MAXPHYADDR as defined in the Intel specs *NOT* the maximum physical address
_maxphyaddr = 32
_maxvirtaddr = _maxphyaddr
_structure = [("page directory", 10, True), ("page table", 10, False)]
_direct_metadata = collections.ChainMap(
{"architecture": "Intel32"},
{"mapped": True},
interfaces.layers.TranslationLayerInterface._direct_metadata,
)
def __init__(
self,
context: interfaces.context.ContextInterface,
config_path: str,
name: str,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(
context=context, config_path=config_path, name=name, metadata=metadata
)
self._base_layer = self.config["memory_layer"]
self._swap_layers: List[str] = []
self._page_map_offset = self.config["page_map_offset"]
# Assign constants
self._initial_position = min(self._maxvirtaddr, self._bits_per_register) - 1
self._initial_entry = (
self._mask(self._page_map_offset, self._initial_position, 0) | 0x1
)
self._entry_size = struct.calcsize(self._entry_format)
self._entry_number = self.page_size // self._entry_size
self._canonical_prefix = self._mask(
(1 << self._bits_per_register) - 1,
self._bits_per_register,
self._maxvirtaddr,
)
# These can vary depending on the type of space
self._index_shift = math.ceil(math.log2(struct.calcsize(self._entry_format)))
@classproperty
@functools.lru_cache
def page_shift(cls) -> int:
"""Page shift for the intel memory layers."""
return cls._page_size_in_bits
@classproperty
@functools.lru_cache
def page_size(cls) -> int:
"""Page size for the intel memory layers.
All Intel layers work on 4096 byte pages
"""
return 1 << cls._page_size_in_bits
@classproperty
@functools.lru_cache
def page_mask(cls) -> int:
"""Page mask for the intel memory layers."""
return ~(cls.page_size - 1)
@classproperty
@functools.lru_cache
def bits_per_register(cls) -> int:
"""Returns the bits_per_register to determine the range of an
IntelTranslationLayer."""
return cls._bits_per_register
@classproperty
@functools.lru_cache
def minimum_address(cls) -> int:
return 0
@classproperty
@functools.lru_cache
def maximum_address(cls) -> int:
return (1 << cls._maxvirtaddr) - 1
@classproperty
def structure(cls) -> List[Tuple[str, int, bool]]:
return cls._structure
@staticmethod
def _mask(value: int, high_bit: int, low_bit: int) -> int:
"""Returns the bits of a value between highbit and lowbit inclusive."""
high_mask = (1 << (high_bit + 1)) - 1
low_mask = (1 << low_bit) - 1
mask = high_mask ^ low_mask
return value & mask
@staticmethod
def _page_is_valid(entry: int) -> bool:
"""Returns whether a particular page is valid based on its entry."""
return bool(entry & 1)
@staticmethod
def _page_is_dirty(entry: int) -> bool:
"""Returns whether a particular page is dirty based on its entry."""
return bool(entry & (1 << 6))
[docs]
def canonicalize(self, addr: int) -> int:
"""Canonicalizes an address by performing an appropriate sign extension on the higher addresses"""
if self._bits_per_register <= self._maxvirtaddr:
return addr & self.address_mask
elif addr < (1 << self._maxvirtaddr - 1):
return addr
return self._mask(addr, self._maxvirtaddr, 0) + self._canonical_prefix
[docs]
def decanonicalize(self, addr: int) -> int:
"""Removes canonicalization to ensure an address fits within the correct range if it has been canonicalized
This will produce an address outside the range if the canonicalization is incorrect
"""
if addr < (1 << self._maxvirtaddr - 1):
return addr
return addr ^ self._canonical_prefix
def _translate(self, offset: int) -> Tuple[int, int, str]:
"""Translates a specific offset based on paging tables.
Returns the translated offset, the contiguous pagesize that the
translated address lives in and the layer_name that the address
lives in
"""
entry, position = self._translate_entry(offset & self.page_mask)
# Now we're done
if not self._page_is_valid(entry):
raise exceptions.PagedInvalidAddressException(
self.name,
offset,
position + 1,
entry,
f"Page Fault at entry {hex(entry)} in page entry",
)
pfn = self._pte_pfn(entry)
page_offset = self._mask(offset, position, 0)
page = pfn << self.page_shift | page_offset
return page, 1 << (position + 1), self._base_layer
def _pte_pfn(self, entry: int) -> int:
"""Extracts the page frame number (PFN) from the page table entry (PTE) entry"""
return self._mask(entry, self._maxphyaddr - 1, 0) >> self.page_shift
@functools.lru_cache(maxsize=1024)
def _translate_entry(self, page_address: int) -> int:
"""Translates a page address based on paging tables.
Args:
page_address: The page base address
Returns:
the translated entry value
"""
# Setup the entry and how far we are through the offset
# Position maintains the number of bits left to process
# We or with 0x1 to ensure our page_map_offset is always valid
position = self._initial_position
entry = self._initial_entry
if not (
self.minimum_address
<= (page_address & self.address_mask)
<= self.maximum_address
):
raise exceptions.PagedInvalidAddressException(
self.name,
page_address,
position + 1,
entry,
"Entry outside virtual address range: " + hex(entry),
)
# Run through the offset in various chunks
for name, size, large_page in self._structure:
# Check we're valid
if not self._page_is_valid(entry):
raise exceptions.PagedInvalidAddressException(
self.name,
page_address,
position + 1,
entry,
"Page Fault at entry " + hex(entry) + " in table " + name,
)
# Grab the base address of the table we'll be getting the next entry from
base_address = self._mask(
entry, self._maxphyaddr - 1, size + self._index_shift
)
table = self._get_valid_table(base_address)
if table is None:
raise exceptions.PagedInvalidAddressException(
self.name,
page_address,
position + 1,
entry,
"Page Fault at entry " + hex(entry) + " in table " + name,
)
# Figure out how much of the offset we should be using
start = position
position -= size
index = self._mask(page_address, start, position + 1) >> (position + 1)
# Read the data for the next entry
entry_data_start = index << self._index_shift
entry_data = table[entry_data_start : entry_data_start + self._entry_size]
if INTEL_TRANSLATION_DEBUGGING:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Entry {hex(entry)} at index {hex(index)} gives data {hex(struct.unpack(self._entry_format, entry_data)[0])} as {name}",
)
# Read out the new entry from memory
(entry,) = struct.unpack(self._entry_format, entry_data)
# Check if we're a large page
if large_page and (entry & self._PAGE_PSE):
# Mask off the PAT bit
if entry & self._PAGE_PAT_LARGE:
entry -= self._PAGE_PAT_LARGE
# We're a large page, the rest is finished below
# If we want to implement PSE-36, it would need to be done here
break
return entry, position
@functools.lru_cache(maxsize=1025)
def _get_valid_table(self, base_address: int) -> Optional[bytes]:
"""Extracts the table, validates it and returns it if it's valid."""
try:
table = self._context.layers.read(
self._base_layer, base_address, self.page_size
)
except exceptions.InvalidAddressException:
return None
####
# If the table is entirely duplicates, then mark the whole table as bad
# This is because Windows 10 onwards has a tendency to map unused pages as present
# This had the following consequences:
# - Used very litle physical memory
# - Exploded virtual memory
# - Causes *scan plugins to take multiple hours to complete even on small images
# Previous versions of volatility would ignore a page during a scan when it matched
# the one directly preceding it in physical memory.
# This could trip if only two pages were identical and still required enumerating all
# the invalid pages (which itself was quite time consuming)
# For this reason, volatility 3 shifted to looking at entire page tables (1,024 pages)
# and if all the pages mapped to the same place the table wouuld be skipped
# This could also be applied to the Directory level as well as the Table level, allowing
# Volatility to skip huge sections of virtual memory very efficiently, without missing
# any pages that were distinct within a particular page table (or directory).
# In order to work at this level, the logic was moved out of the scanning component and
# directly into the layer logic itself. This does have the side effect of preventing
# entirely duplicated page tables from reporting as present, however, the trade off between
# Windows 10+ reduced scanning times (common amongst scan plugins) versus incorrectly reporting
# entire page tables of identically mapped repeating *valid* data (rare) was accepted in favour
# of the more common occurance.
if table == table[: self._entry_size] * self._entry_number:
return None
return table
[docs]
def is_valid(self, offset: int, length: int = 1) -> bool:
"""Returns whether the address offset can be translated to a valid
address."""
try:
# TODO: Consider reimplementing this, since calls to mapping can call is_valid
return all(
self._context.layers[layer].is_valid(mapped_offset)
for _, _, mapped_offset, _, layer in self.mapping(offset, length)
)
except exceptions.InvalidAddressException:
return False
[docs]
def is_dirty(self, offset: int) -> bool:
"""Returns whether the page at offset is marked dirty"""
return self._page_is_dirty(self._translate_entry(offset & self.page_mask)[0])
[docs]
def mapping(
self, offset: int, length: int, ignore_errors: bool = False
) -> Iterable[Tuple[int, int, int, int, str]]:
"""Returns a sorted iterable of (offset, sublength, mapped_offset, mapped_length, layer)
mappings.
This allows translation layers to provide maps of contiguous
regions in one layer
"""
stashed_offset = stashed_mapped_offset = stashed_size = stashed_mapped_size = (
stashed_map_layer
) = None
for offset, size, mapped_offset, mapped_size, map_layer in self._mapping(
offset, length, ignore_errors
):
if (
stashed_offset is None
or (stashed_offset + stashed_size != offset)
or (stashed_mapped_offset + stashed_mapped_size != mapped_offset)
or (stashed_map_layer != map_layer)
):
# The block isn't contiguous
if stashed_offset is not None:
yield (
stashed_offset,
stashed_size,
stashed_mapped_offset,
stashed_mapped_size,
stashed_map_layer,
)
# Update all the stashed values after output
stashed_offset = offset
stashed_mapped_offset = mapped_offset
stashed_size = size
stashed_mapped_size = mapped_size
stashed_map_layer = map_layer
else:
# Part of an existing block
stashed_size += size
stashed_mapped_size += mapped_size
# Yield whatever's left
if (
stashed_offset is not None
and stashed_mapped_offset is not None
and stashed_size is not None
and stashed_mapped_size is not None
and stashed_map_layer is not None
):
yield (
stashed_offset,
stashed_size,
stashed_mapped_offset,
stashed_mapped_size,
stashed_map_layer,
)
def _mapping(
self, offset: int, length: int, ignore_errors: bool = False
) -> Iterable[Tuple[int, int, int, int, str]]:
"""Returns a sorted iterable of (offset, sublength, mapped_offset, mapped_length, layer)
mappings.
This allows translation layers to provide maps of contiguous
regions in one layer
"""
if length == 0:
try:
mapped_offset, _, layer_name = self._translate(offset)
if not self._context.layers[layer_name].is_valid(mapped_offset):
raise exceptions.InvalidAddressException(
layer_name=layer_name, invalid_address=mapped_offset
)
except exceptions.InvalidAddressException:
if not ignore_errors:
raise
return None
yield offset, length, mapped_offset, length, layer_name
return None
while length > 0:
skip_mask = None
try:
chunk_offset, page_size, layer_name = self._translate(offset)
# Page align the chunk size value
chunk_size = min(page_size - (offset % page_size), length)
if not self._context.layers[layer_name].is_valid(
chunk_offset, chunk_size
):
# Virtual -> physical is contiguous in the chunk_size range.
# If we fail, we can jump directly to the end as we know all bytes in between
# aren't mapped (virtually and) physically anyway.
skip_mask = chunk_size - 1
raise exceptions.InvalidAddressException(
layer_name=layer_name, invalid_address=chunk_offset
)
except (
exceptions.PagedInvalidAddressException,
exceptions.InvalidAddressException,
) as excp:
if not ignore_errors:
raise
if skip_mask is None:
# We can jump more if we know where the page fault occured
if isinstance(excp, exceptions.PagedInvalidAddressException):
skip_mask = (1 << excp.invalid_bits) - 1
else:
skip_mask = (1 << self._page_size_in_bits) - 1
length_diff = skip_mask + 1 - (offset & skip_mask)
length -= length_diff
offset += length_diff
else:
yield offset, chunk_size, chunk_offset, chunk_size, layer_name
length -= chunk_size
offset += chunk_size
@property
def dependencies(self) -> List[str]:
"""Returns a list of the lower layer names that this layer is dependent
upon."""
return [self._base_layer] + self._swap_layers
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.TranslationLayerRequirement(
name="memory_layer", optional=False
),
requirements.LayerListRequirement(name="swap_layers", optional=True),
requirements.IntRequirement(name="page_map_offset", optional=False),
requirements.IntRequirement(name="kernel_virtual_offset", optional=True),
requirements.StringRequirement(name="kernel_banner", optional=True),
]
[docs]
class IntelPAE(Intel):
"""Class for handling Physical Address Extensions for Intel
architectures."""
_entry_format = "<Q"
_bits_per_register = 32
_maxphyaddr = 40
_maxvirtaddr = 32
_structure = [
("page directory pointer", 2, False),
("page directory", 9, True),
("page table", 9, False),
]
_direct_metadata = collections.ChainMap({"pae": True}, Intel._direct_metadata)
[docs]
class Intel32e(Intel):
"""Class for handling 64-bit (32-bit extensions) for Intel
architectures."""
_direct_metadata = collections.ChainMap(
{"architecture": "Intel64"}, Intel._direct_metadata
)
_entry_format = "<Q"
_bits_per_register = 64
_maxphyaddr = 52
_maxvirtaddr = 48
_structure = [
("page map layer 4", 9, False),
("page directory pointer", 9, True),
("page directory", 9, True),
("page table", 9, False),
]
[docs]
class WindowsMixin(Intel):
@staticmethod
def _page_is_valid(entry: int) -> bool:
"""Returns whether a particular page is valid based on its entry.
Windows uses additional "available" bits to store flags
These flags allow windows to determine whether a page is still valid
Bit 11 is the transition flag, and Bit 10 is the prototype flag
For more information, see Windows Internals (6th Ed, Part 2, pages 268-269)
"""
return bool((entry & 1) or ((entry & 1 << 11) and not entry & 1 << 10))
def _translate_swap(
self, layer: Intel, offset: int, bit_offset: int
) -> Tuple[int, int, str]:
try:
return super()._translate(offset)
except exceptions.PagedInvalidAddressException as excp:
entry = excp.entry
tbit = bool(entry & (1 << 11))
pbit = bool(entry & (1 << 10))
unknown_bit = bool(entry & (1 << 7))
n = (entry >> 1) & 0xF
vbit = bool(entry & 1)
if (not tbit and not pbit and not vbit and unknown_bit) and (
(entry >> bit_offset) != 0
):
swap_offset = entry >> bit_offset << excp.invalid_bits
if layer.config.get("swap_layers", False):
swap_layer_name = layer.config.get(
interfaces.configuration.path_join(
"swap_layers", "swap_layers" + str(n)
),
None,
)
if swap_layer_name:
return swap_offset, 1 << excp.invalid_bits, swap_layer_name
raise exceptions.SwappedInvalidAddressException(
layer_name=excp.layer_name,
invalid_address=excp.invalid_address,
invalid_bits=excp.invalid_bits,
entry=excp.entry,
swap_offset=swap_offset,
)
raise
### These must be full separate classes so that JSON configs re-create them properly
[docs]
class WindowsIntel(WindowsMixin, Intel):
def _translate(self, offset):
return self._translate_swap(self, offset, self._page_size_in_bits)
[docs]
class WindowsIntelPAE(WindowsMixin, IntelPAE):
def _translate(self, offset: int) -> Tuple[int, int, str]:
return self._translate_swap(self, offset, self._bits_per_register)
[docs]
class WindowsIntel32e(WindowsMixin, Intel32e):
# TODO: Fix appropriately in a future release.
# Currently just a temporary workaround to deal with custom bit flag
# in the PFN field for pages in transition state.
# See https://github.com/volatilityfoundation/volatility3/pull/475
_maxphyaddr = 45
def _translate(self, offset: int) -> Tuple[int, int, str]:
return self._translate_swap(self, offset, self._bits_per_register // 2)
[docs]
class LinuxMixin(Intel):
@functools.cached_property
def _register_mask(self) -> int:
return (1 << self._bits_per_register) - 1
@functools.cached_property
def _physical_mask(self) -> int:
# From kernels 4.18 the physical mask is dynamic: See AMD SME, Intel Multi-Key Total
# Memory Encryption and CONFIG_DYNAMIC_PHYSICAL_MASK: 94d49eb30e854c84d1319095b5dd0405a7da9362
physical_mask = (1 << self._maxphyaddr) - 1
# TODO: Come back once SME support is available in the framework
return physical_mask
@functools.cached_property
def page_mask(self) -> int:
# Note that within the Intel class it's a class method. However, since it uses
# complement operations and we are working in Python, it would be more careful to
# limit it to the architecture's pointer size.
return ~(self.page_size - 1) & self._register_mask
@functools.cached_property
def _physical_page_mask(self) -> int:
return self.page_mask & self._physical_mask
@functools.cached_property
def _pte_pfn_mask(self) -> int:
return self._physical_page_mask
@functools.cached_property
def _pte_flags_mask(self) -> int:
return ~self._pte_pfn_mask & self._register_mask
def _pte_flags(self, pte) -> int:
return pte & self._pte_flags_mask
def _is_pte_present(self, entry: int) -> bool:
return (
self._pte_flags(entry) & (self._PAGE_PRESENT | self._PAGE_PROTNONE)
) != 0
def _page_is_valid(self, entry: int) -> bool:
# Overrides the Intel static method with the Linux-specific implementation
return self._is_pte_present(entry)
def _pte_needs_invert(self, entry) -> bool:
# Entries that were set to PROT_NONE (PAGE_PRESENT) are inverted
# A clear PTE shouldn't be inverted. See f19f5c4
return entry and not (entry & self._PAGE_PRESENT)
def _protnone_mask(self, entry: int) -> int:
"""Gets a mask to XOR with the page table entry to get the correct PFN"""
return self._register_mask if self._pte_needs_invert(entry) else 0
def _pte_pfn(self, entry: int) -> int:
"""Extracts the page frame number from the page table entry"""
pfn = entry ^ self._protnone_mask(entry)
return (pfn & self._pte_pfn_mask) >> self.page_shift
[docs]
class LinuxIntel(LinuxMixin, Intel):
pass
[docs]
class LinuxIntelPAE(LinuxMixin, IntelPAE):
pass
[docs]
class LinuxIntel32e(LinuxMixin, Intel32e):
# In the Linux kernel, the __PHYSICAL_MASK_SHIFT is a mask used to extract the
# physical address from a PTE. In Volatility3, this is referred to as _maxphyaddr.
#
# Until kernel version 4.17, Linux x86-64 used a 46-bit mask. With commit
# b83ce5ee91471d19c403ff91227204fb37c95fb2, this was extended to 52 bits,
# applying to both 4 and 5-level page tables.
#
# We initially used 52 bits for all Intel 64-bit systems, but this produced incorrect
# results for PROT_NONE pages. Since the mask value is defined by a preprocessor macro,
# it's difficult to detect the exact bit shift used in the current kernel.
# Using 46 bits has proven reliable for our use case, as seen in tools like crashtool.
_maxphyaddr = 46