# Source code for volatility3.framework.interfaces.configuration

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
"""The configuration module contains classes and functions for interacting with
the configuration and requirement trees.

Volatility plugins can specify a list of requirements (which may have
subrequirements, thus forming a requirement tree). These requirement
trees can contain values, which are contained in a complementary
configuration tree.  These two trees act as a protocol between the
plugins and users.  The plugins provide requirements that must be
fulfilled, and the users provide configuration values that fulfill
those requirements.  Where the user does not provide sufficient
configuration values, automagic modules may extend the configuration
tree themselves.
"""

import collections.abc
import copy
import json
import logging
import random
import string
import sys
from abc import ABCMeta, abstractmethod
from typing import (
    Any,
    ClassVar,
    Dict,
    Generator,
    Iterator,
    List,
    Optional,
    Type,
    Union,
    Tuple,
    Set,
)

from volatility3 import classproperty, framework
from volatility3.framework import constants, interfaces

# Separator placed between the levels of a configuration path (e.g. "a.b.c")
CONFIG_SEPARATOR = "."
"""Use to specify the separator between configuration hierarchies"""

# Module-level logger, named after this module
vollog = logging.getLogger(__name__)

# Tuple of the primitive Python types that make up simple configuration values
BasicTypes = (int, bool, bytes, str)
# Static-typing counterpart of the same four primitive types
SimpleTypes = Union[int, bool, bytes, str]
# A stored configuration value: None, one simple value, or a flat list of simple values
ConfigSimpleType = Optional[Union[SimpleTypes, List[SimpleTypes]]]


def path_join(*args) -> str:
    """Build a configuration path out of the supplied components.

    Falsy components (most commonly an empty leading element) are skipped; the
    remaining ones are joined with CONFIG_SEPARATOR.
    """
    non_empty = filter(None, args)
    return CONFIG_SEPARATOR.join(non_empty)
def parent_path(value: str) -> str:
    """Return everything before the last CONFIG_SEPARATOR of a configuration path.

    A path without any separator has no parent, so an empty string is returned.
    """
    parent, _separator, _last = value.rpartition(CONFIG_SEPARATOR)
    return parent
def path_head(value: str) -> str:
    """Return the final component of a configuration path.

    This is the text after the last CONFIG_SEPARATOR, or the whole value when
    no separator is present.
    """
    _parent, _separator, last = value.rpartition(CONFIG_SEPARATOR)
    return last
def path_depth(path: str, depth: int = 1) -> str:
    """Returns the `path` up to a certain depth.

    Note that `depth` can be negative (such as `-x`) and will return all
    elements except for the last `x` components.

    Args:
        path: The configuration path to truncate
        depth: The number of leading components to keep (slice semantics, so
            negative values drop trailing components)

    Returns:
        The truncated configuration path (empty if no components remain)
    """
    components = path.split(CONFIG_SEPARATOR)[:depth]
    # path_join takes each component as a separate positional argument, so the
    # list must be unpacked; passing the list itself makes str.join fail with
    # a TypeError for any non-empty slice.
    return path_join(*components)
class HierarchicalDict(collections.abc.Mapping):
    """The core of configuration data, it is a mapping class that stores keys
    within itself, and also stores lower hierarchies.

    A key containing the separator (for example ``a.b.c`` with the default
    ``.`` separator) is stored as the value ``c`` inside the sub-hierarchy
    ``b``, which itself lives inside the sub-hierarchy ``a``.
    """

    def __init__(
        self,
        initial_dict: Optional[Union[str, Dict[str, Any]]] = None,
        separator: str = CONFIG_SEPARATOR,
    ) -> None:
        """
        Args:
            initial_dict: A dictionary (or a JSON string encoding a dictionary) to populate the HierarchicalDict with initially
            separator: A custom hierarchy separator (defaults to CONFIG_SEPARATOR)

        Raises:
            TypeError: If the separator is not a one character string, or if
                initial_dict is neither None, a dictionary, nor a JSON string
                that decodes to a dictionary
        """
        if not (isinstance(separator, str) and len(separator) == 1):
            raise TypeError(f"Separator must be a one character string: {separator}")
        self._separator = separator
        self._data: Dict[str, ConfigSimpleType] = {}
        self._subdict: Dict[str, "HierarchicalDict"] = {}
        if isinstance(initial_dict, str):
            initial_dict = json.loads(initial_dict)
        if isinstance(initial_dict, dict):
            for k, v in initial_dict.items():
                self[k] = v
        elif initial_dict is not None:
            raise TypeError(
                f"Initial_dict must be a dictionary or JSON string containing a dictionary: {initial_dict}"
            )

    def __eq__(self, other):
        """Define equality between HierarchicalDicts.

        Two hierarchies are equal when their flattened key/value pairs match.
        Objects that cannot be converted to a dictionary (such as None) are
        reported as not comparable rather than raising an exception.
        """
        try:
            return dict(self) == dict(other)
        except (TypeError, ValueError):
            return NotImplemented

    @property
    def separator(self) -> str:
        """Specifies the hierarchy separator in use in this HierarchyDict."""
        return self._separator

    @property
    def data(self) -> Dict:
        """Returns a copy of just the data-containing mappings on this level
        of the Hierarchy."""
        return self._data.copy()

    def _key_head(self, key: str) -> str:
        """Returns the first division of a key based on the dict separator, or
        the full key if the separator is not present."""
        return key.partition(self.separator)[0]

    def _key_tail(self, key: str) -> str:
        """Returns all but the first division of a key based on the dict
        separator, or an empty string if the separator is not in the key."""
        return key.partition(self.separator)[2]

    def __iter__(self) -> Iterator[Any]:
        """Returns an iterator object that supports the iterator protocol."""
        return self.generator()

    def generator(self) -> Generator[str, None, None]:
        """A generator for the data in this level and lower levels of this
        mapping.

        Returns:
            Returns each item in the top level data, and then all subkeys in a depth first order
        """
        yield from self._data
        for subdict_key, subdict in self._subdict.items():
            for key in subdict:
                yield subdict_key + self.separator + key

    def __getitem__(self, key: str) -> ConfigSimpleType:
        """Gets an item, traversing down the trees to get to the final value.

        Raises:
            KeyError: If the (full) key is not present in the hierarchy
        """
        try:
            if self.separator in key:
                subdict = self._subdict[self._key_head(key)]
                return subdict[self._key_tail(key)]
            return self._data[key]
        except KeyError:
            # Report the full key the caller asked for, not the partial one
            # that failed somewhere down the tree
            raise KeyError(key) from None

    def __setitem__(self, key: str, value: Any) -> None:
        """Sets an item or creates a subdict and sets the item within that."""
        self._setitem(key, value)

    def _setitem(self, key: str, value: Any, is_data: bool = True) -> None:
        """Set an item or appends a whole subtree at a key location.

        Args:
            key: The (possibly separator-qualified) location to write to
            value: The data value, or the HierarchicalDict when `is_data` is False
            is_data: True to store `value` as data, False to attach it as a sub-hierarchy

        Raises:
            TypeError: If `is_data` is False and `value` is not a HierarchicalDict,
                or if a data value is of an unsupported type
        """
        if self.separator in key:
            head = self._key_head(key)
            subdict = self._subdict.get(
                head, HierarchicalDict(separator=self.separator)
            )
            subdict._setitem(self._key_tail(key), value, is_data)
            self._subdict[head] = subdict
        elif is_data:
            self._data[key] = self._sanitize_value(value)
        else:
            if not isinstance(value, HierarchicalDict):
                raise TypeError(
                    "HierarchicalDicts can only store HierarchicalDicts within their structure: {}".format(
                        type(value)
                    )
                )
            self._subdict[key] = value

    def _sanitize_value(self, value: Any) -> ConfigSimpleType:
        """Method to ensure all values are standard values and not volatility
        objects containing contexts.

        Subclasses of the basic types are converted to the plain basic type.
        Lists are sanitized element by element, dropping None elements; nested
        lists are rejected.

        Raises:
            TypeError: If the value (or a list element) is not a supported type
        """
        # bool must be tested before int, since bool is a subclass of int
        if isinstance(value, bool):
            return bool(value)
        elif isinstance(value, int):
            return int(value)
        elif isinstance(value, str):
            return str(value)
        elif isinstance(value, bytes):
            return bytes(value)
        elif isinstance(value, list):
            new_list = []
            for element in value:
                element_value = self._sanitize_value(element)
                if isinstance(element_value, list):
                    raise TypeError(
                        "Configuration list types cannot contain list types"
                    )
                if element_value is not None:
                    new_list.append(element_value)
            return new_list
        elif value is None:
            return None
        else:
            raise TypeError(f"Invalid type stored in configuration: {type(value)}")

    def __delitem__(self, key: str) -> None:
        """Deletes an item from the hierarchical dict.

        Raises:
            KeyError: If the (full) key is not present in the hierarchy
        """
        try:
            if self.separator in key:
                subdict = self._subdict[self._key_head(key)]
                del subdict[self._key_tail(key)]
            else:
                del self._data[self._key_head(key)]
        except KeyError:
            raise KeyError(key) from None

    def __contains__(self, key: Any) -> bool:
        """Determines whether the key is present in the hierarchy.

        Keys that are not strings can never be stored, so they are reported as
        absent instead of raising a TypeError from the separator test.
        """
        if not isinstance(key, str):
            return False
        if self.separator in key:
            try:
                subdict = self._subdict[self._key_head(key)]
                return self._key_tail(key) in subdict
            except KeyError:
                return False
        else:
            return key in self._data

    def __len__(self) -> int:
        """Returns the number of data items at this level and all lower levels."""
        # Iterate the sub-hierarchies themselves; iterating `_subdict` directly
        # yields the key strings and would add up the lengths of their names.
        return len(self._data) + sum(
            len(subdict) for subdict in self._subdict.values()
        )

    def branch(self, key: str) -> "HierarchicalDict":
        """Returns the HierarchicalDict housed under the key.

        This differs from the data property, in that it is directed by the `key`, and all layers under that key are
        returned, not just those in that level.

        Higher layers are not prefixed with the location of earlier layers, so branching a hierarchy containing `a.b.c.d`
        on `a.b` would return a hierarchy containing `c.d`, not `a.b.c.d`.

        If nothing exists under `key` yet, an empty HierarchicalDict is created
        at that location and that same (attached) object is returned.

        Args:
            key: The location within the hierarchy to return higher layers.

        Returns:
            The HierarchicalDict underneath the specified key (not just the data at that key location in the tree)
        """
        try:
            if self.separator in key:
                return self._subdict[self._key_head(key)].branch(self._key_tail(key))
            return self._subdict[key]
        except KeyError:
            # Return the branch that was actually inserted (sharing this
            # hierarchy's separator), rather than an unrelated detached one.
            new_branch = HierarchicalDict(separator=self.separator)
            self._setitem(key=key, value=new_branch, is_data=False)
            return new_branch

    def splice(self, key: str, value: "HierarchicalDict") -> None:
        """Splices an existing HierarchicalDictionary under a specific key.

        This can be thought of as an inverse of :func:`branch`, although
        `branch` does not remove the requested hierarchy, it simply
        returns it.

        Raises:
            TypeError: If `key` is not a string or `value` is not a HierarchicalDict
        """
        if not isinstance(key, str) or not isinstance(value, HierarchicalDict):
            raise TypeError("Splice requires a string key and HierarchicalDict value")
        self._setitem(key, value, False)

    def merge(
        self, key: str, value: "HierarchicalDict", overwrite: bool = False
    ) -> None:
        """Acts similarly to splice, but maintains previous values.

        If overwrite is true, then entries in the new value are used over those that exist within key already

        Args:
            key: The location within the hierarchy at which to merge the `value`
            value: HierarchicalDict to be merged under the key node
            overwrite: A boolean defining whether the value will be overwritten if it already exists

        Raises:
            TypeError: If `key` is not a string or `value` is not a HierarchicalDict
        """
        if not isinstance(key, str) or not isinstance(value, HierarchicalDict):
            raise TypeError("Merge requires a string key and HierarchicalDict value")
        # Snapshot the incoming items first so that writing into this hierarchy
        # cannot disturb the iteration
        for item, item_value in dict(value).items():
            full_key = key + self._separator + item
            # Existing entries (other than None) are only replaced on request
            if overwrite or self.get(full_key, None) is None:
                self[full_key] = item_value

    def clone(self) -> "HierarchicalDict":
        """Duplicates the configuration, allowing changes without affecting the
        original.

        Returns:
            A duplicate HierarchicalDict of this object
        """
        return copy.deepcopy(self)

    def __str__(self) -> str:
        """Turns the Hierarchical dict into a string representation (JSON with
        the flattened keys in sorted order)."""
        return json.dumps(
            {key: self[key] for key in sorted(self.generator())}, indent=2
        )
class RequirementInterface(metaclass=ABCMeta):
    """Base class describing a single requirement.

    Plugins and other framework components use requirements to ask for
    specific pieces of configuration data.  A requirement is either a simple type
    (such as
    :class:`~volatility3.framework.configuration.requirements.SimpleTypeRequirement`,
    :class:`~volatility3.framework.configuration.requirements.IntRequirement`,
    :class:`~volatility3.framework.configuration.requirements.BytesRequirement`
    and
    :class:`~volatility3.framework.configuration.requirements.StringRequirement`)
    or a complex type (such as
    :class:`TranslationLayerRequirement`,
    :class:`SymbolTableRequirement` and :class:`ClassRequirement`).
    """

    def __init__(
        self,
        name: str,
        description: str = None,
        default: ConfigSimpleType = None,
        optional: bool = False,
    ) -> None:
        """
        Args:
            name: The name of the requirement
            description: A short textual description of the requirement
            default: The default value for the requirement if no value is provided
            optional: Whether the requirement must be satisfied or not

        Raises:
            ValueError: If the name contains CONFIG_SEPARATOR
        """
        super().__init__()
        if CONFIG_SEPARATOR in name:
            raise ValueError(
                f"Name cannot contain the config-hierarchy divider ({CONFIG_SEPARATOR})"
            )
        self._name = name
        self._description = description if description else ""
        self._default = default
        self._optional = optional
        # Child requirements, keyed by their name
        self._requirements: Dict[str, RequirementInterface] = {}

    def __repr__(self) -> str:
        return "".join(("<", self.__class__.__name__, ": ", self.name, ">"))

    def __eq__(self, other):
        # Equal when `other` is of (a subclass of) this class and agrees with
        # every instance attribute held by this object
        if not isinstance(other, self.__class__):
            return False
        return not any(
            other.__dict__.get(attribute, None) != own_value
            for attribute, own_value in self.__dict__.items()
        )

    @property
    def name(self) -> str:
        """The name of the Requirement.

        Names cannot contain CONFIG_SEPARATOR ('.' by default) since
        this is used within the configuration hierarchy.
        """
        return self._name

    @property
    def description(self) -> str:
        """A short description of what the Requirement is designed to affect or
        achieve."""
        return self._description

    @property
    def default(self) -> ConfigSimpleType:
        """The default value, if one was set."""
        return self._default

    @property
    def optional(self) -> bool:
        """Whether the Requirement is optional or not."""
        return self._optional

    @optional.setter
    def optional(self, value) -> None:
        """Sets the optional flag, coercing the given value to a boolean."""
        self._optional = bool(value)

    def config_value(
        self,
        context: "interfaces.context.ContextInterface",
        config_path: str,
        default: ConfigSimpleType = None,
    ) -> ConfigSimpleType:
        """Looks up the value stored at `config_path` in the context's config.

        Args:
            context: the configuration store to find the value for this requirement
            config_path: the configuration path of the instance of the requirement to be recovered
            default: a default value to provide if the requirement's configuration value is not found
        """
        configuration = context.config
        return configuration.get(config_path, default)

    # Child operations
    @property
    def requirements(self) -> Dict[str, "RequirementInterface"]:
        """A copy of the dictionary of child requirements, indexed by name."""
        return dict(self._requirements)

    def add_requirement(self, requirement: "RequirementInterface") -> None:
        """Registers a child requirement under its name.

        Args:
            requirement: The requirement to add as a child-requirement
        """
        self._requirements[requirement.name] = requirement

    def remove_requirement(self, requirement: "RequirementInterface") -> None:
        """Unregisters the child requirement stored under the given requirement's name.

        Args:
            requirement: The requirement to remove as a child-requirement
        """
        del self._requirements[requirement.name]

    def unsatisfied_children(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> Dict[str, "RequirementInterface"]:
        """Validates every non-optional child requirement.

        Args:
            context: the context containing the configuration data for this requirement
            config_path: the configuration path of this instance of the requirement

        Returns:
            A dictionary of full configuration paths for each unsatisfied child-requirement
        """
        unmet = {}
        for child in self.requirements.values():
            if child.optional:
                continue
            # Children live one level below this requirement's own name
            unmet.update(
                child.unsatisfied(context, path_join(config_path, self._name))
            )
        return unmet

    # Validation routines
    @abstractmethod
    def unsatisfied(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> Dict[str, "RequirementInterface"]:
        """Validates the value stored at config_path for this requirement
        against a context.

        Args:
            context: The context object containing the configuration for this requirement
            config_path: The configuration path for this requirement to test satisfaction

        Returns:
            A dictionary of configuration-paths to requirements that could not be satisfied
        """
class SimpleTypeRequirement(RequirementInterface):
    """Requirement for a single simple value (a boolean, a string, an integer
    or a series of bytes), as selected by `instance_type`."""

    # The type a configuration value must be an instance of to satisfy the requirement
    instance_type: ClassVar[Type] = bool

    def add_requirement(self, requirement: RequirementInterface):
        """Always raises a TypeError as instance requirements cannot have
        children."""
        raise TypeError("Instance Requirements cannot have subrequirements")

    def remove_requirement(self, requirement: RequirementInterface):
        """Always raises a TypeError as instance requirements cannot have
        children."""
        raise TypeError("Instance Requirements cannot have subrequirements")

    def unsatisfied(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> Dict[str, RequirementInterface]:
        """Checks that the configured value is an instance of `instance_type`.

        Returns:
            An empty dictionary when satisfied, otherwise a dictionary mapping
            this requirement's full configuration path to the requirement
        """
        full_path = path_join(config_path, self.name)
        current = self.config_value(context, full_path, None)
        if isinstance(current, self.instance_type):
            return {}
        vollog.log(
            constants.LOGLEVEL_V,
            "TypeError - {} requirements only accept {} type: {}".format(
                self.name, self.instance_type.__name__, repr(current)
            ),
        )
        return {full_path: self}
class ClassRequirement(RequirementInterface):
    """Requires a specific class.

    This is used as means to serialize specific classes for
    :class:`TranslationLayerRequirement` and
    :class:`SymbolTableRequirement` classes.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Populated by `unsatisfied` once the configured name has been resolved
        self._cls = None

    def __eq__(self, other):
        # The parent implementation already compares every member of
        # `__dict__`, so delegating to it is both sufficient and correct
        return super().__eq__(other)

    @property
    def cls(self) -> Optional[Type]:
        """The class resolved from the configured class name, or None if it
        has not been (or could not be) resolved."""
        return self._cls

    def unsatisfied(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> Dict[str, RequirementInterface]:
        """Checks to see if a class can be recovered.

        A dotted value is looked up as an attribute of an already-imported
        module; an undotted value is looked up in this module's globals.

        Returns:
            An empty dictionary when a class was found, otherwise a dictionary
            mapping this requirement's full configuration path to the requirement
        """
        full_path = path_join(config_path, self.name)
        configured = self.config_value(context, full_path, None)
        self._cls = None
        if isinstance(configured, str):
            module_name, dot, class_name = configured.rpartition(".")
            if dot:
                # TODO: consider importing the prefix
                module = sys.modules.get(module_name, None)
                if hasattr(module, class_name):
                    self._cls = getattr(module, class_name)
            elif configured in globals():
                self._cls = globals()[configured]
        return {full_path: self} if self._cls is None else {}
class ConstructableRequirementInterface(RequirementInterface):
    """Defines a Requirement that can be constructed based on their own
    requirements.

    This effectively offers a means for serializing specific python types, to be reconstructed based on simple configuration data.
    Each constructable records a `class` requirement, which indicates the object that will be constructed.
    That class may have its own requirements (which is why validation of a ConstructableRequirement must happen after
    the class configuration value has been provided).  These values are then provided to the object's constructor
    by name as arguments (as well as the standard `context` and `config_path` arguments).
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.add_requirement(
            ClassRequirement("class", "Class of the constructable requirement")
        )
        # Names of the child requirements contributed by the currently chosen class
        self._current_class_requirements: Set[Any] = set()

    def __eq__(self, other):
        # The parent implementation already compares every member of
        # `__dict__`, so delegating to it is both sufficient and correct
        return super().__eq__(other)

    @abstractmethod
    def construct(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> None:
        """Method for constructing within the context any required elements
        from subrequirements.

        Args:
            context: The context object containing the configuration data for the constructable
            config_path: The configuration path for the specific instance of this constructable
        """

    def _validate_class(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> None:
        """Checks whether the `class` requirement is satisfied and, if so,
        replaces this requirement's class-derived children with those of the
        chosen class (they are populated here, not validated).

        Args:
            context: The context object containing the configuration data for the constructable
            config_path: The configuration path for the specific instance of this constructable
        """
        class_req = self.requirements["class"]
        subreq_config_path = path_join(config_path, self.name)
        if class_req.unsatisfied(context, subreq_config_path):
            return
        if not isinstance(class_req, ClassRequirement):
            return
        # We have a validated class, so its requirements can be adopted
        chosen_class = class_req.cls
        if not issubclass(chosen_class, ConfigurableInterface):
            return
        # In case the class has changed, clear out the old requirements
        for stale_name in self._current_class_requirements.copy():
            del self._requirements[stale_name]
            self._current_class_requirements.remove(stale_name)
        # And add the new ones
        for new_requirement in chosen_class.get_requirements():
            self._current_class_requirements.add(new_requirement.name)
            self.add_requirement(new_requirement)

    def _construct_class(
        self,
        context: "interfaces.context.ContextInterface",
        config_path: str,
        requirement_dict: Dict[str, object] = None,
    ) -> Optional["interfaces.objects.ObjectInterface"]:
        """Constructs the class, handing the entries of `requirement_dict` as
        keyword arguments to __init__, and records the new object's name at
        `config_path` in the context's configuration.

        Returns:
            The constructed object, or None if the `class` requirement is
            unsatisfied or did not resolve to a class
        """
        class_req = self.requirements["class"]
        if class_req.unsatisfied(context, config_path):
            return None
        if not isinstance(class_req, ClassRequirement):
            return None
        cls = class_req.cls
        if cls is None:
            return None

        # These classes all have a name property
        # We could subclass this out as a NameableInterface, but it seems a little excessive
        # FIXME: The class cannot be checked against the layer/symbol table
        #        interfaces here, because importing them causes import loops

        constructor_kwargs = {} if requirement_dict is None else requirement_dict

        # Fulfillment must happen, exceptions happening here mean the requirements aren't correct
        # and these need to be raised and fixed, rather than caught and ignored
        obj = cls(**constructor_kwargs)
        context.config[config_path] = obj.name
        return obj
class ConfigurableRequirementInterface(RequirementInterface):
    """Simple abstract class that provides `build_configuration`."""

    def build_configuration(
        self,
        context: "interfaces.context.ContextInterface",
        config_path: str,
        value: Any,
    ) -> HierarchicalDict:
        """Proxies to a ConfigurableInterface if necessary.

        This base implementation has no body of its own (it returns None);
        subclasses are presumably expected to override it — TODO confirm.

        Args:
            context: The context holding the configuration data
            config_path: The configuration path of the owning configurable
            value: The configuration value currently stored for this requirement
        """
class ConfigurableInterface(metaclass=ABCMeta):
    """Class to allow objects to have requirements and read configuration data
    from the context config tree."""

    def __init__(
        self, context: "interfaces.context.ContextInterface", config_path: str
    ) -> None:
        """Basic initializer that allows configurables to access their own
        config settings.

        Args:
            context: The context whose configuration tree holds this object's settings
            config_path: The configuration path under which this object's settings live
        """
        super().__init__()
        self._context = context
        self._config_path = config_path
        # Lazily populated by the `config` property
        self._config_cache: Optional[HierarchicalDict] = None

    @property
    def context(self) -> "interfaces.context.ContextInterface":
        """The context object that this configurable belongs to/configuration
        is stored in."""
        return self._context

    @property
    def config_path(self) -> str:
        """The configuration path on which this configurable lives."""
        return self._config_path

    @config_path.setter
    def config_path(self, value: str) -> None:
        """Moves this configurable to a new configuration path and discards
        the cached configuration branch."""
        self._config_path = value
        self._config_cache = None

    @property
    def config(self) -> HierarchicalDict:
        """The Hierarchical configuration Dictionary for this Configurable
        object (the branch of the context's config at `config_path`, cached
        after the first access)."""
        # getattr also covers objects on which __init__ never set the cache
        if getattr(self, "_config_cache", None) is None:
            self._config_cache = self._context.config.branch(self._config_path)
        return self._config_cache

    def build_configuration(self) -> HierarchicalDict:
        """Constructs a HierarchicalDictionary of all the options required to
        build this component in the current context.

        Ensures that if the class has been created, it can be recreated
        using the configuration built Inheriting classes must override
        this to ensure any dependent classes update their configurations
        too
        """
        built = HierarchicalDict()
        for req in self.get_requirements():
            value = self.config.get(req.name, None)
            if value is None:
                continue
            # Do not include the name of constructed classes
            if not isinstance(req, ConstructableRequirementInterface):
                built[req.name] = value
            if isinstance(req, ConfigurableRequirementInterface):
                sub_configuration = req.build_configuration(
                    self.context, self.config_path, value
                )
                built.splice(req.name, sub_configuration)
        return built

    @classmethod
    def get_requirements(cls) -> List[RequirementInterface]:
        """Returns a list of RequirementInterface objects required by this
        object (none by default)."""
        return []

    @classmethod
    def unsatisfied(
        cls, context: "interfaces.context.ContextInterface", config_path: str
    ) -> Dict[str, RequirementInterface]:
        """Returns a dictionary mapping configuration paths to the non-optional
        requirements that are not satisfied.

        Since a satisfied set of requirements returns an empty (falsy)
        dictionary, it can be used in tests as follows:

        .. code-block:: python

            unmet = configurable.unsatisfied(context, config_path)
            if unmet:
                raise RuntimeError("Unsatisfied requirements: {}".format(unmet))
        """
        unmet = {}
        for requirement in cls.get_requirements():
            if requirement.optional:
                continue
            unmet.update(requirement.unsatisfied(context, config_path))
        return unmet

    @classmethod
    def make_subconfig(
        cls,
        context: "interfaces.context.ContextInterface",
        base_config_path: str,
        **kwargs,
    ) -> str:
        """Convenience function to allow constructing a new randomly generated
        sub-configuration path, containing each element from kwargs.

        Args:
            context: The context in which to store the new configuration
            base_config_path: The base configuration path on which to build the new configuration
            kwargs: Keyword arguments that are used to populate the new configuration path

        Returns:
            str: The newly generated full configuration path

        Raises:
            TypeError: If any keyword value is not a simple type
        """
        alphabet = string.ascii_uppercase + string.digits
        system_random = random.SystemRandom()
        random_config_dict = "".join(system_random.choice(alphabet) for _ in range(8))
        new_config_path = path_join(base_config_path, random_config_dict)
        # TODO: Check that the new_config_path is empty, although it's not critical if it's not since the values are merged in

        # This should check that each k corresponds to a requirement and each v is of the appropriate type
        # This would require knowledge of the new configurable itself to verify, and they should do validation in the
        # constructor anyway, however, to prevent bad types getting into the config tree we just verify that v is a simple type
        permitted_types = (int, str, bool, float, bytes)
        for option_name, option_value in kwargs.items():
            if not isinstance(option_value, permitted_types):
                raise TypeError(
                    "Config values passed to make_subconfig can only be simple types"
                )
            context.config[path_join(new_config_path, option_name)] = option_value

        return new_config_path
class VersionableInterface:
    """A class that allows version checking so that plugins can request
    specific versions of components they may need.

    This currently includes other Plugins and scanners, but may be extended in the future

    All version number should use semantic versioning
    """

    # (MAJOR, MINOR, PATCH) version of the component itself
    _version: Tuple[int, int, int] = (0, 0, 0)
    # (MAJOR, MINOR, PATCH) framework interface version the component requires
    _required_framework_version: Tuple[int, int, int] = (0, 0, 0)

    def __init__(self, *args, **kwargs):
        # Check the framework version first, then continue up the MRO
        required_version = self._required_framework_version
        framework.require_interface_version(*required_version)
        super().__init__(*args, **kwargs)

    @classproperty
    def version(cls) -> Tuple[int, int, int]:
        """The version of the current interface (classmethods available on the
        component).

        It is strongly recommended that Semantic Versioning be used (and the default version verification is defined that way):

            MAJOR version when you make incompatible API changes.
            MINOR version when you add functionality in a backwards compatible manner.
            PATCH version when you make backwards compatible bug fixes.
        """
        return cls._version