dougbot/dougbot/core/behaviors.py
roberts 195ef2d860 Major overhaul: combat fix, smart brain, Ollama stability, crafting
COMBAT:
- Fixed attack serialization: proper heldItemToNotch() for empty hand
- Attack now looks at target before swinging (server validates aim)
- Player position includes eye height in attack packet
- Click position calculated relative to target entity

INTELLIGENCE (complete rewrite):
- NeedsSystem: Sims-like needs (safety, hunger, social, shelter, boredom)
  that decay over time and drive behavior priorities
- GoalManager: Long-term goals broken into steps (gather_wood,
  explore_area, find_food, check_container, go_home)
- SpatialMemory: Remembers locations of containers, crafting tables,
  interesting blocks, and home position
- DailyRoutine: Morning/day/evening/night phases with trait-influenced
  activity selection
- Brain now WAITS for tasks to complete instead of piling on new ones
- Goal-based decision making replaces random wander

OLLAMA:
- Pre-warm model on deploy (loads into GPU before first chat)
- Keep-alive pings every 2 minutes (prevents model unload)
- Adaptive timeouts: 60s cold, 15s warm, 90s retry
- Auto-retry on timeout failure

CRAFTING:
- Quantity parsing ("craft 5 sticks", "craft a wooden pickaxe")
- Number words supported ("craft three planks")
- Smarter item name extraction with preposition stopping

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:07:23 -05:00

599 lines
23 KiB
Python

"""
Behavior systems for Doug's goal-based brain.
Modules:
- NeedsSystem: Sims-like needs (safety, hunger, social, shelter, boredom) 0-100
- GoalManager: Long-term goals broken into steps
- SpatialMemory: Remembers where things are
- DailyRoutine: Time-of-day activity phases
- BehaviorEngine: World state container (kept for compatibility)
"""
import math
import random
import time
from typing import Optional
from dougbot.core.task_queue import Task, Priority
from dougbot.utils.logging import get_logger
log = get_logger("core.behaviors")
# ═══════════════════════════════════════════════════════════════════
# NEEDS SYSTEM — like The Sims, values 0-100, decay over time
# ═══════════════════════════════════════════════════════════════════
class NeedsSystem:
    """
    Sims-style need tracker for Doug.

    Five needs (safety, hunger, social, shelter, boredom) are kept on a
    0-100 scale: a low number means the need is urgent, a high number means
    it is satisfied.  ``decay`` is meant to run once per brain tick (2s).
    """

    def __init__(self, traits: dict):
        self._traits = traits
        # Starting levels for a fresh Doug
        self.safety = 100   # Threat level (low = danger nearby)
        self.hunger = 80    # Food level (low = need to eat)
        self.social = 60    # Interaction need (low = lonely)
        self.shelter = 70   # Has safe place (low = exposed)
        self.boredom = 60   # Stimulation (low = bored)

    def decay(self, health: int, food: int, has_shelter: bool,
              is_night: bool, has_players_nearby: bool, hostiles_nearby: int):
        """Advance every need by one brain tick (2s) given current conditions."""
        traits = self._traits

        # SAFETY: hostiles drain it, low health caps it, otherwise it heals
        if hostiles_nearby > 0:
            safety_now = max(0, self.safety - 8 * hostiles_nearby)
        elif health < 10:
            safety_now = max(0, min(self.safety, health * 5))
        else:
            safety_now = min(100, self.safety + 3)
        self.safety = safety_now

        # HUNGER: mirrors the food bar (food is 0-20 in MC), clamped to 0-100
        self.hunger = min(100, max(0, food * 5))

        # SOCIAL: company restores it, solitude erodes it
        sociability = traits.get("sociability", 50)
        if not has_players_nearby:
            # More sociable Dougs get lonely faster
            # (0.3 at sociability 0, 0.55 at the default of 50)
            loneliness_step = 0.3 + (sociability / 200)
            self.social = max(0, self.social - loneliness_step)
        else:
            self.social = min(100, self.social + 2)

        # SHELTER: recovers under a roof, drops on an unsheltered night,
        # creeps back up during an unsheltered day
        if has_shelter:
            self.shelter = min(100, self.shelter + 5)
        elif is_night:
            anxiety = traits.get("anxiety", 0)
            night_penalty = 3 if anxiety else 2
            self.shelter = max(0, self.shelter - night_penalty)
        else:
            self.shelter = min(100, self.shelter + 1)

        # BOREDOM: always drains; curious Dougs get bored faster
        curiosity = traits.get("curiosity", 50)
        boredom_step = 0.5 + (curiosity / 200)
        self.boredom = max(0, self.boredom - boredom_step)

    def on_health_change(self, health: int, food: int):
        """React immediately to a health/food update."""
        if health < 6:
            # Very low health caps safety at 10
            self.safety = min(self.safety, 10)
        self.hunger = min(100, max(0, food * 5))

    def on_death(self):
        """Put every need back to its post-respawn level."""
        self.safety, self.hunger, self.social = 50, 80, 60
        self.shelter, self.boredom = 30, 60

    def on_player_nearby(self):
        """Give the social need a boost when a player shows up."""
        self.social = min(100, self.social + 15)

    def on_scan(self, hostiles_nearby: int, players_nearby: int):
        """Fold the counts from a scan into safety and social."""
        if hostiles_nearby == 0:
            self.safety = min(100, self.safety + 5)
        if players_nearby > 0:
            self.social = min(100, self.social + 3)

    def get_critical_needs(self) -> list[str]:
        """Return labels for every need that is below its critical threshold."""
        # (label, current level, critical threshold) in reporting order
        checks = (
            ("unsafe", self.safety, 25),
            ("hungry", self.hunger, 25),
            ("lonely", self.social, 20),
            ("exposed", self.shelter, 20),
            ("bored", self.boredom, 15),
        )
        return [label for label, level, threshold in checks if level < threshold]

    def most_urgent_need(self) -> tuple[str, int]:
        """Return ``(name, value)`` of the lowest need; ties go to the first listed."""
        levels = {
            "safety": self.safety,
            "hunger": self.hunger,
            "social": self.social,
            "shelter": self.shelter,
            "boredom": self.boredom,
        }
        return min(levels.items(), key=lambda entry: entry[1])
# ═══════════════════════════════════════════════════════════════════
# GOAL MANAGER — long-term goals broken into steps
# ═══════════════════════════════════════════════════════════════════
class GoalManager:
    """
    Manages Doug's long-term goals. Each goal is a dict:
    {
        "name": "gather_wood",
        "description": "Gather wood for building",
        "priority": 5,           # higher wins in get_active_goal
        "steps": [...],          # List of step dicts (action/params/description)
        "current_step": 0,       # index of the next step to hand out
        "created_at": time,
        "status": "active" | "complete" | "failed" | "dropped",
        "target_pos": {...} | None,
        "extra_params": {...},
    }
    """

    # Goal templates — what Doug knows how to do
    GOAL_TEMPLATES = {
        "gather_wood": {
            "description": "Gather some wood",
            "priority": 5,
            "steps": [
                {"action": "find_blocks",
                 "params": {"blockName": "oak_log", "radius": 32, "count": 1},
                 "description": "Finding trees"},
                {"action": "dig_block", "params": {},  # position filled from find result
                 "description": "Chopping wood"},
            ],
        },
        "explore_area": {
            "description": "Explore the surroundings",
            "priority": 3,
            "steps": [
                {"action": "move_to", "params": {"range": 2},
                 "description": "Exploring a new area"},
            ],
        },
        "find_food": {
            "description": "Find something to eat",
            "priority": 7,
            "steps": [
                {"action": "find_blocks",
                 "params": {"blockName": "wheat", "radius": 32, "count": 1},
                 "description": "Looking for food sources"},
            ],
        },
        "check_container": {
            "description": "Check a nearby container",
            "priority": 4,
            "steps": [
                {"action": "move_to", "params": {"range": 2},
                 "description": "Going to the container"},
                {"action": "open_chest", "params": {},
                 "description": "Opening the container"},
            ],
        },
        "visit_interesting": {
            "description": "Check out something interesting",
            "priority": 3,
            "steps": [
                {"action": "move_to", "params": {"range": 2},
                 "description": "Going to check it out"},
            ],
        },
        "go_home": {
            "description": "Head back home",
            "priority": 6,
            "steps": [
                {"action": "move_to", "params": {"range": 3},
                 "description": "Walking home"},
            ],
        },
    }

    def __init__(self, traits: dict, age: int):
        self._traits = traits
        self._age = age
        self._goals: list[dict] = []
        self._completed_goals: list[str] = []  # names of recently completed goals
        self._max_goals = 5  # Don't pile up too many goals

    def has_any_goals(self) -> bool:
        """Return True if at least one goal is still active."""
        return any(g["status"] == "active" for g in self._goals)

    def has_goal(self, name: str) -> bool:
        """Return True if an active goal with this template name exists."""
        return any(g["name"] == name and g["status"] == "active" for g in self._goals)

    def add_goal(self, name: str, priority: int = 5, target_pos: dict | None = None,
                 extra_params: dict | None = None):
        """
        Add a goal built from ``GOAL_TEMPLATES``.

        Nothing happens when an active goal of the same name already exists,
        when the template name is unknown, or when the goal list is full and
        no active goal has a strictly lower priority than the new one.

        Args:
            name: Key into ``GOAL_TEMPLATES``.
            priority: Priority stored on the goal (the template's own
                priority is not used).
            target_pos: Optional dict merged into the params of every
                ``move_to`` / ``open_chest`` step.
            extra_params: Optional dict merged into every step's params when
                the step is turned into a task.
        """
        import copy

        if self.has_goal(name):
            return

        # Validate the template BEFORE making room: otherwise an unknown name
        # arriving at capacity would evict a valid goal and then add nothing.
        template = self.GOAL_TEMPLATES.get(name)
        if not template:
            log.debug(f"Unknown goal template: {name}")
            return

        active = [g for g in self._goals if g["status"] == "active"]
        if len(active) >= self._max_goals:
            # At capacity: only displace the lowest-priority active goal, and
            # only if the newcomer strictly outranks it.
            weakest = min(active, key=lambda g: g["priority"])
            if weakest["priority"] < priority:
                weakest["status"] = "dropped"
            else:
                return

        goal = {
            "name": name,
            "description": template["description"],
            "priority": priority,
            # Deep copy so per-goal param edits never touch the shared template
            "steps": copy.deepcopy(template["steps"]),
            "current_step": 0,
            "created_at": time.time(),
            "status": "active",
            "target_pos": target_pos,
            "extra_params": extra_params or {},
        }

        # Fill in target position for movement / container steps
        if target_pos:
            for step in goal["steps"]:
                if step["action"] in ("move_to", "open_chest"):
                    step["params"].update(target_pos)

        self._goals.append(goal)
        log.info(f"New goal: {goal['description']} (priority {priority})")

    def get_active_goal(self) -> dict | None:
        """Return the highest-priority active goal (earliest added wins ties)."""
        active = [g for g in self._goals if g["status"] == "active"]
        if not active:
            return None
        return max(active, key=lambda g: g["priority"])

    def get_next_step(self, goal: dict, behaviors, memory) -> Task | None:
        """
        Turn the goal's next step into a Task and advance ``current_step``.

        Returns None once every step has been handed out.  ``behaviors`` and
        ``memory`` are accepted but not used here.
        """
        if goal["current_step"] >= len(goal["steps"]):
            return None  # All steps done

        step = goal["steps"][goal["current_step"]]
        goal["current_step"] += 1

        # Step params first, then goal-wide extras on top
        params = dict(step["params"])
        params.update(goal.get("extra_params", {}))

        # NOTE(review): find_blocks results are not wired into later steps
        # here (e.g. dig_block keeps empty params unless extra_params supplies
        # a position) — confirm the caller handles this.
        is_find = step["action"] == "find_blocks"
        if is_find:
            task_name = f"goal_{goal['name']}_find"
        else:
            # current_step was already incremented, so step numbers are 1-based
            task_name = f"goal_{goal['name']}_step{goal['current_step']}"

        return Task(
            name=task_name,
            priority=Priority.NORMAL,
            action=step["action"],
            params=params,
            description=step["description"],
            timeout=15 if is_find else 20,
        )

    def complete_goal(self, name: str):
        """Mark the active goal called ``name`` complete and prune stale goals."""
        for g in self._goals:
            if g["name"] == name and g["status"] == "active":
                g["status"] = "complete"
                self._completed_goals.append(name)
                # Keep only the 20 most recent completions
                if len(self._completed_goals) > 20:
                    self._completed_goals.pop(0)
                log.info(f"Goal complete: {g['description']}")
                break

        # Clean up: keep active goals plus anything created in the last 300s
        now = time.time()
        self._goals = [g for g in self._goals
                       if g["status"] == "active"
                       or (now - g["created_at"]) < 300]

    def on_death(self):
        """Fail every active goal and clear the goal list."""
        # Status is set first so any goal dict still referenced elsewhere
        # reads as failed after the list is emptied.
        for g in self._goals:
            if g["status"] == "active":
                g["status"] = "failed"
        self._goals.clear()

    def seed_initial_goals(self, memory, behaviors):
        """
        Create starting goals based on persona traits.

        Reads ``behaviors.position`` and ``behaviors.nearby_containers``;
        ``memory`` is accepted but not used here.
        """
        curiosity = self._traits.get("curiosity", 50)

        # Everyone starts by exploring; curious Dougs explore farther and with
        # higher priority.  Only one "explore_area" goal can be active at a
        # time (add_goal dedupes by name), so the trait has to shape this
        # single goal rather than request a second one.
        if curiosity > 60:
            explore_priority, explore_radius = 4, 25
        else:
            explore_priority, explore_radius = 3, 15
        self.add_goal("explore_area", priority=explore_priority,
                      target_pos=self._random_nearby_pos(behaviors.position,
                                                         explore_radius))

        # If containers are visible, check the first one
        if behaviors.nearby_containers:
            container = behaviors.nearby_containers[0]
            self.add_goal("check_container", priority=4,
                          target_pos=container["position"])

    def generate_goal_from_environment(self, memory, behaviors):
        """
        Generate at most one new goal from what memory knows about the world.

        Preference order: nearest unvisited container, nearest unvisited
        crafting table, gather wood (30% chance), explore a random nearby
        spot.  A remembered location is flagged ``visited`` once a goal for it
        is accepted, so the same spot is not targeted again and again.
        """
        if self._target_nearest_unvisited(memory, behaviors,
                                          "container", "check_container", 4):
            return
        if self._target_nearest_unvisited(memory, behaviors,
                                          "crafting_table", "visit_interesting", 3):
            return

        # Default: gather wood (always useful)
        if not self.has_goal("gather_wood") and random.random() < 0.3:
            self.add_goal("gather_wood", priority=4)
            return

        # Explore somewhere new
        if not self.has_goal("explore_area"):
            self.add_goal("explore_area", priority=2,
                          target_pos=self._random_nearby_pos(behaviors.position, 20))

    def _target_nearest_unvisited(self, memory, behaviors, loc_type: str,
                                  goal_name: str, priority: int) -> bool:
        """Try to add ``goal_name`` aimed at the nearest unvisited ``loc_type``.

        Returns True when an unvisited candidate existed and no goal of that
        name was already active (the caller should then stop generating),
        whether or not ``add_goal`` accepted the goal.
        """
        candidates = [loc for loc in memory.get_known(loc_type)
                      if not loc.get("visited")]
        if not candidates or self.has_goal(goal_name):
            return False

        nearest = min(candidates, key=lambda c: _dist(behaviors.position, c))
        # Pass coordinates only, so memory bookkeeping keys (type, seen_at,
        # visited) do not leak into task params.
        self.add_goal(goal_name, priority=priority, target_pos=self._xyz(nearest))
        if self.has_goal(goal_name):
            # Only flag it once the goal was really accepted
            nearest["visited"] = True
        return True

    @staticmethod
    def _xyz(loc: dict) -> dict:
        """Return just the coordinate keys present in a location record."""
        return {axis: loc[axis] for axis in ("x", "y", "z") if axis in loc}

    def _random_nearby_pos(self, pos: dict, radius: float) -> dict:
        """Pick a random point at the same y, between radius/2 and radius away."""
        angle = random.uniform(0, 2 * math.pi)
        dist = random.uniform(radius * 0.5, radius)
        return {
            "x": pos["x"] + math.cos(angle) * dist,
            "y": pos["y"],
            "z": pos["z"] + math.sin(angle) * dist,
        }
# ═══════════════════════════════════════════════════════════════════
# SPATIAL MEMORY — remembers where things are
# ═══════════════════════════════════════════════════════════════════
class SpatialMemory:
"""
Remembers locations Doug has seen and been to.
Stores positions of interesting things: containers, crafting tables,
players, hostile spawn areas, etc.
"""
def __init__(self):
self.home: dict | None = None # Home base position
self._known_locations: list[dict] = [] # {type, x, y, z, seen_at}
self._explored_chunks: set[tuple[int, int]] = set() # (chunk_x, chunk_z)
self._visited: list[dict] = [] # positions we've been to
self._max_locations = 200
self._max_visited = 50
def set_home(self, pos: dict):
self.home = {"x": pos["x"], "y": pos["y"], "z": pos["z"]}
def update_from_scan(self, position: dict, blocks: dict,
containers: list, players: list, hostiles: list):
"""Process scan results into memory."""
now = time.time()
# Record that we've been here
chunk = (int(position["x"]) // 16, int(position["z"]) // 16)
self._explored_chunks.add(chunk)
self._visited.append({"x": position["x"], "y": position["y"],
"z": position["z"], "time": now})
if len(self._visited) > self._max_visited:
self._visited.pop(0)
# Remember containers
for c in containers:
self._remember(c.get("type", "container"), c["position"], now)
# Remember interesting blocks
interesting_blocks = {
"crafting_table", "furnace", "chest", "ender_chest",
"enchanting_table", "anvil", "brewing_stand", "diamond_ore",
"iron_ore", "gold_ore", "coal_ore",
}
for block_type, positions in blocks.items():
if block_type in interesting_blocks:
for pos in positions[:3]: # Don't remember too many of one type
self._remember(block_type, pos, now)
# Remember where hostiles spawned (danger zones)
for h in hostiles:
if "position" in h:
self._remember(f"hostile_{h.get('type', 'mob')}", h["position"], now)
def _remember(self, loc_type: str, pos: dict, timestamp: float):
"""Add or update a location in memory."""
# Check if we already know about this spot (within 3 blocks)
for loc in self._known_locations:
if loc["type"] == loc_type:
dx = loc["x"] - pos.get("x", 0)
dz = loc["z"] - pos.get("z", 0)
if dx * dx + dz * dz < 9: # Within 3 blocks
loc["seen_at"] = timestamp
return
# New location
self._known_locations.append({
"type": loc_type,
"x": pos.get("x", 0),
"y": pos.get("y", 0),
"z": pos.get("z", 0),
"seen_at": timestamp,
"visited": False,
})
# Trim old locations if too many
if len(self._known_locations) > self._max_locations:
self._known_locations.sort(key=lambda l: l["seen_at"])
self._known_locations = self._known_locations[-self._max_locations:]
def get_known(self, loc_type: str) -> list[dict]:
"""Get all known locations of a given type."""
return [l for l in self._known_locations if l["type"] == loc_type]
def get_nearest_unexplored(self, pos: dict, max_dist: float = 50) -> dict | None:
"""Find nearest interesting unvisited location."""
unvisited = [l for l in self._known_locations
if not l.get("visited") and not l["type"].startswith("hostile_")]
if not unvisited:
return None
# Sort by distance
def dist(loc):
dx = pos["x"] - loc["x"]
dz = pos["z"] - loc["z"]
return math.sqrt(dx * dx + dz * dz)
unvisited.sort(key=dist)
nearest = unvisited[0]
d = dist(nearest)
if d > max_dist:
return None
# Mark as visited so we don't go back immediately
nearest["visited"] = True
return nearest
def suggest_explore_angle(self, pos: dict) -> float:
"""Suggest a direction to explore that we haven't visited much."""
# Check which directions we've explored
chunk_x = int(pos["x"]) // 16
chunk_z = int(pos["z"]) // 16
# Score each of 8 directions
best_angle = random.uniform(0, 2 * math.pi)
best_score = -1
for i in range(8):
angle = (2 * math.pi * i) / 8
# Check chunks in this direction
check_x = chunk_x + int(math.cos(angle) * 2)
check_z = chunk_z + int(math.sin(angle) * 2)
# Score: prefer unexplored chunks
score = 0
for dx in range(-1, 2):
for dz in range(-1, 2):
if (check_x + dx, check_z + dz) not in self._explored_chunks:
score += 1
# Add some randomness
score += random.uniform(0, 2)
if score > best_score:
best_score = score
best_angle = angle
return best_angle
# ═══════════════════════════════════════════════════════════════════
# DAILY ROUTINE — time-of-day phases
# ═══════════════════════════════════════════════════════════════════
class DailyRoutine:
    """
    Translates a Minecraft time-of-day tick into a named activity phase.

    MC day ticks: 0=sunrise, 6000=noon, 12000=sunset, 18000=midnight
    """

    def __init__(self, traits: dict, age: int):
        self._traits = traits
        self._age = age

    def get_phase(self, day_time: int) -> str:
        """Return "morning", "day", "evening" or "night" for ``day_time``."""
        # Wrap into a single 24000-tick day
        tick = day_time % 24000

        # (exclusive upper bound, phase) in ascending order:
        #   0-1000 sunrise / wake up, 1000-11000 main working hours,
        #   11000-12500 sunset / wind down
        boundaries = (
            (1000, "morning"),
            (11000, "day"),
            (12500, "evening"),
        )
        for upper_bound, phase in boundaries:
            if tick < upper_bound:
                return phase

        # 12500-24000: nighttime
        return "night"
# ═══════════════════════════════════════════════════════════════════
# BEHAVIOR ENGINE — world state container (kept for compatibility)
# ═══════════════════════════════════════════════════════════════════
class BehaviorEngine:
    """
    Holds Doug's world state. Updated by brain from scan results.
    Kept for backward compatibility — main_window accesses these fields.
    """

    def __init__(self, traits: dict, age: int, doug_name: str):
        self._traits = traits
        self._age = age
        self._name = doug_name

        # World state (updated by brain)
        self.position = {"x": 0, "y": 0, "z": 0}
        self.health = 20
        self.food = 20
        self.day_time = 0
        self.is_raining = False
        self.nearby_players: list[dict] = []
        self.nearby_entities: list[dict] = []
        self.nearby_hostiles: list[dict] = []
        self.nearby_containers: list[dict] = []
        self.nearby_signs: list[dict] = []
        self.nearby_blocks: dict = {}
        self.inventory: list[dict] = []
        self.spawn_pos = {"x": 0, "y": 0, "z": 0}

    @property
    def is_night(self) -> bool:
        """True when the time of day is past tick 12000 of the 24000-tick day.

        ``day_time`` is wrapped with ``% 24000`` first, so an un-normalized
        tick count does not read as night forever once it exceeds one day.
        Values already in 0-23999 behave exactly as before.
        """
        return self.day_time % 24000 > 12000
# ═══════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════
def _dist(pos_a: dict, pos_b: dict) -> float:
"""Distance between two position dicts."""
dx = pos_a.get("x", 0) - pos_b.get("x", 0)
dz = pos_a.get("z", 0) - pos_b.get("z", 0)
return math.sqrt(dx * dx + dz * dz)