Source code for volatility3.framework.automagic.symbol_cache

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import gc
import json
import logging
import os
import pickle
import urllib
import urllib.parse
import urllib.request
from typing import Dict, List, Optional

from volatility3.framework import constants, exceptions, interfaces
from volatility3.framework.symbols import intermed

vollog = logging.getLogger(__name__)

BannersType = Dict[bytes, List[str]]


[docs]class SymbolBannerCache(interfaces.automagic.AutomagicInterface): """Runs through all symbols tables and caches their banners.""" # Since this is necessary for ConstructionMagic, we set a lower priority # The user would run it eventually either way, but running it first means it can be used that run priority = 0 os = None # type: Optional[str] symbol_name = "banner_name" # type: str banner_path = None # type: Optional[str]
[docs] @classmethod def load_banners(cls) -> BannersType: if not cls.banner_path: raise ValueError("Banner_path not appropriately set") banners = {} # type: BannersType if os.path.exists(cls.banner_path): with open(cls.banner_path, "rb") as f: # We use pickle over JSON because we're dealing with bytes objects banners.update(pickle.load(f)) # Remove possibilities that can't exist locally. remove_banners = [] for banner in banners: for path in banners[banner]: url = urllib.parse.urlparse(path) if url.scheme == 'file' and not os.path.exists(urllib.request.url2pathname(url.path)): vollog.log( constants.LOGLEVEL_VV, "Removing cached path {} for banner {}: file does not exist".format( path, str(banner or b'', 'latin-1'))) banners[banner].remove(path) # This is probably excessive, but it's here if we need it # if url.scheme == 'jar': # zip_file, zip_path = url.path.split("!") # zip_file = urllib.parse.urlparse(zip_file).path # if ((not os.path.exists(zip_file)) or (zip_path not in zipfile.ZipFile(zip_file).namelist())): # vollog.log(constants.LOGLEVEL_VV, # "Removing cached path {} for banner {}: file does not exist".format(path, banner)) # banners[banner].remove(path) if not banners[banner]: remove_banners.append(banner) for remove_banner in remove_banners: del banners[remove_banner] return banners
[docs] @classmethod def save_banners(cls, banners): with open(cls.banner_path, "wb") as f: pickle.dump(banners, f)
def __call__(self, context, config_path, configurable, progress_callback = None): """Runs the automagic over the configurable.""" # Bomb out if we're just the generic interface if self.os is None: return # We only need to be called once, so no recursion necessary banners = self.load_banners() cacheables = list(intermed.IntermediateSymbolTable.file_symbol_url(self.os)) for banner in banners: for json_file in banners[banner]: if json_file in cacheables: cacheables.remove(json_file) total = len(cacheables) if total > 0: vollog.info("Building {} caches...".format(self.os)) for current in range(total): if progress_callback is not None: progress_callback(current * 100 / total, "Building {} caches".format(self.os)) isf_url = cacheables[current] isf = None try: # Loading the symbol table will be very slow until it's been validated isf = intermed.IntermediateSymbolTable(context, config_path, "temp", isf_url, validate = False) # We should store the banner against the filename # We don't bother with the hash (it'll likely take too long to validate) # but we should check at least that the banner matches on load. banner = isf.get_symbol(self.symbol_name).constant_data vollog.log(constants.LOGLEVEL_VV, "Caching banner {} for file {}".format(banner, isf_url)) bannerlist = banners.get(banner, []) bannerlist.append(isf_url) banners[banner] = bannerlist except exceptions.SymbolError: pass except json.JSONDecodeError: vollog.log(constants.LOGLEVEL_VV, "Caching file {} failed due to JSON error".format(isf_url)) finally: # Get rid of the loaded file, in case it sits in memory if isf: del isf gc.collect() # Rewrite the cached banners each run, since writing is faster than the banner_cache validation portion self.save_banners(banners) if progress_callback is not None: progress_callback(100, "Built {} caches".format(self.os))