Combat: - Track entityGone event to detect mob death (not just entity check) - Auto-equip best weapon before fighting - 10-second cooldown between engagements (was 5) - Better target death detection Equipment system: - equipBestWeapon(): finds and equips highest-tier sword/axe - equipBestTool(blockType): picks right tool for block (pickaxe for stone, etc.) - equipBestArmor(): equips best armor in each slot - Auto-equip armor when new armor pieces appear in inventory - Weapon tiers: netherite > diamond > iron > golden > stone > wooden - Tool type mapping: pickaxe for ores, axe for wood, shovel for dirt New bridge actions: equip_best_weapon, equip_best_tool, equip_armor Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
552 lines
21 KiB
Python
552 lines
21 KiB
Python
"""
|
|
Doug's Brain — stack-based autonomous decision engine.
|
|
|
|
Tasks are a STACK:
|
|
- Player commands push on top (highest priority)
|
|
- Self-directed goals sit at the bottom
|
|
- Combat/flee are temporary INTERRUPTIONS that don't affect the stack
|
|
- When a task completes, Doug resumes the one below it
|
|
- Each task can have subtasks (get materials → return to building)
|
|
|
|
The brain ticks every 2 seconds:
|
|
1. Update needs (decay over time)
|
|
2. Scan surroundings → update memory
|
|
3. If busy executing, WAIT
|
|
4. Check for interrupts (combat, flee)
|
|
5. Get next subtask from the stack and execute it
|
|
6. If stack is empty, generate self-directed tasks
|
|
"""
|
|
|
|
import math
|
|
import random
|
|
import time
|
|
from PySide6.QtCore import QObject, QTimer, Signal
|
|
|
|
from dougbot.bridge.ws_client import BridgeWSClient
|
|
from dougbot.bridge.protocol import ResponseMessage
|
|
from dougbot.core.task_queue import (
|
|
TaskStack, PrimaryTask, SubTask, Priority, TaskStatus,
|
|
make_task, make_interrupt, TaskStatus,
|
|
)
|
|
from dougbot.core.behaviors import (
|
|
NeedsSystem, GoalManager, SpatialMemory, DailyRoutine, BehaviorEngine,
|
|
)
|
|
from dougbot.utils.logging import get_logger
|
|
|
|
# Module-level logger, obtained from the project's logging helper under the name "core.brain".
log = get_logger("core.brain")
|
|
|
|
|
|
class DougBrain(QObject):
|
|
"""Stack-based autonomous decision engine."""
|
|
|
|
wants_to_chat = Signal(str)
|
|
wants_ai_chat = Signal(str, str)
|
|
|
|
def __init__(self, ws_client: BridgeWSClient, doug_name: str,
             traits: dict = None, age: int = 30, parent=None):
    """Build the brain's subsystems; ticking only begins in ``start()``.

    Args:
        ws_client: Bridge client every request is sent through.
        doug_name: The bot's own username.
        traits: Trait mapping; a missing or empty value is replaced by ``{}``.
        age: Forwarded to the goal manager, daily routine and behavior engine.
        parent: Optional Qt parent object.
    """
    super().__init__(parent)
    traits = traits or {}

    # Identity and bridge handle
    self._ws = ws_client
    self._doug_name = doug_name
    self._traits = traits

    # Tick driver — the timer is created here but only started by start()
    self._running = False
    self._tick_timer = QTimer(self)
    self._tick_timer.timeout.connect(self._tick)

    # Core systems
    self._tasks = TaskStack()
    self._needs = NeedsSystem(traits)
    self._goals = GoalManager(traits, age)
    self._memory = SpatialMemory()
    self._routine = DailyRoutine(traits, age)
    self._behaviors = BehaviorEngine(traits, age, doug_name)

    # Scan state (_scan_interval is compared against time.time() deltas, i.e. seconds)
    self._pending_scan = False
    self._scan_interval = 4.0

    # Tick counter
    self._tick_count = 0

    # Timestamps: last scan request, last chat, last action sent, last combat start
    self._last_scan_time = 0.0
    self._last_chat_time = 0.0
    self._action_sent_time = 0.0
    self._last_combat_time = 0.0
|
|
|
|
def start(self):
    """Mark the brain as running, start the 2-second tick and seed initial goals."""
    tick_interval_ms = 2000
    self._running = True
    self._tick_timer.start(tick_interval_ms)
    self._goals.seed_initial_goals(self._memory, self._behaviors)
    log.info("Brain started — Doug is thinking (stack-based)")
|
|
|
|
def stop(self):
    """Stop ticking, cancel every queued task and send a ``stop`` request to the bridge."""
    self._running = False
    self._tick_timer.stop()

    # Drop all work first, then tell the bridge to halt whatever is in flight.
    self._tasks.cancel_all()
    self._ws.send_request("stop", {})

    log.info("Brain stopped")
|
|
|
|
# ── Event handling ──
|
|
|
|
def update_from_event(self, event: str, data: dict):
    """Apply a bridge event to the brain's state.

    Recognised events: ``spawn_complete``, ``health_changed``,
    ``time_update``, ``movement_complete``, ``movement_failed``, ``death``,
    ``player_joined`` and ``player_left``. Any other event name is ignored.

    Args:
        event: Event name.
        data: Event payload (read with ``.get`` and per-key defaults).
    """
    state = self._behaviors

    if event == "spawn_complete":
        spawn = data.get("position", {})
        state.position = {axis: spawn.get(axis, 0) for axis in ("x", "y", "z")}
        # Keep an independent copy as the spawn point; home is set to the same spot.
        state.spawn_pos = dict(state.position)
        self._memory.set_home(state.position)
        return

    if event == "health_changed":
        state.health = data.get("health", 20)
        state.food = data.get("food", 20)
        self._needs.on_health_change(state.health, state.food)
        return

    if event == "time_update":
        state.day_time = data.get("dayTime", 0)
        return

    if event == "movement_complete":
        self._tasks.complete_current()
        return

    if event == "movement_failed":
        self._tasks.fail_current()
        return

    if event == "death":
        self._tasks.cancel_all()
        self._needs.on_death()
        self._goals.on_death()
        log.info("Doug died — all tasks cleared")
        return

    if event in ("player_joined", "player_left"):
        username = data.get("username", "")
        if not username:
            return
        if event == "player_left":
            log.info(f"Player left: {username}")
        elif username != self._doug_name:
            # Doug's own join event is ignored.
            self._needs.on_player_nearby()
            log.info(f"Player joined: {username}")
|
|
|
|
# ── Main tick ──
|
|
|
|
def _tick(self):
    """Run one decision cycle.

    Order of work: bail out unless running and connected; wait while a
    subtask is executing (failing it once it has been out for more than
    25 s); decay needs; request a scan when one is due; otherwise check
    interrupts, generate a self-directed task if the stack is empty, and
    execute the next subtask.
    """
    from PySide6.QtNetwork import QAbstractSocket

    if not self._running:
        return
    socket_state = self._ws._ws.state()
    if socket_state != QAbstractSocket.SocketState.ConnectedState:
        return

    self._tick_count += 1

    def seconds_since(stamp):
        return time.time() - stamp

    # A subtask is in flight: keep waiting unless it has clearly stalled.
    if self._tasks.is_busy:
        if seconds_since(self._action_sent_time) <= 25:
            return
        log.debug("Action timed out — unsticking")
        self._tasks.fail_current()

    # A scan reply that never arrived must not block the loop forever.
    if self._pending_scan and seconds_since(self._last_scan_time) > 10:
        self._pending_scan = False

    # Step 1: decay needs
    b = self._behaviors
    self._needs.decay(
        health=b.health,
        food=b.food,
        has_shelter=self._is_sheltered(),
        is_night=b.is_night,
        has_players_nearby=bool(b.nearby_players),
        hostiles_nearby=len(b.nearby_hostiles),
    )

    # Step 2: scan periodically; the tick ends while a scan is outstanding
    if self._pending_scan:
        return
    if seconds_since(self._last_scan_time) > self._scan_interval:
        self._pending_scan = True
        self._last_scan_time = time.time()
        self._ws.send_request("scan_surroundings", {"radius": 16}, self._on_scan)
        self._ws.send_request("get_inventory", {}, self._on_inventory)
        return

    # Step 3: interrupts (combat/flee)
    self._check_interrupts()

    # Step 4: nothing queued -> invent something to do
    if not self._tasks.current_task:
        self._generate_self_directed()

    # Step 5: run the next subtask from the stack
    self._execute_next()
|
|
|
|
# ── Scanning ──
|
|
|
|
def _on_scan(self, response: ResponseMessage):
|
|
self._pending_scan = False
|
|
if response.status != "success":
|
|
return
|
|
|
|
data = response.data
|
|
self._behaviors.position = data.get("position", self._behaviors.position)
|
|
self._behaviors.health = data.get("health", self._behaviors.health)
|
|
self._behaviors.food = data.get("food", self._behaviors.food)
|
|
self._behaviors.day_time = data.get("time", self._behaviors.day_time)
|
|
self._behaviors.is_raining = data.get("isRaining", False)
|
|
self._behaviors.nearby_players = data.get("players", [])
|
|
self._behaviors.nearby_containers = data.get("containers", [])
|
|
self._behaviors.nearby_signs = data.get("signs", [])
|
|
self._behaviors.nearby_blocks = data.get("blocks", {})
|
|
|
|
entities = data.get("entities", [])
|
|
self._behaviors.nearby_entities = entities
|
|
self._behaviors.nearby_hostiles = [e for e in entities if e.get("isHostile", False)]
|
|
|
|
self._memory.update_from_scan(
|
|
position=self._behaviors.position,
|
|
blocks=self._behaviors.nearby_blocks,
|
|
containers=self._behaviors.nearby_containers,
|
|
players=self._behaviors.nearby_players,
|
|
hostiles=self._behaviors.nearby_hostiles,
|
|
)
|
|
self._needs.on_scan(
|
|
hostiles_nearby=len(self._behaviors.nearby_hostiles),
|
|
players_nearby=len(self._behaviors.nearby_players),
|
|
)
|
|
|
|
def _on_inventory(self, response: ResponseMessage):
|
|
if response.status != "success":
|
|
return
|
|
old_items = set(i.get("name", "") for i in self._behaviors.inventory)
|
|
self._behaviors.inventory = response.data.get("items", [])
|
|
new_items = set(i.get("name", "") for i in self._behaviors.inventory)
|
|
|
|
# Auto-equip armor if we picked up new armor pieces
|
|
new_pickups = new_items - old_items
|
|
armor_words = ("helmet", "chestplate", "leggings", "boots")
|
|
if any(any(a in item for a in armor_words) for item in new_pickups):
|
|
self._ws.send_request("equip_armor", {})
|
|
|
|
# ── Interrupts (combat/flee — temporary, don't affect stack) ──
|
|
|
|
def _check_interrupts(self):
|
|
"""Check for immediate threats that need a temporary interrupt."""
|
|
# Don't stack interrupts — if already fighting/fleeing, let it finish
|
|
if self._tasks._interruption and self._tasks._interruption.status == TaskStatus.ACTIVE:
|
|
return
|
|
|
|
# Cooldown after combat — don't re-engage for 10 seconds
|
|
if hasattr(self, '_last_combat_time') and time.time() - self._last_combat_time < 10:
|
|
return
|
|
|
|
b = self._behaviors
|
|
bravery = self._traits.get("bravery", 50)
|
|
|
|
hostile = self._nearest_hostile(10)
|
|
if not hostile:
|
|
return
|
|
|
|
dist = hostile.get("distance", 99)
|
|
mob_type = hostile.get("type", "mob")
|
|
|
|
# FIGHT if brave enough and healthy
|
|
should_fight = bravery > 30 and b.health > 8 and dist < 6
|
|
# FLEE if scared or hurt
|
|
should_flee = (not should_fight) and (dist < 8) and (b.health < 14 or bravery < 30)
|
|
|
|
if should_fight:
|
|
self._last_combat_time = time.time()
|
|
self._tasks.interrupt(make_interrupt(
|
|
"combat", f"Fighting {mob_type}!",
|
|
"attack_nearest_hostile", {"range": 6}, timeout=12,
|
|
))
|
|
elif should_flee:
|
|
hpos = hostile.get("position", b.position)
|
|
dx = b.position["x"] - hpos.get("x", 0)
|
|
dz = b.position["z"] - hpos.get("z", 0)
|
|
d = max(0.1, math.sqrt(dx * dx + dz * dz))
|
|
flee_x = b.position["x"] + (dx / d) * 15
|
|
flee_z = b.position["z"] + (dz / d) * 15
|
|
|
|
self._tasks.interrupt(make_interrupt(
|
|
"flee", f"Fleeing from {mob_type}!",
|
|
"move_to", {"x": flee_x, "y": b.position["y"], "z": flee_z, "range": 3},
|
|
timeout=15,
|
|
))
|
|
|
|
# ── Self-directed task generation ──
|
|
|
|
def _generate_self_directed(self):
|
|
"""Generate a task when the stack is empty."""
|
|
b = self._behaviors
|
|
|
|
# HUNGER — eat if we have food
|
|
if self._needs.hunger < 25:
|
|
food = self._find_food()
|
|
if food:
|
|
self._tasks.push(make_task(
|
|
"eat", Priority.URGENT, f"Eating {food}",
|
|
"equip_item", {"name": food, "destination": "hand"}, timeout=10,
|
|
))
|
|
return
|
|
|
|
# SHELTER at night
|
|
if b.is_night and self._needs.shelter < 30:
|
|
home = self._memory.home
|
|
if home and self._distance_to(home) > 10:
|
|
self._tasks.push(make_task(
|
|
"go_home", Priority.URGENT, "Heading home for the night",
|
|
"move_to", {"x": home["x"], "y": home["y"], "z": home["z"], "range": 3},
|
|
timeout=30,
|
|
))
|
|
return
|
|
|
|
# SOCIAL — approach nearby players
|
|
if self._needs.social < 30 and b.nearby_players:
|
|
player = b.nearby_players[0]
|
|
dist = player.get("distance", 99)
|
|
name = player.get("name", "player")
|
|
if dist > 5:
|
|
self._tasks.push(make_task(
|
|
f"approach_{name}", Priority.NORMAL, f"Walking toward {name}",
|
|
"move_to", {**player["position"], "range": 3}, timeout=15,
|
|
))
|
|
return
|
|
# Close — say hi (throttled)
|
|
if time.time() - self._last_chat_time > 45:
|
|
self._last_chat_time = time.time()
|
|
self._needs.social = min(100, self._needs.social + 40)
|
|
self.wants_ai_chat.emit(
|
|
self._build_chat_context(),
|
|
f"You notice {name} nearby. Say something friendly and short."
|
|
)
|
|
return
|
|
|
|
# GOAL PROGRESS
|
|
goal = self._goals.get_active_goal()
|
|
if goal:
|
|
step = self._goals.get_next_step(goal, b, self._memory)
|
|
if step:
|
|
# Wrap the old Task as a PrimaryTask
|
|
task = make_task(
|
|
step.name, Priority.SELF_DIRECTED, step.description,
|
|
step.action, step.params, timeout=step.timeout,
|
|
)
|
|
self._tasks.push(task)
|
|
return
|
|
else:
|
|
self._goals.complete_goal(goal["name"])
|
|
self._needs.boredom = min(100, self._needs.boredom + 20)
|
|
|
|
# DAILY ROUTINE
|
|
phase = self._routine.get_phase(b.day_time)
|
|
if phase == "morning" and not self._goals.has_any_goals():
|
|
self._goals.seed_initial_goals(self._memory, b)
|
|
|
|
if phase == "evening":
|
|
home = self._memory.home
|
|
if home and self._distance_to(home) > 15:
|
|
self._tasks.push(make_task(
|
|
"evening_return", Priority.NORMAL, "Heading home for the evening",
|
|
"move_to", {"x": home["x"], "y": home["y"], "z": home["z"], "range": 3},
|
|
timeout=30,
|
|
))
|
|
return
|
|
|
|
# BOREDOM — explore
|
|
if self._needs.boredom < 30:
|
|
interesting = self._memory.get_nearest_unexplored(b.position, max_dist=30)
|
|
if interesting:
|
|
self._needs.boredom = min(100, self._needs.boredom + 15)
|
|
self._tasks.push(make_task(
|
|
"explore", Priority.SELF_DIRECTED, f"Checking out a {interesting['type']}",
|
|
"move_to", {"x": interesting["x"], "y": interesting["y"],
|
|
"z": interesting["z"], "range": 2},
|
|
timeout=20,
|
|
))
|
|
return
|
|
|
|
# Explore new direction
|
|
angle = self._memory.suggest_explore_angle(b.position)
|
|
curiosity = self._traits.get("curiosity", 50)
|
|
dist = random.uniform(8, 12 + curiosity // 10)
|
|
target_x = b.position["x"] + math.cos(angle) * dist
|
|
target_z = b.position["z"] + math.sin(angle) * dist
|
|
self._needs.boredom = min(100, self._needs.boredom + 10)
|
|
self._tasks.push(make_task(
|
|
"explore", Priority.SELF_DIRECTED, "Exploring",
|
|
"move_to", {"x": target_x, "y": b.position["y"], "z": target_z, "range": 2},
|
|
timeout=20,
|
|
))
|
|
return
|
|
|
|
# IDLE — look around, maybe chat
|
|
if b.nearby_players and time.time() - self._last_chat_time > 60:
|
|
if random.random() < 0.1:
|
|
self._last_chat_time = time.time()
|
|
self.wants_ai_chat.emit(self._build_chat_context(), "Say something casual.")
|
|
return
|
|
|
|
if random.random() < 0.3:
|
|
self._tasks.push(make_task(
|
|
"look_around", Priority.IDLE, "",
|
|
"look_at", {
|
|
"x": b.position["x"] + random.uniform(-20, 20),
|
|
"y": b.position["y"] + random.uniform(-3, 5),
|
|
"z": b.position["z"] + random.uniform(-20, 20),
|
|
}, timeout=3,
|
|
))
|
|
|
|
# ── Execute next subtask from the stack ──
|
|
|
|
def _execute_next(self):
|
|
"""Get the next subtask from the stack and send it to the bridge."""
|
|
subtask = self._tasks.get_next_action()
|
|
if not subtask:
|
|
return
|
|
|
|
# Log significant actions
|
|
if subtask.description:
|
|
current = self._tasks.current_task
|
|
priority_name = current.priority.name if current else "?"
|
|
log.info(f"[{priority_name}] {subtask.description}")
|
|
|
|
self._action_sent_time = time.time()
|
|
|
|
def on_response(resp: ResponseMessage):
|
|
if resp.status == "success":
|
|
data = resp.data or {}
|
|
|
|
# Craft results — report to chat
|
|
if subtask.action == "craft_item":
|
|
if data.get("crafted"):
|
|
item = data.get("item", "item").replace("_", " ")
|
|
self._ws.send_request("send_chat", {
|
|
"message": f"Done! Crafted {data.get('count', 1)} {item}."
|
|
})
|
|
else:
|
|
error = data.get("error", "Something went wrong.")
|
|
self._ws.send_request("send_chat", {"message": error})
|
|
self._tasks.complete_current()
|
|
return
|
|
|
|
# Equip results — report to chat
|
|
if subtask.action == "equip_item":
|
|
item = subtask.params.get("name", "item").replace("_", " ")
|
|
self._ws.send_request("send_chat", {"message": f"Equipped {item}."})
|
|
self._tasks.complete_current()
|
|
return
|
|
|
|
# Inventory check — report to chat
|
|
if subtask.action == "get_inventory":
|
|
items = data.get("items", [])
|
|
if items:
|
|
item_strs = [f"{i['count']}x {i['name'].replace('_',' ')}" for i in items[:8]]
|
|
msg = "I have: " + ", ".join(item_strs)
|
|
if len(items) > 8:
|
|
msg += f" and {len(items) - 8} more"
|
|
self._ws.send_request("send_chat", {"message": msg})
|
|
else:
|
|
self._ws.send_request("send_chat", {"message": "My inventory is empty."})
|
|
self._tasks.complete_current()
|
|
return
|
|
|
|
# Movement actions wait for movement_complete event
|
|
if subtask.action in ("move_to", "move_relative", "follow_player"):
|
|
return # Don't complete yet — wait for event
|
|
|
|
# Everything else completes immediately
|
|
self._tasks.complete_current()
|
|
|
|
else:
|
|
error = resp.error or "Something went wrong"
|
|
log.debug(f"Action failed: {error}")
|
|
# Report HIGH priority failures to chat
|
|
current = self._tasks.current_task
|
|
if current and current.priority >= Priority.HIGH:
|
|
self._ws.send_request("send_chat", {"message": error})
|
|
self._tasks.fail_current()
|
|
|
|
self._ws.send_request(subtask.action, subtask.params, on_response)
|
|
|
|
# ── Helpers ──
|
|
|
|
def _build_chat_context(self) -> str:
|
|
parts = []
|
|
b = self._behaviors
|
|
if b.nearby_players:
|
|
names = [p["name"] for p in b.nearby_players]
|
|
parts.append(f"Players nearby: {', '.join(names)}")
|
|
if b.is_night:
|
|
parts.append("It's nighttime")
|
|
if b.is_raining:
|
|
parts.append("It's raining")
|
|
if b.health < 10:
|
|
parts.append(f"Health is low ({b.health})")
|
|
if b.nearby_hostiles:
|
|
types = [h["type"] for h in b.nearby_hostiles[:3]]
|
|
parts.append(f"Nearby mobs: {', '.join(types)}")
|
|
|
|
# Current task context
|
|
current = self._tasks.current_task
|
|
if current and current.description:
|
|
parts.append(f"Currently: {current.description}")
|
|
|
|
# Stack depth
|
|
depth = self._tasks.stack_depth
|
|
if depth > 1:
|
|
parts.append(f"Tasks queued: {depth}")
|
|
|
|
return "; ".join(parts) if parts else "Nothing special happening"
|
|
|
|
def _nearest_hostile(self, max_dist: float):
|
|
closest = None
|
|
closest_dist = max_dist
|
|
for h in self._behaviors.nearby_hostiles:
|
|
d = h.get("distance", 99)
|
|
if d < closest_dist:
|
|
closest = h
|
|
closest_dist = d
|
|
return closest
|
|
|
|
def _distance_to(self, pos: dict) -> float:
|
|
b = self._behaviors.position
|
|
dx = b["x"] - pos.get("x", 0)
|
|
dz = b["z"] - pos.get("z", 0)
|
|
return math.sqrt(dx * dx + dz * dz)
|
|
|
|
def _find_food(self):
|
|
food_items = {
|
|
"cooked_beef", "cooked_porkchop", "cooked_chicken", "cooked_mutton",
|
|
"cooked_rabbit", "cooked_salmon", "cooked_cod", "bread", "apple",
|
|
"golden_apple", "melon_slice", "sweet_berries", "baked_potato",
|
|
"mushroom_stew", "rabbit_stew", "cookie", "pumpkin_pie",
|
|
"carrot", "potato", "dried_kelp",
|
|
}
|
|
for item in self._behaviors.inventory:
|
|
if item.get("name", "").replace("minecraft:", "") in food_items:
|
|
return item["name"]
|
|
return None
|
|
|
|
def _is_sheltered(self) -> bool:
|
|
home = self._memory.home
|
|
if not home:
|
|
return False
|
|
return self._distance_to(home) < 15
|
|
|
|
@property
def current_action(self) -> str:
    """Describe what Doug is doing right now.

    Returns the current subtask's description when it has one, otherwise
    the current task's description, or ``"idle"`` when no task is current.
    """
    task = self._tasks.current_task
    if not task:
        return "idle"

    step = task.current_subtask()
    if step and step.description:
        return step.description
    return task.description
|
|
|
|
@property
def is_night(self) -> bool:
    """Expose the behavior engine's ``is_night`` flag."""
    behaviors = self._behaviors
    return behaviors.is_night
|