"""Authentication helpers: credential checks, user registration, token issuance."""

from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.models.user import User
from app.schemas.user import UserCreate
from app.schemas.auth import TokenResponse
from app.utils.password import hash_password, verify_password
from app.utils.security import create_access_token, create_refresh_token
from app.config import settings


def authenticate_user(db: Session, username: str, password: str) -> User:
    """
    Authenticate a user by username and password.

    The same 401 response is raised whether the username is unknown or the
    password does not verify.

    Args:
        db: Database session
        username: Username
        password: Plain text password

    Returns:
        User object if authentication successful

    Raises:
        HTTPException: 401 if the username is unknown or the password does
            not verify
    """
    user = db.query(User).filter(User.username == username).first()

    # NOTE(review): verify_password is skipped when no user row is found, so
    # unknown usernames may respond faster than wrong passwords. Consider
    # verifying against a dummy hash if username enumeration matters here.
    credentials_ok = bool(user) and verify_password(password, user.password_hash)
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user


def _raise_if_identity_taken(db: Session, user_data: UserCreate) -> None:
    """Raise HTTP 400 if the requested username or email is already stored."""
    if db.query(User).filter(User.username == user_data.username).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )

    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )


def create_user(db: Session, user_data: UserCreate) -> User:
    """
    Create a new user.

    The new user is always created with ``is_admin=False`` and only the hash
    of the supplied password is stored.

    Args:
        db: Database session
        user_data: User creation data

    Returns:
        Created user object

    Raises:
        HTTPException: 403 if registration is disabled; 400 if the username
            or email already exists
        IntegrityError: If the commit fails for a reason other than a
            duplicate username/email (the session is rolled back first)
    """
    # Check if registration is allowed
    if not settings.ALLOW_REGISTRATION:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User registration is currently disabled",
        )

    # Reject duplicates up front so the common case gets a clean 400
    _raise_if_identity_taken(db, user_data)

    # Create new user
    db_user = User(
        username=user_data.username,
        email=user_data.email,
        password_hash=hash_password(user_data.password),
        is_admin=False,
    )
    db.add(db_user)

    try:
        db.commit()
    except IntegrityError:
        # The session must be rolled back before it can be used again.
        db.rollback()
        # Another request may have stored the same username/email between
        # the checks above and this commit; report that as the usual 400.
        _raise_if_identity_taken(db, user_data)
        # Not a duplicate: surface the original database error unchanged.
        raise

    db.refresh(db_user)
    return db_user


def create_tokens_for_user(user: User) -> TokenResponse:
    """
    Create access and refresh tokens for a user.

    Both tokens are created from the string form of ``user.id``.

    Args:
        user: User object

    Returns:
        TokenResponse with access and refresh tokens and token type "bearer"
    """
    subject = str(user.id)

    return TokenResponse(
        access_token=create_access_token(subject),
        refresh_token=create_refresh_token(subject),
        token_type="bearer",
    )