# django imports
from django.contrib.auth import login

# rest_framework imports
from rest_framework import generics, permissions
from rest_framework.authtoken.serializers import AuthTokenSerializer
from api.filters import UserFilter
from api.mixins import StaffEditorPermissionMixin
from api.models import User, Atoll, Island
from rest_framework.response import Response
from rest_framework import status
from rest_framework.exceptions import ValidationError
from rest_framework.decorators import api_view, permission_classes
from api.serializers import AtollSerializer, IslandSerializer

# knox imports
from knox.views import LoginView as KnoxLoginView
from knox.models import AuthToken
from django_filters.rest_framework import DjangoFilterBackend
import re
from typing import cast, Dict, Any
from django.core.mail import send_mail

# local apps import
from .serializers import (
    KnoxTokenSerializer,
    UserSerializer,
    AuthSerializer,
    CustomUserSerializer,
    CustomReadOnlyUserSerializer,
    CustomReadOnlyUserByIDCardSerializer,
)

# Request keys that must be present and truthy, paired with the error message
# returned when they are not. The order is the order in which errors are
# reported to the client.
_REQUIRED_FIELDS = (
    ("firstname", "firstname is required."),
    ("lastname", "lastname is required."),
    ("password", "Password is required."),
    ("username", "Username is required."),
    ("address", "Address is required."),
    ("mobile", "Mobile number is required."),
    ("acc_no", "Account number is required."),
    ("id_card", "ID card is required."),
    ("dob", "Date of birth is required."),
    ("atoll", "Atoll is required."),
    ("island", "Island is required."),
)

# Request keys that only need to be present (an explicit False is accepted).
_REQUIRED_FLAGS = (
    ("terms_accepted", "Terms acceptance is required."),
    ("policy_accepted", "Policy acceptance is required."),
)

# Format rules, applied with ``fullmatch`` so no anchors are needed.
# One or two uppercase letters followed by six or seven digits.
_ID_CARD_RE = re.compile(r"[A-Z]{1,2}[0-9]{6,7}")
# A leading 7 or 9 followed by six digits (seven digits in total).
_MOBILE_RE = re.compile(r"[79][0-9]{6}")
# Either 7 followed by twelve digits, or 9 followed by sixteen digits.
_ACC_NO_RE = re.compile(r"(7\d{12}|9\d{16})")

_FORMAT_RULES = (
    ("id_card", _ID_CARD_RE, "Please enter a valid ID card number."),
    ("mobile", _MOBILE_RE, "Please enter a valid mobile number."),
    ("acc_no", _ACC_NO_RE, "Please enter a valid account number."),
)


def _message_response(message, status_code=400):
    """Build the ``{"message": ...}`` error payload used by the views below."""
    return Response({"message": message}, status=status_code)


def _matches(pattern, value):
    """Return True only when *value* is a string that fully matches *pattern*.

    Non-string values (for example a JSON number) are reported as a mismatch
    instead of raising ``TypeError`` inside the regex engine.
    """
    return isinstance(value, str) and pattern.fullmatch(value) is not None


class CreateUserView(generics.CreateAPIView):
    # Create user API view (open registration endpoint)
    serializer_class = UserSerializer
    permission_classes = (permissions.AllowAny,)
    queryset = User.objects.all()

    def post(self, request, *args, **kwargs):
        """Validate the registration payload and create a user.

        Returns 400 with a ``message`` for a duplicate username, a missing
        required field or a badly formatted ID card / mobile / account number;
        404 when the referenced atoll or island cannot be found; 201 with the
        serialized user on success.
        """
        data = request.data
        username = data.get("username")

        # Only hit the database when a username was actually supplied; a
        # missing username is reported by the required-field checks below.
        if username and User.objects.filter(username=username).exists():
            return _message_response("Username already exists.")

        for field, message in _REQUIRED_FIELDS:
            if not data.get(field):
                return _message_response(message)

        for field, message in _REQUIRED_FLAGS:
            if data.get(field) is None:
                return _message_response(message)

        for field, pattern, message in _FORMAT_RULES:
            if not _matches(pattern, data.get(field)):
                return _message_response(message)

        # Fetch Atoll and Island instances. A malformed primary key (e.g. a
        # non-numeric string) makes the ORM raise ValueError/TypeError; treat
        # it the same as an unknown id instead of letting it become a 500.
        try:
            atoll = Atoll.objects.get(id=data.get("atoll"))
        except (Atoll.DoesNotExist, ValueError, TypeError):
            return _message_response("Atoll not found.", 404)

        try:
            island = Island.objects.get(id=data.get("island"))
        except (Island.DoesNotExist, ValueError, TypeError):
            return _message_response("Island not found.", 404)

        # Create user without email
        user = User.objects.create_user(
            first_name=data.get("firstname"),
            last_name=data.get("lastname"),
            username=username,
            password=data.get("password"),
            address=data.get("address"),
            mobile=data.get("mobile"),
            acc_no=data.get("acc_no"),
            id_card=data.get("id_card"),
            dob=data.get("dob"),
            atoll=atoll,  # Assign the Atoll instance
            island=island,  # Assign the Island instance
            terms_accepted=data.get("terms_accepted"),
            policy_accepted=data.get("policy_accepted"),
        )

        serializer = self.get_serializer(user)
        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data, status=status.HTTP_201_CREATED, headers=headers
        )


class LoginView(KnoxLoginView):
    # login view extending KnoxLoginView
    serializer_class = AuthSerializer
    permission_classes = (permissions.AllowAny,)
    throttle_scope = "login"

    def post(self, request, format=None):
        """Authenticate with username/password and return the Knox login response.

        Any ``ValidationError`` raised along the way is converted into a 400
        response carrying a single ``message``.
        """
        try:
            serializer = AuthTokenSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            user = cast(Dict[str, Any], serializer.validated_data)["user"]
            login(request, user)
            return super(LoginView, self).post(request, format=None)
        except ValidationError as e:
            message = "Unable to log in with provided credentials."
            # Only a non-empty list-shaped ``detail`` replaces the generic
            # message; any other shape keeps the generic text.
            # NOTE(review): serializer errors usually carry a dict in
            # ``detail`` - confirm the generic message is intended for them.
            detail = getattr(e, "detail", None)
            if isinstance(detail, list) and len(detail) > 0:
                message = detail[0]
            return Response({"message": message}, status=status.HTTP_400_BAD_REQUEST)


class ManageUserView(generics.RetrieveUpdateAPIView):
    """Manage the authenticated user"""

    serializer_class = CustomUserSerializer
    permission_classes = (permissions.IsAuthenticated,)

    def get_object(self):
        """Retrieve and return authenticated user"""
        return self.request.user


class KnoxTokenListApiView(
    StaffEditorPermissionMixin,
    generics.ListAPIView,
):
    # List the Knox auth tokens that belong to the requesting user
    serializer_class = KnoxTokenSerializer
    # NOTE(review): this presumably overrides any permission_classes declared
    # by StaffEditorPermissionMixin - confirm that is intended.
    permission_classes = (permissions.IsAuthenticated,)
    queryset = AuthToken.objects.all()

    def get(self, request, *args, **kwargs):
        """Return the requesting user's tokens wrapped in a ``data`` key."""
        user_id = getattr(request.user, "id", None)
        if user_id is None:
            return Response({"error": "User ID not found"}, status=400)

        tokens = AuthToken.objects.filter(user_id=user_id)
        return Response({"data": KnoxTokenSerializer(tokens, many=True).data})


class ListUserView(StaffEditorPermissionMixin, generics.ListAPIView):
    # List users, filterable through UserFilter
    serializer_class = CustomReadOnlyUserSerializer
    filter_backends = [DjangoFilterBackend]
    # NOTE(review): filterset_fields is likely ignored while filterset_class
    # is set - confirm against the django-filter backend and drop one.
    filterset_fields = "__all__"
    filterset_class = UserFilter
    queryset = User.objects.all()


class ListUserByIDCardView(generics.ListAPIView):
    # List users through the ID-card serializer, filterable through UserFilter
    # NOTE(review): AllowAny exposes this user listing to unauthenticated
    # callers - confirm the serializer only returns non-sensitive fields.
    permission_classes = (permissions.AllowAny,)
    serializer_class = CustomReadOnlyUserByIDCardSerializer
    filter_backends = [DjangoFilterBackend]
    # NOTE(review): likely ignored while filterset_class is set - confirm.
    filterset_fields = "__all__"
    filterset_class = UserFilter
    queryset = User.objects.all()


class UserDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
    # Retrieve a single user by primary key
    queryset = User.objects.all()
    serializer_class = CustomReadOnlyUserSerializer
    lookup_field = "pk"

    def retrieve(self, request, *args, **kwargs):
        """Return the serialized user looked up by ``pk``."""
        user = self.get_object()
        # Hook for customizing the response format.
        return Response(self.get_serializer(user).data)


@api_view(["GET"])
def healthcheck(request):
    """Liveness probe: always answers 200 with ``{"status": "ok"}``."""
    return Response({"status": "ok"}, status=status.HTTP_200_OK)


@api_view(["POST"])
@permission_classes((permissions.AllowAny,))
def test_email(request):
    """Send a fixed test email and answer 200 with ``{"status": "ok"}``.

    NOTE(review): this endpoint is open to anonymous callers and sends mail to
    a hard-coded address on every request; consider restricting or removing it
    outside of development.
    """
    send_mail(
        "Subject here",
        "Here is the message.",
        "noreply@sarlink.net",
        ["shihaam@shihaam.me"],
        fail_silently=False,
    )
    return Response({"status": "ok"}, status=status.HTTP_200_OK)


class CreateAtollView(StaffEditorPermissionMixin, generics.CreateAPIView):
    # Create an atoll, rejecting duplicate names
    serializer_class = AtollSerializer
    queryset = Atoll.objects.all()

    def create(self, request, *args, **kwargs):
        """Create an atoll unless one with the same name already exists."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        name = serializer.validated_data.get("name")
        if Atoll.objects.filter(name=name).exists():
            return Response({"message": "Atoll name already exists."}, status=400)

        return super().create(request, *args, **kwargs)


class ListAtollView(generics.ListAPIView):
    # Public list of atolls
    permission_classes = (permissions.AllowAny,)
    serializer_class = AtollSerializer
    queryset = Atoll.objects.all()
    throttle_classes = ()  # override throttling


class RetrieveUpdateDestroyAtollView(
    StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
):
    # Retrieve, update or delete one atoll by primary key
    serializer_class = AtollSerializer
    queryset = Atoll.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Update an atoll, rejecting a name already used by another atoll."""
        atoll = self.get_object()
        # Validated as a partial update so the duplicate-name check also works
        # when the payload omits other fields; the real update happens in
        # super().update() below.
        serializer = self.get_serializer(atoll, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)

        name = serializer.validated_data.get("name")
        if name and Atoll.objects.filter(name=name).exclude(pk=atoll.pk).exists():
            return Response({"message": "Atoll name already exists."}, status=400)

        return super().update(request, *args, **kwargs)


class ListCreateIslandView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
    # List islands, or create one while rejecting duplicate names
    serializer_class = IslandSerializer
    queryset = Island.objects.all()

    def create(self, request, *args, **kwargs):
        """Create an island unless one with the same name already exists."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        name = serializer.validated_data.get("name")
        # NOTE(review): uniqueness is checked across all islands, not per
        # atoll - confirm that is the intended rule.
        if Island.objects.filter(name=name).exists():
            return Response({"message": "Island name already exists."}, status=400)

        return super().create(request, *args, **kwargs)


class RetrieveUpdateDestroyIslandView(
    StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
):
    # Retrieve, update or delete one island by primary key
    serializer_class = IslandSerializer
    queryset = Island.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Update an island, rejecting a name already used by another island."""
        island = self.get_object()
        # Validated as a partial update so the duplicate-name check also works
        # when the payload omits other fields; the real update happens in
        # super().update() below.
        serializer = self.get_serializer(island, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)

        name = serializer.validated_data.get("name")
        if name and Island.objects.filter(name=name).exclude(pk=island.pk).exists():
            return Response({"message": "Island name already exists."}, status=400)

        return super().update(request, *args, **kwargs)