Source code for volatility3.plugins.isfinfo

# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at
import json
import logging
import os
import pathlib
import zipfile
from typing import Generator, List

from volatility3 import schemas, symbols
from volatility3.framework import constants, interfaces, renderers
from volatility3.framework.automagic import symbol_cache
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.layers import resources

vollog = logging.getLogger(__name__)

[docs]class IsfInfo(plugins.PluginInterface): """Determines information about the currently available ISF files, or a specific one""" _required_framework_version = (2, 0, 0) _version = (2, 0, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ListRequirement( name="filter", description="String that must be present in the file URI to display the ISF", optional=True, default=[], ), requirements.URIRequirement( name="isf", description="Specific ISF file to process", default=None, optional=True, ), requirements.BooleanRequirement( name="validate", description="Validate against schema if possible", default=False, optional=True, ), requirements.VersionRequirement( name="SQLiteCache", component=symbol_cache.SqliteCache, version=(1, 0, 0), ), requirements.BooleanRequirement( name="live", description="Traverse all files, rather than use the cache", default=False, optional=True, ), ]
[docs] @classmethod def list_all_isf_files(cls) -> Generator[str, None, None]: """Lists all the ISF files that can be found""" for symbol_path in symbols.__path__: for root, dirs, files in os.walk(symbol_path, followlinks=True): for filename in files: base_name = os.path.join(root, filename) if filename.endswith("zip"): with zipfile.ZipFile(base_name, "r") as zfile: for name in zfile.namelist(): for extension in constants.ISF_EXTENSIONS: # By ending with an extension (and therefore, not /), we should not return any directories if name.endswith(extension): yield "jar:file:" + str( pathlib.Path(base_name) ) + "!" + name else: for extension in constants.ISF_EXTENSIONS: if filename.endswith(extension): yield pathlib.Path(base_name).as_uri()
def _generator(self): if self.config.get("isf", None) is not None: file_list = [self.config["isf"]] else: file_list = list(self.list_all_isf_files()) # Filter the files filtered_list = [] if not len(self.config["filter"]): filtered_list = file_list else: for isf_file in file_list: for filter_item in self.config["filter"]: if filter_item in isf_file: filtered_list.append(isf_file) try: import jsonschema if not self.config["validate"]: raise ImportError # Act as if we couldn't import if validation is turned off def check_valid(data): return "True" if schemas.validate(data, True) else "False" except ImportError: def check_valid(data): return "Unknown" if self.config["live"]: # Process the filtered list for entry in filtered_list: num_types = num_enums = num_bases = num_symbols = 0 valid = "Unknown" with resources.ResourceAccessor().open(url=entry) as fp: try: data = json.load(fp) num_symbols = len(data.get("symbols", [])) num_types = len(data.get("user_types", [])) num_enums = len(data.get("enums", [])) num_bases = len(data.get("base_types", [])) identifiers_path = os.path.join( constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME ) identifier_cache = symbol_cache.SqliteCache(identifiers_path) identifier = identifier_cache.get_identifier(location=entry) if identifier: identifier = identifier.decode("utf-8", errors="replace") else: identifier = renderers.NotAvailableValue() valid = check_valid(data) except (UnicodeDecodeError, json.decoder.JSONDecodeError): vollog.warning(f"Invalid ISF: {entry}") yield ( 0, ( entry, valid, num_bases, num_types, num_symbols, num_enums, identifier, ), ) else: identifiers_path = os.path.join( constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME ) cache = symbol_cache.SqliteCache(identifiers_path) valid = "Unknown" for identifier, location in cache.get_identifier_dictionary().items(): ( num_bases, num_types, num_enums, num_symbols, ) = cache.get_location_statistics(location) if identifier: json_hash = cache.get_hash(location) if json_hash and json_hash in schemas.cached_validations: valid = "True (cached)" if self.config["validate"]: # Even if we're not live, if we've been explicitly asked to validate, then do-so with resources.ResourceAccessor().open(url=location) as fp: try: data = json.load(fp) valid = check_valid(data) except (UnicodeDecodeError, json.decoder.JSONDecodeError): vollog.warning(f"Invalid ISF: {location}") yield ( 0, ( location, valid, num_bases, num_types, num_symbols, num_enums, str(identifier), ), ) # Try to open the file, load it as JSON, read the data from it
[docs] def run(self): return renderers.TreeGrid( [ ("URI", str), ("Valid", str), ("Number of base_types", int), ("Number of types", int), ("Number of symbols", int), ("Number of enums", int), ("Identifying information", str), ], self._generator(), )