Source code for volatility3.framework.symbols.windows.extensions.network

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
import socket
from typing import Dict, Tuple, List, Union

from volatility3.framework import exceptions
from volatility3.framework import objects, interfaces
from volatility3.framework.objects import Array
from volatility3.framework.renderers import conversion

vollog = logging.getLogger(__name__)


[docs]def inet_ntop(address_family: int, packed_ip: Union[List[int], Array]) -> str: if address_family in [socket.AF_INET6, socket.AF_INET]: try: return socket.inet_ntop(address_family, bytes(packed_ip)) except AttributeError: raise RuntimeError( "This version of python does not have socket.inet_ntop, please upgrade" ) raise socket.error("[Errno 97] Address family not supported by protocol")
# Python's socket.AF_INET6 is 0x1e but Microsoft defines it # as a constant value of 0x17 in their source code. Thus we # need Microsoft's since that's what is found in memory. AF_INET = 2 AF_INET6 = 0x17 # String representations of INADDR_ANY and INADDR6_ANY inaddr_any = inet_ntop(socket.AF_INET, [0] * 4) inaddr6_any = inet_ntop(socket.AF_INET6, [0] * 16) class _TCP_LISTENER(objects.StructType): """Class for objects found in TcpL pools. This class serves as a base class for all pooled network objects. It exposes some functions which return sanity-checked members. Substructures referred to by a pointer may appear valid at first glance but will throw an InvalidAddressException on access. This is not a problem when objects are validated via their `is_valid()` method, but when scanning for semi-corrupted data this check will not be performed. Be mindful that most of those methods return `None` when they would access invalid data. If you want to process the raw data access the attributes directly, e.g. via `network_object.InetAF` instead of `network_object.get_address_family()`. """ MIN_CREATETIME_YEAR = 1950 MAX_CREATETIME_YEAR = 2200 def __init__( self, context: interfaces.context.ContextInterface, type_name: str, object_info: interfaces.objects.ObjectInformation, size: int, members: Dict[str, Tuple[int, interfaces.objects.Template]], ) -> None: super().__init__( context=context, type_name=type_name, object_info=object_info, size=size, members=members, ) def get_address_family(self): try: return self.InetAF.dereference().AddressFamily except exceptions.InvalidAddressException: return None def get_owner(self): try: return self.member("Owner").dereference() except exceptions.InvalidAddressException: return None def get_owner_pid(self): if self.get_owner().is_valid(): if self.get_owner().has_valid_member("UniqueProcessId"): return self.get_owner().UniqueProcessId return None def get_owner_procname(self): if self.get_owner().is_valid(): if self.get_owner().has_valid_member("ImageFileName"): return self.get_owner().ImageFileName.cast( "string", max_length=self.get_owner().ImageFileName.vol.count, errors="replace", ) return None def get_create_time(self): dt_obj = conversion.wintime_to_datetime(self.CreateTime.QuadPart) if isinstance(dt_obj, interfaces.renderers.BaseAbsentValue): return dt_obj # return None if the timestamp seems invalid if not (self.MIN_CREATETIME_YEAR < dt_obj.year < self.MAX_CREATETIME_YEAR): return None else: return dt_obj def get_in_addr(self): try: local_addr = self.LocalAddr.dereference() # there is a rare edge case here we have to consider: # if the struct has a null pointer at the LocalAddr offset, # this generally means this struct has no associated local address. # however, sometimes a pointer to the offset of 0 can be valid because # it points to a valid virtual memory address of 0. this confuses this # plugin because trying to access the nullpointer does not raise any # errors, leading to errors later down the line when accessing the # pointed-to _IN_ADDR addr4/6 attributes. # addr4/6 are at the same offset, accessing the first byte covers both. # if this causes no error, we can expect a valid network addr. _ = local_addr.pData.dereference().addr4[0] if local_addr.pData.dereference(): inaddr = local_addr.inaddr return inaddr else: return None except exceptions.InvalidAddressException: return None def dual_stack_sockets(self): """Handle Windows dual-stack sockets""" # If this pointer is valid, the socket is bound to # a specific IP address. Otherwise, the socket is # listening on all IP addresses of the address family. # Note the remote address is always INADDR_ANY or # INADDR6_ANY for sockets. The moment a client # connects to the listener, a TCP_ENDPOINT is created # and that structure contains the remote address. inaddr = self.get_in_addr() if inaddr: if self.get_address_family() == AF_INET: yield "v4", inet_ntop(socket.AF_INET, inaddr.addr4), inaddr_any elif self.get_address_family() == AF_INET6: yield "v6", inet_ntop(socket.AF_INET6, inaddr.addr6), inaddr6_any else: yield "v4", inaddr_any, inaddr_any if self.get_address_family() == AF_INET6: yield "v6", inaddr6_any, inaddr6_any def is_valid(self): try: if not self.get_address_family() in (AF_INET, AF_INET6): vollog.debug( "netw obj 0x{:x} invalid due to invalid address_family {}".format( self.vol.offset, self.get_address_family() ) ) return False except exceptions.InvalidAddressException: vollog.debug( f"netw obj 0x{self.vol.offset:x} invalid due to invalid address access" ) return False return True class _TCP_ENDPOINT(_TCP_LISTENER): """Class for objects found in TcpE pools""" def _ipv4_or_ipv6(self, inaddr): if self.get_address_family() == AF_INET: return inet_ntop(socket.AF_INET, inaddr.addr4) else: return inet_ntop(socket.AF_INET6, inaddr.addr6) def get_local_address(self): try: inaddr = self.AddrInfo.dereference().Local.pData.dereference().dereference() return self._ipv4_or_ipv6(inaddr) except exceptions.InvalidAddressException: return None def get_remote_address(self): try: inaddr = self.AddrInfo.dereference().Remote.dereference() return self._ipv4_or_ipv6(inaddr) except exceptions.InvalidAddressException: return None def is_valid(self): if self.State not in self.State.choices.values(): vollog.debug( f"{type(self)} 0x{self.vol.offset:x} invalid due to invalid tcp state {self.State}" ) return False try: if self.get_address_family() not in (AF_INET, AF_INET6): vollog.debug( f"{type(self)} 0x{self.vol.offset:x} invalid due to invalid address_family {self.get_address_family()}" ) return False if not self.get_local_address() and ( not self.get_owner() or self.get_owner().UniqueProcessId == 0 or self.get_owner().UniqueProcessId > 65535 ): vollog.debug( f"{type(self)} 0x{self.vol.offset:x} invalid due to invalid owner data" ) return False except exceptions.InvalidAddressException: vollog.debug( f"{type(self)} 0x{self.vol.offset:x} invalid due to invalid address access" ) return False return True class _UDP_ENDPOINT(_TCP_LISTENER): """Class for objects found in UdpA pools""" class _LOCAL_ADDRESS(objects.StructType): @property def inaddr(self): return self.pData.dereference().dereference() class _LOCAL_ADDRESS_WIN10_UDP(objects.StructType): @property def inaddr(self): return self.pData.dereference() win10_x64_class_types = { "_TCP_ENDPOINT": _TCP_ENDPOINT, "_TCP_LISTENER": _TCP_LISTENER, "_UDP_ENDPOINT": _UDP_ENDPOINT, "_LOCAL_ADDRESS": _LOCAL_ADDRESS, "_LOCAL_ADDRESS_WIN10_UDP": _LOCAL_ADDRESS_WIN10_UDP, } class_types = { "_TCP_ENDPOINT": _TCP_ENDPOINT, "_TCP_LISTENER": _TCP_LISTENER, "_UDP_ENDPOINT": _UDP_ENDPOINT, "_LOCAL_ADDRESS": _LOCAL_ADDRESS, }