Source code for volatility3.framework.objects.utility

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import re
import logging
from typing import Optional, Union

from volatility3.framework import interfaces, objects, constants, exceptions

vollog = logging.getLogger(__name__)


[docs] def rol(value: int, count: int, max_bits: int = 64) -> int: """A rotate-left instruction in Python""" max_bits_mask = (1 << max_bits) - 1 return (value << count % max_bits) & max_bits_mask | ( (value & max_bits_mask) >> (max_bits - (count % max_bits)) )
[docs] def bswap_32(value: int) -> int: value = ((value << 8) & 0xFF00FF00) | ((value >> 8) & 0x00FF00FF) return ((value << 16) | (value >> 16)) & 0xFFFFFFFF
[docs] def bswap_64(value: int) -> int: low = bswap_32(value >> 32) high = bswap_32(value & 0xFFFFFFFF) return ((high << 32) | low) & 0xFFFFFFFFFFFFFFFF
[docs] def array_to_string( array: "objects.Array", count: Optional[int] = None, errors: str = "replace", block_size=32, encoding="utf-8", ) -> str: """Takes a Volatility 'Array' of characters and returns a Python string. Args: array: The Volatility `Array` object containing character elements. count: Optional maximum number of characters to convert. If None, the function processes the entire array. errors: Specifies error handling behavior for decoding, defaulting to "replace". block_size: Reading block size. Defaults to 32 Returns: A decoded string representation of the character array. """ # TODO: Consider checking the Array's target is a native char if not isinstance(array, objects.Array): raise TypeError("Array_to_string takes an Array of char") if count is None: count = array.vol.count return address_to_string( context=array._context, layer_name=array.vol.layer_name, address=array.vol.offset, count=count, errors=errors, block_size=block_size, encoding=encoding, )
[docs] def pointer_to_string( pointer: "objects.Pointer", count: int, errors: str = "replace", block_size=32, encoding="utf-8", ) -> str: """Takes a Volatility 'Pointer' to characters and returns a Python string. Args: pointer: A `Pointer` object containing character elements. count: Optional maximum number of characters to convert. If None, the function processes the entire array. errors: Specifies error handling behavior for decoding, defaulting to "replace". block_size: Reading block size. Defaults to 32 Returns: A decoded string representation of the data referenced by the pointer. """ if not isinstance(pointer, objects.Pointer): raise TypeError("pointer_to_string takes a Pointer") if count < 1: raise ValueError("pointer_to_string requires a positive count") return address_to_string( context=pointer._context, layer_name=pointer.vol.layer_name, address=pointer, count=count, errors=errors, block_size=block_size, encoding=encoding, )
[docs] def gather_contiguous_bytes_from_address( context, data_layer, starting_address: int, count: int ) -> bytes: """ This method reconstructs a string from memory while also carefully examining each page It goes page-by-page reading the bytes. This is done by calculating page boundaries and then only reading one page at a time. If a page is missing, the code initially catches the exception. If data is non-empty (meaning at least one read succeeded), then we return what was read If the first page fails, then we re-raise the exception """ data = b"" if isinstance(data_layer, interfaces.layers.TranslationLayerInterface): last_address = starting_address for address, length, _, _, _ in data_layer.mapping( offset=starting_address, length=count, ignore_errors=True ): # we hit a swapped out page if last_address != address: break data += data_layer.read(address, length) last_address = address + length elif starting_address + count < data_layer.maximum_address: data = data_layer.read(starting_address, count) # if we were able to read from the first page, we want to try and construct the string # if the first page fails -> throw exception if data: return data else: raise exceptions.InvalidAddressException( layer_name=data_layer, invalid_address=starting_address )
[docs] def bytes_to_decoded_string( data: bytes, encoding: str, errors: str, return_truncated: bool = True ) -> str: """ Args: data: The `bytes` buffer containing the string of a string at offset 0 encoding: An encoding value for the encoding parameter of `bytes.decode` errors: An errors value for the errors parameter of `bytes.decode` return_truncated: Dictates whether truncated strings should be returned or if a ValueError should be thrown if a truncated (broken) string was decoded Returns: bytes: The decoded string starting at offset of data This function takes a bytes buffer that contains at a string of unknown length starting at the first byte, and returns the properly decoded string It starts by using Python's `bytes.decode` to attempt to decode the entire string It then finds the termination character (\ufffd or \x00) and splices the string Finally, it returns this spliced string after its been decoded with the caller-specified encoding """ # this is the standard byte used to replace bad unicode characters unicode_replacement_char = "\ufffd" # used to find the terminating byte termination_re = re.compile(f"{unicode_replacement_char}|\x00") # run over the entire string, letting Python replace invalid characters full_decoded_string = data.decode(encoding=encoding, errors="replace") # stop at the first terminating character or get the whole string if not found try: idx = termination_re.search(full_decoded_string).start() except AttributeError: if return_truncated: idx = len(full_decoded_string) else: raise ValueError( "return_truncated set to False and truncated string decoded." ) # cut at terminating byte, if found data = bytes(full_decoded_string[:idx], encoding=encoding) # return with caller-specified encoding and errors return data.decode(encoding=encoding, errors=errors)
[docs] def address_to_string( context: interfaces.context.ContextInterface, layer_name: str, address: int, count: int, errors: str = "replace", block_size=32, encoding="utf-8", ) -> str: """Reads a null-terminated string from a given specified memory address, processing it in blocks for efficiency. Args: context: The context used to retrieve memory layers and symbol tables layer_name: The name of the memory layer to read from address: The address where the string is located in memory count: The number of bytes to read errors: The error handling scheme to use for encoding errors. Defaults to "replace" block_size: Reading block size. Defaults to 32 Returns: The decoded string extracted from memory. """ if not isinstance(address, int): raise TypeError("Address must be a valid integer") if count < 1: raise ValueError("Count must be greater than 0") layer = context.layers[layer_name] data = gather_contiguous_bytes_from_address(context, layer, address, count) return bytes_to_decoded_string(data=data, errors=errors, encoding=encoding)
[docs] def array_of_pointers( array: interfaces.objects.ObjectInterface, count: int, subtype: Union[str, interfaces.objects.Template], context: interfaces.context.ContextInterface, ) -> interfaces.objects.ObjectInterface: """Takes an object, and recasts it as an array of pointers to subtype.""" symbol_table = array.vol.type_name.split(constants.BANG)[0] if isinstance(subtype, str) and context is not None: subtype = context.symbol_space.get_type(subtype) if not isinstance(subtype, interfaces.objects.Template) or subtype is None: raise TypeError( "Subtype must be a valid template (or string name of an object template)" ) # We have to clone the pointer class, or we'll be defining the pointer subtype for all future pointers subtype_pointer = context.symbol_space.get_type( symbol_table + constants.BANG + "pointer" ).clone() subtype_pointer.update_vol(subtype=subtype) return array.cast("array", count=count, subtype=subtype_pointer)
[docs] def dynamically_sized_array_of_pointers( context: interfaces.context.ContextInterface, array: interfaces.objects.ObjectInterface, subtype: Union[str, interfaces.objects.Template], iterator_guard_value: int, stop_value: int = 0, stop_on_invalid_pointers: bool = True, ) -> interfaces.objects.ObjectInterface: """Iterates over a dynamically sized array of pointers (e.g. NULL-terminated). Array iteration should always be performed with an arbitrary guard value as maximum size, to prevent running forever in case something unexpected happens. Args: context: The context on which to operate. array: The object to cast to an array. iterator_guard_value: Stop iterating when the iterator index is greater than this value. This is an extra-safety against smearing. subtype: The subtype of the array's pointers. stop_value: Stop value used to determine when to terminate iteration once it is encountered. Defaults to 0 (NULL-terminated arrays). stop_on_invalid_pointers: Determines whether to stop iterating or not when an invalid pointer is encountered. This can be useful for arrays that are known to have smeared entries before the end. Returns: An array of pointer objects """ new_count = 0 sym_table_name = array.get_symbol_table_name() sym_table = context.symbol_space[sym_table_name] ptr_size = sym_table.get_type("pointer").size layer_name = array.vol.layer_name offset = array.vol.offset entry = None while entry != stop_value and new_count < iterator_guard_value: try: entry = context.object( sym_table_name + constants.BANG + "pointer", offset=offset, layer_name=layer_name, ) except exceptions.InvalidAddressException: break if not entry.is_readable() and stop_on_invalid_pointers: break offset += ptr_size new_count += 1 else: vollog.log( constants.LOGLEVEL_V, f"""Iterator guard value {iterator_guard_value} reached while iterating over array at offset {array.vol.offset:#x}.\ This means that there is a bug (e.g. smearing) with this array, or that it may contain valid entries past the iterator guard value.""", ) # Leverage the "Array" object instead of returning a Python list return array_of_pointers( array=array, count=new_count, subtype=subtype, context=context )