"""Post routes for SocialPhoto.""" import os import sqlite3 import uuid from pathlib import Path from typing import List, Optional from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from app.auth import get_current_user_id from app.database import get_db, row_to_dict from app.schemas import CommentCreate, CommentResponse, PostResponse router = APIRouter(prefix="/posts", tags=["Posts"]) security = HTTPBearer() # Configuration UPLOAD_DIR = Path(__file__).parent.parent.parent / "uploads" MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"} # Ensure upload directory exists UPLOAD_DIR.mkdir(parents=True, exist_ok=True) def _get_post_with_counts(conn: sqlite3.Connection, post_id: int) -> Optional[dict]: """Get post data with like/dislike/comment counts.""" cursor = conn.cursor() cursor.execute( """ SELECT p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at, (SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count, (SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count, (SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count FROM posts p JOIN users u ON p.user_id = u.id WHERE p.id = ? """, (post_id,), ) row = cursor.fetchone() if not row: return None post = row_to_dict(row) post["image_url"] = f"/uploads/{post['image_path'].split('/')[-1]}" return post @router.post("", response_model=PostResponse, status_code=status.HTTP_201_CREATED) async def create_post( caption: str = Form(""), image: UploadFile = File(...), credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> PostResponse: """Create a new post with image.""" user_id = await get_current_user_id(credentials) # Validate file type file_ext = Path(image.filename).suffix.lower() if file_ext not in ALLOWED_EXTENSIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"File type not allowed. Allowed: {', '.join(ALLOWED_EXTENSIONS)}", ) # Generate unique filename unique_filename = f"{uuid.uuid4()}{file_ext}" file_path = UPLOAD_DIR / unique_filename # Save file contents = await image.read() if len(contents) > MAX_FILE_SIZE: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="File too large. Maximum size is 10MB", ) with open(file_path, "wb") as f: f.write(contents) # Insert into database cursor = conn.cursor() cursor.execute( "INSERT INTO posts (user_id, image_path, caption) VALUES (?, ?, ?)", (user_id, str(file_path), caption), ) conn.commit() post_id = cursor.lastrowid # Get the created post post = _get_post_with_counts(conn, post_id) return PostResponse(**post) @router.get("", response_model=List[PostResponse]) async def get_posts( limit: int = 20, offset: int = 0, conn: sqlite3.Connection = Depends(get_db), ) -> List[PostResponse]: """Get global feed of posts.""" cursor = conn.cursor() cursor.execute( """ SELECT p.id, p.user_id, u.username, p.image_path, p.caption, p.created_at, (SELECT COUNT(*) FROM likes WHERE post_id = p.id) as likes_count, (SELECT COUNT(*) FROM dislikes WHERE post_id = p.id) as dislikes_count, (SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comments_count FROM posts p JOIN users u ON p.user_id = u.id ORDER BY p.created_at DESC LIMIT ? OFFSET ? """, (limit, offset), ) posts = [] for row in cursor.fetchall(): post = row_to_dict(row) posts.append( PostResponse( id=post["id"], user_id=post["user_id"], username=post["username"], image_url=f"/uploads/{post['image_path'].split('/')[-1]}", caption=post["caption"], likes_count=post["likes_count"], dislikes_count=post["dislikes_count"], comments_count=post["comments_count"], created_at=post["created_at"], ) ) return posts @router.get("/{post_id}", response_model=PostResponse) async def get_post( post_id: int, conn: sqlite3.Connection = Depends(get_db), ) -> PostResponse: """Get a specific post.""" post = _get_post_with_counts(conn, post_id) if not post: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Post not found", ) return PostResponse(**post) @router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_post( post_id: int, credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> None: """Delete a post (only by owner).""" user_id = await get_current_user_id(credentials) cursor = conn.cursor() # Check post exists and belongs to user cursor.execute("SELECT user_id, image_path FROM posts WHERE id = ?", (post_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Post not found", ) post = row_to_dict(row) if post["user_id"] != user_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You can only delete your own posts", ) # Delete image file image_path = Path(post["image_path"]) if image_path.exists(): image_path.unlink() # Delete post (cascade deletes comments, likes, dislikes) cursor.execute("DELETE FROM posts WHERE id = ?", (post_id,)) conn.commit() @router.post("/{post_id}/like", status_code=status.HTTP_201_CREATED) async def like_post( post_id: int, credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> dict: """Like a post.""" user_id = await get_current_user_id(credentials) cursor = conn.cursor() # Check post exists cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Post not found", ) # Remove any existing dislike first cursor.execute( "DELETE FROM dislikes WHERE post_id = ? AND user_id = ?", (post_id, user_id), ) # Add like try: cursor.execute( "INSERT INTO likes (post_id, user_id) VALUES (?, ?)", (post_id, user_id), ) conn.commit() except sqlite3.IntegrityError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="You already liked this post", ) return {"message": "Post liked"} @router.delete("/{post_id}/like", status_code=status.HTTP_200_OK) async def unlike_post( post_id: int, credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> dict: """Remove like from a post.""" user_id = await get_current_user_id(credentials) cursor = conn.cursor() cursor.execute( "DELETE FROM likes WHERE post_id = ? AND user_id = ?", (post_id, user_id), ) if cursor.rowcount == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="You haven't liked this post", ) conn.commit() return {"message": "Like removed"} @router.post("/{post_id}/dislike", status_code=status.HTTP_201_CREATED) async def dislike_post( post_id: int, credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> dict: """Dislike a post.""" user_id = await get_current_user_id(credentials) cursor = conn.cursor() # Check post exists cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Post not found", ) # Remove any existing like first cursor.execute( "DELETE FROM likes WHERE post_id = ? AND user_id = ?", (post_id, user_id), ) # Add dislike try: cursor.execute( "INSERT INTO dislikes (post_id, user_id) VALUES (?, ?)", (post_id, user_id), ) conn.commit() except sqlite3.IntegrityError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="You already disliked this post", ) return {"message": "Post disliked"} @router.delete("/{post_id}/dislike", status_code=status.HTTP_200_OK) async def undislike_post( post_id: int, credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> dict: """Remove dislike from a post.""" user_id = await get_current_user_id(credentials) cursor = conn.cursor() cursor.execute( "DELETE FROM dislikes WHERE post_id = ? AND user_id = ?", (post_id, user_id), ) if cursor.rowcount == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="You haven't disliked this post", ) conn.commit() return {"message": "Dislike removed"} @router.get("/{post_id}/comments", response_model=list[CommentResponse]) async def get_post_comments( post_id: int, conn: sqlite3.Connection = Depends(get_db), ) -> list[CommentResponse]: """Get all comments for a post.""" cursor = conn.cursor() # Check post exists cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Post not found", ) cursor.execute( """ SELECT c.id, c.post_id, c.user_id, u.username, c.content, c.created_at, (SELECT COUNT(*) FROM comment_likes WHERE comment_id = c.id) as likes_count FROM comments c JOIN users u ON c.user_id = u.id WHERE c.post_id = ? ORDER BY c.created_at ASC """, (post_id,), ) comments = [] for row in cursor.fetchall(): comment = row_to_dict(row) comments.append(CommentResponse(**comment)) return comments @router.post("/{post_id}/comments", response_model=CommentResponse, status_code=status.HTTP_201_CREATED) async def create_comment( post_id: int, comment_data: CommentCreate, credentials: HTTPAuthorizationCredentials = Depends(security), conn: sqlite3.Connection = Depends(get_db), ) -> CommentResponse: """Create a comment on a post.""" user_id = await get_current_user_id(credentials) cursor = conn.cursor() # Check post exists cursor.execute("SELECT id FROM posts WHERE id = ?", (post_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Post not found", ) # Create comment cursor.execute( "INSERT INTO comments (post_id, user_id, content) VALUES (?, ?, ?)", (post_id, user_id, comment_data.content), ) conn.commit() comment_id = cursor.lastrowid # Get the created comment with user info cursor.execute( """ SELECT c.id, c.post_id, c.user_id, u.username, c.content, c.created_at, (SELECT COUNT(*) FROM comment_likes WHERE comment_id = c.id) as likes_count FROM comments c JOIN users u ON c.user_id = u.id WHERE c.id = ? """, (comment_id,), ) row = cursor.fetchone() comment = row_to_dict(row) return CommentResponse(**comment)