Source code for volatility3.framework.symbols.mac.extensions

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import contextlib
import logging
from typing import Generator, Iterable, Optional, Set, Tuple

from volatility3.framework import constants, exceptions, interfaces, objects
from volatility3.framework.objects import utility
from volatility3.framework.renderers import conversion
from volatility3.framework.symbols import generic

vollog = logging.getLogger(__name__)


[docs]class proc(generic.GenericIntelProcess):
[docs] def get_task(self): return self.task.dereference().cast("task")
[docs] def add_process_layer(self, config_prefix: str = None, preferred_name: str = None) -> Optional[str]: """Constructs a new layer based on the process's DTB. Returns the name of the Layer or None. """ parent_layer = self._context.layers[self.vol.layer_name] if not isinstance(parent_layer, interfaces.layers.TranslationLayerInterface): raise TypeError("Parent layer is not a translation layer, unable to construct process layer") try: dtb = self.get_task().map.pmap.pm_cr3 except exceptions.InvalidAddressException: # Bail out because we couldn't find the DTB return None if preferred_name is None: preferred_name = self.vol.layer_name + f"_Process{self.p_pid}" # Add the constructed layer and return the name return self._add_process_layer(self._context, dtb, config_prefix, preferred_name)
[docs] def get_map_iter(self) -> Iterable[interfaces.objects.ObjectInterface]: try: task = self.get_task() current_map = task.map.hdr.links.next except exceptions.InvalidAddressException: return seen: Set[int] = set() for i in range(task.map.hdr.nentries): if (not current_map or current_map.vol.offset in seen or not self._context.layers[task.vol.native_layer_name].is_valid(current_map.dereference().vol.offset, current_map.dereference().vol.size)): vollog.log(constants.LOGLEVEL_VVV, "Breaking process maps iteration due to invalid state.") break # ZP_POISON value used to catch programming errors if current_map.links.start == 0xdeadbeefdeadbeef or current_map.links.end == 0xdeadbeefdeadbeef: break yield current_map seen.add(current_map.vol.offset) current_map = current_map.links.next
###### # ikelos: this breaks with multi threading on, but works with it disabled # with multi threading on, it throws that same error about v4 pickle stuff that linux originally did # the fix for linux was to call int() so that we were not returning vol objects. # I call int() on these and the code works nearly 1-1 with the linux one so I am very confused ######
[docs] def get_process_memory_sections(self, context: interfaces.context.ContextInterface, config_prefix: str, rw_no_file: bool = False) -> \ Generator[Tuple[int, int], None, None]: """Returns a list of sections based on the memory manager's view of this task's virtual memory.""" for vma in self.get_map_iter(): start = int(vma.links.start) end = int(vma.links.end) if rw_no_file: if vma.get_perms() != "rw" or vma.get_path(context, config_prefix) != "": if vma.get_special_path() != "[heap]": continue yield (start, end - start)
[docs]class fileglob(objects.StructType):
[docs] def get_fg_type(self): ret = None if self.has_member("fg_type"): ret = self.fg_type elif self.fg_ops != 0: with contextlib.suppress(exceptions.InvalidAddressException): ret = self.fg_ops.fo_type if ret: ret = str(ret.description).replace("DTYPE_", "") return ret
[docs]class vm_map_object(objects.StructType):
[docs] def get_map_object(self): if self.has_member("vm_object"): return self.vm_object elif self.has_member("vmo_object"): return self.vmo_object raise AttributeError("vm_map_object -> get_object")
[docs]class vnode(objects.StructType): def _do_calc_path(self, ret, vnodeobj, vname): if vnodeobj is None: return if vname: try: ret.append(utility.pointer_to_string(vname, 255)) except exceptions.InvalidAddressException: return if int(vnodeobj.v_flag) & 0x000001 != 0 and int(vnodeobj.v_mount) != 0: if int(vnodeobj.v_mount.mnt_vnodecovered) != 0: self._do_calc_path(ret, vnodeobj.v_mount.mnt_vnodecovered, vnodeobj.v_mount.mnt_vnodecovered.v_name) else: try: parent = vnodeobj.v_parent parent_name = parent.v_name except exceptions.InvalidAddressException: return self._do_calc_path(ret, parent, parent_name)
[docs] def full_path(self): if self.v_flag & 0x000001 != 0 and self.v_mount != 0 and self.v_mount.mnt_flag & 0x00004000 != 0: ret = b"/" else: elements = [] files = [] self._do_calc_path(elements, self, self.v_name) elements.reverse() for e in elements: files.append(e.encode("utf-8")) ret = b"/".join(files) if ret: ret = b"/" + ret return ret.decode("utf-8")
[docs]class vm_map_entry(objects.StructType):
[docs] def is_suspicious(self, context, config_prefix): """Flags memory regions that are mapped rwx or that map an executable not back from a file on disk.""" ret = False perms = self.get_perms() if perms == "rwx": ret = True elif perms == "r-x" and self.get_path(context, config_prefix) == "": ret = True return ret
[docs] def get_perms(self): permask = "rwx" perms = "" for (ctr, i) in enumerate([1, 3, 5]): if (self.protection & i) == i: perms = perms + permask[ctr] else: perms = perms + "-" return perms
[docs] def get_range_alias(self): if self.has_member("alias"): ret = int(self.alias) else: ret = int(self.vme_offset) & 0xfff return ret
[docs] def get_special_path(self): check = self.get_range_alias() if 0 < check < 10: ret = "[heap]" elif check == 30: ret = "[stack]" else: ret = "" return ret
[docs] def get_path(self, context, config_prefix): node = self.get_vnode(context, config_prefix) if type(node) == str and node == "sub_map": ret = node elif node: path = [] seen: Set[int] = set() while node and node.vol.offset not in seen: try: v_name = utility.pointer_to_string(node.v_name, 255) except exceptions.InvalidAddressException: break path.append(v_name) if len(path) > 1024: break seen.add(node.vol.offset) node = node.v_parent path.reverse() ret = "/" + "/".join(path) else: ret = "" return ret
[docs] def get_object(self): if self.has_member("vme_object"): return self.vme_object elif self.has_member("object"): return self.object raise AttributeError("vm_map_entry -> get_object: Unable to determine object")
[docs] def get_offset(self): if self.has_member("vme_offset"): return self.vme_offset elif self.has_member("offset"): return self.offset raise AttributeError("vm_map_entry -> get_offset: Unable to determine offset")
[docs] def get_vnode(self, context, config_prefix): if self.is_sub_map == 1: return "sub_map" # based on find_vnode_object vnode_object = self.get_object().get_map_object() if vnode_object == 0: return None found_end = False while not found_end: try: tmp_vnode_object = vnode_object.shadow.dereference() except exceptions.InvalidAddressException: break if tmp_vnode_object.vol.offset == 0: found_end = True else: vnode_object = tmp_vnode_object if vnode_object.vol.offset == 0: return None try: pager = vnode_object.pager if pager == 0: return None ops = pager.mo_pager_ops.dereference() except exceptions.InvalidAddressException: return None found = False for sym in context.symbol_space.get_symbols_by_location(ops.vol.offset): if sym.split(constants.BANG)[1] in ["vnode_pager_ops", "_vnode_pager_ops"]: found = True break if found: vpager = context.object(config_prefix + constants.BANG + "vnode_pager", layer_name = vnode_object.vol.native_layer_name, offset = vnode_object.pager) ret = vpager.vnode_handle else: ret = None return ret
[docs]class socket(objects.StructType):
[docs] def get_inpcb(self): try: ret = self.so_pcb.dereference().cast("inpcb") except exceptions.InvalidAddressException: ret = None return ret
[docs] def get_family(self): return self.so_proto.pr_domain.dom_family
[docs] def get_protocol_as_string(self): proto = self.so_proto.pr_protocol if proto == 6: ret = "TCP" elif proto == 17: ret = "UDP" else: ret = "" return ret
[docs] def get_state(self): ret = "" if self.so_proto.pr_protocol == 6: inpcb = self.get_inpcb() if inpcb is not None: ret = inpcb.get_tcp_state() return ret
[docs] def get_connection_info(self): inpcb = self.get_inpcb() if inpcb is None: ret = None elif self.get_family() == 2: ret = inpcb.get_ipv4_info() else: ret = inpcb.get_ipv6_info() return ret
[docs] def get_converted_connection_info(self): vals = self.get_connection_info() if vals: ret = conversion.convert_network_four_tuple(self.get_family(), vals) else: ret = None return ret
[docs]class inpcb(objects.StructType):
[docs] def get_tcp_state(self): tcp_states = ("CLOSED", "LISTEN", "SYN_SENT", "SYN_RECV", "ESTABLISHED", "CLOSE_WAIT", "FIN_WAIT1", "CLOSING", "LAST_ACK", "FIN_WAIT2", "TIME_WAIT") try: tcpcb = self.inp_ppcb.dereference().cast("tcpcb") except exceptions.InvalidAddressException: return "" state_type = tcpcb.t_state if state_type and state_type < len(tcp_states): state = tcp_states[state_type] else: state = "" return state
[docs] def get_ipv4_info(self): try: lip = self.inp_dependladdr.inp46_local.ia46_addr4.s_addr except exceptions.InvalidAddressException: return None lport = self.inp_lport try: rip = self.inp_dependfaddr.inp46_foreign.ia46_addr4.s_addr except exceptions.InvalidAddressException: return None rport = self.inp_fport return [lip, lport, rip, rport]
[docs] def get_ipv6_info(self): try: lip = self.inp_dependladdr.inp6_local.member(attr = '__u6_addr').member(attr = '__u6_addr32') except exceptions.InvalidAddressException: return None lport = self.inp_lport try: rip = self.inp_dependfaddr.inp6_foreign.member(attr = '__u6_addr').member(attr = '__u6_addr32') except exceptions.InvalidAddressException: return None rport = self.inp_fport return [lip, lport, rip, rport]
[docs]class queue_entry(objects.StructType):
[docs] def walk_list(self, list_head: interfaces.objects.ObjectInterface, member_name: str, type_name: str, max_size: int = 4096) -> Iterable[interfaces.objects.ObjectInterface]: """ Walks a queue in a smear-aware and smear-resistant manner smear is detected by: - the max_size parameter sets an upper bound - each seen entry is only allowed once attempts to work around smear: - the list is walked in both directions to help find as many elements as possible Args: list_head - the head of the list member_name - the name of the embedded list member type_name - the type of each element in the list max_size - the maximum amount of elements that will be returned Returns: Each instance of the queue cast as "type_name" type """ yielded = 0 seen = set() for attr in ['next', 'prev']: with contextlib.suppress(exceptions.InvalidAddressException): n = getattr(self, attr).dereference().cast(type_name) while n is not None and n.vol.offset != list_head: if n.vol.offset in seen: break yield n seen.add(n.vol.offset) yielded = yielded + 1 if yielded == max_size: return n = getattr(n.member(attr = member_name), attr).dereference().cast(type_name)
[docs]class ifnet(objects.StructType):
[docs] def sockaddr_dl(self): if self.has_member("if_lladdr"): try: val = self.if_lladdr.ifa_addr.dereference().cast("sockaddr_dl") except exceptions.InvalidAddressException: val = None else: try: val = self.if_addrhead.tqh_first.ifa_addr.dereference().cast("sockaddr_dl") except exceptions.InvalidAddressException: val = None return val
# this is used for MAC addresses
[docs]class sockaddr_dl(objects.StructType): def __str__(self): ret = "" if self.sdl_alen > 14: return ret for i in range(self.sdl_alen): try: e = self.sdl_data[self.sdl_nlen + i] except IndexError: break e = e.cast("unsigned char") ret = ret + f"{e:02X}:" if ret and ret[-1] == ":": ret = ret[:-1] return ret
[docs]class sockaddr(objects.StructType):
[docs] def get_address(self): ip = "" family = self.sa_family if family == 2: # AF_INET addr_in = self.cast("sockaddr_in") ip = conversion.convert_ipv4(addr_in.sin_addr.s_addr) elif family == 30: # AF_INET6 addr_in6 = self.cast("sockaddr_in6") ip = conversion.convert_ipv6(addr_in6.sin6_addr.member(attr = "__u6_addr").member(attr = "__u6_addr32")) elif family == 18: # AF_LINK addr_dl = self.cast("sockaddr_dl") ip = str(addr_dl) return ip
[docs]class sysctl_oid(objects.StructType):
[docs] def get_perms(self) -> str: """ Returns the actions allowed on the node Args: None Returns: A combination of: R - readable W - writeable L - self handles locking """ ret = "" checks = [0x80000000, 0x40000000, 0x00800000] perms = ["R", "W", "L"] for (i, c) in enumerate(checks): if c & self.oid_kind: ret = ret + perms[i] else: ret = ret + "-" return ret
[docs] def get_ctltype(self) -> str: """ Returns the type of the sysctl node Args: None Returns: One of: CTLTYPE_NODE CTLTYPE_INT CTLTYPE_STRING CTLTYPE_QUAD CTLTYPE_OPAQUE an empty string for nodes not in the above types Based on sysctl_sysctl_debug_dump_node """ types = {1: 'CTLTYPE_NODE', 2: 'CTLTYPE_INT', 3: 'CTLTYPE_STRING', 4: 'CTLTYPE_QUAD', 5: 'CTLTYPE_OPAQUE'} ctltype = self.oid_kind & 0xf if 0 < ctltype < 6: ret = types[ctltype] else: ret = "" return ret
[docs]class kauth_scope(objects.StructType):
[docs] def get_listeners(self): for listener in self.ks_listeners: if listener != 0 and listener.kll_callback != 0: yield listener