Source code for volatility3.schemas

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import hashlib
import json
import logging
import os
import re
from typing import Any, Dict, Optional, Set, Tuple
from volatility3.framework import constants

vollog = logging.getLogger(__name__)

cached_validation_filepath = os.path.join(constants.CACHE_PATH, "valid_isf.hashcache")


[docs]def load_cached_validations() -> Set[str]: """Loads up the list of successfully cached json objects, so we don't need to revalidate them.""" validhashes: Set = set() if os.path.exists(cached_validation_filepath): with open(cached_validation_filepath, "r") as f: validhashes.update(json.load(f)) return validhashes
[docs]def record_cached_validations(validations: Set[str]) -> None: """Record the cached validations, so we don't need to revalidate them in future.""" with open(cached_validation_filepath, "w") as f: json.dump(list(validations), f)
cached_validations = load_cached_validations()
[docs]def validate(input: Dict[str, Any], use_cache: bool = True) -> bool: """Validates an input JSON file based upon.""" format = input.get("metadata", {}).get("format", None) if not format: vollog.debug("No schema format defined") return False basepath = os.path.abspath(os.path.dirname(__file__)) schema_path = os.path.join(basepath, "schema-" + format + ".json") if not os.path.exists(schema_path): vollog.debug(f"Schema for format not found: {schema_path}") return False with open(schema_path, "r") as s: schema = json.load(s) return valid(input, schema, use_cache)
[docs]def create_json_hash( input: Dict[str, Any], schema: Optional[Dict[str, Any]] = None ) -> Optional[str]: """Constructs the hash of the input and schema to create a unique identifier for a particular JSON file.""" if schema is None: format = input.get("metadata", {}).get("format", None) if not format: vollog.debug("No schema format defined") return None basepath = os.path.abspath(os.path.dirname(__file__)) schema_path = os.path.join(basepath, "schema-" + format + ".json") if not os.path.exists(schema_path): vollog.debug(f"Schema for format not found: {schema_path}") return None with open(schema_path, "r") as s: schema = json.load(s) return hashlib.sha1( bytes(json.dumps((input, schema), sort_keys=True), "utf-8") ).hexdigest()
[docs]def valid( input: Dict[str, Any], schema: Dict[str, Any], use_cache: bool = True ) -> bool: """Validates a json schema.""" producer = input.get("metadata", {}).get("producer", {}) if producer and producer.get("name") == "dwarf2json": dwarf2json_version = parse_producer_version(producer.get("version", "")) # No warnings if version couldn't be parsed, as it's not our role here # to validate the schema. if dwarf2json_version: if dwarf2json_check_rust_type_confusion(input, dwarf2json_version): vollog.warning( "This ISF was generated by dwarf2json < 0.9.0, which is known to produce inaccurate results (see dwarf2json GitHub issue #63)." ) input_hash = create_json_hash(input, schema) if input_hash in cached_validations and use_cache: return True try: import jsonschema except ImportError: vollog.info("Dependency for validation unavailable: jsonschema") vollog.debug("All validations will report success, even with malformed input") return True try: vollog.debug("Validating JSON against schema...") jsonschema.validate(input, schema) cached_validations.add(input_hash) vollog.debug("JSON validated against schema (result cached)") except jsonschema.exceptions.SchemaError: vollog.debug("Schema validation error", exc_info=True) return False record_cached_validations(cached_validations) return True
[docs]def parse_producer_version(version_string: str) -> Optional[Tuple[int]]: """Parses a producer version and returns a tuple of identifiers. Args: version_string: string containing dot-separated integers, expected to follow the Volatility3 versioning schema Returns: A tuple containing each version identifier """ identifiers = re.search("^(\\d+)[.](\\d+)[.](\\d+)$", version_string) if not identifiers: return None return tuple(int(d) for d in identifiers.groups())
# dwarf2json sanity checks #
[docs]def dwarf2json_check_rust_type_confusion( input: Dict[str, Any], dwarf2json_version: Tuple[int] ) -> bool: """dwarf2json sanity check for Rust and C types confusion: - dwarf2json #63 - volatility3 #1305 Args: dwarf2json_version: a tuple containing each version identifier Returns: True if the issue was detected """ return "rust_helper_BUG" in input.get("symbols", {}) and dwarf2json_version < ( 0, 9, 0, )