Source code for volatility3.plugins.windows.threads

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from typing import Callable, Iterable, List, Generator

from volatility3.framework import interfaces, constants
from volatility3.framework.configuration import requirements
from volatility3.plugins.windows import pslist, thrdscan

vollog = logging.getLogger(__name__)


class Threads(thrdscan.ThrdScan):
    """Lists process threads"""

    _required_framework_version = (2, 4, 0)
    _version = (1, 0, 0)

    def __init__(self, *args, **kwargs):
        """Initialise the plugin and select the list-walking thread source.

        All arguments are forwarded unchanged to ``thrdscan.ThrdScan``.
        """
        super().__init__(*args, **kwargs)
        # Point the ``implementation`` attribute at this class's list-based
        # enumerator.
        # NOTE(review): presumably the ThrdScan base class calls this attribute
        # to obtain the threads it reports - confirm against thrdscan.ThrdScan.
        self.implementation = self.list_process_threads

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the configuration requirements of this plugin.

        Returns:
            The kernel module requirement, an optional list of process IDs to
            filter on, and a requirement on the ThrdScan plugin.
        """
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.ListRequirement(
                name="pid",
                description="Filter on specific process IDs",
                element_type=int,
                optional=True,
            ),
            requirements.PluginRequirement(
                name="thrdscan", plugin=thrdscan.ThrdScan, version=(1, 1, 0)
            ),
        ]

    @classmethod
    def list_threads(
        cls, kernel, proc: interfaces.objects.ObjectInterface
    ) -> Generator[interfaces.objects.ObjectInterface, None, None]:
        """Lists the threads of a specific process.

        Args:
            kernel: Object whose ``symbol_table_name`` is used to qualify the
                ``_ETHREAD`` type name
            proc: Process object whose ``ThreadListHead`` list is walked

        Yields:
            Each ``_ETHREAD`` object linked through ``ThreadListEntry``, in
            list order. Iteration stops at the first thread whose offset has
            already been yielded.
        """
        seen = set()
        for thread in proc.ThreadListHead.to_list(
            f"{kernel.symbol_table_name}{constants.BANG}_ETHREAD", "ThreadListEntry"
        ):
            # A repeated offset means the list has looped back on itself, so
            # stop walking instead of yielding the same threads again.
            if thread.vol.offset in seen:
                break
            seen.add(thread.vol.offset)
            yield thread

    @classmethod
    def list_process_threads(
        cls,
        context: interfaces.context.ContextInterface,
        module_name: str,
    ) -> Iterable[interfaces.objects.ObjectInterface]:
        """Runs through all processes and lists threads for each process

        Args:
            context: The context holding the modules and the configuration
            module_name: Name of the kernel module within ``context.modules``

        Yields:
            The threads of every process returned by ``PsList.list_processes``
            after the PID filter has been applied.
        """
        module = context.modules[module_name]
        layer_name = module.layer_name
        symbol_table_name = module.symbol_table_name

        # NOTE(review): "pid" is looked up directly on the context-wide config,
        # not under a plugin-specific path - confirm this is where the value of
        # the "pid" requirement is stored.
        filter_func = pslist.PsList.create_pid_filter(context.config.get("pid", None))

        for proc in pslist.PsList.list_processes(
            context=context,
            layer_name=layer_name,
            symbol_table=symbol_table_name,
            filter_func=filter_func,
        ):
            yield from cls.list_threads(module, proc)