Fix: Make INITIAL_ADMIN_USERNAME and INITIAL_ADMIN_PASSWORD required env vars with validation

This commit is contained in:
Motoko
2026-03-30 17:47:49 +00:00
parent 957f9bee2a
commit 2884ba2e55
3 changed files with 50 additions and 25 deletions

View File

@@ -1,8 +1,11 @@
import logging
import os
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict
logger = logging.getLogger(__name__)
def _resolve_db_url(url: str) -> str:
"""Convert relative sqlite path to absolute path for Docker or local dev."""
@@ -28,8 +31,8 @@ class Settings(BaseSettings):
HOST: str = "0.0.0.0"
PORT: int = 8000
LOG_LEVEL: str = "INFO"
INITIAL_ADMIN_USERNAME: str = "admin" # Auto-created admin user
INITIAL_ADMIN_PASSWORD: str = "admin123"
INITIAL_ADMIN_USERNAME: str # Required: admin user to auto-create
INITIAL_ADMIN_PASSWORD: str # Required: password for auto-created admin
@property
def resolved_database_url(self) -> str:
@@ -43,5 +46,17 @@ class Settings(BaseSettings):
settings = Settings()
# Validate required secrets at startup
_missing = []
if not settings.JWT_SECRET_KEY or settings.JWT_SECRET_KEY == "change-me-super-secret-key-min32chars!!":
raise ValueError("JWT_SECRET_KEY must be set in environment variables")
_missing.append("JWT_SECRET_KEY")
if not settings.INITIAL_ADMIN_USERNAME:
_missing.append("INITIAL_ADMIN_USERNAME")
if not settings.INITIAL_ADMIN_PASSWORD:
_missing.append("INITIAL_ADMIN_PASSWORD")
if _missing:
raise ValueError(f"Required environment variables not set: {', '.join(_missing)}")
# Log initial admin credentials (password masked)
logger.info(f"Initial admin username: {settings.INITIAL_ADMIN_USERNAME}")
logger.info(f"Initial admin password: {'*' * len(settings.INITIAL_ADMIN_PASSWORD)}")

View File

@@ -9,33 +9,24 @@ from sqlalchemy.orm import DeclarativeBase, sessionmaker
from app.config import settings
# Async engine for aiosqlite
DATABASE_URL = settings.resolved_database_url
# Sync engine for migrations and initial setup
SYNC_DATABASE_URL = DATABASE_URL.replace("sqlite+aiosqlite:///", "sqlite:///")
class Base(DeclarativeBase):
pass
# Create data directory at module load time (before any engine connection)
DATA_DIR = Path("/app/data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Async session factory - URL uses absolute path to /app/data
# Async session factory - uses DATABASE_URL from settings
async_engine = create_async_engine(
f"sqlite+aiosqlite:///{DATA_DIR}/claudia_docs.db",
settings.resolved_database_url,
echo=False,
connect_args={"check_same_thread": False}
)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
# Sync engine for migrations
# Sync engine for migrations (same database)
_sync_db_path = settings.resolved_database_url.replace("sqlite+aiosqlite:///", "sqlite:///")
sync_engine = create_engine(
f"sqlite:///{DATA_DIR}/claudia_docs.db",
_sync_db_path,
echo=False,
connect_args={"check_same_thread": False}
)
@@ -62,7 +53,9 @@ async def get_db_simple():
async def init_db():
"""Initialize database with all tables, views, FTS5, and triggers."""
# Data directory already created at module load time
# Ensure data directory exists
db_path = settings.resolved_database_url.replace("sqlite+aiosqlite:///", "")
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
async with async_engine.begin() as conn:
# Create all tables via SQL (not ORM) to handle SQLite-specific features
@@ -75,39 +68,56 @@ async def init_db():
async def _create_initial_admin():
"""Create initial admin user from environment variables if it doesn't exist."""
import bcrypt
import logging
if not settings.INITIAL_ADMIN_USERNAME or not settings.INITIAL_ADMIN_PASSWORD:
logger = logging.getLogger(__name__)
username = settings.INITIAL_ADMIN_USERNAME
password = settings.INITIAL_ADMIN_PASSWORD
if not username or not password:
logger.warning(
"_create_initial_admin skipped: INITIAL_ADMIN_USERNAME or "
"INITIAL_ADMIN_PASSWORD not set"
)
return
logger.info(f"_create_initial_admin: creating admin '{username}'")
async with AsyncSessionLocal() as session:
# Check if username already exists (any role)
result = await session.execute(
text("SELECT id, role FROM agents WHERE username = :username"),
{"username": settings.INITIAL_ADMIN_USERNAME}
{"username": username}
)
existing = result.fetchone()
if existing:
return # Username already taken, skip creation
logger.info(
f"_create_initial_admin: username '{username}' already exists "
f"(role={existing.role}), skipping creation"
)
return
# Create admin user with bcrypt hash
password_hash = bcrypt.hashpw(
settings.INITIAL_ADMIN_PASSWORD.encode("utf-8"),
password.encode("utf-8"),
bcrypt.gensalt()
).decode("utf-8")
await session.execute(
text("""
INSERT INTO agents (id, username, password_hash, role, created_at, updated_at)
VALUES (:id, :username, :password_hash, 'admin', datetime('now'), datetime('now'))
INSERT INTO agents (id, username, password_hash, role, is_deleted, created_at, updated_at)
VALUES (:id, :username, :password_hash, 'admin', 0, datetime('now'), datetime('now'))
"""),
{
"id": str(uuid.uuid4()),
"username": settings.INITIAL_ADMIN_USERNAME,
"username": username,
"password_hash": password_hash
}
)
await session.commit()
logger.info(f"_create_initial_admin: admin '{username}' created successfully")
def _create_schema(sync_conn):