import re

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.schemas.search import SearchResult, SearchResponse
from app.schemas.document import TagInfo

# The statements below are built once at import time instead of once per
# search hit; only the bound parameters vary between executions.

# Ranked FTS5 lookup. bm25() returns lower (more negative) values for better
# matches, so ascending order puts the best hits first.
_FTS_SQL = text("""
    SELECT document_id, bm25(documents_fts) AS score
    FROM documents_fts
    WHERE documents_fts MATCH :q
    ORDER BY score
    LIMIT :limit OFFSET :offset
""")

# Loads a document only if it belongs to a project owned by the given agent.
_DOC_SQL = text("""
    SELECT d.id, d.title, d.content, d.project_id
    FROM active_documents d
    JOIN active_projects p ON d.project_id = p.id
    WHERE d.id = :doc_id AND p.agent_id = :agent_id
""")

# Tags attached to a single document.
_TAGS_SQL = text("""
    SELECT t.id, t.name, t.color
    FROM active_tags t
    JOIN document_tags dt ON t.id = dt.tag_id
    WHERE dt.document_id = :doc_id
""")


async def search_documents(
    db: AsyncSession,
    query: str,
    agent_id: str | None = None,
    project_id: str | None = None,
    tags: list[str] | None = None,
    limit: int = 20,
    offset: int = 0,
) -> SearchResponse:
    """Run a full-text search over documents using the FTS5 index.

    The query is matched as a single quoted FTS5 phrase. Each hit is then
    checked against the agent's projects, optionally filtered by project and
    tags, and returned with a highlighted excerpt.

    Args:
        db: Async session used for every query.
        query: User-supplied search text; surrounding whitespace is ignored.
        agent_id: Owner whose documents may be returned. The ownership check
            is an SQL equality test, so ``None`` matches no document and the
            result is empty. NOTE(review): confirm this is the intended
            default rather than "no agent filter".
        project_id: If given, only documents in this project are kept.
        tags: If given, only documents carrying at least one of these tag
            names are kept.
        limit: Maximum number of FTS hits to fetch.
        offset: Number of FTS hits to skip.

    Returns:
        A ``SearchResponse`` whose results are ordered best match first.
        ``limit``/``offset`` are applied to the raw FTS hits before the
        ownership, project and tag filters, so fewer than ``limit`` results
        may be returned even when more matching documents exist.
    """
    term = query.strip() if query else ""
    if not term:
        return SearchResponse(results=[])

    # Wrap the term in double quotes so FTS5 treats it as a literal phrase
    # instead of parsing operators; embedded quotes are escaped by doubling.
    fts_query = '"' + term.replace('"', '""') + '"'

    fts_result = await db.execute(
        _FTS_SQL, {"q": fts_query, "limit": limit, "offset": offset}
    )
    fts_rows = fts_result.fetchall()
    if not fts_rows:
        return SearchResponse(results=[])

    wanted_tags = set(tags) if tags else None

    results = []
    for row in fts_rows:
        doc_id = row.document_id

        # Verify access; hits outside the agent's projects are skipped.
        doc_result = await db.execute(
            _DOC_SQL, {"doc_id": doc_id, "agent_id": agent_id}
        )
        doc_row = doc_result.fetchone()
        if not doc_row:
            continue

        if project_id and doc_row.project_id != project_id:
            continue

        tags_result = await db.execute(_TAGS_SQL, {"doc_id": doc_id})
        doc_tags = [
            TagInfo(id=t.id, name=t.name, color=t.color)
            for t in tags_result.fetchall()
        ]

        # Keep the document only if it shares at least one requested tag.
        if wanted_tags is not None and wanted_tags.isdisjoint(
            t.name for t in doc_tags
        ):
            continue

        # Use the stripped term so the excerpt highlights the same text that
        # was sent to the FTS index.
        excerpt = _build_snippet(doc_row.content or "", term)

        results.append(SearchResult(
            id=doc_row.id,
            title=doc_row.title,
            excerpt=excerpt,
            project_id=doc_row.project_id,
            tags=doc_tags,
            # bm25 scores are negative; expose them as a positive magnitude.
            score=abs(row.score) if row.score else 0.0,
        ))

    return SearchResponse(results=results)


def _build_snippet(content: str, query: str, context_chars: int = 150) -> str:
    """Build an excerpt of *content* with occurrences of *query* highlighted.

    Args:
        content: Full document text.
        query: Literal text to locate, compared case-insensitively.
        context_chars: Characters of context kept on each side of the first
            occurrence.

    Returns:
        A window around the first occurrence with every occurrence inside the
        window wrapped in ``**``, and ``...`` marking truncated ends. If the
        query is empty or does not occur, the first ``2 * context_chars``
        characters of *content* are returned unmodified.
    """
    fallback = content[: context_chars * 2]
    if not query:
        # An empty pattern would match at every position.
        return fallback

    pattern = re.compile(re.escape(query), re.IGNORECASE)
    first = pattern.search(content)
    if first is None:
        return fallback

    # Offsets come from the match on the original string, so they stay valid
    # even for characters whose lowercase form has a different length.
    start = max(0, first.start() - context_chars)
    end = min(len(content), first.end() + context_chars)

    # A callable replacement keeps the document's own casing and prevents
    # backslashes in the query from being read as regex template escapes.
    snippet = pattern.sub(lambda m: f"**{m.group(0)}**", content[start:end])

    # Ellipses are added after highlighting so they are never highlighted.
    if start > 0:
        snippet = "..." + snippet
    if end < len(content):
        snippet += "..."
    return snippet