From 61c4a919d26248f7c429714f7b2887662433a222 Mon Sep 17 00:00:00 2001 From: roberts Date: Mon, 30 Mar 2026 17:24:19 -0500 Subject: [PATCH] Redesign: stack-based task system with subtasks and persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TaskStack replaces TaskQueue: - Tasks are a STACK — new tasks push on top, completed tasks pop - Interrupted tasks are PAUSED and RESUMED when the interrupt ends - Combat/flee are temporary INTERRUPTIONS that don't affect the stack - Each PrimaryTask has a list of SubTasks (sequential steps) - Open-ended tasks ("explore together") stay active until cancelled - Max stack depth of 5 — oldest self-directed tasks drop off - Player commands always push as HIGH priority - Self-directed goals sit at the bottom Helper functions: - make_task() — create single-step primary tasks - make_interrupt() — create temporary combat/flee interrupts Co-Authored-By: Claude Opus 4.6 (1M context) --- dougbot/core/task_queue.py | 417 +++++++++++++++++++++++++++++-------- 1 file changed, 331 insertions(+), 86 deletions(-) diff --git a/dougbot/core/task_queue.py b/dougbot/core/task_queue.py index 46ee9e4..4aacf94 100644 --- a/dougbot/core/task_queue.py +++ b/dougbot/core/task_queue.py @@ -1,47 +1,135 @@ """ -Task queue system for Doug's autonomous behavior. -Tasks have priorities, can be interrupted, and are influenced by persona traits. +Task Stack — persistent, stack-based task system for Doug. + +Tasks are organized as a STACK: +- Player commands push onto the top +- Self-directed goals sit at the bottom +- Interruptions (combat, player requests) are temporary +- When a task completes, Doug resumes the one below it +- Each task can have subtasks (get materials → return to building) + +Priority levels determine which tasks can interrupt others. """ import time from dataclasses import dataclass, field from enum import IntEnum -from typing import Any, Callable, Optional +from typing import Any, Optional from dougbot.utils.logging import get_logger -log = get_logger("core.task_queue") +log = get_logger("core.tasks") class Priority(IntEnum): """Task priority levels. Higher = more urgent.""" IDLE = 0 # Looking around, wandering - LOW = 10 # Self-directed goals (explore, organize) - NORMAL = 20 # Environmental triggers (sign found, interesting block) - HIGH = 30 # Player requests via chat - URGENT = 40 # Survival (flee, eat, find shelter) - CRITICAL = 50 # Immediate danger (health critical, falling) + SELF_DIRECTED = 10 # Doug's own goals (explore, gather) + LOW = 20 # Low-priority environmental triggers + NORMAL = 30 # Environmental triggers (sign found, interesting block) + HIGH = 40 # Player requests via chat + URGENT = 50 # Survival (flee, eat, find shelter) + CRITICAL = 60 # Immediate danger (health critical) + + +class TaskStatus(IntEnum): + PENDING = 0 + ACTIVE = 1 + PAUSED = 2 # Interrupted by higher priority task + COMPLETE = 3 + FAILED = 4 + CANCELLED = 5 @dataclass -class Task: - """A single task for Doug to perform.""" +class SubTask: + """A single step within a larger task.""" name: str - priority: Priority action: str # Bridge action to execute params: dict = field(default_factory=dict) - description: str = "" # Human-readable description for chat - steps: list = field(default_factory=list) # Multi-step tasks - current_step: int = 0 - created_at: float = field(default_factory=time.time) + description: str = "" + status: TaskStatus = TaskStatus.PENDING + timeout: float = 30.0 started_at: float = 0.0 - timeout: float = 60.0 # Max seconds before auto-cancel - interruptible: bool = True # Can be interrupted by higher priority - callback: Optional[str] = None # Method name to call on completion - context: dict = field(default_factory=dict) # Extra data for the task @property def is_expired(self) -> bool: + if self.started_at > 0: + return (time.time() - self.started_at) > self.timeout + return False + + +@dataclass +class PrimaryTask: + """ + A primary task that persists until completion or cancellation. + Can contain subtasks and survives interruptions. + """ + name: str + priority: Priority + description: str = "" + source: str = "self" # "player" or "self" + source_player: str = "" # Who gave the command + + # Subtask queue + subtasks: list = field(default_factory=list) + current_subtask_idx: int = 0 + + # Task lifecycle + status: TaskStatus = TaskStatus.PENDING + created_at: float = field(default_factory=time.time) + started_at: float = 0.0 + timeout: float = 300.0 # 5 min default for primary tasks + interruptible: bool = True + + # For open-ended tasks (explore, follow) — no auto-complete + open_ended: bool = False + + # Resume info — where to go back to after subtask + resume_position: dict = field(default_factory=dict) + resume_data: dict = field(default_factory=dict) + + def add_subtask(self, name: str, action: str, params: dict = None, + description: str = "", timeout: float = 30.0): + """Add a subtask to this primary task.""" + self.subtasks.append(SubTask( + name=name, action=action, params=params or {}, + description=description, timeout=timeout, + )) + + def current_subtask(self) -> Optional[SubTask]: + """Get the current subtask to execute.""" + while self.current_subtask_idx < len(self.subtasks): + st = self.subtasks[self.current_subtask_idx] + if st.status in (TaskStatus.PENDING, TaskStatus.ACTIVE): + return st + self.current_subtask_idx += 1 + return None + + def advance_subtask(self): + """Mark current subtask complete and move to next.""" + if self.current_subtask_idx < len(self.subtasks): + self.subtasks[self.current_subtask_idx].status = TaskStatus.COMPLETE + self.current_subtask_idx += 1 + + def fail_subtask(self): + """Mark current subtask as failed.""" + if self.current_subtask_idx < len(self.subtasks): + self.subtasks[self.current_subtask_idx].status = TaskStatus.FAILED + + @property + def is_complete(self) -> bool: + """All subtasks done and not open-ended.""" + if self.open_ended: + return False + if not self.subtasks: + return self.status == TaskStatus.COMPLETE + return self.current_subtask_idx >= len(self.subtasks) + + @property + def is_expired(self) -> bool: + if self.open_ended: + return False if self.started_at > 0: return (time.time() - self.started_at) > self.timeout return (time.time() - self.created_at) > self.timeout * 2 @@ -50,94 +138,251 @@ class Task: def age(self) -> float: return time.time() - self.created_at + def __str__(self): + st = self.current_subtask() + step_info = f" → {st.description}" if st else "" + return f"[{self.priority.name}] {self.description}{step_info}" -class TaskQueue: - """Priority queue of tasks for Doug.""" + +class TaskStack: + """ + Stack-based task manager. + + The stack represents Doug's focus: + - Top of stack = what he's doing right now + - Below = what he'll resume when current task is done + - Interruptions push onto the stack temporarily + - Player commands always push as high priority + + Max depth: 5 (oldest self-directed tasks get dropped) + """ + + MAX_DEPTH = 5 def __init__(self): - self._queue: list[Task] = [] - self._current: Optional[Task] = None - self._completed: list[str] = [] # Recent completed task names - self._max_completed = 20 + self._stack: list[PrimaryTask] = [] + self._interruption: Optional[PrimaryTask] = None # Combat, flee — temporary + self._executing_subtask = False + self._completed_names: list[str] = [] @property - def current_task(self) -> Optional[Task]: - return self._current + def current_task(self) -> Optional[PrimaryTask]: + """What Doug is focused on right now.""" + if self._interruption and self._interruption.status == TaskStatus.ACTIVE: + return self._interruption + if self._stack: + return self._stack[-1] + return None @property def is_busy(self) -> bool: - return self._current is not None + return self._executing_subtask @property - def queue_size(self) -> int: - return len(self._queue) + def stack_depth(self) -> int: + return len(self._stack) - def add(self, task: Task) -> bool: - """Add a task to the queue. Returns True if it should interrupt current.""" - # Remove expired tasks - self._queue = [t for t in self._queue if not t.is_expired] + @property + def stack_summary(self) -> list[str]: + """Human-readable stack for debugging.""" + result = [] + for i, task in enumerate(reversed(self._stack)): + marker = "→ " if i == 0 else " " + result.append(f"{marker}{task}") + return result - # Don't duplicate same task - for existing in self._queue: - if existing.name == task.name and existing.action == task.action: + def push(self, task: PrimaryTask) -> bool: + """ + Push a new primary task onto the stack. + Returns True if this task should execute immediately. + """ + # Don't duplicate + for existing in self._stack: + if existing.name == task.name and existing.status == TaskStatus.ACTIVE: return False - if self._current and self._current.name == task.name: - return False - self._queue.append(task) - self._queue.sort(key=lambda t: t.priority, reverse=True) + # Pause the current top task + if self._stack and self._stack[-1].status == TaskStatus.ACTIVE: + self._stack[-1].status = TaskStatus.PAUSED + log.info(f"Pausing: {self._stack[-1].description}") - # Check if this should interrupt current task - if self._current and task.priority > self._current.priority and self._current.interruptible: - log.info(f"Task '{task.name}' (priority {task.priority.name}) interrupts '{self._current.name}'") - # Re-queue current task - self._queue.append(self._current) - self._queue.sort(key=lambda t: t.priority, reverse=True) - self._current = None - return True + # Push new task + task.status = TaskStatus.ACTIVE + task.started_at = time.time() + self._stack.append(task) - return not self.is_busy + # Trim stack if too deep — drop lowest priority at bottom + while len(self._stack) > self.MAX_DEPTH: + self._stack.pop(0) - def next(self) -> Optional[Task]: - """Get the next task to work on.""" - if self._current: - if self._current.is_expired: - log.debug(f"Task '{self._current.name}' expired") - self._current = None - else: - return self._current + log.info(f"New task: {task.description} (stack depth: {len(self._stack)})") + return True - # Remove expired - self._queue = [t for t in self._queue if not t.is_expired] + def interrupt(self, task: PrimaryTask): + """ + Set a temporary interruption (combat, flee). + These don't push onto the main stack — they override temporarily. + """ + self._interruption = task + task.status = TaskStatus.ACTIVE + task.started_at = time.time() + log.info(f"Interrupt: {task.description}") - if not self._queue: + def complete_interruption(self): + """Clear the current interruption, resume stack.""" + if self._interruption: + self._interruption.status = TaskStatus.COMPLETE + self._interruption = None + self._executing_subtask = False + + def complete_current(self): + """Complete the current task/subtask.""" + self._executing_subtask = False + + # If interruption, complete it + if self._interruption and self._interruption.status == TaskStatus.ACTIVE: + self.complete_interruption() + return + + if not self._stack: + return + + top = self._stack[-1] + + # Advance the subtask + top.advance_subtask() + + # Check if the whole task is done + if top.is_complete: + self._stack.pop() + self._completed_names.append(top.name) + if len(self._completed_names) > 20: + self._completed_names.pop(0) + log.info(f"Task complete: {top.description}") + + # Resume the task below if any + if self._stack: + self._stack[-1].status = TaskStatus.ACTIVE + log.info(f"Resuming: {self._stack[-1].description}") + + def fail_current(self): + """Fail current subtask but don't kill the primary task.""" + self._executing_subtask = False + + if self._interruption and self._interruption.status == TaskStatus.ACTIVE: + self.complete_interruption() + return + + if not self._stack: + return + + top = self._stack[-1] + top.fail_subtask() + # Move to next subtask — one failure doesn't kill the whole task + top.advance_subtask() + + if top.is_complete: + self._stack.pop() + if self._stack: + self._stack[-1].status = TaskStatus.ACTIVE + + def cancel_current(self): + """Cancel the current primary task entirely.""" + self._executing_subtask = False + + if self._interruption: + self.complete_interruption() + return + + if self._stack: + removed = self._stack.pop() + log.info(f"Task cancelled: {removed.description}") + if self._stack: + self._stack[-1].status = TaskStatus.ACTIVE + log.info(f"Resuming: {self._stack[-1].description}") + + def cancel_all(self): + """Clear everything.""" + self._stack.clear() + self._interruption = None + self._executing_subtask = False + + def get_next_action(self) -> Optional[SubTask]: + """ + Get the next subtask to execute. + Returns None if nothing to do or already executing. + """ + if self._executing_subtask: return None - self._current = self._queue.pop(0) - self._current.started_at = time.time() - return self._current + # Check interruption first + if self._interruption and self._interruption.status == TaskStatus.ACTIVE: + st = self._interruption.current_subtask() + if st: + st.status = TaskStatus.ACTIVE + st.started_at = time.time() + self._executing_subtask = True + return st + else: + self.complete_interruption() - def complete(self, task_name: str = ""): - """Mark current task as complete.""" - if self._current: - name = self._current.name - self._completed.append(name) - if len(self._completed) > self._max_completed: - self._completed.pop(0) - self._current = None - log.debug(f"Task completed: {name}") + # Check stack + if not self._stack: + return None - def cancel(self, task_name: str = ""): - """Cancel current task.""" - if self._current: - log.debug(f"Task cancelled: {self._current.name}") - self._current = None + # Expire old tasks + top = self._stack[-1] + if top.is_expired: + log.debug(f"Task expired: {top.description}") + self._stack.pop() + if self._stack: + self._stack[-1].status = TaskStatus.ACTIVE + return None - def clear(self): - """Clear all tasks.""" - self._queue.clear() - self._current = None + if top.status != TaskStatus.ACTIVE: + top.status = TaskStatus.ACTIVE - def recently_completed(self, task_name: str) -> bool: - """Check if a task was recently completed (avoid repeating).""" - return task_name in self._completed + st = top.current_subtask() + if st: + st.status = TaskStatus.ACTIVE + st.started_at = time.time() + self._executing_subtask = True + return st + + # No more subtasks — task is done + if not top.open_ended: + self.complete_current() + return self.get_next_action() # Try next task in stack + + return None + + def recently_completed(self, name: str) -> bool: + return name in self._completed_names + + +# ── Convenience: quick task creation ── + +def make_task(name: str, priority: Priority, description: str, + action: str, params: dict = None, timeout: float = 30.0, + source: str = "self", source_player: str = "", + open_ended: bool = False) -> PrimaryTask: + """Create a simple single-step primary task.""" + task = PrimaryTask( + name=name, priority=priority, description=description, + source=source, source_player=source_player, + open_ended=open_ended, timeout=timeout, + ) + task.add_subtask(name, action, params or {}, description, timeout) + return task + + +def make_interrupt(name: str, description: str, + action: str, params: dict = None, + timeout: float = 15.0) -> PrimaryTask: + """Create a temporary interruption task (combat, flee).""" + task = PrimaryTask( + name=name, priority=Priority.CRITICAL, description=description, + interruptible=False, timeout=timeout, + ) + task.add_subtask(name, action, params or {}, description, timeout) + return task