Source code for volatility3.plugins.windows.netscan

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import datetime
import logging
import os
from typing import Iterable, List, Optional, Tuple, Type

from volatility3.framework import constants, exceptions, interfaces, renderers, symbols
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows import versions
from volatility3.framework.symbols.windows.extensions import network
from volatility3.plugins import timeliner
from volatility3.plugins.windows import info, poolscanner

vollog = logging.getLogger(__name__)


[docs]class NetScan(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface): """Scans for network objects present in a particular windows memory image.""" _required_framework_version = (2, 0, 0) _version = (1, 0, 0)
[docs] @classmethod def get_requirements(cls): return [ requirements.ModuleRequirement(name = 'kernel', description = 'Windows kernel', architectures = ["Intel32", "Intel64"]), requirements.VersionRequirement(name = 'poolscanner', component = poolscanner.PoolScanner, version = (1, 0, 0)), requirements.VersionRequirement(name = 'info', component = info.Info, version = (1, 0, 0)), requirements.BooleanRequirement( name = 'include-corrupt', description = "Radically eases result validation. This will show partially overwritten data. WARNING: the results are likely to include garbage and/or corrupt data. Be cautious!", default = False, optional = True), ]
[docs] @staticmethod def create_netscan_constraints(context: interfaces.context.ContextInterface, symbol_table: str) -> List[poolscanner.PoolConstraint]: """Creates a list of Pool Tag Constraints for network objects. Args: context: The context to retrieve required elements (layers, symbol tables) from symbol_table: The name of an existing symbol table containing the symbols / types Returns: The list containing the built constraints. """ tcpl_size = context.symbol_space.get_type(symbol_table + constants.BANG + "_TCP_LISTENER").size tcpe_size = context.symbol_space.get_type(symbol_table + constants.BANG + "_TCP_ENDPOINT").size udpa_size = context.symbol_space.get_type(symbol_table + constants.BANG + "_UDP_ENDPOINT").size # ~ vollog.debug("Using pool size constraints: TcpL {}, TcpE {}, UdpA {}".format(tcpl_size, tcpe_size, udpa_size)) return [ # TCP listener poolscanner.PoolConstraint(b'TcpL', type_name = symbol_table + constants.BANG + "_TCP_LISTENER", size = (tcpl_size, None), page_type = poolscanner.PoolType.NONPAGED | poolscanner.PoolType.FREE), # TCP Endpoint poolscanner.PoolConstraint(b'TcpE', type_name = symbol_table + constants.BANG + "_TCP_ENDPOINT", size = (tcpe_size, None), page_type = poolscanner.PoolType.NONPAGED | poolscanner.PoolType.FREE), # UDP Endpoint poolscanner.PoolConstraint(b'UdpA', type_name = symbol_table + constants.BANG + "_UDP_ENDPOINT", size = (udpa_size, None), page_type = poolscanner.PoolType.NONPAGED | poolscanner.PoolType.FREE) ]
[docs] @classmethod def determine_tcpip_version(cls, context: interfaces.context.ContextInterface, layer_name: str, nt_symbol_table: str) -> Tuple[str, Type]: """Tries to determine which symbol filename to use for the image's tcpip driver. The logic is partially taken from the info plugin. Args: context: The context to retrieve required elements (layers, symbol tables) from layer_name: The name of the layer on which to operate nt_symbol_table: The name of the table containing the kernel symbols Returns: The filename of the symbol table to use. """ # while the failsafe way to determine the version of tcpip.sys would be to # extract the driver and parse its PE header containing the versionstring, # unfortunately that header is not guaranteed to persist within memory. # therefore we determine the version based on the kernel version as testing # with several windows versions has showed this to work out correctly. is_64bit = symbols.symbol_table_is_64bit(context, nt_symbol_table) is_18363_or_later = versions.is_win10_18363_or_later(context = context, symbol_table = nt_symbol_table) if is_64bit: arch = "x64" else: arch = "x86" vers = info.Info.get_version_structure(context, layer_name, nt_symbol_table) kuser = info.Info.get_kuser_structure(context, layer_name, nt_symbol_table) try: vers_minor_version = int(vers.MinorVersion) nt_major_version = int(kuser.NtMajorVersion) nt_minor_version = int(kuser.NtMinorVersion) except ValueError: # vers struct exists, but is not an int anymore? raise NotImplementedError("Kernel Debug Structure version format not supported!") except: # unsure what to raise here. Also, it might be useful to add some kind of fallback, # either to a user-provided version or to another method to determine tcpip.sys's version raise exceptions.VolatilityException( "Kernel Debug Structure missing VERSION/KUSER structure, unable to determine Windows version!") vollog.debug("Determined OS Version: {}.{} {}.{}".format(kuser.NtMajorVersion, kuser.NtMinorVersion, vers.MajorVersion, vers.MinorVersion)) if nt_major_version == 10 and arch == "x64": # win10 x64 has an additional class type we have to include. class_types = network.win10_x64_class_types else: # default to general class types class_types = network.class_types # these versions are listed explicitly because symbol files differ based on # version *and* architecture. this is currently the clearest way to show # the differences, even if it introduces a fair bit of redundancy. # furthermore, it is easy to append new versions. if arch == "x86": version_dict = { (6, 0, 6000): "netscan-vista-x86", (6, 0, 6001): "netscan-vista-x86", (6, 0, 6002): "netscan-vista-x86", (6, 0, 6003): "netscan-vista-x86", (6, 1, 7600): "netscan-win7-x86", (6, 1, 7601): "netscan-win7-x86", (6, 1, 8400): "netscan-win7-x86", (6, 2, 9200): "netscan-win8-x86", (6, 3, 9600): "netscan-win81-x86", (10, 0, 10240): "netscan-win10-10240-x86", (10, 0, 10586): "netscan-win10-10586-x86", (10, 0, 14393): "netscan-win10-14393-x86", (10, 0, 15063): "netscan-win10-15063-x86", (10, 0, 16299): "netscan-win10-15063-x86", (10, 0, 17134): "netscan-win10-17134-x86", (10, 0, 17763): "netscan-win10-17134-x86", (10, 0, 18362): "netscan-win10-17134-x86", (10, 0, 18363): "netscan-win10-17134-x86" } else: version_dict = { (6, 0, 6000): "netscan-vista-x64", (6, 0, 6001): "netscan-vista-sp12-x64", (6, 0, 6002): "netscan-vista-sp12-x64", (6, 0, 6003): "netscan-vista-sp12-x64", (6, 1, 7600): "netscan-win7-x64", (6, 1, 7601): "netscan-win7-x64", (6, 1, 8400): "netscan-win7-x64", (6, 2, 9200): "netscan-win8-x64", (6, 3, 9600): "netscan-win81-x64", (10, 0, 10240): "netscan-win10-x64", (10, 0, 10586): "netscan-win10-x64", (10, 0, 14393): "netscan-win10-x64", (10, 0, 15063): "netscan-win10-15063-x64", (10, 0, 16299): "netscan-win10-16299-x64", (10, 0, 17134): "netscan-win10-17134-x64", (10, 0, 17763): "netscan-win10-17763-x64", (10, 0, 18362): "netscan-win10-18362-x64", (10, 0, 18363): "netscan-win10-18363-x64", (10, 0, 19041): "netscan-win10-19041-x64" } # special use case: Win10_18363 is not recognized by windows.info as 18363 # because all kernel file headers and debug structures report 18363 as # "10.0.18362.1198" with the last part being incremented. However, we can use # os_distinguisher to differentiate between 18362 and 18363 if vers_minor_version == 18362 and is_18363_or_later: vollog.debug("Detected 18363 data structures: working with 18363 symbol table.") vers_minor_version = 18363 # when determining the symbol file we have to consider the following cases: # the determined version's symbol file is found by intermed.create -> proceed # the determined version's symbol file is not found by intermed -> intermed will throw an exc and abort # the determined version has no mapped symbol file -> if win10 use latest, otherwise throw exc # windows version cannot be determined -> throw exc filename = version_dict.get((nt_major_version, nt_minor_version, vers_minor_version)) if not filename: # no match on filename means that we possibly have a version newer than those listed here. # try to grab the latest supported version of the current image NT version. If that symbol # version does not work, support has to be added manually. current_versions = [ key for key in list(version_dict.keys()) if key[0] == nt_major_version and key[1] == nt_minor_version ] current_versions.sort() if current_versions: latest_version = current_versions[-1] filename = version_dict.get(latest_version) vollog.debug(f"Unable to find exact matching symbol file, going with latest: {filename}") else: raise NotImplementedError("This version of Windows is not supported: {}.{} {}.{}!".format( nt_major_version, nt_minor_version, vers.MajorVersion, vers_minor_version)) vollog.debug(f"Determined symbol filename: {filename}") return filename, class_types
[docs] @classmethod def create_netscan_symbol_table(cls, context: interfaces.context.ContextInterface, layer_name: str, nt_symbol_table: str, config_path: str) -> str: """Creates a symbol table for TCP Listeners and TCP/UDP Endpoints. Args: context: The context to retrieve required elements (layers, symbol tables) from layer_name: The name of the layer on which to operate nt_symbol_table: The name of the table containing the kernel symbols config_path: The config path where to find symbol files Returns: The name of the constructed symbol table """ table_mapping = {"nt_symbols": nt_symbol_table} symbol_filename, class_types = cls.determine_tcpip_version( context, layer_name, nt_symbol_table, ) return intermed.IntermediateSymbolTable.create(context, config_path, os.path.join("windows", "netscan"), symbol_filename, class_types = class_types, table_mapping = table_mapping)
[docs] @classmethod def scan(cls, context: interfaces.context.ContextInterface, layer_name: str, nt_symbol_table: str, netscan_symbol_table: str) -> \ Iterable[interfaces.objects.ObjectInterface]: """Scans for network objects using the poolscanner module and constraints. Args: context: The context to retrieve required elements (layers, symbol tables) from layer_name: The name of the layer on which to operate nt_symbol_table: The name of the table containing the kernel symbols netscan_symbol_table: The name of the table containing the network object symbols (_TCP_LISTENER etc.) Returns: A list of network objects found by scanning the `layer_name` layer for network pool signatures """ constraints = cls.create_netscan_constraints(context, netscan_symbol_table) for result in poolscanner.PoolScanner.generate_pool_scan(context, layer_name, nt_symbol_table, constraints): _constraint, mem_object, _header = result yield mem_object
def _generator(self, show_corrupt_results: Optional[bool] = None): """ Generates the network objects for use in rendering. """ kernel = self.context.modules[self.config['kernel']] netscan_symbol_table = self.create_netscan_symbol_table(self.context, kernel.layer_name, kernel.symbol_table_name, self.config_path) for netw_obj in self.scan(self.context, kernel.layer_name, kernel.symbol_table_name, netscan_symbol_table): vollog.debug(f"Found netw obj @ 0x{netw_obj.vol.offset:2x} of assumed type {type(netw_obj)}") # objects passed pool header constraints. check for additional constraints if strict flag is set. if not show_corrupt_results and not netw_obj.is_valid(): continue if isinstance(netw_obj, network._UDP_ENDPOINT): vollog.debug(f"Found UDP_ENDPOINT @ 0x{netw_obj.vol.offset:2x}") # For UdpA, the state is always blank and the remote end is asterisks for ver, laddr, _ in netw_obj.dual_stack_sockets(): yield (0, (format_hints.Hex(netw_obj.vol.offset), "UDP" + ver, laddr, netw_obj.Port, "*", 0, "", netw_obj.get_owner_pid() or renderers.UnreadableValue(), netw_obj.get_owner_procname() or renderers.UnreadableValue(), netw_obj.get_create_time() or renderers.UnreadableValue())) elif isinstance(netw_obj, network._TCP_ENDPOINT): vollog.debug(f"Found _TCP_ENDPOINT @ 0x{netw_obj.vol.offset:2x}") if netw_obj.get_address_family() == network.AF_INET: proto = "TCPv4" elif netw_obj.get_address_family() == network.AF_INET6: proto = "TCPv6" else: proto = "TCPv?" try: state = netw_obj.State.description except ValueError: state = renderers.UnreadableValue() yield (0, (format_hints.Hex(netw_obj.vol.offset), proto, netw_obj.get_local_address() or renderers.UnreadableValue(), netw_obj.LocalPort, netw_obj.get_remote_address() or renderers.UnreadableValue(), netw_obj.RemotePort, state, netw_obj.get_owner_pid() or renderers.UnreadableValue(), netw_obj.get_owner_procname() or renderers.UnreadableValue(), netw_obj.get_create_time() or renderers.UnreadableValue())) # check for isinstance of tcp listener last, because all other objects are inherited from here elif isinstance(netw_obj, network._TCP_LISTENER): vollog.debug(f"Found _TCP_LISTENER @ 0x{netw_obj.vol.offset:2x}") # For TcpL, the state is always listening and the remote port is zero for ver, laddr, raddr in netw_obj.dual_stack_sockets(): yield (0, (format_hints.Hex(netw_obj.vol.offset), "TCP" + ver, laddr, netw_obj.Port, raddr, 0, "LISTENING", netw_obj.get_owner_pid() or renderers.UnreadableValue(), netw_obj.get_owner_procname() or renderers.UnreadableValue(), netw_obj.get_create_time() or renderers.UnreadableValue())) else: # this should not happen therefore we log it. vollog.debug(f"Found network object unsure of its type: {netw_obj} of type {type(netw_obj)}")
[docs] def generate_timeline(self): for row in self._generator(): _depth, row_data = row # Skip network connections without creation time if not isinstance(row_data[9], datetime.datetime): continue row_data = [ "N/A" if isinstance(i, renderers.UnreadableValue) or isinstance(i, renderers.UnparsableValue) else i for i in row_data ] description = "Network connection: Process {} {} Local Address {}:{} " \ "Remote Address {}:{} State {} Protocol {} ".format(row_data[7], row_data[8], row_data[2], row_data[3], row_data[4], row_data[5], row_data[6], row_data[1]) yield (description, timeliner.TimeLinerType.CREATED, row_data[9])
[docs] def run(self): show_corrupt_results = self.config.get('include-corrupt', None) return renderers.TreeGrid([ ("Offset", format_hints.Hex), ("Proto", str), ("LocalAddr", str), ("LocalPort", int), ("ForeignAddr", str), ("ForeignPort", int), ("State", str), ("PID", int), ("Owner", str), ("Created", datetime.datetime), ], self._generator(show_corrupt_results = show_corrupt_results))