Files
claudia-docs-api/app/database.py
Motoko bbbe42358d Phase 2: Add reasoning and TipTap content endpoints
- Extend Document model with reasoning_type, confidence, reasoning_steps, model_source, tiptap_content fields
- Add new endpoints:
  - GET /documents/{id}/reasoning - Get reasoning metadata
  - PATCH /documents/{id}/reasoning - Update reasoning metadata
  - GET /documents/{id}/reasoning-panel - Get reasoning panel data for UI
  - POST /documents/{id}/reasoning-steps - Add reasoning step
  - DELETE /documents/{id}/reasoning-steps/{step} - Delete reasoning step
  - GET /documents/{id}/content?format=tiptap|markdown - Get content in TipTap or Markdown
  - PUT /documents/{id}/content - Update content (supports both TipTap JSON and Markdown)
- Add TipTap to Markdown conversion
- Update database schema with new columns
- Add comprehensive tests for all new endpoints
- All 37 tests passing
2026-03-30 23:11:44 +00:00

317 lines
11 KiB
Python

import asyncio
import uuid
from contextlib import asynccontextmanager
from pathlib import Path
from sqlalchemy import create_engine, event, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from app.config import settings
class Base(DeclarativeBase):
pass
# Async session factory - uses DATABASE_URL from settings
async_engine = create_async_engine(
settings.resolved_database_url,
echo=False,
connect_args={"check_same_thread": False}
)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
# Sync engine for migrations (same database)
_sync_db_path = settings.resolved_database_url.replace("sqlite+aiosqlite:///", "sqlite:///")
sync_engine = create_engine(
_sync_db_path,
echo=False,
connect_args={"check_same_thread": False}
)
SyncSessionLocal = sessionmaker(sync_engine)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def get_db_simple():
"""Bare async generator for FastAPI dependency injection."""
async with AsyncSessionLocal() as session:
yield session
async def init_db():
"""Initialize database with all tables, views, FTS5, and triggers."""
# Ensure data directory exists
db_path = settings.resolved_database_url.replace("sqlite+aiosqlite:///", "")
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
async with async_engine.begin() as conn:
# Create all tables via SQL (not ORM) to handle SQLite-specific features
await conn.run_sync(_create_schema)
# Create initial admin user if configured and doesn't exist
await _create_initial_admin()
async def _create_initial_admin():
"""Create initial admin user from environment variables if it doesn't exist."""
import bcrypt
import logging
logger = logging.getLogger(__name__)
logger.info(f"Initial admin username from env: {settings.INITIAL_ADMIN_USERNAME}")
logger.info(f"Initial admin password set: {'Yes' if settings.INITIAL_ADMIN_PASSWORD else 'No'}")
username = settings.INITIAL_ADMIN_USERNAME
password = settings.INITIAL_ADMIN_PASSWORD
if not username or not password:
logger.warning(
"_create_initial_admin skipped: INITIAL_ADMIN_USERNAME or "
"INITIAL_ADMIN_PASSWORD not set"
)
return
logger.info(f"_create_initial_admin: creating admin '{username}'")
async with AsyncSessionLocal() as session:
# Check if username already exists (any role)
result = await session.execute(
text("SELECT id, role FROM agents WHERE username = :username"),
{"username": username}
)
existing = result.fetchone()
if existing:
logger.info(
f"_create_initial_admin: username '{username}' already exists "
f"(role={existing.role}), skipping creation"
)
return
# Create admin user with bcrypt hash
password_hash = bcrypt.hashpw(
password.encode("utf-8"),
bcrypt.gensalt()
).decode("utf-8")
await session.execute(
text("""
INSERT INTO agents (id, username, password_hash, role, is_deleted, created_at, updated_at)
VALUES (:id, :username, :password_hash, 'admin', 0, datetime('now'), datetime('now'))
"""),
{
"id": str(uuid.uuid4()),
"username": username,
"password_hash": password_hash
}
)
await session.commit()
logger.info(f"_create_initial_admin: admin '{username}' created successfully")
def _create_schema(sync_conn):
"""Create all tables, views, FTS5 tables, and triggers synchronously."""
# Agents table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'agent' CHECK (role IN ('agent', 'admin')),
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TIMESTAMP NULL,
deleted_by TEXT NULL,
created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
)
"""))
# Projects table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
agent_id TEXT NOT NULL REFERENCES agents(id),
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TIMESTAMP NULL,
deleted_by TEXT NULL,
created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
)
"""))
# Folders table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS folders (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
parent_id TEXT REFERENCES folders(id) ON DELETE CASCADE,
path TEXT NOT NULL,
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TIMESTAMP NULL,
deleted_by TEXT NULL,
created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
)
"""))
# Documents table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
content TEXT NOT NULL DEFAULT '',
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
folder_id TEXT REFERENCES folders(id) ON DELETE SET NULL,
path TEXT NOT NULL,
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TIMESTAMP NULL,
deleted_by TEXT NULL,
created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
reasoning_type TEXT,
confidence TEXT,
reasoning_steps TEXT,
model_source TEXT,
tiptap_content TEXT
)
"""))
# Tags table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS tags (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
color TEXT NOT NULL DEFAULT '#6366f1',
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TIMESTAMP NULL,
deleted_by TEXT NULL,
created_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
)
"""))
# Document tags junction
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS document_tags (
document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
tag_id TEXT NOT NULL REFERENCES tags(id) ON DELETE CASCADE,
PRIMARY KEY (document_id, tag_id)
)
"""))
# Refresh tokens table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS refresh_tokens (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES agents(id),
token_hash TEXT NOT NULL UNIQUE,
token_family_id TEXT NOT NULL,
token_version INTEGER NOT NULL,
user_agent TEXT,
ip_address TEXT,
created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
expires_at TIMESTAMP NOT NULL,
revoked_at TIMESTAMP NULL,
is_global_logout INTEGER NOT NULL DEFAULT 0
)
"""))
# JWT blocklist table
sync_conn.execute(text("""
CREATE TABLE IF NOT EXISTS jwt_blocklist (
token_id TEXT PRIMARY KEY,
revoked_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
expires_at TIMESTAMP NOT NULL
)
"""))
# Indexes
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_agent ON projects(agent_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_folders_project ON folders(project_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_documents_project ON documents(project_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_documents_folder ON documents(folder_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_document_tags_doc ON document_tags(document_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_document_tags_tag ON document_tags(tag_id)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_refresh_tokens_hash ON refresh_tokens(token_hash)"))
sync_conn.execute(text("CREATE INDEX IF NOT EXISTS idx_refresh_tokens_user_family ON refresh_tokens(user_id, token_family_id)"))
# --- FTS5 virtual table ---
sync_conn.execute(text("""
CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5(
document_id,
title,
content,
path,
tokenize='unicode61 remove_diacritics 1'
)
"""))
# --- Active views for soft deletes ---
sync_conn.execute(text("""
CREATE VIEW IF NOT EXISTS active_agents AS
SELECT * FROM agents WHERE is_deleted = 0
"""))
sync_conn.execute(text("""
CREATE VIEW IF NOT EXISTS active_projects AS
SELECT * FROM projects WHERE is_deleted = 0
"""))
sync_conn.execute(text("""
CREATE VIEW IF NOT EXISTS active_folders AS
SELECT * FROM folders WHERE is_deleted = 0
"""))
sync_conn.execute(text("""
CREATE VIEW IF NOT EXISTS active_documents AS
SELECT * FROM documents WHERE is_deleted = 0
"""))
sync_conn.execute(text("""
CREATE VIEW IF NOT EXISTS active_tags AS
SELECT * FROM tags WHERE is_deleted = 0
"""))
# --- FTS5 Sync Triggers ---
# Insert trigger
sync_conn.execute(text("""
CREATE TRIGGER IF NOT EXISTS documents_fts_ai AFTER INSERT ON documents BEGIN
INSERT INTO documents_fts(document_id, title, content, path)
VALUES (new.id, new.title, new.content, new.path);
END
"""))
# Update trigger (delete old + insert new)
sync_conn.execute(text("""
CREATE TRIGGER IF NOT EXISTS documents_fts_au AFTER UPDATE ON documents BEGIN
DELETE FROM documents_fts WHERE document_id = old.id;
INSERT INTO documents_fts(document_id, title, content, path)
VALUES (new.id, new.title, new.content, new.path);
END
"""))
# Soft-delete trigger (when is_deleted becomes TRUE)
sync_conn.execute(text("""
CREATE TRIGGER IF NOT EXISTS documents_fts_ad AFTER UPDATE ON documents
WHEN new.is_deleted = 1 AND old.is_deleted = 0
BEGIN
DELETE FROM documents_fts WHERE document_id = old.id;
END
"""))
sync_conn.commit()