Source code for volatility3.plugins.isfinfo

# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import json
import logging
import os
import pathlib
import zipfile
from typing import Generator, List

from volatility3 import schemas, symbols
from volatility3.framework import constants, interfaces, renderers
from volatility3.framework.automagic import symbol_cache
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.layers import resources

vollog = logging.getLogger(__name__)


class IsfInfo(plugins.PluginInterface):
    """Determines information about the currently available ISF files, or a specific one"""

    _required_framework_version = (2, 0, 0)
    _version = (2, 0, 0)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the configuration options accepted by this plugin."""
        return [
            requirements.ListRequirement(
                name="filter",
                description="String that must be present in the file URI to display the ISF",
                optional=True,
                default=[],
            ),
            requirements.URIRequirement(
                name="isf",
                description="Specific ISF file to process",
                default=None,
                optional=True,
            ),
            requirements.BooleanRequirement(
                name="validate",
                description="Validate against schema if possible",
                default=False,
                optional=True,
            ),
            requirements.VersionRequirement(
                name="SQLiteCache",
                component=symbol_cache.SqliteCache,
                version=(1, 0, 0),
            ),
            requirements.BooleanRequirement(
                name="live",
                description="Traverse all files, rather than use the cache",
                default=False,
                optional=True,
            ),
        ]

    @classmethod
    def list_all_isf_files(cls) -> Generator[str, None, None]:
        """Lists all the ISF files that can be found.

        Walks every directory on ``symbols.__path__`` (following symlinks).
        Plain files whose name ends with one of ``constants.ISF_EXTENSIONS``
        are yielded as ``file:`` URIs; matching members of files whose name
        ends in ``zip`` are yielded as ``jar:file:<archive>!<member>``.

        Archives that cannot be read as zip files are logged and skipped
        rather than aborting the whole listing.
        """
        # str.endswith accepts a tuple, so each name is tested (and yielded)
        # at most once regardless of how many extensions it matches.
        isf_extensions = tuple(constants.ISF_EXTENSIONS)

        for symbol_path in symbols.__path__:
            for root, _dirs, files in os.walk(symbol_path, followlinks=True):
                for filename in files:
                    base_name = os.path.join(root, filename)

                    if not filename.endswith("zip"):
                        if filename.endswith(isf_extensions):
                            yield pathlib.Path(base_name).as_uri()
                        continue

                    # Read the member list up front so the try block does not
                    # wrap the yields below.
                    try:
                        with zipfile.ZipFile(base_name, "r") as zfile:
                            member_names = zfile.namelist()
                    except zipfile.BadZipFile:
                        vollog.warning(f"Unreadable zip file: {base_name}")
                        continue

                    for name in member_names:
                        # By ending with an extension (and therefore, not /),
                        # we should not return any directories
                        if name.endswith(isf_extensions):
                            yield "jar:file:" + str(pathlib.Path(base_name)) + "!" + name

    def _select_files(self) -> List[str]:
        """Returns the ISF URIs to examine in live mode.

        Uses the single ``isf`` option when it is set, otherwise every file
        found by ``list_all_isf_files``.  When ``filter`` strings are given,
        only URIs containing at least one of them are kept (each URI once).
        """
        if self.config.get("isf", None) is not None:
            file_list = [self.config["isf"]]
        else:
            file_list = list(self.list_all_isf_files())

        filters = self.config["filter"]
        if not filters:
            return file_list
        return [
            isf_file
            for isf_file in file_list
            if any(filter_item in isf_file for filter_item in filters)
        ]

    def _make_validity_checker(self):
        """Returns a callable mapping parsed ISF data to a validity string.

        The callable returns "True"/"False" from ``schemas.validate`` when
        validation was requested and ``jsonschema`` can be imported, and
        "Unknown" otherwise.
        """

        def unknown(data):
            return "Unknown"

        if not self.config["validate"]:
            return unknown

        try:
            # Imported only to probe availability; presumably required by
            # schemas.validate.
            import jsonschema  # noqa: F401
        except ImportError:
            return unknown

        def check_valid(data):
            return "True" if schemas.validate(data, True) else "False"

        return check_valid

    @staticmethod
    def _identifier_text(identifier) -> str:
        """Renders a cached identifier as text, decoding bytes as UTF-8."""
        if isinstance(identifier, bytes):
            return identifier.decode("utf-8", errors="replace")
        return str(identifier)

    def _live_rows(self, check_valid):
        """Yields one row per selected ISF file by opening and parsing it.

        Each file is opened, loaded as JSON and its symbols, user types,
        enums and base types are counted.  Files that fail to parse are
        logged and reported with zero counts, "Unknown" validity and a
        not-available identifier.
        """
        identifiers_path = os.path.join(
            constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME
        )
        # Created on first use and then shared by every entry.
        identifier_cache = None

        for entry in self._select_files():
            # Reset every per-file value so nothing leaks from the previous
            # entry when parsing fails part-way through.
            num_types = num_enums = num_bases = num_symbols = 0
            valid = "Unknown"
            identifier = renderers.NotAvailableValue()

            with resources.ResourceAccessor().open(url=entry) as fp:
                try:
                    data = json.load(fp)
                    num_symbols = len(data.get("symbols", []))
                    num_types = len(data.get("user_types", []))
                    num_enums = len(data.get("enums", []))
                    num_bases = len(data.get("base_types", []))

                    if identifier_cache is None:
                        identifier_cache = symbol_cache.SqliteCache(identifiers_path)
                    cached_identifier = identifier_cache.get_identifier(location=entry)
                    if cached_identifier:
                        identifier = cached_identifier.decode("utf-8", errors="replace")

                    valid = check_valid(data)
                except (UnicodeDecodeError, json.decoder.JSONDecodeError):
                    vollog.warning(f"Invalid ISF: {entry}")

            yield (
                0,
                (entry, valid, num_bases, num_types, num_symbols, num_enums, identifier),
            )

    def _cached_rows(self, check_valid):
        """Yields one row per location recorded in the identifier cache.

        Statistics come from the cache.  Validity is "True (cached)" when
        the location's hash is in ``schemas.cached_validations``; when the
        ``validate`` option is set the file is additionally opened and
        checked with ``check_valid``.
        """
        # NOTE(review): the 'filter' and 'isf' options are not consulted
        # here; cached locations are listed unfiltered, as before.
        identifiers_path = os.path.join(
            constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME
        )
        cache = symbol_cache.SqliteCache(identifiers_path)

        for identifier, location in cache.get_identifier_dictionary().items():
            # Reset per location so one entry's result cannot carry over to
            # the next.
            valid = "Unknown"
            num_bases, num_types, num_enums, num_symbols = cache.get_location_statistics(
                location
            )

            if identifier:
                json_hash = cache.get_hash(location)
                if json_hash and json_hash in schemas.cached_validations:
                    valid = "True (cached)"

            if self.config["validate"]:
                # Even if we're not live, if we've been explicitly asked to
                # validate, then do so
                with resources.ResourceAccessor().open(url=location) as fp:
                    try:
                        data = json.load(fp)
                        valid = check_valid(data)
                    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
                        vollog.warning(f"Invalid ISF: {location}")

            yield (
                0,
                (
                    location,
                    valid,
                    num_bases,
                    num_types,
                    num_symbols,
                    num_enums,
                    self._identifier_text(identifier),
                ),
            )

    def _generator(self):
        """Yields ``(level, row)`` tuples for the TreeGrid built by ``run``.

        Each row is (URI, validity, base type count, type count, symbol
        count, enum count, identifier).  With the ``live`` option the files
        themselves are read; otherwise the rows come from the identifier
        cache.
        """
        check_valid = self._make_validity_checker()
        if self.config["live"]:
            yield from self._live_rows(check_valid)
        else:
            yield from self._cached_rows(check_valid)

    def run(self):
        """Returns a TreeGrid describing each ISF file produced by ``_generator``."""
        return renderers.TreeGrid(
            [
                ("URI", str),
                ("Valid", str),
                ("Number of base_types", int),
                ("Number of types", int),
                ("Number of symbols", int),
                ("Number of enums", int),
                ("Identifying information", str),
            ],
            self._generator(),
        )