# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import json
import logging
import os
import pathlib
import zipfile
from typing import Generator, List
from volatility3 import schemas, symbols
from volatility3.framework import constants, interfaces, renderers
from volatility3.framework.automagic import symbol_cache
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.layers import resources
vollog = logging.getLogger(__name__)
[docs]class IsfInfo(plugins.PluginInterface):
"""Determines information about the currently available ISF files, or a specific one"""
_required_framework_version = (2, 0, 0)
_version = (2, 0, 0)
[docs] @classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ListRequirement(name = 'filter',
description = 'String that must be present in the file URI to display the ISF',
optional = True,
default = []),
requirements.URIRequirement(name = 'isf',
description = "Specific ISF file to process",
default = None,
optional = True),
requirements.BooleanRequirement(name = 'validate',
description = 'Validate against schema if possible',
default = False,
optional = True),
requirements.VersionRequirement(name = 'SQLiteCache',
component = symbol_cache.SqliteCache,
version = (1, 0, 0)),
requirements.BooleanRequirement(name = 'live',
description = 'Traverse all files, rather than use the cache',
default = False,
optional = True)
]
[docs] @classmethod
def list_all_isf_files(cls) -> Generator[str, None, None]:
"""Lists all the ISF files that can be found"""
for symbol_path in symbols.__path__:
for root, dirs, files in os.walk(symbol_path, followlinks = True):
for filename in files:
base_name = os.path.join(root, filename)
if filename.endswith('zip'):
with zipfile.ZipFile(base_name, 'r') as zfile:
for name in zfile.namelist():
for extension in constants.ISF_EXTENSIONS:
# By ending with an extension (and therefore, not /), we should not return any directories
if name.endswith(extension):
yield "jar:file:" + str(pathlib.Path(base_name)) + "!" + name
else:
for extension in constants.ISF_EXTENSIONS:
if filename.endswith(extension):
yield pathlib.Path(base_name).as_uri()
def _generator(self):
if self.config.get('isf', None) is not None:
file_list = [self.config['isf']]
else:
file_list = list(self.list_all_isf_files())
# Filter the files
filtered_list = []
if not len(self.config['filter']):
filtered_list = file_list
else:
for isf_file in file_list:
for filter_item in self.config['filter']:
if filter_item in isf_file:
filtered_list.append(isf_file)
try:
import jsonschema
if not self.config['validate']:
raise ImportError # Act as if we couldn't import if validation is turned off
def check_valid(data):
return "True" if schemas.validate(data, True) else "False"
except ImportError:
def check_valid(data):
return "Unknown"
if self.config['live']:
# Process the filtered list
for entry in filtered_list:
num_types = num_enums = num_bases = num_symbols = 0
valid = "Unknown"
with resources.ResourceAccessor().open(url = entry) as fp:
try:
data = json.load(fp)
num_symbols = len(data.get('symbols', []))
num_types = len(data.get('user_types', []))
num_enums = len(data.get('enums', []))
num_bases = len(data.get('base_types', []))
identifiers_path = os.path.join(constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME)
identifier_cache = symbol_cache.SqliteCache(identifiers_path)
identifier = identifier_cache.get_identifier(location = entry)
if identifier:
identifier = identifier.decode('utf-8', errors = 'replace')
else:
identifier = renderers.NotAvailableValue()
valid = check_valid(data)
except (UnicodeDecodeError, json.decoder.JSONDecodeError):
vollog.warning(f"Invalid ISF: {entry}")
yield (0, (entry, valid, num_bases, num_types, num_symbols, num_enums, identifier))
else:
identifiers_path = os.path.join(constants.CACHE_PATH, constants.IDENTIFIERS_FILENAME)
cache = symbol_cache.SqliteCache(identifiers_path)
valid = 'Unknown'
for identifier, location in cache.get_identifier_dictionary().items():
num_bases, num_types, num_enums, num_symbols = cache.get_location_statistics(location)
if identifier:
json_hash = cache.get_hash(location)
if json_hash and json_hash in schemas.cached_validations:
valid = 'True (cached)'
if self.config['validate']:
# Even if we're not live, if we've been explicitly asked to validate, then do-so
with resources.ResourceAccessor().open(url = location) as fp:
try:
data = json.load(fp)
valid = check_valid(data)
except (UnicodeDecodeError, json.decoder.JSONDecodeError):
vollog.warning(f"Invalid ISF: {location}")
yield (0, (location, valid, num_bases, num_types, num_symbols, num_enums, str(identifier)))
# Try to open the file, load it as JSON, read the data from it
[docs] def run(self):
return renderers.TreeGrid([("URI", str), ("Valid", str),
("Number of base_types", int), ("Number of types", int), ("Number of symbols", int),
("Number of enums", int), ("Identifying information", str)], self._generator())