Source code for volatility3.plugins.windows.mftscan

# This file is Copyright 2022 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import contextlib
import datetime
import logging

from typing import Generator, Iterable, Dict, Tuple, Callable

from volatility3.framework import constants, exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import conversion, format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows.extensions import mft
from volatility3.plugins import timeliner, yarascan

vollog = logging.getLogger(__name__)


[docs] class MFTScan(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface): """Scans for MFT FILE objects present in a particular windows memory image.""" _required_framework_version = (2, 0, 0) _version = (2, 0, 0)
[docs] @classmethod def get_requirements(cls): return [ requirements.TranslationLayerRequirement( name="primary", description="Memory layer for the kernel", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="yarascanner", component=yarascan.YaraScanner, version=(2, 1, 0) ), ]
[docs] @staticmethod def enumerate_mft_records( context: interfaces.context.ContextInterface, config_path: str, primary_layer_name: str, attr_callback: Callable[ [ Dict[int, Tuple[str, int, int]], interfaces.objects.ObjectInterface, interfaces.objects.ObjectInterface, str, ], Generator, ], ) -> interfaces.objects.ObjectInterface: try: primary = context.layers[primary_layer_name] except KeyError: vollog.error( "Unable to obtain primary layer for scanning. Please file a bug on GitHub about this issue." ) return try: phys_layer = primary.config["memory_layer"] except KeyError: vollog.error( "Unable to obtain memory layer from primary layer. Please file a bug on GitHub about this issue." ) return layer = context.layers[phys_layer] # Yara Rule to scan for MFT Header Signatures rules = yarascan.YaraScan.process_yara_options( {"yara_string": "/FILE0|FILE\\*|BAAD/"} ) # Read in the Symbol File symbol_table = intermed.IntermediateSymbolTable.create( context=context, config_path=config_path, sub_path="windows", filename="mft", class_types={ "FILE_NAME_ENTRY": mft.MFTFileName, "MFT_ENTRY": mft.MFTEntry, "ATTRIBUTE": mft.MFTAttribute, }, ) # get each of the individual Field Sets mft_object = symbol_table + constants.BANG + "MFT_ENTRY" attribute_object = symbol_table + constants.BANG + "ATTRIBUTE" record_map = {} # Scan the layer for Raw MFT records and parse the fields for offset, _rule_name, _name, _value in layer.scan( context=context, scanner=yarascan.YaraScanner(rules=rules) ): with contextlib.suppress(exceptions.InvalidAddressException): mft_record = context.object( mft_object, offset=offset, layer_name=layer.name ) # We will update this on each pass in the next loop and use it as the new offset. attr_base_offset = mft_record.FirstAttrOffset attr = context.object( attribute_object, offset=offset + attr_base_offset, layer_name=layer.name, ) # There is no field that has a count of Attributes # Keep Attempting to read attributes until we get an invalid attr_header.AttrType while attr.Attr_Header.AttrType.is_valid_choice: yield from attr_callback(record_map, mft_record, attr, symbol_table) # If there's no advancement the loop will never end, so break it now if attr.Attr_Header.Length == 0: break # Update the base offset to point to the next attribute attr_base_offset += attr.Attr_Header.Length # Get the next attribute attr = context.object( attribute_object, offset=offset + attr_base_offset, layer_name=layer.name, )
[docs] @staticmethod def parse_mft_records( record_map: Dict[int, Tuple[str, int, int]], mft_record: interfaces.objects.ObjectInterface, attr: interfaces.objects.ObjectInterface, symbol_table_name: str, ): # MFT Flags determine the file type or dir # If we don't have a valid enum, coerce to hex so we can keep the record try: mft_flag = mft_record.Flags.lookup() except ValueError: mft_flag = hex(mft_record.Flags) # Standard Information Attribute if attr.Attr_Header.AttrType.lookup() == "STANDARD_INFORMATION": si_object = ( symbol_table_name + constants.BANG + "STANDARD_INFORMATION_ENTRY" ) attr_data = attr.Attr_Data.cast(si_object) yield 0, ( format_hints.Hex(attr_data.vol.offset), mft_record.get_signature(), mft_record.RecordNumber, mft_record.LinkCount, mft_flag, renderers.NotApplicableValue(), attr.Attr_Header.AttrType.lookup(), conversion.wintime_to_datetime(attr_data.CreationTime), conversion.wintime_to_datetime(attr_data.ModifiedTime), conversion.wintime_to_datetime(attr_data.UpdatedTime), conversion.wintime_to_datetime(attr_data.AccessedTime), renderers.NotApplicableValue(), ) # File Name Attribute elif attr.Attr_Header.AttrType.lookup() == "FILE_NAME": fn_object = symbol_table_name + constants.BANG + "FILE_NAME_ENTRY" attr_data = attr.Attr_Data.cast(fn_object) file_name = attr_data.get_full_name() # If we don't have a valid enum, coerce to hex so we can keep the record try: permissions = attr_data.Flags.lookup() except ValueError: permissions = hex(attr_data.Flags) yield 1, ( format_hints.Hex(attr_data.vol.offset), mft_record.get_signature(), mft_record.RecordNumber, mft_record.LinkCount, mft_flag, permissions, attr.Attr_Header.AttrType.lookup(), conversion.wintime_to_datetime(attr_data.CreationTime), conversion.wintime_to_datetime(attr_data.ModifiedTime), conversion.wintime_to_datetime(attr_data.UpdatedTime), conversion.wintime_to_datetime(attr_data.AccessedTime), file_name, )
[docs] @staticmethod def parse_data_record( mft_record: interfaces.objects.ObjectInterface, attr: interfaces.objects.ObjectInterface, record_map: Dict[int, Tuple[str, int, int]], return_first_record: bool, ) -> Generator[Iterable, None, None]: """ Returns the parsed data from a MFT record """ # we only care about resident data if attr.Attr_Header.NonResidentFlag: return # we aren't looking ADS when we want the first data record if return_first_record: ads_name = renderers.NotApplicableValue() # skip records without a name if we want ADS entries elif attr.Attr_Header.NameLength == 0: return else: # past the first $DATA record, attempt to get the ADS name # NotAvailableValue = > 1st Data, but name was not parsable ads_name = attr.get_resident_filename() or renderers.NotAvailableValue() content = attr.get_resident_filecontent() if content: content = format_hints.HexBytes(content) else: content = renderers.NotAvailableValue() yield ( format_hints.Hex(record_map[mft_record.vol.offset][2]), mft_record.get_signature(), mft_record.RecordNumber, attr.Attr_Header.AttrType.lookup(), record_map[mft_record.vol.offset][0], ads_name, content, )
[docs] @classmethod def parse_data_records( cls, record_map: Dict[int, Tuple[str, int, int]], mft_record: interfaces.objects.ObjectInterface, attr: interfaces.objects.ObjectInterface, symbol_table_name: str, return_first_record: bool, ) -> Generator[Iterable, None, None]: """ Parses DATA records while maintaining the FILE_NAME association from previous parsing of the record Suports returning the first/main $DATA as well as however many ADS records a file might have """ if mft_record.vol.offset not in record_map: # file name, DATA count, offset record_map[mft_record.vol.offset] = [renderers.NotAvailableValue(), 0, None] if attr.Attr_Header.AttrType.lookup() == "FILE_NAME": fn_object = symbol_table_name + constants.BANG + "FILE_NAME_ENTRY" attr_data = attr.Attr_Data.cast(fn_object) rec_name = attr_data.get_full_name() record_map[mft_record.vol.offset][0] = rec_name elif attr.Attr_Header.AttrType.lookup() == "DATA": # first data record_map[mft_record.vol.offset][2] = attr.Attr_Data.vol.offset display_data = False # first DATA attribute of this record if record_map[mft_record.vol.offset][1] == 0: if return_first_record: display_data = True record_map[mft_record.vol.offset][1] = 1 # at the second DATA attribute of this record elif record_map[mft_record.vol.offset][1] == 1 and not return_first_record: display_data = True if display_data: yield from cls.parse_data_record( mft_record, attr, record_map, return_first_record )
def _generator(self): yield from self.enumerate_mft_records( self.context, self.config_path, self.config["primary"], self.parse_mft_records, )
[docs] def generate_timeline(self): for row in self._generator(): _depth, row_data = row # Only Output FN Records if row_data[6] == "FILE_NAME": filename = row_data[-1] description = f"MFT FILE_NAME entry for {filename}" yield (description, timeliner.TimeLinerType.CREATED, row_data[7]) yield (description, timeliner.TimeLinerType.MODIFIED, row_data[8]) yield (description, timeliner.TimeLinerType.CHANGED, row_data[9]) yield (description, timeliner.TimeLinerType.ACCESSED, row_data[10])
[docs] def run(self): return renderers.TreeGrid( [ ("Offset", format_hints.Hex), ("Record Type", str), ("Record Number", int), ("Link Count", int), ("MFT Type", str), ("Permissions", str), ("Attribute Type", str), ("Created", datetime.datetime), ("Modified", datetime.datetime), ("Updated", datetime.datetime), ("Accessed", datetime.datetime), ("Filename", str), ], self._generator(), )
[docs] class ADS(interfaces.plugins.PluginInterface): """Scans for Alternate Data Stream""" _required_framework_version = (2, 7, 0) _version = (1, 0, 0)
[docs] @classmethod def get_requirements(cls): return [ requirements.PluginRequirement( name="MFTScan", plugin=MFTScan, version=(2, 0, 0) ), requirements.TranslationLayerRequirement( name="primary", description="Memory layer for the kernel", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="yarascanner", component=yarascan.YaraScanner, version=(2, 0, 0) ), ]
[docs] @staticmethod def parse_ads_data_records( record_map: Dict[int, Tuple[str, int, int]], mft_record: interfaces.objects.ObjectInterface, attr: interfaces.objects.ObjectInterface, symbol_table_name: str, ): return MFTScan.parse_data_records( record_map, mft_record, attr, symbol_table_name, False )
def _generator(self): for ( offset, rec_type, rec_num, attr_type, file_name, ads_name, content, ) in MFTScan.enumerate_mft_records( self.context, self.config_path, self.config["primary"], self.parse_ads_data_records, ): yield ( 0, (offset, rec_type, rec_num, attr_type, file_name, ads_name, content), )
[docs] def run(self): return renderers.TreeGrid( [ ("Offset", format_hints.Hex), ("Record Type", str), ("Record Number", int), ("MFT Type", str), ("Filename", str), ("ADS Filename", str), ("Hexdump", format_hints.HexBytes), ], self._generator(), )
[docs] class ResidentData(interfaces.plugins.PluginInterface): """Scans for MFT Records with Resident Data""" _required_framework_version = (2, 7, 0) _version = (1, 0, 0)
[docs] @classmethod def get_requirements(cls): return [ requirements.PluginRequirement( name="MFTScan", plugin=MFTScan, version=(2, 0, 0) ), requirements.TranslationLayerRequirement( name="primary", description="Memory layer for the kernel", architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( name="yarascanner", component=yarascan.YaraScanner, version=(2, 0, 0) ), ]
[docs] @staticmethod def parse_first_data_records( record_map: Dict[int, Tuple[str, int, int]], mft_record: interfaces.objects.ObjectInterface, attr: interfaces.objects.ObjectInterface, symbol_table_name: str, ): return MFTScan.parse_data_records( record_map, mft_record, attr, symbol_table_name, True )
def _generator(self): for ( offset, rec_type, rec_num, attr_type, file_name, _, content, ) in MFTScan.enumerate_mft_records( self.context, self.config_path, self.config["primary"], self.parse_first_data_records, ): yield (0, (offset, rec_type, rec_num, attr_type, file_name, content))
[docs] def run(self): return renderers.TreeGrid( [ ("Offset", format_hints.Hex), ("Record Type", str), ("Record Number", int), ("MFT Type", str), ("Filename", str), ("Hexdump", format_hints.HexBytes), ], self._generator(), )