# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
"""A module containing a plugin that verifies the operation function
pointers of network protocols."""
import logging
from typing import List, Tuple, Generator
from volatility3.framework import exceptions, interfaces
from volatility3.framework import renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.renderers import format_hints
vollog = logging.getLogger(__name__)
[docs]
class Check_afinfo(plugins.PluginInterface):
"""Verifies the operation function pointers of network protocols."""
_version = (1, 0, 0)
_required_framework_version = (2, 0, 0)
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
]
@classmethod
def _check_members(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
var_ops: interfaces.objects.ObjectInterface,
var_name: str,
members: List[str],
) -> Generator[Tuple[str, str, int], None, None]:
"""
Yields any members that are not pointing inside the kernel
"""
vmlinux = context.modules[vmlinux_name]
for check in members:
# redhat-specific garbage
if check.startswith("__UNIQUE_ID_rh_kabi_hide"):
continue
# These structures have members like `write` and `next`, which are built in Python functions
addr = var_ops.member(attr=check)
# Unimplemented handlers are set to 0
if not addr:
continue
if len(vmlinux.get_symbols_by_absolute_location(addr)) == 0:
yield var_name, check, addr
@classmethod
def _check_pre_4_18_ops(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
var_name: str,
var: interfaces.objects.ObjectInterface,
op_members: List[str],
seq_members: List[str],
):
"""
Finds the correct way to reference `op_members`
"""
vmlinux = context.modules[vmlinux_name]
if var.has_member("seq_fops"):
yield from cls._check_members(
context, vmlinux_name, var.seq_fops, var_name, op_members
)
# newer kernels
if var.has_member("seq_ops"):
yield from cls._check_members(
context, vmlinux_name, var.seq_ops, var_name, seq_members
)
# this is the most commonly hooked member by rootkits, so a force a check on it
elif var.has_member("seq_show"):
if len(vmlinux.get_symbols_by_location(var.seq_show)) == 0:
yield var_name, "show", var.seq_show
else:
raise exceptions.VolatilityException(
"_check_afinfo_pre_4_18: Unable to find sequence operations members for checking."
)
@classmethod
def _check_afinfo_pre_4_18(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
seq_members: str,
) -> Generator[Tuple[str, str, int], None, None]:
"""
Checks the operations structures for network protocols of < 4.18 systems
"""
tcp = ("tcp_seq_afinfo", ["tcp6_seq_afinfo", "tcp4_seq_afinfo"])
udp = (
"udp_seq_afinfo",
[
"udplite6_seq_afinfo",
"udp6_seq_afinfo",
"udplite4_seq_afinfo",
"udp4_seq_afinfo",
],
)
protocols = [tcp, udp]
vmlinux = context.modules[vmlinux_name]
op_members = vmlinux.get_type("file_operations").members
# loop through all symbols
for struct_type, global_vars in protocols:
for global_var_name in global_vars:
# this will lookup fail for the IPv6 protocols on kernels without IPv6 support
try:
global_var = vmlinux.object_from_symbol(global_var_name)
except exceptions.SymbolError:
continue
yield from cls._check_pre_4_18_ops(
context,
vmlinux_name,
global_var_name,
global_var,
op_members,
seq_members,
)
@classmethod
def _check_afinfo_post_4_18(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
seq_members: str,
) -> Generator[Tuple[str, str, int], None, None]:
"""
Checks the operations structures for network protocols of >= 4.18 systems
"""
vmlinux = context.modules[vmlinux_name]
ops_structs = [
"raw_seq_ops",
"udp_seq_ops",
"arp_seq_ops",
"unix_seq_ops",
"udp6_seq_ops",
"raw6_seq_ops",
"tcp_seq_ops",
"tcp4_seq_ops",
"tcp6_seq_ops",
"packet_seq_ops",
]
for protocol_ops_var in ops_structs:
# These will fail if the particular kernel doesn't have support for a protocol like IPv6
try:
protocol_ops = vmlinux.object_from_symbol(protocol_ops_var)
except exceptions.SymbolError:
continue
yield from cls._check_members(
context, vmlinux_name, protocol_ops, protocol_ops_var, seq_members
)
[docs]
@classmethod
def check_afinfo(
cls, context: interfaces.context.ContextInterface, vmlinux_name
) -> Generator[Tuple[str, str, int], None, None]:
"""
Walks the network protocol operations structures for common network protocols.
Reports any initialized operations members that do not point inside the kernel.
"""
vmlinux = context.modules[vmlinux_name]
type_check = vmlinux.get_type("tcp_seq_afinfo")
if type_check.has_member("seq_fops"):
checker = cls._check_afinfo_pre_4_18
else:
checker = cls._check_afinfo_post_4_18
seq_members = vmlinux.get_type("seq_operations").members
yield from checker(context, vmlinux_name, seq_members)
def _generator(self):
"""
A simple wrapper around `check_afino`
"""
for name, member, address in self.check_afinfo(
self.context, self.config["kernel"]
):
yield 0, (name, member, format_hints.Hex(address))
[docs]
def run(self):
return renderers.TreeGrid(
[
("Symbol Name", str),
("Member", str),
("Handler Address", format_hints.Hex),
],
self._generator(),
)