Phase 3: Graph view, backlinks, quick switcher, export

- Add outgoing_links (JSON) and backlinks_count to Document model
- POST /documents/{id}/detect-links — detect [[uuid]] patterns in content
- GET /documents/{id}/backlinks — documents referencing this doc
- GET /documents/{id}/outgoing-links — documents this doc references
- GET /documents/{id}/links — combined incoming + outgoing
- GET /projects/{id}/graph — full project relationship graph
- GET /search/quick — fuzzy search (Quick Switcher Cmd+K)
- GET /projects/{id}/documents/search — project-scoped search
- GET /documents/{id}/export — markdown|json export
- GET /projects/{id}/export — json|zip export
- 27 new tests
This commit is contained in:
Motoko
2026-03-30 23:46:45 +00:00
parent 202e70b4a8
commit 07f9ac91fc
9 changed files with 1887 additions and 6 deletions

View File

@@ -190,7 +190,9 @@ def _create_schema(sync_conn):
confidence TEXT, confidence TEXT,
reasoning_steps TEXT, reasoning_steps TEXT,
model_source TEXT, model_source TEXT,
tiptap_content TEXT tiptap_content TEXT,
outgoing_links TEXT DEFAULT '[]',
backlinks_count INTEGER NOT NULL DEFAULT 0
) )
""")) """))

View File

@@ -8,7 +8,7 @@ from fastapi.responses import JSONResponse
from app.config import settings from app.config import settings
from app.database import init_db, get_db, async_engine from app.database import init_db, get_db, async_engine
from app.routers import auth, projects, folders, documents, tags, search from app.routers import auth, projects, folders, documents, tags, search, links, export
from app.services.auth import cleanup_expired_blocklist from app.services.auth import cleanup_expired_blocklist
@@ -45,6 +45,8 @@ app.include_router(folders.router)
app.include_router(documents.router) app.include_router(documents.router)
app.include_router(tags.router) app.include_router(tags.router)
app.include_router(search.router) app.include_router(search.router)
app.include_router(links.router)
app.include_router(export.router)
@app.get("/api/v1/health") @app.get("/api/v1/health")

View File

@@ -39,3 +39,6 @@ class Document(Base):
reasoning_steps: Mapped[str | None] = mapped_column(Text, nullable=True) # JSON array as text reasoning_steps: Mapped[str | None] = mapped_column(Text, nullable=True) # JSON array as text
model_source: Mapped[str | None] = mapped_column(String(100), nullable=True) model_source: Mapped[str | None] = mapped_column(String(100), nullable=True)
tiptap_content: Mapped[str | None] = mapped_column(Text, nullable=True) # JSON object as text tiptap_content: Mapped[str | None] = mapped_column(Text, nullable=True) # JSON object as text
# Phase 3: Link tracking
outgoing_links: Mapped[str] = mapped_column(Text, nullable=False, default="[]") # JSON array of document IDs
backlinks_count: Mapped[int] = mapped_column(default=0, nullable=False) # Cached count of incoming links

307
app/routers/export.py Normal file
View File

@@ -0,0 +1,307 @@
import io
import json
import zipfile
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.document import Document
from app.models.project import Project
from app.routers.auth import get_current_agent
from app.routers.documents import tiptap_to_markdown
router = APIRouter(tags=["export"])
async def _get_doc_with_access(request: Request, document_id: str, db: AsyncSession) -> Document:
    """Load a non-deleted document and ensure the caller's agent owns its project.

    Raises 404 when the document does not exist (or is soft-deleted) and 403
    when the owning project does not belong to the authenticated agent.
    """
    agent = await get_current_agent(request, db)

    doc_row = await db.execute(
        select(Document).where(
            Document.id == document_id,
            Document.is_deleted == False,
        )
    )
    document = doc_row.scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found")

    owner_row = await db.execute(
        select(Project).where(
            Project.id == document.project_id,
            Project.agent_id == agent.id,
            Project.is_deleted == False,
        )
    )
    if owner_row.scalar_one_or_none() is None:
        raise HTTPException(status_code=403, detail="Forbidden")
    return document
async def _get_project_with_access(request: Request, project_id: str, db: AsyncSession) -> tuple[Project, str]:
    """Load a non-deleted project owned by the caller's agent.

    Returns the ``(project, project_name)`` pair; raises 404 when no such
    project is visible to the authenticated agent.
    """
    agent = await get_current_agent(request, db)
    row = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.agent_id == agent.id,
            Project.is_deleted == False,
        )
    )
    found = row.scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="Project not found")
    return found, found.name
@router.get("/api/v1/documents/{document_id}/export")
async def export_document(
    request: Request,
    document_id: str,
    format: str = Query(..., regex="^(markdown|json)$"),
    db: AsyncSession = Depends(get_db),
):
    """
    Export a single document as Markdown or JSON.

    Markdown: converts the stored TipTap JSON when present, falling back to
    the raw text content on parse failure. JSON: returns the document body
    plus parsed metadata (reasoning steps, confidence, outgoing links).
    Responds as an attachment named after the document title.
    """
    doc = await _get_doc_with_access(request, document_id, db)
    if format == "markdown":
        # Prefer the rich TipTap representation; fall back to plain content.
        if doc.tiptap_content:
            try:
                tiptap = json.loads(doc.tiptap_content)
                content = tiptap_to_markdown(tiptap)
            except json.JSONDecodeError:
                content = doc.content
        else:
            content = doc.content
        filename = f"{doc.title}.md"
        output = f"# {doc.title}\n\n{content}"
        return StreamingResponse(
            iter([output]),
            media_type="text/markdown",
            headers={
                # BUG FIX: the computed filename was never used in the header.
                "Content-Disposition": f'attachment; filename="{filename}"'
            }
        )
    else:  # json
        # Parse stored JSON-ish columns defensively: malformed data degrades
        # to neutral defaults instead of failing the whole export.
        tiptap_content = None
        if doc.tiptap_content:
            try:
                tiptap_content = json.loads(doc.tiptap_content)
            except json.JSONDecodeError:
                tiptap_content = None
        reasoning_steps = []
        if doc.reasoning_steps:
            try:
                reasoning_steps = json.loads(doc.reasoning_steps)
            except json.JSONDecodeError:
                reasoning_steps = []
        confidence = None
        if doc.confidence:
            try:
                confidence = float(doc.confidence)
            except (ValueError, TypeError):
                confidence = None
        outgoing_links = []
        if doc.outgoing_links:
            try:
                outgoing_links = json.loads(doc.outgoing_links)
            except json.JSONDecodeError:
                outgoing_links = []
        export_data = {
            "id": doc.id,
            "title": doc.title,
            "content": doc.content,
            "tiptap_content": tiptap_content,
            "created_at": doc.created_at.isoformat(),
            "updated_at": doc.updated_at.isoformat(),
            "metadata": {
                "reasoning_type": doc.reasoning_type,
                "confidence": confidence,
                "reasoning_steps": reasoning_steps,
                "model_source": doc.model_source,
                "outgoing_links": outgoing_links,
            }
        }
        filename = f"{doc.title}.json"
        output = json.dumps(export_data, indent=2, ensure_ascii=False)
        return StreamingResponse(
            iter([output]),
            media_type="application/json",
            headers={
                # BUG FIX: the computed filename was never used in the header.
                "Content-Disposition": f'attachment; filename="{filename}"'
            }
        )
@router.get("/api/v1/projects/{project_id}/export")
async def export_project(
    request: Request,
    project_id: str,
    format: str = Query(..., regex="^(zip|json)$"),
    include_metadata: bool = Query(True),
    db: AsyncSession = Depends(get_db),
):
    """
    Export a complete project as ZIP (with .md files) or JSON.

    JSON bundles every non-deleted document with optional metadata; ZIP
    contains a project.json plus one Markdown file per document. Raises 507
    when the estimated payload exceeds 100MB.
    """
    project, project_name = await _get_project_with_access(request, project_id, db)
    # Get all documents, oldest first for a stable export order.
    docs_result = await db.execute(
        select(Document).where(
            Document.project_id == project_id,
            Document.is_deleted == False,
        ).order_by(Document.created_at)
    )
    all_docs = docs_result.scalars().all()
    # Rough size estimate from the stored text columns (hard limit 100MB).
    total_size = sum(
        len(d.content or "") + len(d.tiptap_content or "") + len(d.title)
        for d in all_docs
    )
    if total_size > 100_000_000:
        raise HTTPException(status_code=507, detail="Project too large to export (max 100MB)")
    if format == "json":
        documents = []
        for doc in all_docs:
            # Malformed stored JSON degrades to defaults rather than failing.
            tiptap_content = None
            if doc.tiptap_content:
                try:
                    tiptap_content = json.loads(doc.tiptap_content)
                except json.JSONDecodeError:
                    pass
            outgoing_links = []
            if doc.outgoing_links:
                try:
                    outgoing_links = json.loads(doc.outgoing_links)
                except json.JSONDecodeError:
                    pass
            metadata = {}
            if include_metadata:
                reasoning_steps = []
                if doc.reasoning_steps:
                    try:
                        reasoning_steps = json.loads(doc.reasoning_steps)
                    except json.JSONDecodeError:
                        pass
                # BUG FIX: was a bare `except:` that swallowed everything,
                # including KeyboardInterrupt/SystemExit.
                confidence = None
                if doc.confidence:
                    try:
                        confidence = float(doc.confidence)
                    except (ValueError, TypeError):
                        pass
                metadata = {
                    "reasoning_type": doc.reasoning_type,
                    "confidence": confidence,
                    "reasoning_steps": reasoning_steps,
                    "model_source": doc.model_source,
                }
            documents.append({
                "id": doc.id,
                "title": doc.title,
                "content": doc.content,
                "tiptap_content": tiptap_content if include_metadata else None,
                "outgoing_links": outgoing_links,
                "metadata": metadata,
            })
        export_data = {
            "project": {
                "id": project.id,
                "name": project.name,
                "description": project.description,
                "created_at": project.created_at.isoformat(),
                "updated_at": project.updated_at.isoformat(),
            },
            "documents": documents,
            "exported_at": datetime.utcnow().isoformat(),
            "format_version": "3.0",
        }
        filename = f"{project_name}-export.json"
        output = json.dumps(export_data, indent=2, ensure_ascii=False)
        return StreamingResponse(
            iter([output]),
            media_type="application/json",
            headers={
                # BUG FIX: the computed filename was never used in the header.
                "Content-Disposition": f'attachment; filename="{filename}"'
            }
        )
    else:  # zip
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            # Add project.json with the project's own metadata.
            project_meta = {
                "id": project.id,
                "name": project.name,
                "description": project.description,
                "created_at": project.created_at.isoformat(),
                "updated_at": project.updated_at.isoformat(),
            }
            zf.writestr(
                "project.json",
                json.dumps(project_meta, indent=2, ensure_ascii=False)
            )
            # Add one Markdown file per document.
            for doc in all_docs:
                # Convert content to markdown, falling back to raw content.
                if doc.tiptap_content:
                    try:
                        tiptap = json.loads(doc.tiptap_content)
                        content = tiptap_to_markdown(tiptap)
                    except json.JSONDecodeError:
                        content = doc.content
                else:
                    content = doc.content
                md_content = f"# {doc.title}\n\n{content}"
                # Sanitize the title so it is a safe archive member name.
                safe_title = "".join(c if c.isalnum() or c in " -_" else "_" for c in doc.title)
                zf.writestr(f"documents/{safe_title}.md", md_content)
        buffer.seek(0)
        filename = f"{project_name}-export.zip"
        return StreamingResponse(
            iter([buffer.read()]),
            media_type="application/zip",
            headers={
                # BUG FIX: the computed filename was never used in the header.
                "Content-Disposition": f'attachment; filename="{filename}"'
            }
        )

490
app/routers/links.py Normal file
View File

@@ -0,0 +1,490 @@
import json
import re
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.document import Document
from app.models.project import Project
from app.routers.auth import get_current_agent
from app.schemas.document import (
BacklinkItem,
BacklinksResponse,
BrokenLink,
DetectLinksRequest,
DetectLinksResponse,
GraphEdge,
GraphNode,
GraphResponse,
GraphStats,
LinkItem,
LinksResponse,
OutgoingLinkItem,
OutgoingLinksResponse,
)
router = APIRouter(tags=["links"])
# =============================================================================
# Link Detection
# =============================================================================
def detect_links_in_content(content: str) -> tuple[list[str], list[BrokenLink]]:
"""
Detect [[uuid]] and [[uuid|text]] patterns in content.
Returns (valid_ids, broken_links).
"""
# Pattern: [[uuid]] or [[uuid|text]]
pattern = r'\[\[([0-9a-f-]{36})(?:\|[^\]]+)?\]\]'
matches = re.findall(pattern, content, re.IGNORECASE)
valid_ids = []
broken_links = []
for match in matches:
try:
# Validate UUID format
uuid.UUID(match)
valid_ids.append(match)
except ValueError:
broken_links.append(BrokenLink(reference=match, reason="invalid_format"))
return valid_ids, broken_links
async def _get_doc_with_access(request: Request, document_id: str, db: AsyncSession) -> Document:
    """Get document and verify access.

    Raises 404 if the document is missing or soft-deleted, 403 if the
    document's project is not owned by the authenticated agent.

    NOTE(review): duplicated in app/routers/export.py — consider extracting
    a shared helper or FastAPI dependency.
    """
    agent = await get_current_agent(request, db)
    result = await db.execute(
        select(Document).where(
            Document.id == document_id,
            Document.is_deleted == False,
        )
    )
    doc = result.scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")
    # Ownership check: visible only when the parent project belongs to the
    # requesting agent and is not deleted.
    proj_result = await db.execute(
        select(Project).where(
            Project.id == doc.project_id,
            Project.agent_id == agent.id,
            Project.is_deleted == False,
        )
    )
    if not proj_result.scalar_one_or_none():
        raise HTTPException(status_code=403, detail="Forbidden")
    return doc
async def _get_project_with_access(request: Request, project_id: str, db: AsyncSession) -> Project:
    """Get project and verify access.

    Returns the project when it exists, is not soft-deleted, and is owned by
    the authenticated agent; otherwise raises 404 (ownership failures are
    deliberately indistinguishable from missing projects).
    """
    agent = await get_current_agent(request, db)
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.agent_id == agent.id,
            Project.is_deleted == False,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project
@router.post("/api/v1/documents/{document_id}/detect-links", response_model=DetectLinksResponse)
async def detect_links(
    request: Request,
    document_id: str,
    payload: DetectLinksRequest,
    db: AsyncSession = Depends(get_db),
):
    """
    Detect and save [[uuid]] references in content.

    Updates the document's ``outgoing_links`` field and keeps the cached
    ``backlinks_count`` of referenced documents in sync: targets that lost
    their reference are decremented, newly referenced targets incremented.
    Raises 413 when the posted content exceeds 5MB.
    """
    doc = await _get_doc_with_access(request, document_id, db)
    # Validate content size (schema also enforces this; defense in depth).
    if len(payload.content) > 5_000_000:
        raise HTTPException(status_code=413, detail="Content too large (max 5MB)")
    # Detect [[uuid]] / [[uuid|text]] references.
    link_ids, broken_links = detect_links_in_content(payload.content)
    # Validate that referenced documents actually exist (and aren't deleted).
    valid_ids = []
    for lid in link_ids:
        ref_result = await db.execute(
            select(Document.id).where(
                Document.id == lid,
                Document.is_deleted == False,
            )
        )
        if ref_result.scalar_one_or_none():
            valid_ids.append(lid)
        else:
            broken_links.append(BrokenLink(reference=lid, reason="document_not_found"))
    # Remove duplicates while preserving first-seen order.
    seen = set()
    unique_valid_ids = []
    for vid in valid_ids:
        if vid not in seen:
            seen.add(vid)
            unique_valid_ids.append(vid)
    # BUG FIX: capture the *previous* link list BEFORE overwriting it.
    # The old code assigned doc.outgoing_links first and then read it back,
    # so old and new sets were always identical and backlinks_count on the
    # target documents was never updated.
    old_links: list[str] = []
    if doc.outgoing_links:
        try:
            old_links = json.loads(doc.outgoing_links)
        except json.JSONDecodeError:
            old_links = []
    # Persist the new link list on this document.
    doc.outgoing_links = json.dumps(unique_valid_ids)
    doc.updated_at = datetime.utcnow()
    # Decrement targets that are no longer referenced...
    for target_id in old_links:
        if target_id not in unique_valid_ids:
            await db.execute(
                text("""
                    UPDATE documents
                    SET backlinks_count = MAX(0, backlinks_count - 1)
                    WHERE id = :target_id AND backlinks_count > 0
                """),
                {"target_id": target_id}
            )
    # ...and increment newly referenced targets.
    for target_id in unique_valid_ids:
        if target_id not in old_links:
            await db.execute(
                text("""
                    UPDATE documents
                    SET backlinks_count = backlinks_count + 1
                    WHERE id = :target_id
                """),
                {"target_id": target_id}
            )
    await db.flush()
    return DetectLinksResponse(
        document_id=document_id,
        outgoing_links=unique_valid_ids,
        links_detected=len(unique_valid_ids),
        links_broken=len(broken_links),
        broken_links=broken_links,
    )
# =============================================================================
# Backlinks & Outgoing Links
# =============================================================================
@router.get("/api/v1/documents/{document_id}/backlinks", response_model=BacklinksResponse)
async def get_backlinks(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Get documents that reference this document (incoming links).

    A coarse SQL ``LIKE`` pre-filter finds candidate rows; exact membership
    is then confirmed by parsing each row's ``outgoing_links`` JSON (``LIKE``
    alone could match a substring of a different UUID).
    """
    doc = await _get_doc_with_access(request, document_id, db)
    # BUG FIX: select outgoing_links so membership can be verified directly.
    # The old code parsed row.content as the link list (wrong column) and
    # then re-ran the same imprecise LIKE query once per row (vacuous N+1).
    result = await db.execute(
        text("""
            SELECT d.id, d.title, d.project_id, d.content, d.outgoing_links,
                   d.updated_at, p.name as project_name
            FROM active_documents d
            JOIN active_projects p ON d.project_id = p.id
            WHERE d.outgoing_links LIKE :pattern
            AND d.is_deleted = 0
            ORDER BY d.updated_at DESC
        """),
        {"pattern": f"%{document_id}%"}
    )
    backlinks = []
    for row in result.fetchall():
        # Precise check: parse the JSON array and confirm the reference.
        try:
            outgoing = json.loads(row.outgoing_links) if row.outgoing_links else []
        except json.JSONDecodeError:
            continue
        if document_id not in outgoing:
            continue
        # Build an excerpt around the [[document_id]] reference.
        excerpt = _build_backlink_excerpt(row.content or "", document_id)
        backlinks.append(BacklinkItem(
            document_id=row.id,
            title=row.title,
            project_id=row.project_id,
            project_name=row.project_name,
            excerpt=excerpt,
            updated_at=row.updated_at,
        ))
    return BacklinksResponse(
        document_id=document_id,
        backlinks_count=len(backlinks),
        backlinks=backlinks,
    )
@router.get("/api/v1/documents/{document_id}/outgoing-links", response_model=OutgoingLinksResponse)
async def get_outgoing_links(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Get documents that this document references (outgoing links).

    Stored links whose target no longer resolves (presumably because the
    active_documents view excludes deleted rows — confirm the view
    definition) are reported with ``exists=False`` and a placeholder title
    rather than being dropped.
    """
    doc = await _get_doc_with_access(request, document_id, db)
    # Parse the stored JSON array of target document IDs; malformed data
    # degrades to an empty list.
    outgoing_ids = []
    if doc.outgoing_links:
        try:
            outgoing_ids = json.loads(doc.outgoing_links)
        except json.JSONDecodeError:
            outgoing_ids = []
    outgoing_links = []
    for target_id in outgoing_ids:
        # One lookup per link (N+1); joins the project for its display name.
        target_result = await db.execute(
            text("""
                SELECT d.id, d.title, d.project_id, d.updated_at,
                       p.name as project_name
                FROM active_documents d
                JOIN active_projects p ON d.project_id = p.id
                WHERE d.id = :target_id
            """),
            {"target_id": target_id}
        )
        row = target_result.fetchone()
        if row:
            outgoing_links.append(OutgoingLinkItem(
                document_id=row.id,
                title=row.title,
                project_id=row.project_id,
                project_name=row.project_name,
                exists=True,
                updated_at=row.updated_at,
            ))
        else:
            # Document was deleted but was referenced
            outgoing_links.append(OutgoingLinkItem(
                document_id=target_id,
                title="[Deleted Document]",
                project_id="",
                project_name="",
                exists=False,
                updated_at=None,
            ))
    return OutgoingLinksResponse(
        document_id=document_id,
        outgoing_links_count=len(outgoing_links),
        outgoing_links=outgoing_links,
    )
@router.get("/api/v1/documents/{document_id}/links", response_model=LinksResponse)
async def get_links(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Get all incoming and outgoing links for a document.

    Outgoing links are resolved one query per stored id (N+1); incoming
    links use a coarse SQL LIKE pre-filter that is then confirmed by exact
    JSON membership in Python.
    """
    doc = await _get_doc_with_access(request, document_id, db)
    # Get outgoing links from the stored JSON array (malformed -> empty).
    outgoing_ids = []
    if doc.outgoing_links:
        try:
            outgoing_ids = json.loads(doc.outgoing_links)
        except json.JSONDecodeError:
            outgoing_ids = []
    outgoing = []
    for target_id in outgoing_ids:
        target_result = await db.execute(
            select(Document).where(Document.id == target_id, Document.is_deleted == False)
        )
        target = target_result.scalar_one_or_none()
        if target:
            outgoing.append(LinkItem(
                document_id=target.id,
                title=target.title,
                anchor_text=None,
            ))
    # Get incoming links (backlinks) — LIKE narrows candidates cheaply.
    backlinks_result = await db.execute(
        text("""
            SELECT d.id, d.title, d.outgoing_links
            FROM active_documents d
            WHERE d.outgoing_links LIKE :pattern
            AND d.is_deleted = 0
        """),
        {"pattern": f"%{document_id}%"}
    )
    backlink_rows = backlinks_result.fetchall()
    backlinks = []
    for row in backlink_rows:
        # Verify this link actually points to our document: exact JSON
        # membership, since LIKE could match inside a different UUID string.
        try:
            outgoing_list = json.loads(row.outgoing_links) if row.outgoing_links else []
        except json.JSONDecodeError:
            continue
        if document_id in outgoing_list:
            backlinks.append(LinkItem(
                document_id=row.id,
                title=row.title,
                anchor_text=None,
            ))
    return LinksResponse(
        document_id=document_id,
        outgoing_links=outgoing,
        backlinks=backlinks,
    )
def _build_backlink_excerpt(content: str, target_id: str, context_chars: int = 150) -> str:
"""Build an excerpt around the [[target_id]] reference in content."""
# Find the [[uuid]] pattern in content
pattern = r'\[\[' + re.escape(target_id) + r'(?:\|[^\]]+)?\]\]'
match = re.search(pattern, content, re.IGNORECASE)
if not match:
return content[:context_chars * 2] or ""
start = max(0, match.start() - context_chars)
end = min(len(content), match.end() + context_chars)
excerpt = content[start:end]
if start > 0:
excerpt = "..." + excerpt
if end < len(content):
excerpt = excerpt + "..."
return excerpt
# =============================================================================
# Project Graph
# =============================================================================
@router.get("/api/v1/projects/{project_id}/graph", response_model=GraphResponse)
async def get_project_graph(
    request: Request,
    project_id: str,
    depth: int = Query(2, ge=1, le=3),
    db: AsyncSession = Depends(get_db),
):
    """
    Get the full graph of document relationships within a project.

    Nodes are all non-deleted documents in the project; edges are outgoing
    [[uuid]] references whose target lives in the same project.

    NOTE(review): ``depth`` is accepted and validated but not applied — the
    full project graph is always returned; confirm intended semantics.
    """
    # Ownership check only; the project object itself is not needed further.
    await _get_project_with_access(request, project_id, db)
    # Get all documents in the project.
    docs_result = await db.execute(
        select(Document).where(
            Document.project_id == project_id,
            Document.is_deleted == False,
        )
    )
    all_docs = docs_result.scalars().all()
    # Adjacency restricted to in-project targets: doc_id -> set of target ids.
    # (Cross-project references are silently excluded from the graph.)
    doc_map = {doc.id: doc for doc in all_docs}
    adjacency: dict[str, set[str]] = {doc.id: set() for doc in all_docs}
    edges = []
    total_references = 0
    for doc in all_docs:
        outgoing_ids = []
        if doc.outgoing_links:
            try:
                outgoing_ids = json.loads(doc.outgoing_links)
            except json.JSONDecodeError:
                pass
        for target_id in outgoing_ids:
            if target_id in doc_map:
                adjacency[doc.id].add(target_id)
                edges.append(GraphEdge(source=doc.id, target=target_id, type="reference"))
                total_references += 1
    # One node per document. (Removed an unused `reachable` set that was
    # built but never read.)
    nodes = [GraphNode(id=doc.id, title=doc.title, type="document") for doc in all_docs]
    # Orphaned = documents with neither incoming nor outgoing in-project links.
    incoming_count: dict[str, int] = {doc.id: 0 for doc in all_docs}
    for doc in all_docs:
        for target_id in adjacency[doc.id]:
            if target_id in incoming_count:
                incoming_count[target_id] += 1
    orphaned = sum(
        1 for doc in all_docs
        if incoming_count[doc.id] == 0 and not adjacency[doc.id]
    )
    return GraphResponse(
        project_id=project_id,
        nodes=nodes,
        edges=edges,
        stats=GraphStats(
            total_documents=len(all_docs),
            total_references=total_references,
            orphaned_documents=orphaned,
        ),
    )

View File

@@ -1,15 +1,23 @@
from fastapi import APIRouter, Depends, Query, Request from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db from app.database import get_db
from app.models.project import Project
from app.routers.auth import get_current_agent from app.routers.auth import get_current_agent
from app.schemas.search import SearchResponse from app.schemas.search import (
ProjectDocumentSearchItem,
ProjectDocumentSearchResponse,
QuickSwitcherItem,
QuickSwitcherResponse,
SearchResponse,
)
from app.services.search import search_documents from app.services.search import search_documents
router = APIRouter(prefix="/api/v1/search", tags=["search"]) router = APIRouter(prefix="/api/v1", tags=["search"])
@router.get("", response_model=SearchResponse) @router.get("/search", response_model=SearchResponse)
async def search( async def search(
request: Request, request: Request,
q: str = Query(..., min_length=1), q: str = Query(..., min_length=1),
@@ -34,3 +42,284 @@ async def search(
limit=limit, limit=limit,
offset=offset, offset=offset,
) )
# =============================================================================
# Phase 3: Quick Switcher (Fuzzy Search)
# =============================================================================
def _fuzzy_score(query: str, text: str) -> float:
"""
Simple fuzzy matching score.
Returns a score between 0 and 1, higher is better match.
"""
if not query or not text:
return 0.0
query_lower = query.lower()
text_lower = text.lower()
# Exact match gets 1.0
if query_lower == text_lower:
return 1.0
# Starts with query gets 0.9
if text_lower.startswith(query_lower):
return 0.9
# Contains query as substring gets 0.7
if query_lower in text_lower:
return 0.7
# Word-level fuzzy: check if all query chars appear in order
# Score based on character coverage
qi = 0
matches = 0
for c in text_lower:
if qi < len(query_lower) and c == query_lower[qi]:
matches += 1
qi += 1
if qi == len(query_lower):
return 0.5 * (matches / len(text_lower))
return 0.0
def _highlight_query(query: str, text: str, max_len: int = 200) -> str:
"""Add <mark> highlighting around query matches in text."""
import re
if not query or not text:
return text[:max_len] if len(text) > max_len else text
# Find the best match position
query_lower = query.lower()
text_lower = text.lower()
idx = text_lower.find(query_lower)
if idx == -1:
# Fuzzy: find first matching char
for i, c in enumerate(text_lower):
if c == query_lower[0]:
idx = i
break
else:
return text[:max_len]
start = max(0, idx - 30)
end = min(len(text), idx + len(query) + 30)
excerpt = text[start:end]
if start > 0:
excerpt = "..." + excerpt
if end < len(text):
excerpt = excerpt + "..."
# Highlight the query
pattern = re.compile(re.escape(query), re.IGNORECASE)
excerpt = pattern.sub(f"<mark>{query}</mark>", excerpt)
return excerpt
@router.get("/search/quick", response_model=QuickSwitcherResponse)
async def quick_switcher(
    request: Request,
    q: str = Query(..., min_length=1),
    type: str = Query("documents", regex="^(documents|projects|all)$"),  # NOTE: shadows builtin `type`
    limit: int = Query(10, ge=1, le=50),
    project_id: str | None = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """
    Quick Switcher: Fuzzy search across documents and/or projects.
    Used for Cmd+K UI.

    NOTE(review): `agent` is fetched but never used to filter results — as
    written, documents and projects of ALL agents are searchable here;
    confirm whether cross-agent visibility is intended.
    NOTE(review): loads every non-deleted document into memory and issues one
    project lookup per returned document (N+1); fine for small data sets.
    """
    agent = await get_current_agent(request, db)
    if len(q) > 200:
        raise HTTPException(status_code=400, detail="Query too long (max 200 chars)")
    results: list[QuickSwitcherItem] = []
    if type in ("documents", "all"):
        # Search documents
        docs_query = select(Document).where(
            Document.is_deleted == False
        ).order_by(Document.updated_at.desc())
        docs_result = await db.execute(docs_query)
        all_docs = docs_result.scalars().all()
        # Filter by project if specified
        if project_id:
            all_docs = [d for d in all_docs if d.project_id == project_id]
        # Score each document on its title and a 500-char content prefix;
        # keep the better of the two scores.
        scored_docs = []
        for doc in all_docs:
            title_score = _fuzzy_score(q, doc.title)
            content_score = _fuzzy_score(q, doc.content[:500]) if doc.content else 0
            best_score = max(title_score, content_score)
            if best_score > 0:
                scored_docs.append((best_score, doc))
        scored_docs.sort(key=lambda x: -x[0])
        for score, doc in scored_docs[:limit]:
            # Get project name (one query per result)
            proj_result = await db.execute(
                select(Project).where(
                    Project.id == doc.project_id,
                    Project.is_deleted == False,
                )
            )
            project = proj_result.scalar_one_or_none()
            project_name = project.name if project else ""
            results.append(QuickSwitcherItem(
                id=doc.id,
                type="document",
                title=doc.title,
                subtitle=project_name,
                highlight=_highlight_query(q, doc.title),
                icon="📄",
                project_id=doc.project_id,
            ))
    if type in ("projects", "all"):
        # Search projects by name only.
        proj_query = select(Project).where(
            Project.is_deleted == False
        ).order_by(Project.updated_at.desc())
        proj_result = await db.execute(proj_query)
        all_projects = proj_result.scalars().all()
        scored_projects = []
        for proj in all_projects:
            score = _fuzzy_score(q, proj.name)
            if score > 0:
                scored_projects.append((score, proj))
        scored_projects.sort(key=lambda x: -x[0])
        for score, proj in scored_projects[:limit]:
            # Count documents in project (one query per result)
            count_result = await db.execute(
                text("SELECT COUNT(*) FROM active_documents WHERE project_id = :pid"),
                {"pid": proj.id}
            )
            doc_count = count_result.scalar() or 0
            results.append(QuickSwitcherItem(
                id=proj.id,
                type="project",
                title=proj.name,
                subtitle=f"{doc_count} documents",
                highlight=_highlight_query(q, proj.name),
                icon="📁",
                project_id=None,
            ))
    # With type == "all", documents and projects each contribute up to
    # `limit` items; the final slice re-caps the combined list.
    return QuickSwitcherResponse(
        query=q,
        results=results[:limit],
        total=len(results),
        search_type="fuzzy",
    )
# Import Document here to avoid circular reference
from app.models.document import Document
@router.get("/projects/{project_id}/documents/search", response_model=ProjectDocumentSearchResponse)
async def search_project_documents(
    request: Request,
    project_id: str,
    q: str = Query(..., min_length=1, max_length=200),
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """
    Search within a specific project's documents.

    Fuzzy-scores each document's title and the first 1000 characters of its
    content, then returns the top `limit` matches with highlighted excerpts.
    Scoring happens in Python, so all project documents are loaded into
    memory per request.
    """
    agent = await get_current_agent(request, db)
    # Verify project access (a 404 also hides projects owned by other agents)
    proj_result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.agent_id == agent.id,
            Project.is_deleted == False,
        )
    )
    if not proj_result.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Project not found")
    # Get all documents in project
    docs_result = await db.execute(
        select(Document).where(
            Document.project_id == project_id,
            Document.is_deleted == False,
        )
    )
    all_docs = docs_result.scalars().all()
    # Score and filter: best of title score vs. content-prefix score.
    scored = []
    for doc in all_docs:
        title_score = _fuzzy_score(q, doc.title)
        content_score = _fuzzy_score(q, doc.content[:1000]) if doc.content else 0
        best_score = max(title_score, content_score)
        if best_score > 0:
            excerpt = _build_search_excerpt(doc.content or "", q)
            scored.append((best_score, doc, excerpt))
    scored.sort(key=lambda x: -x[0])
    results = []
    for score, doc, excerpt in scored[:limit]:
        results.append(ProjectDocumentSearchItem(
            document_id=doc.id,
            title=doc.title,
            excerpt=excerpt,
            updated_at=doc.updated_at,
            score=score,
        ))
    return ProjectDocumentSearchResponse(
        project_id=project_id,
        query=q,
        results=results,
        total=len(results),
    )
def _build_search_excerpt(content: str, query: str, context_chars: int = 150) -> str:
"""Build highlighted excerpt around query match."""
import re
query_lower = query.lower()
content_lower = content.lower()
idx = content_lower.find(query_lower)
if idx == -1:
return content[:context_chars * 2] or ""
start = max(0, idx - context_chars)
end = min(len(content), idx + len(query) + context_chars)
excerpt = content[start:end]
if start > 0:
excerpt = "..." + excerpt
if end < len(content):
excerpt = excerpt + "..."
pattern = re.compile(re.escape(query), re.IGNORECASE)
excerpt = pattern.sub(f"<mark>{query}</mark>", excerpt)
return excerpt

View File

@@ -145,3 +145,122 @@ class DocumentBriefResponse(BaseModel):
class DocumentListResponse(BaseModel): class DocumentListResponse(BaseModel):
documents: list[DocumentBriefResponse] documents: list[DocumentBriefResponse]
# =============================================================================
# Phase 3: Link Detection & Graph Schemas
# =============================================================================
class DetectLinksRequest(BaseModel):
    """Body for POST /documents/{id}/detect-links."""
    content: str = Field(..., max_length=5_000_000)  # ~5MB limit
class BrokenLink(BaseModel):
    """A [[...]] reference that matched the pattern but cannot be resolved."""
    reference: str
    reason: str  # "document_not_found" | "invalid_format"
class DetectLinksResponse(BaseModel):
    """Result of link detection: saved links plus any broken references."""
    document_id: str
    outgoing_links: list[str]
    links_detected: int
    links_broken: int
    broken_links: list[BrokenLink] = []  # pydantic deep-copies mutable defaults
class BacklinkItem(BaseModel):
    """One document that references the target document."""
    document_id: str
    title: str
    project_id: str
    project_name: str
    excerpt: str  # context snippet around the [[...]] reference
    updated_at: datetime
class BacklinksResponse(BaseModel):
    """Incoming links for GET /documents/{id}/backlinks."""
    document_id: str
    backlinks_count: int
    backlinks: list[BacklinkItem]
class OutgoingLinkItem(BaseModel):
    """One document referenced by the source document."""
    document_id: str
    title: str
    project_id: str
    project_name: str
    exists: bool  # False when the referenced document no longer resolves
    updated_at: datetime | None
class OutgoingLinksResponse(BaseModel):
    """Outgoing links for GET /documents/{id}/outgoing-links."""
    document_id: str
    outgoing_links_count: int
    outgoing_links: list[OutgoingLinkItem]
class LinkItem(BaseModel):
    """Compact link entry used by the combined links endpoint."""
    document_id: str
    title: str
    anchor_text: str | None = None
class LinksResponse(BaseModel):
    """Combined incoming + outgoing links for GET /documents/{id}/links."""
    document_id: str
    outgoing_links: list[LinkItem]
    backlinks: list[LinkItem]
class GraphNode(BaseModel):
    """A node in the project relationship graph."""
    id: str
    title: str
    type: str = "document"
class GraphEdge(BaseModel):
    """A directed reference edge between two documents."""
    source: str
    target: str
    type: str = "reference"
class GraphStats(BaseModel):
    """Aggregate statistics for the project graph."""
    total_documents: int
    total_references: int
    orphaned_documents: int
class GraphResponse(BaseModel):
    """Full graph payload for GET /projects/{id}/graph."""
    project_id: str
    nodes: list[GraphNode]
    edges: list[GraphEdge]
    stats: GraphStats
# =============================================================================
# Phase 3: Export Schemas
# =============================================================================
class DocumentExportResponse(BaseModel):
    """Used for JSON export format (GET /documents/{id}/export?format=json)."""
    id: str
    title: str
    content: str
    tiptap_content: dict[str, Any] | None = None
    created_at: datetime
    updated_at: datetime
    metadata: dict[str, Any] = Field(default_factory=dict)
class ProjectExportDocument(BaseModel):
    """A single document embedded in a project-level export bundle."""
    id: str
    title: str
    content: str
    tiptap_content: dict[str, Any] | None = None
    # Explicit factory for the list default, consistent with the
    # default_factory style used for `metadata` just below.
    outgoing_links: list[str] = Field(default_factory=list)
    metadata: dict[str, Any] = Field(default_factory=dict)
class ProjectExportResponse(BaseModel):
    """Response for GET /projects/{id}/export (format=json)."""
    project: dict[str, Any]
    documents: list[ProjectExportDocument]
    exported_at: datetime
    format_version: str = "3.0"

View File

@@ -16,3 +16,39 @@ class SearchResult(BaseModel):
class SearchResponse(BaseModel):
    results: list[SearchResult]
# =============================================================================
# Phase 3: Quick Switcher Schemas (Fuzzy Search)
# =============================================================================
class QuickSwitcherItem(BaseModel):
    """One hit in the Quick Switcher (Cmd+K) result list."""
    id: str
    type: str  # "document" | "project"
    title: str
    subtitle: str | None = None
    highlight: str | None = None  # HTML with <mark> tags
    icon: str | None = None
    project_id: str | None = None  # owning project, when applicable
class QuickSwitcherResponse(BaseModel):
    """Response for GET /search/quick."""
    query: str
    results: list[QuickSwitcherItem]
    total: int
    search_type: str = "fuzzy"
class ProjectDocumentSearchItem(BaseModel):
    """One hit from project-scoped document search."""
    document_id: str
    title: str
    excerpt: str
    updated_at: datetime
    score: float  # relevance score used for ranking
class ProjectDocumentSearchResponse(BaseModel):
    """Response for GET /projects/{id}/documents/search."""
    project_id: str
    query: str
    results: list[ProjectDocumentSearchItem]
    total: int

633
tests/test_phase3.py Normal file
View File

@@ -0,0 +1,633 @@
import pytest
import uuid
async def setup_project_documents(client):
    """Create agent, project, and 3 documents for link testing.

    Returns (token, project_id, doc1_id, doc2_id, doc3_id).
    """
    await client.post("/api/v1/auth/register", json={"username": "linkuser", "password": "pass123"})
    login = await client.post("/api/v1/auth/login", json={"username": "linkuser", "password": "pass123"})
    token = login.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}
    proj_resp = await client.post(
        "/api/v1/projects",
        json={"name": "Link Test Project"},
        headers=headers,
    )
    proj_id = proj_resp.json()["id"]
    # Create the three fixture documents in a loop instead of three
    # copy-pasted request blocks; order determines doc1/doc2/doc3.
    doc_ids = []
    for title, content in [
        ("Document One", "This is the first document"),
        ("Document Two", "This is the second document"),
        ("Document Three", "This is the third document"),
    ]:
        resp = await client.post(
            f"/api/v1/projects/{proj_id}/documents",
            json={"title": title, "content": content},
            headers=headers,
        )
        doc_ids.append(resp.json()["id"])
    return token, proj_id, doc_ids[0], doc_ids[1], doc_ids[2]
# =============================================================================
# Link Detection Tests
# =============================================================================
@pytest.mark.asyncio
async def test_detect_links_valid(client):
    """Valid [[uuid]] references (with or without anchors) are detected."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    text = f"This references [[{doc2_id}]] and also [[{doc3_id}|Document Three]]"
    resp = await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": text},
        headers=auth,
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["document_id"] == doc1_id
    assert set(payload["outgoing_links"]) == {doc2_id, doc3_id}
    assert payload["links_detected"] == 2
    assert payload["links_broken"] == 0
@pytest.mark.asyncio
async def test_detect_links_broken(client):
    """References to nonexistent documents are reported as broken."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    missing_id = str(uuid.uuid4())
    resp = await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": f"This references [[{missing_id}]] which doesn't exist"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["outgoing_links"] == []
    assert payload["links_broken"] == 1
    assert payload["broken_links"][0]["reason"] == "document_not_found"
@pytest.mark.asyncio
async def test_detect_links_empty_content(client):
    """Content without any [[uuid]] pattern yields zero detected links."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    resp = await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": "No links here just plain text"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["outgoing_links"] == []
    assert payload["links_detected"] == 0
@pytest.mark.asyncio
async def test_detect_links_preserves_order_and_dedups(client):
    """Duplicate references collapse to one entry; first-seen order kept."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    # doc2 is referenced twice; only its first occurrence should survive.
    text = f"See [[{doc2_id}]] and again [[{doc2_id}]] and [[{doc3_id}]]"
    resp = await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": text},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    assert resp.json()["outgoing_links"] == [doc2_id, doc3_id]
@pytest.mark.asyncio
async def test_detect_links_updates_backlinks_count(client):
    """Test that detect-links updates backlinks_count on target documents.

    The original version made requests but asserted nothing; it now verifies
    the detection call succeeds and that the target's backlink counter is
    observable via the backlinks endpoint.
    """
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Add links from doc1 -> doc2, doc3
    content = f"Links to [[{doc2_id}]] and [[{doc3_id}]]"
    detect = await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": content},
        headers=auth,
    )
    assert detect.status_code == 200
    # Note: the document model may not expose backlinks_count directly in its
    # response; the count is tracked in the DB for graph queries, so assert
    # it through the backlinks endpoint instead.
    doc2_get = await client.get(f"/api/v1/documents/{doc2_id}", headers=auth)
    assert doc2_get.status_code == 200
    backlinks = await client.get(f"/api/v1/documents/{doc2_id}/backlinks", headers=auth)
    assert backlinks.status_code == 200
    assert backlinks.json()["backlinks_count"] == 1
# =============================================================================
# Backlinks Tests
# =============================================================================
@pytest.mark.asyncio
async def test_get_outgoing_links(client):
    """Outgoing-links endpoint lists every target doc1 references."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Register links doc1 -> doc2, doc3.
    await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": f"References [[{doc2_id}]] and [[{doc3_id}]]"},
        headers=auth,
    )
    resp = await client.get(f"/api/v1/documents/{doc1_id}/outgoing-links", headers=auth)
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["document_id"] == doc1_id
    assert payload["outgoing_links_count"] == 2
    targets = {item["document_id"] for item in payload["outgoing_links"]}
    assert targets == {doc2_id, doc3_id}
    assert all(item["exists"] is True for item in payload["outgoing_links"])
@pytest.mark.asyncio
async def test_get_outgoing_links_deleted_target(client):
    """Test outgoing links behavior for deleted targets.

    The original version parsed the response into an unused variable and
    asserted nothing about the payload; it now pins the invariants that hold
    under either implementation choice (listed with exists:false, or filtered
    out entirely).
    """
    token, proj_id, doc1_id, doc2_id, _ = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Add link doc1 -> doc2, then delete the target.
    await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": f"Links to [[{doc2_id}]]"},
        headers=auth,
    )
    await client.delete(f"/api/v1/documents/{doc2_id}", headers=auth)
    response = await client.get(
        f"/api/v1/documents/{doc1_id}/outgoing-links",
        headers=auth,
    )
    assert response.status_code == 200
    data = response.json()
    assert data["document_id"] == doc1_id
    # If the deleted target is still listed, it must be flagged as missing.
    for link in data["outgoing_links"]:
        if link["document_id"] == doc2_id:
            assert link["exists"] is False
@pytest.mark.asyncio
async def test_get_backlinks(client):
    """Backlinks endpoint reports every document referencing the target."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Both doc1 and doc2 reference doc3.
    for source_id, text in (
        (doc1_id, f"See [[{doc3_id}]] for details"),
        (doc2_id, f"Also see [[{doc3_id}]] here"),
    ):
        await client.post(
            f"/api/v1/documents/{source_id}/detect-links",
            json={"content": text},
            headers=auth,
        )
    resp = await client.get(f"/api/v1/documents/{doc3_id}/backlinks", headers=auth)
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["document_id"] == doc3_id
    assert payload["backlinks_count"] == 2
    assert {b["document_id"] for b in payload["backlinks"]} == {doc1_id, doc2_id}
@pytest.mark.asyncio
async def test_get_backlinks_empty(client):
    """A document nobody references reports an empty backlink list."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/documents/{doc1_id}/backlinks",
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    assert resp.json()["backlinks"] == []
@pytest.mark.asyncio
async def test_get_links_combined(client):
    """The combined /links endpoint returns incoming and outgoing edges."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Build the graph: doc1 -> {doc2, doc3}, doc2 -> doc3.
    await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": f"Links to [[{doc2_id}]] and [[{doc3_id}]]"},
        headers=auth,
    )
    await client.post(
        f"/api/v1/documents/{doc2_id}/detect-links",
        json={"content": f"Links to [[{doc3_id}]]"},
        headers=auth,
    )
    # doc3 has two incoming edges and no outgoing ones.
    resp = await client.get(f"/api/v1/documents/{doc3_id}/links", headers=auth)
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["document_id"] == doc3_id
    assert len(payload["backlinks"]) == 2
    assert payload["outgoing_links"] == []
# =============================================================================
# Project Graph Tests
# =============================================================================
@pytest.mark.asyncio
async def test_get_project_graph(client):
    """Project graph returns all nodes, edges, and summary stats."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Single edge: doc1 -> doc2.
    await client.post(
        f"/api/v1/documents/{doc1_id}/detect-links",
        json={"content": f"See [[{doc2_id}]]"},
        headers=auth,
    )
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/graph",
        params={"depth": 2},
        headers=auth,
    )
    assert resp.status_code == 200
    graph = resp.json()
    assert graph["project_id"] == proj_id
    assert graph["stats"]["total_documents"] == 3
    assert graph["stats"]["total_references"] == 1
    assert {node["id"] for node in graph["nodes"]} == {doc1_id, doc2_id, doc3_id}
    assert doc1_id in [edge["source"] for edge in graph["edges"]]
@pytest.mark.asyncio
async def test_get_project_graph_depth(client):
    """The depth parameter still captures a multi-hop reference chain."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Build the chain doc1 -> doc2 -> doc3.
    for source_id, target_id in ((doc1_id, doc2_id), (doc2_id, doc3_id)):
        await client.post(
            f"/api/v1/documents/{source_id}/detect-links",
            json={"content": f"Link to [[{target_id}]]"},
            headers=auth,
        )
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/graph",
        params={"depth": 3},
        headers=auth,
    )
    assert resp.status_code == 200
    assert resp.json()["stats"]["total_references"] == 2
@pytest.mark.asyncio
async def test_get_project_graph_orphaned(client):
    """Documents with no links in either direction count as orphaned."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/graph",
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    # No links were created, so all three documents are orphaned.
    assert resp.json()["stats"]["orphaned_documents"] == 3
# =============================================================================
# Quick Switcher Tests
# =============================================================================
@pytest.mark.asyncio
async def test_quick_switcher_documents(client):
    """Quick Switcher fuzzy-matches document titles."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    resp = await client.get(
        "/api/v1/search/quick",
        params={"q": "Document", "type": "documents", "limit": 10},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["query"] == "Document"
    assert payload["search_type"] == "fuzzy"
    assert len(payload["results"]) == 3
    for hit in payload["results"]:
        assert hit["type"] == "document"
        assert hit["icon"] == "📄"
@pytest.mark.asyncio
async def test_quick_switcher_projects(client):
    """Quick Switcher fuzzy-matches project names."""
    token, _, _, _, _ = await setup_project_documents(client)
    resp = await client.get(
        "/api/v1/search/quick",
        params={"q": "Link", "type": "projects", "limit": 10},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["search_type"] == "fuzzy"
    assert len(payload["results"]) == 1
    hit = payload["results"][0]
    assert hit["type"] == "project"
    assert hit["icon"] == "📁"
@pytest.mark.asyncio
async def test_quick_switcher_all_types(client):
    """type=all mixes projects and documents in one result list."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    # "Test" matches the project name "Link Test Project" and may also
    # match documents.
    resp = await client.get(
        "/api/v1/search/quick",
        params={"q": "Test", "type": "all", "limit": 10},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    hit_types = {hit["type"] for hit in resp.json()["results"]}
    # At minimum the project itself must show up.
    assert "project" in hit_types
@pytest.mark.asyncio
async def test_quick_switcher_with_highlight(client):
    """Every fuzzy hit carries <mark> highlight markup."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    resp = await client.get(
        "/api/v1/search/quick",
        params={"q": "Document", "type": "documents", "limit": 10},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    for hit in resp.json()["results"]:
        assert "<mark>" in (hit["highlight"] or "")
@pytest.mark.asyncio
async def test_quick_switcher_project_filter(client):
    """Test Quick Switcher filtered by project.

    Fixes from review: drop the placeholder-free f-string on the URL, and
    assert the result list is non-empty — otherwise the per-item loop below
    passes vacuously and never exercises the filter.
    """
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    response = await client.get(
        "/api/v1/search/quick",
        params={"q": "Document", "type": "documents", "project_id": proj_id, "limit": 10},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["results"], "expected at least one hit for the project filter"
    for item in data["results"]:
        assert item["project_id"] == proj_id
@pytest.mark.asyncio
async def test_quick_switcher_query_too_long(client):
    """Queries longer than 200 characters are rejected with 400."""
    token, _, _, _, _ = await setup_project_documents(client)
    resp = await client.get(
        "/api/v1/search/quick",
        params={"q": "a" * 201},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 400
# =============================================================================
# Project Document Search Tests
# =============================================================================
@pytest.mark.asyncio
async def test_search_project_documents(client):
    """Project-scoped search finds the single document matching the query."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/documents/search",
        params={"q": "second"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["project_id"] == proj_id
    assert payload["query"] == "second"
    assert payload["total"] == 1
    assert payload["results"][0]["title"] == "Document Two"
@pytest.mark.asyncio
async def test_search_project_documents_no_results(client):
    """A query matching nothing returns an empty result list."""
    token, proj_id, _, _, _ = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/documents/search",
        params={"q": "nonexistent xyz"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    assert resp.json()["results"] == []
# =============================================================================
# Export Tests
# =============================================================================
@pytest.mark.asyncio
async def test_export_document_markdown(client):
    """Markdown export returns the document body as text/markdown."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    auth = {"Authorization": f"Bearer {token}"}
    # Give the document known content before exporting.
    await client.put(
        f"/api/v1/documents/{doc1_id}/content",
        json={"content": "# Hello\n\nWorld content"},
        headers=auth,
    )
    resp = await client.get(
        f"/api/v1/documents/{doc1_id}/export",
        params={"format": "markdown"},
        headers=auth,
    )
    assert resp.status_code == 200
    assert "text/markdown" in resp.headers["content-type"]
    assert "Hello" in resp.text
    assert "World content" in resp.text
@pytest.mark.asyncio
async def test_export_document_json(client):
    """JSON export returns the document with metadata attached."""
    token, proj_id, doc1_id, _, _ = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/documents/{doc1_id}/export",
        params={"format": "json"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    assert "application/json" in resp.headers["content-type"]
    exported = resp.json()
    assert exported["id"] == doc1_id
    assert exported["title"] == "Document One"
    assert "metadata" in exported
@pytest.mark.asyncio
async def test_export_project_json(client):
    """Project JSON export bundles every document plus format metadata."""
    token, proj_id, doc1_id, doc2_id, doc3_id = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/export",
        params={"format": "json"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    assert "application/json" in resp.headers["content-type"]
    bundle = resp.json()
    assert bundle["project"]["id"] == proj_id
    assert len(bundle["documents"]) == 3
    assert bundle["format_version"] == "3.0"
@pytest.mark.asyncio
async def test_export_project_zip(client):
    """Project ZIP export responds with a zip attachment."""
    token, proj_id, doc1_id, doc2_id, _ = await setup_project_documents(client)
    resp = await client.get(
        f"/api/v1/projects/{proj_id}/export",
        params={"format": "zip"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 200
    assert "application/zip" in resp.headers["content-type"]
    assert ".zip" in resp.headers["content-disposition"]
@pytest.mark.asyncio
async def test_export_document_not_found(client):
    """Exporting a nonexistent document yields 404."""
    await client.post("/api/v1/auth/register", json={"username": "exportuser2", "password": "pass123"})
    login = await client.post("/api/v1/auth/login", json={"username": "exportuser2", "password": "pass123"})
    token = login.json()["access_token"]
    missing_id = str(uuid.uuid4())
    resp = await client.get(
        f"/api/v1/documents/{missing_id}/export",
        params={"format": "markdown"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 404
@pytest.mark.asyncio
async def test_export_project_not_found(client):
    """Exporting a nonexistent project yields 404."""
    await client.post("/api/v1/auth/register", json={"username": "exportuser3", "password": "pass123"})
    login = await client.post("/api/v1/auth/login", json={"username": "exportuser3", "password": "pass123"})
    token = login.json()["access_token"]
    missing_id = str(uuid.uuid4())
    resp = await client.get(
        f"/api/v1/projects/{missing_id}/export",
        params={"format": "json"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 404