"""Authentication utilities for SocialPhoto.""" from datetime import datetime, timedelta, timezone from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from passlib.context import CryptContext # Configuration SECRET_KEY = "your-secret-key-change-in-production" # TODO: Move to env ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_HOURS = 24 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return pwd_context.verify(plain_password, hashed_password) def hash_password(password: str) -> str: """Hash a password using bcrypt.""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create a JWT access token.""" to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def decode_token(token: str) -> dict: """Decode and verify a JWT token.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) async def get_current_user_id( credentials: HTTPAuthorizationCredentials = Depends(security), ) -> int: """Extract user ID from JWT token.""" token = credentials.credentials payload = decode_token(token) user_id: int = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload", ) return int(user_id)