from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from jose import JWTError, jwt from app.database import get_db from app.config import settings from app.models.user import User # Security scheme for JWT security = HTTPBearer() async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """ Dependency to get the current authenticated user from JWT token. Raises HTTP 401 if token is invalid or user not found. """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # Decode JWT token token = credentials.credentials payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise credentials_exception except JWTError: raise credentials_exception # Get user from database user = db.query(User).filter(User.id == user_id).first() if user is None: raise credentials_exception return user async def get_current_active_user( current_user: User = Depends(get_current_user) ) -> User: """ Dependency to get the current active user. Can be extended to check if user is active/banned. """ # Add user.is_active check here if needed in the future return current_user async def get_current_admin_user( current_user: User = Depends(get_current_user) ) -> User: """ Dependency to get the current admin user. Raises HTTP 403 if user is not an admin. """ if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions" ) return current_user async def get_optional_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)), db: Session = Depends(get_db) ) -> Optional[User]: """ Dependency to optionally get the current user. Returns None if no token provided or token is invalid. Useful for endpoints that work for both authenticated and guest users. """ if credentials is None: return None try: token = credentials.credentials payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: return None user = db.query(User).filter(User.id == user_id).first() return user except JWTError: return None