Files
claudia-docs-api/migrations/add_phase3_columns.py

140 lines
4.7 KiB
Python

"""Migration: Add Phase 3 columns to documents table
This migration adds fields for Phase 3 link intelligence features:
- outgoing_links: JSON array of URLs/paths this document links to
- backlinks_count: Integer count of documents that link to this document
SQLite releases before 3.35.0 do not support ALTER TABLE ... DROP COLUMN,
so downgrade() only documents the manual rollback process.
"""
import asyncio
import json
import sys
from pathlib import Path
# Add backend to path for imports when run standalone
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import text
from app.database import async_engine
# Columns to add with their types.
# Each entry is a (column name, column definition) pair; the definition is
# appended verbatim after the column name in "ALTER TABLE documents ADD COLUMN".
PHASE3_COLUMNS = [
    # JSON array of URLs/paths this document links to.
    ("outgoing_links", "TEXT DEFAULT '[]'"),
    # Count of documents that link to this document.
    ("backlinks_count", "INTEGER DEFAULT 0"),
]
async def column_exists(connection, table: str, column: str) -> bool:
    """Report whether ``table`` already has a column named ``column``.

    Runs ``PRAGMA table_info(<table>)`` on ``connection`` and compares
    ``column`` against the name field of every returned row.

    Args:
        connection: Object exposing an awaitable ``execute()`` whose result
            provides ``fetchall()``.
        table: Table name. It is interpolated into the PRAGMA statement
            as-is, so only pass trusted identifiers.
        column: Column name to look for.

    Returns:
        True if a column with that exact name is reported, False otherwise.
    """
    pragma_stmt = text(f"PRAGMA table_info({table})")
    table_info = await connection.execute(pragma_stmt)
    # Index 1 of each table_info row holds the column name.
    return any(info_row[1] == column for info_row in table_info.fetchall())
async def upgrade():
    """Add every missing Phase 3 column to ``documents``, then validate.

    Reads the current column list once via ``PRAGMA table_info(documents)``,
    issues one ``ALTER TABLE ... ADD COLUMN`` per entry of ``PHASE3_COLUMNS``
    that is not present yet, and finally runs ``validate()``. Columns that
    already exist are reported and skipped, so the migration can be re-run.

    Raises:
        RuntimeError: Propagated from ``validate()`` when a Phase 3 column is
            still missing afterwards.
    """
    print("Starting Phase 3 migration...")

    async with async_engine.begin() as conn:
        # Snapshot of the table layout before any change is made.
        table_info = await conn.execute(text("PRAGMA table_info(documents)"))
        existing_columns = [info_row[1] for info_row in table_info.fetchall()]
        print(f"Existing columns: {existing_columns}")

        for column_name, column_type in PHASE3_COLUMNS:
            if column_name not in existing_columns:
                print(f" [ADD] {column_name} ({column_type})")
                await conn.execute(
                    text(f"ALTER TABLE documents ADD COLUMN {column_name} {column_type}")
                )
            else:
                print(f" [SKIP] Column '{column_name}' already exists, skipping.")

    # validate() opens its own connection, so it runs after the block above
    # has finished. NOTE(review): placement outside the ``async with`` is
    # reconstructed -- confirm against the original file.
    await validate()
    print("Phase 3 migration completed successfully.")
async def validate():
    """Check that all Phase 3 columns exist and run a small smoke test.

    First compares ``PHASE3_COLUMNS`` against ``PRAGMA table_info(documents)``
    and prints one status line per column. If every column is present, it
    then reads ``outgoing_links`` and ``backlinks_count`` from the first row
    of ``documents`` (if any) and prints what it found. Smoke-test findings
    are only reported as ``[OK]``/``[WARN]`` lines; they never fail the
    validation.

    Raises:
        RuntimeError: If at least one Phase 3 column is missing.
    """
    print("\n--- Post-migration validation ---")

    async with async_engine.begin() as conn:
        table_info = await conn.execute(text("PRAGMA table_info(documents)"))
        # Only membership is needed, so keep just the column names.
        present = {info_row[1] for info_row in table_info.fetchall()}

        missing_count = 0
        for column_name, _column_type in PHASE3_COLUMNS:
            if column_name in present:
                print(f" [OK] {column_name} added successfully")
            else:
                print(f" [FAIL] Column '{column_name}' is MISSING")
                missing_count += 1

        if missing_count:
            print("\nValidation FAILED: Some columns are missing.")
            raise RuntimeError("Migration validation failed.")
        print("\nValidation PASSED: All Phase 3 columns present.")

        # Smoke test: verify outgoing_links holds a valid JSON array.
        print("\n--- Smoke test ---")
        links_result = await conn.execute(
            text("SELECT outgoing_links FROM documents LIMIT 1")
        )
        sample = links_result.fetchone()
        if not sample or sample[0] is None:
            # Reached for an empty table as well as for a NULL value.
            print(" [OK] outgoing_links accepts NULL/default (as expected)")
        else:
            try:
                decoded = json.loads(sample[0])
            except json.JSONDecodeError:
                print(" [WARN] outgoing_links is not valid JSON")
            else:
                if isinstance(decoded, list):
                    print(" [OK] outgoing_links contains valid JSON array")
                else:
                    print(" [WARN] outgoing_links is not a JSON array")

        # Verify backlinks_count can be selected; nothing is printed when the
        # table has no rows.
        count_result = await conn.execute(
            text("SELECT backlinks_count FROM documents LIMIT 1")
        )
        if count_result.fetchone():
            print(" [OK] backlinks_count column is accessible")
async def downgrade():
    """Print a pointer to the manual rollback steps; changes nothing itself.

    The Phase 3 columns are not dropped automatically. To remove them by
    hand, rebuild the table without them:

    1. Create a new table without the Phase 3 columns
    2. Copy data from the original table
    3. Drop the original table
    4. Rename the new table to 'documents'

    Example (run in sqlite3 CLI):
        PRAGMA foreign_keys=off;
        CREATE TABLE documents_backup AS
        SELECT id, title, content, created_at, updated_at, metadata,
        reasoning_type, confidence, reasoning_steps, model_source, tiptap_content
        FROM documents;
        DROP TABLE documents;
        ALTER TABLE documents_backup RENAME TO documents;
        PRAGMA foreign_keys=on;

    Notes:
        * The column list above is an example; check it against the actual
          schema before running it.
        * ``CREATE TABLE ... AS SELECT`` copies data only. Primary keys,
          constraints, defaults and indexes of the original table are not
          carried over and must be recreated separately if needed.
        * SQLite 3.35.0 and later also accept
          ``ALTER TABLE documents DROP COLUMN <name>``.
    """
    print(
        "SQLite does not support DROP COLUMN.",
        "See downgrade() docstring for manual rollback steps.",
        sep="\n",
    )
if __name__ == "__main__":
    # CLI: pass "down" as the first argument to print the rollback notice;
    # any other invocation (including no argument) runs the upgrade.
    entry_point = downgrade if sys.argv[1:2] == ["down"] else upgrade
    asyncio.run(entry_point())