Source code for volatility3.plugins.linux.malware.check_afinfo

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
"""A module containing a plugin that verifies the operation function
pointers of network protocols."""

import logging
from typing import List, Tuple, Generator

from volatility3.framework import exceptions, interfaces
from volatility3.framework import renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.renderers import format_hints

vollog = logging.getLogger(__name__)


class Check_afinfo(plugins.PluginInterface):
    """Verifies the operation function pointers of network protocols.

    The plugin walks the sequence/file operation structures of common network
    protocols and reports every initialized handler whose address does not
    match any symbol of the kernel module.
    """

    _version = (1, 0, 0)
    _required_framework_version = (2, 0, 0)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the plugin requirements: a single Linux kernel module."""
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Linux kernel",
                architectures=["Intel32", "Intel64"],
            ),
        ]

    @classmethod
    def _check_members(
        cls,
        context: interfaces.context.ContextInterface,
        vmlinux_name: str,
        var_ops: interfaces.objects.ObjectInterface,
        var_name: str,
        members: List[str],
    ) -> Generator[Tuple[str, str, int], None, None]:
        """
        Yields any members that are not pointing inside the kernel

        Args:
            context: The context holding the kernel module
            vmlinux_name: The name of the kernel module within `context.modules`
            var_ops: The operations structure whose members are checked
            var_name: The name reported for the structure being checked
            members: The member names of `var_ops` to inspect

        Yields:
            Tuples of (`var_name`, member name, handler address) for every
            non-zero member for which the kernel module has no symbol at
            that address
        """
        vmlinux = context.modules[vmlinux_name]

        for check in members:
            # redhat-specific garbage
            if check.startswith("__UNIQUE_ID_rh_kabi_hide"):
                continue

            # These structures have members like `write` and `next`, which are
            # built in Python functions, so go through `member()` explicitly
            addr = var_ops.member(attr=check)

            # Unimplemented handlers are set to 0
            if not addr:
                continue

            if not vmlinux.get_symbols_by_absolute_location(addr):
                yield var_name, check, addr

    @classmethod
    def _check_pre_4_18_ops(
        cls,
        context: interfaces.context.ContextInterface,
        vmlinux_name: str,
        var_name: str,
        var: interfaces.objects.ObjectInterface,
        op_members: List[str],
        seq_members: List[str],
    ) -> Generator[Tuple[str, str, int], None, None]:
        """
        Finds the correct way to reference `op_members`

        Args:
            context: The context holding the kernel module
            vmlinux_name: The name of the kernel module within `context.modules`
            var_name: The name reported for the structure being checked
            var: The protocol afinfo object to inspect
            op_members: The member names checked on `var.seq_fops`
            seq_members: The member names checked on `var.seq_ops`

        Yields:
            Tuples of (`var_name`, member name, handler address) for each
            suspicious handler

        Raises:
            exceptions.VolatilityException: If `var` has neither a `seq_ops`
                nor a `seq_show` member
        """
        vmlinux = context.modules[vmlinux_name]

        if var.has_member("seq_fops"):
            yield from cls._check_members(
                context, vmlinux_name, var.seq_fops, var_name, op_members
            )

        # newer kernels
        if var.has_member("seq_ops"):
            yield from cls._check_members(
                context, vmlinux_name, var.seq_ops, var_name, seq_members
            )

        # this is the most commonly hooked member by rootkits, so force a check on it
        elif var.has_member("seq_show"):
            # `seq_show` is checked with the same absolute-address lookup
            # that `_check_members` uses for every other handler
            if not vmlinux.get_symbols_by_absolute_location(var.seq_show):
                yield var_name, "show", var.seq_show
        else:
            raise exceptions.VolatilityException(
                "_check_afinfo_pre_4_18: Unable to find sequence operations members for checking."
            )

    @classmethod
    def _check_afinfo_pre_4_18(
        cls,
        context: interfaces.context.ContextInterface,
        vmlinux_name: str,
        seq_members: List[str],
    ) -> Generator[Tuple[str, str, int], None, None]:
        """
        Checks the operations structures for network protocols of < 4.18 systems

        Args:
            context: The context holding the kernel module
            vmlinux_name: The name of the kernel module within `context.modules`
            seq_members: The member names of the `seq_operations` type

        Yields:
            Tuples of (symbol name, member name, handler address) for each
            suspicious handler
        """
        tcp = ("tcp_seq_afinfo", ["tcp6_seq_afinfo", "tcp4_seq_afinfo"])
        udp = (
            "udp_seq_afinfo",
            [
                "udplite6_seq_afinfo",
                "udp6_seq_afinfo",
                "udplite4_seq_afinfo",
                "udp4_seq_afinfo",
            ],
        )
        protocols = [tcp, udp]

        vmlinux = context.modules[vmlinux_name]

        op_members = vmlinux.get_type("file_operations").members

        # loop through all symbols; the structure type name is not needed here
        for _struct_type, global_vars in protocols:
            for global_var_name in global_vars:
                # this lookup will fail for the IPv6 protocols on kernels without IPv6 support
                try:
                    global_var = vmlinux.object_from_symbol(global_var_name)
                except exceptions.SymbolError:
                    continue

                yield from cls._check_pre_4_18_ops(
                    context,
                    vmlinux_name,
                    global_var_name,
                    global_var,
                    op_members,
                    seq_members,
                )

    @classmethod
    def _check_afinfo_post_4_18(
        cls,
        context: interfaces.context.ContextInterface,
        vmlinux_name: str,
        seq_members: List[str],
    ) -> Generator[Tuple[str, str, int], None, None]:
        """
        Checks the operations structures for network protocols of >= 4.18 systems

        Args:
            context: The context holding the kernel module
            vmlinux_name: The name of the kernel module within `context.modules`
            seq_members: The member names of the `seq_operations` type

        Yields:
            Tuples of (symbol name, member name, handler address) for each
            suspicious handler
        """
        vmlinux = context.modules[vmlinux_name]

        ops_structs = [
            "raw_seq_ops",
            "udp_seq_ops",
            "arp_seq_ops",
            "unix_seq_ops",
            "udp6_seq_ops",
            "raw6_seq_ops",
            "tcp_seq_ops",
            "tcp4_seq_ops",
            "tcp6_seq_ops",
            "packet_seq_ops",
        ]

        for protocol_ops_var in ops_structs:
            # These will fail if the particular kernel doesn't have support for a protocol like IPv6
            try:
                protocol_ops = vmlinux.object_from_symbol(protocol_ops_var)
            except exceptions.SymbolError:
                continue

            yield from cls._check_members(
                context, vmlinux_name, protocol_ops, protocol_ops_var, seq_members
            )

    @classmethod
    def check_afinfo(
        cls, context: interfaces.context.ContextInterface, vmlinux_name: str
    ) -> Generator[Tuple[str, str, int], None, None]:
        """
        Walks the network protocol operations structures for common network protocols.
        Reports any initialized operations members that do not point inside the kernel.

        Args:
            context: The context holding the kernel module
            vmlinux_name: The name of the kernel module within `context.modules`

        Yields:
            Tuples of (symbol name, member name, handler address) for each
            suspicious handler
        """
        vmlinux = context.modules[vmlinux_name]

        # The presence of `seq_fops` in `tcp_seq_afinfo` selects the pre-4.18 walker
        type_check = vmlinux.get_type("tcp_seq_afinfo")
        if type_check.has_member("seq_fops"):
            checker = cls._check_afinfo_pre_4_18
        else:
            checker = cls._check_afinfo_post_4_18

        seq_members = vmlinux.get_type("seq_operations").members

        yield from checker(context, vmlinux_name, seq_members)

    def _generator(self):
        """
        A simple wrapper around `check_afinfo` that yields TreeGrid rows
        """
        for name, member, address in self.check_afinfo(
            self.context, self.config["kernel"]
        ):
            yield 0, (name, member, format_hints.Hex(address))

    def run(self):
        """Builds the TreeGrid listing each suspicious handler found."""
        return renderers.TreeGrid(
            [
                ("Symbol Name", str),
                ("Member", str),
                ("Handler Address", format_hints.Hex),
            ],
            self._generator(),
        )