# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from collections import namedtuple
from typing import List, Tuple, Optional, Generator, Callable
from volatility3.framework.objects import utility
from volatility3.framework import interfaces, renderers, symbols, exceptions
from volatility3.framework.configuration import requirements
from volatility3.plugins import yarascan
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist
# Module-level logger; the plugin below uses it to warn when capstone is missing
vollog = logging.getLogger(__name__)

# capstone is an optional dependency: record whether the import succeeded so the
# plugin can emit a warning and produce no rows instead of failing at import time
try:
    import capstone

    has_capstone = True
except ImportError:
    has_capstone = False
# Full details on the techniques used in these plugins to detect EDR-evading malware
# can be found in our 20-page whitepaper submitted to DEFCON along with the presentation:
# https://www.volexity.com/wp-content/uploads/2024/08/Defcon24_EDR_Evasion_Detection_White-Paper_Andrew-Case.pdf
# Immutable description of how one detection technique finds and validates
# malicious system call blocks; fields are documented in `__doc__` below
syscall_finder_type = namedtuple(
    "syscall_finder_type",
    [
        "get_syscall_target_address",
        "wants_syscall_inst",
        "rule_str",
        "invalid_ops",
        "termination_ops",
    ],
)

syscall_finder_type.__doc__ = """
This type is used to specify how malicious system call invocations should be found.

`get_syscall_target_address` is optionally used to extract the address containing the malicious 'syscall' instruction
`wants_syscall_inst` whether or not this method expects the 'syscall' instruction directly within the malicious code block
`rule_str` the opcode string to search for the malicious syscall instructions
`invalid_ops` instructions that only appear in invalid code blocks. Stops processing of the code block when encountered.
`termination_ops` instructions that are expected to be present in the code block and that stop processing
"""
class DirectSystemCalls(interfaces.plugins.PluginInterface):
    """Detects the Direct System Call technique used to bypass EDRs"""

    _required_framework_version = (2, 4, 0)

    # 2.0.0 - changes signature of `get_tasks_to_scan`
    _version = (2, 0, 0)

    # DLLs that are expected to host system call invocations
    valid_syscall_handlers = ("ntdll.dll", "win32u.dll")

    def __init__(self, *args, **kwargs):
        """Initialises the plugin and the finder describing the direct system call technique."""
        super().__init__(*args, **kwargs)

        self.syscall_finder = syscall_finder_type(
            # for direct system calls, we find the `syscall` instruction directly, so we already know the address
            None,
            # yes, we want the syscall instruction present as it is what this technique looks for
            True,
            # regex to find "\x0f\x05" (syscall) followed later by "\xc3" (ret)
            # we allow spacing in between to break naive anti-analysis forms (e.g., TarTarus Gate)
            # Standard techniques, such as HellsGate, look like:
            # mov r10, rcx
            # mov eax, <system call number>
            # syscall
            # ret
            "/\\x0f\\x05[^\\xc3]{,24}\\xc3/",
            # any of these will not be in a workable, malicious direct system call block
            ["jmp", "call", "leave", "int3"],
            # the expected form is to end with a "ret" back to the calling code
            ["ret"],
        )

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the yarascan command line options followed by this plugin's own requirements."""
        # create a list of requirements for vadyarascan
        vadyarascan_requirements = [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.VersionRequirement(
                name="pslist", component=pslist.PsList, version=(3, 0, 0)
            ),
            requirements.VersionRequirement(
                name="yarascanner", component=yarascan.YaraScanner, version=(2, 1, 0)
            ),
            requirements.VersionRequirement(
                name="yarascan", component=yarascan.YaraScan, version=(2, 0, 0)
            ),
        ]

        # get base yarascan requirements for command line options
        yarascan_requirements = yarascan.YaraScan.get_yarascan_option_requirements()

        # return the combined requirements
        return yarascan_requirements + vadyarascan_requirements

    @staticmethod
    def _is_syscall_block(
        disasm_func: Callable,
        syscall_finder: syscall_finder_type,
        data: bytes,
        address: int,
    ) -> Optional[Tuple[str, "capstone._cs_insn"]]:
        """
        Determines if the bytes starting at `data` represent a valid syscall instruction invocation block

        To maliciously invoke the system call instruction, malware must do each of the following:

        1) update RAX to the system call number
        2) update R10 to the first parameter
        3) hit the 'termination' instruction set in `syscall_finder_type`

        We also track whether the 'syscall' instruction was encountered while parsing

        This function is reusable for every technique we found and studied during the DEFCON research timeframe

        Args:
            disasm_func: capstone disassembly function gathered from `get_disasm_function`
            syscall_finder: the method and constraints on the malicious system call blocks that the calling plugin knows how to find
            data: the bytes from memory to search for malicious syscall invocations
            address: the address from where `data` came from in the particular process

        Returns:
            Optional[Tuple[str, capstone._cs_insn]]: For valid blocks, the disassembled bytes in string form and the last (termination) instruction
        """
        found_movr10 = False
        found_movreax = False
        found_syscall = False
        found_end = False
        end_inst = None

        # one entry per disassembled instruction, joined once on success
        disasm_parts: List[str] = []

        for inst in disasm_func(data, address):
            # recorded before any check, so the instruction that stops the walk is included
            disasm_parts.append(f"{inst.address:#x}: {inst.mnemonic} {inst.op_str}; ")

            # an instruction of all 0x00 opcodes
            if inst.opcode.count(0) == len(inst.opcode):
                break

            op = inst.mnemonic

            # invalid op, bail
            if op in syscall_finder.invalid_ops:
                break

            # found the end instruction wanted by the caller
            elif op in syscall_finder.termination_ops:
                found_end = True
                end_inst = inst
                break

            # track this no matter what to make code more re-usable
            elif op == "syscall":
                found_syscall = True

                # if we hit a 'syscall' but RAX or R10 haven't been touched
                # then we are in an invalid path, so bail
                if not syscall_finder.wants_syscall_inst or (
                    not (found_movr10 and found_movreax)
                ):
                    break

            else:
                # attempt to see if any other instruction type wrote to registers
                try:
                    _, regs_written = inst.regs_access()
                except capstone.CsError:
                    continue

                if regs_written:
                    for reg_id in regs_written:
                        # track writes to eax/rax or R10
                        reg = inst.reg_name(reg_id)
                        if reg in ("eax", "rax"):
                            found_movreax = True
                        elif reg == "r10":
                            found_movr10 = True

        # if any of these are missing, the block is invalid regardless of
        # the technique we are trying to detect now or in the future
        if not (found_movr10 and found_movreax and found_end):
            return None

        # if the finder requires a 'syscall' instruction then bail now if we didn't find one
        if syscall_finder.wants_syscall_inst and not found_syscall:
            return None

        return "".join(disasm_parts), end_inst

    @classmethod
    def get_disasm_function(cls, architecture: str) -> Callable:
        """
        Returns the disassembly handler for the given architecture
        .detail is used to get full instruction information

        Args:
            architecture: the name of the architecture for the process being disassembled
                          ("intel" or "intel64")

        Returns:
            The disasm function from capstone for the given architecture

        Raises:
            KeyError: if `architecture` is not one of the supported names
        """
        disasm_modes = {
            "intel": capstone.CS_MODE_32,
            "intel64": capstone.CS_MODE_64,
        }

        # only build the engine that was asked for
        disasm_type = capstone.Cs(capstone.CS_ARCH_X86, disasm_modes[architecture])
        disasm_type.detail = True

        return disasm_type.disasm

    @classmethod
    def _is_valid_syscall(
        cls,
        syscall_finder: syscall_finder_type,
        proc_layer: interfaces.layers.DataLayerInterface,
        architecture: str,
        vads: List[Tuple[int, int, str]],
        address: int,
    ) -> Optional[Tuple[int, str]]:
        """
        Checks whether the bytes around a rule hit form a malicious system call block

        Args:
            syscall_finder: the technique description used to validate candidate blocks
            proc_layer: the memory layer of the process being scanned
            architecture: the name of the architecture for the process being disassembled
            vads: the ranges of this process under 10MB
            address: the starting address to check for malicious syscall code blocks

        Returns:
            Optional[Tuple[int, str]]: For valid code blocks, the starting address of the block and the disassembly string
        """
        # the number bytes behind the yara rule hit to scan
        behind = 32

        # read a window of `behind` bytes on each side of the hit
        address = address - behind
        try:
            data = proc_layer.read(address, behind * 2)
        except exceptions.InvalidAddressException:
            return None

        disasm_func = cls.get_disasm_function(architecture)

        # since Intel does not have fixed-size instructions, we have to scan
        # each byte offset and re-disassemble the remaining block
        for offset in range(behind):
            # if this looks like a system call block (r10, rax, ret/jmp)
            syscall_info = cls._is_syscall_block(
                disasm_func, syscall_finder, data[offset:], address + offset
            )
            if not syscall_info:
                continue

            disasm_bytes, end_inst = syscall_info

            # if we can recover (and require) a target address for this malware technique
            if syscall_finder.get_syscall_target_address:
                target_address = syscall_finder.get_syscall_target_address(
                    proc_layer, end_inst
                )

                # could not determine the address -> invalid basic block
                if not target_address:
                    continue

                # we only care about calls to system call DLLs
                path = cls.get_range_path(vads, target_address)
                if not isinstance(path, str) or not path.lower().endswith(
                    cls.valid_syscall_handlers
                ):
                    continue

            # return the address and disassembly string if all checks pass
            return address + offset, disasm_bytes

        return None

    @classmethod
    def get_vad_maps(
        cls,
        task: interfaces.objects.ObjectInterface,
    ) -> List[Tuple[int, int, str]]:
        """Collects the regions of a process' virtual address descriptor
        tree that are small enough to scan.

        Args:
            task: The EPROCESS object of which to traverse the vad tree

        Returns:
            A list of (start address, size, file name) tuples, one per
            descriptor under 10MB. Callers check `isinstance(path, str)`
            before using the file name, so it is presumably not always a
            string - confirm against `get_file_name`.
        """
        vads: List[Tuple[int, int, str]] = []

        # scan regions under 10MB
        scan_max = 10 * 1000 * 1000

        vad_root = task.get_vad_root()
        for vad in vad_root.traverse():
            if vad.get_size() < scan_max:
                vads.append((vad.get_start(), vad.get_size(), vad.get_file_name()))

        return vads

    @classmethod
    def get_range_path(
        cls, ranges: List[Tuple[int, int, str]], address: int
    ) -> Optional[str]:
        """
        Returns the path for the range holding `address`, if found

        Args:
            ranges: VADs collected from `get_vad_maps`
            address: the address to find

        Returns:
            The path of the first range holding the address, or None when no range holds it
        """
        for start, size, path in ranges:
            # ranges are half-open: [start, start + size)
            if start <= address < start + size:
                return path

        return None

    @classmethod
    def get_tasks_to_scan(
        cls,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
    ) -> Generator[
        Tuple[interfaces.objects.ObjectInterface, str, str, str], None, None
    ]:
        """
        Gathers active processes with the extra information needed
        to detect malicious syscall instructions

        Args:
            context: the context holding the kernel module and its symbol table
            kernel_module_name: the name of the kernel module in `context.modules`

        Returns:
            Generator of the process object, name, memory layer name, and architecture
        """
        # gather active processes
        filter_func = pslist.PsList.create_active_process_filter()

        kernel = context.modules[kernel_module_name]

        is_32bit_arch = not symbols.symbol_table_is_64bit(
            context=context, symbol_table_name=kernel.symbol_table_name
        )

        for proc in pslist.PsList.list_processes(
            context=context,
            kernel_module_name=kernel_module_name,
            filter_func=filter_func,
        ):
            proc_name = utility.array_to_string(proc.ImageFileName)

            # skip Defender
            if proc_name in ["MsMpEng.exe"]:
                continue

            # processes whose layer cannot be created are skipped
            try:
                proc_layer_name = proc.add_process_layer()
            except exceptions.InvalidAddressException:
                continue

            # 32-bit kernels and WoW64 processes are disassembled as 32-bit code
            if is_32bit_arch or proc.get_is_wow64():
                architecture = "intel"
            else:
                architecture = "intel64"

            yield proc, proc_name, proc_layer_name, architecture

    @classmethod
    def _get_rule_hits(
        cls,
        context: interfaces.context.ContextInterface,
        proc_layer: interfaces.layers.DataLayerInterface,
        vads: List[Tuple[int, int, str]],
        pattern: str,
    ) -> Generator[Tuple[int, Optional[str]], None, None]:
        """
        Runs the given opcode rule through Yara and returns the address and file path of hits

        Args:
            context: the context passed through to the layer scan
            proc_layer: the layer to scan
            vads: the ranges inside of the process being scanned
            pattern: the opcodes rule from the plugin to detect a particular EDR-bypass technique

        Returns:
            Generator of the address and file path of hits
        """
        # only the (start, size) pair of each range is needed for scanning
        sections = [(vad[0], vad[1]) for vad in vads]

        rule = yarascan.YaraScanner.get_rule(pattern)

        for hit in proc_layer.scan(
            context=context,
            scanner=yarascan.YaraScanner(rules=rule),
            sections=sections,
        ):
            address = hit[0]

            path = cls.get_range_path(vads, address)

            # ignore hits in the system call DLLs
            if isinstance(path, str) and path.lower().endswith(
                cls.valid_syscall_handlers
            ):
                continue

            yield address, path

    def _generator(
        self,
    ) -> Generator[Tuple[int, Tuple[str, int, Optional[str], int, str]], None, None]:
        """Yields one row per malicious system call block found in the scanned processes.

        Produces no rows (after logging a warning) when capstone is not installed.
        """
        if not has_capstone:
            vollog.warning(
                "capstone is not installed. This plugin requires capstone to operate."
            )
            return

        for proc, proc_name, proc_layer_name, architecture in self.get_tasks_to_scan(
            self.context, self.config["kernel"]
        ):
            proc_layer = self.context.layers[proc_layer_name]

            vads = self.get_vad_maps(proc)
            if not vads:
                continue

            # for each valid process, look for malicious syscall invocations
            for address, vad_path in self._get_rule_hits(
                self.context, proc_layer, vads, self.syscall_finder.rule_str
            ):
                syscall_info = self._is_valid_syscall(
                    self.syscall_finder, proc_layer, architecture, vads, address
                )
                if not syscall_info:
                    continue

                # report the start of the validated block rather than the raw rule hit
                address, disasm_bytes = syscall_info

                yield (
                    0,
                    (
                        proc_name,
                        proc.UniqueProcessId,
                        vad_path,
                        format_hints.Hex(address),
                        disasm_bytes,
                    ),
                )

    def run(self) -> renderers.TreeGrid:
        """Returns the grid of detected blocks: process, PID, range, address and disassembly."""
        return renderers.TreeGrid(
            [
                ("Process", str),
                ("PID", int),
                ("Range", str),
                ("Address", format_hints.Hex),
                ("Disasm", str),
            ],
            self._generator(),
        )