- API tokens now verify project belongs to token owner before access - Researcher tokens only access research/general docs in owner's projects - Developer tokens only access development/general docs in owner's projects - Viewer tokens have read-only access to all doc types in owner's projects - Add test for cross-user project access prevention
345 lines
11 KiB
Python
345 lines
11 KiB
Python
import pytest
|
|
import pytest_asyncio
|
|
from httpx import AsyncClient
|
|
|
|
import asyncio
|
|
import os
|
|
|
|
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///:memory:"
|
|
|
|
from app.main import app
|
|
from app.database import Base, get_db, async_engine
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def event_loop():
|
|
loop = asyncio.get_event_loop_policy().new_event_loop()
|
|
yield loop
|
|
loop.close()
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="function")
|
|
async def db_session():
|
|
"""Create a fresh in-memory database for each test."""
|
|
async with async_engine.begin() as conn:
|
|
from app.database import _create_schema
|
|
await conn.run_sync(_create_schema)
|
|
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
|
|
async_session = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
|
|
|
|
async with async_session() as session:
|
|
yield session
|
|
await session.rollback()
|
|
|
|
async with async_engine.begin() as conn:
|
|
for table in reversed(Base.metadata.sorted_tables):
|
|
await conn.execute(table.delete())
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="function")
|
|
async def client(db_session):
|
|
"""Async HTTP client for testing."""
|
|
async def override_get_db():
|
|
yield db_session
|
|
|
|
app.dependency_overrides[get_db] = override_get_db
|
|
|
|
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
|
|
yield ac
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="function")
|
|
async def admin_user(client):
|
|
"""Create an admin user for testing."""
|
|
# Create admin directly in database with admin role
|
|
import uuid
|
|
import bcrypt
|
|
from sqlalchemy import text
|
|
|
|
password_hash = bcrypt.hashpw("adminpass123".encode(), bcrypt.gensalt()).decode()
|
|
|
|
async with async_engine.begin() as conn:
|
|
await conn.execute(
|
|
text("""
|
|
INSERT INTO agents (id, username, password_hash, role, is_deleted, created_at, updated_at)
|
|
VALUES (:id, :username, :password_hash, 'admin', 0, datetime('now'), datetime('now'))
|
|
"""),
|
|
{
|
|
"id": str(uuid.uuid4()),
|
|
"username": "testadmin",
|
|
"password_hash": password_hash
|
|
}
|
|
)
|
|
|
|
# Login as admin
|
|
login_resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": "testadmin", "password": "adminpass123"}
|
|
)
|
|
return login_resp.json()["access_token"]
|
|
|
|
|
|
from httpx import ASGITransport
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_api_token(client, admin_user):
|
|
"""Test creating an API token as admin."""
|
|
# Generate an API token with researcher role
|
|
response = await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "research-token", "role": "researcher"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert response.status_code == 201, f"Expected 201 but got {response.status_code}: {response.json()}"
|
|
data = response.json()
|
|
assert data["name"] == "research-token"
|
|
assert data["role"] == "researcher"
|
|
assert "token" in data
|
|
assert len(data["token"]) > 20 # Token should be long
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_api_token_non_admin_forbidden(client):
|
|
"""Test that non-admin cannot create API tokens."""
|
|
# Register and login as regular agent
|
|
await client.post("/api/v1/auth/register", json={"username": "agent1", "password": "pass123"})
|
|
login_resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": "agent1", "password": "pass123"}
|
|
)
|
|
token = login_resp.json()["access_token"]
|
|
|
|
# Try to generate API token
|
|
response = await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "test-token", "role": "researcher"},
|
|
headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
assert response.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_api_tokens(client, admin_user):
|
|
"""Test listing API tokens."""
|
|
# Generate two tokens
|
|
await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "token1", "role": "researcher"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "token2", "role": "developer"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
|
|
# List tokens
|
|
response = await client.get(
|
|
"/api/v1/auth/tokens",
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data) == 2
|
|
assert data[0]["name"] == "token2" # Most recent first
|
|
assert data[1]["name"] == "token1"
|
|
# Tokens should not include the actual token value
|
|
assert "token" not in data[0]
|
|
assert "token_hash" not in data[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_revoke_api_token(client, admin_user):
|
|
"""Test revoking an API token."""
|
|
# Generate a token
|
|
gen_resp = await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "revoke-me", "role": "viewer"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
researcher_token = gen_resp.json()["token"]
|
|
|
|
# List tokens - should have 1
|
|
list_resp = await client.get(
|
|
"/api/v1/auth/tokens",
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert len(list_resp.json()) == 1
|
|
api_token_id = list_resp.json()[0]["id"]
|
|
|
|
# Revoke the token
|
|
response = await client.delete(
|
|
f"/api/v1/auth/tokens/{api_token_id}",
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert response.status_code == 204
|
|
|
|
# List tokens - should be empty
|
|
list_resp = await client.get(
|
|
"/api/v1/auth/tokens",
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert len(list_resp.json()) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_revoke_nonexistent_token(client, admin_user):
|
|
"""Test revoking a non-existent token returns 404."""
|
|
response = await client.delete(
|
|
"/api/v1/auth/tokens/nonexistent-id",
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_api_token_with_invalid_role(client, admin_user):
|
|
"""Test that invalid role is rejected."""
|
|
response = await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "bad-role", "role": "invalid_role"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert response.status_code == 422 # Validation error
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_api_token_auth_flow(client, admin_user):
|
|
"""Test full flow: create project with JWT, then access with API token."""
|
|
# Generate researcher API token
|
|
gen_resp = await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "research-token", "role": "researcher"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
researcher_token = gen_resp.json()["token"]
|
|
|
|
# Create a project
|
|
proj_resp = await client.post(
|
|
"/api/v1/projects",
|
|
json={"name": "Test Project"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
proj_id = proj_resp.json()["id"]
|
|
|
|
# Create a research document
|
|
doc_resp = await client.post(
|
|
f"/api/v1/projects/{proj_id}/documents",
|
|
json={"title": "Research Doc", "content": "Research content", "agent_type": "research"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert doc_resp.status_code == 201
|
|
doc_id = doc_resp.json()["id"]
|
|
|
|
# Access document with researcher token - should work (research doc)
|
|
get_resp = await client.get(
|
|
f"/api/v1/documents/{doc_id}",
|
|
headers={"Authorization": f"Bearer {researcher_token}"}
|
|
)
|
|
assert get_resp.status_code == 200
|
|
|
|
# Create a development document
|
|
dev_doc_resp = await client.post(
|
|
f"/api/v1/projects/{proj_id}/documents",
|
|
json={"title": "Dev Doc", "content": "Dev content", "agent_type": "development"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
assert dev_doc_resp.status_code == 201
|
|
dev_doc_id = dev_doc_resp.json()["id"]
|
|
|
|
# Access dev document with researcher token - should fail (read access denied)
|
|
get_resp = await client.get(
|
|
f"/api/v1/documents/{dev_doc_id}",
|
|
headers={"Authorization": f"Bearer {researcher_token}"}
|
|
)
|
|
assert get_resp.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_api_token_cannot_access_other_user_project(client, admin_user):
|
|
"""Test that API token can only access projects belonging to the token owner."""
|
|
# Create another user
|
|
import uuid
|
|
import bcrypt
|
|
from sqlalchemy import text
|
|
|
|
password_hash = bcrypt.hashpw("user2pass".encode(), bcrypt.gensalt()).decode()
|
|
|
|
async with async_engine.begin() as conn:
|
|
user2_id = str(uuid.uuid4())
|
|
await conn.execute(
|
|
text("""
|
|
INSERT INTO agents (id, username, password_hash, role, is_deleted, created_at, updated_at)
|
|
VALUES (:id, :username, :password_hash, 'agent', 0, datetime('now'), datetime('now'))
|
|
"""),
|
|
{
|
|
"id": user2_id,
|
|
"username": "user2",
|
|
"password_hash": password_hash
|
|
}
|
|
)
|
|
|
|
# Login as user2
|
|
login_resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": "user2", "password": "user2pass"}
|
|
)
|
|
user2_token = login_resp.json()["access_token"]
|
|
|
|
# Create project by user2
|
|
proj_resp = await client.post(
|
|
"/api/v1/projects",
|
|
json={"name": "User2 Project"},
|
|
headers={"Authorization": f"Bearer {user2_token}"}
|
|
)
|
|
user2_proj_id = proj_resp.json()["id"]
|
|
|
|
# Create document in user2's project
|
|
doc_resp = await client.post(
|
|
f"/api/v1/projects/{user2_proj_id}/documents",
|
|
json={"title": "User2 Doc", "content": "Content", "agent_type": "general"},
|
|
headers={"Authorization": f"Bearer {user2_token}"}
|
|
)
|
|
user2_doc_id = doc_resp.json()["id"]
|
|
|
|
# Admin creates a researcher token
|
|
gen_resp = await client.post(
|
|
"/api/v1/auth/token/generate",
|
|
json={"name": "research-token", "role": "researcher"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
researcher_token = gen_resp.json()["token"]
|
|
|
|
# Admin creates a project and document
|
|
admin_proj_resp = await client.post(
|
|
"/api/v1/projects",
|
|
json={"name": "Admin Project"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
admin_proj_id = admin_proj_resp.json()["id"]
|
|
|
|
admin_doc_resp = await client.post(
|
|
f"/api/v1/projects/{admin_proj_id}/documents",
|
|
json={"title": "Admin Doc", "content": "Content", "agent_type": "research"},
|
|
headers={"Authorization": f"Bearer {admin_user}"}
|
|
)
|
|
admin_doc_id = admin_doc_resp.json()["id"]
|
|
|
|
# Researcher token should NOT be able to access user2's project/document
|
|
get_resp = await client.get(
|
|
f"/api/v1/documents/{user2_doc_id}",
|
|
headers={"Authorization": f"Bearer {researcher_token}"}
|
|
)
|
|
assert get_resp.status_code == 403
|
|
|
|
# Researcher token SHOULD be able to access admin's research document
|
|
get_resp = await client.get(
|
|
f"/api/v1/documents/{admin_doc_id}",
|
|
headers={"Authorization": f"Bearer {researcher_token}"}
|
|
)
|
|
assert get_resp.status_code == 200
|