Files
claudia-docs-api/app/routers/documents.py
Motoko ae2409ef46 fix: API token project ownership check
- API tokens now verify project belongs to token owner before access
- Researcher tokens only access research/general docs in owner's projects
- Developer tokens only access development/general docs in owner's projects
- Viewer tokens have read-only access to all doc types in owner's projects
- Add test for cross-user project access prevention
2026-03-31 01:56:26 +00:00

982 lines
32 KiB
Python

import json
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import delete, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.document import Document, ReasoningType
from app.models.folder import Folder
from app.models.project import Project
from app.models.tag import DocumentTag, Tag
from app.routers.auth import get_current_agent, get_current_agent_or_api_token
from app.schemas.document import (
DocumentBriefResponse,
DocumentContentUpdate,
DocumentCreate,
DocumentListResponse,
DocumentResponse,
DocumentUpdate,
ReasoningMetadata,
ReasoningPanel,
ReasoningStep,
ReasoningStepAdd,
ReasoningUpdate,
TagInfo,
TipTapContentResponse,
TipTapContentUpdate,
)
from app.schemas.tag import DocumentTagsAssign
# Router that collects every document endpoint defined in this module;
# all routes registered on it carry the "documents" tag.
router = APIRouter(tags=["documents"])
def build_doc_path(project_id: str, doc_id: str, folder_id: str | None, folder_path: str | None) -> str:
if folder_id and folder_path:
return f"{folder_path}/{doc_id}"
return f"/{project_id}/{doc_id}"
def tiptap_to_markdown(tiptap: dict) -> str:
    """Convert a TipTap JSON document to a Markdown string.

    Supported nodes: doc, paragraph, heading, text (bold / italic / code /
    strike marks), bulletList, orderedList, listItem, blockquote, codeBlock,
    hardBreak, horizontalRule and image.  Unknown node types render as an
    empty string.

    Args:
        tiptap: The TipTap node tree (normally a ``{"type": "doc", ...}`` dict).

    Returns:
        The Markdown text with surrounding whitespace stripped, or ``""`` when
        ``tiptap`` is empty or not a dict.
    """
    if not tiptap or not isinstance(tiptap, dict):
        return ""

    # Delimiter wrapped around a text node for each supported mark type.
    mark_delimiters = {"bold": "**", "italic": "*", "code": "`", "strike": "~~"}

    def children_of(node: dict) -> list:
        # An explicit ``"content": null`` is treated like a missing key.
        children = node.get("content")
        return children if isinstance(children, list) else []

    def attrs_of(node: dict) -> dict:
        # An explicit ``"attrs": null`` is treated like a missing key.
        attrs = node.get("attrs")
        return attrs if isinstance(attrs, dict) else {}

    def render_children(node: dict) -> str:
        return "".join(process_node(child) for child in children_of(node))

    def render_list(node: dict, ordered: bool) -> str:
        # Ordered lists are numbered from ``attrs.start`` (default 1);
        # bullet lists keep the "-" marker.
        number = attrs_of(node).get("start", 1)
        if not isinstance(number, int):
            number = 1
        rendered = []
        for item in children_of(node):
            if isinstance(item, dict) and item.get("type") == "listItem":
                marker = f"{number}." if ordered else "-"
                number += 1
                rendered.append(f"{marker} {render_children(item).strip()}\n")
            else:
                rendered.append(process_node(item))
        return "\n".join(rendered) + "\n"

    def process_node(node: dict) -> str:
        if not isinstance(node, dict):
            return ""
        node_type = node.get("type", "")

        if node_type == "doc":
            return "\n".join(process_node(child) for child in children_of(node))
        if node_type == "paragraph":
            return f"{render_children(node)}\n"
        if node_type == "heading":
            level = attrs_of(node).get("level", 1)
            if not isinstance(level, int):
                level = 1
            return f"{'#' * level} {render_children(node)}\n"
        if node_type == "text":
            text_val = node.get("text") or ""
            marks = node.get("marks")
            for mark in marks if isinstance(marks, list) else []:
                if not isinstance(mark, dict):
                    continue
                delimiter = mark_delimiters.get(mark.get("type"))
                if delimiter:
                    text_val = f"{delimiter}{text_val}{delimiter}"
            return text_val
        if node_type == "bulletList":
            return render_list(node, ordered=False)
        if node_type == "orderedList":
            return render_list(node, ordered=True)
        if node_type == "listItem":
            # A list item met outside of a list node falls back to a bullet.
            return f"- {render_children(node).strip()}\n"
        if node_type == "blockquote":
            # Prefix every line so multi-block quotes stay inside the quote.
            quoted = render_children(node).strip()
            return "\n".join(f"> {line}" for line in quoted.split("\n")) + "\n"
        if node_type == "codeBlock":
            # ``language`` may be an explicit null; never emit "None".
            lang = attrs_of(node).get("language") or ""
            return f"```{lang}\n{render_children(node)}\n```\n"
        if node_type == "hardBreak":
            return "\n"
        if node_type == "horizontalRule":
            return "---\n"
        if node_type == "image":
            attrs = attrs_of(node)
            src = attrs.get("src") or ""
            alt = attrs.get("alt") or ""
            return f"![{alt}]({src})"
        return ""

    return process_node(tiptap).strip()
async def get_document_tags(db: AsyncSession, doc_id: str) -> list[TagInfo]:
    """Return the tags linked to a document.

    Joins ``active_tags`` with ``document_tags`` on the tag id and builds one
    ``TagInfo`` (id, name, color) for every row attached to ``doc_id``.
    """
    tag_query = text("""
SELECT t.id, t.name, t.color
FROM active_tags t
JOIN document_tags dt ON t.id = dt.tag_id
WHERE dt.document_id = :doc_id
""")
    tag_rows = (await db.execute(tag_query, {"doc_id": doc_id})).fetchall()
    return [TagInfo(id=row.id, name=row.name, color=row.color) for row in tag_rows]
async def document_to_response(db: AsyncSession, doc: Document) -> DocumentResponse:
    """Build the full API response for a document row.

    Loads the document's tags and decodes the stored string fields:
    ``reasoning_steps`` and ``tiptap_content`` are JSON-decoded, and
    ``confidence`` is converted to ``float``.  Values that fail to decode
    fall back to ``[]`` (steps) or ``None`` (tiptap content, confidence).
    """
    tag_infos = await get_document_tags(db, doc.id)

    def decode_json(raw, fallback):
        # Empty/missing values and malformed JSON both yield the fallback.
        if not raw:
            return fallback
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return fallback

    steps = decode_json(doc.reasoning_steps, [])
    tiptap = decode_json(doc.tiptap_content, None)

    raw_confidence = doc.confidence
    try:
        confidence_value = float(raw_confidence) if raw_confidence else None
    except (ValueError, TypeError):
        confidence_value = None

    return DocumentResponse(
        id=doc.id,
        title=doc.title,
        content=doc.content,
        project_id=doc.project_id,
        folder_id=doc.folder_id,
        path=doc.path,
        tags=tag_infos,
        created_at=doc.created_at,
        updated_at=doc.updated_at,
        reasoning_type=doc.reasoning_type,
        confidence=confidence_value,
        reasoning_steps=steps,
        model_source=doc.model_source,
        tiptap_content=tiptap,
    )
def _can_access_document(api_role: str | None, doc_agent_type: str | None, require_write: bool = False) -> bool:
"""
Check if a role can access a document based on agent_type.
Rules:
- JWT tokens (api_role is None) have full access via project ownership check
- researcher: can access 'research' and 'general' documents
- developer: can access 'development' and 'general' documents
- viewer: can only read (handled elsewhere), not create/modify
- admin: full access (but admin is a JWT role, not API token role)
For write operations, viewer is denied.
"""
if api_role is None:
# JWT token - access is controlled by project ownership
return True
# API token role-based access
doc_type = doc_agent_type or 'general'
if require_write:
# Viewers cannot create/update/delete
if api_role == "viewer":
return False
# Researchers can only write to research and general
if api_role == "researcher":
return doc_type in ("research", "general")
# Developers can only write to development and general
if api_role == "developer":
return doc_type in ("development", "general")
else:
# Read access - viewers can read research, development, and general
if api_role == "viewer":
return doc_type in ("research", "development", "general")
# Researchers can only read research and general
if api_role == "researcher":
return doc_type in ("research", "general")
# Developers can only read development and general
if api_role == "developer":
return doc_type in ("development", "general")
return False
@router.get("/api/v1/projects/{project_id}/documents", response_model=DocumentListResponse)
async def list_documents(
    request: Request,
    project_id: str,
    db: AsyncSession = Depends(get_db),
):
    """List a project's non-deleted documents, newest first.

    Responds 404 when the project is missing or soft-deleted and 403 when an
    API token targets a project owned by a different agent.  With an API
    token, documents the token's role may not read are left out of the list.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    project_query = select(Project).where(
        Project.id == project_id,
        Project.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # The owner comparison is only applied to API tokens.
    if via_api_token and project.agent_id != agent.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    documents_query = (
        select(Document)
        .where(
            Document.project_id == project_id,
            Document.is_deleted == False,  # noqa: E712
        )
        .order_by(Document.created_at.desc())
    )
    documents = (await db.execute(documents_query)).scalars().all()

    briefs = []
    for document in documents:
        if via_api_token and not _can_access_document(api_role, document.agent_type, require_write=False):
            continue  # hidden from this token's role
        document_tags = await get_document_tags(db, document.id)
        briefs.append(
            DocumentBriefResponse(
                id=document.id,
                title=document.title,
                project_id=document.project_id,
                folder_id=document.folder_id,
                path=document.path,
                tags=document_tags,
                created_at=document.created_at,
                updated_at=document.updated_at,
            )
        )
    return DocumentListResponse(documents=briefs)
@router.post("/api/v1/projects/{project_id}/documents", response_model=DocumentResponse, status_code=201)
async def create_document(
    request: Request,
    project_id: str,
    payload: DocumentCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a document in a project, optionally inside a folder.

    Responds 404 for an unknown project, 403 when an API token does not own
    the project or its role may not write the requested ``agent_type``, and
    400 for an invalid ``agent_type`` or an unknown folder.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    project_query = select(Project).where(
        Project.id == project_id,
        Project.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # The owner comparison is only applied to API tokens.
    if via_api_token and project.agent_id != agent.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    # A missing agent_type defaults to "general".
    allowed_agent_types = ("research", "development", "general")
    doc_agent_type = payload.agent_type or "general"
    if doc_agent_type not in allowed_agent_types:
        raise HTTPException(status_code=400, detail="Invalid agent_type")

    if via_api_token and not _can_access_document(api_role, doc_agent_type, require_write=True):
        raise HTTPException(status_code=403, detail="Forbidden")

    # Resolve the target folder (it must belong to this project), if any.
    parent_path = None
    if payload.folder_id:
        folder_query = select(Folder).where(
            Folder.id == payload.folder_id,
            Folder.project_id == project_id,
            Folder.is_deleted == False,  # noqa: E712
        )
        target_folder = (await db.execute(folder_query)).scalar_one_or_none()
        if not target_folder:
            raise HTTPException(status_code=400, detail="Folder not found")
        parent_path = target_folder.path

    new_id = str(uuid.uuid4())
    new_doc = Document(
        id=new_id,
        title=payload.title,
        content=payload.content,
        project_id=project_id,
        folder_id=payload.folder_id,
        path=build_doc_path(project_id, new_id, payload.folder_id, parent_path),
        agent_type=doc_agent_type,
    )
    db.add(new_doc)
    await db.flush()
    return await document_to_response(db, new_doc)
@router.get("/api/v1/documents/{document_id}", response_model=DocumentResponse)
async def get_document(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Return a single non-deleted document.

    Responds 404 when the document or its project is missing, and 403 when
    an API token does not own the project or its role may not read the
    document's type.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    document_query = select(Document).where(
        Document.id == document_id,
        Document.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    doc = (await db.execute(document_query)).scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")

    project_query = select(Project).where(
        Project.id == doc.project_id,
        Project.is_deleted == False,  # noqa: E712
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        # A document whose project is gone is reported as not found.
        raise HTTPException(status_code=404, detail="Document not found")

    if via_api_token:
        # API tokens must own the project and hold a role that can read
        # this document's type.
        if project.agent_id != agent.id:
            raise HTTPException(status_code=403, detail="Forbidden")
        if not _can_access_document(api_role, doc.agent_type, require_write=False):
            raise HTTPException(status_code=403, detail="Forbidden")

    return await document_to_response(db, doc)
@router.put("/api/v1/documents/{document_id}", response_model=DocumentResponse)
async def update_document(
    request: Request,
    document_id: str,
    payload: DocumentUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Rename a document and/or move it to another folder.

    ``payload.folder_id`` semantics:
    - ``None``: the folder is left unchanged;
    - non-empty id: the document moves into that folder (which must belong to
      the document's project);
    - empty string: the document moves to the project root and its
      ``folder_id`` is cleared to ``None`` rather than stored as ``""``.

    Raises:
        HTTPException: 404 when the document does not exist, 403 when access
            is denied (via ``_get_doc_with_access``), 400 when the target
            folder is not found.
    """
    # Authentication, project lookup and write-permission checks are shared
    # with the other document endpoints.
    doc, _ = await _get_doc_with_access(request, document_id, db, require_write=True)

    if payload.title is not None:
        doc.title = payload.title

    if payload.folder_id is not None:
        if payload.folder_id:
            folder_query = select(Folder).where(
                Folder.id == payload.folder_id,
                Folder.project_id == doc.project_id,
                Folder.is_deleted == False,  # noqa: E712 - builds a SQL expression
            )
            folder = (await db.execute(folder_query)).scalar_one_or_none()
            if not folder:
                raise HTTPException(status_code=400, detail="Folder not found")
            doc.path = f"{folder.path}/{doc.id}"
            doc.folder_id = payload.folder_id
        else:
            # Empty string means "move to the project root"; store NULL so the
            # row does not reference a non-existent folder id.
            doc.path = f"/{doc.project_id}/{doc.id}"
            doc.folder_id = None

    doc.updated_at = datetime.utcnow()
    await db.flush()
    return await document_to_response(db, doc)
@router.delete("/api/v1/documents/{document_id}", status_code=204)
async def delete_document(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Soft-delete a document.

    Marks the row as deleted and records the deletion time and the acting
    agent.  Responds 404 when the document is missing and 403 when its
    project is missing or the caller's API token lacks write access.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    document_query = select(Document).where(
        Document.id == document_id,
        Document.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    doc = (await db.execute(document_query)).scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")

    project_query = select(Project).where(
        Project.id == doc.project_id,
        Project.is_deleted == False,  # noqa: E712
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=403, detail="Forbidden")

    if via_api_token:
        # API tokens must own the project and hold a role that can write
        # this document's type.
        if project.agent_id != agent.id:
            raise HTTPException(status_code=403, detail="Forbidden")
        if not _can_access_document(api_role, doc.agent_type, require_write=True):
            raise HTTPException(status_code=403, detail="Forbidden")

    doc.is_deleted = True
    doc.deleted_at = datetime.utcnow()
    doc.deleted_by = agent.id
    await db.flush()
    return None
@router.put("/api/v1/documents/{document_id}/content", response_model=DocumentResponse)
async def update_document_content(
    request: Request,
    document_id: str,
    payload: TipTapContentUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Replace a document's content with TipTap JSON or Markdown text.

    The storage format is chosen from the type of ``payload.content`` alone;
    no other payload field is read here:

    - ``str``: stored verbatim as the Markdown ``content`` and wrapped in a
      single-paragraph TipTap document for the editor;
    - ``dict``: stored as TipTap JSON, with ``content`` regenerated through
      ``tiptap_to_markdown``.

    Raises:
        HTTPException: 404 when the document does not exist, 403 when access
            is denied (via ``_get_doc_with_access``), 413 when the
            JSON-encoded content exceeds 1,000,000 characters, 400 when the
            content is neither a string nor a dict.
    """
    # Authentication, project lookup and write-permission checks are shared
    # with the other document endpoints.
    doc, _ = await _get_doc_with_access(request, document_id, db, require_write=True)

    content = payload.content

    # Size limit is applied to the JSON-encoded form (1MB limit).
    content_json = json.dumps(content)
    if len(content_json) > 1_000_000:
        raise HTTPException(status_code=413, detail="Content too large (max 1MB)")

    if isinstance(content, str):
        doc.content = content
        # ProseMirror does not allow empty text nodes, so an empty string
        # becomes an empty paragraph instead of a paragraph holding "".
        inline_nodes = [{"type": "text", "text": content}] if content else []
        doc.tiptap_content = json.dumps({
            "type": "doc",
            "content": [
                {
                    "type": "paragraph",
                    "content": inline_nodes,
                }
            ],
        })
    elif isinstance(content, dict):
        doc.tiptap_content = content_json
        # Keep the plain Markdown copy in sync with the TipTap document.
        doc.content = tiptap_to_markdown(content)
    else:
        raise HTTPException(status_code=400, detail="content must be a string or dict")

    doc.updated_at = datetime.utcnow()
    await db.flush()
    return await document_to_response(db, doc)
@router.post("/api/v1/documents/{document_id}/restore", response_model=DocumentResponse)
async def restore_document(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Undo the soft-deletion of a document.

    Only rows currently flagged as deleted are matched.  Responds 404 when no
    such document exists and 403 when its project is missing or the caller's
    API token lacks write access.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    deleted_query = select(Document).where(
        Document.id == document_id,
        Document.is_deleted == True,  # noqa: E712 - builds a SQL expression
    )
    doc = (await db.execute(deleted_query)).scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")

    project_query = select(Project).where(
        Project.id == doc.project_id,
        Project.is_deleted == False,  # noqa: E712
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=403, detail="Forbidden")

    if via_api_token:
        # API tokens must own the project and hold a role that can write
        # this document's type.
        if project.agent_id != agent.id:
            raise HTTPException(status_code=403, detail="Forbidden")
        if not _can_access_document(api_role, doc.agent_type, require_write=True):
            raise HTTPException(status_code=403, detail="Forbidden")

    # Clear every deletion marker.
    doc.is_deleted = False
    doc.deleted_at = None
    doc.deleted_by = None
    await db.flush()
    return await document_to_response(db, doc)
@router.post("/api/v1/documents/{document_id}/tags", status_code=204)
async def assign_tags(
    request: Request,
    document_id: str,
    payload: DocumentTagsAssign,
    db: AsyncSession = Depends(get_db),
):
    """Attach the tags listed in ``payload.tag_ids`` to a document.

    Tags already linked to the document are skipped.  Responds 404 when the
    document is missing, 403 when its project is missing or the caller's API
    token lacks write access, and 400 when a tag id does not exist.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    document_query = select(Document).where(
        Document.id == document_id,
        Document.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    doc = (await db.execute(document_query)).scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")

    project_query = select(Project).where(
        Project.id == doc.project_id,
        Project.is_deleted == False,  # noqa: E712
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=403, detail="Forbidden")

    if via_api_token:
        # API tokens must own the project and hold a role that can write
        # this document's type.
        if project.agent_id != agent.id:
            raise HTTPException(status_code=403, detail="Forbidden")
        if not _can_access_document(api_role, doc.agent_type, require_write=True):
            raise HTTPException(status_code=403, detail="Forbidden")

    for tag_id in payload.tag_ids:
        tag_query = select(Tag).where(
            Tag.id == tag_id,
            Tag.is_deleted == False,  # noqa: E712
        )
        tag = (await db.execute(tag_query)).scalar_one_or_none()
        if not tag:
            raise HTTPException(status_code=400, detail=f"Tag {tag_id} not found")

        link_query = select(DocumentTag).where(
            DocumentTag.document_id == document_id,
            DocumentTag.tag_id == tag_id,
        )
        already_linked = (await db.execute(link_query)).scalar_one_or_none()
        if already_linked:
            continue
        db.add(DocumentTag(document_id=document_id, tag_id=tag_id))

    await db.flush()
    return None
@router.delete("/api/v1/documents/{document_id}/tags/{tag_id}", status_code=204)
async def remove_tag(
    request: Request,
    document_id: str,
    tag_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Detach one tag from a document.

    Deletes the matching ``DocumentTag`` link row, if any.  Responds 404 when
    the document is missing and 403 when its project is missing or the
    caller's API token lacks write access.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    document_query = select(Document).where(
        Document.id == document_id,
        Document.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    doc = (await db.execute(document_query)).scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")

    project_query = select(Project).where(
        Project.id == doc.project_id,
        Project.is_deleted == False,  # noqa: E712
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=403, detail="Forbidden")

    if via_api_token:
        # API tokens must own the project and hold a role that can write
        # this document's type.
        if project.agent_id != agent.id:
            raise HTTPException(status_code=403, detail="Forbidden")
        if not _can_access_document(api_role, doc.agent_type, require_write=True):
            raise HTTPException(status_code=403, detail="Forbidden")

    unlink = delete(DocumentTag).where(
        DocumentTag.document_id == document_id,
        DocumentTag.tag_id == tag_id,
    )
    await db.execute(unlink)
    await db.flush()
    return None
# =============================================================================
# Phase 2: New Endpoints
# =============================================================================
async def _get_doc_with_access(
    request: Request,
    document_id: str,
    db: AsyncSession,
    require_write: bool = False,
) -> tuple[Document, str | None]:
    """Load a non-deleted document and enforce the caller's access to it.

    Returns ``(doc, api_role)`` where ``api_role`` is ``None`` for callers
    that did not authenticate with an API token.

    Raises ``HTTPException`` 404 when the document does not exist, and 403
    when its project is missing/deleted, when an API token does not own the
    project, or when the token's role fails the read/write check selected by
    ``require_write``.

    NOTE(review): the owner comparison below only runs for API tokens;
    callers with ``api_role is None`` are not compared against
    ``project.agent_id`` here - confirm this is intended.
    """
    agent, api_role = await get_current_agent_or_api_token(request, db)
    via_api_token = api_role is not None

    document_query = select(Document).where(
        Document.id == document_id,
        Document.is_deleted == False,  # noqa: E712 - builds a SQL expression
    )
    doc = (await db.execute(document_query)).scalar_one_or_none()
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")

    project_query = select(Project).where(
        Project.id == doc.project_id,
        Project.is_deleted == False,  # noqa: E712
    )
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=403, detail="Forbidden")

    if via_api_token:
        if project.agent_id != agent.id:
            raise HTTPException(status_code=403, detail="Forbidden")
        if not _can_access_document(api_role, doc.agent_type, require_write=require_write):
            raise HTTPException(status_code=403, detail="Forbidden")

    return doc, api_role
@router.get("/api/v1/documents/{document_id}/reasoning")
async def get_document_reasoning(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Return the reasoning metadata stored on a document.

    The result is a dict with ``reasoning_type``, ``confidence`` (float or
    ``None``), ``reasoning_steps`` (decoded JSON, ``[]`` when absent or
    malformed) and ``model_source``.
    """
    doc, _ = await _get_doc_with_access(request, document_id, db)

    # Steps are stored as a JSON string; malformed data yields an empty list.
    steps = []
    raw_steps = doc.reasoning_steps
    if raw_steps:
        try:
            steps = json.loads(raw_steps)
        except json.JSONDecodeError:
            steps = []

    # Confidence is stored as a string; unparsable values become None.
    raw_confidence = doc.confidence
    try:
        confidence_value = float(raw_confidence) if raw_confidence else None
    except (ValueError, TypeError):
        confidence_value = None

    return {
        "reasoning_type": doc.reasoning_type,
        "confidence": confidence_value,
        "reasoning_steps": steps,
        "model_source": doc.model_source,
    }
@router.patch("/api/v1/documents/{document_id}/reasoning")
async def update_document_reasoning(
    request: Request,
    document_id: str,
    payload: ReasoningUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Update the reasoning metadata of a document.

    Only the payload fields that are not ``None`` are applied.  The whole
    payload is validated before any field is written, so a rejected request
    leaves the loaded document object untouched.

    Returns:
        The refreshed reasoning metadata, as produced by
        ``get_document_reasoning``.

    Raises:
        HTTPException: 404/403 from ``_get_doc_with_access``; 400 when
            ``confidence`` is outside ``[0.0, 1.0]`` (NaN included) or a
            reasoning step is invalid.
    """
    doc, _ = await _get_doc_with_access(request, document_id, db, require_write=True)

    # --- validate first -----------------------------------------------------
    # The chained comparison is False for NaN, so NaN is rejected too; a
    # stored NaN would later break JSON serialization of the document.
    if payload.confidence is not None and not (0.0 <= payload.confidence <= 1.0):
        raise HTTPException(status_code=400, detail="confidence must be between 0.0 and 1.0")

    if payload.reasoning_steps is not None:
        for step in payload.reasoning_steps:
            if not isinstance(step.step, int):
                raise HTTPException(status_code=400, detail="Each step must have an integer 'step' field")
            if not step.thought or len(step.thought) > 2000:
                raise HTTPException(status_code=400, detail="thought must be non-empty and max 2000 chars")

    # --- then apply ---------------------------------------------------------
    if payload.reasoning_type is not None:
        # Accept either an enum member or a plain value.
        doc.reasoning_type = getattr(payload.reasoning_type, "value", payload.reasoning_type)
    if payload.confidence is not None:
        doc.confidence = str(payload.confidence)
    if payload.reasoning_steps is not None:
        doc.reasoning_steps = json.dumps([s.model_dump() for s in payload.reasoning_steps])
    if payload.model_source is not None:
        doc.model_source = payload.model_source

    doc.updated_at = datetime.utcnow()
    await db.flush()
    return await get_document_reasoning(request, document_id, db)
@router.get("/api/v1/documents/{document_id}/reasoning-panel")
async def get_reasoning_panel(
    request: Request,
    document_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Return the reasoning panel data for the UI.

    ``has_reasoning`` is true when any reasoning field is set on the
    document; in that case ``reasoning`` carries the decoded metadata.
    ``editable`` reflects whether the caller would pass the write check for
    this document, since only read access is required to view the panel.

    Raises:
        HTTPException: 404/403 from ``_get_doc_with_access``.
    """
    doc, api_role = await _get_doc_with_access(request, document_id, db)

    has_reasoning = any([
        doc.reasoning_type is not None,
        doc.confidence is not None,
        doc.reasoning_steps,
        doc.model_source is not None,
    ])

    reasoning_metadata = None
    if has_reasoning:
        # Steps are stored as a JSON string; malformed data yields [].
        reasoning_steps = []
        if doc.reasoning_steps:
            try:
                reasoning_steps = json.loads(doc.reasoning_steps)
            except json.JSONDecodeError:
                reasoning_steps = []

        # Confidence is stored as a string; unparsable values become None.
        confidence = None
        if doc.confidence:
            try:
                confidence = float(doc.confidence)
            except (ValueError, TypeError):
                confidence = None

        reasoning_metadata = ReasoningMetadata(
            reasoning_type=doc.reasoning_type,
            confidence=confidence,
            reasoning_steps=reasoning_steps,
            model_source=doc.model_source,
        )

    return ReasoningPanel(
        document_id=document_id,
        has_reasoning=has_reasoning,
        reasoning=reasoning_metadata,
        # Only read access was verified above, so ask the write check
        # explicitly instead of assuming the caller may edit.
        editable=_can_access_document(api_role, doc.agent_type, require_write=True),
    )
@router.post("/api/v1/documents/{document_id}/reasoning-steps", status_code=201)
async def add_reasoning_step(
    request: Request,
    document_id: str,
    payload: ReasoningStepAdd,
    db: AsyncSession = Depends(get_db),
):
    """Append a reasoning step to a document and return the new step.

    The new step is numbered one above the highest existing ``step`` value
    (starting at 1 when the document has no steps).
    """
    doc, _ = await _get_doc_with_access(request, document_id, db, require_write=True)

    # Existing steps are stored as a JSON string; malformed data yields [].
    existing_steps = []
    raw_steps = doc.reasoning_steps
    if raw_steps:
        try:
            existing_steps = json.loads(raw_steps)
        except json.JSONDecodeError:
            existing_steps = []

    highest = max((entry.get("step", 0) for entry in existing_steps), default=0)
    appended = {
        "step": highest + 1,
        "thought": payload.thought,
        "conclusion": payload.conclusion,
    }
    existing_steps.append(appended)

    doc.reasoning_steps = json.dumps(existing_steps)
    doc.updated_at = datetime.utcnow()
    await db.flush()
    return appended
@router.delete("/api/v1/documents/{document_id}/reasoning-steps/{step}", status_code=204)
async def delete_reasoning_step(
    request: Request,
    document_id: str,
    step: int,
    db: AsyncSession = Depends(get_db),
):
    """Remove the reasoning step(s) numbered ``step`` from a document.

    Responds 404 when no stored step carries that number.
    """
    doc, _ = await _get_doc_with_access(request, document_id, db, require_write=True)

    # Existing steps are stored as a JSON string; malformed data yields [].
    stored_steps = []
    raw_steps = doc.reasoning_steps
    if raw_steps:
        try:
            stored_steps = json.loads(raw_steps)
        except json.JSONDecodeError:
            stored_steps = []

    remaining = [entry for entry in stored_steps if entry.get("step") != step]
    if len(remaining) == len(stored_steps):
        # Nothing was filtered out, so the requested step does not exist.
        raise HTTPException(status_code=404, detail="Step not found")

    doc.reasoning_steps = json.dumps(remaining)
    doc.updated_at = datetime.utcnow()
    await db.flush()
    return None
@router.get("/api/v1/documents/{document_id}/content")
async def get_document_content(
    request: Request,
    document_id: str,
    format: str = Query("tiptap", description="Output format: tiptap or markdown"),
    db: AsyncSession = Depends(get_db),
):
    """Return a document's content as TipTap JSON or Markdown.

    ``format == "markdown"`` converts the stored TipTap JSON to Markdown and
    falls back to the plain ``content`` column when no valid TipTap JSON is
    stored.  Any other ``format`` value returns TipTap JSON; when no valid
    TipTap JSON is stored, the plain ``content`` (if any) is wrapped in a
    single paragraph so documents that only have plain content do not come
    back empty.

    Raises:
        HTTPException: 404/403 from ``_get_doc_with_access``.
    """
    # ``format`` shadows the builtin but is the public query parameter name.
    doc, _ = await _get_doc_with_access(request, document_id, db)

    if format == "markdown":
        if doc.tiptap_content:
            try:
                tiptap = json.loads(doc.tiptap_content)
            except json.JSONDecodeError:
                pass  # fall through to the plain content below
            else:
                return TipTapContentResponse(content=tiptap_to_markdown(tiptap), format="markdown")
        return TipTapContentResponse(content=doc.content, format="markdown")

    if doc.tiptap_content:
        try:
            tiptap = json.loads(doc.tiptap_content)
        except json.JSONDecodeError:
            pass  # fall through to the generated document below
        else:
            return TipTapContentResponse(content=tiptap, format="tiptap")

    # No usable TipTap JSON: wrap the plain content in one paragraph, the
    # same shape update_document_content stores for string content.  An empty
    # paragraph is used when there is no text (empty text nodes are invalid).
    plain = doc.content if isinstance(doc.content, str) else ""
    inline_nodes = [{"type": "text", "text": plain}] if plain else []
    fallback_tiptap = {"type": "doc", "content": [{"type": "paragraph", "content": inline_nodes}]}
    return TipTapContentResponse(content=fallback_tiptap, format="tiptap")