347 lines
12 KiB
Python

# django imports
from django.contrib.auth import login
# rest_framework imports
from rest_framework import generics, permissions
from rest_framework.authtoken.serializers import AuthTokenSerializer
from api.filters import UserFilter
from api.mixins import StaffEditorPermissionMixin
from api.models import User, Atoll, Island
from rest_framework.response import Response
from rest_framework import status
from rest_framework.exceptions import ValidationError
from rest_framework.decorators import api_view, permission_classes
from api.serializers import AtollSerializer, IslandSerializer
# knox imports
from knox.views import LoginView as KnoxLoginView
from knox.models import AuthToken
from django_filters.rest_framework import DjangoFilterBackend
import re
from typing import cast, Dict, Any
from django.core.mail import send_mail
from django.db.models import Q
# local apps import
from .serializers import (
KnoxTokenSerializer,
UserSerializer,
AuthSerializer,
CustomUserSerializer,
CustomReadOnlyUserSerializer,
CustomReadOnlyUserByIDCardSerializer,
)
ID_CARD_PATTERN = r"^[A-Z]{1,2}[0-9]{6,7}$"
MOBILE_PATTERN = r"^[7|9][0-9]{6}$"
ACCOUNT_NUMBER_PATTERN = r"^(7\d{12}|9\d{16})$"
class ErrorMessages:
USERNAME_EXISTS = "Username already exists."
INVALID_ID_CARD = "Please enter a valid ID card number."
INVALID_MOBILE = "Please enter a valid mobile number."
INVALID_ACCOUNT = "Please enter a valid account number."
class CreateUserView(generics.CreateAPIView):
# Create user API view
serializer_class = UserSerializer
permission_classes = (permissions.AllowAny,)
queryset = User.objects.all()
def post(self, request, *args, **kwargs):
# Extract required fields from request data
password = request.data.get("password")
username = request.data.get("username")
address = request.data.get("address")
mobile = request.data.get("mobile")
acc_no = request.data.get("acc_no")
id_card = request.data.get("id_card")
dob = request.data.get("dob")
atoll_id = request.data.get("atoll")
island_id = request.data.get("island")
terms_accepted = request.data.get("terms_accepted")
policy_accepted = request.data.get("policy_accepted")
firstname = request.data.get("firstname")
lastname = request.data.get("lastname")
# Validate required fields first
validation_error = self.validate_required_fields(request.data)
if validation_error:
return validation_error
# Check username uniqueness after validation
if User.objects.filter(username=username).exists():
return Response({"message": ErrorMessages.USERNAME_EXISTS}, status=400)
if id_card and not re.match(ID_CARD_PATTERN, id_card):
return Response({"message": ErrorMessages.INVALID_ID_CARD}, status=400)
if User.objects.filter(id_card=id_card).exists():
return Response({"message": "ID card already exists."}, status=400)
if mobile is None or not re.match(MOBILE_PATTERN, mobile):
return Response({"message": ErrorMessages.INVALID_MOBILE}, status=400)
if acc_no is None or not re.match(ACCOUNT_NUMBER_PATTERN, acc_no):
return Response({"message": ErrorMessages.INVALID_ACCOUNT}, status=400)
# Fetch Atoll and Island instances
try:
atoll = Atoll.objects.get(id=atoll_id)
island = Island.objects.get(id=island_id)
except Atoll.DoesNotExist:
return Response({"message": "Atoll not found."}, status=404)
except Island.DoesNotExist:
return Response({"message": "Island not found."}, status=404)
# Create user
user = User.objects.create_user(
first_name=firstname,
last_name=lastname,
username=str(username),
password=password,
email=None,
address=address,
mobile=mobile,
acc_no=acc_no,
id_card=id_card,
dob=dob,
atoll=atoll,
island=island,
terms_accepted=terms_accepted,
policy_accepted=policy_accepted,
)
serializer = self.get_serializer(user)
headers = self.get_success_headers(serializer.data)
return Response(
serializer.data, status=status.HTTP_201_CREATED, headers=headers
)
def validate_required_fields(self, data):
required_fields = {
"firstname": "First name",
"lastname": "Last name",
"password": "Password",
"username": "Username",
"address": "Address",
"mobile": "Mobile number",
"acc_no": "Account number",
"id_card": "ID card",
"dob": "Date of birth",
"atoll": "Atoll",
"island": "Island",
}
for field, label in required_fields.items():
if not data.get(field):
return Response({"message": f"{label} is required."}, status=400)
if data.get("terms_accepted") is None:
return Response({"message": "Terms acceptance is required."}, status=400)
if data.get("policy_accepted") is None:
return Response({"message": "Policy acceptance is required."}, status=400)
return None
class LoginView(KnoxLoginView):
# login view extending KnoxLoginView
serializer_class = AuthSerializer
permission_classes = (permissions.AllowAny,)
throttle_scope = "login"
def post(self, request, format=None):
try:
serializer = AuthTokenSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = cast(Dict[str, Any], serializer.validated_data)["user"]
login(request, user)
response = super(LoginView, self).post(request, format=None)
return response
except ValidationError as e:
message = "Unable to log in with provided credentials."
if (
hasattr(e, "detail")
and isinstance(e.detail, list)
and len(e.detail) > 0
):
message = e.detail[0]
return Response({"message": message}, status=status.HTTP_400_BAD_REQUEST)
class ManageUserView(generics.RetrieveUpdateAPIView):
"""Manage the authenticated user"""
serializer_class = CustomUserSerializer
permission_classes = (permissions.IsAuthenticated,)
def get_object(self):
"""Retrieve and return authenticated user"""
return self.request.user
class KnoxTokenListApiView(
StaffEditorPermissionMixin,
generics.ListAPIView,
):
# Create user API view
serializer_class = KnoxTokenSerializer
permission_classes = (permissions.IsAuthenticated,)
queryset = AuthToken.objects.all()
def get(self, request, *args, **kwargs):
user_id = getattr(request.user, "id", None)
if user_id is None:
return Response({"error": "User ID not found"}, status=400)
queryset = AuthToken.objects.filter(user_id=user_id)
data = KnoxTokenSerializer(queryset, many=True).data
return Response({"data": data})
class ListUserView(StaffEditorPermissionMixin, generics.ListAPIView):
# Create user API view
serializer_class = CustomReadOnlyUserSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = "__all__"
filterset_class = UserFilter
queryset = User.objects.all()
@api_view(["GET"])
def filter_user(request):
id_card = request.GET.get("id_card", "").strip() or None
mobile = request.GET.get("mobile", "").strip() or None
if not id_card and not mobile:
return Response({"ok": False})
filters = Q()
if id_card is not None:
filters |= Q(id_card=id_card)
if mobile is not None:
filters |= Q(mobile=mobile)
user = User.objects.filter(filters).first()
print(f"Querying with filters: {filters}")
print(f"Found user: {user}")
return Response(
{"ok": True, "verified": user.verified}
if user
else {"ok": False, "verified": False}
)
class ListUserByIDCardView(generics.ListAPIView):
# Create user API view
permission_classes = (permissions.AllowAny,)
serializer_class = CustomReadOnlyUserByIDCardSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = "__all__"
filterset_class = UserFilter
queryset = User.objects.all()
class UserDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
queryset = User.objects.all()
serializer_class = CustomReadOnlyUserSerializer
lookup_field = "pk"
def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
data = serializer.data
# Customize the response format
return Response(data)
@api_view(["GET"])
def healthcheck(request):
return Response({"status": "ok"}, status=status.HTTP_200_OK)
@api_view(["POST"])
@permission_classes((permissions.AllowAny,))
def test_email(request):
send_mail(
"Subject here",
"Here is the message.",
"noreply@sarlink.net",
["shihaam@shihaam.me"],
fail_silently=False,
)
return Response({"status": "ok"}, status=status.HTTP_200_OK)
class CreateAtollView(StaffEditorPermissionMixin, generics.CreateAPIView):
serializer_class = AtollSerializer
queryset = Atoll.objects.all()
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
name = serializer.validated_data.get("name")
print(name)
if Atoll.objects.filter(name=name).exists():
return Response({"message": "Atoll name already exists."}, status=400)
return super().create(request, *args, **kwargs)
class ListAtollView(generics.ListAPIView):
permission_classes = (permissions.AllowAny,)
serializer_class = AtollSerializer
queryset = Atoll.objects.all()
throttle_classes = () # override throttling
class RetrieveUpdateDestroyAtollView(
StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
):
serializer_class = AtollSerializer
queryset = Atoll.objects.all()
lookup_field = "pk"
def update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
name = serializer.validated_data.get("name")
if name and Atoll.objects.filter(name=name).exclude(pk=instance.pk).exists():
return Response({"message": "Atoll name already exists."}, status=400)
return super().update(request, *args, **kwargs)
class ListCreateIslandView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
serializer_class = IslandSerializer
queryset = Island.objects.all()
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
name = serializer.validated_data.get("name")
if Island.objects.filter(name=name).exists():
return Response({"message": "Island name already exists."}, status=400)
return super().create(request, *args, **kwargs)
class RetrieveUpdateDestroyIslandView(
StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
):
serializer_class = IslandSerializer
queryset = Island.objects.all()
lookup_field = "pk"
def update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
name = serializer.validated_data.get("name")
if name and Island.objects.filter(name=name).exclude(pk=instance.pk).exists():
return Response({"message": "Island name already exists."}, status=400)
return super().update(request, *args, **kwargs)