55 lines
1.6 KiB
Python
55 lines
1.6 KiB
Python
from sqlalchemy.orm import Session
|
|
from app.models.product import Product
|
|
from app.models.stock_movement import StockMovement, MovementType
|
|
from app.schemas.stock_movement import StockMoveRequest
|
|
|
|
|
|
def register_movement(db: Session, data: StockMoveRequest) -> tuple[StockMovement | None, str]:
    """Apply a stock movement to a product and persist it.

    The product referenced by ``data.product_id`` is loaded, its
    ``current_stock`` is updated according to the movement type, and a
    ``StockMovement`` row describing the change is stored and committed.

    Stock update rules:
        * ``MovementType.IN``  -- ``data.quantity`` is added to the stock.
        * ``MovementType.OUT`` -- ``data.quantity`` is subtracted; rejected
          when the product does not have enough stock.
        * any other type (ADJUST) -- the stock is overwritten with
          ``data.quantity``.

    Args:
        db: Session used to read the product and write the movement.
        data: Movement request; ``product_id``, ``type``, ``quantity`` and
            ``reason`` are read from it.

    Returns:
        ``(movement, "")`` on success, where ``movement`` is the refreshed
        ``StockMovement`` instance, or ``(None, message)`` when the request
        is rejected (unknown product, invalid type, negative quantity or
        insufficient stock).

    Raises:
        Exception: whatever ``db.commit()`` raises is propagated after the
            session has been rolled back.
    """
    prod = db.query(Product).filter(Product.id == data.product_id).first()
    if not prod:
        return None, "Producto no encontrado"

    # Report an unknown movement type the same way as every other
    # validation failure instead of leaking the enum's ValueError.
    try:
        movement_type = MovementType(data.type)
    except ValueError:
        return None, "Tipo de movimiento inválido"

    # A negative quantity would invert the meaning of IN/OUT (and slip past
    # the stock check below) or set a negative stock through ADJUST.
    if data.quantity < 0:
        return None, "Cantidad inválida"

    # Validate stock for OUT movements
    if movement_type == MovementType.OUT and prod.current_stock < data.quantity:
        return None, "Stock insuficiente"

    # NOTE(review): this read-modify-write is not protected by a row lock;
    # concurrent requests could both pass the check above. Confirm whether
    # the caller serialises access or a SELECT ... FOR UPDATE is needed.

    # Update product stock
    if movement_type == MovementType.IN:
        prod.current_stock += data.quantity
    elif movement_type == MovementType.OUT:
        prod.current_stock -= data.quantity
    else:  # ADJUST
        prod.current_stock = data.quantity

    # Record movement
    movement = StockMovement(
        product_id=data.product_id,
        type=movement_type,
        quantity=data.quantity,
        reason=data.reason,
    )
    db.add(movement)
    try:
        db.commit()
    except Exception:
        # Discard the pending stock change and movement so the session is
        # usable again, then let the caller see the original error.
        db.rollback()
        raise
    db.refresh(movement)
    return movement, ""
|
|
|
|
|
|
def get_product_history(db: Session, product_id: str):
    """Return all stock movements whose ``product_id`` matches the argument.

    The rows are returned as a list in whatever order the query yields them;
    no explicit ordering is applied.
    """
    belongs_to_product = StockMovement.product_id == product_id
    history_query = db.query(StockMovement).filter(belongs_to_product)
    return history_query.all()
|
|
|
|
|
|
def get_stock_summary(db: Session):
    """Build a per-product stock overview.

    Every ``Product`` row is loaded and turned into a dict with the keys
    ``product_id``, ``product_name``, ``current_stock``, ``min_stock`` and
    ``is_low_stock``. A product counts as low on stock when its current
    stock is strictly below its minimum stock.
    """
    summary = []
    for product in db.query(Product).all():
        entry = {
            "product_id": product.id,
            "product_name": product.name,
            "current_stock": product.current_stock,
            "min_stock": product.min_stock,
            "is_low_stock": product.current_stock < product.min_stock,
        }
        summary.append(entry)
    return summary
|