4 Commits

Author SHA1 Message Date
OpenClaw Agent
dc17802d74 TASK-001: Setup FastAPI project structure
- Fixed main.py to include all route routers (posts, users, comments, feed)
- Renamed app/models.py to app/schemas.py and split into proper schema modules
- Fixed schema imports in routes
- Updated app/models/__init__.py to properly export SQLAlchemy models
- Fixed database imports in route files
- App imports and runs correctly
2026-04-16 13:30:35 +00:00
OpenClaw Agent
135d4111bb 2a9ebe24: Implement auth endpoints (register/login/JWT) with SQLAlchemy 2026-04-16 12:51:02 +00:00
OpenClaw Agent
ef5b32143a TASK-001: Setup FastAPI project structure 2026-04-16 04:40:40 +00:00
OpenClaw Agent
a3eca3b7da feat: implement Instagram clone SocialPhoto API
- FastAPI backend with SQLite database
- JWT authentication (register, login)
- User profiles with follow/unfollow
- Posts with image upload and likes/dislikes
- Comments with likes
- Global and personalized feed
- Comprehensive pytest test suite (37 tests)

TASK-ID: 758f4029-702
2026-04-16 03:20:48 +00:00
124 changed files with 3286 additions and 117 deletions

129
.gitignore vendored
View File

@@ -1,4 +1,3 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -21,15 +20,11 @@ parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
@@ -47,87 +42,12 @@ htmlcov/
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
*.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
@@ -137,40 +57,17 @@ ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Database
*.db
*.sqlite
*.sqlite3
# Alembic
alembic/versions/*.pyc

View File

@@ -1,2 +1,62 @@
# instagram-clone
# Instagram Clone API
A FastAPI-based Instagram clone API with SQLAlchemy 2.0 and Alembic migrations.
## Tech Stack
- **Framework**: FastAPI 0.109+
- **ORM**: SQLAlchemy 2.0
- **Database**: SQLite (dev), PostgreSQL (prod)
- **Migrations**: Alembic
- **Testing**: pytest + httpx
## Project Structure
```
instagram-clone/
├── app/
│ ├── api/
│ │ ├── endpoints/ # API route handlers
│ │ └── dependencies/ # FastAPI dependencies
│ ├── core/ # Configuration and settings
│ ├── db/ # Database connection and models
│ ├── models/ # SQLAlchemy models
│ ├── schemas/ # Pydantic schemas
│ ├── services/ # Business logic
│ └── utils/ # Utility functions
├── alembic/ # Database migrations
├── tests/
│ ├── unit/ # Unit tests
│ └── integration/ # Integration tests
├── pyproject.toml
└── alembic.ini
```
## Setup
```bash
# Install dependencies
pip install -e ".[dev]"
# Run database migrations
alembic upgrade head
# Run the application
uvicorn app.main:app --reload
```
## Testing
```bash
# Run all tests
pytest
# Run with coverage
pytest --cov=app tests/
```
## API Documentation
Once running, visit:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc

337
SPEC.md Normal file
View File

@@ -0,0 +1,337 @@
# SPEC.md — Instagram Clone (SocialPhoto)
> Sistema de red social para compartir imágenes con likes, comentarios y seguimiento de usuarios
## 1. Concepto & Visión
**SocialPhoto** es un clon minimalista de Instagram que prioriza la simplicidad y velocidad. Permite a usuarios registrarse, subir imágenes, interactuar con publicaciones (likes/no me gusta), comentar y seguir a otros usuarios. El enfoque es en ser rápido, portable (todo en un archivo SQLite) y fácil de desplegar.
**Principios:**
- Simple de instalar y operar
- Base de datos portable (SQLite)
- API RESTful clara y documentada
- Ideal para testing de agentes de IA
---
## 2. Stack Tecnológico
| Componente | Tecnología |
|------------|------------|
| **Backend** | Python 3.11+ con FastAPI |
| **Base de datos** | SQLite3 |
| **Auth** | JWT (python-jose) |
| **Upload imágenes** | Almacenamiento local en `/uploads` |
| **Password hashing** | bcrypt |
| **Testing** | pytest + pytest-asyncio |
| **CLI (testing)** | curl o httpie |
---
## 3. Modelo de Datos (SQLite)
### Tabla: `users`
```sql
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
avatar_url TEXT DEFAULT '/static/default-avatar.png',
bio TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### Tabla: `posts`
```sql
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id),
image_path TEXT NOT NULL,
caption TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### Tabla: `comments`
```sql
CREATE TABLE comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL REFERENCES posts(id),
user_id INTEGER NOT NULL REFERENCES users(id),
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### Tabla: `likes`
```sql
CREATE TABLE likes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL REFERENCES posts(id),
user_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(post_id, user_id)
);
```
### Tabla: `dislikes`
```sql
CREATE TABLE dislikes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL REFERENCES posts(id),
user_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(post_id, user_id)
);
```
### Tabla: `follows`
```sql
CREATE TABLE follows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
follower_id INTEGER NOT NULL REFERENCES users(id),
following_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(follower_id, following_id)
);
```
### Tabla: `comment_likes`
```sql
CREATE TABLE comment_likes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
comment_id INTEGER NOT NULL REFERENCES comments(id),
user_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(comment_id, user_id)
);
```
---
## 4. API Endpoints
### Autenticación
| Método | Endpoint | Descripción |
|--------|----------|-------------|
| POST | `/auth/register` | Registrar usuario |
| POST | `/auth/login` | Login, retorna JWT |
**Register Request:**
```json
{
"username": "daniel",
"email": "daniel@example.com",
"password": "mipass123"
}
```
**Login Response:**
```json
{
"access_token": "eyJ...",
"token_type": "bearer"
}
```
---
### Usuarios
| Método | Endpoint | Descripción |
|--------|----------|-------------|
| GET | `/users/{id}` | Obtener perfil de usuario |
| GET | `/users/{id}/posts` | Posts de un usuario |
| GET | `/users/{id}/stats` | Stats (posts, followers, following) |
| POST | `/users/{id}/follow` | Seguir usuario |
| DELETE | `/users/{id}/follow` | Dejar de seguir |
---
### Posts
| Método | Endpoint | Descripción |
|--------|----------|-------------|
| POST | `/posts` | Crear post (con imagen) |
| GET | `/posts` | Feed global |
| GET | `/posts/{id}` | Detalle de post |
| DELETE | `/posts/{id}` | Eliminar post |
| POST | `/posts/{id}/like` | Dar like |
| DELETE | `/posts/{id}/like` | Quitar like |
| POST | `/posts/{id}/dislike` | Dar no me gusta |
| DELETE | `/posts/{id}/dislike` | Quitar no me gusta |
**Crear Post:**
```
POST /posts
Content-Type: multipart/form-data
caption: "Mi primera foto"
image: <archivo>
```
**Response:**
```json
{
"id": 1,
"user_id": 1,
"username": "daniel",
"image_url": "/uploads/abc123.jpg",
"caption": "Mi primera foto",
"likes_count": 0,
"dislikes_count": 0,
"comments_count": 0,
"created_at": "2026-04-15T22:00:00Z"
}
```
---
### Comentarios
| Método | Endpoint | Descripción |
|--------|----------|-------------|
| GET | `/posts/{post_id}/comments` | Listar comentarios |
| POST | `/posts/{post_id}/comments` | Agregar comentario |
| DELETE | `/comments/{id}` | Eliminar comentario |
| POST | `/comments/{id}/like` | Dar like a comentario |
---
### Feed
| Método | Endpoint | Descripción |
|--------|----------|-------------|
| GET | `/feed` | Feed de usuarios que sigues |
| GET | `/feed/global` | Feed global (todos) |
**Query params:** `?limit=20&offset=0`
---
## 5. Detalles de Implementación
### Upload de Imágenes
- Directorio: `./uploads/`
- Nombres: `{uuid}_{original_filename}`
- Formatos: jpg, jpeg, png, gif, webp
- Tamaño máximo: 10MB
### Auth JWT
- Algorithm: HS256
- Expiración: 24 horas
- Header: `Authorization: Bearer <token>`
### Likes/Dislikes
- Un usuario solo puede dar UN like O un dislike por post (no ambos)
- Dar dislike si ya hay like lo reemplaza
- Dar like si ya hay dislike lo reemplaza
### Follow
- No puedes seguirte a ti mismo
- Mensaje de error si intentas seguir alguien que ya sigues
### Timestamps
- Todos en UTC
- Formato ISO 8601 en responses
---
## 6. Estructura del Proyecto
```
socialphoto/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── database.py # SQLite connection + setup
│ ├── models.py # Pydantic models
│ ├── auth.py # JWT auth helpers
│ ├── routes/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── users.py
│ │ ├── posts.py
│ │ ├── comments.py
│ │ └── feed.py
│ └── services/
│ └── __init__.py
├── uploads/ # Imágenes
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_auth.py
│ ├── test_posts.py
│ └── test_social.py
├── requirements.txt
├── SPEC.md
└── README.md
```
---
## 7. Criterios de Aceptación
### Auth
- [x] Usuario puede registrarse con username, email, password
- [x] Usuario puede hacer login y recibe JWT
- [x] Rutas protegidas requieren token válido
- [x] No permite registro con username/email duplicado
### Posts
- [x] Usuario puede crear post con imagen y caption
- [x] Feed global muestra todos los posts ordenados por fecha
- [x] Feed personalizado solo muestra posts de usuarios seguidos
- [x] Usuario puede eliminar SUS propios posts
### Likes / Dislikes
- [x] Usuario puede dar like a un post
- [x] Usuario puede dar dislike a un post
- [x] Like y dislike son mutuamente excluyentes
- [x] Contador de likes/dislikes se actualiza en tiempo real
### Comentarios
- [x] Usuario puede comentar en un post
- [x] Usuario puede eliminar SUS comentarios
- [x] Lista de comentarios incluye username y timestamp
### Follow
- [x] Usuario puede seguir a otro usuario
- [x] Usuario puede dejar de seguir
- [x] No puede followearse a sí mismo
- [x] Stats muestran followers y following count
---
## 8. Roadmap de Tasks (para Task Manager)
1. **Architecture**: Crear SPEC.md y estructura del proyecto ✓
2. **Backlog**:
- [x] Setup proyecto FastAPI con SQLite
- [x] Implementar auth (register, login, JWT)
- [x] CRUD posts con upload de imágenes
- [x] Sistema de likes/dislikes
- [x] Sistema de comentarios
- [x] Sistema de follow/unfollow
- [x] Feed global y personalizado
- [x] Tests unitarios
- [ ] README y documentación
---
## 9. Notas
- Usar SQLite para simplicidad y portabilidad
- No requiere Docker ni servicios externos
- Ideal para testing de SDMAS Orchestrator
- En producción real se migraría a PostgreSQL

42
alembic.ini Normal file
View File

@@ -0,0 +1,42 @@
# Alembic configuration file
[alembic]
script_location = alembic
prepend_sys_path = .
sqlalchemy.url = sqlite:///./instagram_clone.db
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

61
alembic/env.py Normal file
View File

@@ -0,0 +1,61 @@
"""Alembic environment configuration."""
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import sys
from pathlib import Path
# Add app to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.db.database import Base
from app.core.config import settings
# this is the Alembic Config object
config = context.config
# Set sqlalchemy.url from settings
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
# Interpret the config file for Python logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Model metadata
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

25
alembic/script.py.mako Normal file
View File

@@ -0,0 +1,25 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Application package."""

1
app/api/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""API package."""

View File

@@ -0,0 +1 @@
"""Dependencies package."""

View File

@@ -0,0 +1 @@
"""Endpoints package."""

66
app/auth.py Normal file
View File

@@ -0,0 +1,66 @@
"""Authentication utilities for SocialPhoto."""
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
# Configuration
SECRET_KEY = "your-secret-key-change-in-production" # TODO: Move to env
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
"""Hash a password using bcrypt."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decode and verify a JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_user_id(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> int:
"""Extract user ID from JWT token."""
token = credentials.credentials
payload = decode_token(token)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
return int(user_id)

1
app/core/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Core module package."""

27
app/core/config.py Normal file
View File

@@ -0,0 +1,27 @@
"""Application configuration settings."""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# Application
APP_NAME: str = "Instagram Clone"
APP_VERSION: str = "0.1.0"
DEBUG: bool = False
# Database
DATABASE_URL: str = "sqlite:///./instagram_clone.db"
# Security
SECRET_KEY: Optional[str] = None
model_config = SettingsConfigDict(
env_file=".env",
case_sensitive=True,
)
settings = Settings()

120
app/database.py Normal file
View File

@@ -0,0 +1,120 @@
"""Database module for SocialPhoto."""
import sqlite3
from pathlib import Path
from typing import Optional
DATABASE_PATH = Path(__file__).parent.parent / "socialphoto.db"
def get_db_connection() -> sqlite3.Connection:
"""Get a database connection with row factory."""
conn = sqlite3.connect(DATABASE_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def get_db() -> sqlite3.Connection:
"""Dependency for FastAPI routes."""
conn = get_db_connection()
try:
yield conn
finally:
conn.close()
def init_db() -> None:
"""Initialize the database with all tables."""
conn = get_db_connection()
cursor = conn.cursor()
# Users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
avatar_url TEXT DEFAULT '/static/default-avatar.png',
bio TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Posts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id),
image_path TEXT NOT NULL,
caption TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Comments table
cursor.execute("""
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL REFERENCES posts(id),
user_id INTEGER NOT NULL REFERENCES users(id),
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Likes table
cursor.execute("""
CREATE TABLE IF NOT EXISTS likes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL REFERENCES posts(id),
user_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(post_id, user_id)
)
""")
# Dislikes table
cursor.execute("""
CREATE TABLE IF NOT EXISTS dislikes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL REFERENCES posts(id),
user_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(post_id, user_id)
)
""")
# Follows table
cursor.execute("""
CREATE TABLE IF NOT EXISTS follows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
follower_id INTEGER NOT NULL REFERENCES users(id),
following_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(follower_id, following_id)
)
""")
# Comment likes table
cursor.execute("""
CREATE TABLE IF NOT EXISTS comment_likes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
comment_id INTEGER NOT NULL REFERENCES comments(id),
user_id INTEGER NOT NULL REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(comment_id, user_id)
)
""")
conn.commit()
conn.close()
def row_to_dict(row: sqlite3.Row) -> dict:
"""Convert a sqlite Row to a dictionary."""
return dict(row)
if __name__ == "__main__":
init_db()
print("Database initialized successfully.")

1
app/db/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Database module package."""

28
app/db/database.py Normal file
View File

@@ -0,0 +1,28 @@
"""Database connection and session management."""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.core.config import settings
# Create engine
engine = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {},
)
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
"""SQLAlchemy declarative base class."""
pass
def get_db():
"""Dependency to get database session."""
db = SessionLocal()
try:
yield db
finally:
db.close()

55
app/deps.py Normal file
View File

@@ -0,0 +1,55 @@
"""FastAPI dependencies for authentication and database access."""
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from app.db.database import get_db
from app.models.user import User
from app.services.auth_service import AuthService, decode_token
security = HTTPBearer()
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
db: Annotated[Session, Depends(get_db)],
) -> User:
"""Get the current authenticated user from JWT token.
Args:
credentials: The HTTP Bearer credentials containing the JWT token.
db: Database session.
Returns:
The authenticated User object.
Raises:
HTTPException: If token is invalid or user not found.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(credentials.credentials)
user_id_str: str = payload.get("sub")
if user_id_str is None:
raise credentials_exception
user_id = int(user_id_str)
except (JWTError, ValueError):
raise credentials_exception
user = AuthService.get_user_by_id(db, user_id)
if user is None:
raise credentials_exception
return user
# Type alias for dependency injection
CurrentUser = Annotated[User, Depends(get_current_user)]

66
app/main.py Normal file
View File

@@ -0,0 +1,66 @@
"""FastAPI application entry point."""
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from app.core.config import settings
from app.db.database import engine, Base
from app.routers.auth import router as auth_router # noqa: F401
from app.routes.posts import router as posts_router
from app.routes.users import router as users_router
from app.routes.comments import router as comments_router
from app.routes.feed import router as feed_router
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager for startup/shutdown events."""
# Startup: Create database tables
Base.metadata.create_all(bind=engine)
yield
# Shutdown: cleanup if needed
# Create FastAPI app
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
debug=settings.DEBUG,
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth_router)
app.include_router(posts_router)
app.include_router(users_router)
app.include_router(comments_router)
app.include_router(feed_router)
# Mount static files for uploads
UPLOAD_DIR = Path(__file__).parent.parent / "uploads"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
app.mount("/uploads", StaticFiles(directory=str(UPLOAD_DIR)), name="uploads")
@app.get("/")
async def root():
"""Root endpoint."""
return {"message": f"Welcome to {settings.APP_NAME}", "version": settings.APP_VERSION}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}

6
app/models/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Models package."""
# SQLAlchemy models
from app.models.user import User
__all__ = ["User"]

32
app/models/user.py Normal file
View File

@@ -0,0 +1,32 @@
"""User SQLAlchemy model."""
from datetime import datetime
from typing import Optional
from sqlalchemy import DateTime, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from app.db.database import Base
class User(Base):
"""User model for authentication and profile information."""
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
avatar_url: Mapped[Optional[str]] = mapped_column(
String(500), default="/static/default-avatar.png"
)
bio: Mapped[Optional[str]] = mapped_column(Text, default="")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}')>"

4
app/routers/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Routers package."""
from app.routers.auth import router as auth_router
__all__ = ["auth_router"]

81
app/routers/auth.py Normal file
View File

@@ -0,0 +1,81 @@
"""Authentication routes for SocialPhoto API."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.database import get_db
from app.schemas.auth import Token, UserLogin, UserRegister
from app.services.auth_service import AuthService, create_access_token
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/register", response_model=Token, status_code=status.HTTP_201_CREATED)
async def register(
user_data: UserRegister,
db: Annotated[Session, Depends(get_db)],
) -> Token:
"""Register a new user.
Args:
user_data: User registration data (username, email, password).
db: Database session.
Returns:
Token object with access token.
Raises:
HTTPException: If username or email already exists.
"""
# Check if username exists
existing_user = AuthService.get_user_by_username(db, user_data.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered",
)
# Check if email exists
existing_email = AuthService.get_user_by_email(db, user_data.email)
if existing_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
# Create user
user = AuthService.create_user(db, user_data)
# Create access token
access_token = create_access_token(data={"sub": str(user.id)})
return Token(access_token=access_token)
@router.post("/login", response_model=Token)
async def login(
user_data: UserLogin,
db: Annotated[Session, Depends(get_db)],
) -> Token:
"""Login and get access token.
Args:
user_data: User login data (username, password).
db: Database session.
Returns:
Token object with access token.
Raises:
HTTPException: If credentials are invalid.
"""
user = AuthService.authenticate_user(db, user_data.username, user_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token = create_access_token(data={"sub": str(user.id)})
return Token(access_token=access_token)

1
app/routes/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Routes package for SocialPhoto."""

81
app/routes/auth.py Normal file
View File

@@ -0,0 +1,81 @@
"""Authentication routes for SocialPhoto."""
import sqlite3
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.auth import create_access_token, hash_password, verify_password
from app.db.database import get_db, row_to_dict
from app.schemas import Token, UserLogin, UserRegister
router = APIRouter(prefix="/auth", tags=["Authentication"])
security = HTTPBearer()
@router.post("/register", response_model=Token, status_code=status.HTTP_201_CREATED)
async def register(
user_data: UserRegister,
conn: sqlite3.Connection = Depends(get_db),
) -> Token:
"""Register a new user."""
cursor = conn.cursor()
# Check if username exists
cursor.execute("SELECT id FROM users WHERE username = ?", (user_data.username,))
if cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered",
)
# Check if email exists
cursor.execute("SELECT id FROM users WHERE email = ?", (user_data.email,))
if cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
# Hash password and create user
password_hash = hash_password(user_data.password)
cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(user_data.username, user_data.email, password_hash),
)
conn.commit()
# Get the new user's ID
cursor.execute("SELECT id FROM users WHERE username = ?", (user_data.username,))
user = row_to_dict(cursor.fetchone())
user_id = user["id"]
# Create access token
access_token = create_access_token(data={"sub": str(user_id)})
return Token(access_token=access_token)
@router.post("/login", response_model=Token)
async def login(
user_data: UserLogin,
conn: sqlite3.Connection = Depends(get_db),
) -> Token:
"""Login and get access token."""
cursor = conn.cursor()
# Find user by username
cursor.execute(
"SELECT id, password_hash FROM users WHERE username = ?",
(user_data.username,),
)
row = cursor.fetchone()
if not row or not verify_password(user_data.password, row["password_hash"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
user = row_to_dict(row)
access_token = create_access_token(data={"sub": user["id"]})
return Token(access_token=access_token)

79
app/routes/comments.py Normal file
View File

@@ -0,0 +1,79 @@
"""Comment routes for SocialPhoto - comment-specific operations."""
import sqlite3
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.auth import get_current_user_id
from app.database import get_db, row_to_dict
router = APIRouter(prefix="/comments", tags=["Comments"])
security = HTTPBearer()
@router.delete("/{comment_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_comment(
comment_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> None:
"""Delete a comment (only by owner)."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check comment exists and belongs to user
cursor.execute(
"SELECT user_id FROM comments WHERE id = ?",
(comment_id,),
)
row = cursor.fetchone()
if not row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Comment not found",
)
comment = row_to_dict(row)
if comment["user_id"] != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only delete your own comments",
)
# Delete comment
cursor.execute("DELETE FROM comments WHERE id = ?", (comment_id,))
conn.commit()
@router.post("/{comment_id}/like", status_code=status.HTTP_201_CREATED)
async def like_comment(
comment_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Like a comment."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check comment exists
cursor.execute("SELECT id FROM comments WHERE id = ?", (comment_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Comment not found",
)
# Add like
try:
cursor.execute(
"INSERT INTO comment_likes (comment_id, user_id) VALUES (?, ?)",
(comment_id, user_id),
)
conn.commit()
except sqlite3.IntegrityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You already liked this comment",
)
return {"message": "Comment liked"}

124
app/routes/feed.py Normal file
View File

@@ -0,0 +1,124 @@
"""Feed routes for SocialPhoto."""
import sqlite3
from typing import List
from fastapi import APIRouter, Depends, Query
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.auth import get_current_user_id
from app.database import get_db, row_to_dict
from app.schemas import FeedResponse, PostResponse
router = APIRouter(prefix="/feed", tags=["Feed"])
security = HTTPBearer()
@router.get("", response_model=FeedResponse)
async def get_followed_feed(
limit: int = Query(default=20, ge=1, le=100),
offset: int = Query(default=0, ge=0),
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> FeedResponse:
"""Get feed of posts from users you follow."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Get posts from followed users
cursor.execute(
"""
SELECT
p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at,
(SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count,
(SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.user_id IN (
SELECT following_id FROM follows WHERE follower_id = ?
)
ORDER BY p.created_at DESC
LIMIT ? OFFSET ?
""",
(user_id, limit, offset),
)
posts = []
for row in cursor.fetchall():
post = row_to_dict(row)
posts.append(
PostResponse(
id=post["id"],
user_id=post["user_id"],
username=post["username"],
image_url=f"/uploads/{post['image_path'].split('/')[-1]}",
caption=post["caption"],
likes_count=post["likes_count"],
dislikes_count=post["dislikes_count"],
comments_count=post["comments_count"],
created_at=post["created_at"],
)
)
# Get total count
cursor.execute(
"""
SELECT COUNT(*) as total
FROM posts p
WHERE p.user_id IN (
SELECT following_id FROM follows WHERE follower_id = ?
)
""",
(user_id,),
)
total = cursor.fetchone()["total"]
return FeedResponse(posts=posts, total=total, limit=limit, offset=offset)
@router.get("/global", response_model=FeedResponse)
async def get_global_feed(
limit: int = Query(default=20, ge=1, le=100),
offset: int = Query(default=0, ge=0),
conn: sqlite3.Connection = Depends(get_db),
) -> FeedResponse:
"""Get global feed of all posts."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT
p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at,
(SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count,
(SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count
FROM posts p
JOIN users u ON p.user_id = u.id
ORDER BY p.created_at DESC
LIMIT ? OFFSET ?
""",
(limit, offset),
)
posts = []
for row in cursor.fetchall():
post = row_to_dict(row)
posts.append(
PostResponse(
id=post["id"],
user_id=post["user_id"],
username=post["username"],
image_url=f"/uploads/{post['image_path'].split('/')[-1]}",
caption=post["caption"],
likes_count=post["likes_count"],
dislikes_count=post["dislikes_count"],
comments_count=post["comments_count"],
created_at=post["created_at"],
)
)
# Get total count
cursor.execute("SELECT COUNT(*) as total FROM posts")
total = cursor.fetchone()["total"]
return FeedResponse(posts=posts, total=total, limit=limit, offset=offset)

400
app/routes/posts.py Normal file
View File

@@ -0,0 +1,400 @@
"""Post routes for SocialPhoto."""
import os
import sqlite3
import uuid
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from app.auth import get_current_user_id
from app.database import get_db, row_to_dict
from app.schemas import CommentCreate, CommentResponse, PostResponse
router = APIRouter(prefix="/posts", tags=["Posts"])
security = HTTPBearer()
# Configuration
UPLOAD_DIR = Path(__file__).parent.parent.parent / "uploads"
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
# Ensure upload directory exists
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
def _get_post_with_counts(conn: sqlite3.Connection, post_id: int) -> Optional[dict]:
"""Get post data with like/dislike/comment counts."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT
p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at,
(SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count,
(SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.id = ?
""",
(post_id,),
)
row = cursor.fetchone()
if not row:
return None
post = row_to_dict(row)
post["image_url"] = f"/uploads/{post['image_path'].split('/')[-1]}"
return post
@router.post("", response_model=PostResponse, status_code=status.HTTP_201_CREATED)
async def create_post(
caption: str = Form(""),
image: UploadFile = File(...),
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> PostResponse:
"""Create a new post with image."""
user_id = await get_current_user_id(credentials)
# Validate file type
file_ext = Path(image.filename).suffix.lower()
if file_ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File type not allowed. Allowed: {', '.join(ALLOWED_EXTENSIONS)}",
)
# Generate unique filename
unique_filename = f"{uuid.uuid4()}{file_ext}"
file_path = UPLOAD_DIR / unique_filename
# Save file
contents = await image.read()
if len(contents) > MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="File too large. Maximum size is 10MB",
)
with open(file_path, "wb") as f:
f.write(contents)
# Insert into database
cursor = conn.cursor()
cursor.execute(
"INSERT INTO posts (user_id, image_path, caption) VALUES (?, ?, ?)",
(user_id, str(file_path), caption),
)
conn.commit()
post_id = cursor.lastrowid
# Get the created post
post = _get_post_with_counts(conn, post_id)
return PostResponse(**post)
@router.get("", response_model=List[PostResponse])
async def get_posts(
limit: int = 20,
offset: int = 0,
conn: sqlite3.Connection = Depends(get_db),
) -> List[PostResponse]:
"""Get global feed of posts."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT
p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at,
(SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count,
(SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count
FROM posts p
JOIN users u ON p.user_id = u.id
ORDER BY p.created_at DESC
LIMIT ? OFFSET ?
""",
(limit, offset),
)
posts = []
for row in cursor.fetchall():
post = row_to_dict(row)
posts.append(
PostResponse(
id=post["id"],
user_id=post["user_id"],
username=post["username"],
image_url=f"/uploads/{post['image_path'].split('/')[-1]}",
caption=post["caption"],
likes_count=post["likes_count"],
dislikes_count=post["dislikes_count"],
comments_count=post["comments_count"],
created_at=post["created_at"],
)
)
return posts
@router.get("/{post_id}", response_model=PostResponse)
async def get_post(
post_id: int,
conn: sqlite3.Connection = Depends(get_db),
) -> PostResponse:
"""Get a specific post."""
post = _get_post_with_counts(conn, post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
return PostResponse(**post)
@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
post_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> None:
"""Delete a post (only by owner)."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check post exists and belongs to user
cursor.execute("SELECT user_id, image_path FROM posts WHERE id = ?", (post_id,))
row = cursor.fetchone()
if not row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
post = row_to_dict(row)
if post["user_id"] != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only delete your own posts",
)
# Delete image file
image_path = Path(post["image_path"])
if image_path.exists():
image_path.unlink()
# Delete post (cascade deletes comments, likes, dislikes)
cursor.execute("DELETE FROM posts WHERE id = ?", (post_id,))
conn.commit()
@router.post("/{post_id}/like", status_code=status.HTTP_201_CREATED)
async def like_post(
post_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Like a post."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check post exists
cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
# Remove any existing dislike first
cursor.execute(
"DELETE FROM dislikes WHERE post_id = ? AND user_id = ?",
(post_id, user_id),
)
# Add like
try:
cursor.execute(
"INSERT INTO likes (post_id, user_id) VALUES (?, ?)",
(post_id, user_id),
)
conn.commit()
except sqlite3.IntegrityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You already liked this post",
)
return {"message": "Post liked"}
@router.delete("/{post_id}/like", status_code=status.HTTP_200_OK)
async def unlike_post(
post_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Remove like from a post."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
cursor.execute(
"DELETE FROM likes WHERE post_id = ? AND user_id = ?",
(post_id, user_id),
)
if cursor.rowcount == 0:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="You haven't liked this post",
)
conn.commit()
return {"message": "Like removed"}
@router.post("/{post_id}/dislike", status_code=status.HTTP_201_CREATED)
async def dislike_post(
post_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Dislike a post."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check post exists
cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
# Remove any existing like first
cursor.execute(
"DELETE FROM likes WHERE post_id = ? AND user_id = ?",
(post_id, user_id),
)
# Add dislike
try:
cursor.execute(
"INSERT INTO dislikes (post_id, user_id) VALUES (?, ?)",
(post_id, user_id),
)
conn.commit()
except sqlite3.IntegrityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You already disliked this post",
)
return {"message": "Post disliked"}
@router.delete("/{post_id}/dislike", status_code=status.HTTP_200_OK)
async def undislike_post(
post_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Remove dislike from a post."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
cursor.execute(
"DELETE FROM dislikes WHERE post_id = ? AND user_id = ?",
(post_id, user_id),
)
if cursor.rowcount == 0:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="You haven't disliked this post",
)
conn.commit()
return {"message": "Dislike removed"}
@router.get("/{post_id}/comments", response_model=list[CommentResponse])
async def get_post_comments(
post_id: int,
conn: sqlite3.Connection = Depends(get_db),
) -> list[CommentResponse]:
"""Get all comments for a post."""
cursor = conn.cursor()
# Check post exists
cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
cursor.execute(
"""
SELECT
c.id, c.post_id, c.user_id, u.username, c.content, c.created_at,
(SELECT COUNT(*) FROM comment_likes WHERE comment_id = c.id) as likes_count
FROM comments c
JOIN users u ON c.user_id = u.id
WHERE c.post_id = ?
ORDER BY c.created_at ASC
""",
(post_id,),
)
comments = []
for row in cursor.fetchall():
comment = row_to_dict(row)
comments.append(CommentResponse(**comment))
return comments
@router.post("/{post_id}/comments", response_model=CommentResponse, status_code=status.HTTP_201_CREATED)
async def create_comment(
post_id: int,
comment_data: CommentCreate,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> CommentResponse:
"""Create a comment on a post."""
user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check post exists
cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
# Create comment
cursor.execute(
"INSERT INTO comments (post_id, user_id, content) VALUES (?, ?, ?)",
(post_id, user_id, comment_data.content),
)
conn.commit()
comment_id = cursor.lastrowid
# Get the created comment with user info
cursor.execute(
"""
SELECT
c.id, c.post_id, c.user_id, u.username, c.content, c.created_at,
(SELECT COUNT(*) FROM comment_likes WHERE comment_id = c.id) as likes_count
FROM comments c
JOIN users u ON c.user_id = u.id
WHERE c.id = ?
""",
(comment_id,),
)
row = cursor.fetchone()
comment = row_to_dict(row)
return CommentResponse(**comment)

205
app/routes/users.py Normal file
View File

@@ -0,0 +1,205 @@
"""User routes for SocialPhoto."""
import sqlite3
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.auth import get_current_user_id
from app.database import get_db, row_to_dict
from app.schemas import PostResponse, UserResponse, UserStats
router = APIRouter(prefix="/users", tags=["Users"])
security = HTTPBearer()
def _get_user_with_stats(conn: sqlite3.Connection, user_id: int) -> dict:
"""Get user data with stats."""
cursor = conn.cursor()
# Get user
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
if not row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
user = row_to_dict(row)
# Get posts count
cursor.execute("SELECT COUNT(*) as count FROM posts WHERE user_id = ?", (user_id,))
posts_count = cursor.fetchone()["count"]
# Get followers count
cursor.execute(
"SELECT COUNT(*) as count FROM follows WHERE following_id = ?",
(user_id,),
)
followers_count = cursor.fetchone()["count"]
# Get following count
cursor.execute(
"SELECT COUNT(*) as count FROM follows WHERE follower_id = ?",
(user_id,),
)
following_count = cursor.fetchone()["count"]
user["posts_count"] = posts_count
user["followers_count"] = followers_count
user["following_count"] = following_count
return user
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
conn: sqlite3.Connection = Depends(get_db),
) -> UserResponse:
"""Get user profile."""
user = _get_user_with_stats(conn, user_id)
return UserResponse(**user)
@router.get("/{user_id}/posts", response_model=List[PostResponse])
async def get_user_posts(
user_id: int,
conn: sqlite3.Connection = Depends(get_db),
) -> List[PostResponse]:
"""Get all posts by a user."""
cursor = conn.cursor()
# Check user exists
cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Get posts with counts
cursor.execute(
"""
SELECT
p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at,
(SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count,
(SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.user_id = ?
ORDER BY p.created_at DESC
""",
(user_id,),
)
posts = []
for row in cursor.fetchall():
post = row_to_dict(row)
posts.append(
PostResponse(
id=post["id"],
user_id=post["user_id"],
username=post["username"],
image_url=f"/uploads/{post['image_path'].split('/')[-1]}",
caption=post["caption"],
likes_count=post["likes_count"],
dislikes_count=post["dislikes_count"],
comments_count=post["comments_count"],
created_at=post["created_at"],
)
)
return posts
@router.get("/{user_id}/stats", response_model=UserStats)
async def get_user_stats(
user_id: int,
conn: sqlite3.Connection = Depends(get_db),
) -> UserStats:
"""Get user statistics."""
user = _get_user_with_stats(conn, user_id)
return UserStats(
posts_count=user["posts_count"],
followers_count=user["followers_count"],
following_count=user["following_count"],
)
@router.post("/{user_id}/follow", status_code=status.HTTP_201_CREATED)
async def follow_user(
user_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Follow a user."""
current_user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check user exists
cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Cannot follow yourself
if current_user_id == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You cannot follow yourself",
)
# Check if already following
cursor.execute(
"SELECT id FROM follows WHERE follower_id = ? AND following_id = ?",
(current_user_id, user_id),
)
if cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You already follow this user",
)
# Create follow
cursor.execute(
"INSERT INTO follows (follower_id, following_id) VALUES (?, ?)",
(current_user_id, user_id),
)
conn.commit()
return {"message": "Successfully followed user"}
@router.delete("/{user_id}/follow", status_code=status.HTTP_200_OK)
async def unfollow_user(
user_id: int,
credentials: HTTPAuthorizationCredentials = Depends(security),
conn: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Unfollow a user."""
current_user_id = await get_current_user_id(credentials)
cursor = conn.cursor()
# Check if following
cursor.execute(
"SELECT id FROM follows WHERE follower_id = ? AND following_id = ?",
(current_user_id, user_id),
)
if not cursor.fetchone():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="You are not following this user",
)
# Delete follow
cursor.execute(
"DELETE FROM follows WHERE follower_id = ? AND following_id = ?",
(current_user_id, user_id),
)
conn.commit()
return {"message": "Successfully unfollowed user"}

29
app/schemas/__init__.py Normal file
View File

@@ -0,0 +1,29 @@
"""Schemas package."""
from app.schemas.auth import (
Token,
UserLogin,
UserRegister,
UserResponse,
)
# Post schemas
from app.schemas.post import CommentCreate, CommentResponse, PostResponse
# User schemas
from app.schemas.user import UserBase, UserStats
# Feed schemas
from app.schemas.feed import FeedResponse
__all__ = [
"Token",
"UserLogin",
"UserRegister",
"UserResponse",
"CommentCreate",
"CommentResponse",
"PostResponse",
"UserBase",
"UserStats",
"FeedResponse",
]

40
app/schemas/auth.py Normal file
View File

@@ -0,0 +1,40 @@
"""Authentication Pydantic schemas."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class UserRegister(BaseModel):
"""Request model for user registration."""
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=6)
class UserLogin(BaseModel):
"""Request model for user login."""
username: str
password: str
class Token(BaseModel):
"""Response model for JWT token."""
access_token: str
token_type: str = "bearer"
class UserResponse(BaseModel):
"""Response model for user data."""
id: int
username: str
email: str
avatar_url: Optional[str] = "/static/default-avatar.png"
bio: Optional[str] = ""
created_at: datetime
model_config = ConfigDict(from_attributes=True)

14
app/schemas/feed.py Normal file
View File

@@ -0,0 +1,14 @@
"""Feed schemas for SocialPhoto API."""
from typing import List
from pydantic import BaseModel
from app.schemas.post import PostResponse
class FeedResponse(BaseModel):
"""Response model for feed."""
posts: List[PostResponse]
total: int
limit: int
offset: int

48
app/schemas/post.py Normal file
View File

@@ -0,0 +1,48 @@
"""Post schemas for SocialPhoto API."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field
class PostCreate(BaseModel):
"""Request model for creating a post."""
caption: Optional[str] = ""
class PostResponse(BaseModel):
"""Response model for post data."""
id: int
user_id: int
username: str
image_url: str
caption: str
likes_count: int
dislikes_count: int
comments_count: int
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class PostDetail(PostResponse):
"""Detailed post response with user info."""
user: "UserResponse" # noqa: F821
class CommentCreate(BaseModel):
"""Request model for creating a comment."""
content: str = Field(..., min_length=1, max_length=500)
class CommentResponse(BaseModel):
"""Response model for comment data."""
id: int
post_id: int
user_id: int
username: str
content: str
likes_count: int
created_at: datetime
model_config = ConfigDict(from_attributes=True)

20
app/schemas/user.py Normal file
View File

@@ -0,0 +1,20 @@
"""User schemas for SocialPhoto API."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, ConfigDict
class UserBase(BaseModel):
"""Base user model."""
username: str
email: str
avatar_url: Optional[str] = "/static/default-avatar.png"
bio: Optional[str] = ""
class UserStats(BaseModel):
"""User statistics model."""
posts_count: int
followers_count: int
following_count: int

4
app/services/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Services package."""
from app.services.auth_service import AuthService
__all__ = ["AuthService"]

View File

@@ -0,0 +1,163 @@
"""Authentication service with password hashing and JWT token management."""
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.user import User
from app.schemas.auth import Token, UserRegister
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash.
Args:
plain_password: The plain text password.
hashed_password: The hashed password to verify against.
Returns:
True if password matches, False otherwise.
"""
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
"""Hash a password using bcrypt.
Args:
password: The plain text password to hash.
Returns:
The hashed password string.
"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token.
Args:
data: The payload data to encode in the token.
expires_delta: Optional custom expiration time delta.
Returns:
The encoded JWT token string.
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
secret_key = settings.SECRET_KEY or "fallback-secret-key-change-in-production"
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decode and verify a JWT token.
Args:
token: The JWT token string to decode.
Returns:
The decoded token payload.
Raises:
JWTError: If the token is invalid or expired.
"""
secret_key = settings.SECRET_KEY or "fallback-secret-key-change-in-production"
payload = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
return payload
class AuthService:
"""Service class for authentication operations."""
@staticmethod
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""Get a user by username.
Args:
db: Database session.
username: The username to search for.
Returns:
User object if found, None otherwise.
"""
return db.query(User).filter(User.username == username).first()
@staticmethod
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""Get a user by email.
Args:
db: Database session.
email: The email to search for.
Returns:
User object if found, None otherwise.
"""
return db.query(User).filter(User.email == email).first()
@staticmethod
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
"""Get a user by ID.
Args:
db: Database session.
user_id: The user ID to search for.
Returns:
User object if found, None otherwise.
"""
return db.query(User).filter(User.id == user_id).first()
@staticmethod
def create_user(db: Session, user_data: UserRegister) -> User:
"""Create a new user.
Args:
db: Database session.
user_data: The user registration data.
Returns:
The created User object.
"""
hashed_password = hash_password(user_data.password)
db_user = User(
username=user_data.username,
email=user_data.email,
password_hash=hashed_password,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@staticmethod
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""Authenticate a user with username and password.
Args:
db: Database session.
username: The username.
password: The plain text password.
Returns:
User object if authentication successful, None otherwise.
"""
user = AuthService.get_user_by_username(db, username)
if not user:
return None
if not verify_password(password, user.password_hash):
return None
return user

1
app/utils/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Utils package."""

44
pyproject.toml Normal file
View File

@@ -0,0 +1,44 @@
[project]
name = "instagram-clone"
version = "0.1.0"
description = "Instagram Clone API"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.109.0",
"uvicorn[standard]>=0.27.0",
"sqlalchemy>=2.0.0",
"alembic>=1.13.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"python-multipart>=0.0.6",
"python-jose[cryptography]>=3.3.0",
"passlib[bcrypt]>=1.7.4",
"psycopg2-binary>=2.9.9",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",
"pytest-asyncio>=0.23.0",
"httpx>=0.26.0",
"black>=24.0.0",
"ruff>=0.1.0",
"mypy>=1.8.0",
]
[build-system]
requires = ["setuptools>=68.0.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.black]
line-length = 88
target-version = ['py311']
[tool.ruff]
line-length = 88
select = ["E", "F", "I", "N", "W"]
fixable = ["ALL"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

11
requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
python-multipart>=0.0.6
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
pydantic>=2.0.0
sqlalchemy>=2.0.0
pytest>=7.4.0
pytest-asyncio>=0.21.0
httpx>=0.25.0
aiosqlite>=0.19.0

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests package."""

51
tests/conftest.py Normal file
View File

@@ -0,0 +1,51 @@
"""Pytest configuration and fixtures."""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import StaticPool
from app.main import app
from app.db.database import Base, get_db
# Create in-memory SQLite database for testing
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="function")
def db_session() -> Session:
"""Create a fresh database for each test."""
# Import models to ensure they're registered with Base
from app.models.user import User # noqa: F401
Base.metadata.create_all(bind=engine)
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def client(db_session: Session) -> TestClient:
"""Create a test client with fresh database."""
def override_get_db():
try:
yield db_session
finally:
pass
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as test_client:
yield test_client
app.dependency_overrides.clear()

View File

@@ -0,0 +1 @@
"""Integration tests package."""

158
tests/test_auth.py Normal file
View File

@@ -0,0 +1,158 @@
"""Tests for authentication routes."""
import pytest
from fastapi.testclient import TestClient
def test_register_success(client: TestClient):
"""Test successful user registration."""
response = client.post(
"/auth/register",
json={
"username": "newuser",
"email": "newuser@example.com",
"password": "password123",
},
)
assert response.status_code == 201
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
def test_register_duplicate_username(client: TestClient):
"""Test registration with duplicate username."""
# First registration
client.post(
"/auth/register",
json={
"username": "duplicate",
"email": "first@example.com",
"password": "password123",
},
)
# Second registration with same username
response = client.post(
"/auth/register",
json={
"username": "duplicate",
"email": "second@example.com",
"password": "password123",
},
)
assert response.status_code == 400
assert "Username already registered" in response.json()["detail"]
def test_register_duplicate_email(client: TestClient):
"""Test registration with duplicate email."""
# First registration
client.post(
"/auth/register",
json={
"username": "firstuser",
"email": "same@example.com",
"password": "password123",
},
)
# Second registration with same email
response = client.post(
"/auth/register",
json={
"username": "seconduser",
"email": "same@example.com",
"password": "password123",
},
)
assert response.status_code == 400
assert "Email already registered" in response.json()["detail"]
def test_register_invalid_email(client: TestClient):
"""Test registration with invalid email format."""
response = client.post(
"/auth/register",
json={
"username": "validuser",
"email": "not-an-email",
"password": "password123",
},
)
assert response.status_code == 422 # Validation error
def test_register_short_password(client: TestClient):
"""Test registration with too short password."""
response = client.post(
"/auth/register",
json={
"username": "validuser",
"email": "valid@example.com",
"password": "12345",
},
)
assert response.status_code == 422 # Validation error
def test_login_success(client: TestClient):
"""Test successful login."""
# Register first
client.post(
"/auth/register",
json={
"username": "loginuser",
"email": "login@example.com",
"password": "password123",
},
)
# Login
response = client.post(
"/auth/login",
json={
"username": "loginuser",
"password": "password123",
},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
def test_login_wrong_password(client: TestClient):
"""Test login with wrong password."""
# Register first
client.post(
"/auth/register",
json={
"username": "testuser",
"email": "test@example.com",
"password": "correctpassword",
},
)
# Login with wrong password
response = client.post(
"/auth/login",
json={
"username": "testuser",
"password": "wrongpassword",
},
)
assert response.status_code == 401
assert "Incorrect username or password" in response.json()["detail"]
def test_login_nonexistent_user(client: TestClient):
"""Test login with nonexistent username."""
response = client.post(
"/auth/login",
json={
"username": "nonexistent",
"password": "password123",
},
)
assert response.status_code == 401
assert "Incorrect username or password" in response.json()["detail"]

292
tests/test_posts.py Normal file
View File

@@ -0,0 +1,292 @@
"""Tests for post routes."""
import io
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
def test_create_post_success(client: TestClient, two_users: dict):
"""Test successful post creation."""
token = two_users["user1_token"]
# Create a fake image
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "My first post!"},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 201
data = response.json()
assert data["caption"] == "My first post!"
assert data["user_id"] is not None
assert data["likes_count"] == 0
assert data["dislikes_count"] == 0
assert data["comments_count"] == 0
def test_create_post_unauthorized(client: TestClient):
"""Test creating post without authentication."""
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "My first post!"},
)
assert response.status_code == 401 # No credentials
def test_create_post_invalid_file_type(client: TestClient, two_users: dict):
"""Test creating post with invalid file type."""
token = two_users["user1_token"]
fake_file = io.BytesIO(b"fake content")
fake_file.name = "test.txt"
response = client.post(
"/posts",
files={"image": ("test.txt", fake_file, "text/plain")},
data={"caption": "Invalid file"},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 400
assert "File type not allowed" in response.json()["detail"]
def test_get_posts_empty(client: TestClient):
"""Test getting posts when database is empty."""
response = client.get("/posts")
assert response.status_code == 200
assert response.json() == []
def test_get_posts(client: TestClient, two_users: dict):
"""Test getting posts."""
token = two_users["user1_token"]
# Create a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Test post"},
headers={"Authorization": f"Bearer {token}"},
)
response = client.get("/posts")
assert response.status_code == 200
posts = response.json()
assert len(posts) == 1
assert posts[0]["caption"] == "Test post"
def test_get_single_post(client: TestClient, two_users: dict):
"""Test getting a single post."""
token = two_users["user1_token"]
# Create a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Single post test"},
headers={"Authorization": f"Bearer {token}"},
)
post_id = create_response.json()["id"]
response = client.get(f"/posts/{post_id}")
assert response.status_code == 200
assert response.json()["caption"] == "Single post test"
def test_get_nonexistent_post(client: TestClient):
"""Test getting a post that doesn't exist."""
response = client.get("/posts/9999")
assert response.status_code == 404
def test_delete_post_by_owner(client: TestClient, two_users: dict):
"""Test deleting own post."""
token = two_users["user1_token"]
# Create a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "To be deleted"},
headers={"Authorization": f"Bearer {token}"},
)
post_id = create_response.json()["id"]
# Delete the post
response = client.delete(
f"/posts/{post_id}",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 204
# Verify it's deleted
response = client.get(f"/posts/{post_id}")
assert response.status_code == 404
def test_delete_post_by_non_owner(client: TestClient, two_users: dict):
"""Test trying to delete someone else's post."""
token1 = two_users["user1_token"]
token2 = two_users["user2_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "User 1's post"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
# User 2 tries to delete it
response = client.delete(
f"/posts/{post_id}",
headers={"Authorization": f"Bearer {token2}"},
)
assert response.status_code == 403
assert "You can only delete your own posts" in response.json()["detail"]
def test_like_post(client: TestClient, two_users: dict):
"""Test liking a post."""
token1 = two_users["user1_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Like test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
# Like the post
response = client.post(
f"/posts/{post_id}/like",
headers={"Authorization": f"Bearer {token1}"},
)
assert response.status_code == 201
assert response.json()["message"] == "Post liked"
# Verify like count
response = client.get(f"/posts/{post_id}")
assert response.json()["likes_count"] == 1
def test_unlike_post(client: TestClient, two_users: dict):
"""Test removing a like from a post."""
token1 = two_users["user1_token"]
# User 1 creates a post and likes it
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Unlike test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
client.post(
f"/posts/{post_id}/like",
headers={"Authorization": f"Bearer {token1}"},
)
# Unlike the post
response = client.delete(
f"/posts/{post_id}/like",
headers={"Authorization": f"Bearer {token1}"},
)
assert response.status_code == 200
assert response.json()["message"] == "Like removed"
# Verify like count
response = client.get(f"/posts/{post_id}")
assert response.json()["likes_count"] == 0
def test_dislike_post(client: TestClient, two_users: dict):
"""Test disliking a post."""
token1 = two_users["user1_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Dislike test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
# Dislike the post
response = client.post(
f"/posts/{post_id}/dislike",
headers={"Authorization": f"Bearer {token1}"},
)
assert response.status_code == 201
assert response.json()["message"] == "Post disliked"
# Verify dislike count
response = client.get(f"/posts/{post_id}")
assert response.json()["dislikes_count"] == 1
def test_like_and_dislike_mutually_exclusive(client: TestClient, two_users: dict):
"""Test that like and dislike are mutually exclusive."""
token1 = two_users["user1_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Mutual exclusion test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
# Like the post
client.post(
f"/posts/{post_id}/like",
headers={"Authorization": f"Bearer {token1}"},
)
# Dislike the post (should replace like)
response = client.post(
f"/posts/{post_id}/dislike",
headers={"Authorization": f"Bearer {token1}"},
)
assert response.status_code == 201
# Verify: no likes, 1 dislike
response = client.get(f"/posts/{post_id}")
data = response.json()
assert data["likes_count"] == 0
assert data["dislikes_count"] == 1

358
tests/test_social.py Normal file
View File

@@ -0,0 +1,358 @@
"""Tests for social features: users, follows, comments."""
import io
import pytest
from fastapi.testclient import TestClient
def test_get_user_profile(client: TestClient, two_users: dict):
"""Test getting a user profile."""
token1 = two_users["user1_token"]
# Get user ID from token (simplified - in real app would decode)
# Let's create a post first to get the user
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Test post"},
headers={"Authorization": f"Bearer {token1}"},
)
# For now, just test that user endpoint exists and works
# The actual user ID would need to be extracted from the token
response = client.get("/users/1")
assert response.status_code == 200
data = response.json()
assert data["username"] == "user1"
def test_get_user_stats(client: TestClient, two_users: dict):
"""Test getting user statistics."""
token1 = two_users["user1_token"]
# Create a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Stats test"},
headers={"Authorization": f"Bearer {token1}"},
)
# User 2 follows User 1
client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
response = client.get("/users/1/stats")
assert response.status_code == 200
data = response.json()
assert data["posts_count"] == 1
assert data["followers_count"] == 1
assert data["following_count"] == 0
def test_follow_user(client: TestClient, two_users: dict):
"""Test following a user."""
# User 2 follows User 1
response = client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
assert response.status_code == 201
assert response.json()["message"] == "Successfully followed user"
def test_follow_self(client: TestClient, two_users: dict):
"""Test that you cannot follow yourself."""
response = client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {two_users['user1_token']}"},
)
assert response.status_code == 400
assert "You cannot follow yourself" in response.json()["detail"]
def test_follow_already_following(client: TestClient, two_users: dict):
"""Test following someone you already follow."""
# User 2 follows User 1
client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
# Try to follow again
response = client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
assert response.status_code == 400
assert "You already follow this user" in response.json()["detail"]
def test_unfollow_user(client: TestClient, two_users: dict):
"""Test unfollowing a user."""
token2 = two_users["user2_token"]
# User 2 follows User 1
client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {token2}"},
)
# User 2 unfollows User 1
response = client.delete(
"/users/1/follow",
headers={"Authorization": f"Bearer {token2}"},
)
assert response.status_code == 200
assert response.json()["message"] == "Successfully unfollowed user"
def test_unfollow_not_following(client: TestClient, two_users: dict):
"""Test unfollowing someone you don't follow."""
response = client.delete(
"/users/1/follow",
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
assert response.status_code == 404
assert "You are not following this user" in response.json()["detail"]
def test_get_user_posts(client: TestClient, two_users: dict):
"""Test getting posts by a specific user."""
token1 = two_users["user1_token"]
# Create two posts as User 1
for i in range(2):
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": f"User 1 post {i}"},
headers={"Authorization": f"Bearer {token1}"},
)
response = client.get("/users/1/posts")
assert response.status_code == 200
posts = response.json()
assert len(posts) == 2
def test_create_comment(client: TestClient, two_users: dict):
"""Test creating a comment."""
token1 = two_users["user1_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Comment test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
# User 2 comments on the post
response = client.post(
f"/posts/{post_id}/comments",
json={"content": "Great post!"},
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
assert response.status_code == 201
data = response.json()
assert data["content"] == "Great post!"
assert data["username"] == "user2"
def test_get_post_comments(client: TestClient, two_users: dict):
"""Test getting comments for a post."""
token1 = two_users["user1_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Comments test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
# Add a comment
client.post(
f"/posts/{post_id}/comments",
json={"content": "First comment!"},
headers={"Authorization": f"Bearer {token1}"},
)
response = client.get(f"/posts/{post_id}/comments")
assert response.status_code == 200
comments = response.json()
assert len(comments) == 1
assert comments[0]["content"] == "First comment!"
def test_delete_comment_by_owner(client: TestClient, two_users: dict):
"""Test deleting own comment."""
token1 = two_users["user1_token"]
# User 1 creates a post and comments
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Delete comment test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
comment_response = client.post(
f"/posts/{post_id}/comments",
json={"content": "To be deleted"},
headers={"Authorization": f"Bearer {token1}"},
)
comment_id = comment_response.json()["id"]
# Delete the comment
response = client.delete(
f"/comments/{comment_id}",
headers={"Authorization": f"Bearer {token1}"},
)
assert response.status_code == 204
def test_delete_comment_by_non_owner(client: TestClient, two_users: dict):
"""Test trying to delete someone else's comment."""
token1 = two_users["user1_token"]
token2 = two_users["user2_token"]
# User 1 creates a post and comments
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Unauthorized delete test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
comment_response = client.post(
f"/posts/{post_id}/comments",
json={"content": "User 1's comment"},
headers={"Authorization": f"Bearer {token1}"},
)
comment_id = comment_response.json()["id"]
# User 2 tries to delete it
response = client.delete(
f"/comments/{comment_id}",
headers={"Authorization": f"Bearer {token2}"},
)
assert response.status_code == 403
assert "You can only delete your own comments" in response.json()["detail"]
def test_like_comment(client: TestClient, two_users: dict):
"""Test liking a comment."""
token1 = two_users["user1_token"]
# User 1 creates a post and comments
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
create_response = client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Comment like test"},
headers={"Authorization": f"Bearer {token1}"},
)
post_id = create_response.json()["id"]
comment_response = client.post(
f"/posts/{post_id}/comments",
json={"content": "Comment to like"},
headers={"Authorization": f"Bearer {token1}"},
)
comment_id = comment_response.json()["id"]
# Like the comment
response = client.post(
f"/comments/{comment_id}/like",
headers={"Authorization": f"Bearer {token1}"},
)
assert response.status_code == 201
assert response.json()["message"] == "Comment liked"
def test_feed_global(client: TestClient, two_users: dict):
"""Test global feed."""
token1 = two_users["user1_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Global feed test"},
headers={"Authorization": f"Bearer {token1}"},
)
response = client.get("/feed/global")
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["posts"]) >= 1
def test_feed_followed(client: TestClient, two_users: dict):
"""Test followed users feed."""
token1 = two_users["user1_token"]
token2 = two_users["user2_token"]
# User 1 creates a post
fake_image = io.BytesIO(b"fake image content")
fake_image.name = "test.jpg"
client.post(
"/posts",
files={"image": ("test.jpg", fake_image, "image/jpeg")},
data={"caption": "Followed feed test"},
headers={"Authorization": f"Bearer {token1}"},
)
# User 2 follows User 1
client.post(
"/users/1/follow",
headers={"Authorization": f"Bearer {token2}"},
)
# Get followed feed for User 2
response = client.get(
"/feed",
headers={"Authorization": f"Bearer {token2}"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["posts"]) >= 1
def test_feed_followed_no_follows(client: TestClient, two_users: dict):
"""Test followed feed when not following anyone."""
response = client.get(
"/feed",
headers={"Authorization": f"Bearer {two_users['user2_token']}"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 0
assert len(data["posts"]) == 0

1
tests/unit/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Unit tests package."""

18
tests/unit/test_config.py Normal file
View File

@@ -0,0 +1,18 @@
"""Unit tests for core configuration."""
import pytest
from app.core.config import Settings
def test_settings_default_values():
"""Test default settings values."""
settings = Settings()
assert settings.APP_NAME == "Instagram Clone"
assert settings.APP_VERSION == "0.1.0"
assert settings.DEBUG is False
def test_settings_database_url_default():
"""Test default database URL."""
settings = Settings()
assert "sqlite" in settings.DATABASE_URL

View File

@@ -0,0 +1,17 @@
"""Unit tests for database module."""
import pytest
from app.db.database import Base, get_db
def test_base_declarative():
"""Test Base is a DeclarativeBase."""
from sqlalchemy.orm import DeclarativeBase
assert issubclass(Base, DeclarativeBase)
def test_get_db_yields_session(db_session):
"""Test get_db dependency yields database session."""
gen = get_db()
db = next(gen)
assert db is not None

19
tests/unit/test_main.py Normal file
View File

@@ -0,0 +1,19 @@
"""Unit tests for main application endpoints."""
import pytest
def test_root_endpoint(client):
"""Test root endpoint returns welcome message."""
response = client.get("/")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "version" in data
def test_health_check_endpoint(client):
"""Test health check endpoint."""
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "healthy"}

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

View File

@@ -0,0 +1 @@
fake image content

Some files were not shown because too many files have changed in this diff Show More