Files
mapmaker/app/dependencies.py

112 lines
3.3 KiB
Python

from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from app.database import get_db
from app.config import settings
from app.models.user import User
# Security scheme for JWT
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""
Dependency to get the current authenticated user from JWT token.
Raises HTTP 401 if token is invalid or user not found.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Decode JWT token
token = credentials.credentials
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# Get user from database
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""
Dependency to get the current active user.
Can be extended to check if user is active/banned.
"""
# Add user.is_active check here if needed in the future
return current_user
async def get_current_admin_user(
current_user: User = Depends(get_current_user)
) -> User:
"""
Dependency to get the current admin user.
Raises HTTP 403 if user is not an admin.
"""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
async def get_optional_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)),
db: Session = Depends(get_db)
) -> Optional[User]:
"""
Dependency to optionally get the current user.
Returns None if no token provided or token is invalid.
Useful for endpoints that work for both authenticated and guest users.
"""
if credentials is None:
return None
try:
token = credentials.credentials
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
return None
user = db.query(User).filter(User.id == user_id).first()
return user
except JWTError:
return None
def get_user_from_token(token: str, db: Session) -> Optional[User]:
"""
Get user from JWT token string (for WebSocket authentication).
Returns None if token is invalid.
"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
return None
user = db.query(User).filter(User.id == user_id).first()
return user
except JWTError:
return None