import re

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models.project import Project
from app.routers.auth import get_current_agent
from app.schemas.search import (
    ProjectDocumentSearchItem,
    ProjectDocumentSearchResponse,
    QuickSwitcherItem,
    QuickSwitcherResponse,
    SearchResponse,
)
from app.services.search import search_documents

router = APIRouter(prefix="/api/v1", tags=["search"])


@router.get("/search", response_model=SearchResponse)
async def search(
    request: Request,
    q: str = Query(..., min_length=1),
    project_id: str | None = Query(None),
    tags: str | None = Query(None),
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    db: AsyncSession = Depends(get_db),
):
    """
    Search documents on behalf of the authenticated agent.

    ``tags`` is a comma-separated list; entries are stripped and blank
    entries dropped before being forwarded to ``search_documents``.
    """
    agent = await get_current_agent(request, db)

    tag_list = None
    if tags:
        tag_list = [t.strip() for t in tags.split(",") if t.strip()]

    return await search_documents(
        db=db,
        query=q,
        agent_id=agent.id,
        project_id=project_id,
        tags=tag_list,
        limit=limit,
        offset=offset,
    )


# =============================================================================
# Phase 3: Quick Switcher (Fuzzy Search)
# =============================================================================


def _fuzzy_score(query: str, text: str) -> float:
    """
    Simple fuzzy matching score.

    Returns a score between 0 and 1, higher is better match:
    1.0 for a case-insensitive exact match, 0.9 for a prefix match,
    0.7 for a substring match, and ``0.5 * len(query) / len(text)`` when
    the query characters merely appear in order inside the text.
    Returns 0.0 when either argument is empty or nothing matches.

    Note: the ``text`` parameter shadows the module-level SQLAlchemy
    ``text`` import inside this function only.
    """
    if not query or not text:
        return 0.0

    query_lower = query.lower()
    text_lower = text.lower()

    if query_lower == text_lower:
        return 1.0
    if text_lower.startswith(query_lower):
        return 0.9
    if query_lower in text_lower:
        return 0.7

    # Subsequence match: every query character must occur in order.
    matched = 0
    for char in text_lower:
        if char == query_lower[matched]:
            matched += 1
            if matched == len(query_lower):
                # Score by how much of the text the query covers.
                return 0.5 * (matched / len(text_lower))
    return 0.0


def _excerpt_window(source: str, idx: int, match_len: int, context_chars: int) -> str:
    """Cut ``context_chars`` either side of a match, adding "..." where text was trimmed."""
    start = max(0, idx - context_chars)
    end = min(len(source), idx + match_len + context_chars)
    excerpt = source[start:end]
    if start > 0:
        excerpt = "..." + excerpt
    if end < len(source):
        excerpt = excerpt + "..."
    return excerpt


def _substitute_query(query: str, excerpt: str) -> str:
    """
    Replace every case-insensitive occurrence of ``query`` in ``excerpt``.

    The replacement is supplied through a callable so that ``re`` does not
    interpret backslashes in the user-supplied query as template escapes
    (a query such as ``\\1`` used to raise ``re.error``).
    """
    # NOTE(review): the replacement is the bare query, so no highlight
    # markers are emitted and matches take on the query's casing. This looks
    # like markup (e.g. a wrapping tag) was lost at some point; confirm with
    # the frontend what it expects before adding markers here.
    pattern = re.compile(re.escape(query), re.IGNORECASE)
    return pattern.sub(lambda _match: query, excerpt)


def _highlight_query(query: str, text: str, max_len: int = 200) -> str:
    """
    Build a short excerpt of ``text`` centred on the query match.

    Falls back to the first occurrence of the query's first character when
    the query is not a substring, and to the first ``max_len`` characters
    when there is no anchor at all. A missing (``None``/empty) text yields
    an empty string.
    """
    if not query or not text:
        return (text or "")[:max_len]

    query_lower = query.lower()
    text_lower = text.lower()

    idx = text_lower.find(query_lower)
    if idx == -1:
        # Fuzzy: anchor on the first character of the query instead.
        idx = text_lower.find(query_lower[0])
        if idx == -1:
            return text[:max_len]

    excerpt = _excerpt_window(text, idx, len(query), 30)
    return _substitute_query(query, excerpt)


async def _quick_document_matches(
    db: AsyncSession,
    agent_id,
    q: str,
    limit: int,
    project_id: str | None,
) -> list[tuple[float, QuickSwitcherItem]]:
    """Return up to ``limit`` scored document items belonging to the agent's projects."""
    # Only documents inside projects owned by the agent are searchable,
    # matching the ownership check in search_project_documents.
    owned_project_ids = select(Project.id).where(Project.agent_id == agent_id)
    docs_query = (
        select(Document)
        .where(
            Document.is_deleted == False,  # noqa: E712
            Document.project_id.in_(owned_project_ids),
        )
        .order_by(Document.updated_at.desc())
    )
    if project_id:
        docs_query = docs_query.where(Document.project_id == project_id)

    # NOTE: scoring happens in Python, so every candidate row is loaded.
    docs = (await db.execute(docs_query)).scalars().all()

    scored = []
    for doc in docs:
        title_score = _fuzzy_score(q, doc.title)
        content_score = _fuzzy_score(q, doc.content[:500]) if doc.content else 0
        best_score = max(title_score, content_score)
        if best_score > 0:
            scored.append((best_score, doc))
    # Stable sort: ties keep the most-recently-updated-first order.
    scored.sort(key=lambda pair: pair[0], reverse=True)

    top = scored[:limit]
    if not top:
        return []

    # Resolve all project names in one query instead of one per result.
    wanted_ids = list({doc.project_id for _score, doc in top})
    name_rows = await db.execute(
        select(Project.id, Project.name).where(
            Project.id.in_(wanted_ids),
            Project.is_deleted == False,  # noqa: E712
        )
    )
    project_names = {pid: name for pid, name in name_rows.all()}

    return [
        (
            score,
            QuickSwitcherItem(
                id=doc.id,
                type="document",
                title=doc.title,
                # Deleted projects are absent from the map -> empty subtitle.
                subtitle=project_names.get(doc.project_id, ""),
                highlight=_highlight_query(q, doc.title),
                icon="📄",
                project_id=doc.project_id,
            ),
        )
        for score, doc in top
    ]


async def _quick_project_matches(
    db: AsyncSession,
    agent_id,
    q: str,
    limit: int,
) -> list[tuple[float, QuickSwitcherItem]]:
    """Return up to ``limit`` scored project items owned by the agent."""
    proj_query = (
        select(Project)
        .where(
            Project.agent_id == agent_id,
            Project.is_deleted == False,  # noqa: E712
        )
        .order_by(Project.updated_at.desc())
    )
    projects = (await db.execute(proj_query)).scalars().all()

    scored = []
    for proj in projects:
        score = _fuzzy_score(q, proj.name)
        if score > 0:
            scored.append((score, proj))
    scored.sort(key=lambda pair: pair[0], reverse=True)

    matches = []
    for score, proj in scored[:limit]:
        # Count documents in project
        count_result = await db.execute(
            text("SELECT COUNT(*) FROM active_documents WHERE project_id = :pid"),
            {"pid": proj.id},
        )
        doc_count = count_result.scalar() or 0
        matches.append(
            (
                score,
                QuickSwitcherItem(
                    id=proj.id,
                    type="project",
                    title=proj.name,
                    subtitle=f"{doc_count} documents",
                    highlight=_highlight_query(q, proj.name),
                    icon="📁",
                    project_id=None,
                ),
            )
        )
    return matches


@router.get("/search/quick", response_model=QuickSwitcherResponse)
async def quick_switcher(
    request: Request,
    q: str = Query(..., min_length=1),
    type: str = Query("documents", regex="^(documents|projects|all)$"),
    limit: int = Query(10, ge=1, le=50),
    project_id: str | None = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """
    Quick Switcher: Fuzzy search across documents and/or projects.
    Used for Cmd+K UI.

    Results are restricted to projects owned by the authenticated agent
    (and the documents inside them). When ``type`` is ``all``, documents
    and projects are ranked together by score so that neither kind can
    crowd the other out of the first ``limit`` entries; on equal scores
    documents come first. ``total`` counts the candidates gathered before
    the final truncation to ``limit``.

    Raises:
        HTTPException: 400 when ``q`` is longer than 200 characters.
    """
    agent = await get_current_agent(request, db)

    if len(q) > 200:
        raise HTTPException(status_code=400, detail="Query too long (max 200 chars)")

    ranked: list[tuple[float, QuickSwitcherItem]] = []
    if type in ("documents", "all"):
        ranked.extend(
            await _quick_document_matches(db, agent.id, q, limit, project_id)
        )
    if type in ("projects", "all"):
        ranked.extend(await _quick_project_matches(db, agent.id, q, limit))

    # Stable descending sort keeps per-kind ordering for equal scores.
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    results = [item for _score, item in ranked]

    return QuickSwitcherResponse(
        query=q,
        results=results[:limit],
        total=len(results),
        search_type="fuzzy",
    )


# Import Document here to avoid circular reference
from app.models.document import Document  # noqa: E402


@router.get(
    "/projects/{project_id}/documents/search",
    response_model=ProjectDocumentSearchResponse,
)
async def search_project_documents(
    request: Request,
    project_id: str,
    q: str = Query(..., min_length=1, max_length=200),
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """
    Search within a specific project's documents.

    Documents are scored on their title and the first 1000 characters of
    their content; the best ``limit`` matches are returned with an excerpt.
    ``total`` is the number of matching documents, which may exceed the
    number of returned results.

    Raises:
        HTTPException: 404 when the project does not exist, is deleted, or
            is not owned by the authenticated agent.
    """
    agent = await get_current_agent(request, db)

    # Verify project access
    proj_result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.agent_id == agent.id,
            Project.is_deleted == False,  # noqa: E712
        )
    )
    if not proj_result.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Project not found")

    # Get all documents in project
    docs_result = await db.execute(
        select(Document).where(
            Document.project_id == project_id,
            Document.is_deleted == False,  # noqa: E712
        )
    )
    all_docs = docs_result.scalars().all()

    # Score and filter
    scored = []
    for doc in all_docs:
        title_score = _fuzzy_score(q, doc.title)
        content_score = _fuzzy_score(q, doc.content[:1000]) if doc.content else 0
        best_score = max(title_score, content_score)
        if best_score > 0:
            scored.append((best_score, doc))
    scored.sort(key=lambda pair: pair[0], reverse=True)

    # Excerpts are only built for the documents actually returned.
    results = [
        ProjectDocumentSearchItem(
            document_id=doc.id,
            title=doc.title,
            excerpt=_build_search_excerpt(doc.content or "", q),
            updated_at=doc.updated_at,
            score=score,
        )
        for score, doc in scored[:limit]
    ]

    return ProjectDocumentSearchResponse(
        project_id=project_id,
        query=q,
        results=results,
        total=len(scored),
    )


def _build_search_excerpt(content: str, query: str, context_chars: int = 150) -> str:
    """
    Build an excerpt of ``content`` around the first query match.

    Returns ``context_chars`` characters either side of the match, with
    "..." marking trimmed ends. When the query does not occur in the
    content, the first ``2 * context_chars`` characters are returned.
    """
    idx = content.lower().find(query.lower())
    if idx == -1:
        return content[: context_chars * 2]

    excerpt = _excerpt_window(content, idx, len(query), context_chars)
    return _substitute_query(query, excerpt)