128 lines
3.8 KiB
Python
128 lines
3.8 KiB
Python
"""Map service for business logic."""
|
|
from typing import List, Optional
|
|
from uuid import UUID
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import HTTPException, status
|
|
|
|
from app.models.map import Map
|
|
from app.models.user import User
|
|
from app.models.map_share import MapShare
|
|
from app.schemas.map import MapCreate, MapUpdate
|
|
|
|
|
|
def get_user_maps(db: Session, user_id: UUID) -> List[Map]:
|
|
"""Get all maps owned by or shared with a user."""
|
|
# Get owned maps
|
|
owned_maps = db.query(Map).filter(Map.owner_id == user_id).all()
|
|
|
|
# Get shared maps
|
|
shared_map_ids = db.query(MapShare.map_id).filter(MapShare.user_id == user_id).all()
|
|
shared_map_ids = [share.map_id for share in shared_map_ids]
|
|
|
|
shared_maps = []
|
|
if shared_map_ids:
|
|
shared_maps = db.query(Map).filter(Map.id.in_(shared_map_ids)).all()
|
|
|
|
# Combine and sort by updated_at
|
|
all_maps = owned_maps + shared_maps
|
|
all_maps.sort(key=lambda m: m.updated_at, reverse=True)
|
|
|
|
return all_maps
|
|
|
|
|
|
def get_map_by_id(db: Session, map_id: UUID, user: Optional[User] = None) -> Map:
|
|
"""Get a map by ID with optional authorization check."""
|
|
map_obj = db.query(Map).filter(Map.id == map_id).first()
|
|
|
|
if not map_obj:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Map not found"
|
|
)
|
|
|
|
# If user is provided, check authorization
|
|
if user:
|
|
# Check if user is owner, admin, or has been granted access via share
|
|
is_owner = map_obj.owner_id == user.id
|
|
is_admin = user.is_admin
|
|
has_share_access = db.query(MapShare).filter(
|
|
MapShare.map_id == map_id,
|
|
MapShare.user_id == user.id
|
|
).first() is not None
|
|
|
|
if not (is_owner or is_admin or has_share_access):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="You don't have permission to access this map"
|
|
)
|
|
|
|
return map_obj
|
|
|
|
|
|
def get_default_public_map(db: Session) -> Optional[Map]:
|
|
"""Get the default public map (first map with is_default_public=True)."""
|
|
return db.query(Map).filter(Map.is_default_public == True).first()
|
|
|
|
|
|
def create_map(db: Session, map_data: MapCreate, user_id: UUID) -> Map:
|
|
"""Create a new map."""
|
|
map_obj = Map(
|
|
name=map_data.name,
|
|
description=map_data.description,
|
|
owner_id=user_id,
|
|
is_default_public=False
|
|
)
|
|
|
|
db.add(map_obj)
|
|
db.commit()
|
|
db.refresh(map_obj)
|
|
|
|
return map_obj
|
|
|
|
|
|
def update_map(db: Session, map_id: UUID, map_data: MapUpdate, user: User) -> Map:
|
|
"""Update a map. Only owner or admin can update."""
|
|
map_obj = get_map_by_id(db, map_id, user)
|
|
|
|
# Update fields if provided
|
|
if map_data.name is not None:
|
|
map_obj.name = map_data.name
|
|
if map_data.description is not None:
|
|
map_obj.description = map_data.description
|
|
|
|
db.commit()
|
|
db.refresh(map_obj)
|
|
|
|
return map_obj
|
|
|
|
|
|
def delete_map(db: Session, map_id: UUID, user: User) -> None:
|
|
"""Delete a map. Only owner or admin can delete."""
|
|
map_obj = get_map_by_id(db, map_id, user)
|
|
|
|
db.delete(map_obj)
|
|
db.commit()
|
|
|
|
|
|
def set_default_public_map(db: Session, map_id: UUID, user: User) -> Map:
|
|
"""Set a map as the default public map. Only admins can do this."""
|
|
if not user.is_admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only administrators can set the default public map"
|
|
)
|
|
|
|
# Get the map (admin check is done in get_map_by_id)
|
|
map_obj = get_map_by_id(db, map_id, user)
|
|
|
|
# Unset any existing default public maps
|
|
db.query(Map).filter(Map.is_default_public == True).update({"is_default_public": False})
|
|
|
|
# Set this map as default public
|
|
map_obj.is_default_public = True
|
|
|
|
db.commit()
|
|
db.refresh(map_obj)
|
|
|
|
return map_obj
|