# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import List, Optional, Type
from volatility3.framework import renderers, interfaces, constants, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
vollog = logging.getLogger(__name__)
[docs]class LayerWriter(plugins.PluginInterface):
"""Runs the automagics and writes out the primary layer produced by the stacker."""
default_block_size = 0x500000
_required_framework_version = (2, 0, 0)
_version = (2, 0, 0)
[docs] @classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.TranslationLayerRequirement(name = 'primary', description = 'Memory layer for the kernel'),
requirements.IntRequirement(name = 'block_size',
description = "Size of blocks to copy over",
default = cls.default_block_size,
optional = True),
requirements.BooleanRequirement(name = 'list',
description = 'List available layers',
default = False,
optional = True),
requirements.ListRequirement(
name = 'layers',
element_type = str,
description = 'Names of layers to write (defaults to the highest non-mapped layer)',
default = None,
optional = True)
]
[docs] @classmethod
def write_layer(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
preferred_name: str,
open_method: Type[plugins.FileHandlerInterface],
chunk_size: Optional[int] = None,
progress_callback: Optional[constants.ProgressCallback] = None) -> Optional[plugins.FileHandlerInterface]:
"""Produces a FileHandler from the named layer in the provided context or None on failure
Args:
context: the context from which to read the memory layer
layer_name: the name of the layer to write out
preferred_name: a string with the preferred filename for hte file
chunk_size: an optional size for the chunks that should be written (defaults to 0x500000)
open_method: class for creating FileHandler context managers
progress_callback: an optional function that takes a percentage and a string that displays output
"""
if layer_name not in context.layers:
raise exceptions.LayerException("Layer not found")
layer = context.layers[layer_name]
if chunk_size is None:
chunk_size = cls.default_block_size
file_handle = open_method(preferred_name)
for i in range(0, layer.maximum_address, chunk_size):
current_chunk_size = min(chunk_size, layer.maximum_address - i)
data = layer.read(i, current_chunk_size, pad = True)
file_handle.write(data)
if progress_callback:
progress_callback((i / layer.maximum_address) * 100, f'Writing layer {layer_name}')
return file_handle
def _generator(self):
if self.config['list']:
for name in self.context.layers:
yield 0, (name, )
else:
# Choose the most recently added layer that isn't virtual
if not self.config['layers']:
self.config['layers'] = []
for name in self.context.layers:
if not self.context.layers[name].metadata.get('mapped', False):
self.config['layers'] = [name]
for name in self.config['layers']:
# Check the layer exists and validate the output file
if name not in self.context.layers:
yield 0, (f'Layer Name {name} does not exist', )
else:
output_name = self.config.get('output', ".".join([name, "raw"]))
try:
file_handle = self.write_layer(self.context,
name,
output_name,
self.open,
self.config.get('block_size', self.default_block_size),
progress_callback = self._progress_callback)
file_handle.close()
except IOError as excp:
yield 0, (f"Layer cannot be written to {self.config['output_name']}: {excp}", )
yield 0, (f'Layer has been written to {output_name}', )
def _generate_layers(self):
"""List layer names from this run"""
for name in self.context.layers:
yield (0, (name, self.context.layers[name].__class__.__name__))
[docs] def run(self):
if self.config['list']:
return renderers.TreeGrid([("Layer name", str), ('Layer type', str)], self._generate_layers())
return renderers.TreeGrid([("Status", str)], self._generator())