Files
mapmaker/app/services/map_service.py

128 lines
3.8 KiB
Python

"""Map service for business logic."""
from typing import List, Optional
from uuid import UUID
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from app.models.map import Map
from app.models.user import User
from app.models.map_share import MapShare
from app.schemas.map import MapCreate, MapUpdate
def get_user_maps(db: Session, user_id: UUID) -> List[Map]:
    """Get all maps owned by or shared with a user, newest update first.

    Args:
        db: Active database session.
        user_id: ID of the user whose maps are listed.

    Returns:
        Maps whose ``owner_id`` is ``user_id`` plus maps referenced by a
        ``MapShare`` row for ``user_id``, each map appearing at most once,
        sorted by ``updated_at`` descending. Maps without an ``updated_at``
        value are placed last.
    """
    owned_maps = db.query(Map).filter(Map.owner_id == user_id).all()

    share_rows = db.query(MapShare.map_id).filter(MapShare.user_id == user_id).all()
    shared_map_ids = [row.map_id for row in share_rows]

    shared_maps = []
    if shared_map_ids:
        # A map can be both owned by and shared with the same user; skip
        # those so the caller never sees the same map twice.
        owned_ids = {m.id for m in owned_maps}
        shared_maps = [
            m
            for m in db.query(Map).filter(Map.id.in_(shared_map_ids)).all()
            if m.id not in owned_ids
        ]

    all_maps = owned_maps + shared_maps
    # The leading boolean keeps None timestamps from being compared against
    # datetimes (which would raise TypeError) and sorts them last under
    # reverse=True.
    all_maps.sort(
        key=lambda m: (m.updated_at is not None, m.updated_at),
        reverse=True,
    )
    return all_maps
def get_map_by_id(db: Session, map_id: UUID, user: Optional[User] = None) -> Map:
    """Get a map by ID with an optional authorization check.

    Args:
        db: Active database session.
        map_id: ID of the map to load.
        user: When provided, the map is returned only if this user owns it,
            is an admin, or has a ``MapShare`` row for it. When omitted, no
            authorization check is performed.

    Returns:
        The matching ``Map``.

    Raises:
        HTTPException: 404 if no map has this ID; 403 if ``user`` is given
            and has no access to the map.
    """
    map_obj = db.query(Map).filter(Map.id == map_id).first()
    if map_obj is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Map not found"
        )

    # No user supplied: the caller asked for an unchecked lookup.
    if user is None:
        return map_obj

    # Owners and admins are always allowed, so avoid the extra share query.
    if map_obj.owner_id == user.id or user.is_admin:
        return map_obj

    share = db.query(MapShare).filter(
        MapShare.map_id == map_id,
        MapShare.user_id == user.id
    ).first()
    if share is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to access this map"
        )
    return map_obj
def get_default_public_map(db: Session) -> Optional[Map]:
    """Return the first map flagged ``is_default_public``, or ``None``.

    No ordering is applied, so if several maps carry the flag the one
    returned is whichever the database yields first.
    """
    default_candidates = db.query(Map).filter_by(is_default_public=True)
    return default_candidates.first()
def create_map(db: Session, map_data: MapCreate, user_id: UUID) -> Map:
    """Persist a new map owned by ``user_id`` and return it.

    Args:
        db: Active database session.
        map_data: Payload supplying the map's ``name`` and ``description``.
        user_id: ID stored as the new map's ``owner_id``.

    Returns:
        The newly committed ``Map``, refreshed from the database.
    """
    attributes = {
        "name": map_data.name,
        "description": map_data.description,
        "owner_id": user_id,
        # A freshly created map is never the default public map.
        "is_default_public": False,
    }
    new_map = Map(**attributes)

    db.add(new_map)
    db.commit()
    # Reload so database-generated column values are populated on the object.
    db.refresh(new_map)
    return new_map
def update_map(db: Session, map_id: UUID, map_data: MapUpdate, user: User) -> Map:
    """Update a map's name and/or description. Only owner or admin can update.

    Args:
        db: Active database session.
        map_id: ID of the map to update.
        map_data: Fields to change; attributes left as ``None`` are not
            modified.
        user: The user requesting the update.

    Returns:
        The updated ``Map``, refreshed from the database.

    Raises:
        HTTPException: 404 if the map does not exist; 403 if ``user`` is
            neither the map's owner nor an admin.
    """
    map_obj = get_map_by_id(db, map_id, user)

    # get_map_by_id also admits users who only hold a share on the map;
    # writes are restricted to the owner and admins, so check that here.
    if not (map_obj.owner_id == user.id or user.is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to modify this map"
        )

    # Update fields if provided
    if map_data.name is not None:
        map_obj.name = map_data.name
    if map_data.description is not None:
        map_obj.description = map_data.description

    db.commit()
    db.refresh(map_obj)
    return map_obj
def delete_map(db: Session, map_id: UUID, user: User) -> None:
    """Delete a map. Only owner or admin can delete.

    Args:
        db: Active database session.
        map_id: ID of the map to delete.
        user: The user requesting the deletion.

    Raises:
        HTTPException: 404 if the map does not exist; 403 if ``user`` is
            neither the map's owner nor an admin.
    """
    map_obj = get_map_by_id(db, map_id, user)

    # get_map_by_id also admits users who only hold a share on the map;
    # deletion is restricted to the owner and admins, so check that here.
    if not (map_obj.owner_id == user.id or user.is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to delete this map"
        )

    db.delete(map_obj)
    db.commit()
def set_default_public_map(db: Session, map_id: UUID, user: User) -> Map:
    """Set a map as the default public map. Only admins can do this.

    Args:
        db: Active database session.
        map_id: ID of the map to promote.
        user: The user requesting the change; must be an admin.

    Returns:
        The promoted ``Map``, refreshed from the database.

    Raises:
        HTTPException: 403 if ``user`` is not an admin; 404 if the map does
            not exist.
    """
    if not user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can set the default public map"
        )

    # Raises 404 for an unknown ID; the access check inside always passes
    # here because the user is already known to be an admin.
    map_obj = get_map_by_id(db, map_id, user)

    # Clear the flag on every other map. The target is excluded so that a
    # map which is already the default is not cleared and then re-set.
    db.query(Map).filter(
        Map.is_default_public == True,  # noqa: E712 - SQL expression, not a Python bool test
        Map.id != map_id
    ).update({"is_default_public": False})

    map_obj.is_default_public = True
    db.commit()
    db.refresh(map_obj)
    return map_obj