Source code for

# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at

import json
import logging
import ntpath
import os
import re
from typing import List, Dict, Union

from volatility3.framework import (
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from import registry
from import pslist
from import hivelist

vollog = logging.getLogger(__name__)

[docs]def find_sid_re( sid_string, sid_re_list ) -> Union[str, interfaces.renderers.BaseAbsentValue]: for reg, name in sid_re_list: if return name return renderers.NotAvailableValue()
[docs]class GetSIDs(interfaces.plugins.PluginInterface): """Print the SIDs owning each process""" _version = (1, 0, 0) _required_framework_version = (2, 0, 0) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) for plugin_dir in constants.PLUGINS_PATH: sids_json_file_name = os.path.join( plugin_dir, os.path.join("windows", "sids_and_privileges.json") ) if os.path.exists(sids_json_file_name): break else: vollog.log( constants.LOGLEVEL_VVV, "sids_and_privileges.json file is missing plugin error", ) raise RuntimeError( "The sids_and_privileges.json file missed from you plugin directory" ) # Get all the sids from the json file. with open(sids_json_file_name, "r") as file_handle: sids_json_data = json.load(file_handle) self.servicesids = sids_json_data["service sids"] self.well_known_sids = sids_json_data["well known"] # Compile all the sids regex. self.well_known_sid_re = [ (re.compile(c_list[0]), c_list[1]) for c_list in sids_json_data["sids re"] ]
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"], ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", element_type=int, optional=True, ), requirements.PluginRequirement( name="pslist", plugin=pslist.PsList, version=(2, 0, 0) ), requirements.PluginRequirement( name="hivelist", plugin=hivelist.HiveList, version=(1, 0, 0) ), ]
[docs] def lookup_user_sids(self) -> Dict[str, str]: """ Enumerate the registry for all the users. Returns: An dictionary of {sid: user name} """ key = "Microsoft\\Windows NT\\CurrentVersion\\ProfileList" val = "ProfileImagePath" kernel = self.context.modules[self.config["kernel"]] sids = {} for hive in hivelist.HiveList.list_hives( context=self.context, base_config_path=self.config_path, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name, filter_string="config\\software", hive_offsets=None, ): try: for subkey in hive.get_key(key).get_subkeys(): sid = str(subkey.get_name()) path = "" for node in subkey.get_values(): try: value_node_name = node.get_name() or "(Default)" except ( exceptions.InvalidAddressException, layers.registry.RegistryFormatException, ) as excp: continue try: value_data = node.decode_data() if isinstance(value_data, int): value_data = format_hints.MultiTypeData( value_data, encoding="utf-8" ) elif ( registry.RegValueTypes(node.Type) == registry.RegValueTypes.REG_BINARY ): value_data = format_hints.MultiTypeData( value_data, show_hex=True ) elif ( registry.RegValueTypes(node.Type) == registry.RegValueTypes.REG_MULTI_SZ ): value_data = format_hints.MultiTypeData( value_data, encoding="utf-16-le", split_nulls=True ) else: value_data = format_hints.MultiTypeData( value_data, encoding="utf-16-le" ) if value_node_name == val: path = str(value_data).replace("\\x00", "")[:-1] user = ntpath.basename(path) sids[sid] = user except ( ValueError, exceptions.InvalidAddressException, layers.registry.RegistryFormatException, ) as excp: continue except (KeyError, exceptions.InvalidAddressException): continue return sids
def _generator(self, procs): user_sids = self.lookup_user_sids() # Go all over the process list, get the token for task in procs: # Make sure we have a valid token try: token = task.Token.dereference().cast("_TOKEN") except exceptions.InvalidAddressException: token = False if not token or not isinstance(token, interfaces.objects.ObjectInterface): yield ( 0, [ int(task.UniqueProcessId), str(task.ImageFileName), "Token unreadable", "", ], ) continue # Go all over the sids and try to translate them with one of the tables we have for sid_string in token.get_sids(): if sid_string in self.well_known_sids: sid_name = self.well_known_sids[sid_string] elif sid_string in self.servicesids: sid_name = self.servicesids[sid_string] elif sid_string in user_sids: sid_name = user_sids[sid_string] else: sid_name_re = find_sid_re(sid_string, self.well_known_sid_re) if sid_name_re: sid_name = sid_name_re else: sid_name = "" yield ( 0, ( task.UniqueProcessId, objects.utility.array_to_string(task.ImageFileName), sid_string, sid_name, ), )
[docs] def run(self): filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) kernel = self.context.modules[self.config["kernel"]] return renderers.TreeGrid( [("PID", int), ("Process", str), ("SID", str), ("Name", str)], self._generator( pslist.PsList.list_processes( context=self.context, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name, filter_func=filter_func, ) ), )