Files
Motoko 7f3e8a8f53 Phase 1 MVP - Complete implementation
- Auth: register, login, JWT with refresh tokens, blocklist
- Projects/Folders/Documents CRUD with soft deletes
- Tags CRUD and assignment
- FTS5 search with highlights and tag filtering
- ADR-001, ADR-002, ADR-003 compliant
- Security fixes applied (JWT_SECRET_KEY, exception handler, cookie secure)
- 25 tests passing
2026-03-30 15:17:27 +00:00

129 lines
3.8 KiB
Python

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.search import SearchResult, SearchResponse
from app.schemas.document import TagInfo
async def search_documents(
    db: AsyncSession,
    query: str,
    agent_id: str | None = None,
    project_id: str | None = None,
    tags: list[str] | None = None,
    limit: int = 20,
    offset: int = 0,
) -> SearchResponse:
    """
    Full-text search over documents using FTS5.

    The query text is matched as a single quoted phrase against
    ``documents_fts``. Ownership, project and tag filters are applied in
    the same SQL statement as the match, so ``limit``/``offset`` paginate
    over documents the caller can actually see rather than over raw FTS
    hits that are discarded afterwards.

    Args:
        db: Async session used to run the queries.
        query: User-supplied search text. Leading/trailing whitespace is
            ignored; an empty or whitespace-only query yields no results.
        agent_id: Only documents whose project has this ``agent_id`` are
            returned. With ``None`` the SQL equality never holds, so the
            result is empty.
        project_id: When truthy, restrict results to this project.
        tags: When non-empty, keep only documents carrying at least one
            active tag whose name is in this list.
        limit: Maximum number of results to return.
        offset: Number of matching results to skip.

    Returns:
        A ``SearchResponse`` whose results are ordered by bm25 rank and
        carry an excerpt with ``**``-highlighted matches.
    """
    phrase = query.strip() if query else ""
    if not phrase:
        return SearchResponse(results=[])

    # Wrapping the text in double quotes (with embedded quotes doubled)
    # makes FTS5 read it as one phrase string instead of query syntax.
    fts_query = '"' + phrase.replace('"', '""') + '"'

    params: dict[str, object] = {
        "q": fts_query,
        "agent_id": agent_id,
        "limit": limit,
        "offset": offset,
    }
    filters = ["documents_fts MATCH :q", "p.agent_id = :agent_id"]

    if project_id:
        filters.append("d.project_id = :project_id")
        params["project_id"] = project_id

    if tags:
        # Only placeholder *names* are generated here; every tag value is
        # still passed as a bound parameter.
        tag_marks = []
        for i, tag_name in enumerate(tags):
            key = f"tag_{i}"
            params[key] = tag_name
            tag_marks.append(f":{key}")
        filters.append(
            "EXISTS (SELECT 1 FROM document_tags dt "
            "JOIN active_tags t ON t.id = dt.tag_id "
            f"WHERE dt.document_id = d.id AND t.name IN ({', '.join(tag_marks)}))"
        )

    # Filtering inside the paginated statement keeps pages full and makes
    # `offset` count visible documents only.
    search_sql = text(
        "SELECT d.id, d.title, d.content, d.project_id, "
        "bm25(documents_fts) AS score "
        "FROM documents_fts "
        "JOIN active_documents d ON d.id = documents_fts.document_id "
        "JOIN active_projects p ON p.id = d.project_id "
        "WHERE " + " AND ".join(filters) + " "
        "ORDER BY score "
        "LIMIT :limit OFFSET :offset"
    )
    rows = (await db.execute(search_sql, params)).fetchall()
    if not rows:
        return SearchResponse(results=[])

    # Load the tags of every document on this page with one query instead
    # of one query per document.
    id_params = {f"doc_{i}": row.id for i, row in enumerate(rows)}
    id_marks = ", ".join(f":{key}" for key in id_params)
    tags_sql = text(
        "SELECT dt.document_id, t.id, t.name, t.color "
        "FROM active_tags t "
        "JOIN document_tags dt ON t.id = dt.tag_id "
        f"WHERE dt.document_id IN ({id_marks})"
    )
    tags_by_doc: dict[object, list[TagInfo]] = {}
    for tag_row in (await db.execute(tags_sql, id_params)).fetchall():
        tags_by_doc.setdefault(tag_row.document_id, []).append(
            TagInfo(id=tag_row.id, name=tag_row.name, color=tag_row.color)
        )

    results = [
        SearchResult(
            id=row.id,
            title=row.title,
            excerpt=_build_snippet(row.content or "", phrase),
            project_id=row.project_id,
            tags=tags_by_doc.get(row.id, []),
            # bm25() reports better matches as smaller (negative) numbers;
            # expose the magnitude, and 0.0 when no score is available.
            score=abs(row.score) if row.score else 0.0,
        )
        for row in rows
    ]
    return SearchResponse(results=results)
def _build_snippet(content: str, query: str, context_chars: int = 150) -> str:
"""Build a highlighted snippet from content."""
query_lower = query.lower()
content_lower = content.lower()
idx = content_lower.find(query_lower)
if idx == -1:
# No exact match, return beginning
snippet = content[:context_chars * 2]
else:
start = max(0, idx - context_chars)
end = min(len(content), idx + len(query) + context_chars)
snippet = content[start:end]
if start > 0:
snippet = "..." + snippet
if end < len(content):
snippet = snippet + "..."
# Simple highlight: wrap matches in **
import re
pattern = re.compile(re.escape(query), re.IGNORECASE)
snippet = pattern.sub(f"**{query}**", snippet)
return snippet