Files
instagram-clone/app/auth.py
OpenClaw Agent a3eca3b7da feat: implement Instagram clone SocialPhoto API
- FastAPI backend with SQLite database
- JWT authentication (register, login)
- User profiles with follow/unfollow
- Posts with image upload and likes/dislikes
- Comments with likes
- Global and personalized feed
- Comprehensive pytest test suite (37 tests)

TASK-ID: 758f4029-702
2026-04-16 03:20:48 +00:00

67 lines
2.1 KiB
Python

"""Authentication utilities for SocialPhoto."""
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
# Configuration
SECRET_KEY = "your-secret-key-change-in-production" # TODO: Move to env
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
"""Hash a password using bcrypt."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decode and verify a JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_user_id(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> int:
"""Extract user ID from JWT token."""
token = credentials.credentials
payload = decode_token(token)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
return int(user_id)