# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import collections.abc
import logging
import socket as socket_module
from typing import Generator, Iterable, Iterator, Optional, Tuple
from volatility3.framework import constants
from volatility3.framework.constants.linux import SOCK_TYPES, SOCK_FAMILY
from volatility3.framework.constants.linux import IP_PROTOCOLS, IPV6_PROTOCOLS
from volatility3.framework.constants.linux import TCP_STATES, NETLINK_PROTOCOLS
from volatility3.framework.constants.linux import ETH_PROTOCOLS, BLUETOOTH_STATES
from volatility3.framework.constants.linux import BLUETOOTH_PROTOCOLS, SOCKET_STATES
from volatility3.framework import exceptions, objects, interfaces, symbols
from volatility3.framework.layers import linear
from volatility3.framework.objects import utility
from volatility3.framework.symbols import generic, linux, intermed
from volatility3.framework.symbols.linux.extensions import elf
vollog = logging.getLogger(__name__)
# Keep these in a basic module, to prevent import cycles when symbol providers require them
[docs]class module(generic.GenericIntelProcess):
[docs] def get_module_base(self):
if self.has_member("core_layout"):
return self.core_layout.base
else:
return self.module_core
[docs] def get_init_size(self):
if self.has_member("init_layout"):
return self.init_layout.size
elif self.has_member("init_size"):
return self.init_size
raise AttributeError(
"module -> get_init_size: Unable to determine .init section size of module"
)
[docs] def get_core_size(self):
if self.has_member("core_layout"):
return self.core_layout.size
elif self.has_member("core_size"):
return self.core_size
raise AttributeError(
"module -> get_core_size: Unable to determine core size of module"
)
[docs] def get_module_core(self):
if self.has_member("core_layout"):
return self.core_layout.base
elif self.has_member("module_core"):
return self.module_core
raise AttributeError("module -> get_module_core: Unable to get module core")
[docs] def get_module_init(self):
if self.has_member("init_layout"):
return self.init_layout.base
elif self.has_member("module_init"):
return self.module_init
raise AttributeError("module -> get_module_core: Unable to get module init")
[docs] def get_name(self):
"""Get the name of the module as a string"""
return utility.array_to_string(self.name)
def _get_sect_count(self, grp):
"""Try to determine the number of valid sections"""
arr = self._context.object(
self.get_symbol_table().name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=grp.attrs,
subtype=self._context.symbol_space.get_type(
self.get_symbol_table().name + constants.BANG + "pointer"
),
count=25,
)
idx = 0
while arr[idx]:
idx = idx + 1
return idx
[docs] def get_sections(self):
"""Get sections of the module"""
if self.sect_attrs.has_member("nsections"):
num_sects = self.sect_attrs.nsections
else:
num_sects = self._get_sect_count(self.sect_attrs.grp)
arr = self._context.object(
self.get_symbol_table().name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.sect_attrs.attrs.vol.offset,
subtype=self._context.symbol_space.get_type(
self.get_symbol_table().name + constants.BANG + "module_sect_attr"
),
count=num_sects,
)
for attr in arr:
yield attr
[docs] def get_symbols(self):
if symbols.symbol_table_is_64bit(self._context, self.get_symbol_table().name):
prefix = "Elf64_"
else:
prefix = "Elf32_"
elf_table_name = intermed.IntermediateSymbolTable.create(
self.context,
self.config_path,
"linux",
"elf",
native_types=None,
class_types=elf.class_types,
)
syms = self._context.object(
self.get_symbol_table().name + constants.BANG + "array",
layer_name=self.vol.layer_name,
offset=self.section_symtab,
subtype=self._context.symbol_space.get_type(
elf_table_name + constants.BANG + prefix + "Sym"
),
count=self.num_symtab + 1,
)
if self.section_strtab:
for sym in syms:
sym.set_cached_strtab(self.section_strtab)
yield sym
[docs] def get_symbol(self, wanted_sym_name):
"""Get value for a given symbol name"""
for sym in self.get_symbols():
sym_name = sym.get_name()
sym_addr = sym.st_value
if wanted_sym_name == sym_name:
return sym_addr
return None
@property
def section_symtab(self):
if self.has_member("kallsyms"):
return self.kallsyms.symtab
elif self.has_member("symtab"):
return self.symtab
raise AttributeError("module -> symtab: Unable to get symtab")
@property
def num_symtab(self):
if self.has_member("kallsyms"):
return int(self.kallsyms.num_symtab)
elif self.has_member("num_symtab"):
return int(self.num_symtab)
raise AttributeError(
"module -> num_symtab: Unable to determine number of symbols"
)
@property
def section_strtab(self):
# Newer kernels
if self.has_member("kallsyms"):
return self.kallsyms.strtab
# Older kernels
elif self.has_member("strtab"):
return self.strtab
raise AttributeError("module -> strtab: Unable to get strtab")
[docs]class task_struct(generic.GenericIntelProcess):
[docs] def add_process_layer(
self, config_prefix: str = None, preferred_name: str = None
) -> Optional[str]:
"""Constructs a new layer based on the process's DTB.
Returns the name of the Layer or None.
"""
parent_layer = self._context.layers[self.vol.layer_name]
try:
pgd = self.mm.pgd
except exceptions.InvalidAddressException:
return None
if not isinstance(parent_layer, linear.LinearlyMappedLayer):
raise TypeError(
"Parent layer is not a translation layer, unable to construct process layer"
)
dtb, layer_name = parent_layer.translate(pgd)
if not dtb:
return None
if preferred_name is None:
preferred_name = self.vol.layer_name + f"_Process{self.pid}"
# Add the constructed layer and return the name
return self._add_process_layer(
self._context, dtb, config_prefix, preferred_name
)
[docs] def get_process_memory_sections(
self, heap_only: bool = False
) -> Generator[Tuple[int, int], None, None]:
"""Returns a list of sections based on the memory manager's view of
this task's virtual memory."""
for vma in self.mm.get_mmap_iter():
start = int(vma.vm_start)
end = int(vma.vm_end)
if heap_only and not (start <= self.mm.brk and end >= self.mm.start_brk):
continue
else:
# FIXME: Check if this actually needs to be printed out or not
vollog.info(
f"adding vma: {start:x} {self.mm.brk:x} | {end:x} {self.mm.start_brk:x}"
)
yield (start, end - start)
@property
def is_kernel_thread(self) -> bool:
"""Checks if this task is a kernel thread.
Returns:
bool: True, if this task is a kernel thread. Otherwise, False.
"""
return (self.flags & constants.linux.PF_KTHREAD) != 0
@property
def is_thread_group_leader(self) -> bool:
"""Checks if this task is a thread group leader.
Returns:
bool: True, if this task is a thread group leader. Otherwise, False.
"""
return self.tgid == self.pid
@property
def is_user_thread(self) -> bool:
"""Checks if this task is a user thread.
Returns:
bool: True, if this task is a user thread. Otherwise, False.
"""
return not self.is_kernel_thread and self.tgid != self.pid
[docs] def get_threads(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns a list of the task_struct based on the list_head
thread_node structure."""
task_symbol_table_name = self.get_symbol_table_name()
# iterating through the thread_list from thread_group
# this allows iterating through pointers to grab the
# threads and using the thread_group offset to get the
# corresponding task_struct
for task in self.thread_group.to_list(
f"{task_symbol_table_name}{constants.BANG}task_struct", "thread_group"
):
yield task
[docs]class fs_struct(objects.StructType):
[docs] def get_root_dentry(self):
# < 2.6.26
if self.has_member("rootmnt"):
return self.root
elif self.root.has_member("dentry"):
return self.root.dentry
raise AttributeError("Unable to find the root dentry")
[docs] def get_root_mnt(self):
# < 2.6.26
if self.has_member("rootmnt"):
return self.rootmnt
elif self.root.has_member("mnt"):
return self.root.mnt
raise AttributeError("Unable to find the root mount")
[docs]class mm_struct(objects.StructType):
[docs] def get_mmap_iter(self) -> Iterable[interfaces.objects.ObjectInterface]:
"""Returns an iterator for the mmap list member of an mm_struct."""
if not self.mmap:
return
yield self.mmap
seen = {self.mmap.vol.offset}
link = self.mmap.vm_next
while link != 0 and link.vol.offset not in seen:
yield link
seen.add(link.vol.offset)
link = link.vm_next
[docs]class super_block(objects.StructType):
# include/linux/kdev_t.h
MINORBITS = 20
# Superblock flags
SB_RDONLY = 1 # Mount read-only
SB_NOSUID = 2 # Ignore suid and sgid bits
SB_NODEV = 4 # Disallow access to device special files
SB_NOEXEC = 8 # Disallow program execution
SB_SYNCHRONOUS = 16 # Writes are synced at once
SB_MANDLOCK = 64 # Allow mandatory locks on an FS
SB_DIRSYNC = 128 # Directory modifications are synchronous
SB_NOATIME = 1024 # Do not update access times
SB_NODIRATIME = 2048 # Do not update directory access times
SB_SILENT = 32768
SB_POSIXACL = 1 << 16 # VFS does not apply the umask
SB_KERNMOUNT = 1 << 22 # this is a kern_mount call
SB_I_VERSION = 1 << 23 # Update inode I_version field
SB_LAZYTIME = 1 << 25 # Update the on-disk [acm]times lazily
SB_OPTS = {
SB_SYNCHRONOUS: "sync",
SB_DIRSYNC: "dirsync",
SB_MANDLOCK: "mand",
SB_LAZYTIME: "lazytime",
}
@property
def major(self) -> int:
return self.s_dev >> self.MINORBITS
@property
def minor(self) -> int:
return self.s_dev & ((1 << self.MINORBITS) - 1)
[docs] def get_flags_access(self) -> str:
return "ro" if self.s_flags & self.SB_RDONLY else "rw"
[docs] def get_flags_opts(self) -> Iterable[str]:
sb_opts = [
self.SB_OPTS[sb_opt] for sb_opt in self.SB_OPTS if sb_opt & self.s_flags
]
return sb_opts
[docs] def get_type(self):
mnt_sb_type = utility.pointer_to_string(self.s_type.name, count=255)
if self.s_subtype:
mnt_sb_subtype = utility.pointer_to_string(self.s_subtype, count=255)
mnt_sb_type += "." + mnt_sb_subtype
return mnt_sb_type
[docs]class vm_area_struct(objects.StructType):
perm_flags = {
0x00000001: "r",
0x00000002: "w",
0x00000004: "x",
}
extended_flags = {
0x00000001: "VM_READ",
0x00000002: "VM_WRITE",
0x00000004: "VM_EXEC",
0x00000008: "VM_SHARED",
0x00000010: "VM_MAYREAD",
0x00000020: "VM_MAYWRITE",
0x00000040: "VM_MAYEXEC",
0x00000080: "VM_MAYSHARE",
0x00000100: "VM_GROWSDOWN",
0x00000200: "VM_NOHUGEPAGE",
0x00000400: "VM_PFNMAP",
0x00000800: "VM_DENYWRITE",
0x00001000: "VM_EXECUTABLE",
0x00002000: "VM_LOCKED",
0x00004000: "VM_IO",
0x00008000: "VM_SEQ_READ",
0x00010000: "VM_RAND_READ",
0x00020000: "VM_DONTCOPY",
0x00040000: "VM_DONTEXPAND",
0x00080000: "VM_RESERVED",
0x00100000: "VM_ACCOUNT",
0x00200000: "VM_NORESERVE",
0x00400000: "VM_HUGETLB",
0x00800000: "VM_NONLINEAR",
0x01000000: "VM_MAPPED_COP__VM_HUGEPAGE",
0x02000000: "VM_INSERTPAGE",
0x04000000: "VM_ALWAYSDUMP",
0x08000000: "VM_CAN_NONLINEAR",
0x10000000: "VM_MIXEDMAP",
0x20000000: "VM_SAO",
0x40000000: "VM_PFN_AT_MMAP",
0x80000000: "VM_MERGEABLE",
}
def _parse_flags(self, vm_flags, parse_flags) -> str:
"""Returns an string representation of the flags in a
vm_area_struct."""
retval = ""
for mask, char in parse_flags.items():
if (vm_flags & mask) == mask:
retval = retval + char
else:
retval = retval + "-"
return retval
# only parse the rwx bits
[docs] def get_protection(self) -> str:
return self._parse_flags(self.vm_flags & 0b1111, vm_area_struct.perm_flags)
# used by malfind
[docs] def get_flags(self) -> str:
return self._parse_flags(self.vm_flags, self.extended_flags)
[docs] def get_page_offset(self) -> int:
if self.vm_file == 0:
return 0
return self.vm_pgoff << constants.linux.PAGE_SHIFT
[docs] def get_name(self, context, task):
if self.vm_file != 0:
fname = linux.LinuxUtilities.path_for_file(context, task, self.vm_file)
elif self.vm_start <= task.mm.start_brk and self.vm_end >= task.mm.brk:
fname = "[heap]"
elif self.vm_start <= task.mm.start_stack <= self.vm_end:
fname = "[stack]"
elif (
self.vm_mm.context.has_member("vdso")
and self.vm_start == self.vm_mm.context.vdso
):
fname = "[vdso]"
else:
fname = "Anonymous Mapping"
return fname
# used by malfind
[docs] def is_suspicious(self):
ret = False
flags_str = self.get_protection()
if flags_str == "rwx":
ret = True
elif flags_str == "r-x" and self.vm_file.dereference().vol.offset == 0:
ret = True
return ret
[docs]class qstr(objects.StructType):
[docs] def name_as_str(self) -> str:
if self.has_member("len"):
str_length = self.len + 1 # Maximum length should include null terminator
else:
str_length = 255
try:
ret = objects.utility.pointer_to_string(self.name, str_length)
except (exceptions.InvalidAddressException, ValueError):
ret = ""
return ret
[docs]class dentry(objects.StructType):
[docs] def path(self) -> str:
"""Based on __dentry_path Linux kernel function"""
reversed_path = []
dentry_seen = set()
current_dentry = self
while (
not current_dentry.is_root()
and current_dentry.vol.offset not in dentry_seen
):
parent = current_dentry.d_parent
reversed_path.append(current_dentry.d_name.name_as_str())
dentry_seen.add(current_dentry.vol.offset)
current_dentry = parent
return "/" + "/".join(reversed(reversed_path))
[docs] def is_root(self) -> bool:
return self.vol.offset == self.d_parent
[docs] def is_subdir(self, old_dentry):
"""Is this dentry a subdirectory of old_dentry?
Returns true if this dentry is a subdirectory of the parent (at any depth).
Otherwise, it returns false.
"""
if self.vol.offset == old_dentry:
return True
return self.d_ancestor(old_dentry)
[docs] def d_ancestor(self, ancestor_dentry):
"""Search for an ancestor
Returns the ancestor dentry which is a child of "ancestor_dentry",
if "ancestor_dentry" is an ancestor of "child_dentry", else None.
"""
dentry_seen = set()
current_dentry = self
while (
not current_dentry.is_root()
and current_dentry.vol.offset not in dentry_seen
):
if current_dentry.d_parent == ancestor_dentry.vol.offset:
return current_dentry
dentry_seen.add(current_dentry.vol.offset)
current_dentry = current_dentry.d_parent
return None
[docs]class struct_file(objects.StructType):
[docs] def get_dentry(self) -> interfaces.objects.ObjectInterface:
if self.has_member("f_dentry"):
return self.f_dentry
elif self.has_member("f_path"):
return self.f_path.dentry
else:
raise AttributeError("Unable to find file -> dentry")
[docs] def get_vfsmnt(self) -> interfaces.objects.ObjectInterface:
if self.has_member("f_vfsmnt"):
return self.f_vfsmnt
elif self.has_member("f_path"):
return self.f_path.mnt
else:
raise AttributeError("Unable to find file -> vfs mount")
[docs]class list_head(objects.StructType, collections.abc.Iterable):
[docs] def to_list(
self,
symbol_type: str,
member: str,
forward: bool = True,
sentinel: bool = True,
layer: Optional[str] = None,
) -> Iterator[interfaces.objects.ObjectInterface]:
"""Returns an iterator of the entries in the list.
Args:
symbol_type: Type of the list elements
member: Name of the list_head member in the list elements
forward: Set false to go backwards
sentinel: Whether self is a "sentinel node", meaning it is not embedded in a member of the list
Sentinel nodes are NOT yielded. See https://en.wikipedia.org/wiki/Sentinel_node for further reference
layer: Name of layer to read from
Yields:
Objects of the type specified via the "symbol_type" argument.
"""
layer = layer or self.vol.layer_name
relative_offset = self._context.symbol_space.get_type(
symbol_type
).relative_child_offset(member)
direction = "prev"
if forward:
direction = "next"
try:
link = getattr(self, direction).dereference()
except exceptions.InvalidAddressException:
return
if not sentinel:
yield self._context.object(
symbol_type, layer, offset=self.vol.offset - relative_offset
)
seen = {self.vol.offset}
while link.vol.offset not in seen:
obj = self._context.object(
symbol_type, layer, offset=link.vol.offset - relative_offset
)
yield obj
seen.add(link.vol.offset)
try:
link = getattr(link, direction).dereference()
except exceptions.InvalidAddressException:
break
def __iter__(self) -> Iterator[interfaces.objects.ObjectInterface]:
return self.to_list(self.vol.parent.vol.type_name, self.vol.member_name)
[docs]class files_struct(objects.StructType):
[docs] def get_fds(self) -> interfaces.objects.ObjectInterface:
if self.has_member("fdt"):
return self.fdt.fd.dereference()
elif self.has_member("fd"):
return self.fd.dereference()
else:
raise AttributeError("Unable to find files -> file descriptors")
[docs] def get_max_fds(self) -> interfaces.objects.ObjectInterface:
if self.has_member("fdt"):
return self.fdt.max_fds
elif self.has_member("max_fds"):
return self.max_fds
else:
raise AttributeError("Unable to find files -> maximum file descriptors")
[docs]class mount(objects.StructType):
MNT_NOSUID = 0x01
MNT_NODEV = 0x02
MNT_NOEXEC = 0x04
MNT_NOATIME = 0x08
MNT_NODIRATIME = 0x10
MNT_RELATIME = 0x20
MNT_READONLY = 0x40
MNT_SHRINKABLE = 0x100
MNT_WRITE_HOLD = 0x200
MNT_SHARED = 0x1000
MNT_UNBINDABLE = 0x2000
MNT_FLAGS = {
MNT_NOSUID: "nosuid",
MNT_NODEV: "nodev",
MNT_NOEXEC: "noexec",
MNT_NOATIME: "noatime",
MNT_NODIRATIME: "nodiratime",
MNT_RELATIME: "relatime",
}
[docs] def get_mnt_sb(self):
if self.has_member("mnt"):
return self.mnt.mnt_sb
elif self.has_member("mnt_sb"):
return self.mnt_sb
else:
raise AttributeError("Unable to find mount -> super block")
[docs] def get_mnt_root(self):
if self.has_member("mnt"):
return self.mnt.mnt_root
elif self.has_member("mnt_root"):
return self.mnt_root
else:
raise AttributeError("Unable to find mount -> mount root")
[docs] def get_mnt_flags(self):
if self.has_member("mnt"):
return self.mnt.mnt_flags
elif self.has_member("mnt_flags"):
return self.mnt_flags
else:
raise AttributeError("Unable to find mount -> mount flags")
[docs] def get_mnt_parent(self):
return self.mnt_parent
[docs] def get_mnt_mountpoint(self):
return self.mnt_mountpoint
[docs] def get_flags_access(self) -> str:
return "ro" if self.get_mnt_flags() & self.MNT_READONLY else "rw"
[docs] def get_flags_opts(self) -> Iterable[str]:
flags = [
self.MNT_FLAGS[mntflag]
for mntflag in self.MNT_FLAGS
if mntflag & self.get_mnt_flags()
]
return flags
[docs] def is_shared(self) -> bool:
return self.get_mnt_flags() & self.MNT_SHARED
[docs] def is_unbindable(self) -> bool:
return self.get_mnt_flags() & self.MNT_UNBINDABLE
[docs] def is_slave(self) -> bool:
return self.mnt_master and self.mnt_master.vol.offset != 0
[docs] def get_devname(self) -> str:
return utility.pointer_to_string(self.mnt_devname, count=255)
[docs] def has_parent(self) -> bool:
return self.vol.offset != self.mnt_parent
[docs] def get_dominating_id(self, root) -> int:
"""Get ID of closest dominating peer group having a representative under the given root."""
mnt_seen = set()
current_mnt = self.mnt_master
while (
current_mnt
and current_mnt.vol.offset != 0
and current_mnt.vol.offset not in mnt_seen
):
peer = current_mnt.get_peer_under_root(self.mnt_ns, root)
if peer and peer.vol.offset != 0:
return peer.mnt_group_id
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.mnt_master
return 0
[docs] def get_peer_under_root(self, ns, root):
"""Return true if path is reachable from root.
It mimics the kernel function is_path_reachable(), ref: fs/namespace.c
"""
mnt_seen = set()
current_mnt = self
while current_mnt.vol.offset not in mnt_seen:
if current_mnt.mnt_ns == ns and current_mnt.is_path_reachable(
current_mnt.mnt.mnt_root, root
):
return current_mnt
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.next_peer()
if current_mnt.vol.offset == self.vol.offset:
break
return None
[docs] def is_path_reachable(self, current_dentry, root):
"""Return true if path is reachable.
It mimics the kernel function with same name, ref fs/namespace.c:
"""
mnt_seen = set()
current_mnt = self
while (
current_mnt.mnt.vol.offset != root.mnt
and current_mnt.has_parent()
and current_mnt.vol.offset not in mnt_seen
):
current_dentry = current_mnt.mnt_mountpoint
mnt_seen.add(current_mnt.vol.offset)
current_mnt = current_mnt.mnt_parent
return current_mnt.mnt.vol.offset == root.mnt and current_dentry.is_subdir(
root.dentry
)
[docs] def next_peer(self):
table_name = self.vol.type_name.split(constants.BANG)[0]
mount_struct = "{0}{1}mount".format(table_name, constants.BANG)
offset = self._context.symbol_space.get_type(
mount_struct
).relative_child_offset("mnt_share")
return self._context.object(
mount_struct,
self.vol.layer_name,
offset=self.mnt_share.next.vol.offset - offset,
)
[docs]class vfsmount(objects.StructType):
[docs] def is_valid(self):
return (
self.get_mnt_sb() != 0
and self.get_mnt_root() != 0
and self.get_mnt_parent() != 0
)
def _get_real_mnt(self):
table_name = self.vol.type_name.split(constants.BANG)[0]
mount_struct = f"{table_name}{constants.BANG}mount"
offset = self._context.symbol_space.get_type(
mount_struct
).relative_child_offset("mnt")
return self._context.object(
mount_struct, self.vol.layer_name, offset=self.vol.offset - offset
)
[docs] def get_mnt_parent(self):
if self.has_member("mnt_parent"):
return self.mnt_parent
else:
return self._get_real_mnt().mnt_parent
[docs] def get_mnt_mountpoint(self):
if self.has_member("mnt_mountpoint"):
return self.mnt_mountpoint
else:
return self._get_real_mnt().mnt_mountpoint
[docs] def get_mnt_root(self):
return self.mnt_root
[docs]class kobject(objects.StructType):
[docs] def reference_count(self):
refcnt = self.kref.refcount
if self.has_member("counter"):
ret = refcnt.counter
else:
ret = refcnt.refs.counter
return ret
[docs]class mnt_namespace(objects.StructType):
[docs] def get_inode(self):
if self.has_member("proc_inum"):
return self.proc_inum
elif self.ns.has_member("inum"):
return self.ns.inum
else:
raise AttributeError("Unable to find mnt_namespace inode")
[docs] def get_mount_points(self):
table_name = self.vol.type_name.split(constants.BANG)[0]
mnt_type = table_name + constants.BANG + "mount"
if not self._context.symbol_space.has_type(mnt_type):
# Old kernels ~ 2.6
mnt_type = table_name + constants.BANG + "vfsmount"
for mount in self.list.to_list(mnt_type, "mnt_list"):
yield mount
[docs]class net(objects.StructType):
[docs] def get_inode(self):
if self.has_member("proc_inum"):
return self.proc_inum
elif self.ns.has_member("inum"):
return self.ns.inum
else:
raise AttributeError("Unable to find net_namespace inode")
[docs]class socket(objects.StructType):
def _get_vol_kernel(self):
symbol_table_arr = self.vol.type_name.split("!", 1)
symbol_table = symbol_table_arr[0] if len(symbol_table_arr) == 2 else None
module_names = list(
self._context.modules.get_modules_by_symbol_tables(symbol_table)
)
if not module_names:
raise ValueError(f"No module using the symbol table {symbol_table}")
kernel_module_name = module_names[0]
kernel = self._context.modules[kernel_module_name]
return kernel
[docs] def get_inode(self):
try:
kernel = self._get_vol_kernel()
except ValueError:
return 0
socket_alloc = linux.LinuxUtilities.container_of(
self.vol.offset, "socket_alloc", "socket", kernel
)
vfs_inode = socket_alloc.vfs_inode
return vfs_inode.i_ino
[docs] def get_state(self):
socket_state_idx = self.state
if 0 <= socket_state_idx < len(SOCKET_STATES):
return SOCKET_STATES[socket_state_idx]
[docs]class sock(objects.StructType):
[docs] def get_family(self):
family_idx = self.__sk_common.skc_family
if 0 <= family_idx < len(SOCK_FAMILY):
return SOCK_FAMILY[family_idx]
[docs] def get_type(self):
return SOCK_TYPES.get(self.sk_type, "")
[docs] def get_inode(self):
if not self.sk_socket:
return 0
return self.sk_socket.get_inode()
[docs] def get_protocol(self):
return
[docs] def get_state(self):
# Return the generic socket state
if self.has_member("sk"):
return self.sk.sk_socket.get_state()
return self.sk_socket.get_state()
[docs]class unix_sock(objects.StructType):
[docs] def get_name(self):
if not self.addr:
return
sockaddr_un = self.addr.name.cast("sockaddr_un")
saddr = str(utility.array_to_string(sockaddr_un.sun_path))
return saddr
[docs] def get_protocol(self):
return
[docs] def get_state(self):
"""Return a string representing the sock state."""
# Unix socket states reuse (a subset) of the inet_sock states contants
if self.sk.get_type() == "STREAM":
state_idx = self.sk.__sk_common.skc_state
if 0 <= state_idx < len(TCP_STATES):
return TCP_STATES[state_idx]
else:
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs] def get_inode(self):
return self.sk.get_inode()
[docs]class inet_sock(objects.StructType):
[docs] def get_family(self):
family_idx = self.sk.__sk_common.skc_family
if 0 <= family_idx < len(SOCK_FAMILY):
return SOCK_FAMILY[family_idx]
[docs] def get_protocol(self):
# If INET6 family and a proto is defined, we use that specific IPv6 protocol.
# Otherwise, we use the standard IP protocol.
protocol = IP_PROTOCOLS.get(self.sk.sk_protocol)
if self.get_family() == "AF_INET6":
protocol = IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol)
return protocol
[docs] def get_state(self):
"""Return a string representing the sock state."""
if self.sk.get_type() == "STREAM":
state_idx = self.sk.__sk_common.skc_state
if 0 <= state_idx < len(TCP_STATES):
return TCP_STATES[state_idx]
else:
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs] def get_src_port(self):
sport_le = getattr(self, "sport", getattr(self, "inet_sport", None))
if sport_le is not None:
return socket_module.htons(sport_le)
[docs] def get_dst_port(self):
sk_common = self.sk.__sk_common
if hasattr(sk_common, "skc_portpair"):
dport_le = sk_common.skc_portpair & 0xFFFF
elif hasattr(self, "dport"):
dport_le = self.dport
elif hasattr(self, "inet_dport"):
dport_le = self.inet_dport
elif hasattr(sk_common, "skc_dport"):
dport_le = sk_common.skc_dport
else:
return
return socket_module.htons(dport_le)
[docs] def get_src_addr(self):
sk_common = self.sk.__sk_common
family = sk_common.skc_family
if family == socket_module.AF_INET:
addr_size = 4
if hasattr(self, "rcv_saddr"):
saddr = self.rcv_saddr
elif hasattr(self, "inet_rcv_saddr"):
saddr = self.inet_rcv_saddr
else:
saddr = sk_common.skc_rcv_saddr
elif family == socket_module.AF_INET6:
addr_size = 16
saddr = self.pinet6.saddr
else:
return
parent_layer = self._context.layers[self.vol.layer_name]
try:
addr_bytes = parent_layer.read(saddr.vol.offset, addr_size)
except exceptions.InvalidAddressException:
vollog.debug(
f"Unable to read socket src address from {saddr.vol.offset:#x}"
)
return
return socket_module.inet_ntop(family, addr_bytes)
[docs] def get_dst_addr(self):
sk_common = self.sk.__sk_common
family = sk_common.skc_family
if family == socket_module.AF_INET:
if hasattr(self, "daddr") and self.daddr:
daddr = self.daddr
elif hasattr(self, "inet_daddr") and self.inet_daddr:
daddr = self.inet_daddr
else:
daddr = sk_common.skc_daddr
addr_size = 4
elif family == socket_module.AF_INET6:
if hasattr(self.pinet6, "daddr"):
daddr = self.pinet6.daddr
else:
daddr = sk_common.skc_v6_daddr
addr_size = 16
else:
return
parent_layer = self._context.layers[self.vol.layer_name]
try:
addr_bytes = parent_layer.read(daddr.vol.offset, addr_size)
except exceptions.InvalidAddressException:
vollog.debug(
f"Unable to read socket dst address from {daddr.vol.offset:#x}"
)
return
return socket_module.inet_ntop(family, addr_bytes)
[docs]class netlink_sock(objects.StructType):
[docs] def get_protocol(self):
protocol_idx = self.sk.sk_protocol
if 0 <= protocol_idx < len(NETLINK_PROTOCOLS):
return NETLINK_PROTOCOLS[protocol_idx]
[docs] def get_state(self):
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs]class vsock_sock(objects.StructType):
[docs] def get_protocol(self):
# The protocol should always be 0 for vsocks
return
[docs] def get_state(self):
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs]class packet_sock(objects.StructType):
[docs] def get_protocol(self):
eth_proto = socket_module.htons(self.num)
if eth_proto == 0:
return
elif eth_proto in ETH_PROTOCOLS:
return ETH_PROTOCOLS[eth_proto]
else:
return f"0x{eth_proto:x}"
[docs] def get_state(self):
# Return the generic socket state
return self.sk.sk_socket.get_state()
[docs]class bt_sock(objects.StructType):
[docs] def get_protocol(self):
type_idx = self.sk.sk_protocol
if 0 <= type_idx < len(BLUETOOTH_PROTOCOLS):
return BLUETOOTH_PROTOCOLS[type_idx]
[docs] def get_state(self):
state_idx = self.sk.__sk_common.skc_state
if 0 <= state_idx < len(BLUETOOTH_STATES):
return BLUETOOTH_STATES[state_idx]
[docs]class xdp_sock(objects.StructType):
[docs] def get_protocol(self):
# The protocol should always be 0 for xdp_sock
return
[docs] def get_state(self):
# xdp_sock.state is an enum
return self.state.lookup()