Source code for volatility3.framework.layers.avml

# This file is Copyright 2022 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

"""Functions that read AVML files.

The user of the file doesn't have to worry about the compression,
but random access is not allowed."""
import ctypes
import logging
import struct
from typing import Tuple, List, Optional

from volatility3.framework import exceptions, interfaces, constants
from volatility3.framework.layers import segmented

vollog = logging.getLogger(__name__)

try:
    # TODO: Find library for windows if needed
    try:
        # Linux
        lib_snappy = ctypes.cdll.LoadLibrary("libsnappy.so.1")
    except OSError:
        lib_snappy = None

    try:
        if not lib_snappy:
            # macOS
            lib_snappy = ctypes.cdll.LoadLibrary("libsnappy.1.dylib")
    except OSError:
        lib_snappy = None

    try:
        if not lib_snappy:
            # Windows 64
            lib_snappy = ctypes.cdll.LoadLibrary("snappy64")
    except OSError:
        lib_snappy = None

    if not lib_snappy:
        # Windows 32
        lib_snappy = ctypes.cdll.LoadLibrary("snappy32")

    __snappy_uncompress = lib_snappy.snappy_uncompress
    __snappy_uncompressed_length = lib_snappy.snappy_uncompressed_length

    HAS_SNAPPY = True
except (AttributeError, OSError):
    HAS_SNAPPY = False


[docs]class SnappyException(exceptions.VolatilityException): pass
[docs]def uncompress(s): """Uncompress a snappy compressed string.""" ulen = ctypes.c_int(0) cresult = __snappy_uncompressed_length(s, len(s), ctypes.byref(ulen)) if cresult != 0: raise SnappyException(f"Error in snappy_uncompressed_length: {cresult}") ubuf = ctypes.create_string_buffer(ulen.value) cresult = __snappy_uncompress(s, len(s), ubuf, ctypes.byref(ulen)) if cresult != 0: raise SnappyException(f"Error in snappy_uncompress: {cresult}") return ubuf.raw
[docs]class AVMLLayer(segmented.NonLinearlySegmentedLayer): """A Lime format TranslationLayer. Lime is generally used to store physical memory images where there are large holes in the physical layer """ def __init__(self, *args, **kwargs): self._compressed = {} super().__init__(*args, **kwargs) @classmethod def _check_header(cls, layer: interfaces.layers.DataLayerInterface): header_structure = "<II" magic, version = struct.unpack( header_structure, layer.read(layer.minimum_address, struct.calcsize(header_structure)), ) if magic not in [0x4C4D5641] or version != 2: raise exceptions.LayerException("File not in AVML format") if not HAS_SNAPPY: vollog.warning( "AVML file detected, but snappy library could not be found\n" "Please install the snappy from your distribution or https://google.github.io/snappy/." ) raise exceptions.LayerException( "AVML format dependencies not satisfied (snappy)" ) def _load_segments(self) -> None: base_layer = self.context.layers[self._base_layer] offset = base_layer.minimum_address while offset + 4 < base_layer.maximum_address: avml_header_structure = "<IIQQQ" avml_header_size = struct.calcsize(avml_header_structure) avml_header_data = base_layer.read(offset, avml_header_size) magic, version, start, end, padding = struct.unpack( avml_header_structure, avml_header_data ) if magic not in [0x4C4D5641] or version != 2: raise exceptions.LayerException("File not completely in AVML format") chunk_data = base_layer.read( offset + avml_header_size, min( end - start, base_layer.maximum_address - (offset + avml_header_size), ), ) segments, consumed = self._read_snappy_frames(chunk_data, end - start) # The returned segments are accurate the chunk_data that was passed in, but needs shifting for thing, mapped_offset, size, mapped_size, compressed in segments: self._segments.append( ( thing + start, offset + mapped_offset + avml_header_size, size, mapped_size, ) ) self._compressed[offset + mapped_offset + avml_header_size] = compressed # TODO: Check whatever the remaining 8 bytes are offset += avml_header_size + consumed + 8 def _read_snappy_frames( self, data: bytes, expected_length: int ) -> Tuple[List[Tuple[int, int, int, int, bool]], int]: """ Reads a framed-format snappy stream Args: data: The stream to read expected_length: How big the decompressed stream is expected to be (termination limit) Returns: (offset, mapped_offset, length, mapped_length, compressed) relative to the data chunk (ie, not relative to the file start) """ segments = [] decompressed_len = 0 offset = 0 crc_len = 4 frame_header_struct = "<L" frame_header_len = struct.calcsize(frame_header_struct) while decompressed_len <= expected_length: if offset + frame_header_len < len(data): frame_header = data[offset : offset + frame_header_len] frame_header_val = struct.unpack("<L", frame_header)[0] frame_type, frame_size = frame_header_val & 0xFF, frame_header_val >> 8 if frame_type == 0xFF: if ( data[ offset + frame_header_len : offset + frame_header_len + frame_size ] != b"sNaPpY" ): raise ValueError(f"Snappy header missing at offset: {offset}") elif frame_type in [0x00, 0x01]: # CRC + (Un)compressed data mapped_start = offset + frame_header_len # frame_crc = data[mapped_start: mapped_start + crc_len] frame_data = data[ mapped_start + crc_len : mapped_start + frame_size ] if frame_type == 0x00: # Compressed data frame_data = uncompress(frame_data) # TODO: Verify CRC segments.append( ( decompressed_len, mapped_start + crc_len, len(frame_data), frame_size - crc_len, frame_type == 0x00, ) ) decompressed_len += len(frame_data) elif frame_type in range(0x2, 0x80): # Unskippable raise exceptions.LayerException( f"Unskippable chunk of type {frame_type} found: {offset}" ) offset += frame_header_len + frame_size return segments, offset def _decode_data( self, data: bytes, mapped_offset: int, offset: int, output_length: int ) -> bytes: start_offset, _, _, _ = self._find_segment(offset) if self._compressed[mapped_offset]: decoded_data = uncompress(data) else: decoded_data = data decoded_data = decoded_data[offset - start_offset :] decoded_data = decoded_data[:output_length] return decoded_data
[docs]class AVMLStacker(interfaces.automagic.StackerLayerInterface): stack_order = 10
[docs] @classmethod def stack( cls, context: interfaces.context.ContextInterface, layer_name: str, progress_callback: constants.ProgressCallback = None, ) -> Optional[interfaces.layers.DataLayerInterface]: try: AVMLLayer._check_header(context.layers[layer_name]) except exceptions.LayerException: return None new_name = context.layers.free_layer_name("AVMLLayer") context.config[interfaces.configuration.path_join(new_name, "base_layer")] = ( layer_name ) return AVMLLayer(context, new_name, new_name)