"""Database engines, session factories, and schema bootstrap.

The schema is created with raw SQL (not ORM metadata) because it relies on
SQLite-specific features: an FTS5 virtual table, views, and triggers.
"""

import asyncio
import logging
import uuid
from contextlib import asynccontextmanager
from pathlib import Path

from sqlalchemy import create_engine, event, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker

from app.config import settings

logger = logging.getLogger(__name__)


class Base(DeclarativeBase):
    """Declarative base class for ORM models."""


# Async session factory - uses DATABASE_URL from settings
async_engine = create_async_engine(
    settings.resolved_database_url,
    echo=False,
    connect_args={"check_same_thread": False},
)
AsyncSessionLocal = async_sessionmaker(
    async_engine, class_=AsyncSession, expire_on_commit=False
)

# Sync engine for migrations (same database): the async driver prefix is
# swapped for the plain sqlite one so both engines use the same URL target.
_sync_db_path = settings.resolved_database_url.replace(
    "sqlite+aiosqlite:///", "sqlite:///"
)
sync_engine = create_engine(
    _sync_db_path,
    echo=False,
    connect_args={"check_same_thread": False},
)
SyncSessionLocal = sessionmaker(sync_engine)

# NOTE(review): the schema below declares ON DELETE CASCADE / SET NULL, but
# SQLite only enforces foreign keys when `PRAGMA foreign_keys = ON` is issued
# on each connection. Nothing in this file enables it (the imported `event`
# hook is unused here) - confirm whether it is set elsewhere.


async def get_db():
    """Yield an async session that commits on success and rolls back on error.

    The commit runs after the consumer hands control back without raising.
    Any exception triggers a rollback and is re-raised. The session is closed
    in every case.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


async def get_db_simple():
    """Bare async generator for FastAPI dependency injection.

    Unlike ``get_db`` this neither commits nor rolls back; the caller owns
    transaction control.
    """
    async with AsyncSessionLocal() as session:
        yield session


async def init_db():
    """Initialize database with all tables, views, FTS5, and triggers.

    Also creates the initial admin user when one is configured.
    """
    # Ensure data directory exists
    db_path = settings.resolved_database_url.replace("sqlite+aiosqlite:///", "")
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)

    async with async_engine.begin() as conn:
        # Create all tables via SQL (not ORM) to handle SQLite-specific features
        await conn.run_sync(_create_schema)

    # Create initial admin user if configured and doesn't exist
    await _create_initial_admin()


async def _create_initial_admin():
    """Create initial admin user from environment variables if it doesn't exist.

    Does nothing when either ``INITIAL_ADMIN_USERNAME`` or
    ``INITIAL_ADMIN_PASSWORD`` is unset, or when the username is already
    taken by any agent regardless of role.
    """
    # Imported lazily so bcrypt is only required when this function runs.
    import bcrypt

    username = settings.INITIAL_ADMIN_USERNAME
    password = settings.INITIAL_ADMIN_PASSWORD

    logger.info("Initial admin username from env: %s", username)
    # Only log whether a password is present, never the password itself.
    logger.info("Initial admin password set: %s", "Yes" if password else "No")

    if not username or not password:
        logger.warning(
            "_create_initial_admin skipped: INITIAL_ADMIN_USERNAME or "
            "INITIAL_ADMIN_PASSWORD not set"
        )
        return

    logger.info("_create_initial_admin: creating admin '%s'", username)

    async with AsyncSessionLocal() as session:
        # Check if username already exists (any role)
        result = await session.execute(
            text("SELECT id, role FROM agents WHERE username = :username"),
            {"username": username},
        )
        existing = result.fetchone()
        if existing:
            logger.info(
                "_create_initial_admin: username '%s' already exists "
                "(role=%s), skipping creation",
                username,
                existing.role,
            )
            return

        # Create admin user with bcrypt hash
        password_hash = bcrypt.hashpw(
            password.encode("utf-8"), bcrypt.gensalt()
        ).decode("utf-8")

        await session.execute(
            text("""
                INSERT INTO agents
                    (id, username, password_hash, role, is_deleted, created_at, updated_at)
                VALUES
                    (:id, :username, :password_hash, 'admin', 0, datetime('now'), datetime('now'))
            """),
            {
                "id": str(uuid.uuid4()),
                "username": username,
                "password_hash": password_hash,
            },
        )
        await session.commit()
        logger.info(
            "_create_initial_admin: admin '%s' created successfully", username
        )


def _create_schema(sync_conn):
    """Create all tables, views, FTS5 tables, and triggers synchronously.

    Every statement is idempotent, so this is safe to run on each startup.

    Args:
        sync_conn: A synchronous SQLAlchemy connection, e.g. the one handed
            to the callable passed to ``AsyncConnection.run_sync``.
    """
    # Agents table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS agents (
            id TEXT PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            role TEXT NOT NULL DEFAULT 'agent' CHECK (role IN ('agent', 'admin')),
            is_deleted INTEGER NOT NULL DEFAULT 0,
            deleted_at TIMESTAMP NULL,
            deleted_by TEXT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
            updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
        )
    """))

    # Projects table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS projects (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            description TEXT,
            agent_id TEXT NOT NULL REFERENCES agents(id),
            is_deleted INTEGER NOT NULL DEFAULT 0,
            deleted_at TIMESTAMP NULL,
            deleted_by TEXT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
            updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
        )
    """))

    # Folders table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS folders (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
            parent_id TEXT REFERENCES folders(id) ON DELETE CASCADE,
            path TEXT NOT NULL,
            is_deleted INTEGER NOT NULL DEFAULT 0,
            deleted_at TIMESTAMP NULL,
            deleted_by TEXT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
            updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
        )
    """))

    # Documents table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS documents (
            id TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            content TEXT NOT NULL DEFAULT '',
            project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
            folder_id TEXT REFERENCES folders(id) ON DELETE SET NULL,
            path TEXT NOT NULL,
            is_deleted INTEGER NOT NULL DEFAULT 0,
            deleted_at TIMESTAMP NULL,
            deleted_by TEXT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
            updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
        )
    """))

    # Tags table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS tags (
            id TEXT PRIMARY KEY,
            name TEXT UNIQUE NOT NULL,
            color TEXT NOT NULL DEFAULT '#6366f1',
            is_deleted INTEGER NOT NULL DEFAULT 0,
            deleted_at TIMESTAMP NULL,
            deleted_by TEXT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
        )
    """))

    # Document tags junction
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS document_tags (
            document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
            tag_id TEXT NOT NULL REFERENCES tags(id) ON DELETE CASCADE,
            PRIMARY KEY (document_id, tag_id)
        )
    """))

    # Refresh tokens table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS refresh_tokens (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL REFERENCES agents(id),
            token_hash TEXT NOT NULL UNIQUE,
            token_family_id TEXT NOT NULL,
            token_version INTEGER NOT NULL,
            user_agent TEXT,
            ip_address TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
            expires_at TIMESTAMP NOT NULL,
            revoked_at TIMESTAMP NULL,
            is_global_logout INTEGER NOT NULL DEFAULT 0
        )
    """))

    # JWT blocklist table
    sync_conn.execute(text("""
        CREATE TABLE IF NOT EXISTS jwt_blocklist (
            token_id TEXT PRIMARY KEY,
            revoked_at TIMESTAMP NOT NULL DEFAULT (datetime('now')),
            expires_at TIMESTAMP NOT NULL
        )
    """))

    # Indexes
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_projects_agent ON projects(agent_id)",
        "CREATE INDEX IF NOT EXISTS idx_folders_project ON folders(project_id)",
        "CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)",
        "CREATE INDEX IF NOT EXISTS idx_documents_project ON documents(project_id)",
        "CREATE INDEX IF NOT EXISTS idx_documents_folder ON documents(folder_id)",
        "CREATE INDEX IF NOT EXISTS idx_document_tags_doc ON document_tags(document_id)",
        "CREATE INDEX IF NOT EXISTS idx_document_tags_tag ON document_tags(tag_id)",
        "CREATE INDEX IF NOT EXISTS idx_refresh_tokens_hash ON refresh_tokens(token_hash)",
        "CREATE INDEX IF NOT EXISTS idx_refresh_tokens_user_family "
        "ON refresh_tokens(user_id, token_family_id)",
    )
    for statement in index_statements:
        sync_conn.execute(text(statement))

    # --- FTS5 virtual table ---
    sync_conn.execute(text("""
        CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5(
            document_id,
            title,
            content,
            path,
            tokenize='unicode61 remove_diacritics 1'
        )
    """))

    # --- Active views for soft deletes ---
    for table in ("agents", "projects", "folders", "documents", "tags"):
        # Table names come from the fixed tuple above, never from user input.
        sync_conn.execute(text(f"""
            CREATE VIEW IF NOT EXISTS active_{table} AS
            SELECT * FROM {table} WHERE is_deleted = 0
        """))

    # --- FTS5 Sync Triggers ---
    # Insert trigger
    sync_conn.execute(text("""
        CREATE TRIGGER IF NOT EXISTS documents_fts_ai AFTER INSERT ON documents
        BEGIN
            INSERT INTO documents_fts(document_id, title, content, path)
            VALUES (new.id, new.title, new.content, new.path);
        END
    """))

    # Update trigger (delete old + insert new, unless the row is soft-deleted).
    # Both this trigger and the soft-delete trigger fire on the UPDATE that
    # sets is_deleted = 1, and SQLite does not define the order in which
    # multiple triggers for one event run. An unconditional re-insert here
    # could therefore leave a soft-deleted document in the search index.
    # Drop first so databases created with the older definition are upgraded.
    sync_conn.execute(text("DROP TRIGGER IF EXISTS documents_fts_au"))
    sync_conn.execute(text("""
        CREATE TRIGGER IF NOT EXISTS documents_fts_au AFTER UPDATE ON documents
        BEGIN
            DELETE FROM documents_fts WHERE document_id = old.id;
            INSERT INTO documents_fts(document_id, title, content, path)
            SELECT new.id, new.title, new.content, new.path
            WHERE new.is_deleted = 0;
        END
    """))

    # Soft-delete trigger (when is_deleted becomes TRUE)
    sync_conn.execute(text("""
        CREATE TRIGGER IF NOT EXISTS documents_fts_ad AFTER UPDATE ON documents
        WHEN new.is_deleted = 1 AND old.is_deleted = 0
        BEGIN
            DELETE FROM documents_fts WHERE document_id = old.id;
        END
    """))

    sync_conn.commit()