# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import datetime
import logging
from typing import Iterable, Optional, Generator, Tuple
from volatility3.framework import constants, exceptions, interfaces, renderers, symbols
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols.windows import pdbutil
from volatility3.framework.symbols.windows.extensions import network
from volatility3.plugins import timeliner
from volatility3.plugins.windows import netscan, modules, info, verinfo
vollog = logging.getLogger(__name__)
[docs]class NetStat(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface):
"""Traverses network tracking structures present in a particular windows memory image."""
_required_framework_version = (2, 0, 0)
_version = (1, 0, 0)
[docs] @classmethod
def get_requirements(cls):
return [
requirements.ModuleRequirement(
name="kernel",
description="Windows kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="netscan", component=netscan.NetScan, version=(1, 0, 0)
),
requirements.VersionRequirement(
name="modules", component=modules.Modules, version=(1, 0, 0)
),
requirements.VersionRequirement(
name="pdbutil", component=pdbutil.PDBUtility, version=(1, 0, 0)
),
requirements.VersionRequirement(
name="info", component=info.Info, version=(1, 0, 0)
),
requirements.VersionRequirement(
name="verinfo", component=verinfo.VerInfo, version=(1, 0, 0)
),
requirements.BooleanRequirement(
name="include-corrupt",
description="Radically eases result validation. This will show partially overwritten data. WARNING: the results are likely to include garbage and/or corrupt data. Be cautious!",
default=False,
optional=True,
),
]
@classmethod
def _decode_pointer(cls, value):
"""Copied from `windows.handles`.
Windows encodes pointers to objects and decodes them on the fly
before using them.
This function mimics the decoding routine so we can generate the
proper pointer values as well.
"""
value = value & 0xFFFFFFFFFFFFFFFC
return value
[docs] @classmethod
def read_pointer(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
offset: int,
length: int,
) -> int:
"""Reads a pointer at a given offset and returns the address it points to.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
offset: Offset of pointer
length: Pointer length
Returns:
The value the pointer points to.
"""
return int.from_bytes(context.layers[layer_name].read(offset, length), "little")
[docs] @classmethod
def parse_bitmap(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
bitmap_offset: int,
bitmap_size_in_byte: int,
) -> list:
"""Parses a given bitmap and looks for each occurrence of a 1.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
bitmap_offset: Start address of bitmap
bitmap_size_in_byte: Bitmap size in Byte, not in bit.
Returns:
The list of indices at which a 1 was found.
"""
ret = []
for idx in range(bitmap_size_in_byte):
current_byte = context.layers[layer_name].read(bitmap_offset + idx, 1)[0]
current_offs = idx * 8
for bit in range(8):
if current_byte & (1 << bit) != 0:
ret.append(bit + current_offs)
return ret
[docs] @classmethod
def enumerate_structures_by_port(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
net_symbol_table: str,
port: int,
port_pool_addr: int,
proto="tcp",
) -> Iterable[interfaces.objects.ObjectInterface]:
"""Lists all UDP Endpoints and TCP Listeners by parsing UdpPortPool and TcpPortPool.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
net_symbol_table: The name of the table containing the tcpip types
port: Current port as integer to lookup the associated object.
port_pool_addr: Address of port pool object
proto: Either "tcp" or "udp" to decide which types to use.
Returns:
The list of network objects from this image's TCP and UDP `PortPools`
"""
if proto == "tcp":
obj_name = net_symbol_table + constants.BANG + "_TCP_LISTENER"
ptr_offset = context.symbol_space.get_type(obj_name).relative_child_offset(
"Next"
)
elif proto == "udp":
obj_name = net_symbol_table + constants.BANG + "_UDP_ENDPOINT"
ptr_offset = context.symbol_space.get_type(obj_name).relative_child_offset(
"Next"
)
else:
# invalid argument.
return None
vollog.debug(f"Current Port: {port}")
# the given port serves as a shifted index into the port pool lists
list_index = port >> 8
truncated_port = port & 0xFF
# constructing port_pool object here so callers don't have to
port_pool = context.object(
net_symbol_table + constants.BANG + "_INET_PORT_POOL",
layer_name=layer_name,
offset=port_pool_addr,
)
# first, grab the given port's PortAssignment (`_PORT_ASSIGNMENT`)
inpa = port_pool.PortAssignments[list_index]
# then parse the port assignment list (`_PORT_ASSIGNMENT_LIST`) and grab the correct entry
assignment = inpa.InPaBigPoolBase.Assignments[truncated_port]
if not assignment:
return None
# the value within assignment.Entry is a) masked and b) points inside of the network object
# first decode the pointer
netw_inside = cls._decode_pointer(assignment.Entry)
if netw_inside:
# if the value is valid, calculate the actual object address by subtracting the offset
curr_obj = context.object(
obj_name, layer_name=layer_name, offset=netw_inside - ptr_offset
)
yield curr_obj
# if the same port is used on different interfaces multiple objects are created
# those can be found by following the pointer within the object's `Next` field until it is empty
while curr_obj.Next:
curr_obj = context.object(
obj_name,
layer_name=layer_name,
offset=cls._decode_pointer(curr_obj.Next) - ptr_offset,
)
yield curr_obj
[docs] @classmethod
def get_tcpip_module(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
nt_symbols: str,
) -> Optional[interfaces.objects.ObjectInterface]:
"""Uses `windows.modules` to find tcpip.sys in memory.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
nt_symbols: The name of the table containing the kernel symbols
Returns:
The constructed tcpip.sys module object.
"""
for mod in modules.Modules.list_modules(context, layer_name, nt_symbols):
if mod.BaseDllName.get_string() == "tcpip.sys":
vollog.debug(f"Found tcpip.sys image base @ 0x{mod.DllBase:x}")
return mod
return None
[docs] @classmethod
def parse_hashtable(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
ht_offset: int,
ht_length: int,
alignment: int,
net_symbol_table: str,
) -> Generator[interfaces.objects.ObjectInterface, None, None]:
"""Parses a hashtable quick and dirty.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
ht_offset: Beginning of the hash table
ht_length: Length of the hash table
Returns:
The hash table entries which are _not_ empty
"""
# we are looking for entries whose values are not their own address
for index in range(ht_length):
current_addr = ht_offset + index * alignment
current_pointer = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=current_addr,
)
# check if addr of pointer is equal to the value pointed to
if current_pointer.vol.offset == current_pointer:
continue
yield current_pointer
[docs] @classmethod
def parse_partitions(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
net_symbol_table: str,
tcpip_symbol_table: str,
tcpip_module_offset: int,
) -> Iterable[interfaces.objects.ObjectInterface]:
"""Parses tcpip.sys's PartitionTable containing established TCP connections.
The amount of Partition depends on the value of the symbol `PartitionCount` and correlates with
the maximum processor count (refer to Art of Memory Forensics, chapter 11).
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
net_symbol_table: The name of the table containing the tcpip types
tcpip_symbol_table: The name of the table containing the tcpip driver symbols
tcpip_module_offset: The offset of the tcpip module
Returns:
The list of TCP endpoint objects from the `layer_name` layer's `PartitionTable`
"""
if symbols.symbol_table_is_64bit(context, net_symbol_table):
alignment = 0x10
else:
alignment = 8
obj_name = net_symbol_table + constants.BANG + "_TCP_ENDPOINT"
# part_table_symbol is the offset within tcpip.sys which contains the address of the partition table itself
part_table_symbol = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "PartitionTable"
).address
part_count_symbol = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "PartitionCount"
).address
part_table_addr = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + part_table_symbol,
)
# part_table is the actual partition table offset and consists out of a dynamic amount of _PARTITION objects
part_table = context.object(
net_symbol_table + constants.BANG + "_PARTITION_TABLE",
layer_name=layer_name,
offset=part_table_addr,
)
part_count = int.from_bytes(
context.layers[layer_name].read(tcpip_module_offset + part_count_symbol, 1),
"little",
)
part_table.Partitions.count = part_count
vollog.debug(
"Found TCP connection PartitionTable @ 0x{:x} (partition count: {})".format(
part_table_addr, part_count
)
)
entry_offset = context.symbol_space.get_type(obj_name).relative_child_offset(
"ListEntry"
)
for ctr, partition in enumerate(part_table.Partitions):
vollog.debug(f"Parsing partition {ctr}")
if partition.Endpoints.NumEntries > 0:
for endpoint_entry in cls.parse_hashtable(
context,
layer_name,
partition.Endpoints.Directory,
partition.Endpoints.TableSize,
alignment,
net_symbol_table,
):
endpoint = context.object(
obj_name,
layer_name=layer_name,
offset=endpoint_entry - entry_offset,
)
yield endpoint
[docs] @classmethod
def create_tcpip_symbol_table(
cls,
context: interfaces.context.ContextInterface,
config_path: str,
layer_name: str,
tcpip_module_offset: int,
tcpip_module_size: int,
) -> str:
"""DEPRECATED: Use PDBUtility.symbol_table_from_pdb instead
Creates symbol table for the current image's tcpip.sys driver.
Searches the memory section of the loaded tcpip.sys module for its PDB GUID
and loads the associated symbol table into the symbol space.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
config_path: The config path where to find symbol files
layer_name: The name of the layer on which to operate
tcpip_module_offset: This memory dump's tcpip.sys image offset
tcpip_module_size: The size of `tcpip.sys` for this dump
Returns:
The name of the constructed and loaded symbol table
"""
vollog.debug(
"Deprecation: This plugin uses netstat.create_tcpip_symbol_table instead of PDBUtility.symbol_table_from_pdb"
)
return pdbutil.PDBUtility.symbol_table_from_pdb(
context,
interfaces.configuration.path_join(config_path, "tcpip"),
layer_name,
"tcpip.pdb",
tcpip_module_offset,
tcpip_module_size,
)
[docs] @classmethod
def find_port_pools(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
net_symbol_table: str,
tcpip_symbol_table: str,
tcpip_module_offset: int,
) -> Tuple[int, int]:
"""Finds the given image's port pools. Older Windows versions (presumably < Win10 build 14251) use driver
symbols called `UdpPortPool` and `TcpPortPool` which point towards the pools.
Newer Windows versions use `UdpCompartmentSet` and `TcpCompartmentSet`, which we first have to translate into
the port pool address. See also: http://redplait.blogspot.com/2016/06/tcpip-port-pools-in-fresh-windows-10.html
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
net_symbol_table: The name of the table containing the tcpip types
tcpip_module_offset: This memory dump's tcpip.sys image offset
tcpip_symbol_table: The name of the table containing the tcpip driver symbols
Returns:
The tuple containing the address of the UDP and TCP port pool respectively.
"""
if "UdpPortPool" in context.symbol_space[tcpip_symbol_table].symbols:
# older Windows versions
upp_symbol = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "UdpPortPool"
).address
upp_addr = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + upp_symbol,
)
tpp_symbol = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "TcpPortPool"
).address
tpp_addr = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + tpp_symbol,
)
elif "UdpCompartmentSet" in context.symbol_space[tcpip_symbol_table].symbols:
# newer Windows versions since 10.14xxx
ucs = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "UdpCompartmentSet"
).address
tcs = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "TcpCompartmentSet"
).address
ucs_offset = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + ucs,
)
tcs_offset = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + tcs,
)
ucs_obj = context.object(
net_symbol_table + constants.BANG + "_INET_COMPARTMENT_SET",
layer_name=layer_name,
offset=ucs_offset,
)
upp_addr = ucs_obj.InetCompartment.ProtocolCompartment.PortPool
tcs_obj = context.object(
net_symbol_table + constants.BANG + "_INET_COMPARTMENT_SET",
layer_name=layer_name,
offset=tcs_offset,
)
tpp_addr = tcs_obj.InetCompartment.ProtocolCompartment.PortPool
else:
# this branch should not be reached.
raise exceptions.SymbolError(
"UdpPortPool",
tcpip_symbol_table,
f"Neither UdpPortPool nor UdpCompartmentSet found in {tcpip_symbol_table} table",
)
vollog.debug(f"Found PortPools @ 0x{upp_addr:x} (UDP) && 0x{tpp_addr:x} (TCP)")
return upp_addr, tpp_addr
[docs] @classmethod
def list_sockets(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
nt_symbols: str,
net_symbol_table: str,
tcpip_module_offset: int,
tcpip_symbol_table: str,
) -> Iterable[interfaces.objects.ObjectInterface]:
"""Lists all UDP Endpoints, TCP Listeners and TCP Endpoints in the primary layer that
are in tcpip.sys's UdpPortPool, TcpPortPool and TCP Endpoint partition table, respectively.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
nt_symbols: The name of the table containing the kernel symbols
net_symbol_table: The name of the table containing the tcpip types
tcpip_module_offset: Offset of `tcpip.sys`'s PE image in memory
tcpip_symbol_table: The name of the table containing the tcpip driver symbols
Returns:
The list of network objects from the `layer_name` layer's `PartitionTable` and `PortPools`
"""
# first, TCP endpoints by parsing the partition table
for endpoint in cls.parse_partitions(
context,
layer_name,
net_symbol_table,
tcpip_symbol_table,
tcpip_module_offset,
):
yield endpoint
# then, towards the UDP and TCP port pools
# first, find their addresses
upp_addr, tpp_addr = cls.find_port_pools(
context,
layer_name,
net_symbol_table,
tcpip_symbol_table,
tcpip_module_offset,
)
# create port pool objects at the detected address and parse the port bitmap
upp_obj = context.object(
net_symbol_table + constants.BANG + "_INET_PORT_POOL",
layer_name=layer_name,
offset=upp_addr,
)
udpa_ports = cls.parse_bitmap(
context,
layer_name,
upp_obj.PortBitMap.Buffer,
upp_obj.PortBitMap.SizeOfBitMap // 8,
)
tpp_obj = context.object(
net_symbol_table + constants.BANG + "_INET_PORT_POOL",
layer_name=layer_name,
offset=tpp_addr,
)
tcpl_ports = cls.parse_bitmap(
context,
layer_name,
tpp_obj.PortBitMap.Buffer,
tpp_obj.PortBitMap.SizeOfBitMap // 8,
)
vollog.debug(f"Found TCP Ports: {tcpl_ports}")
vollog.debug(f"Found UDP Ports: {udpa_ports}")
# given the list of TCP / UDP ports, calculate the address of their respective objects and yield them.
for port in tcpl_ports:
# port value can be 0, which we can skip
if not port:
continue
for obj in cls.enumerate_structures_by_port(
context, layer_name, net_symbol_table, port, tpp_addr, "tcp"
):
yield obj
for port in udpa_ports:
# same as above, skip port 0
if not port:
continue
for obj in cls.enumerate_structures_by_port(
context, layer_name, net_symbol_table, port, upp_addr, "udp"
):
yield obj
def _generator(self, show_corrupt_results: Optional[bool] = None):
"""Generates the network objects for use in rendering."""
kernel = self.context.modules[self.config["kernel"]]
netscan_symbol_table = netscan.NetScan.create_netscan_symbol_table(
self.context, kernel.layer_name, kernel.symbol_table_name, self.config_path
)
tcpip_module = self.get_tcpip_module(
self.context, kernel.layer_name, kernel.symbol_table_name
)
if not tcpip_module:
vollog.error("Unable to locate symbols for the memory image's tcpip module")
try:
tcpip_symbol_table = pdbutil.PDBUtility.symbol_table_from_pdb(
self.context,
interfaces.configuration.path_join(self.config_path, "tcpip"),
kernel.layer_name,
"tcpip.pdb",
tcpip_module.DllBase,
tcpip_module.SizeOfImage,
)
except exceptions.VolatilityException:
vollog.error("Unable to locate symbols for the memory image's tcpip module")
for netw_obj in self.list_sockets(
self.context,
kernel.layer_name,
kernel.symbol_table_name,
netscan_symbol_table,
tcpip_module.DllBase,
tcpip_symbol_table,
):
# objects passed pool header constraints. check for additional constraints if strict flag is set.
if not show_corrupt_results and not netw_obj.is_valid():
continue
if isinstance(netw_obj, network._UDP_ENDPOINT):
vollog.debug(f"Found UDP_ENDPOINT @ 0x{netw_obj.vol.offset:2x}")
# For UdpA, the state is always blank and the remote end is asterisks
for ver, laddr, _ in netw_obj.dual_stack_sockets():
yield (
0,
(
format_hints.Hex(netw_obj.vol.offset),
"UDP" + ver,
laddr,
netw_obj.Port,
"*",
0,
"",
netw_obj.get_owner_pid() or renderers.UnreadableValue(),
netw_obj.get_owner_procname()
or renderers.UnreadableValue(),
netw_obj.get_create_time() or renderers.UnreadableValue(),
),
)
elif isinstance(netw_obj, network._TCP_ENDPOINT):
vollog.debug(f"Found _TCP_ENDPOINT @ 0x{netw_obj.vol.offset:2x}")
if netw_obj.get_address_family() == network.AF_INET:
proto = "TCPv4"
elif netw_obj.get_address_family() == network.AF_INET6:
proto = "TCPv6"
else:
vollog.debug(
"TCP Endpoint @ 0x{:2x} has unknown address family 0x{:x}".format(
netw_obj.vol.offset, netw_obj.get_address_family()
)
)
proto = "TCPv?"
try:
state = netw_obj.State.description
except ValueError:
state = renderers.UnreadableValue()
yield (
0,
(
format_hints.Hex(netw_obj.vol.offset),
proto,
netw_obj.get_local_address() or renderers.UnreadableValue(),
netw_obj.LocalPort,
netw_obj.get_remote_address() or renderers.UnreadableValue(),
netw_obj.RemotePort,
state,
netw_obj.get_owner_pid() or renderers.UnreadableValue(),
netw_obj.get_owner_procname() or renderers.UnreadableValue(),
netw_obj.get_create_time() or renderers.UnreadableValue(),
),
)
# check for isinstance of tcp listener last, because all other objects are inherited from here
elif isinstance(netw_obj, network._TCP_LISTENER):
vollog.debug(f"Found _TCP_LISTENER @ 0x{netw_obj.vol.offset:2x}")
# For TcpL, the state is always listening and the remote port is zero
for ver, laddr, raddr in netw_obj.dual_stack_sockets():
yield (
0,
(
format_hints.Hex(netw_obj.vol.offset),
"TCP" + ver,
laddr,
netw_obj.Port,
raddr,
0,
"LISTENING",
netw_obj.get_owner_pid() or renderers.UnreadableValue(),
netw_obj.get_owner_procname()
or renderers.UnreadableValue(),
netw_obj.get_create_time() or renderers.UnreadableValue(),
),
)
else:
# this should not happen therefore we log it.
vollog.debug(
f"Found network object unsure of its type: {netw_obj} of type {type(netw_obj)}"
)
[docs] def generate_timeline(self):
for row in self._generator():
_depth, row_data = row
row_dict = {}
(
row_dict["Offset"],
row_dict["Proto"],
row_dict["LocalAddr"],
row_dict["LocalPort"],
row_dict["ForeignAddr"],
row_dict["ForeignPort"],
row_dict["State"],
row_dict["PID"],
row_dict["Owner"],
row_dict["Created"],
) = row_data
# Skip network connections without creation time
if not isinstance(row_dict["Created"], datetime.datetime):
continue
description = (
"Network connection: Process {} {} Local Address {}:{} "
"Remote Address {}:{} State {} Protocol {} ".format(
row_dict["PID"],
row_dict["Owner"],
row_dict["LocalAddr"],
row_dict["LocalPort"],
row_dict["ForeignAddr"],
row_dict["ForeignPort"],
row_dict["State"],
row_dict["Proto"],
)
)
yield (description, timeliner.TimeLinerType.CREATED, row_dict["Created"])
[docs] def run(self):
show_corrupt_results = self.config.get("include-corrupt", None)
return renderers.TreeGrid(
[
("Offset", format_hints.Hex),
("Proto", str),
("LocalAddr", str),
("LocalPort", int),
("ForeignAddr", str),
("ForeignPort", int),
("State", str),
("PID", int),
("Owner", str),
("Created", datetime.datetime),
],
self._generator(show_corrupt_results=show_corrupt_results),
)