"""API views: registration/OTP flow, login, user management and atoll/island CRUD."""

# standard library imports
import logging
import uuid
from datetime import datetime, timedelta
from typing import cast, Dict, Any

# django imports
from django.contrib.auth import login
from django.core.exceptions import ObjectDoesNotExist
from django.core.mail import send_mail
from django.db import transaction
from django.db.models import Q
from django.shortcuts import get_object_or_404
from django.utils import timezone

# rest_framework imports
from rest_framework import generics, permissions
from rest_framework import status
from rest_framework.authtoken.serializers import AuthTokenSerializer
from rest_framework.decorators import api_view, permission_classes
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend

# knox imports
from knox.views import LoginView as KnoxLoginView
from knox.models import AuthToken

# project imports
from api.filters import UserFilter
from api.mixins import StaffEditorPermissionMixin
from api.models import User, Atoll, Island, TemporaryUser
from api.notifications import send_sms
from api.notifications import send_otp
from api.serializers import (
    AtollSerializer,
    IslandSerializer,
    OTPVerificationSerializer,
    TemporaryUserSerializer,
    UserUpdateSerializer,
    UserAgreementSerializer,
)

# local apps import
from .utils import check_person_api_verification
from .helpers import (
    ErrorMessages,
    validate_required_fields,
    validate_unique_fields,
    validate_patterns,
    calculate_age,
)
from .serializers import (
    KnoxTokenSerializer,
    AuthSerializer,
    CustomUserSerializer,
    CustomReadOnlyUserSerializer,
    UserProfileUpdateSerializer,
)

logger = logging.getLogger(__name__)

# Minutes added to "now" for the expiry time quoted in the registration SMS.
# NOTE(review): the actual expiry is decided by TemporaryUser.is_expired();
# confirm both use the same window.
OTP_VALIDITY_MINUTES = 3

# Upper bound (bytes) and accepted MIME types for uploaded agreement files.
MAX_AGREEMENT_SIZE = 10 * 1024 * 1024
ALLOWED_AGREEMENT_CONTENT_TYPES = ("application/pdf",)

DEFAULT_LOGIN_ERROR = "Unable to log in with provided credentials."


def _is_self_or_admin(request_user, target_user) -> bool:
    """Return True when ``request_user`` is ``target_user`` or an authenticated admin."""
    if request_user == target_user:
        return True
    return bool(
        request_user.is_authenticated and getattr(request_user, "is_admin", False)
    )


def _first_error_message(detail, default):
    """Pick the first message out of a DRF ``ValidationError.detail``.

    Serializer validation produces a dict (serializer-level errors live under
    ``non_field_errors``); a directly raised ``ValidationError`` produces a
    list. Anything else falls back to ``default``.
    """
    if isinstance(detail, dict):
        detail = detail.get("non_field_errors")
    if isinstance(detail, (list, tuple)) and len(detail) > 0:
        return detail[0]
    return default


def _identity_lookup(id_card, mobile, id_card_field, mobile_field):
    """Build a ``Q`` that ANDs together whichever of id card / mobile was supplied."""
    lookup = Q()
    if id_card:
        lookup &= Q(**{id_card_field: id_card})
    if mobile:
        lookup &= Q(**{mobile_field: mobile})
    return lookup


@api_view(["GET"])
def healthcheck(request):
    """Return a static payload so callers can confirm the API is up."""
    return Response({"status": "Good"}, status=status.HTTP_200_OK)


class CreateTemporaryUserView(generics.CreateAPIView):
    """Create a ``TemporaryUser`` from registration data and send it an OTP."""

    serializer_class = TemporaryUserSerializer
    permission_classes = (permissions.AllowAny,)
    queryset = TemporaryUser.objects.all()
    # NOTE(review): throttling is disabled on a public endpoint that sends an
    # SMS per request - confirm this is intentional.
    throttle_classes = []

    def post(self, request, *args, **kwargs):
        """Validate the payload, store the temporary user and send the OTP.

        Returns 201 with the serialized temporary user on success, 400 for
        validation failures (including whatever response the ``validate_*``
        helpers return) and 404 when the atoll or island does not exist.
        """
        data = request.data

        required_error = validate_required_fields(data)
        if required_error:
            return required_error

        try:
            dob = datetime.strptime(str(data.get("dob")), "%Y-%m-%d").date()
        except ValueError:
            return Response(
                {"message": "Invalid date format for DOB. Use YYYY-MM-DD."},
                status=400,
            )

        if calculate_age(dob) < 18:
            return Response({"message": ErrorMessages.UNDERAGE_ERROR}, status=400)

        uniqueness_error = validate_unique_fields(
            username=data.get("username"),
            mobile=data.get("mobile"),
            id_card=data.get("id_card"),
        )
        if uniqueness_error:
            return uniqueness_error

        pattern_error = validate_patterns(
            id_card=data.get("id_card"),
            mobile=data.get("mobile"),
            acc_no=data.get("acc_no"),
        )
        if pattern_error:
            return pattern_error

        try:
            atoll = Atoll.objects.get(id=data.get("atoll"))
            island = Island.objects.get(id=data.get("island"))
        except ObjectDoesNotExist as e:
            model_name = "Atoll" if isinstance(e, Atoll.DoesNotExist) else "Island"
            return Response({"message": f"{model_name} not found."}, status=404)
        except (TypeError, ValueError):
            # A malformed id (e.g. a non-numeric string for an integer key)
            # is a client error, not a server error.
            return Response({"message": "Invalid atoll or island id."}, status=400)

        temp_user = TemporaryUser.objects.create(
            t_first_name=data.get("firstname"),
            t_last_name=data.get("lastname"),
            t_username=str(data.get("username")),
            t_email=None,
            t_address=data.get("address"),
            t_mobile=data.get("mobile"),
            t_acc_no=data.get("acc_no"),
            t_id_card=data.get("id_card"),
            t_dob=dob,
            t_atoll=atoll,
            t_island=island,
            t_terms_accepted=data.get("terms_accepted"),
            t_policy_accepted=data.get("policy_accepted"),
        )

        otp_expiry = timezone.now() + timedelta(minutes=OTP_VALIDITY_MINUTES)
        formatted_time = otp_expiry.strftime("%d/%m/%Y %H:%M:%S")
        otp = temp_user.generate_otp()
        send_otp(
            str(temp_user.t_mobile),
            f"Your Registration SARLink OTP: {otp}. \nExpires at {formatted_time}. \n\n- SAR Link",
        )

        serializer = self.get_serializer(temp_user)
        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data, status=status.HTTP_201_CREATED, headers=headers
        )


class VerifyOTPView(generics.GenericAPIView):
    """Turn a ``TemporaryUser`` into a real ``User`` once its OTP is verified."""

    permission_classes = (permissions.AllowAny,)
    serializer_class = OTPVerificationSerializer
    throttle_classes = []

    def post(self, request):
        """Verify the submitted OTP and create the permanent user.

        Responds 404 when no temporary user has the given mobile number and
        400 when the username is already registered, the OTP has expired or
        the OTP does not match.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = request.data

        # get_object_or_404 raises Http404 itself, so no falsy check is needed
        # and the row does not have to be fetched a second time.
        temp_user = get_object_or_404(TemporaryUser, t_mobile=data.get("mobile"))

        # NOTE(review): the lookup is by username although the message speaks
        # of the mobile number - confirm which one is intended.
        if User.objects.filter(username=temp_user.t_username).exists():
            return Response(
                {"message": "User with this mobile number already registered."},
                status=400,
            )

        if temp_user.is_expired():
            return Response({"message": "OTP expired."}, status=400)
        if not temp_user.verify_otp(data["otp"]):
            return Response({"message": "Invalid OTP."}, status=400)

        # Create the user and flag the temporary row together so a failure in
        # either step leaves no half-finished registration behind.
        with transaction.atomic():
            # NOTE(review): password="" is hashed as a real (empty) password
            # by Django's default manager; password=None would mark it
            # unusable. Confirm which behaviour the custom manager expects.
            User.objects.create_user(
                first_name=temp_user.t_first_name,
                last_name=temp_user.t_last_name,
                username=str(temp_user.t_username),
                password="",
                address=temp_user.t_address,
                mobile=temp_user.t_mobile,
                acc_no=temp_user.t_acc_no,
                id_card=temp_user.t_id_card,
                dob=temp_user.t_dob,
                atoll=temp_user.t_atoll,
                island=temp_user.t_island,
                terms_accepted=temp_user.t_terms_accepted,
                policy_accepted=temp_user.t_policy_accepted,
            )
            # You can now trigger registry verification as a signal or task
            temp_user.otp_verified = True
            temp_user.save()

        return Response({"message": "User created successfully."})


class LoginView(KnoxLoginView):
    """Login view extending KnoxLoginView with credential validation."""

    serializer_class = AuthSerializer
    permission_classes = (permissions.AllowAny,)
    throttle_scope = "login"

    def post(self, request, format=None):
        """Validate credentials, log the user in and return the knox response.

        Validation failures are reported as ``{"message": ...}`` with
        status 400.
        """
        serializer = AuthTokenSerializer(data=request.data)
        # Only validation can raise ValidationError, so keep the try minimal.
        try:
            serializer.is_valid(raise_exception=True)
        except ValidationError as e:
            message = _first_error_message(
                getattr(e, "detail", None), DEFAULT_LOGIN_ERROR
            )
            return Response({"message": message}, status=status.HTTP_400_BAD_REQUEST)

        user = cast(Dict[str, Any], serializer.validated_data)["user"]
        login(request, user)
        return super().post(request, format=format)


class UserprofileAPIView(generics.RetrieveUpdateAPIView):
    """Retrieve or update the profile of the authenticated user."""

    queryset = User.objects.all()
    permission_classes = (permissions.IsAuthenticated,)

    def get_serializer_class(self):
        """Return the read-only serializer for GET and the update one for PUT/PATCH."""
        method = self.request.method
        if method == "GET":
            return CustomReadOnlyUserSerializer
        if method in ("PUT", "PATCH"):
            return UserProfileUpdateSerializer
        return super().get_serializer_class()

    def get_object(self):
        """Retrieve and return authenticated user"""
        return self.request.user


class UserUpdateAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Update a user by primary key; superusers cannot be edited here."""

    serializer_class = UserUpdateSerializer
    queryset = User.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Apply the update after the superuser and authorization guards.

        Validation and saving are left to ``UpdateAPIView.update``; it
        validates the same payload, so no separate pre-validation or extra
        ``save()`` is needed.
        """
        user = get_object_or_404(User, pk=kwargs.get("pk"))
        if user.is_superuser:
            return Response(
                {"message": "You cannot update a superuser."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if not _is_self_or_admin(request.user, user):
            return Response(
                {"message": "You are not authorized to update this user."},
                status=status.HTTP_403_FORBIDDEN,
            )
        return super().update(request, *args, **kwargs)


class AgreementUpdateAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Attach an uploaded PDF agreement to a user."""

    serializer_class = UserAgreementSerializer
    queryset = User.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Validate the uploaded agreement file and store it on the user.

        Responds 403 for superuser targets or unauthorized callers and 400
        when the file is missing, not a file upload, larger than 10 MB or not
        a PDF.
        """
        user_id = kwargs.get("pk")
        user = get_object_or_404(User, pk=user_id)
        if user.is_superuser:
            return Response(
                {"message": "You cannot update a superuser."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if not _is_self_or_admin(request.user, user):
            return Response(
                {"message": "You are not authorized to update this user."},
                status=status.HTTP_403_FORBIDDEN,
            )

        agreement = request.data.get("agreement")
        # A plain string (non-multipart request) has neither size nor
        # content_type; treat it like a missing file instead of crashing.
        if (
            not agreement
            or not hasattr(agreement, "size")
            or not hasattr(agreement, "content_type")
        ):
            return Response(
                {"message": "Agreement file is required."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if agreement.size > MAX_AGREEMENT_SIZE:  # 10 MB limit
            return Response(
                {"message": "File size exceeds 10 MB limit."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if agreement.content_type not in ALLOWED_AGREEMENT_CONTENT_TYPES:
            return Response(
                {"message": "Invalid file type. Only PDF files are allowed."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # rename the file name to a random UUID followed by user_id
        agreement.name = f"{uuid.uuid4()}_{user_id}_agreement.pdf"

        serializer = self.get_serializer(user, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        user.agreement = agreement
        user.save()
        # NOTE(review): super().update() saves through the serializer again;
        # if the serializer exposes ``agreement`` the file may be written
        # twice - confirm and drop one of the two saves.
        return super().update(request, *args, **kwargs)


class KnoxTokenListApiView(
    StaffEditorPermissionMixin,
    generics.ListAPIView,
):
    """List the knox auth tokens that belong to the requesting user."""

    serializer_class = KnoxTokenSerializer
    permission_classes = (permissions.IsAuthenticated,)
    queryset = AuthToken.objects.all()

    def get(self, request, *args, **kwargs):
        """Return the caller's tokens under ``data``, or 400 if the user has no id."""
        user_id = getattr(request.user, "id", None)
        if user_id is None:
            return Response({"error": "User ID not found"}, status=400)
        tokens = AuthToken.objects.filter(user_id=user_id)
        data = KnoxTokenSerializer(tokens, many=True).data
        return Response({"data": data})


class ListUserView(StaffEditorPermissionMixin, generics.ListAPIView):
    """List non-superuser accounts for admins, with django-filter support."""

    serializer_class = CustomReadOnlyUserSerializer
    filter_backends = [DjangoFilterBackend]  # type: ignore
    filterset_fields = "__all__"
    filterset_class = UserFilter
    queryset = User.objects.all()

    def get_queryset(self):
        """Return all non-superusers for admins and an empty queryset otherwise."""
        user = self.request.user
        # Default to False like every other is_admin check in this module so
        # a user object without the attribute does not raise AttributeError.
        if user.is_authenticated and getattr(user, "is_admin", False):
            return User.objects.filter(is_superuser=False)
        return User.objects.none()


class UserVerifyAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Verify a user against the external person-verification service."""

    serializer_class = CustomUserSerializer
    queryset = User.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Run the verification check and mark the user verified when it passes.

        Responds 403 for unauthorized callers, 400 when the user is already
        verified or fields mismatch, and 404 when the service reports a
        failure or has no record (in the latter case the user is still
        marked verified).
        """
        user = get_object_or_404(User, pk=kwargs.get("pk"))
        if not _is_self_or_admin(request.user, user):
            return Response(
                {"message": "You are not authorized to update this user."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if user.verified:
            return Response(
                {"message": "User is already verified."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        serializer = self.get_serializer(user, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)

        result = check_person_api_verification(user_data=user, id_card=user.id_card)

        # The verification system might not have the records of every user
        # hence can be skipped if not found and verify directly.
        if result.get("error") == "Not Found":
            user.verified = True
            user.save()
            return Response(
                {
                    "message": "User not found in the verification system. User marked as verified."
                },
                status=status.HTTP_404_NOT_FOUND,
            )
        # .get() keeps a result that lacks these keys from raising KeyError.
        if not result.get("ok"):
            return Response(result, status=status.HTTP_404_NOT_FOUND)
        if result.get("mismatch_fields"):
            return Response(result, status=status.HTTP_400_BAD_REQUEST)

        user.verified = True
        user.save()
        return Response({"message": "User successfully verified."})


class UserRejectAPIView(StaffEditorPermissionMixin, generics.DestroyAPIView):
    """Reject (delete) a user and notify them by SMS."""

    serializer_class = CustomUserSerializer
    queryset = User.objects.all()
    lookup_field = "pk"

    def destroy(self, request, *args, **kwargs):
        """Delete the user and their temporary record, then send the rejection SMS.

        Responds 400 when rejection details or the user's mobile number are
        missing and 403 for superuser targets or unauthorized callers.
        """
        rejection_details = request.data.get("rejection_details", "")
        if not rejection_details:
            return Response(
                {"message": "Rejection details are required."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = get_object_or_404(User, pk=kwargs.get("pk"))
        mobile_number = user.mobile
        if not mobile_number:
            return Response(
                {"message": "User does not have a mobile number."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if user.is_superuser:
            return Response(
                {"message": "You cannot remove a superuser."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if not _is_self_or_admin(request.user, user):
            return Response(
                {"message": "You are not authorized to reject this user."},
                status=status.HTTP_403_FORBIDDEN,
            )

        # Delete both records in one transaction. filter().delete() does not
        # raise when no temporary row exists, so a missing row can no longer
        # turn an already-performed user deletion into a 404 with no SMS.
        with transaction.atomic():
            TemporaryUser.objects.filter(t_mobile=mobile_number).delete()
            user.delete()

        send_sms(message=rejection_details, mobile=mobile_number)
        # NOTE(review): a 204 response should not carry a body; kept as-is
        # because clients may rely on the current status code.
        return Response(
            {"message": "User successfully rejected."},
            status=status.HTTP_204_NO_CONTENT,
        )


@api_view(["GET"])
def filter_user(request):
    """Report whether a user matching ``id_card`` and/or ``mobile`` exists and is verified."""
    id_card = request.GET.get("id_card", "").strip() or None
    mobile = request.GET.get("mobile", "").strip() or None
    if not id_card and not mobile:
        return Response({"ok": False})

    lookup = _identity_lookup(id_card, mobile, "id_card", "mobile")
    user = User.objects.only("id", "verified").filter(lookup).first()
    # Log only the primary key: the filters contain personal data and
    # str(user) could load deferred fields.
    logger.debug("filter_user matched pk=%s", user.pk if user else None)

    return Response(
        {"ok": True, "verified": user.verified}
        if user
        else {"ok": False, "verified": False}
    )


@api_view(["GET"])
def filter_temporary_user(request):
    """Report OTP/verification flags of a temporary user matching ``id_card`` and/or ``mobile``."""
    id_card = request.GET.get("id_card", "").strip() or None
    mobile = request.GET.get("mobile", "").strip() or None
    if not id_card and not mobile:
        return Response({"ok": False})

    lookup = _identity_lookup(id_card, mobile, "t_id_card", "t_mobile")
    user = (
        TemporaryUser.objects.only("t_id", "otp_verified", "t_verified")
        .filter(lookup)
        .first()
    )
    logger.debug("filter_temporary_user matched pk=%s", user.pk if user else None)

    return Response(
        {"ok": True, "otp_verified": user.otp_verified, "t_verified": user.t_verified}
        if user
        else {"ok": False, "otp_verified": False, "t_verified": False}
    )


class UserDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
    """Return one user's details to that user, an admin or a superuser."""

    queryset = User.objects.all()
    serializer_class = CustomReadOnlyUserSerializer
    lookup_field = "pk"

    def retrieve(self, request, *args, **kwargs):
        """Serialize the requested user, or respond 403 if the caller may not view it."""
        instance = self.get_object()
        user = request.user
        if (
            user != instance
            and not getattr(user, "is_admin", False)
            and not user.is_superuser  # type: ignore
        ):
            return Response(
                {"message": "You are not authorized to view this user's details."},
                status=status.HTTP_403_FORBIDDEN,
            )
        serializer = self.get_serializer(instance)
        return Response(serializer.data)


# NOTE(review): this endpoint is open to anyone and mails a hard-coded
# address on every POST - confirm it should exist outside development.
@api_view(["POST"])
@permission_classes((permissions.AllowAny,))
def test_email(request):
    """Send a fixed test e-mail and report success."""
    send_mail(
        "Subject here",
        "Here is the message.",
        "noreply@sarlink.net",
        ["shihaam@shihaam.me"],
        fail_silently=False,
    )
    return Response({"status": "ok"}, status=status.HTTP_200_OK)


class CreateAtollView(StaffEditorPermissionMixin, generics.CreateAPIView):
    """Create an atoll, rejecting duplicate names."""

    serializer_class = AtollSerializer
    queryset = Atoll.objects.all()

    def create(self, request, *args, **kwargs):
        """Respond 400 when the name is taken, otherwise create the atoll."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        name = serializer.validated_data.get("name")
        if Atoll.objects.filter(name=name).exists():
            return Response({"message": "Atoll name already exists."}, status=400)
        return super().create(request, *args, **kwargs)


class ListAtollView(generics.ListAPIView):
    """Public, unthrottled list of atolls."""

    permission_classes = (permissions.AllowAny,)
    serializer_class = AtollSerializer
    queryset = Atoll.objects.all()
    throttle_classes = ()  # override throttling


class RetrieveUpdateDestroyAtollView(
    StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
):
    """Retrieve, update or delete a single atoll."""

    serializer_class = AtollSerializer
    queryset = Atoll.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Respond 400 when another atoll already uses the new name, otherwise update."""
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        name = serializer.validated_data.get("name")
        if name and Atoll.objects.filter(name=name).exclude(pk=instance.pk).exists():
            return Response({"message": "Atoll name already exists."}, status=400)
        return super().update(request, *args, **kwargs)


class ListCreateIslandView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
    """List islands or create one, rejecting duplicate names."""

    serializer_class = IslandSerializer
    queryset = Island.objects.all()

    def create(self, request, *args, **kwargs):
        """Respond 400 when the name is taken, otherwise create the island."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        name = serializer.validated_data.get("name")
        if Island.objects.filter(name=name).exists():
            return Response({"message": "Island name already exists."}, status=400)
        return super().create(request, *args, **kwargs)


class RetrieveUpdateDestroyIslandView(
    StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
):
    """Retrieve, update or delete a single island."""

    serializer_class = IslandSerializer
    queryset = Island.objects.all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Respond 400 when another island already uses the new name, otherwise update."""
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        name = serializer.validated_data.get("name")
        if name and Island.objects.filter(name=name).exclude(pk=instance.pk).exists():
            return Response({"message": "Island name already exists."}, status=400)
        return super().update(request, *args, **kwargs)