Source code for volatility3.cli.text_renderer

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import csv
import datetime
import itertools
import json
import logging
import random
import string
import sys
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union

from volatility3.cli import text_filter
from volatility3.framework import exceptions, interfaces, renderers
from volatility3.framework.renderers import format_hints

vollog = logging.getLogger(__name__)

try:
    CAPSTONE_PRESENT = True
    import capstone
except ImportError:
    CAPSTONE_PRESENT = False
    vollog.debug("Disassembly library capstone not found")


[docs] def hex_bytes_as_text(value: bytes, width: int = 16) -> str: """Renders HexBytes as text. Args: value: A series of bytes to convert to text Returns: A text representation of the hexadecimal bytes plus their ascii equivalents, separated by newline characters """ if not isinstance(value, bytes): raise TypeError(f"hex_bytes_as_text takes bytes not: {type(value)}") printables = "" output = "\n" for count, byte in enumerate(value): output += f"{byte:02x} " char = chr(byte) printables += char if 0x20 <= byte <= 0x7E else "." if count % width == width - 1: output += printables if count < len(value) - 1: output += "\n" printables = "" # Handle leftovers when the length is not a multiple of width if printables: padding = width - len(printables) output += " " * padding output += printables output += " " * padding return output
[docs] def multitypedata_as_text(value: format_hints.MultiTypeData) -> str: """Renders the bytes as a string where possible, otherwise it displays hex data This attempts to convert the string based on its encoding and if no data's been lost due to the split on the null character, then it displays it as is """ if value.show_hex: return hex_bytes_as_text(value) string_representation = str(value, encoding=value.encoding, errors="replace") if value.split_nulls and ( (len(value) / 2 - 1) <= len(string_representation) <= (len(value) / 2) ): return "\n".join(string_representation.split("\x00")) if ( len(string_representation) - 1 <= len(string_representation.split("\x00")[0]) <= len(string_representation) ): return string_representation.split("\x00")[0] return hex_bytes_as_text(value)
T = TypeVar("T")
[docs] def optional( func: Callable[[Union[interfaces.renderers.BaseAbsentValue, T]], str], ) -> Callable[[T], str]: @wraps(func) def wrapped(x: Any) -> str: if isinstance(x, interfaces.renderers.BaseAbsentValue): if isinstance(x, renderers.NotApplicableValue): return "N/A" else: return "-" return func(x) return wrapped
[docs] def quoted_optional(func: Callable) -> Callable: @wraps(func) def wrapped(x: Any) -> str: result = optional(func)(x) if result == "-" or result == "N/A": return "" if isinstance(x, format_hints.MultiTypeData) and x.converted_int: return f"{result}" if isinstance(x, int) and not isinstance( x, (format_hints.Hex, format_hints.Bin) ): return f"{result}" return f'"{result}"' return wrapped
[docs] def display_disassembly(disasm: renderers.Disassembly) -> str: """Renders a disassembly renderer type into string format. Args: disasm: Input disassembly objects Returns: A string as rendered by capstone where available, otherwise output as if it were just bytes """ if CAPSTONE_PRESENT: disasm_types = { "intel": capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32), "intel64": capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64), "arm": capstone.Cs(capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM), "arm64": capstone.Cs(capstone.CS_ARCH_ARM64, capstone.CS_MODE_ARM), } output = "" if disasm.architecture is not None: for i in disasm_types[disasm.architecture].disasm( disasm.data, disasm.offset ): output += f"\n{i.address:#x}:\t{i.mnemonic}\t{i.op_str}" return output return QuickTextRenderer._type_renderers[bytes](disasm.data)
[docs] class CLITypeRenderer(interfaces.renderers.TypeRendererInterface): def __init__(self, func): super().__init__(func=optional(func))
[docs] class LayerDataRenderer(CLITypeRenderer): """Renders a LayerData object into data/bytes""" def __init__(self): self.context_byte_len = 0 self.width = 16 self.display_offset = False self.display_hex = True self.display_ascii = True def render( data: Union[renderers.LayerData, interfaces.renderers.BaseAbsentValue], ) -> str: if isinstance(data, interfaces.renderers.BaseAbsentValue): # FIXME: Do something cleverer here return "" specific_data, error_bytes = self.render_bytes(data) printables = "" output = "\n" for count, byte in enumerate(specific_data): if count not in error_bytes: output += f"{byte:02x} " char = chr(byte) printables += char if 0x20 <= byte <= 0x7E else "." else: output += "__ " printables += "." if count % self.width == self.width - 1: output += printables if count < len(specific_data) - 1: output += "\n" printables = "" # Handle leftovers when the length is not a multiple of width if printables: padding = self.width - len(printables) output += " " * padding output += printables output += " " * padding return output render_func = render return super().__init__(render_func)
[docs] def render_bytes(self, data: renderers.LayerData) -> Tuple[bytes, Set[int]]: """Renders a valid LayerData into bytes (with context bytes)""" context_byte_len = self.context_byte_len if not data.no_surrounding else 0 layer = data.context.layers[data.layer_name] # Map of the holes error_bytes = set() start_offset = data.offset - context_byte_len end_offset = data.offset + data.length + context_byte_len if isinstance(layer, interfaces.layers.TranslationLayerInterface): error_bytes = set() mapping = iter(layer.mapping(start_offset, end_offset, True)) current_map = next(mapping) for i in range(start_offset, end_offset): # Run through the bytes, check if they're present offset, sublength, _, _, _ = current_map if i < offset: error_bytes.add(i - start_offset) if i > offset + sublength: try: current_map = next(mapping) except StopIteration: pass offset, sublength, _, _, _ = current_map if i > offset + sublength: error_bytes.add(i - start_offset) # Padded data specific_data = data.context.layers[data.layer_name].read( start_offset, end_offset - start_offset, True, ) return specific_data, error_bytes
[docs] class CLIRenderer(interfaces.renderers.Renderer): """Class to add specific requirements for CLI renderers.""" _type_renderers = { format_hints.Bin: CLITypeRenderer(lambda x: f"0b{x:b}"), format_hints.Hex: CLITypeRenderer(lambda x: f"0x{x:x}"), format_hints.HexBytes: CLITypeRenderer(hex_bytes_as_text), format_hints.MultiTypeData: CLITypeRenderer(multitypedata_as_text), renderers.Disassembly: CLITypeRenderer(display_disassembly), bytes: CLITypeRenderer(lambda x: " ".join(f"{b:02x}" for b in x)), renderers.LayerData: LayerDataRenderer(), datetime.datetime: CLITypeRenderer( lambda x: x.strftime("%Y-%m-%d %H:%M:%S.%f %Z") ), "default": CLITypeRenderer(lambda x: f"{x}"), } name = "unnamed" structured_output = False filter: Optional[text_filter.CLIFilter] = None column_hide_list: Optional[list] = None
[docs] def ignored_columns( self, grid: interfaces.renderers.TreeGrid, ) -> List[interfaces.renderers.Column]: ignored_column_list = [] if self.column_hide_list: for column in grid.columns: accept = True for column_prefix in self.column_hide_list: if column.name.lower().startswith(column_prefix.lower()): accept = False if not accept: ignored_column_list.append(column) elif self.column_hide_list is None: return [] if len(ignored_column_list) == len(grid.columns): raise exceptions.RenderException("No visible columns to render") vollog.info( f"Hiding columns: {[column.name for column in ignored_column_list]}" ) return ignored_column_list
[docs] class QuickTextRenderer(CLIRenderer): name = "quick"
[docs] def get_render_options(self): return []
[docs] def render(self, grid: interfaces.renderers.TreeGrid) -> None: """Renders each column immediately to stdout. This does not format each line's width appropriately, it merely tab separates each field Args: grid: The TreeGrid object to render """ # TODO: Docstrings # TODO: Improve text output outfd = sys.stdout line = [] ignore_columns = self.ignored_columns(grid) for column in grid.columns: # Ignore the type because namedtuples don't realize they have accessible attributes if column not in ignore_columns: line.append(f"{column.name}") outfd.write("\n{}\n".format("\t".join(line))) def visitor(node: interfaces.renderers.TreeNode, accumulator): line = [] for column_index, column in enumerate(grid.columns): renderer = self._type_renderers.get( column.type, self._type_renderers["default"] ) if column not in ignore_columns: line.append(renderer(node.values[column_index])) if self.filter and self.filter.filter(line): return accumulator accumulator.write("\n") # Nodes always have a path value, giving them a path_depth of at least 1, we use max just in case accumulator.write( "*" * max(0, node.path_depth - 1) + ("" if (node.path_depth <= 1) else " ") ) accumulator.write("{}".format("\t".join(line))) accumulator.flush() return accumulator if not grid.populated: grid.populate(visitor, outfd) else: grid.visit(node=None, function=visitor, initial_accumulator=outfd) outfd.write("\n")
[docs] class NoneRenderer(CLIRenderer): """Outputs no results""" name = "none"
[docs] def get_render_options(self): return []
[docs] def render(self, grid: interfaces.renderers.TreeGrid) -> None: if not grid.populated: grid.populate(lambda x, y: True, True)
[docs] class CSVRenderer(CLIRenderer): name = "csv" structured_output = True
[docs] def get_render_options(self): return []
[docs] def render(self, grid: interfaces.renderers.TreeGrid) -> None: """Renders each row immediately to stdout. Args: grid: The TreeGrid object to render """ outfd = sys.stdout ignore_columns = self.ignored_columns(grid) header_list = ["TreeDepth"] for column in grid.columns: # Ignore the type because namedtuples don't realize they have accessible attributes if column not in ignore_columns: header_list.append(f"{column.name}") writer = csv.DictWriter( outfd, header_list, lineterminator="\n", escapechar="\\" ) writer.writeheader() def visitor(node: interfaces.renderers.TreeNode, accumulator): # Nodes always have a path value, giving them a path_depth of at least 1, we use max just in case row = {"TreeDepth": str(max(0, node.path_depth - 1))} line = [] for column_index, column in enumerate(grid.columns): renderer = self._type_renderers.get( column.type, self._type_renderers["default"] ) row[f"{column.name}"] = renderer(node.values[column_index]) if column not in ignore_columns: line.append(row[f"{column.name}"]) else: del row[f"{column.name}"] if self.filter and self.filter.filter(line): return accumulator accumulator.writerow(row) return accumulator if not grid.populated: grid.populate(visitor, writer) else: grid.visit(node=None, function=visitor, initial_accumulator=writer) outfd.write("\n")
[docs] class PrettyTextRenderer(CLIRenderer): name = "pretty"
[docs] def get_render_options(self): return []
[docs] def render(self, grid: interfaces.renderers.TreeGrid) -> None: """Renders each column immediately to stdout. This does not format each line's width appropriately, it merely tab separates each field Args: grid: The TreeGrid object to render """ # TODO: Docstrings # TODO: Improve text output outfd = sys.stdout sys.stderr.write("Formatting...\n") ignore_columns = self.ignored_columns(grid) display_alignment = ">" column_separator = " | " tree_indent_column = "".join( random.choices(string.ascii_uppercase + string.digits, k=20) ) max_column_widths = dict( [(column.name, len(column.name)) for column in grid.columns] ) def visitor( node: interfaces.renderers.TreeNode, accumulator: List[Tuple[int, Dict[interfaces.renderers.Column, bytes]]], ) -> List[Tuple[int, Dict[interfaces.renderers.Column, bytes]]]: # Nodes always have a path value, giving them a path_depth of at least 1, we use max just in case max_column_widths[tree_indent_column] = max( max_column_widths.get(tree_indent_column, 0), node.path_depth ) line = {} rendered_line = [] for column_index, column in enumerate(grid.columns): renderer = self._type_renderers.get( column.type, self._type_renderers["default"] ) data = renderer(node.values[column_index]) field_width = max( [len(self.tab_stop(x)) for x in f"{data}".split("\n")] ) max_column_widths[column.name] = max( max_column_widths.get(column.name, len(column.name)), field_width ) if column not in ignore_columns: line[column] = data.split("\n") rendered_line.append(data) if self.filter and self.filter.filter(rendered_line): return accumulator accumulator.append((node.path_depth, line)) return accumulator final_output: List[ Tuple[int, Dict[interfaces.renderers.Column, list[str]]] ] = [] if not grid.populated: grid.populate(visitor, final_output) else: grid.visit(node=None, function=visitor, initial_accumulator=final_output) # Always align the tree to the left format_string_list = [ "{0:<" + str(max_column_widths.get(tree_indent_column, 0)) + "s}" ] column_offset = 0 for column_index, column in enumerate(grid.columns): if column not in ignore_columns: format_string_list.append( "{" + str(column_index - column_offset + 1) + ":" + display_alignment + str(max_column_widths[column.name]) + "s}" ) else: column_offset += 1 format_string = column_separator.join(format_string_list) + "\n" column_titles = [""] + [ column.name for column in grid.columns if column not in ignore_columns ] outfd.write(format_string.format(*column_titles)) for depth, line in final_output: nums_line = max([len(line[column]) for column in line]) for column in line: if column in ignore_columns: del line[column] else: line[column] = line[column] + ( [""] * (nums_line - len(line[column])) ) for index in range(nums_line): if index == 0: outfd.write( format_string.format( "*" * depth, *[self.tab_stop(line[column][index]) for column in line], ) ) else: outfd.write( format_string.format( " " * depth, *[self.tab_stop(line[column][index]) for column in line], ) )
[docs] def tab_stop(self, line: str) -> str: tab_width = 8 while line.find("\t") >= 0: i = line.find("\t") pad = " " * (tab_width - (i % tab_width)) line = line.replace("\t", pad, 1) return line
[docs] class JsonRenderer(CLIRenderer): _type_renderers = { format_hints.HexBytes: lambda x: ( x.hex(" ") if not isinstance(x, interfaces.renderers.BaseAbsentValue) else "N/A" ), renderers.Disassembly: quoted_optional(display_disassembly), format_hints.MultiTypeData: quoted_optional(multitypedata_as_text), renderers.LayerData: lambda x: ( LayerDataRenderer().render_bytes(x)[0].hex(" ") if not isinstance(x, interfaces.renderers.BaseAbsentValue) else "N/A" ), bytes: optional(lambda x: " ".join(f"{b:02x}" for b in x)), datetime.datetime: lambda x: ( x.isoformat() if not isinstance(x, interfaces.renderers.BaseAbsentValue) else None ), "default": lambda x: x, } name = "JSON" structured_output = True
[docs] def get_render_options(self) -> List[interfaces.renderers.RenderOption]: return []
[docs] def output_result(self, outfd, result): """Outputs the JSON data to a file in a particular format""" outfd.write(f"{json.dumps(result, indent=2, sort_keys=True)}\n")
[docs] def render(self, grid: interfaces.renderers.TreeGrid): outfd = sys.stdout outfd.write("\n") final_output: Tuple[ Dict[str, List[interfaces.renderers.TreeNode]], List[interfaces.renderers.TreeNode], ] = ({}, []) ignore_columns = self.ignored_columns(grid) def visitor( node: interfaces.renderers.TreeNode, accumulator: Tuple[Dict[str, Dict[str, Any]], List[Dict[str, Any]]], ) -> Tuple[Dict[str, Dict[str, Any]], List[Dict[str, Any]]]: # Nodes always have a path value, giving them a path_depth of at least 1, we use max just in case acc_map, final_tree = accumulator node_dict: Dict[str, Any] = {"__children": []} line = [] for column_index, column in enumerate(grid.columns): if column in ignore_columns: continue renderer = self._type_renderers.get( column.type, self._type_renderers["default"] ) data = renderer(list(node.values)[column_index]) if isinstance(data, interfaces.renderers.BaseAbsentValue): data = None node_dict[column.name] = data line.append(data) if self.filter and self.filter.filter(line): return accumulator # Only add if the parent hasn't been filtered out if node.parent and node.parent.path in acc_map: acc_map[node.parent.path]["__children"].append(node_dict) else: final_tree.append(node_dict) acc_map[node.path] = node_dict return (acc_map, final_tree) if not grid.populated: grid.populate(visitor, final_output) else: grid.visit(node=None, function=visitor, initial_accumulator=final_output) self.output_result(outfd, final_output[1])
[docs] class JsonLinesRenderer(JsonRenderer): name = "JSONL"
[docs] def output_result(self, outfd, result): """Outputs the JSON results as JSON lines""" for line in result: outfd.write(json.dumps(line, sort_keys=True)) outfd.write("\n")
[docs] class MermaidRenderer(CLIRenderer): _type_renderers = { format_hints.Bin: optional(lambda x: f"0b{x:b}"), format_hints.Hex: optional(lambda x: f"0x{x:x}"), format_hints.HexBytes: optional(hex_bytes_as_text), format_hints.MultiTypeData: optional(multitypedata_as_text), interfaces.renderers.Disassembly: optional(display_disassembly), bytes: optional(lambda x: " ".join([f"{b:02x}" for b in x])), datetime.datetime: optional(lambda x: x.strftime("%Y-%m-%d %H:%M:%S.%f %Z")), "default": optional(lambda x: f"{x}"), } name = "mermaid" structured_output = True @staticmethod def _mermaid_label(text: str) -> str: """Escape a value for use inside a Mermaid node label (``["..."]``). Double quotes terminate the label, so they must be replaced with the Mermaid-supported entity. Newlines inside cell renderings are converted to ``<br>`` so each row remains a single Mermaid node. """ return text.replace('"', "&quot;").replace("\n", "<br>")
[docs] def get_render_options(self): pass
[docs] def render(self, grid: interfaces.renderers.TreeGrid) -> None: """Render the TreeGrid as a Mermaid ``graph TD`` flowchart. The renderer is plugin-agnostic: it derives the parent/child relationship from each node's ``path_depth`` in the grid, rather than from any particular column (such as PID/PPID). This means any tree-shaped plugin output -- pstree, vadwalk, handles tree, future plugins -- renders without modification. The algorithm maintains a parent stack while walking the rows in traversal order: * descending one or more levels pushes the previously-emitted node onto the stack (once per level descended) so it becomes the current parent; * ascending pops the same number of levels off the stack; * the top of the stack is always the parent of the next emitted node, or empty for a root-level node. Args: grid: The TreeGrid object to render """ outfd = sys.stdout sys.stderr.write("Formatting...\n") def format_row(node: interfaces.renderers.TreeNode) -> str: """Build a Mermaid node label from every column of ``node``.""" cells = [] for column_index, column in enumerate(grid.columns): renderer = self._type_renderers.get( column.type, self._type_renderers["default"] ) value = renderer(node.values[column_index]) cells.append(f"{column.name}:{self._mermaid_label(value)}") return "<br>".join(cells) rows: List[Tuple[int, str]] = [] def visitor( node: interfaces.renderers.TreeNode, accumulator: List[Tuple[int, str]], ) -> List[Tuple[int, str]]: accumulator.append((node.path_depth, format_row(node))) return accumulator if not grid.populated: grid.populate(visitor, rows) else: grid.visit(node=None, function=visitor, initial_accumulator=rows) # Stable, unique per-node IDs. We never reuse a column value (e.g. # PID) because (a) PID is not guaranteed unique across a TreeGrid, # (b) it is plugin-specific, and (c) Mermaid IDs must avoid # characters like parentheses that may appear in column data. node_ids = itertools.count(1) parent_stack: List[str] = [] prev_depth = 0 prev_id: Optional[str] = None lines: List[str] = ["graph TD"] for depth, label in rows: node_id = f"n{next(node_ids)}" if prev_id is not None: if depth > prev_depth: # Descended one or more levels. Push prev_id once per # level so subsequent pops align even when the tree # skips levels (e.g. depth 1 -> depth 3). for _ in range(depth - prev_depth): parent_stack.append(prev_id) elif depth < prev_depth: for _ in range(prev_depth - depth): if parent_stack: parent_stack.pop() # depth == prev_depth: sibling, keep the same parent if parent_stack: parent = parent_stack[-1] lines.append(f'\t{parent} --> {node_id}["{label}"]') else: # Root-level node: declare it on its own. lines.append(f'\t{node_id}["{label}"]') prev_id = node_id prev_depth = depth outfd.write("\n".join(lines) + "\n")