Files
claudia-docs-api/migrations/add_phase2_columns.py
Motoko 71e5e3570f fix migration: remove notnull validation that fails with DEFAULT NULL
SQLite PRAGMA table_info returns notnull=1 for columns with DEFAULT,
even when they are nullable. Now only checks column existence.
2026-03-30 23:24:12 +00:00

141 lines
4.9 KiB
Python

"""Migration: Add Phase 2 columns to documents table
This migration adds fields for Phase 2 Claudia Docs features:
- reasoning_type: Type of reasoning used (e.g., 'chain_of_thought', 'direct')
- confidence: Confidence score from 0.0 to 1.0
- reasoning_steps: JSON array of reasoning step objects
- model_source: Source/model that generated the content
- tiptap_content: TipTap JSON document structure
SQLite does not support DROP COLUMN directly, so downgrade()
documents the manual process required.
"""
import asyncio
import json
import sys
from pathlib import Path
# Add backend to path for imports when run standalone
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import text
from app.database import async_engine
# Phase 2 columns to add to the documents table, as
# (column name, SQL column definition) pairs. upgrade() places each definition
# verbatim after the column name in an ALTER TABLE ... ADD COLUMN statement,
# so every entry here must be valid SQLite column-definition syntax.
# All five columns are declared with DEFAULT NULL.
PHASE2_COLUMNS = [
    # Type of reasoning used (e.g. 'chain_of_thought', 'direct').
    ("reasoning_type", "VARCHAR(20) DEFAULT NULL"),
    # Confidence score from 0.0 to 1.0.
    ("confidence", "FLOAT DEFAULT NULL"),
    # JSON array of reasoning step objects, stored as text.
    ("reasoning_steps", "TEXT DEFAULT NULL"),
    # Source/model that generated the content.
    ("model_source", "VARCHAR(255) DEFAULT NULL"),
    # TipTap JSON document structure, stored as text.
    ("tiptap_content", "TEXT DEFAULT NULL"),
]
async def column_exists(connection, table: str, column: str) -> bool:
    """Report whether ``table`` currently has a column named ``column``.

    Args:
        connection: Connection object whose awaitable ``execute()`` runs the
            PRAGMA statement.
        table: Name of the table to inspect. It is interpolated directly into
            the statement text, so only pass trusted identifiers.
        column: Column name to look for (exact string match).

    Returns:
        True if one of the rows returned by ``PRAGMA table_info`` carries
        ``column`` in its second field, False otherwise.
    """
    pragma = text(f"PRAGMA table_info({table})")
    table_info = await connection.execute(pragma)
    # Field 1 of each table_info row is read as the column name.
    return any(info[1] == column for info in table_info.fetchall())
async def upgrade():
    """Bring the documents table up to the Phase 2 schema.

    Each entry of ``PHASE2_COLUMNS`` that ``PRAGMA table_info(documents)``
    does not already list is added with ``ALTER TABLE ... ADD COLUMN``.
    Columns that are already present are reported and left untouched, so the
    migration can be re-run. ``validate()`` is awaited afterwards; any
    exception it raises propagates to the caller.
    """
    print("Starting Phase 2 migration...")

    async with async_engine.begin() as conn:
        # Snapshot the column names once, before any ALTER is issued.
        table_info = await conn.execute(text("PRAGMA table_info(documents)"))
        existing_columns = [info[1] for info in table_info.fetchall()]
        print(f"Existing columns: {existing_columns}")

        for name, definition in PHASE2_COLUMNS:
            if name not in existing_columns:
                statement = f"ALTER TABLE documents ADD COLUMN {name} {definition}"
                print(f" [ADD] {name} ({definition})")
                await conn.execute(text(statement))
            else:
                print(f" [SKIP] Column '{name}' already exists, skipping.")

    # validate() opens its own connection, so it runs once the block above
    # has been exited.
    await validate()
    print("Phase 2 migration completed successfully.")
async def validate():
    """Check that every Phase 2 column is present on the documents table.

    Reads ``PRAGMA table_info(documents)`` and prints an ``[OK]`` or
    ``[FAIL]`` line for each column named in ``PHASE2_COLUMNS``. Only the
    existence of the columns is checked; nullability is not inspected.

    If all columns are present, a smoke test follows: for each JSON-bearing
    column the first row returned by ``SELECT ... LIMIT 1`` is fetched and,
    when it holds a non-NULL value, that value is parsed with ``json.loads``.
    A value that fails to parse is reported as a warning, not an error.

    Raises:
        RuntimeError: If at least one Phase 2 column is missing.
    """
    print("\n--- Post-migration validation ---")
    async with async_engine.begin() as conn:
        result = await conn.execute(text("PRAGMA table_info(documents)"))
        # Field 1 of each table_info row is read as the column name.
        present = {row[1] for row in result.fetchall()}

        all_ok = True
        for column_name, _ in PHASE2_COLUMNS:
            if column_name not in present:
                print(f" [FAIL] Column '{column_name}' is MISSING")
                all_ok = False
            else:
                print(f" [OK] {column_name} added successfully")

        if not all_ok:
            print("\nValidation FAILED: Some columns are missing.")
            raise RuntimeError("Migration validation failed.")
        print("\nValidation PASSED: All Phase 2 columns present.")

        # Smoke test: the JSON columns should hold either NULL or valid JSON.
        print("\n--- JSON column smoke test ---")
        for col in ["reasoning_steps", "tiptap_content"]:
            result = await conn.execute(
                text(f"SELECT {col} FROM documents LIMIT 1")
            )
            row = result.fetchone()
            # No rows at all, or a NULL value: nothing to parse.
            if row is None or row[0] is None:
                print(f" [OK] {col} accepts NULL (as expected for new columns)")
                continue
            try:
                json.loads(row[0])
            except json.JSONDecodeError:
                # This branch is only reached for a non-NULL value, so the
                # message must not describe the failure as an expected NULL.
                print(f" [WARN] {col} holds a non-NULL value that is not valid JSON")
            else:
                print(f" [OK] {col} contains valid JSON")
async def downgrade():
    """Print rollback guidance; no schema change is performed here.

    This function touches no database. The Phase 2 columns have to be removed
    by hand, by rebuilding the table:

    1. Create a new table without the Phase 2 columns
    2. Copy the data across from the original table
    3. Drop the original table
    4. Rename the new table to 'documents'

    Example (run in the sqlite3 CLI):

        PRAGMA foreign_keys=off;
        CREATE TABLE documents_backup AS
            SELECT id, title, content, created_at, updated_at, metadata
            FROM documents;
        DROP TABLE documents;
        ALTER TABLE documents_backup RENAME TO documents;
        PRAGMA foreign_keys=on;

    NOTE(review): the column list in the example is assumed to be the
    pre-Phase-2 schema -- confirm it against the real table before running.
    NOTE(review): CREATE TABLE ... AS SELECT copies data only; primary keys,
    constraints and indexes would need to be recreated -- verify against the
    SQLite documentation.
    NOTE(review): this procedure was written on the premise that SQLite lacks
    ALTER TABLE ... DROP COLUMN; newer SQLite releases (3.35.0+) appear to
    provide it -- confirm against the deployed SQLite version.
    """
    # Deliberately no database access: nothing is dropped or renamed
    # automatically, the operator follows the docstring steps instead.
    notice = (
        "SQLite does not support DROP COLUMN.",
        "See downgrade() docstring for manual rollback steps.",
    )
    for line in notice:
        print(line)
if __name__ == "__main__":
    # Script entry point: a first argument of exactly "down" selects the
    # rollback notice; anything else (or no argument) runs the upgrade.
    wants_downgrade = sys.argv[1:2] == ["down"]
    asyncio.run(downgrade() if wants_downgrade else upgrade())