# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import hashlib
import json
import logging
import os
import re
from typing import Any, Dict, Optional, Set, Tuple
from volatility3.framework import constants
# Module-level logger, named after this module
vollog = logging.getLogger(__name__)
# Location (under constants.CACHE_PATH) of the JSON file listing the hashes of
# inputs that have already been validated successfully
cached_validation_filepath = os.path.join(constants.CACHE_PATH, "valid_isf.hashcache")
def load_cached_validations() -> Set[str]:
    """Loads up the list of successfully cached json objects, so we don't need
    to revalidate them.

    The cache is only an optimisation and this function is run when the module
    is imported, so a missing, unreadable or malformed cache file is treated
    as an empty cache instead of raising.

    Returns:
        The set of hashes read from the cache file (empty if there is no
        usable cache)
    """
    validhashes: Set[str] = set()
    try:
        with open(cached_validation_filepath, "r", encoding="utf-8") as f:
            cached = json.load(f)
    except FileNotFoundError:
        # Nothing has been cached yet
        return validhashes
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors
        vollog.debug("Ignoring unreadable validation cache", exc_info=True)
        return validhashes
    # Only a JSON list of strings is a meaningful cache; drop anything else
    if isinstance(cached, list):
        validhashes.update(entry for entry in cached if isinstance(entry, str))
    return validhashes
def record_cached_validations(validations: Set[str]) -> None:
    """Record the cached validations, so we don't need to revalidate them in
    future.

    Persisting the cache is best effort: if the cache file cannot be written
    the failure is logged and ignored, since the only cost is having to
    revalidate later.

    Args:
        validations: the hashes of the inputs that validated successfully
    """
    try:
        with open(cached_validation_filepath, "w", encoding="utf-8") as f:
            # Sets are not JSON serializable, so store the hashes as a list
            json.dump(list(validations), f)
    except OSError:
        vollog.debug("Unable to record validation cache", exc_info=True)
# Hashes of inputs known to be valid: loaded once at import time, then
# consulted and extended by valid()
cached_validations = load_cached_validations()
def _load_schema_for(input: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Loads the schema matching the format declared in the input's metadata.

    Args:
        input: the parsed JSON object, whose ``metadata.format`` value selects
            the ``schema-<format>.json`` file located next to this module

    Returns:
        The parsed schema, or None if the input declares no usable format or
        no matching schema file exists
    """
    metadata = input.get("metadata", {})
    schema_format = metadata.get("format", None) if isinstance(metadata, dict) else None
    # The input has not been validated at this point, so the format may be
    # missing or may not even be a string
    if not schema_format or not isinstance(schema_format, str):
        vollog.debug("No schema format defined")
        return None

    basepath = os.path.abspath(os.path.dirname(__file__))
    schema_name = "schema-" + schema_format + ".json"
    schema_path = os.path.join(basepath, schema_name)
    # A format containing path separators would make the lookup leave this
    # directory, so it is treated like any other unknown format
    if os.path.basename(schema_name) != schema_name or not os.path.exists(
        schema_path
    ):
        vollog.debug(f"Schema for format not found: {schema_path}")
        return None

    with open(schema_path, "r", encoding="utf-8") as s:
        return json.load(s)


def validate(input: Dict[str, Any], use_cache: bool = True) -> bool:
    """Validates an input JSON object against the schema for its declared format.

    Args:
        input: the parsed JSON object to validate
        use_cache: whether a previously cached successful validation may be
            reused

    Returns:
        False if no schema could be found for the input, otherwise the result
        of validating the input against that schema
    """
    schema = _load_schema_for(input)
    if schema is None:
        return False
    return valid(input, schema, use_cache)


def create_json_hash(
    input: Dict[str, Any], schema: Optional[Dict[str, Any]] = None
) -> Optional[str]:
    """Constructs the hash of the input and schema to create a unique
    identifier for a particular JSON file.

    Args:
        input: the parsed JSON object
        schema: the parsed schema; when omitted, it is looked up from the
            format declared in the input's metadata

    Returns:
        The hexadecimal SHA-1 digest of the serialized (input, schema) pair,
        or None if no schema was given and none could be found
    """
    if schema is None:
        schema = _load_schema_for(input)
        if schema is None:
            return None
    # sort_keys makes the digest independent of dictionary ordering
    serialized = json.dumps((input, schema), sort_keys=True)
    return hashlib.sha1(serialized.encode("utf-8")).hexdigest()
def valid(
    input: Dict[str, Any], schema: Dict[str, Any], use_cache: bool = True
) -> bool:
    """Validates a JSON object against a JSON schema.

    Args:
        input: the parsed JSON object to check
        schema: the parsed JSON schema to check it against
        use_cache: whether a previously recorded successful validation of the
            same input/schema pair may be reused

    Returns:
        True if the input conforms to the schema, or if the jsonschema
        dependency is unavailable (in which case nothing can be checked);
        False if the input does not conform or the schema itself is invalid
    """
    # The input is still unvalidated here, so the producer metadata cannot be
    # assumed to be well formed
    metadata = input.get("metadata", {})
    producer = metadata.get("producer", {}) if isinstance(metadata, dict) else {}
    if isinstance(producer, dict) and producer.get("name") == "dwarf2json":
        raw_version = producer.get("version", "")
        dwarf2json_version = (
            parse_producer_version(raw_version)
            if isinstance(raw_version, str)
            else None
        )
        # No warnings if version couldn't be parsed, as it's not our role here
        # to validate the schema.
        if dwarf2json_version and dwarf2json_check_rust_type_confusion(
            input, dwarf2json_version
        ):
            vollog.warning(
                "This ISF was generated by dwarf2json < 0.9.0, which is known to produce inaccurate results (see dwarf2json GitHub issue #63)."
            )

    input_hash = create_json_hash(input, schema)
    if use_cache and input_hash in cached_validations:
        return True

    try:
        import jsonschema
    except ImportError:
        vollog.info("Dependency for validation unavailable: jsonschema")
        vollog.debug("All validations will report success, even with malformed input")
        return True

    try:
        vollog.debug("Validating JSON against schema...")
        jsonschema.validate(input, schema)
    except (
        # Raised when the input does not conform to the schema
        jsonschema.exceptions.ValidationError,
        # Raised when the schema itself is not a valid schema
        jsonschema.exceptions.SchemaError,
    ):
        vollog.debug("Schema validation error", exc_info=True)
        return False

    cached_validations.add(input_hash)
    vollog.debug("JSON validated against schema (result cached)")
    record_cached_validations(cached_validations)
    return True
def parse_producer_version(version_string: str) -> Optional[Tuple[int, ...]]:
    """Parses a producer version and returns a tuple of identifiers.

    Args:
        version_string: string containing dot-separated integers,
            expected to follow the Volatility3 versioning schema

    Returns:
        A tuple containing each version identifier, or None if the value is
        not a string of three dot-separated integers
    """
    # The version is read from file metadata by valid(), so it may not be a
    # string at all
    if not isinstance(version_string, str):
        return None
    identifiers = re.search(r"^(\d+)[.](\d+)[.](\d+)$", version_string)
    if not identifiers:
        return None
    return tuple(int(d) for d in identifiers.groups())
# dwarf2json sanity checks #
def dwarf2json_check_rust_type_confusion(
    input: Dict[str, Any], dwarf2json_version: Tuple[int, ...]
) -> bool:
    """dwarf2json sanity check for Rust and C types confusion:
        - dwarf2json #63
        - volatility3 #1305

    Args:
        input: the parsed ISF JSON object to inspect
        dwarf2json_version: a tuple containing each version identifier

    Returns:
        True if the issue was detected
    """
    # Only versions before 0.9.0 are reported
    if dwarf2json_version >= (0, 9, 0):
        return False
    # The input may not have been validated yet, so the symbols table cannot
    # be assumed to be a dictionary
    symbols = input.get("symbols", {})
    return isinstance(symbols, dict) and "rust_helper_BUG" in symbols