feat: Add role-based API tokens for Claudia Docs
- Add api_tokens table with role-based access (researcher, developer, viewer)
- Add POST /auth/token/generate endpoint for creating tokens
- Add GET /auth/tokens endpoint for listing user's tokens
- Add DELETE /auth/tokens/{token_id} endpoint for revoking tokens
- Add agent_type field to documents (research, development, general)
- Implement role-based access control for documents:
- researcher: access to research and general documents
- developer: access to development and general documents
- viewer: read-only access
- Update document model and schemas with agent_type field
- Add comprehensive tests for API token functionality
- All existing tests pass (73 total)
This commit is contained in:
259
tests/test_api_tokens.py
Normal file
259
tests/test_api_tokens.py
Normal file
@@ -0,0 +1,259 @@
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from httpx import AsyncClient
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///:memory:"
|
||||
|
||||
from app.main import app
|
||||
from app.database import Base, get_db, async_engine
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def event_loop():
|
||||
loop = asyncio.get_event_loop_policy().new_event_loop()
|
||||
yield loop
|
||||
loop.close()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def db_session():
|
||||
"""Create a fresh in-memory database for each test."""
|
||||
async with async_engine.begin() as conn:
|
||||
from app.database import _create_schema
|
||||
await conn.run_sync(_create_schema)
|
||||
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
|
||||
async_session = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
|
||||
async with async_session() as session:
|
||||
yield session
|
||||
await session.rollback()
|
||||
|
||||
async with async_engine.begin() as conn:
|
||||
for table in reversed(Base.metadata.sorted_tables):
|
||||
await conn.execute(table.delete())
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def client(db_session):
|
||||
"""Async HTTP client for testing."""
|
||||
async def override_get_db():
|
||||
yield db_session
|
||||
|
||||
app.dependency_overrides[get_db] = override_get_db
|
||||
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
|
||||
yield ac
|
||||
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def admin_user(client):
|
||||
"""Create an admin user for testing."""
|
||||
# Create admin directly in database with admin role
|
||||
import uuid
|
||||
import bcrypt
|
||||
from sqlalchemy import text
|
||||
|
||||
password_hash = bcrypt.hashpw("adminpass123".encode(), bcrypt.gensalt()).decode()
|
||||
|
||||
async with async_engine.begin() as conn:
|
||||
await conn.execute(
|
||||
text("""
|
||||
INSERT INTO agents (id, username, password_hash, role, is_deleted, created_at, updated_at)
|
||||
VALUES (:id, :username, :password_hash, 'admin', 0, datetime('now'), datetime('now'))
|
||||
"""),
|
||||
{
|
||||
"id": str(uuid.uuid4()),
|
||||
"username": "testadmin",
|
||||
"password_hash": password_hash
|
||||
}
|
||||
)
|
||||
|
||||
# Login as admin
|
||||
login_resp = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "testadmin", "password": "adminpass123"}
|
||||
)
|
||||
return login_resp.json()["access_token"]
|
||||
|
||||
|
||||
from httpx import ASGITransport
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_api_token(client, admin_user):
|
||||
"""Test creating an API token as admin."""
|
||||
# Generate an API token with researcher role
|
||||
response = await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "research-token", "role": "researcher"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert response.status_code == 201, f"Expected 201 but got {response.status_code}: {response.json()}"
|
||||
data = response.json()
|
||||
assert data["name"] == "research-token"
|
||||
assert data["role"] == "researcher"
|
||||
assert "token" in data
|
||||
assert len(data["token"]) > 20 # Token should be long
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_api_token_non_admin_forbidden(client):
|
||||
"""Test that non-admin cannot create API tokens."""
|
||||
# Register and login as regular agent
|
||||
await client.post("/api/v1/auth/register", json={"username": "agent1", "password": "pass123"})
|
||||
login_resp = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "agent1", "password": "pass123"}
|
||||
)
|
||||
token = login_resp.json()["access_token"]
|
||||
|
||||
# Try to generate API token
|
||||
response = await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "test-token", "role": "researcher"},
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_api_tokens(client, admin_user):
|
||||
"""Test listing API tokens."""
|
||||
# Generate two tokens
|
||||
await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "token1", "role": "researcher"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "token2", "role": "developer"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
|
||||
# List tokens
|
||||
response = await client.get(
|
||||
"/api/v1/auth/tokens",
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) == 2
|
||||
assert data[0]["name"] == "token2" # Most recent first
|
||||
assert data[1]["name"] == "token1"
|
||||
# Tokens should not include the actual token value
|
||||
assert "token" not in data[0]
|
||||
assert "token_hash" not in data[0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_revoke_api_token(client, admin_user):
|
||||
"""Test revoking an API token."""
|
||||
# Generate a token
|
||||
gen_resp = await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "revoke-me", "role": "viewer"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
researcher_token = gen_resp.json()["token"]
|
||||
|
||||
# List tokens - should have 1
|
||||
list_resp = await client.get(
|
||||
"/api/v1/auth/tokens",
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert len(list_resp.json()) == 1
|
||||
api_token_id = list_resp.json()[0]["id"]
|
||||
|
||||
# Revoke the token
|
||||
response = await client.delete(
|
||||
f"/api/v1/auth/tokens/{api_token_id}",
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert response.status_code == 204
|
||||
|
||||
# List tokens - should be empty
|
||||
list_resp = await client.get(
|
||||
"/api/v1/auth/tokens",
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert len(list_resp.json()) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_revoke_nonexistent_token(client, admin_user):
|
||||
"""Test revoking a non-existent token returns 404."""
|
||||
response = await client.delete(
|
||||
"/api/v1/auth/tokens/nonexistent-id",
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_token_with_invalid_role(client, admin_user):
|
||||
"""Test that invalid role is rejected."""
|
||||
response = await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "bad-role", "role": "invalid_role"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert response.status_code == 422 # Validation error
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_token_auth_flow(client, admin_user):
|
||||
"""Test full flow: create project with JWT, then access with API token."""
|
||||
# Generate researcher API token
|
||||
gen_resp = await client.post(
|
||||
"/api/v1/auth/token/generate",
|
||||
json={"name": "research-token", "role": "researcher"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
researcher_token = gen_resp.json()["token"]
|
||||
|
||||
# Create a project
|
||||
proj_resp = await client.post(
|
||||
"/api/v1/projects",
|
||||
json={"name": "Test Project"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
proj_id = proj_resp.json()["id"]
|
||||
|
||||
# Create a research document
|
||||
doc_resp = await client.post(
|
||||
f"/api/v1/projects/{proj_id}/documents",
|
||||
json={"title": "Research Doc", "content": "Research content", "agent_type": "research"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert doc_resp.status_code == 201
|
||||
doc_id = doc_resp.json()["id"]
|
||||
|
||||
# Access document with researcher token - should work (research doc)
|
||||
get_resp = await client.get(
|
||||
f"/api/v1/documents/{doc_id}",
|
||||
headers={"Authorization": f"Bearer {researcher_token}"}
|
||||
)
|
||||
assert get_resp.status_code == 200
|
||||
|
||||
# Create a development document
|
||||
dev_doc_resp = await client.post(
|
||||
f"/api/v1/projects/{proj_id}/documents",
|
||||
json={"title": "Dev Doc", "content": "Dev content", "agent_type": "development"},
|
||||
headers={"Authorization": f"Bearer {admin_user}"}
|
||||
)
|
||||
assert dev_doc_resp.status_code == 201
|
||||
dev_doc_id = dev_doc_resp.json()["id"]
|
||||
|
||||
# Access dev document with researcher token - should fail (read access denied)
|
||||
get_resp = await client.get(
|
||||
f"/api/v1/documents/{dev_doc_id}",
|
||||
headers={"Authorization": f"Bearer {researcher_token}"}
|
||||
)
|
||||
assert get_resp.status_code == 403
|
||||
Reference in New Issue
Block a user