# This file is Copyright 2022 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from typing import Iterable, Tuple
from volatility3.framework import renderers, interfaces
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.objects import utility
from volatility3.plugins.linux import pslist
vollog = logging.getLogger(__name__)
class Envars(plugins.PluginInterface):
    """Lists processes with their environment variables"""

    _required_framework_version = (2, 13, 0)
    _version = (2, 0, 1)

    @classmethod
    def get_requirements(cls):
        """Return the configuration requirements of this plugin.

        Returns:
            A list holding the kernel module requirement, the version
            requirement on the PsList plugin and the optional PID filter.
        """
        kernel_requirement = requirements.ModuleRequirement(
            name="kernel",
            description="Linux kernel",
            architectures=["Intel32", "Intel64"],
        )
        # PsList is called from run(), so its version is pinned here as well.
        pslist_requirement = requirements.VersionRequirement(
            name="pslist", component=pslist.PsList, version=(4, 0, 0)
        )
        pid_requirement = requirements.ListRequirement(
            name="pid",
            description="Filter on specific process IDs",
            element_type=int,
            optional=True,
        )
        return [kernel_requirement, pslist_requirement, pid_requirement]

    @classmethod
    def get_task_env_variables(
        cls,
        context: interfaces.context.ContextInterface,
        task: interfaces.objects.ObjectInterface,
        env_area_max_size: int = 8192,
    ) -> Iterable[Tuple[str, str]]:
        """Yield the environment variables stored in a task's memory.

        The area between ``task.mm.env_start`` and ``task.mm.env_end`` is read
        in one go from the task's process layer and then split on NUL bytes.

        Args:
            context: The plugin's operational context; its ``layers`` mapping
                is used to look up the process layer.
            task: The task object whose environment area is parsed.
            env_area_max_size: Largest environment area, in bytes, that will
                be parsed. Tasks whose area is empty, negative or larger than
                this yield nothing. Default is 8192.

        Yields:
            ``(key, value)`` tuples, one per ``KEY=VALUE`` entry, in the order
            in which they appear in memory.
        """
        task_name = utility.array_to_string(task.comm)
        task_pid = task.pid

        env_start = task.mm.env_start
        env_end = task.mm.env_end
        env_area_size = env_end - env_start

        # Sanity check: reject empty, negative or implausibly large areas.
        if env_area_size <= 0 or env_area_size > env_area_max_size:
            vollog.debug(
                f"Task {task_pid} {task_name} appears to have environment variables of size "
                f"{env_area_size} bytes which fails the sanity checking, will not extract "
                "any envars."
            )
            return

        # The environment lives in the task's own address space.
        layer_name = task.add_process_layer()
        if layer_name is None:
            return
        layer = context.layers[layer_name]

        # Validate the whole range up front rather than relying on a read
        # exception (the area may be unmapped or swapped out).
        if not layer.is_valid(env_start, env_area_size):
            vollog.debug(
                f"Unable to read environment variables for {task_pid} {task_name} starting at "
                f" virtual address 0x{env_start:x} for {env_area_size} bytes, will not "
                "extract any envars."
            )
            return

        # Entries are NUL terminated; trailing NULs are dropped before the
        # split so they do not produce empty entries at the end.
        raw_area = layer.read(env_start, env_area_size).rstrip(b"\x00")

        for raw_entry in raw_area.split(b"\x00"):
            entry = raw_entry.decode(encoding="utf8", errors="replace")
            env_key, separator, env_value = entry.partition("=")
            if not separator:
                # An entry without '=' is not a valid variable. Some legitimate
                # programs, like 'avahi-daemon', avoid reallocating the args and
                # instead exploit the fact that the environment area is
                # contiguous to the args to show a longer process name, which
                # overwrites the variables. Abort this task instead of emitting
                # misleading or incorrect output.
                break
            yield env_key, env_value

    def _generator(self, tasks):
        """Produce one TreeGrid row per environment variable of each task.

        Tasks reporting ``is_kernel_thread`` are skipped. Every row has the
        form ``(0, (pid, ppid, comm, key, value))``.
        """
        for task in tasks:
            if task.is_kernel_thread:
                continue

            pid = task.pid
            comm = utility.array_to_string(task.comm)
            ppid = task.get_parent_pid()
            row_prefix = (pid, ppid, comm)

            for key, value in self.get_task_env_variables(self.context, task):
                yield (0, row_prefix + (key, value))

    def run(self):
        """Build the TreeGrid listing environment variables per process.

        The optional ``pid`` configuration value is turned into a PsList
        filter before the tasks are listed.
        """
        columns = [
            ("PID", int),
            ("PPID", int),
            ("COMM", str),
            ("KEY", str),
            ("VALUE", str),
        ]

        pid_filter = pslist.PsList.create_pid_filter(self.config.get("pid", None))
        task_iterator = pslist.PsList.list_tasks(
            self.context, self.config["kernel"], filter_func=pid_filter
        )

        return renderers.TreeGrid(columns, self._generator(task_iterator))