Source code for volatility3.framework.symbols.linux.utilities.modules
import logging
import warnings
import functools
import struct
from abc import ABCMeta, abstractmethod
from typing import (
Callable,
Dict,
Generator,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Set,
Tuple,
Union,
)
from volatility3 import framework
from volatility3.framework import (
constants,
deprecation,
exceptions,
interfaces,
objects,
renderers,
symbols,
)
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.framework.symbols.linux import extensions
from volatility3.framework.symbols.linux.utilities import tainting
from volatility3.framework.constants import linux as linux_constants
vollog = logging.getLogger(__name__)
[docs]
class ModuleInfo(NamedTuple):
"""
Used to track the name and boundary of a kernel module
"""
offset: int
name: str
start: int
end: int
[docs]
class ModuleGathererInterface(
interfaces.configuration.VersionableInterface, metaclass=ABCMeta
):
_version = (1, 0, 0)
_required_framework_version = (2, 0, 0)
framework.require_interface_version(*_required_framework_version)
gatherer_return_type = Generator[Union[ModuleInfo, "extensions.module"], None, None]
# Must be set to a unique, descriptive name of the gathering technique or data structure source
name = None
[docs]
@classmethod
@abstractmethod
def gather_modules(
cls, context: interfaces.context.ContextInterface, kernel_module_name: str
) -> gatherer_return_type:
"""
This method must return a generator (yield) of each `gatherer_return_type` found from its source
"""
[docs]
class Modules(interfaces.configuration.VersionableInterface):
"""Kernel modules related utilities."""
_version = (3, 0, 2)
_required_framework_version = (2, 0, 0)
framework.require_interface_version(*_required_framework_version)
[docs]
@classmethod
def module_lookup_by_address(
cls,
context: interfaces.context.ContextInterface,
kernel_module_name: str,
modules: Iterable[ModuleInfo],
target_address: int,
) -> Optional[Tuple[ModuleInfo, Optional[str]]]:
"""
Determine if a target address lies in a module memory space.
Returns the module where the provided address lies.
`modules` must be non-empty and contain masked addresses via `get_module_info_for_module` or
a ValueError will be thrown
Args:
context: The context on which to operate
layer_name: The name of the layer on which to operate
modules: An iterable containing the modules to match the address against
target_address: The address to check for a match
Returns:
The first memory module in which the address fits and the symbol name for `target_address`
Kernel documentation:
"within_module" and "within_module_mem_type" functions
"""
kernel = context.modules[kernel_module_name]
kernel_layer = context.layers[kernel.layer_name]
if not modules:
raise ValueError("Empty list sent to `module_lookup_by_address`")
matches = []
for module in modules:
if module.start != module.start & kernel_layer.address_mask:
raise ValueError(
"Modules list must be gathered from `run_modules_scanners` to be used in this function"
)
if module.start <= target_address < module.end:
matches.append(module)
if len(matches) >= 1:
if len(matches) > 1:
warnings.warn(
f"Address {hex(target_address)} fits in modules at {[hex(module.start) for module in matches]}, indicating potential modules memory space overlap. The first matching entry {matches[0].name} will be used",
UserWarning,
)
symbol_name = None
match = matches[0]
if match.name == constants.linux.KERNEL_NAME:
symbols = list(kernel.get_symbols_by_absolute_location(target_address))
if len(symbols):
symbol_name = symbols[0]
else:
module = kernel.object("module", offset=module.offset, absolute=True)
symbol_name = module.get_symbol_by_address(target_address)
if symbol_name and symbol_name.find(constants.BANG) != -1:
symbol_name = symbol_name.split(constants.BANG)[1]
return match, symbol_name
return None, None
[docs]
@classmethod
@deprecation.method_being_removed(
removal_date="2025-09-25",
message="Code using this function should adapt `linux_utilities_modules.Modules.run_module_scanners`",
)
def mask_mods_list(
cls,
context: interfaces.context.ContextInterface,
kernel_layer_name: str,
mods: Iterator[extensions.module],
) -> List[Tuple[str, int, int]]:
"""
A helper function to mask the starting and end address of kernel modules
"""
mask = context.layers[kernel_layer_name].address_mask
return [
(
utility.array_to_string(mod.name),
mod.get_module_base() & mask,
(mod.get_module_base() & mask) + mod.get_core_size(),
)
for mod in mods
]
[docs]
@classmethod
@deprecation.method_being_removed(
removal_date="2025-09-25",
message="Use `module_lookup_by_address` to map address to their hosting kernel module and symbol.",
)
def lookup_module_address(
cls,
context: interfaces.context.ContextInterface,
kernel_module_name: str,
handlers: List[Tuple[str, int, int]],
target_address: int,
) -> Tuple[str, str]:
"""
Searches between the start and end address of the kernel module using target_address.
Returns the module and symbol name of the address provided.
"""
kernel_module = context.modules[kernel_module_name]
mod_name = "UNKNOWN"
symbol_name = "N/A"
for name, start, end in handlers:
if start <= target_address <= end:
mod_name = name
if name == constants.linux.KERNEL_NAME:
symbols = list(
kernel_module.get_symbols_by_absolute_location(target_address)
)
if len(symbols):
symbol_name = (
symbols[0].split(constants.BANG)[1]
if constants.BANG in symbols[0]
else symbols[0]
)
break
return mod_name, symbol_name
[docs]
@classmethod
def get_module_info_for_module(
cls, address_mask: int, module: extensions.module
) -> Optional[ModuleInfo]:
"""
Returns a ModuleInfo instance for `module`
This performs address masking to avoid endless calls to `mask_mods_list`
Returns None if the name is smeared
"""
try:
mod_name = utility.array_to_string(module.name)
except exceptions.InvalidAddressException:
return None
start = module.get_module_base() & address_mask
end = start + module.get_core_size()
return ModuleInfo(module.vol.offset, mod_name, start, end)
[docs]
@classmethod
def run_modules_scanners(
cls,
context: interfaces.context.ContextInterface,
kernel_module_name: str,
caller_wanted_gatherers: List[ModuleGathererInterface],
flatten: bool = True,
) -> Dict[str, List[ModuleInfo]]:
"""Run module scanning plugins and aggregate the results. It is designed
to not operate any inter-plugin results triage.
Rules for `caller_wanted_gatherers`:
If `ModuleGatherers.all_gathers_identifier` is specified then every source will be populated
If empty or an invalid gatherer is specified then a ValueError is thrown
All gatherer names must be unique
Args:
called_wanted_sources: The list of sources to gather modules.
flatten: Whether to de-duplicate modules across gatherers
Returns:
Dictionary mapping each gatherer to its corresponding result
"""
if not caller_wanted_gatherers:
raise ValueError(
"`caller_wanted_gatherers` must have at least one gatherer."
)
if not isinstance(caller_wanted_gatherers, Iterable):
raise ValueError("`caller_wanted_gatherers` must be iterable")
seen_names = set()
for gatherer in caller_wanted_gatherers:
if not issubclass(gatherer, ModuleGathererInterface):
raise ValueError(
f"Invalid gatherer sent through `caller_wanted_gatherers`: {gatherer}"
)
if not gatherer.name:
raise ValueError(
f"{gatherer} does not have a valid name attribute, which is required. It must be a non-zero length string."
)
if gatherer.name in seen_names:
raise ValueError(
f"{gatherer} has a name {gatherer.name} which has already been processed. Names must be unique."
)
seen_names.add(gatherer.name)
kernel = context.modules[kernel_module_name]
address_mask = context.layers[kernel.layer_name].address_mask
run_results: Dict[ModuleGathererInterface, List[ModuleInfo]] = {}
# Walk each source gathering modules
for gatherer in caller_wanted_gatherers:
run_results[gatherer.name] = []
# process each module coming from back the current source
for module in gatherer.gather_modules(context, kernel_module_name):
# the kernel sends back a ModuleInfo directly
if isinstance(module, ModuleInfo):
modinfo = module
else:
modinfo = cls.get_module_info_for_module(address_mask, module)
if modinfo:
run_results[gatherer.name].append(modinfo)
if flatten:
return cls.flatten_run_modules_results(run_results)
return run_results
[docs]
@staticmethod
@functools.lru_cache
def get_modules_memory_boundaries(
context: interfaces.context.ContextInterface,
vmlinux_module_name: str,
) -> Tuple[int, int]:
"""Determine the boundaries of the module allocation area
Args:
context: The context to retrieve required elements (layers, symbol tables) from
vmlinux_module_name: The name of the kernel module on which to operate
Returns:
A tuple containing the minimum and maximum addresses for the module allocation area.
"""
vmlinux = context.modules[vmlinux_module_name]
if vmlinux.has_symbol("mod_tree"):
# Kernel >= 5.19 58d208de3e8d87dbe196caf0b57cc58c7a3836ca
mod_tree = vmlinux.object_from_symbol("mod_tree")
modules_addr_min = mod_tree.addr_min
modules_addr_max = mod_tree.addr_max
elif vmlinux.has_symbol("module_addr_min"):
# 2.6.27 <= kernel < 5.19 3a642e99babe0617febb6f402e1e063479f489db
modules_addr_min = vmlinux.object_from_symbol("module_addr_min")
modules_addr_max = vmlinux.object_from_symbol("module_addr_max")
if isinstance(modules_addr_min, objects.Void):
raise exceptions.VolatilityException(
"Your ISF symbols lack type information. You may need to update the"
"ISF using the latest version of dwarf2json"
)
else:
raise exceptions.VolatilityException(
"Cannot find the module memory allocation area. Unsupported kernel"
)
return modules_addr_min, modules_addr_max
[docs]
@classmethod
def flatten_run_modules_results(
cls, run_results: Dict[str, List[ModuleInfo]], deduplicate: bool = True
) -> List[ModuleInfo]:
"""Flatten a dictionary mapping plugin names and modules list, to a single merged list.
This is useful to get a generic lookup list of all the detected modules.
Args:
run_results: dictionary of plugin names mapping a list of detected modules
deduplicate: remove duplicate modules, based on their offsets
Returns:
List of ModuleInfo objects
"""
uniq_modules: List[ModuleInfo] = []
seen_addresses: int = set()
for modules in run_results.values():
for module in modules:
if deduplicate and (module.start in seen_addresses):
continue
seen_addresses.add(module.start)
uniq_modules.append(module)
return uniq_modules
[docs]
@classmethod
def get_module_address_alignment(
cls,
context: interfaces.context.ContextInterface,
vmlinux_module_name: str,
) -> int:
"""Obtain the module memory address alignment.
struct module is aligned to the L1 cache line, which is typically 64 bytes for most
common i386/AMD64/ARM64 configurations. In some cases, it can be 128 bytes, but this
will still work.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
vmlinux_module_name: The name of the kernel module on which to operate
Returns:
The struct module alignment
"""
return context.modules[vmlinux_module_name].get_type("pointer").size
[docs]
@classmethod
def list_modules(
cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str
) -> Iterable[interfaces.objects.ObjectInterface]:
"""Lists all the modules in the primary layer.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
vmlinux_symbols: The name of the table containing the kernel symbols
Yields:
The modules present in the `layer_name` layer's modules list
This function will throw a SymbolError exception if kernel module support is not enabled.
"""
vmlinux = context.modules[vmlinux_module_name]
modules = vmlinux.object_from_symbol(symbol_name="modules").cast("list_head")
table_name = vmlinux.symbol_table_name
yield from modules.to_list(table_name + constants.BANG + "module", "list")
[docs]
@classmethod
def get_kset_modules(
cls, context: interfaces.context.ContextInterface, vmlinux_name: str
) -> Dict[str, extensions.module]:
vmlinux = context.modules[vmlinux_name]
try:
module_kset = vmlinux.object_from_symbol("module_kset")
except exceptions.SymbolError:
module_kset = None
if not module_kset:
raise TypeError(
"This plugin requires the module_kset structure. This structure is not present in the supplied symbol table. This means you are either analyzing an unsupported kernel version or that your symbol table is corrupt."
)
ret = {}
kobj_off = vmlinux.get_type("module_kobject").relative_child_offset("kobj")
for kobj in module_kset.list.to_list(
vmlinux.symbol_table_name + constants.BANG + "kobject", "entry"
):
mod_kobj = vmlinux.object(
object_type="module_kobject",
offset=kobj.vol.offset - kobj_off,
absolute=True,
)
mod = mod_kobj.mod
try:
name = utility.pointer_to_string(kobj.name, 32)
except exceptions.InvalidAddressException:
continue
if kobj.name and kobj.reference_count() > 2:
ret[name] = mod
return ret
[docs]
@staticmethod
def validate_alignment_patterns(
addresses: Iterable[int],
address_alignment: int,
) -> bool:
"""Check if the memory addresses meet our alignments patterns
Args:
addresses: Iterable with the address values
address_alignment: Number of bytes for alignment validation
Returns:
True if all the addresses meet the alignment
"""
return all(addr % address_alignment == 0 for addr in addresses)
@classmethod
def _get_param_handlers(
cls, context: interfaces.context.ContextInterface, vmlinux_name: str
) -> Tuple[Dict[int, str], Dict[str, Optional[int]]]:
"""
This function builds the dictionaries needed to map kernel parameters to their types
We need these values and information to properly decode each parameter to its input representation
"""
kernel = context.modules[vmlinux_name]
# All the integer type parameters
pairs = {
"param_get_invbool": "int",
"param_get_bool": "int",
"param_get_int": "int",
"param_get_ulong": "long unsigned int",
"param_get_ullong": "long long unsigned int",
"param_get_long": "long int",
"param_get_uint": "unsigned int",
"param_get_ushort": "short unsigned int",
"param_get_short": "short int",
"param_get_byte": "char",
}
int_handlers: Dict[int, str] = {}
for sym_name, val_type in pairs.items():
try:
sym_address = kernel.get_absolute_symbol_address(sym_name)
except exceptions.SymbolError:
continue
int_handlers[sym_address] = val_type
# Strings, arrays, booleans
getters = {
"param_get_string": None,
"param_array_get": None,
"param_get_charp": None,
"param_get_bool": None,
"param_get_invbool": None,
}
for sym_name in getters:
try:
sym_address = kernel.get_absolute_symbol_address(sym_name)
except exceptions.SymbolError:
continue
getters[sym_name] = sym_address
return int_handlers, getters
@classmethod
def _get_param_val(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
int_handlers,
getters,
module,
param,
) -> Optional[Union[str, int]]:
"""
Properly determines the type of a parameter and decodes based on the type.
The type is determined by examining its `get` function, which will be a pointer to
predefined operations handler for particular parameter types.
"""
# Attempt to retrieve the `get` pointer. Bail if smeared
try:
if hasattr(param, "get"):
param_func = param.get
else:
param_func = param.ops.get
except exceptions.InvalidAddressException:
return None
if not param_func:
return None
kernel = context.modules[vmlinux_name]
# For arrays, recursively get the value of each member as the type can be different
if param_func == getters["param_array_get"]:
array = param.arr
if array.num:
max_index = array.num.dereference()
else:
max_index = array.member("max")
if max_index > 32:
vollog.debug(
f"Skipping array parameter with invalid index for module {module.vol.offset:#x}"
)
return None
element_vals = []
for i in range(max_index):
kp = kernel.object(
object_type="kernel_param",
offset=array.elem + (array.elemsize * i),
absolute=True,
)
element_vals.append(
cls._get_param_val(
context, vmlinux_name, int_handlers, getters, module, kp
)
)
# nothing was gathered
if not element_vals:
return None
return ",".join([str(ele) for ele in element_vals])
# strings types
elif param_func in [getters["param_get_string"], getters["param_get_charp"]]:
try:
if param_func == getters["param_get_string"]:
count = param.member("str").maxlen
else:
count = 256
return utility.pointer_to_string(param.member("str"), count=count)
except exceptions.InvalidAddressException:
vollog.debug(
f"Skipping string parameter with invalid address for module {module.vol.offset:#x}"
)
return None
# The integer handles, which also encompass boolean handlers
elif param_func in int_handlers:
try:
int_value = kernel.object(
object_type=int_handlers[param_func], offset=param.arg
)
except exceptions.InvalidAddressException:
vollog.debug(
f"Skipping {int_handlers[param_func]} parameter with invalid address for module {module.vol.offset:#x}"
)
return None
if param_func == getters["param_get_bool"]:
if int_value == 0:
return "N"
else:
return "Y"
elif param_func == getters["param_get_invbool"]:
if int_value == 0:
return "Y"
else:
return "N"
else:
return int_value
else:
handler_symbol = kernel.get_symbols_by_absolute_location(param_func)
msg = f"Unknown kernel parameter handling function ({handler_symbol}) at address {param_func:#x} for module at {module.vol.offset:#x}"
# If a new kernel has a handler symbol we don't support then we want to always see that information
# If the handler doesn't map to a kernel symbol then its smeared/invalid
if handler_symbol:
vollog.warning(msg)
else:
vollog.debug(msg)
return None
[docs]
@classmethod
def get_load_parameters(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
module: extensions.module,
) -> Generator[Tuple[str, Optional[Union[str, int]]], None, None]:
"""
Recovers the load parameters of the given kernel module
Returns a tuple (key,value) for each parameter
"""
if not hasattr(module, "kp"):
vollog.debug(
"kp member missing for struct module. Cannot recover parameters."
)
return None
if module.num_kp > 128:
vollog.debug(
f"Smeared number of parameters ({module.num_kp}) found for module at offset {module.vol.offset:#x}"
)
return None
kernel = context.modules[vmlinux_name]
int_handlers, getters = cls._get_param_handlers(context, vmlinux_name)
# Build the array of parameters
param_array = kernel.object(
object_type="array",
offset=module.kp.dereference().vol.offset,
subtype=kernel.get_type("kernel_param"),
count=module.num_kp,
absolute=True,
)
for i in range(len(param_array)):
try:
param = param_array[i]
name = utility.pointer_to_string(param.name, count=32)
except exceptions.InvalidAddressException:
vollog.debug(
f"Smeared load parameter module at offset {module.vol.offset:#x}"
)
continue
value = cls._get_param_val(
context, vmlinux_name, int_handlers, getters, module, param
)
yield name, value
# This module is responsible for producing an ELF file of a kernel module (LKM) loaded in memory
# This extraction task is quite complicated as the Linux kernel discards the ELF header at load time
# Due to this, to support static analysis, we must create an ELF header and proper file based on the sections
# There are also several other significant complications that we must deal with when trying to extract an LKM
# that can be analyzed with static analysis tools
# First, the .strtab points somewhere random and is kept off the module structure, not with the other sections
# Second, all of the symbols (.symtab) have mangled members that we must patch for anything to make sense
# Third, the section name string table (.shstrtab) is not an allocated section, meaning its not in memory
# Not having the .shstrtab makes analysis impossible-to-difficult for static analysis tools. To work around this,
# we create the .shstrtab based on the sections in memory and then glue it in as the final section
# ModuleExtract.extract_module is the entry point and only visible method for plugins
[docs]
class ModuleExtract(interfaces.configuration.VersionableInterface):
"""Extracts Linux kernel module structures into an analyzable ELF file"""
_version = (1, 0, 2)
_required_framework_version = (2, 25, 0)
framework.require_interface_version(*_required_framework_version)
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.VersionRequirement(
name="linux_utilities_modules_modules",
component=Modules,
version=(3, 0, 2),
),
]
@classmethod
def _find_section(
cls, section_lookups: List[Tuple[str, int, int, int]], sym_address: int
) -> Optional[Tuple[str, int, int, int]]:
"""
Finds the section containing `sym_address`
"""
for name, index, address, size in section_lookups:
if address <= sym_address < address + size:
return name, index, address, size
return None
@classmethod
def _get_st_info_for_sym(
cls, sym: interfaces.objects.ObjectInterface, sym_address: int, sect_name: str
) -> bytes:
"""
This is a helper function called from `_fix_sym_table`
Calculates the `st_info` value for the given symbol
Spec: https://refspecs.linuxbase.org/elf/gabi4+/ch4.symtab.html
"""
if sym.st_name > 0:
# Global symbol
bind = linux_constants.STB_GLOBAL
if sym_address == 0:
sect_type = linux_constants.STT_NOTYPE
elif sect_name:
# rela = relocations
if sect_name.find(".text") != -1 and sect_name.find(".rela") == -1:
sect_type = linux_constants.STT_FUNC
else:
sect_type = linux_constants.STT_OBJECT
else:
# outside the module being extracted
sect_type = linux_constants.STT_NOTYPE
else:
# Local symbol
bind = linux_constants.STB_LOCAL
sect_type = linux_constants.STT_SECTION
# Build the st_info as ELF32_ST_INFO/ELF64_ST_INFO
bind_bits = (bind << 4) & 0xF0
type_bits = sect_type & 0xF
st_info_int = (bind_bits | type_bits) & 0xFF
return struct.pack("B", st_info_int)
@classmethod
def _get_fixed_sym_fields(
cls,
st_fmt: str,
sym: interfaces.objects.ObjectInterface,
sections: List[Tuple[str, int, int, int]],
) -> Tuple[str, int, int, int]:
"""
This is a helper function called from `_fix_sym_table`
The st_value, st_info, and st_shndx fields of each symbol are changed/mangled while loading
Static analysis tools do not understand these transformed values as they only make sense to the kernel loader
We must de-mangle these to have analysis tools understand symbols (a key aspect)
"""
# Start by trying to map a symbol to its section
sym_address = sym.st_value
sect_info = cls._find_section(sections, sym_address)
if not sect_info:
# Symbol does not point into the module being extracted
sect_name, sect_index, sect_address = None, None, None
st_value_int = sym_address
else:
# relative address inside the section
sect_name, sect_index, sect_address, _ = sect_info
st_value_int = sym_address - sect_address
# Get the fixed st_value, st_info, and st_shndx that are broken in the mapped file
# formatted to be written into the extracted file
st_value = struct.pack(st_fmt, st_value_int)
# returns formatted to be written into the extracted file
st_info = cls._get_st_info_for_sym(sym, sym_address, sect_name)
# format to reference its section, if any
if sect_name:
st_shndx = struct.pack("<H", sect_index)
else:
st_shndx = struct.pack("<H", sym.st_shndx)
return sect_name, st_value, st_info, st_shndx
@classmethod
def _fix_sym_table(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
original_sections: Dict[int, str],
section_sizes: Dict[int, int],
sym_type_name: str,
st_fmt: str,
module: extensions.module,
) -> Optional[bytes]:
"""
Args:
context: The context on which to operate.
vmlinux_name: The name of the kernel module.
original_sections: Dict of module section addresses and names.
section_sizes: Dict of module section addresses and sizes.
sym_type_name: ELF symbol type name (should be one of "Elf64_Sym" or "Elf32_Sym").
st_fmt: "struct"-like unpack format string (should be one of "<Q" or "<I").
module: The Linux "module" object we're currently parsing.
This function implements the most painful part of the reconstruction
The symbols in .symtab are broken/mangled during loading.
We need to normalize these for static analysis tools to understand the references.
Without proper symbols, analysis is pretty pointless and gets nowhere.
Spec: https://refspecs.linuxbase.org/elf/gabi4+/ch4.symtab.html
"""
kernel = context.modules[vmlinux_name]
# Gather the section information into a list
section_lookups: List[Tuple[str, int, int, int]] = []
for index, (address, name) in enumerate(original_sections.items()):
# We are fixing symtab references...
if name == ".symtab":
continue
size = section_sizes[address]
# Add 1 to account for leading NULL section
section_lookups.append((name, index + 1, address, size))
# Build the array of symbols as they are in memory
sym_type = kernel.get_type(sym_type_name)
symbols = kernel.object(
object_type="array",
subtype=sym_type,
offset=module.section_symtab,
count=module.num_symtab,
absolute=True,
)
# used to hold the new (fixed) symbol table
sym_table_data = b""
# build a correct/normalized Elf32_Sym or Elf64_Sym for each symbol
for sym in symbols:
# get the mangled fields' correct values
sect_name, st_value, st_info, st_shndx = cls._get_fixed_sym_fields(
st_fmt, sym, section_lookups
)
# these aren't mangled during loading
st_name = struct.pack("<I", sym.st_name)
st_other = struct.pack("B", sym.st_other)
st_size = struct.pack(st_fmt, sym.st_size)
# The order as in the ELF specification. The order is not the same between 32 and 64 bit symbols
if st_fmt == "<I":
sym_data = st_name + st_value + st_size + st_info + st_other + st_shndx
else:
sym_data = st_name + st_info + st_other + st_shndx + st_value + st_size
# This should never happen regardless of smear or other issues in the data. We build the structure to spec.
if len(sym_data) != sym_type.size:
vollog.error(
f"Size of sym_data is {len(sym_data)} expected {sym_type.size} for symbol at value {sym.st_value} in section {sect_name}"
)
return None
# add the symbol's data to the overall symbol table
sym_table_data += sym_data
if len(sym_table_data) == 0:
sym_table_data = None
return sym_table_data
@classmethod
def _parse_sections(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
module: extensions.module,
) -> Optional[Tuple[List, int, int]]:
"""
This function first parses the sections as maintained by the kernel
It then orders the sections by load address, and then gathers the data of each section
We also track the file_offset to correctly have alignment in the output file
.symtab requires special handling as its so broken in memory as described in `_fix_sym_table`
The data of .strtab is read directly off the module structure and not its section
as the section from the original module has no meaning after loading as the kernel does not reference it.
"""
kernel = context.modules[vmlinux_name]
kernel_layer = context.layers[kernel.layer_name]
modules_addr_min, modules_addr_max = Modules.get_modules_memory_boundaries(
context, vmlinux_name
)
modules_addr_min &= kernel_layer.address_mask
modules_addr_max &= kernel_layer.address_mask
original_sections = {}
for index, section in enumerate(module.get_sections()):
# Extra sanity check, to prevent OOM on heavily smeared samples at line
# "size = next_address - address"
if not (
modules_addr_min
<= section.address & kernel_layer.address_mask
< modules_addr_max
):
continue
name = section.get_name()
original_sections[section.address] = name
if not original_sections:
return None
if symbols.symbol_table_is_64bit(context, kernel.symbol_table_name):
sym_type = "Elf64_Sym"
elf_hdr_type = "Elf64_Ehdr"
st_fmt = "<Q"
else:
sym_type = "Elf32_Sym"
elf_hdr_type = "Elf32_Ehdr"
st_fmt = "<I"
# At this point, we have the sections starting addresses and names,
# but the kernel does not track the size
# To recover the size, we sort by address and then use the next section as the boundary to calculate size
# .symtab (the symbol table) and .strtab (the strings table) require special handling.
# All others can be read with padding
# get the addresses in sorted order, can index into `original_sections` for names
sorted_addresses = sorted(original_sections.keys())
# We need to track where .symtab is for symbol name offsets
symtab_address = None
strtab_index = None
# Section data starts after the file header
file_offset = kernel.get_type(elf_hdr_type).vol.size
# The ordered set of sections along with their fixed data
updated_sections: List[Tuple[str, int, int, bytes]] = []
# A mapping of section start addresses to sizes
# original_sections does not have this information for reasons explained above
section_sizes: Dict[int, int] = {}
for index, address in enumerate(sorted_addresses):
sect_name = original_sections[address]
# Read out the string table. The full size is not kept, so we give each symbol's string up to 256 bytes
if sect_name == ".strtab":
# Read out symbol strings, giving up to 256 bytes per symbol
data = kernel_layer.read(
module.section_strtab, module.num_symtab * 256, pad=True
)
# The string table should end with two NULLs, but the kernel does not enforce this
end_index = data.find(b"\x00\x00")
if end_index != -1:
data = data[: end_index + 1]
strtab_index = index
# The symbol table in memory is completely transformed and broken from how it appears on disk
# We need to process it last to fix the symbol table entries back to their correct values
elif sect_name == ".symtab":
symtab_address = address
continue
else:
# Compute based on the boundary of the next address-sorted section
try:
# Get the next section in order
next_address = sorted_addresses[index + 1]
size = next_address - address
except IndexError:
## We are at the last section so we need to pick a size
size = 0x10000
vollog.debug(f"Defaulting section {sect_name} to size {size:#x}")
# Read the section normally..
data = kernel_layer.read(address, size, pad=True)
# store the section information in order
updated_sections.append((sect_name, address, file_offset, data))
# Track sizes of each section
section_sizes[address] = len(data)
file_offset += len(data)
if symtab_address:
# Perform the painful demangling of symbol table structures
data = cls._fix_sym_table(
context,
vmlinux_name,
original_sections,
section_sizes,
sym_type,
st_fmt,
module,
)
if not data:
vollog.debug(
f"Could not construct a symbol table for module at {module.vol.offset}. Cannot recover."
)
return None
symtab_index = len(updated_sections)
# Manually add symtab with the correct data
updated_sections.append((".symtab", symtab_address, file_offset, data))
else:
vollog.debug(
f"Did not find a .symtab section for module at {module.vol.offset:#x}. Cannot recover."
)
return None
return updated_sections, strtab_index, symtab_index
@classmethod
def _make_elf_header(
cls, bits: int, sect_hdr_offset: int, num_sections: int
) -> Optional[bytes]:
"""
Creates a `bits` bit ELF header for the file based on recovered values
Called last as it needs information computed from the sections
Spec: https://refspecs.linuxfoundation.org/elf/gabi4+/ch4.eheader.html
"""
if bits == 32:
fmt = "<I"
e_ident = (
b"\x7f\x45\x4c\x46\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
e_machine_int = 3 # EM_X86_86
e_ehsize_int = 52
e_shentsize_int = 40
header_size = 52
else:
fmt = "<Q"
e_ident = (
b"\x7f\x45\x4c\x46\x02\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
e_machine_int = 0x3E # EM_X86_64
e_ehsize_int = 64
e_shentsize_int = 64
header_size = 64
e_type = struct.pack("<H", 1) # relocatable
e_machine = struct.pack("<H", e_machine_int)
e_version = struct.pack("<I", 1)
e_entry = b"\x00" * int(
bits / 8
) # The .init sections are freed after module load
e_phoff = b"\x00" * int(bits / 8) # No program headers
e_shoff = struct.pack(fmt, sect_hdr_offset)
e_flags = b"\x00\x00\x00\x00"
e_ehsize = struct.pack("<H", e_ehsize_int)
e_phentsize = b"\x00\x00"
e_phnum = b"\x00\x00"
e_shentsize = struct.pack("<H", e_shentsize_int)
e_shnum = struct.pack("<H", num_sections + 1)
e_shstrndx = struct.pack("<H", num_sections)
header = (
e_ident
+ e_type
+ e_machine
+ e_version
+ e_entry
+ e_phoff
+ e_shoff
+ e_flags
+ e_ehsize
+ e_phentsize
+ e_phnum
+ e_shentsize
+ e_shnum
+ e_shstrndx
)
# should never happen as we make the header ourselves
if len(header) != header_size:
vollog.error(
f"Making Elf header for arch {bits} created a header of {len(header)} bytes. Cannot proceed"
)
return None
return header
@classmethod
def _calc_sect_type(cls, section_name: str) -> Optional[int]:
"""
This function makes a best effort to map common section names
to their attributes
"""
known_sections = {
".note.gnu.build-id": linux_constants.SHT_NOTE,
".text": linux_constants.SHT_PROGBITS,
".init.text": linux_constants.SHT_PROGBITS,
".exit.text": linux_constants.SHT_PROGBITS,
".static_call.text": linux_constants.SHT_PROGBITS,
".rodata": linux_constants.SHT_PROGBITS,
".modinfo": linux_constants.SHT_PROGBITS,
"__param": linux_constants.SHT_PROGBITS,
".data": linux_constants.SHT_PROGBITS,
".gnu.linkonce.this_module": linux_constants.SHT_PROGBITS,
".comment": linux_constants.SHT_PROGBITS,
".shstrtab": linux_constants.SHT_STRTAB,
".symtab": linux_constants.SHT_SYMTAB,
".strtab": linux_constants.SHT_STRTAB,
}
sect_type_val = linux_constants.SHT_PROGBITS
if section_name.find(".rela.") != -1:
sect_type_val = linux_constants.SHT_RELA
elif section_name in known_sections:
sect_type_val = known_sections[section_name]
return sect_type_val
# all sections from memory are allocated (SHF_ALLOC)
# special check certain other sections to try and ensure extra flags are added where needed
@classmethod
def _calc_sect_flags(cls, name: str) -> int:
"""
Make a best effort to map common section names to their permissions
If we miss a section here, users of common static analysis tools can mark the
sections are writable or executable manually, but that becomes very cumbersome
and breaks initial analysis by the tool
"""
# All sections in memory are allocated (`A` in readelf -S)
flags = linux_constants.SHF_ALLOC
if name in [".text", ".init.text", ".exit.text", ".static_call.text"]:
flags = flags | linux_constants.SHF_EXECINSTR
elif name in [
".data",
".init.data",
".exit.data",
".bss",
"__tracepoints",
".data.once",
"_ftrace_events",
".gnu.linkonce.this_module",
]:
flags = flags | linux_constants.SHF_WRITE
return flags
@classmethod
def _calc_link(
cls, name: str, strtab_index: int, symtab_index: int, sect_type: int
) -> int:
"""
Calculates the link value for a section
The most important ones are symtab indexes for relocations
and to point the symbol table to the string tab
Spec: https://refspecs.linuxbase.org/elf/gabi4+/ch4.sheader.html
"""
# looking for RELA sections
if name.find(".rela.") != -1:
return symtab_index
# per spec: "The section header index of the associated string table."
elif sect_type == linux_constants.SHT_SYMTAB:
return strtab_index
return 0
@classmethod
def _calc_entsize(cls, name: str, sect_type: int, bits: int) -> int:
"""
Calculates the entsize for relocation sections and the symbol table section
Spec: https://refspecs.linuxbase.org/elf/gabi4+/ch4.sheader.html
"""
# looking for RELA sections
if name.find(".rela.") != -1:
return 24
# per spec: "The section header index of the associated string table."
elif sect_type == linux_constants.SHT_SYMTAB:
if bits == 32:
return 16
else:
return 24
return 0
@classmethod
def _make_section_header(
cls,
bits: int,
name_index: int,
name: str,
address: int,
size: int,
file_offset: int,
strtab_index: int,
symtab_index: int,
) -> Optional[bytes]:
"""
Creates a section header (Elf32_Shdr or Elf64_Shdr) for the given section
"""
if bits == 32:
fmt = "<I"
sect_size = 40
else:
fmt = "<Q"
sect_size = 64
sect_header_type_int = cls._calc_sect_type(name)
flags = cls._calc_sect_flags(name)
link = cls._calc_link(name, strtab_index, symtab_index, sect_header_type_int)
entsize = cls._calc_entsize(name, sect_header_type_int, bits)
try:
sh_name = struct.pack("<I", name_index)
sh_type = struct.pack("<I", sect_header_type_int)
sh_flags = struct.pack(fmt, flags)
sh_addr = struct.pack(fmt, address)
sh_offset = struct.pack(fmt, file_offset)
sh_size = struct.pack(fmt, size)
sh_link = struct.pack("<I", link)
sh_info = b"\x00" * 4
sh_addralign = struct.pack(fmt, 1)
sh_entsize = struct.pack(fmt, entsize)
# catch overflows of offset/address/size
except struct.error:
vollog.debug(
f"Unable to build section header for section {name} at address {address:#x}"
)
return None
data = (
sh_name
+ sh_type
+ sh_flags
+ sh_addr
+ sh_offset
+ sh_size
+ sh_link
+ sh_info
+ sh_addralign
+ sh_entsize
)
# This should never happen regardless of smear or other issues in the data. We build the structure to spec.
if len(data) != sect_size:
vollog.error(
f"Size of section data is {len(data)} expected {sect_size} for section {name} at address {address:#x}"
)
return None
return data
[docs]
@classmethod
def extract_module(
cls,
context: interfaces.context.ContextInterface,
vmlinux_name: str,
module: extensions.module,
) -> Optional[bytes]:
# Bail early if bad address sent in
try:
hasattr(module.sect_attrs, "nsections")
except exceptions.InvalidAddressException:
vollog.debug(f"module at offset {module.vol.offset:#x} is paged out.")
return None
# Gather sections
parse_sections_result = cls._parse_sections(context, vmlinux_name, module)
if parse_sections_result is None:
return None
updated_sections, strtab_index, symtab_index = parse_sections_result
kernel = context.modules[vmlinux_name]
# Figure out header sizes
if symbols.symbol_table_is_64bit(context, kernel.symbol_table_name):
header_type = "Elf64_Ehdr"
section_type = "Elf64_Shdr"
bits = 64
else:
header_type = "Elf32_Ehdr"
section_type = "Elf32_Shdr"
bits = 32
header_type_size = kernel.get_type(header_type).size
section_type_size = kernel.get_type(section_type).size
# Per Linux-spec, all LKMs must start with a null section header
# This buffer is used to hold the headers as they are built
sections_headers = b"\x00" * section_type_size
# Holder of the data of the sections
sections_data = b""
# the .shstrtab section is "\x00" + section name for each section
# followed by a terminating null.
# It starts with the null string (\x00)
shstrtab_data = b"\x00"
# Track where we end the sections and data to glue `.shstrtab` after
last_file_offset = None
last_sect_size = None
# Start at 1 in the string table
name_index = 1
# Create the actual section headers
for index, (name, address, file_offset, section_data) in enumerate(
updated_sections
):
# Make the section header
header_bytes = cls._make_section_header(
bits,
name_index,
name,
address,
len(section_data),
file_offset,
strtab_index,
symtab_index,
)
if not header_bytes:
vollog.debug(f"make_section_header failed for section {name}")
return None
# ndex into the string table
name_index += len(name) + 1
# concatenate the header and section bytes
sections_headers += header_bytes
sections_data += section_data
# track where we are so .shstrtab goes into correct offset
last_file_offset = file_offset
last_sect_size = len(section_data)
# append each section name to what will become .shstrtab
shstrtab_data += bytes(name, encoding="utf8") + b"\x00"
# stick our own section reference string at end
# name_index points to the end of the last section string after the loop ends
shstrtab_data += b".shstrtab\x00"
# create our .shstrtab section so sections have names
sections_headers += cls._make_section_header(
bits,
name_index,
".shstrtab",
0,
len(shstrtab_data),
last_file_offset + last_sect_size,
strtab_index,
symtab_index,
)
sections_data += shstrtab_data
num_sections = len(updated_sections) + 1
header = cls._make_elf_header(
bits,
header_type_size + len(sections_data),
num_sections,
)
if not header:
vollog.error(
f"Hit error creating Elf header for module at {module.vol.offset:#x}"
)
return None
# Return our beautiful, hand-crafted, farm raised ELF file
return header + sections_data + sections_headers
[docs]
class ModuleGathererLsmod(ModuleGathererInterface):
"""
Gathers modules from the main kernel list
"""
_version = (1, 0, 0)
name = "Lsmod"
[docs]
@classmethod
def gather_modules(
cls, context: interfaces.context.ContextInterface, kernel_module_name: str
) -> ModuleGathererInterface.gatherer_return_type:
yield from Modules.list_modules(context, kernel_module_name)
[docs]
class ModuleGathererSysFs(ModuleGathererInterface):
"""
Gathers modules from the sysfs /sys/modules objects
"""
_version = (1, 0, 0)
name = "SysFs"
[docs]
@classmethod
def gather_modules(
cls, context: interfaces.context.ContextInterface, kernel_module_name: str
) -> ModuleGathererInterface.gatherer_return_type:
kernel = context.modules[kernel_module_name]
sysfs_modules: dict = Modules.get_kset_modules(context, kernel_module_name)
for m_offset in sysfs_modules.values():
yield kernel.object(object_type="module", offset=m_offset, absolute=True)
[docs]
class ModuleGathererScanner(ModuleGathererInterface):
"""
Gathers modules by scanning memory
"""
_version = (1, 0, 0)
name = "Scanner"
[docs]
@classmethod
def gather_modules(
cls, context: interfaces.context.ContextInterface, kernel_module_name: str
) -> ModuleGathererInterface.gatherer_return_type:
modules_memory_boundaries = Modules.get_modules_memory_boundaries(
context, kernel_module_name
)
# Send in an empty list to not filter on any modules
yield from Modules.get_hidden_modules(
context=context,
vmlinux_module_name=kernel_module_name,
known_module_addresses=[],
modules_memory_boundaries=modules_memory_boundaries,
)
[docs]
class ModuleGathererKernel(ModuleGathererInterface):
"""
Creates a ModuleInfo instance for the kernel so that plugins
can determine when function pointers reference the kernel
"""
_version = (1, 0, 0)
name = "kernel"
[docs]
@classmethod
def gather_modules(
cls, context: interfaces.context.ContextInterface, kernel_module_name: str
) -> ModuleGathererInterface.gatherer_return_type:
"""
Returns a ModuleInfo instance that encodes the kernel
This is required to map function pointers to the kernel executable
"""
kernel = context.modules[kernel_module_name]
address_mask = context.layers[kernel.layer_name].address_mask
start_addr = kernel.object_from_symbol("_text")
start_addr = start_addr.vol.offset & address_mask
end_addr = kernel.object_from_symbol("_etext")
end_addr = end_addr.vol.offset & address_mask
yield ModuleInfo(start_addr, constants.linux.KERNEL_NAME, start_addr, end_addr)
[docs]
class ModuleGatherers(
interfaces.configuration.VersionableInterface,
interfaces.configuration.ConfigurableInterface,
):
_version = (1, 0, 0)
_required_framework_version = (2, 0, 0)
framework.require_interface_version(*_required_framework_version)
# Valid sources of cores kernel module gatherers to send to `run_module_scanners`
# With few exceptions, rootkit checking plugins want all sources
# This provides a stable identifier as new sources are added over time
all_gatherers_identifier = [
ModuleGathererLsmod,
ModuleGathererSysFs,
ModuleGathererScanner,
ModuleGathererKernel,
]
[docs]
@classmethod
def get_requirements(cls):
reqs = []
# for now, all versions are 1, this will be broken out if/when that changes
for gatherer in ModuleGatherers.all_gatherers_identifier:
reqs.append(
requirements.VersionRequirement(
name=gatherer.name.replace(" ", ""),
component=gatherer,
version=(1, 0, 0),
)
)
return reqs
[docs]
class ModuleDisplayPlugin(interfaces.configuration.VersionableInterface):
"""
Plugins that enumerate kernel modules (lsmod, check_modules, etc.)
must inherit from this class to have unified output columns across plugins.
The constructor of the plugin must call super() with the `implementation` set
"""
_version = (2, 0, 0)
[docs]
@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.VersionRequirement(
name="linux_utilities_modules",
component=Modules,
version=(3, 0, 1),
),
requirements.VersionRequirement(
name="linux-tainting", component=tainting.Tainting, version=(1, 0, 0)
),
]
[docs]
@classmethod
def generate_results(
cls,
context: interfaces.context.ContextInterface,
implementation: Callable[
[interfaces.context.ContextInterface, str], Iterable[extensions.module]
],
kernel_module_name: str,
dump: bool,
open_implementation: Optional[interfaces.plugins.FileHandlerInterface],
):
"""
Uses the implementation set in the constructor call to produce consistent output fields
across module gathering plugins
"""
for module in implementation(context, kernel_module_name):
try:
name = utility.array_to_string(module.name)
except exceptions.InvalidAddressException:
vollog.debug(
f"Unable to recover name for module {module.vol.offset:#x} from implementation {implementation}"
)
continue
code_size = format_hints.Hex(
module.get_init_size() + module.get_core_size()
)
taints = ",".join(
tainting.Tainting.get_taints_parsed(
context, kernel_module_name, module.taints, True
)
)
parameters_iter = Modules.get_load_parameters(
context, kernel_module_name, module
)
parameters = ", ".join([f"{key}={value}" for key, value in parameters_iter])
file_name = renderers.NotApplicableValue()
if dump and open_implementation:
elf_data = ModuleExtract.extract_module(
context, kernel_module_name, module
)
if not elf_data:
vollog.warning(
f"Unable to reconstruct the ELF for module struct at {module.vol.offset:#x}"
)
file_name = renderers.NotAvailableValue()
else:
file_name = open_implementation.sanitize_filename(
f"kernel_module.{name}.{module.vol.offset:#x}.elf"
)
with open_implementation(file_name) as file_handle:
file_handle.write(elf_data)
yield (
0,
(
format_hints.Hex(module.vol.offset),
name,
format_hints.Hex(code_size),
taints,
parameters,
file_name,
),
)
columns_results = [
("Offset", format_hints.Hex),
("Module Name", str),
("Code Size", format_hints.Hex),
("Taints", str),
("Load Arguments", str),
("File Output", str),
]