import json
import logging
import os
import pathlib
import zipfile
from typing import Generator, List

from volatility3 import schemas, symbols
from volatility3.framework import constants, interfaces, renderers
from volatility3.framework.automagic import symbol_cache
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.layers import resources

vollog = logging.getLogger(__name__)

[docs]class IsfInfo(plugins.PluginInterface): """Determines information about the currently available ISF files, or a specific one""" _required_framework_version = (2, 0, 0) _version = (2, 0, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ListRequirement( name="filter", description="String that must be present in the file URI to display the ISF", optional=True, default=[], ), requirements.URIRequirement( name="isf", description="Specific ISF file to process", default=None, optional=True, ), requirements.BooleanRequirement( name="validate", description="Validate against schema if possible", default=False, optional=True, ), requirements.VersionRequirement( name="SQLiteCache", component=symbol_cache.SqliteCache, version=(1, 0, 0), ), requirements.BooleanRequirement( name="live", description="Traverse all files, rather than use the cache", default=False, optional=True, ), ]
[docs] @classmethod def list_all_isf_files(cls) -> Generator[str, None, None]: """Lists all the ISF files that can be found""" for symbol_path in symbols.__path__: for root, dirs, files in os.walk(symbol_path, followlinks=True): for filename in files: base_name = os.path.join(root, filename) if filename.endswith("zip"): with zipfile.ZipFile(base_name, "r") as zfile: for name in zfile.namelist(): for extension in constants.ISF_EXTENSIONS: # By ending with an extension (and therefore, not /), we should not return any directories if name.endswith(extension): yield "jar:file:" + str( pathlib.Path(base_name) ) + "!" + name else: for extension in constants.ISF_EXTENSIONS: if filename.endswith(extension): yield pathlib.Path(base_name).as_uri()
def _generator(self): if self.config.get("isf", None) is not None: file_list = [self.config["isf"]] else: file_list = list(self.list_all_isf_files()) # Filter the files filtered_list = [] if not len(self.config["filter"]): filtered_list = file_list else: for isf_file in file_list: for filter_item in self.config["filter"]: if filter_item in isf_file: filtered_list.append(isf_file) try: import jsonschema if not self.config["validate"]: raise ImportError # Act as if we couldn't import if validation is turned off def check_valid(data): return "True" if schemas.validate(data, True) else "False" except ImportError: def check_valid(data): return "Unknown" if self.config["live"]: # Process the filtered list for entry in filtered_list: num_types = num_enums = num_bases = num_symbols = 0 valid = "Unknown" with resources.ResourceAccessor().open(url=entry) as fp: try: data = json.load(fp) num_symbols = len(data.get("symbols", [])) num_types = len(data.get("user_types", [])) num_enums = len(data.get("enums", [])) num_bases = len(data.get("base_types", [])) identifiers_path = os.path.join( constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME ) identifier_cache = symbol_cache.SqliteCache(identifiers_path) identifier = identifier_cache.get_identifier(location=entry) if identifier: identifier = identifier.decode("utf-8", errors="replace") else: identifier = renderers.NotAvailableValue() valid = check_valid(data) except (UnicodeDecodeError, json.decoder.JSONDecodeError): vollog.warning(f"Invalid ISF: {entry}") yield ( 0, ( entry, valid, num_bases, num_types, num_symbols, num_enums, identifier, ), ) else: identifiers_path = os.path.join( constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME ) cache = symbol_cache.SqliteCache(identifiers_path) valid = "Unknown" for identifier, location in cache.get_identifier_dictionary().items(): ( num_bases, num_types, num_enums, num_symbols, ) = cache.get_location_statistics(location) if identifier: json_hash = cache.get_hash(location) if json_hash and json_hash in schemas.cached_validations: valid = "True (cached)" if self.config["validate"]: # Even if we're not live, if we've been explicitly asked to validate, then do-so with resources.ResourceAccessor().open(url=location) as fp: try: data = json.load(fp) valid = check_valid(data) except (UnicodeDecodeError, json.decoder.JSONDecodeError): vollog.warning(f"Invalid ISF: {location}") yield ( 0, ( location, valid, num_bases, num_types, num_symbols, num_enums, str(identifier), ), ) # Try to open the file, load it as JSON, read the data from it
[docs] def run(self): return renderers.TreeGrid( [ ("URI", str), ("Valid", str), ("Number of base_types", int), ("Number of types", int), ("Number of symbols", int), ("Number of enums", int), ("Identifying information", str), ], self._generator(), )