Source code for volatility3.plugins.windows.registry.scheduled_tasks

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import base64
import binascii
import dataclasses
import datetime
import enum
import io
import itertools
import logging
import struct
from typing import Dict, Iterator, List, Optional, Tuple, Union

from volatility3.framework import exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import registry
from volatility3.framework.renderers import conversion
from volatility3.framework.symbols.windows.extensions import registry as reg_extensions
from volatility3.plugins import timeliner
from volatility3.plugins.windows.registry import hivelist

vollog = logging.getLogger(__name__)

# Reference: https://cyber.wtf/2022/06/01/windows-registry-analysis-todays-episode-tasks/


[docs] class TimeMode(enum.Enum): """ Enumeration containing the different time modes that a 'Time' trigger can be configured to run in. """ Once = "Once" # run at <start_boundary> and repeat every <data1> days Daily = "Daily" # run on days of week <(data2 as day_of_week bitmap)> every <data1> weeks starting at <start_boundary> Weekly = "Weekly" # run in months <(data3 as months bitmap> on days <(data2:data1 as day in month bitmap)> # starting at <start_boundary> DaysInMonths = "Days In Months" # run in months <(data3 as months bitmap> in weeks <(data2 as week bitmap)> # on days <(data1 as day_of_week bitmap)> starting at <start_boundary> DaysInWeeksInMonths = "Days In Weeks in Months" Unknown = "Unknown"
TIME_MODE_DESCRIPTION = { TimeMode.Once: "Once", TimeMode.Daily: "Daily", TimeMode.Weekly: "Weekly", TimeMode.DaysInMonths: "Days In Months", TimeMode.DaysInWeeksInMonths: "Days In Weeks In Months", TimeMode.Unknown: "Unknown", }
[docs] class ActionType(enum.Enum): """ Enumeration that maps action types to their magic number encodings """ Exe = 0x6666 ComHandler = 0x7777 Email = 0x8888 MessageBox = 0x9999
[docs] class TriggerType(enum.Enum): """ Enumeration that maps trigger types to their magic number encodings """ WindowsNotificationFacility = 0x6666 Session = 0x7777 Registration = 0x8888 Logon = 0xAAAA Event = 0xCCCC Time = 0xDDDD Idle = 0xEEEE Boot = 0xFFFF
[docs] class Weekday(enum.Enum): """ Enumeration that contains bitwise values for days of the week. """ Sunday = 0x1 Monday = 0x2 Tuesday = 0x4 Wednesday = 0x8 Thursday = 0x10 Friday = 0x20 Saturday = 0x40
[docs] class Months(enum.Enum): """ Enumeration that contains bitwise values for months of the year. """ January = 0x1 February = 0x2 March = 0x4 April = 0x8 May = 0x10 June = 0x20 July = 0x40 August = 0x80 September = 0x100 October = 0x200 November = 0x400 December = 0x800
[docs] class SidType(enum.Enum): """ Enumeration that maps SID types to their encoded integer values """ User = 1 Group = 2 Domain = 3 Alias = 4 WellKnownGroup = 5 DeletedAccount = 6 Invalid = 7 Unknown = 8 Computer = 9 Label = 10 LogonSession = 11
[docs] @dataclasses.dataclass class TaskSchedulerTimePeriod: """ Class containing information delimiting time periods within scheduled tasks. """ years: int months: int weeks: int days: int hours: int minutes: int seconds: int
JOB_BUCKET_FLAGS = { 0x2: "Run only if idle", 0x4: "Restart on idle", 0x8: "Stop on idle end", 0x10: "Disallow start if on batteries", 0x20: "Stop if going on batteries", 0x40: "Start when available", 0x80: "Run only if network available", 0x100: "Allow start on demand", 0x200: "Wake to run", 0x400: "Execute parallel", 0x800: "Execute stop existing", 0x1000: "Execute queue", 0x2000: "Execute ignore new", 0x4000: "Logon type s4u", 0x10000: "Logon type InteractiveToken", 0x40000: "Logon type Password", 0x80000: "Logon type InteractiveTokenOrPassword", 0x400000: "Enabled", 0x800000: "Hidden", 0x1000000: "Runlevel highest available", 0x2000000: "Task", 0x4000000: "Version", 0x8000000: "Token SID type none", 0x10000000: "Token SID type unrestricted", 0x20000000: "Interval", 0x40000000: "Allow hard terminate", } NULL = "\u0000" class _ScheduledTasksReader(io.BytesIO): def read_task_scheduler_time(self) -> Optional[datetime.datetime]: _ = bool(self.read_aligned_u1()) # is_localized filetime = self.decode_filetime() if filetime is None: return None return filetime def read_bool(self, aligned=False) -> Optional[bool]: try: val = struct.unpack("?", self.read(1))[0] if aligned: self.seek(7) return val except struct.error: return None def decode_filetime(self) -> Optional[datetime.datetime]: filetime = self.read_u8() if filetime is None: return None if filetime == 0 or filetime == 0xFFFFFFFFFFFFFFFF: return None filetime = conversion.wintime_to_datetime(filetime) if isinstance(filetime, datetime.datetime): return filetime else: return None def _read_uint( self, size: int, format: str, aligned: bool = False ) -> Optional[int]: try: val = struct.unpack(format, self.read(size))[0] if aligned: self.seek(8 - size, io.SEEK_CUR) return val except struct.error: return None def read_aligned_u1(self) -> Optional[int]: return self._read_uint(1, "B", True) def read_u2(self) -> Optional[int]: return self._read_uint(2, "<H") def read_aligned_u2(self) -> Optional[int]: return self._read_uint(2, "<H", True) def read_u4(self) -> Optional[int]: return self._read_uint(4, "<I") def read_u8(self) -> Optional[int]: return self._read_uint(8, "<Q") def read_aligned_u4(self) -> Optional[int]: return self._read_uint(4, "<I", True) def read_buffer(self, aligned=False) -> Optional[bytes]: count = self.read_u4() if not aligned else self.read_aligned_u4() if count is None: return None data = self.read(count) if aligned: self.seek((8 - (count % 8)) % 8, io.SEEK_CUR) return data def read_bstring(self, aligned=False) -> Optional[str]: size = self.read_u4() if not aligned else self.read_aligned_u4() if size is None: return None try: raw = self.read(size) val = raw.decode("utf-16le", errors="replace").rstrip(NULL) or None except UnicodeDecodeError: val = None if aligned: self.seek((8 - (size % 8)) % 8, io.SEEK_CUR) return val def read_aligned_bstring_expand_sz(self) -> Optional[str]: sz = self.read_aligned_u4() if sz is None: return None byte_count = sz * 2 + 2 if sz == 0: return None try: content = self.read(byte_count).decode("utf-16le") except UnicodeDecodeError: content = None self.seek((8 - (byte_count % 8)) % 8, io.SEEK_CUR) return content.rstrip("\x00") if content is not None else None def read_tstimeperiod(self) -> Optional[TaskSchedulerTimePeriod]: values = ( self.read_u2(), self.read_u2(), self.read_u2(), self.read_u2(), self.read_u2(), self.read_u2(), self.read_u2(), ) if any(value is None for value in values): return None return TaskSchedulerTimePeriod(*values) def _build_guid_name_map(key: reg_extensions.CM_KEY_NODE) -> Dict[str, str]: mapping = {} task_id_value = None for value in key.get_values(): try: if value.get_name() == "Id": task_id_value = value break except ( exceptions.InvalidAddressException, registry.RegistryException, ): continue if ( task_id_value is not None and task_id_value.get_type() == reg_extensions.RegValueTypes.REG_SZ ): try: id_str = task_id_value.decode_data() except exceptions.InvalidAddressException: id_str = None try: if isinstance(id_str, bytes): mapping[id_str.decode("utf-16le", errors="replace").rstrip(NULL)] = str( key.get_name() ) except ( exceptions.InvalidAddressException, registry.RegistryException, ) as excp: vollog.debug(f"Exception occurred while decoding id_str: {excp}") for subkey in key.get_subkeys(): mapping.update(_build_guid_name_map(subkey)) return mapping
[docs] @dataclasses.dataclass class TaskAction: action_type: ActionType action: str action_args: Optional[str] working_directory: Optional[str]
[docs] @classmethod def decode_messagebox_action( cls, reader: _ScheduledTasksReader ) -> Optional["TaskAction"]: caption, content = reader.read_bstring(), reader.read_bstring() return cls( ActionType.MessageBox, f'"{caption or "<Unknown>"}": {content or "<Unknown>"}', None, None, )
@classmethod def _decode_exe_action( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskAction"]: command = reader.read_bstring() args = reader.read_bstring() if command is None or args is None: return None workdir = reader.read_bstring() if version == 3: _flags = reader.read_u2() return cls(ActionType.Exe, command, args, workdir) @classmethod def _decode_email_action( cls, reader: _ScheduledTasksReader ) -> Optional["TaskAction"]: props = { "From": reader.read_bstring(), "To": reader.read_bstring(), "Cc": reader.read_bstring(), "Bcc": reader.read_bstring(), "Reply_to": reader.read_bstring(), "Server": reader.read_bstring(), "Subject": reader.read_bstring(), "Body": reader.read_bstring(), } num_attachment_filenames = reader.read_u4() if num_attachment_filenames is not None: attachment_filenames = [ reader.read_bstring() for _ in range(num_attachment_filenames) ] props["Attachments"] = ( "<" + ", ".join( filename for filename in attachment_filenames if filename is not None ) + ">" ) num_headers = reader.read_u4() if num_headers is not None: headers = [ (reader.read_bstring(), reader.read_bstring()) for _ in range(num_headers) ] props["Headers"] = ( "<" + ", ".join( f"{field}: {value}" for field, value in headers if field is not None and value is not None ) + ">" ) cls( ActionType.Email, ", ".join( f"{key}: {value}" for key, value in props.items() if value is not None ), None, None, ) @classmethod def _decode_comhandler_action( cls, reader: _ScheduledTasksReader ) -> Optional["TaskAction"]: guid_raw = reader.read(16) if not guid_raw and len(guid_raw) == 16: return None clsid = conversion.windows_bytes_to_guid(guid_raw) args = reader.read_bstring() return cls(ActionType.ComHandler, clsid, args, None)
@dataclasses.dataclass class _ScheduledTaskEntry: name: Union[str, interfaces.renderers.BaseAbsentValue] principal_id: Union[str, interfaces.renderers.BaseAbsentValue] display_name: Union[str, interfaces.renderers.BaseAbsentValue] enabled: Union[bool, interfaces.renderers.BaseAbsentValue] creation_time: Union[datetime.datetime, interfaces.renderers.BaseAbsentValue] last_run_time: Union[datetime.datetime, interfaces.renderers.BaseAbsentValue] last_successful_run_time: Union[ datetime.datetime, interfaces.renderers.BaseAbsentValue ] trigger_type: Union[str, interfaces.renderers.BaseAbsentValue] trigger_description: Union[str, interfaces.renderers.BaseAbsentValue] action_type: Union[str, interfaces.renderers.BaseAbsentValue] action_description: Union[str, interfaces.renderers.BaseAbsentValue] action_args: Union[str, interfaces.renderers.BaseAbsentValue] action_context: Union[str, interfaces.renderers.BaseAbsentValue] working_directory: Union[str, interfaces.renderers.BaseAbsentValue] guid: str @dataclasses.dataclass class _JobSchedule: start_boundary: Optional[datetime.datetime] end_boundary: Optional[datetime.datetime] repetition_interval_secs: Optional[int] repetition_duration_secs: Optional[int] execution_time_limit_secs: Optional[int] mode: Optional[TimeMode] data1: Optional[int] data2: Optional[int] data3: Optional[int] stop_tasks_at_duration_end: Optional[int] is_enabled: Optional[bool] max_delay_seconds: Optional[int] def get_description(self) -> Optional[str]: if self.mode == TimeMode.Once: return "Run one time starting at {}".format( self.start_boundary.isoformat() if self.start_boundary is not None else "<UNKNOWN>" ) elif self.mode == TimeMode.Daily: if self.data1 is None: return None return "Run at {} and repeat every {} days".format( ( self.start_boundary.isoformat() if self.start_boundary is not None else "<UNKNOWN>" ), self.data1, ) elif self.mode == TimeMode.Weekly: if self.data2 is None: return None days = [k.name for k in Weekday if k.value & self.data2] return "Run on {} every {} weeks starting at {}".format( ", ".join(days), self.data1, ( self.start_boundary.isoformat() if self.start_boundary is not None else "<UNKNOWN>" ), ) elif self.mode == TimeMode.DaysInMonths: if self.data2 is None or self.data1 is None or self.data3 is None: return None months = [month.name for month in Months if month.value & self.data3] days_bitmap = (self.data2 << 16) + self.data1 days = [str(v + 1) for v in range(31) if (1 << v) & days_bitmap] return "Run in months {} on days {} starting at {}".format( ", ".join(months), ", ".join(days), ( self.start_boundary.isoformat() if self.start_boundary is not None else "<UNKNOWN>" ), ) elif self.mode == TimeMode.DaysInWeeksInMonths: if self.data1 is None or self.data2 is None or self.data3 is None: return None months = [month.name for month in Months if month.value & self.data3] weeks = [str(v + 1) for v in range(5) if (v << 1) & self.data2] days = [day.name for day in Weekday if day.value & self.data1] return "Run in months {} in weeks {} on days {} starting at {}".format( ", ".join(months), ", ".join(weeks), ", ".join(days), ( self.start_boundary.isoformat() if self.start_boundary is not None else "<UNKNOWN>" ), ) else: return None @classmethod def decode(cls, reader: _ScheduledTasksReader) -> Optional["_JobSchedule"]: start_boundary = reader.read_task_scheduler_time() end_boundary = reader.read_task_scheduler_time() _ = reader.read_task_scheduler_time() repetition_interval_secs = reader.read_u4() repetition_duration_secs = reader.read_u4() execution_time_limit_secs = reader.read_u4() mode_index = reader.read_u4() if mode_index is not None: try: mode = TimeMode(mode_index) except ValueError: mode = TimeMode.Unknown else: mode = None data1 = reader.read_u2() data2 = reader.read_u2() data3 = reader.read_u2() reader.seek(2, io.SEEK_CUR) # pad stop_tasks_at_duration_end = reader.read_bool() is_enabled = reader.read_bool() reader.seek(6, io.SEEK_CUR) # pad (2) + unknown (4) max_delay_seconds = reader.read_u4() reader.seek(4, io.SEEK_CUR) # pad return cls( start_boundary, end_boundary, repetition_interval_secs, repetition_duration_secs, execution_time_limit_secs, mode, data1, data2, data3, stop_tasks_at_duration_end, is_enabled, max_delay_seconds, )
[docs] def decode_sid(data: bytes) -> Optional[str]: """ Decodes a windows SID from variable-length raw bytes Returns the string representation of the SID if decoding was successful, or None if the data could not be parsed due to an insufficient number of bytes. """ try: revision, subid_count, id_authority = struct.unpack( ">BBQ", data[:2] + b"\x00\x00" + data[2:8] ) subauthorities = struct.unpack( "<" + "I" * subid_count, data[8 : 8 + subid_count * 4] ) sid_string = "S-" + "-".join( [str(item) for item in [revision, id_authority] + list(subauthorities)] ) except struct.error: return None return sid_string
[docs] @dataclasses.dataclass class UserInfo: sid_type: Optional[SidType] sid: Optional[str] username: Optional[str] @classmethod def _decode(cls, reader: _ScheduledTasksReader) -> Optional["UserInfo"]: skip_user = reader.read_aligned_u1() != 0 if not skip_user: skip_sid = reader.read_aligned_u1() != 0 else: skip_sid = None sid_type = None sid = None if not skip_user and not skip_sid: try: sid_type = SidType(reader.read_aligned_u4()) except ValueError: sid_type = SidType.Unknown sid_raw = reader.read_buffer(aligned=True) if sid_raw is None: return None sid = decode_sid(sid_raw) username = reader.read_bstring(aligned=True) if not skip_user else None return UserInfo(sid_type, sid, username)
[docs] @dataclasses.dataclass class OptionalSettings: IdleDurationSeconds: int idleWaitTimeoutSeconds: int ExecutionTimeLimitSeconds: int DeleteExpiredTaskAfter: int Priority: int RestartOnFailureDelay: int RestartOnFailureRetries: int NetworkId: bytes Privileges: Optional[List[str]] Periodicity: Optional[TaskSchedulerTimePeriod] Deadline: Optional[TaskSchedulerTimePeriod] Exclusive: Optional[bool] @classmethod def _decode(cls, reader: _ScheduledTasksReader) -> Optional["OptionalSettings"]: LEN_WITH_PRIVILEGES = 0x38 LEN_WITH_TIME_PERIODS = 0x58 length = reader.read_aligned_u4() if length == 0: return None base_values = ( reader.read_u4(), reader.read_u4(), reader.read_u4(), reader.read_u4(), reader.read_u4(), reader.read_u4(), reader.read_u4(), binascii.hexlify(reader.read(16)), ) if any(value is None for value in base_values): return None reader.seek(4, io.SEEK_CUR) # padding privileges = None periodicity = None deadline = None exclusive = None if length == LEN_WITH_PRIVILEGES or length == LEN_WITH_TIME_PERIODS: privileges_raw = reader.read_u8() if privileges_raw is None: return None privileges = [ priv.name for priv in Privileges if priv.value & privileges_raw ] if length == LEN_WITH_TIME_PERIODS: periodicity = reader.read_tstimeperiod() deadline = reader.read_tstimeperiod() exclusive = reader.read_bool() reader.seek(3, io.SEEK_CUR) # padding return OptionalSettings( *base_values, privileges, periodicity, deadline, exclusive )
[docs] @dataclasses.dataclass class JobBucket: flags: List[str] crc32: int principal_id: Optional[str] display_name: Optional[str] user_info: Optional[UserInfo] optional_settings: Optional[OptionalSettings] @classmethod def _decode( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["JobBucket"]: flags_raw = reader.read_aligned_u4() if flags_raw is None: return None flags = [y for x, y in JOB_BUCKET_FLAGS.items() if x & flags_raw] crc32 = reader.read_aligned_u4() if crc32 is None: return None principal_id = None display_name = None if version >= 0x16: principal_id = reader.read_bstring(aligned=True) if version >= 0x17: display_name = reader.read_bstring(aligned=True) user_info = UserInfo._decode(reader) optional_settings = OptionalSettings._decode(reader) return JobBucket( flags, crc32, principal_id, display_name, user_info, optional_settings )
[docs] class Privileges(enum.Enum): SeCreateTokenPrivilege = 0x4 SeAssignPrimaryTokenPrivilege = 0x8 SeLockMemoryPrivilege = 0x10 SeIncreaseQuotaPrivilege = 0x20 SeMachineAccountPrivilege = 0x40 SeTcbPrivilege = 0x80 SeSecurityPrivilege = 0x100 SeTakeOwnershipPrivilege = 0x200 SeLoadDriverPrivilege = 0x400 SeSystemProfilePrivilege = 0x800 SeSystemtimePrivilege = 0x1000 SeProfileSingleProcessPrivilege = 0x2000 SeIncreaseBasePriorityPrivilege = 0x4000 SeCreatePagefilePrivilege = 0x8000 SeCreatePermanentPrivilege = 0x10000 SeBackupPrivilege = 0x20000 SeRestorePrivilege = 0x40000 SeShutdownPrivilege = 0x80000 SeDebugPrivilege = 0x100000 SeAuditPrivilege = 0x200000 SeSystemEnvironmentPrivilege = 0x400000 SeChangeNotifyPrivilege = 0x800000 SeRemoteShutdownPrivilege = 0x1000000 SeUndockPrivilege = 0x2000000 SeSyncAgentPrivilege = 0x4000000 SeEnableDelegationPrivilege = 0x8000000 SeManageVolumePrivilege = 0x10000000 SeImpersonatePrivilege = 0x20000000 SeCreateGlobalPrivilege = 0x40000000 SeTrustedCredManAccessPrivilege = 0x80000000 SeRelabelPrivilege = 0x100000000 SeIncreaseWorkingSetPrivilege = 0x200000000 SeTimeZonePrivilege = 0x400000000 SeCreateSymbolicLinkPrivilege = 0x800000000 SeDelegateSessionUserImpersonatePrivilege = 0x1000000000
[docs] class SessionState(enum.Enum): ConsoleConnect = 1 ConsoleDisconnect = 2 RemoteConnect = 3 RemoteDisconnect = 4 SessionLock = 5 SessionUnlock = 6 Unknown = "Unknown"
[docs] @dataclasses.dataclass class TaskTrigger: start_boundary: Optional[datetime.datetime] end_boundary: Optional[datetime.datetime] repetition_interval_seconds: Optional[int] enabled: Optional[bool] trigger_type: TriggerType description: Optional[str] @classmethod def _decode_generic_trigger( cls, reader: _ScheduledTasksReader, version: int, trigger_type: TriggerType ) -> Optional["TaskTrigger"]: start_boundary = reader.read_task_scheduler_time() end_boundary = reader.read_task_scheduler_time() _ = reader.read_u4() # delay seconds _ = reader.read_u4() # timeout seconds repetition_interval_secs = reader.read_u4() _ = reader.read_u4() # repetition duration seconds _ = reader.read_u4() # repetition duration seconds 2 _ = reader.read_bool() # stop at duration end reader.seek(3, io.SEEK_CUR) trigger_enabled = bool(reader.read_aligned_u1()) reader.seek(8, io.SEEK_CUR) # unknown field if version >= 0x16: cur = reader.tell() _ = reader.read_bstring() # trigger id reader.seek((8 - (reader.tell() - cur)) % 8, io.SEEK_CUR) # pad to block return cls( start_boundary, end_boundary, repetition_interval_secs, trigger_enabled, trigger_type, f"{trigger_type.name} trigger", ) @classmethod def _decode_logon_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: base = cls._decode_generic_trigger(reader, version, TriggerType.Logon) if base is None: return None user = UserInfo._decode(reader) if user is not None and user.username is not None: base.description = f"{user.username}: {user.sid} ({user.sid_type})" return base @classmethod def _decode_session_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: base = cls._decode_generic_trigger(reader, version, TriggerType.Session) if base is None: return None session_type_raw = reader.read_u4() reader.seek(4, io.SEEK_CUR) try: session_type = SessionState(session_type_raw) except ValueError: session_type = SessionState.Unknown user_info = UserInfo._decode(reader) if user_info is not None and user_info.username is not None: base.description = f"{session_type.name} for user {user_info.username}" else: base.description = session_type.name return base @classmethod def _decode_time_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: job_schedule = _JobSchedule.decode(reader) if job_schedule is None: return None if version >= 0x16: cur = reader.tell() _ = reader.read_bstring() # trigger id reader.seek((8 - (reader.tell() - cur)) % 8, io.SEEK_CUR) # pad to block return cls( job_schedule.start_boundary, job_schedule.end_boundary, job_schedule.repetition_interval_secs, job_schedule.is_enabled, TriggerType.Time, job_schedule.get_description() or None, ) @classmethod def _decode_event_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: base = cls._decode_generic_trigger(reader, version, TriggerType.Event) if base is None: return base subscription = reader.read_aligned_bstring_expand_sz() reader.seek(8, io.SEEK_CUR) # 2 4-byte unknown fields reader.read_aligned_bstring_expand_sz() # another unknown field len_value_queries = reader.read_aligned_u4() if len_value_queries is None: return base queries = [ ( reader.read_aligned_bstring_expand_sz(), reader.read_aligned_bstring_expand_sz(), ) for _ in range(len_value_queries) ] valid = [(k, v) for (k, v) in queries if k is not None and v is not None] if base.description is None: base.description = "Event Trigger" base.description += f": Subscription: {subscription}, Queries: {str(valid)}" return base @classmethod def _decode_boot_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: return cls._decode_generic_trigger(reader, version, TriggerType.Boot) @classmethod def _decode_wnf_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: base = cls._decode_generic_trigger( reader, version, TriggerType.WindowsNotificationFacility ) if base is None: return None state_name = binascii.hexlify(reader.read(8)).decode("ascii") datalen = reader.read_aligned_u4() _ = base64.b64encode(reader.read(datalen)) # state binary data base.description = f"WNF state {state_name}" return base @classmethod def _decode_idle_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: return cls._decode_generic_trigger(reader, version, TriggerType.Logon) @classmethod def _decode_registration_trigger( cls, reader: _ScheduledTasksReader, version: int ) -> Optional["TaskTrigger"]: return cls._decode_generic_trigger(reader, version, TriggerType.Logon)
[docs] @dataclasses.dataclass class TriggerSet: job_bucket: JobBucket triggers: List[TaskTrigger]
[docs] @classmethod def decode(cls, data) -> Optional["TriggerSet"]: reader = _ScheduledTasksReader(data) version = reader.read_aligned_u1() _ = reader.read_task_scheduler_time() # start boundary _ = reader.read_task_scheduler_time() # end_boundary if version is None: return None job_bucket = JobBucket._decode(reader, version) if job_bucket is None: return None triggers = [] while True: magic = reader.read_aligned_u4() if magic is None: break try: trigger_type = TriggerType(magic) except ValueError: vollog.warning(f"Invalid trigger magic {hex(magic)}") break if trigger_type == TriggerType.Logon: trigger = TaskTrigger._decode_logon_trigger(reader, version) elif trigger_type == TriggerType.Session: trigger = TaskTrigger._decode_session_trigger(reader, version) elif trigger_type == TriggerType.WindowsNotificationFacility: trigger = TaskTrigger._decode_wnf_trigger(reader, version) elif trigger_type == TriggerType.Boot: trigger = TaskTrigger._decode_boot_trigger(reader, version) elif trigger_type == TriggerType.Registration: trigger = TaskTrigger._decode_registration_trigger(reader, version) elif trigger_type == TriggerType.Event: trigger = TaskTrigger._decode_event_trigger(reader, version) elif trigger_type == TriggerType.Idle: trigger = TaskTrigger._decode_idle_trigger(reader, version) elif trigger_type == TriggerType.Time: trigger = TaskTrigger._decode_time_trigger(reader, version) else: vollog.warning( f"Invalid trigger magic {hex(magic)} encountered at offset {hex(reader.tell() - 8)}, stopping parsing" ) break triggers.append(trigger) return cls(job_bucket, triggers)
[docs] @dataclasses.dataclass class ActionSet: actions: List[TaskAction] context: Optional[str]
[docs] @classmethod def decode(cls, data: bytes) -> Optional["ActionSet"]: reader = _ScheduledTasksReader(data) actions = [] version = reader.read_u2() if version is None: return None if version in [2, 3]: action_context = reader.read_bstring() else: action_context = None while True: magic = reader.read_u2() if magic is None: break _ = ( reader.read_bstring() ) # action identifier, usually (but not always) empty if magic == ActionType.Email.value: action = TaskAction._decode_email_action(reader) elif magic == ActionType.Exe.value: action = TaskAction._decode_exe_action(reader, version) elif magic == ActionType.ComHandler.value: action = TaskAction._decode_comhandler_action(reader) elif magic == ActionType.MessageBox.value: action = TaskAction.decode_messagebox_action(reader) else: break actions.append(action) return cls(actions, action_context)
[docs] @dataclasses.dataclass class DynamicInfo: """ Contains information about execution history for this task, including timestamps and the last error code """ creation_time: Optional[datetime.datetime] last_run_time: Optional[datetime.datetime] last_successful_run_time: Optional[datetime.datetime] last_error_code: Optional[int]
[docs] @classmethod def decode(cls, data: bytes) -> Optional["DynamicInfo"]: """ Decodes a DynamicInfo structure from RegBin value data. Raises a `ScheduledTaskDecodingError` if the magic bytes are invalid, but otherwise attempts to decode as much as possible without returning an error. """ DYNAMICINFO_MAGIC = 3 reader = _ScheduledTasksReader(data) magic = reader.read_u4() if magic != DYNAMICINFO_MAGIC: return None creation_time = reader.decode_filetime() last_run_time = reader.decode_filetime() reader.seek(4, io.SEEK_CUR) # deprecated field 'TaskState' last_error_code = reader.read_u4() last_success_time = reader.decode_filetime() vollog.debug((creation_time, last_run_time, last_success_time)) return cls( last_run_time, creation_time, last_success_time, last_error_code, )
[docs] class ScheduledTasks(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface): """Decodes scheduled task information from the Windows registry, including information about triggers, actions, run times, and creation times.""" _required_framework_version = (2, 11, 0) _version = (2, 0, 0)
[docs] @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: # Since we're calling the plugin, make sure we have the plugin's requirements return [ requirements.ModuleRequirement( name="kernel", description="Windows kernel", architectures=["Intel33", "Intel64"], ), requirements.VersionRequirement( name="hivelist", component=hivelist.HiveList, version=(2, 0, 0) ), requirements.VersionRequirement( name="timeliner", component=timeliner.TimeLinerInterface, version=(1, 0, 0), ), ]
[docs] def generate_timeline( self, ) -> Iterator[Tuple[str, timeliner.TimeLinerType, datetime.datetime]]: for _, task in self._generator(): if isinstance(task.last_run_time, datetime.datetime): yield ( f"ScheduledTasks: task action {task.action_description} with trigger {task.trigger_description} ran", timeliner.TimeLinerType.ACCESSED, task.last_run_time, ) if isinstance(task.last_successful_run_time, datetime.datetime): yield ( f"ScheduledTasks: task action {task.action_description} with trigger {task.trigger_description} ran successfully", timeliner.TimeLinerType.ACCESSED, task.last_successful_run_time, ) if isinstance(task.creation_time, datetime.datetime): yield ( f"ScheduledTasks: Creation Time for task {task.guid} with trigger {task.trigger_description or '<UNKNOWN>'}", timeliner.TimeLinerType.CREATED, task.creation_time, )
[docs] @classmethod def get_software_hive( cls, context: interfaces.context.ContextInterface, config_path: str, kernel_module_name: str, ) -> Optional[registry.RegistryHive]: """Retrieves the `Amcache.hve` registry hive from the kernel module, if it can be located.""" return next( hivelist.HiveList.list_hives( context=context, base_config_path=interfaces.configuration.path_join( config_path, "hivelist" ), kernel_module_name=kernel_module_name, filter_string="SOFTWARE", ), None, )
[docs] @classmethod def parse_actions_value( cls, actions_value: reg_extensions.CM_KEY_VALUE ) -> Optional[ActionSet]: """Parses File entries from the Windows 8 `Root\\File` key. :param programs_key: The `Root\\File` registry key. :return: An iterator of tuples, where the first member is the program ID string for correlating `Root\\Program` entries, and the second member is the `AmcacheEntry`. """ try: data = actions_value.decode_data() except exceptions.InvalidAddressException: data = None if not isinstance(data, bytes): return None return ActionSet.decode(data)
[docs] @classmethod def parse_triggers_value( cls, triggers_value: reg_extensions.CM_KEY_VALUE ) -> Optional[TriggerSet]: try: data = triggers_value.decode_data() except exceptions.InvalidAddressException: data = None if not isinstance(data, bytes): return None return TriggerSet.decode(data)
[docs] @classmethod def parse_dynamic_info_value( cls, dyn_info_value: reg_extensions.CM_KEY_VALUE ) -> Optional[DynamicInfo]: try: data = dyn_info_value.decode_data() except exceptions.InvalidAddressException: data = None if not isinstance(data, bytes): return None return DynamicInfo.decode(data)
@classmethod def _get_task_keys( cls, software_hive: registry.RegistryHive ) -> Tuple[ Optional[reg_extensions.CM_KEY_NODE], Optional[reg_extensions.CM_KEY_NODE] ]: try: task_key = software_hive.get_key( "Microsoft\\Windows NT\\CurrentVersion\\Schedule\\TaskCache\\Tasks" ) except (KeyError, registry.RegistryException): task_key = None try: task_tree = software_hive.get_key( "Microsoft\\Windows NT\\CurrentVersion\\Schedule\\TaskCache\\Tree" ) except (KeyError, registry.RegistryException): task_tree = None return (task_key, task_tree) # type: ignore @classmethod def _parse_task_key( cls, key: reg_extensions.CM_KEY_NODE, guid_mapping: Dict[str, str] ) -> Iterator[_ScheduledTaskEntry]: values = {} for value in key.get_values(): try: name = str(value.get_name()) except ( exceptions.InvalidAddressException, registry.RegistryException, ): continue if name in ["Actions", "Triggers", "DynamicInfo"]: values[name] = value try: key_name = str(key.get_name()) except ( exceptions.InvalidAddressException, registry.RegistryException, ): key_name = None try: task_name = guid_mapping.get(key_name, renderers.NotAvailableValue()) except ( exceptions.InvalidAddressException, registry.RegistryException, ): task_name = renderers.NotAvailableValue() try: action_set = cls.parse_actions_value(values["Actions"]) except KeyError: vollog.debug("Failed to get Actions value") action_set = None try: triggers_value = values["Triggers"] trigger_set = cls.parse_triggers_value(triggers_value) except KeyError: vollog.debug("Failed to get Triggers value") trigger_set = None if trigger_set is not None: vollog.debug("Parsed triggers successfully") principal_id = ( trigger_set.job_bucket.principal_id or renderers.NotAvailableValue() ) display_name = ( trigger_set.job_bucket.display_name or renderers.NotAvailableValue() ) else: vollog.debug("Failed to parse triggers") principal_id = renderers.NotAvailableValue() display_name = renderers.NotAvailableValue() try: dynamic_info = cls.parse_dynamic_info_value(values["DynamicInfo"]) except KeyError: vollog.debug("DynamicInfo value not found") dynamic_info = None vollog.debug(dynamic_info) creation_time = dynamic_info.creation_time if dynamic_info is not None else None last_run_time = dynamic_info.last_run_time if dynamic_info is not None else None last_successful_run_time = ( dynamic_info.last_successful_run_time if dynamic_info is not None else None ) all_triggers = ( trigger_set.triggers or [None] if trigger_set is not None else [None] ) all_actions = action_set.actions or [None] if action_set is not None else [None] for action, trigger in itertools.product(all_actions, all_triggers): if action is not None: if action.action_type in ( ActionType.Exe, ActionType.ComHandler, ): if action.action_args is None: args = renderers.NotAvailableValue() else: args = action.action_args else: args = renderers.NotApplicableValue() if action.action_type == ActionType.Exe: working_directory = ( action.working_directory or renderers.NotAvailableValue() ) else: working_directory = renderers.NotApplicableValue() else: args = renderers.NotAvailableValue() working_directory = renderers.NotAvailableValue() if trigger is not None and trigger.enabled is not None: enabled = trigger.enabled else: enabled = renderers.NotAvailableValue() yield _ScheduledTaskEntry( task_name, principal_id, display_name, enabled, creation_time or renderers.NotAvailableValue(), last_run_time or renderers.NotAvailableValue(), last_successful_run_time or renderers.NotAvailableValue(), ( trigger.trigger_type.name if trigger is not None else renderers.NotAvailableValue() ), ( trigger.description or renderers.NotAvailableValue() if trigger is not None else renderers.NotAvailableValue() ), ( action.action_type.name if action is not None else renderers.NotAvailableValue() ), ( action.action if action is not None else renderers.NotAvailableValue() ), args, ( action_set.context if (action_set is not None and action_set.context is not None) else renderers.NotAvailableValue() ), working_directory, key_name or renderers.NotAvailableValue(), ) def _generator(self) -> Iterator[Tuple[int, _ScheduledTaskEntry]]: # Building the dictionary ahead of time is much better for performance # vs looking up each service's DLL individually. software_hive = self.get_software_hive( self.context, self.config_path, self.config["kernel"] ) if software_hive is None: vollog.warning("Failed to get SOFTWARE hive") return task_key_root, task_tree = self._get_task_keys(software_hive) if task_key_root is None: vollog.warning("Failed to get 'Tasks' key") return if task_tree is not None: task_name_map = _build_guid_name_map(task_tree) else: vollog.info("'Tree' key not found, can't map GUIDs to task names") task_name_map = {} for key in task_key_root.get_subkeys(): for task in self._parse_task_key(key, task_name_map): yield 0, task
[docs] def run(self): return renderers.TreeGrid( [ ("Task Name", str), ("Principal ID", str), ("Display Name", str), ("Enabled", bool), ("Creation Time", datetime.datetime), ("Last Run Time", datetime.datetime), ("Last Successful Run Time", datetime.datetime), ("Trigger Type", str), ("Trigger Description", str), ("Action Type", str), ("Action", str), ("Action Arguments", str), ("Action Context", str), ("Working Directory", str), ("Key Name", str), ], ( (indent, dataclasses.astuple(entry)) for indent, entry in self._generator() ), )