Redesign: stack-based task system with subtasks and persistence

TaskStack replaces TaskQueue:
- Tasks are a STACK — new tasks push on top, completed tasks pop
- Interrupted tasks are PAUSED and RESUMED when the interrupt ends
- Combat/flee are temporary INTERRUPTIONS that don't affect the stack
- Each PrimaryTask has a list of SubTasks (sequential steps)
- Open-ended tasks ("explore together") stay active until cancelled
- Max stack depth of 5 — oldest self-directed tasks drop off
- Player commands always push as HIGH priority
- Self-directed goals sit at the bottom

Helper functions:
- make_task() — create single-step primary tasks
- make_interrupt() — create temporary combat/flee interrupts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
roberts 2026-03-30 17:24:19 -05:00
parent e6d4c8d377
commit 61c4a919d2

View file

@ -1,47 +1,135 @@
""" """
Task queue system for Doug's autonomous behavior. Task Stack persistent, stack-based task system for Doug.
Tasks have priorities, can be interrupted, and are influenced by persona traits.
Tasks are organized as a STACK:
- Player commands push onto the top
- Self-directed goals sit at the bottom
- Interruptions (combat, player requests) are temporary
- When a task completes, Doug resumes the one below it
- Each task can have subtasks (get materials return to building)
Priority levels determine which tasks can interrupt others.
""" """
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import IntEnum from enum import IntEnum
from typing import Any, Callable, Optional from typing import Any, Optional
from dougbot.utils.logging import get_logger from dougbot.utils.logging import get_logger
log = get_logger("core.task_queue") log = get_logger("core.tasks")
class Priority(IntEnum): class Priority(IntEnum):
"""Task priority levels. Higher = more urgent.""" """Task priority levels. Higher = more urgent."""
IDLE = 0 # Looking around, wandering IDLE = 0 # Looking around, wandering
LOW = 10 # Self-directed goals (explore, organize) SELF_DIRECTED = 10 # Doug's own goals (explore, gather)
NORMAL = 20 # Environmental triggers (sign found, interesting block) LOW = 20 # Low-priority environmental triggers
HIGH = 30 # Player requests via chat NORMAL = 30 # Environmental triggers (sign found, interesting block)
URGENT = 40 # Survival (flee, eat, find shelter) HIGH = 40 # Player requests via chat
CRITICAL = 50 # Immediate danger (health critical, falling) URGENT = 50 # Survival (flee, eat, find shelter)
CRITICAL = 60 # Immediate danger (health critical)
class TaskStatus(IntEnum):
PENDING = 0
ACTIVE = 1
PAUSED = 2 # Interrupted by higher priority task
COMPLETE = 3
FAILED = 4
CANCELLED = 5
@dataclass @dataclass
class Task: class SubTask:
"""A single task for Doug to perform.""" """A single step within a larger task."""
name: str name: str
priority: Priority
action: str # Bridge action to execute action: str # Bridge action to execute
params: dict = field(default_factory=dict) params: dict = field(default_factory=dict)
description: str = "" # Human-readable description for chat description: str = ""
steps: list = field(default_factory=list) # Multi-step tasks status: TaskStatus = TaskStatus.PENDING
current_step: int = 0 timeout: float = 30.0
created_at: float = field(default_factory=time.time)
started_at: float = 0.0 started_at: float = 0.0
timeout: float = 60.0 # Max seconds before auto-cancel
interruptible: bool = True # Can be interrupted by higher priority
callback: Optional[str] = None # Method name to call on completion
context: dict = field(default_factory=dict) # Extra data for the task
@property @property
def is_expired(self) -> bool: def is_expired(self) -> bool:
if self.started_at > 0:
return (time.time() - self.started_at) > self.timeout
return False
@dataclass
class PrimaryTask:
"""
A primary task that persists until completion or cancellation.
Can contain subtasks and survives interruptions.
"""
name: str
priority: Priority
description: str = ""
source: str = "self" # "player" or "self"
source_player: str = "" # Who gave the command
# Subtask queue
subtasks: list = field(default_factory=list)
current_subtask_idx: int = 0
# Task lifecycle
status: TaskStatus = TaskStatus.PENDING
created_at: float = field(default_factory=time.time)
started_at: float = 0.0
timeout: float = 300.0 # 5 min default for primary tasks
interruptible: bool = True
# For open-ended tasks (explore, follow) — no auto-complete
open_ended: bool = False
# Resume info — where to go back to after subtask
resume_position: dict = field(default_factory=dict)
resume_data: dict = field(default_factory=dict)
def add_subtask(self, name: str, action: str, params: dict = None,
description: str = "", timeout: float = 30.0):
"""Add a subtask to this primary task."""
self.subtasks.append(SubTask(
name=name, action=action, params=params or {},
description=description, timeout=timeout,
))
def current_subtask(self) -> Optional[SubTask]:
"""Get the current subtask to execute."""
while self.current_subtask_idx < len(self.subtasks):
st = self.subtasks[self.current_subtask_idx]
if st.status in (TaskStatus.PENDING, TaskStatus.ACTIVE):
return st
self.current_subtask_idx += 1
return None
def advance_subtask(self):
"""Mark current subtask complete and move to next."""
if self.current_subtask_idx < len(self.subtasks):
self.subtasks[self.current_subtask_idx].status = TaskStatus.COMPLETE
self.current_subtask_idx += 1
def fail_subtask(self):
"""Mark current subtask as failed."""
if self.current_subtask_idx < len(self.subtasks):
self.subtasks[self.current_subtask_idx].status = TaskStatus.FAILED
@property
def is_complete(self) -> bool:
"""All subtasks done and not open-ended."""
if self.open_ended:
return False
if not self.subtasks:
return self.status == TaskStatus.COMPLETE
return self.current_subtask_idx >= len(self.subtasks)
@property
def is_expired(self) -> bool:
if self.open_ended:
return False
if self.started_at > 0: if self.started_at > 0:
return (time.time() - self.started_at) > self.timeout return (time.time() - self.started_at) > self.timeout
return (time.time() - self.created_at) > self.timeout * 2 return (time.time() - self.created_at) > self.timeout * 2
@ -50,94 +138,251 @@ class Task:
def age(self) -> float: def age(self) -> float:
return time.time() - self.created_at return time.time() - self.created_at
def __str__(self):
st = self.current_subtask()
step_info = f"{st.description}" if st else ""
return f"[{self.priority.name}] {self.description}{step_info}"
class TaskQueue:
"""Priority queue of tasks for Doug.""" class TaskStack:
"""
Stack-based task manager.
The stack represents Doug's focus:
- Top of stack = what he's doing right now
- Below = what he'll resume when current task is done
- Interruptions push onto the stack temporarily
- Player commands always push as high priority
Max depth: 5 (oldest self-directed tasks get dropped)
"""
MAX_DEPTH = 5
def __init__(self): def __init__(self):
self._queue: list[Task] = [] self._stack: list[PrimaryTask] = []
self._current: Optional[Task] = None self._interruption: Optional[PrimaryTask] = None # Combat, flee — temporary
self._completed: list[str] = [] # Recent completed task names self._executing_subtask = False
self._max_completed = 20 self._completed_names: list[str] = []
@property @property
def current_task(self) -> Optional[Task]: def current_task(self) -> Optional[PrimaryTask]:
return self._current """What Doug is focused on right now."""
if self._interruption and self._interruption.status == TaskStatus.ACTIVE:
return self._interruption
if self._stack:
return self._stack[-1]
return None
@property @property
def is_busy(self) -> bool: def is_busy(self) -> bool:
return self._current is not None return self._executing_subtask
@property @property
def queue_size(self) -> int: def stack_depth(self) -> int:
return len(self._queue) return len(self._stack)
def add(self, task: Task) -> bool: @property
"""Add a task to the queue. Returns True if it should interrupt current.""" def stack_summary(self) -> list[str]:
# Remove expired tasks """Human-readable stack for debugging."""
self._queue = [t for t in self._queue if not t.is_expired] result = []
for i, task in enumerate(reversed(self._stack)):
marker = "" if i == 0 else " "
result.append(f"{marker}{task}")
return result
# Don't duplicate same task def push(self, task: PrimaryTask) -> bool:
for existing in self._queue: """
if existing.name == task.name and existing.action == task.action: Push a new primary task onto the stack.
Returns True if this task should execute immediately.
"""
# Don't duplicate
for existing in self._stack:
if existing.name == task.name and existing.status == TaskStatus.ACTIVE:
return False return False
if self._current and self._current.name == task.name:
return False
self._queue.append(task) # Pause the current top task
self._queue.sort(key=lambda t: t.priority, reverse=True) if self._stack and self._stack[-1].status == TaskStatus.ACTIVE:
self._stack[-1].status = TaskStatus.PAUSED
log.info(f"Pausing: {self._stack[-1].description}")
# Check if this should interrupt current task # Push new task
if self._current and task.priority > self._current.priority and self._current.interruptible: task.status = TaskStatus.ACTIVE
log.info(f"Task '{task.name}' (priority {task.priority.name}) interrupts '{self._current.name}'") task.started_at = time.time()
# Re-queue current task self._stack.append(task)
self._queue.append(self._current)
self._queue.sort(key=lambda t: t.priority, reverse=True)
self._current = None
return True
return not self.is_busy # Trim stack if too deep — drop lowest priority at bottom
while len(self._stack) > self.MAX_DEPTH:
self._stack.pop(0)
def next(self) -> Optional[Task]: log.info(f"New task: {task.description} (stack depth: {len(self._stack)})")
"""Get the next task to work on.""" return True
if self._current:
if self._current.is_expired:
log.debug(f"Task '{self._current.name}' expired")
self._current = None
else:
return self._current
# Remove expired def interrupt(self, task: PrimaryTask):
self._queue = [t for t in self._queue if not t.is_expired] """
Set a temporary interruption (combat, flee).
These don't push onto the main stack — they override temporarily.
"""
self._interruption = task
task.status = TaskStatus.ACTIVE
task.started_at = time.time()
log.info(f"Interrupt: {task.description}")
if not self._queue: def complete_interruption(self):
"""Clear the current interruption, resume stack."""
if self._interruption:
self._interruption.status = TaskStatus.COMPLETE
self._interruption = None
self._executing_subtask = False
def complete_current(self):
"""Complete the current task/subtask."""
self._executing_subtask = False
# If interruption, complete it
if self._interruption and self._interruption.status == TaskStatus.ACTIVE:
self.complete_interruption()
return
if not self._stack:
return
top = self._stack[-1]
# Advance the subtask
top.advance_subtask()
# Check if the whole task is done
if top.is_complete:
self._stack.pop()
self._completed_names.append(top.name)
if len(self._completed_names) > 20:
self._completed_names.pop(0)
log.info(f"Task complete: {top.description}")
# Resume the task below if any
if self._stack:
self._stack[-1].status = TaskStatus.ACTIVE
log.info(f"Resuming: {self._stack[-1].description}")
def fail_current(self):
"""Fail current subtask but don't kill the primary task."""
self._executing_subtask = False
if self._interruption and self._interruption.status == TaskStatus.ACTIVE:
self.complete_interruption()
return
if not self._stack:
return
top = self._stack[-1]
top.fail_subtask()
# Move to next subtask — one failure doesn't kill the whole task
top.advance_subtask()
if top.is_complete:
self._stack.pop()
if self._stack:
self._stack[-1].status = TaskStatus.ACTIVE
def cancel_current(self):
"""Cancel the current primary task entirely."""
self._executing_subtask = False
if self._interruption:
self.complete_interruption()
return
if self._stack:
removed = self._stack.pop()
log.info(f"Task cancelled: {removed.description}")
if self._stack:
self._stack[-1].status = TaskStatus.ACTIVE
log.info(f"Resuming: {self._stack[-1].description}")
def cancel_all(self):
"""Clear everything."""
self._stack.clear()
self._interruption = None
self._executing_subtask = False
def get_next_action(self) -> Optional[SubTask]:
"""
Get the next subtask to execute.
Returns None if nothing to do or already executing.
"""
if self._executing_subtask:
return None return None
self._current = self._queue.pop(0) # Check interruption first
self._current.started_at = time.time() if self._interruption and self._interruption.status == TaskStatus.ACTIVE:
return self._current st = self._interruption.current_subtask()
if st:
st.status = TaskStatus.ACTIVE
st.started_at = time.time()
self._executing_subtask = True
return st
else:
self.complete_interruption()
def complete(self, task_name: str = ""): # Check stack
"""Mark current task as complete.""" if not self._stack:
if self._current: return None
name = self._current.name
self._completed.append(name)
if len(self._completed) > self._max_completed:
self._completed.pop(0)
self._current = None
log.debug(f"Task completed: {name}")
def cancel(self, task_name: str = ""): # Expire old tasks
"""Cancel current task.""" top = self._stack[-1]
if self._current: if top.is_expired:
log.debug(f"Task cancelled: {self._current.name}") log.debug(f"Task expired: {top.description}")
self._current = None self._stack.pop()
if self._stack:
self._stack[-1].status = TaskStatus.ACTIVE
return None
def clear(self): if top.status != TaskStatus.ACTIVE:
"""Clear all tasks.""" top.status = TaskStatus.ACTIVE
self._queue.clear()
self._current = None
def recently_completed(self, task_name: str) -> bool: st = top.current_subtask()
"""Check if a task was recently completed (avoid repeating).""" if st:
return task_name in self._completed st.status = TaskStatus.ACTIVE
st.started_at = time.time()
self._executing_subtask = True
return st
# No more subtasks — task is done
if not top.open_ended:
self.complete_current()
return self.get_next_action() # Try next task in stack
return None
def recently_completed(self, name: str) -> bool:
return name in self._completed_names
# ── Convenience: quick task creation ──
def make_task(name: str, priority: Priority, description: str,
action: str, params: dict = None, timeout: float = 30.0,
source: str = "self", source_player: str = "",
open_ended: bool = False) -> PrimaryTask:
"""Create a simple single-step primary task."""
task = PrimaryTask(
name=name, priority=priority, description=description,
source=source, source_player=source_player,
open_ended=open_ended, timeout=timeout,
)
task.add_subtask(name, action, params or {}, description, timeout)
return task
def make_interrupt(name: str, description: str,
action: str, params: dict = None,
timeout: float = 15.0) -> PrimaryTask:
"""Create a temporary interruption task (combat, flee)."""
task = PrimaryTask(
name=name, priority=Priority.CRITICAL, description=description,
interruptible=False, timeout=timeout,
)
task.add_subtask(name, action, params or {}, description, timeout)
return task