- Hybrid Python/Node.js architecture with WebSocket bridge - PySide6 desktop app with smoky blue futuristic theme - Dashboard, Create Doug, Settings screens - bedrock-protocol connection to BDS (offline + Xbox Live auth) - Realm support (auth flow with device code + browser auto-open) - Ollama integration with lean persona prompt (~95 tokens) - 40 personality traits (15 sliders + 23 quirks + 2 toggles) - SQLite + MariaDB database with 12 tables - Chat working in-game with proper Bedrock text packet format - RakNet protocol 11 patch for newer BDS versions - jsp-raknet backend (native crashes on ARM64 macOS) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
166 lines
5.3 KiB
Python
166 lines
5.3 KiB
Python
"""
|
|
Database connection factory.
|
|
Supports SQLite (default) and MariaDB backends.
|
|
"""
|
|
|
|
import sqlite3
|
|
from typing import Optional, Any
|
|
from pathlib import Path
|
|
from dougbot.utils.logging import get_logger
|
|
|
|
log = get_logger("db.connection")
|
|
|
|
|
|
class DatabaseConnection:
|
|
"""Abstract database connection wrapper."""
|
|
|
|
def __init__(self):
|
|
self._conn = None
|
|
self._db_type = "sqlite"
|
|
|
|
@staticmethod
|
|
def create(db_type: str = "sqlite", **kwargs) -> "DatabaseConnection":
|
|
"""Factory method to create a database connection."""
|
|
if db_type == "mariadb":
|
|
return MariaDBConnection(**kwargs)
|
|
return SQLiteConnection(**kwargs)
|
|
|
|
def execute(self, query: str, params: tuple = ()) -> Any:
|
|
raise NotImplementedError
|
|
|
|
def executemany(self, query: str, params_list: list[tuple]) -> None:
|
|
raise NotImplementedError
|
|
|
|
def fetchone(self, query: str, params: tuple = ()) -> Optional[dict]:
|
|
raise NotImplementedError
|
|
|
|
def fetchall(self, query: str, params: tuple = ()) -> list[dict]:
|
|
raise NotImplementedError
|
|
|
|
def commit(self) -> None:
|
|
raise NotImplementedError
|
|
|
|
def close(self) -> None:
|
|
raise NotImplementedError
|
|
|
|
@property
|
|
def db_type(self) -> str:
|
|
return self._db_type
|
|
|
|
|
|
class SQLiteConnection(DatabaseConnection):
|
|
"""SQLite database connection."""
|
|
|
|
def __init__(self, db_path: Optional[str] = None, **kwargs):
|
|
super().__init__()
|
|
self._db_type = "sqlite"
|
|
|
|
if db_path is None:
|
|
db_path = str(Path.home() / ".dougbot" / "dougbot.db")
|
|
|
|
# Ensure directory exists
|
|
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._conn = sqlite3.connect(db_path, check_same_thread=False)
|
|
self._conn.row_factory = sqlite3.Row
|
|
self._conn.execute("PRAGMA journal_mode=WAL")
|
|
self._conn.execute("PRAGMA foreign_keys=ON")
|
|
|
|
log.info(f"SQLite connected: {db_path}")
|
|
|
|
def execute(self, query: str, params: tuple = ()) -> Any:
|
|
cursor = self._conn.execute(query, params)
|
|
self._conn.commit()
|
|
return cursor
|
|
|
|
def executemany(self, query: str, params_list: list[tuple]) -> None:
|
|
self._conn.executemany(query, params_list)
|
|
self._conn.commit()
|
|
|
|
def fetchone(self, query: str, params: tuple = ()) -> Optional[dict]:
|
|
cursor = self._conn.execute(query, params)
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
def fetchall(self, query: str, params: tuple = ()) -> list[dict]:
|
|
cursor = self._conn.execute(query, params)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
def commit(self) -> None:
|
|
self._conn.commit()
|
|
|
|
def close(self) -> None:
|
|
if self._conn:
|
|
self._conn.close()
|
|
log.info("SQLite connection closed")
|
|
|
|
|
|
class MariaDBConnection(DatabaseConnection):
|
|
"""MariaDB database connection."""
|
|
|
|
def __init__(self, host: str = "127.0.0.1", port: int = 3306,
|
|
user: str = "", password: str = "", database: str = "dougbot",
|
|
**kwargs):
|
|
super().__init__()
|
|
self._db_type = "mariadb"
|
|
|
|
try:
|
|
import mysql.connector
|
|
self._conn = mysql.connector.connect(
|
|
host=host,
|
|
port=port,
|
|
user=user,
|
|
password=password,
|
|
database=database,
|
|
autocommit=True,
|
|
)
|
|
self._mysql = mysql.connector
|
|
log.info(f"MariaDB connected: {host}:{port}/{database}")
|
|
except ImportError:
|
|
raise RuntimeError(
|
|
"mysql-connector-python is required for MariaDB support. "
|
|
"Install with: pip install mysql-connector-python"
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to connect to MariaDB: {e}")
|
|
|
|
def _prepare(self, query: str) -> str:
|
|
"""Adapt a query for MariaDB: fix placeholders and reserved words."""
|
|
import re
|
|
query = query.replace("?", "%s")
|
|
# Backtick `key` when used as a column name (it's a MySQL reserved word)
|
|
# Match "key" as a standalone word not already backticked
|
|
query = re.sub(r'(?<!`)(?<!\w)\bkey\b(?!`)(?!\w*\()', '`key`', query)
|
|
return query
|
|
|
|
def execute(self, query: str, params: tuple = ()) -> Any:
|
|
query = self._prepare(query)
|
|
cursor = self._conn.cursor(dictionary=True)
|
|
cursor.execute(query, params)
|
|
self._conn.commit()
|
|
return cursor
|
|
|
|
def executemany(self, query: str, params_list: list[tuple]) -> None:
|
|
query = self._prepare(query)
|
|
cursor = self._conn.cursor()
|
|
cursor.executemany(query, params_list)
|
|
|
|
def fetchone(self, query: str, params: tuple = ()) -> Optional[dict]:
|
|
query = self._prepare(query)
|
|
cursor = self._conn.cursor(dictionary=True)
|
|
cursor.execute(query, params)
|
|
return cursor.fetchone()
|
|
|
|
def fetchall(self, query: str, params: tuple = ()) -> list[dict]:
|
|
query = self._prepare(query)
|
|
cursor = self._conn.cursor(dictionary=True)
|
|
cursor.execute(query, params)
|
|
return cursor.fetchall()
|
|
|
|
def commit(self) -> None:
|
|
self._conn.commit()
|
|
|
|
def close(self) -> None:
|
|
if self._conn:
|
|
self._conn.close()
|
|
log.info("MariaDB connection closed")
|