# Source code for volatility3.plugins.windows.netscan

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import datetime
import logging
import os
from typing import Iterable, List, Optional, Tuple, Type

from volatility3.framework import constants, exceptions, interfaces, renderers, symbols
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows import versions
from volatility3.framework.symbols.windows.extensions import network
from volatility3.plugins import timeliner
from volatility3.plugins.windows import info, poolscanner, verinfo

# Module-level logger; the plugin emits its debug diagnostics through it.
vollog = logging.getLogger(__name__)


class NetScan(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface):
    """Scans for network objects present in a particular windows memory image."""

    _required_framework_version = (2, 0, 0)
    _version = (1, 0, 0)

    @classmethod
    def get_requirements(cls):
        """Returns the configuration requirements of this plugin.

        Returns:
            A list holding the kernel module requirement, the version
            requirements for the poolscanner, info and verinfo plugins this
            plugin calls into, and the optional `include-corrupt` switch.
        """
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.VersionRequirement(
                name="poolscanner", component=poolscanner.PoolScanner, version=(1, 0, 0)
            ),
            requirements.VersionRequirement(
                name="info", component=info.Info, version=(1, 0, 0)
            ),
            requirements.VersionRequirement(
                name="verinfo", component=verinfo.VerInfo, version=(1, 0, 0)
            ),
            requirements.BooleanRequirement(
                name="include-corrupt",
                description="Radically eases result validation. This will show partially overwritten data. WARNING: the results are likely to include garbage and/or corrupt data. Be cautious!",
                default=False,
                optional=True,
            ),
        ]

    @staticmethod
    def create_netscan_constraints(
        context: interfaces.context.ContextInterface, symbol_table: str
    ) -> List[poolscanner.PoolConstraint]:
        """Creates a list of Pool Tag Constraints for network objects.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            symbol_table: The name of an existing symbol table containing the symbols / types

        Returns:
            The list containing the built constraints, in the order
            TCP listener, TCP endpoint, UDP endpoint.
        """
        # (pool tag, structure name) for every network object we scan for.
        tagged_structures = (
            (b"TcpL", "_TCP_LISTENER"),  # TCP listener
            (b"TcpE", "_TCP_ENDPOINT"),  # TCP Endpoint
            (b"UdpA", "_UDP_ENDPOINT"),  # UDP Endpoint
        )
        page_type = poolscanner.PoolType.NONPAGED | poolscanner.PoolType.FREE

        # The size of the structure is used as the lower bound of the size
        # constraint; no upper bound is set.
        return [
            poolscanner.PoolConstraint(
                tag,
                type_name=symbol_table + constants.BANG + struct_name,
                size=(
                    context.symbol_space.get_type(
                        symbol_table + constants.BANG + struct_name
                    ).size,
                    None,
                ),
                page_type=page_type,
            )
            for tag, struct_name in tagged_structures
        ]

    @classmethod
    def determine_tcpip_version(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        nt_symbol_table: str,
    ) -> Tuple[str, Type]:
        """Tries to determine which symbol filename to use for the image's tcpip driver. The logic is partially taken from the info plugin.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            nt_symbol_table: The name of the table containing the kernel symbols

        Returns:
            A tuple of the filename of the symbol table to use and the class
            types that have to be handed to the symbol table on creation.

        Raises:
            NotImplementedError: If the version fields cannot be converted to
                integers, or if no symbol file is mapped for the NT version.
            exceptions.VolatilityException: If the VERSION/KUSER structures
                cannot be read for any other reason.
        """

        # while the failsafe way to determine the version of tcpip.sys would be to
        # extract the driver and parse its PE header containing the versionstring,
        # unfortunately that header is not guaranteed to persist within memory.
        # therefore we determine the version based on the kernel version as testing
        # with several windows versions has showed this to work out correctly.

        is_64bit = symbols.symbol_table_is_64bit(context, nt_symbol_table)

        is_18363_or_later = versions.is_win10_18363_or_later(
            context=context, symbol_table=nt_symbol_table
        )

        arch = "x64" if is_64bit else "x86"

        vers = info.Info.get_version_structure(context, layer_name, nt_symbol_table)

        kuser = info.Info.get_kuser_structure(context, layer_name, nt_symbol_table)

        try:
            vers_minor_version = int(vers.MinorVersion)
            nt_major_version = int(kuser.NtMajorVersion)
            nt_minor_version = int(kuser.NtMinorVersion)
        except ValueError as excp:
            # vers struct exists, but is not an int anymore?
            raise NotImplementedError(
                "Kernel Debug Structure version format not supported!"
            ) from excp
        except Exception as excp:
            # Deliberately not a bare `except:` so that KeyboardInterrupt and
            # SystemExit are not turned into a VolatilityException.
            # unsure what to raise here. Also, it might be useful to add some kind of fallback,
            # either to a user-provided version or to another method to determine tcpip.sys's version
            raise exceptions.VolatilityException(
                "Kernel Debug Structure missing VERSION/KUSER structure, unable to determine Windows version!"
            ) from excp

        vollog.debug(
            "Determined OS Version: {}.{} {}.{}".format(
                kuser.NtMajorVersion,
                kuser.NtMinorVersion,
                vers.MajorVersion,
                vers.MinorVersion,
            )
        )

        if nt_major_version == 10 and arch == "x64":
            # win10 x64 has an additional class type we have to include.
            class_types = network.win10_x64_class_types
        else:
            # default to general class types
            class_types = network.class_types

        # these versions are listed explicitly because symbol files differ based on
        # version *and* architecture. this is currently the clearest way to show
        # the differences, even if it introduces a fair bit of redundancy.
        # furthermore, it is easy to append new versions.
        # Keys are (NT major, NT minor, build, tcpip.sys FileVersion part).
        if arch == "x86":
            version_dict = {
                (6, 0, 6000, 0): "netscan-vista-x86",
                (6, 0, 6001, 0): "netscan-vista-x86",
                (6, 0, 6002, 0): "netscan-vista-x86",
                (6, 0, 6003, 0): "netscan-vista-x86",
                (6, 1, 7600, 0): "netscan-win7-x86",
                (6, 1, 7601, 0): "netscan-win7-x86",
                (6, 1, 8400, 0): "netscan-win7-x86",
                (6, 2, 9200, 0): "netscan-win8-x86",
                (6, 3, 9600, 0): "netscan-win81-x86",
                (10, 0, 10240, 0): "netscan-win10-10240-x86",
                (10, 0, 10586, 0): "netscan-win10-10586-x86",
                (10, 0, 14393, 0): "netscan-win10-14393-x86",
                (10, 0, 15063, 0): "netscan-win10-15063-x86",
                (10, 0, 16299, 0): "netscan-win10-15063-x86",
                (10, 0, 17134, 0): "netscan-win10-17134-x86",
                (10, 0, 17763, 0): "netscan-win10-17134-x86",
                (10, 0, 18362, 0): "netscan-win10-17134-x86",
                (10, 0, 18363, 0): "netscan-win10-17134-x86",
            }
        else:
            version_dict = {
                (6, 0, 6000, 0): "netscan-vista-x64",
                (6, 0, 6001, 0): "netscan-vista-sp12-x64",
                (6, 0, 6002, 0): "netscan-vista-sp12-x64",
                (6, 0, 6003, 0): "netscan-vista-sp12-x64",
                (6, 1, 7600, 0): "netscan-win7-x64",
                (6, 1, 7601, 0): "netscan-win7-x64",
                (6, 1, 8400, 0): "netscan-win7-x64",
                (6, 2, 9200, 0): "netscan-win8-x64",
                (6, 3, 9600, 0): "netscan-win81-x64",
                (6, 3, 9600, 19935): "netscan-win81-19935-x64",
                (10, 0, 10240, 0): "netscan-win10-x64",
                (10, 0, 10586, 0): "netscan-win10-x64",
                (10, 0, 14393, 0): "netscan-win10-x64",
                (10, 0, 15063, 0): "netscan-win10-15063-x64",
                (10, 0, 16299, 0): "netscan-win10-16299-x64",
                (10, 0, 17134, 0): "netscan-win10-17134-x64",
                (10, 0, 17763, 0): "netscan-win10-17763-x64",
                (10, 0, 18362, 0): "netscan-win10-18362-x64",
                (10, 0, 18363, 0): "netscan-win10-18363-x64",
                (10, 0, 19041, 0): "netscan-win10-19041-x64",
            }

        # we do not need to check for tcpip's specific FileVersion in every case
        tcpip_mod_version = 0  # keep it 0 as a default

        # special use cases

        # Win10_18363 is not recognized by windows.info as 18363
        # because all kernel file headers and debug structures report 18363 as
        # "10.0.18362.1198" with the last part being incremented. However, we can use
        # os_distinguisher to differentiate between 18362 and 18363
        if vers_minor_version == 18362 and is_18363_or_later:
            vollog.debug(
                "Detected 18363 data structures: working with 18363 symbol table."
            )
            vers_minor_version = 18363

        # we need to define additional version numbers (which are then found via tcpip.sys's FileVersion header) in case there is
        # ambiguity _within_ an OS version. If such a version number (last number of the tuple) is defined for the current OS
        # we need to inspect tcpip.sys's headers to see if we can grab the precise version
        os_version = (nt_major_version, nt_minor_version, vers_minor_version)
        needs_file_version = any(
            (nt_maj, nt_min, build) == os_version and tcpip_ver != 0
            for nt_maj, nt_min, build, tcpip_ver in version_dict
        )
        if needs_file_version:
            vollog.debug(
                "Requiring further version inspection due to OS version by checking tcpip.sys's FileVersion header"
            )
            # the following is IntelLayer specific and might need to be adapted to other architectures.
            physical_layer_name = context.layers[layer_name].config.get(
                "memory_layer", None
            )
            if physical_layer_name:
                ver = verinfo.VerInfo.find_version_info(
                    context, physical_layer_name, "tcpip.sys"
                )

                if ver:
                    # The fourth element is the part of the version that is
                    # matched against the last element of the dictionary keys.
                    tcpip_mod_version = ver[3]
                    vollog.debug(
                        "Determined tcpip.sys's FileVersion: {}".format(
                            tcpip_mod_version
                        )
                    )
                else:
                    vollog.debug("Could not determine tcpip.sys's FileVersion.")
            else:
                vollog.debug(
                    "Unable to retrieve physical memory layer, skipping FileVersion check."
                )

        # when determining the symbol file we have to consider the following cases:
        # the determined version's symbol file is found by intermed.create -> proceed
        # the determined version's symbol file is not found by intermed -> intermed will throw an exc and abort
        # the determined version has no mapped symbol file -> if win10 use latest, otherwise throw exc
        # windows version cannot be determined -> throw exc

        filename = version_dict.get(
            (nt_major_version, nt_minor_version, vers_minor_version, tcpip_mod_version)
        )
        if not filename:
            # no match on filename means that we possibly have a version newer than those listed here.
            # try to grab the latest supported version of the current image NT version. If that symbol
            # version does not work, support has to be added manually.
            current_versions = [
                known_version
                for known_version in version_dict
                if known_version[0] == nt_major_version
                and known_version[1] == nt_minor_version
                and known_version[3] <= tcpip_mod_version
            ]

            if not current_versions:
                raise NotImplementedError(
                    "This version of Windows is not supported: {}.{} {}.{}!".format(
                        nt_major_version,
                        nt_minor_version,
                        vers.MajorVersion,
                        vers_minor_version,
                    )
                )

            # Tuples compare element-wise, so the maximum is the entry with
            # the highest build (and then highest tcpip.sys version).
            latest_version = max(current_versions)

            filename = version_dict.get(latest_version)
            vollog.debug(
                f"Unable to find exact matching symbol file, going with latest: {filename}"
            )

        vollog.debug(f"Determined symbol filename: {filename}")

        return filename, class_types

    @classmethod
    def create_netscan_symbol_table(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        nt_symbol_table: str,
        config_path: str,
    ) -> str:
        """Creates a symbol table for TCP Listeners and TCP/UDP Endpoints.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            nt_symbol_table: The name of the table containing the kernel symbols
            config_path: The config path where to find symbol files

        Returns:
            The name of the constructed symbol table
        """
        symbol_filename, class_types = cls.determine_tcpip_version(
            context,
            layer_name,
            nt_symbol_table,
        )

        # The symbol files are looked up below "windows/netscan".
        return intermed.IntermediateSymbolTable.create(
            context,
            config_path,
            os.path.join("windows", "netscan"),
            symbol_filename,
            class_types=class_types,
            table_mapping={"nt_symbols": nt_symbol_table},
        )

    @classmethod
    def scan(
        cls,
        context: interfaces.context.ContextInterface,
        layer_name: str,
        nt_symbol_table: str,
        netscan_symbol_table: str,
    ) -> Iterable[interfaces.objects.ObjectInterface]:
        """Scans for network objects using the poolscanner module and constraints.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            nt_symbol_table: The name of the table containing the kernel symbols
            netscan_symbol_table: The name of the table containing the network object symbols (_TCP_LISTENER etc.)

        Returns:
            A list of network objects found by scanning the `layer_name` layer for network pool signatures
        """
        constraints = cls.create_netscan_constraints(context, netscan_symbol_table)

        # Each scan result is a (constraint, object, pool header) triple;
        # only the object itself is handed to the caller.
        for _constraint, mem_object, _header in poolscanner.PoolScanner.generate_pool_scan(
            context, layer_name, nt_symbol_table, constraints
        ):
            yield mem_object

    @staticmethod
    def _owner_columns(netw_obj):
        """Returns the PID, owner name and creation time columns of a network object.

        Every value that evaluates as false is replaced by an UnreadableValue.
        """
        return (
            netw_obj.get_owner_pid() or renderers.UnreadableValue(),
            netw_obj.get_owner_procname() or renderers.UnreadableValue(),
            netw_obj.get_create_time() or renderers.UnreadableValue(),
        )

    def _generator(self, show_corrupt_results: Optional[bool] = None):
        """Generates the network objects for use in rendering.

        Args:
            show_corrupt_results: If true, objects failing their `is_valid()`
                check are rendered as well instead of being skipped.

        Yields:
            `(0, row)` tuples whose row holds offset, protocol, local address,
            local port, foreign address, foreign port, state, PID, owner and
            creation time, in that order.
        """

        kernel = self.context.modules[self.config["kernel"]]

        netscan_symbol_table = self.create_netscan_symbol_table(
            self.context, kernel.layer_name, kernel.symbol_table_name, self.config_path
        )

        for netw_obj in self.scan(
            self.context,
            kernel.layer_name,
            kernel.symbol_table_name,
            netscan_symbol_table,
        ):
            vollog.debug(
                f"Found netw obj @ 0x{netw_obj.vol.offset:2x} of assumed type {type(netw_obj)}"
            )
            # objects passed pool header constraints. check for additional constraints if strict flag is set.
            if not show_corrupt_results and not netw_obj.is_valid():
                continue

            if isinstance(netw_obj, network._UDP_ENDPOINT):
                vollog.debug(f"Found UDP_ENDPOINT @ 0x{netw_obj.vol.offset:2x}")

                # For UdpA, the state is always blank and the remote end is asterisks
                for ver, laddr, _ in netw_obj.dual_stack_sockets():
                    yield (
                        0,
                        (
                            format_hints.Hex(netw_obj.vol.offset),
                            "UDP" + ver,
                            laddr,
                            netw_obj.Port,
                            "*",
                            0,
                            "",
                            *self._owner_columns(netw_obj),
                        ),
                    )

            elif isinstance(netw_obj, network._TCP_ENDPOINT):
                vollog.debug(f"Found _TCP_ENDPOINT @ 0x{netw_obj.vol.offset:2x}")

                address_family = netw_obj.get_address_family()
                if address_family == network.AF_INET:
                    proto = "TCPv4"
                elif address_family == network.AF_INET6:
                    proto = "TCPv6"
                else:
                    proto = "TCPv?"

                try:
                    state = netw_obj.State.description
                except ValueError:
                    state = renderers.UnreadableValue()

                yield (
                    0,
                    (
                        format_hints.Hex(netw_obj.vol.offset),
                        proto,
                        netw_obj.get_local_address() or renderers.UnreadableValue(),
                        netw_obj.LocalPort,
                        netw_obj.get_remote_address() or renderers.UnreadableValue(),
                        netw_obj.RemotePort,
                        state,
                        *self._owner_columns(netw_obj),
                    ),
                )

            # check for isinstance of tcp listener last, because all other objects are inherited from here
            elif isinstance(netw_obj, network._TCP_LISTENER):
                vollog.debug(f"Found _TCP_LISTENER @ 0x{netw_obj.vol.offset:2x}")

                # For TcpL, the state is always listening and the remote port is zero
                for ver, laddr, raddr in netw_obj.dual_stack_sockets():
                    yield (
                        0,
                        (
                            format_hints.Hex(netw_obj.vol.offset),
                            "TCP" + ver,
                            laddr,
                            netw_obj.Port,
                            raddr,
                            0,
                            "LISTENING",
                            *self._owner_columns(netw_obj),
                        ),
                    )
            else:
                # this should not happen therefore we log it.
                vollog.debug(
                    f"Found network object unsure of its type: {netw_obj} of type {type(netw_obj)}"
                )

    def generate_timeline(self):
        """Generates timeline entries for network objects with a creation time.

        Yields:
            `(description, timeliner.TimeLinerType.CREATED, creation time)`
            tuples, one per row produced by `_generator()`.
        """
        unavailable_types = (renderers.UnreadableValue, renderers.UnparsableValue)

        for _depth, row_data in self._generator():
            # Skip network connections without creation time
            if not isinstance(row_data[9], datetime.datetime):
                continue

            # Values that could not be read or parsed are shown as "N/A".
            row_data = [
                "N/A" if isinstance(value, unavailable_types) else value
                for value in row_data
            ]
            description = (
                "Network connection: Process {} {} Local Address {}:{} "
                "Remote Address {}:{} State {} Protocol {} ".format(
                    row_data[7],
                    row_data[8],
                    row_data[2],
                    row_data[3],
                    row_data[4],
                    row_data[5],
                    row_data[6],
                    row_data[1],
                )
            )
            yield (description, timeliner.TimeLinerType.CREATED, row_data[9])

    def run(self):
        """Builds the result grid of the plugin.

        Returns:
            A TreeGrid with one row per network object, honouring the
            `include-corrupt` configuration option.
        """
        show_corrupt_results = self.config.get("include-corrupt", None)

        return renderers.TreeGrid(
            [
                ("Offset", format_hints.Hex),
                ("Proto", str),
                ("LocalAddr", str),
                ("LocalPort", int),
                ("ForeignAddr", str),
                ("ForeignPort", int),
                ("State", str),
                ("PID", int),
                ("Owner", str),
                ("Created", datetime.datetime),
            ],
            self._generator(show_corrupt_results=show_corrupt_results),
        )