107 lines
2.8 KiB
Python
107 lines
2.8 KiB
Python
from sqlalchemy.orm import Session
|
|
from fastapi import HTTPException, status
|
|
from app.models.user import User
|
|
from app.schemas.user import UserCreate
|
|
from app.schemas.auth import TokenResponse
|
|
from app.utils.password import hash_password, verify_password
|
|
from app.utils.security import create_access_token, create_refresh_token
|
|
from app.config import settings
|
|
|
|
|
|
def authenticate_user(db: Session, username: str, password: str) -> User:
|
|
"""
|
|
Authenticate a user by username and password.
|
|
|
|
Args:
|
|
db: Database session
|
|
username: Username
|
|
password: Plain text password
|
|
|
|
Returns:
|
|
User object if authentication successful
|
|
|
|
Raises:
|
|
HTTPException: If authentication fails
|
|
"""
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
if not user or not verify_password(password, user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
def create_user(db: Session, user_data: UserCreate) -> User:
|
|
"""
|
|
Create a new user.
|
|
|
|
Args:
|
|
db: Database session
|
|
user_data: User creation data
|
|
|
|
Returns:
|
|
Created user object
|
|
|
|
Raises:
|
|
HTTPException: If username or email already exists
|
|
"""
|
|
# Check if registration is allowed
|
|
if not settings.ALLOW_REGISTRATION:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="User registration is currently disabled"
|
|
)
|
|
|
|
# Check if username already exists
|
|
if db.query(User).filter(User.username == user_data.username).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Username already registered"
|
|
)
|
|
|
|
# Check if email already exists
|
|
if db.query(User).filter(User.email == user_data.email).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Email already registered"
|
|
)
|
|
|
|
# Create new user
|
|
hashed_password = hash_password(user_data.password)
|
|
db_user = User(
|
|
username=user_data.username,
|
|
email=user_data.email,
|
|
password_hash=hashed_password,
|
|
is_admin=False
|
|
)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
|
|
return db_user
|
|
|
|
|
|
def create_tokens_for_user(user: User) -> TokenResponse:
|
|
"""
|
|
Create access and refresh tokens for a user.
|
|
|
|
Args:
|
|
user: User object
|
|
|
|
Returns:
|
|
TokenResponse with access and refresh tokens
|
|
"""
|
|
access_token = create_access_token(str(user.id))
|
|
refresh_token = create_refresh_token(str(user.id))
|
|
|
|
return TokenResponse(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
token_type="bearer"
|
|
)
|