mirror of
https://github.com/i701/sarlink-portal-api.git
synced 2025-04-19 23:46:53 +00:00
Some checks failed
Build and Push Docker Images / Build and Push Docker Images (push) Failing after 1m19s
437 lines
15 KiB
Python
437 lines
15 KiB
Python
# django imports
|
|
from django.contrib.auth import login
|
|
|
|
# rest_framework imports
|
|
from rest_framework import generics, permissions
|
|
from rest_framework.authtoken.serializers import AuthTokenSerializer
|
|
from api.filters import UserFilter
|
|
from api.mixins import StaffEditorPermissionMixin
|
|
from api.models import User, Atoll, Island, TemporaryUser
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from rest_framework.exceptions import ValidationError
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from api.serializers import (
|
|
AtollSerializer,
|
|
IslandSerializer,
|
|
CustomUserByWalletBalanceSerializer,
|
|
OTPVerificationSerializer,
|
|
TemporaryUserSerializer,
|
|
)
|
|
|
|
# knox imports
|
|
from knox.views import LoginView as KnoxLoginView
|
|
from knox.models import AuthToken
|
|
from django_filters.rest_framework import DjangoFilterBackend
|
|
import re
|
|
from typing import cast, Dict, Any
|
|
from django.core.mail import send_mail
|
|
from django.db.models import Q
|
|
from api.sms import send_otp
|
|
|
|
|
|
# local apps import
|
|
from .serializers import (
|
|
KnoxTokenSerializer,
|
|
AuthSerializer,
|
|
CustomUserSerializer,
|
|
CustomReadOnlyUserSerializer,
|
|
CustomReadOnlyUserByIDCardSerializer,
|
|
)
|
|
|
|
ID_CARD_PATTERN = r"^[A-Z]{1,2}[0-9]{6,7}$"
|
|
MOBILE_PATTERN = r"^[7|9][0-9]{6}$"
|
|
ACCOUNT_NUMBER_PATTERN = r"^(7\d{12}|9\d{16})$"
|
|
|
|
|
|
class ErrorMessages:
|
|
USERNAME_EXISTS = "Username already exists."
|
|
MOBILE_EXISTS = "Mobile number already exists."
|
|
INVALID_ID_CARD = "Please enter a valid ID card number."
|
|
ID_CARD_EXISTS = "ID card already exists."
|
|
INVALID_MOBILE = "Please enter a valid mobile number."
|
|
INVALID_ACCOUNT = "Please enter a valid account number."
|
|
|
|
|
|
class UpdateUserWalletView(generics.UpdateAPIView):
|
|
# Create user API view
|
|
serializer_class = CustomUserByWalletBalanceSerializer
|
|
permission_classes = (permissions.IsAuthenticated,)
|
|
queryset = User.objects.all()
|
|
lookup_field = "pk"
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
id_to_update = kwargs.get("pk")
|
|
user_id = request.user.id
|
|
print(f"User ID: {user_id}")
|
|
print(f"ID to update: {id_to_update}")
|
|
if user_id != id_to_update:
|
|
return Response(
|
|
{"message": "You are not authorized to update this user."},
|
|
status=status.HTTP_403_FORBIDDEN,
|
|
)
|
|
wallet_balance = request.data.get("wallet_balance")
|
|
if not wallet_balance:
|
|
return Response(
|
|
{"message": "wallet_balance is required."},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
user = self.get_object()
|
|
user.wallet_balance = wallet_balance
|
|
user.save()
|
|
return Response({"message": "Wallet balance updated successfully."})
|
|
|
|
|
|
class CreateTemporaryUserView(generics.CreateAPIView):
|
|
# Create user API view
|
|
serializer_class = TemporaryUserSerializer
|
|
permission_classes = (permissions.AllowAny,)
|
|
queryset = TemporaryUser.objects.all()
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
# Extract required fields from request data
|
|
username = request.data.get("username")
|
|
address = request.data.get("address")
|
|
mobile = request.data.get("mobile")
|
|
acc_no = request.data.get("acc_no")
|
|
id_card = request.data.get("id_card")
|
|
dob = request.data.get("dob")
|
|
atoll_id = request.data.get("atoll")
|
|
island_id = request.data.get("island")
|
|
terms_accepted = request.data.get("terms_accepted")
|
|
policy_accepted = request.data.get("policy_accepted")
|
|
firstname = request.data.get("firstname")
|
|
lastname = request.data.get("lastname")
|
|
|
|
if TemporaryUser.objects.filter(t_mobile=mobile).exists():
|
|
return Response({"message": ErrorMessages.MOBILE_EXISTS}, status=400)
|
|
|
|
if TemporaryUser.objects.filter(t_username=username).exists():
|
|
return Response({"message": ErrorMessages.USERNAME_EXISTS}, status=400)
|
|
|
|
if TemporaryUser.objects.filter(t_id_card=id_card).exists():
|
|
return Response({"message": ErrorMessages.ID_CARD_EXISTS}, status=400)
|
|
|
|
if id_card and not re.match(ID_CARD_PATTERN, id_card):
|
|
return Response({"message": ErrorMessages.INVALID_ID_CARD}, status=400)
|
|
|
|
if TemporaryUser.objects.filter(t_id_card=id_card).exists():
|
|
return Response({"message": "ID card already exists."}, status=400)
|
|
|
|
if mobile is None or not re.match(MOBILE_PATTERN, mobile):
|
|
return Response({"message": ErrorMessages.INVALID_MOBILE}, status=400)
|
|
|
|
if acc_no is None or not re.match(ACCOUNT_NUMBER_PATTERN, acc_no):
|
|
return Response({"message": ErrorMessages.INVALID_ACCOUNT}, status=400)
|
|
|
|
# Validate required fields first
|
|
validation_error = self.validate_required_fields(request.data)
|
|
if validation_error:
|
|
return validation_error
|
|
|
|
# Fetch Atoll and Island instances
|
|
try:
|
|
atoll = Atoll.objects.get(id=atoll_id)
|
|
island = Island.objects.get(id=island_id)
|
|
except Atoll.DoesNotExist:
|
|
return Response({"message": "Atoll not found."}, status=404)
|
|
except Island.DoesNotExist:
|
|
return Response({"message": "Island not found."}, status=404)
|
|
|
|
# Create user
|
|
temp_user = TemporaryUser.objects.create(
|
|
t_first_name=firstname,
|
|
t_last_name=lastname,
|
|
t_username=str(username),
|
|
t_email=None,
|
|
t_address=address,
|
|
t_mobile=mobile,
|
|
t_acc_no=acc_no,
|
|
t_id_card=id_card,
|
|
t_dob=dob,
|
|
t_atoll=atoll,
|
|
t_island=island,
|
|
t_terms_accepted=terms_accepted,
|
|
t_policy_accepted=policy_accepted,
|
|
)
|
|
otp = temp_user.generate_otp()
|
|
send_otp(
|
|
temp_user.t_mobile,
|
|
otp,
|
|
f"Your Registration SARLink OTP is: {otp}, valid for 30 seconds. Please do not share it with anyone.",
|
|
)
|
|
serializer = self.get_serializer(temp_user)
|
|
headers = self.get_success_headers(serializer.data)
|
|
return Response(
|
|
serializer.data, status=status.HTTP_201_CREATED, headers=headers
|
|
)
|
|
|
|
def validate_required_fields(self, data):
|
|
required_fields = {
|
|
"firstname": "First name",
|
|
"lastname": "Last name",
|
|
"username": "Username",
|
|
"address": "Address",
|
|
"mobile": "Mobile number",
|
|
"acc_no": "Account number",
|
|
"id_card": "ID card",
|
|
"dob": "Date of birth",
|
|
"atoll": "Atoll",
|
|
"island": "Island",
|
|
}
|
|
|
|
for field, label in required_fields.items():
|
|
if not data.get(field):
|
|
return Response({"message": f"{label} is required."}, status=400)
|
|
|
|
if data.get("terms_accepted") is None:
|
|
return Response({"message": "Terms acceptance is required."}, status=400)
|
|
if data.get("policy_accepted") is None:
|
|
return Response({"message": "Policy acceptance is required."}, status=400)
|
|
|
|
return None
|
|
|
|
|
|
class VerifyOTPView(generics.GenericAPIView):
|
|
permission_classes = (permissions.AllowAny,)
|
|
serializer_class = OTPVerificationSerializer
|
|
|
|
def post(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
data = request.data
|
|
print(data)
|
|
try:
|
|
temp_user = TemporaryUser.objects.get(t_mobile=data["mobile"])
|
|
except TemporaryUser.DoesNotExist:
|
|
return Response({"message": "User not found."}, status=404)
|
|
|
|
if temp_user.is_expired():
|
|
return Response({"message": "OTP expired."}, status=400)
|
|
|
|
if not temp_user.verify_otp(data["otp"]):
|
|
return Response({"message": "Invalid OTP."}, status=400)
|
|
|
|
# Create real user
|
|
User.objects.create_user(
|
|
first_name=temp_user.t_first_name,
|
|
last_name=temp_user.t_last_name,
|
|
username=temp_user.t_username,
|
|
password="",
|
|
address=temp_user.t_address,
|
|
mobile=temp_user.t_mobile,
|
|
acc_no=temp_user.t_acc_no,
|
|
id_card=temp_user.t_id_card,
|
|
dob=temp_user.t_dob,
|
|
atoll=temp_user.t_atoll,
|
|
island=temp_user.t_island,
|
|
terms_accepted=temp_user.t_terms_accepted,
|
|
policy_accepted=temp_user.t_policy_accepted,
|
|
)
|
|
|
|
# You can now trigger registry verification as a signal or task
|
|
temp_user.otp_verified = True
|
|
temp_user.save()
|
|
|
|
return Response({"message": "User created successfully."})
|
|
|
|
|
|
class LoginView(KnoxLoginView):
|
|
# login view extending KnoxLoginView
|
|
serializer_class = AuthSerializer
|
|
permission_classes = (permissions.AllowAny,)
|
|
throttle_scope = "login"
|
|
|
|
def post(self, request, format=None):
|
|
try:
|
|
serializer = AuthTokenSerializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
user = cast(Dict[str, Any], serializer.validated_data)["user"]
|
|
login(request, user)
|
|
response = super(LoginView, self).post(request, format=None)
|
|
return response
|
|
except ValidationError as e:
|
|
message = "Unable to log in with provided credentials."
|
|
if (
|
|
hasattr(e, "detail")
|
|
and isinstance(e.detail, list)
|
|
and len(e.detail) > 0
|
|
):
|
|
message = e.detail[0]
|
|
return Response({"message": message}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
class ManageUserView(generics.RetrieveUpdateAPIView):
|
|
"""Manage the authenticated user"""
|
|
|
|
serializer_class = CustomUserSerializer
|
|
permission_classes = (permissions.IsAuthenticated,)
|
|
|
|
def get_object(self):
|
|
"""Retrieve and return authenticated user"""
|
|
return self.request.user
|
|
|
|
|
|
class KnoxTokenListApiView(
|
|
StaffEditorPermissionMixin,
|
|
generics.ListAPIView,
|
|
):
|
|
# Create user API view
|
|
serializer_class = KnoxTokenSerializer
|
|
permission_classes = (permissions.IsAuthenticated,)
|
|
queryset = AuthToken.objects.all()
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
user_id = getattr(request.user, "id", None)
|
|
if user_id is None:
|
|
return Response({"error": "User ID not found"}, status=400)
|
|
queryset = AuthToken.objects.filter(user_id=user_id)
|
|
data = KnoxTokenSerializer(queryset, many=True).data
|
|
return Response({"data": data})
|
|
|
|
|
|
class ListUserView(StaffEditorPermissionMixin, generics.ListAPIView):
|
|
# Create user API view
|
|
serializer_class = CustomReadOnlyUserSerializer
|
|
filter_backends = [DjangoFilterBackend]
|
|
filterset_fields = "__all__"
|
|
filterset_class = UserFilter
|
|
queryset = User.objects.all()
|
|
|
|
|
|
@api_view(["GET"])
|
|
def filter_user(request):
|
|
id_card = request.GET.get("id_card", "").strip() or None
|
|
mobile = request.GET.get("mobile", "").strip() or None
|
|
|
|
if not id_card and not mobile:
|
|
return Response({"ok": False})
|
|
|
|
filters = Q()
|
|
if id_card is not None:
|
|
filters |= Q(id_card=id_card)
|
|
if mobile is not None:
|
|
filters |= Q(mobile=mobile)
|
|
|
|
user = User.objects.filter(filters).first()
|
|
|
|
print(f"Querying with filters: {filters}")
|
|
print(f"Found user: {user}")
|
|
|
|
return Response(
|
|
{"ok": True, "verified": user.verified}
|
|
if user
|
|
else {"ok": False, "verified": False}
|
|
)
|
|
|
|
|
|
class ListUserByIDCardView(generics.ListAPIView):
|
|
# Create user API view
|
|
permission_classes = (permissions.AllowAny,)
|
|
serializer_class = CustomReadOnlyUserByIDCardSerializer
|
|
filter_backends = [DjangoFilterBackend]
|
|
filterset_fields = "__all__"
|
|
filterset_class = UserFilter
|
|
queryset = User.objects.all()
|
|
|
|
|
|
class UserDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
|
|
queryset = User.objects.all()
|
|
serializer_class = CustomReadOnlyUserSerializer
|
|
lookup_field = "pk"
|
|
|
|
def retrieve(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance)
|
|
data = serializer.data
|
|
|
|
# Customize the response format
|
|
|
|
return Response(data)
|
|
|
|
|
|
@api_view(["GET"])
|
|
def healthcheck(request):
|
|
return Response({"status": "Good"}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@api_view(["POST"])
|
|
@permission_classes((permissions.AllowAny,))
|
|
def test_email(request):
|
|
send_mail(
|
|
"Subject here",
|
|
"Here is the message.",
|
|
"noreply@sarlink.net",
|
|
["shihaam@shihaam.me"],
|
|
fail_silently=False,
|
|
)
|
|
return Response({"status": "ok"}, status=status.HTTP_200_OK)
|
|
|
|
|
|
class CreateAtollView(StaffEditorPermissionMixin, generics.CreateAPIView):
|
|
serializer_class = AtollSerializer
|
|
queryset = Atoll.objects.all()
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
name = serializer.validated_data.get("name")
|
|
print(name)
|
|
if Atoll.objects.filter(name=name).exists():
|
|
return Response({"message": "Atoll name already exists."}, status=400)
|
|
return super().create(request, *args, **kwargs)
|
|
|
|
|
|
class ListAtollView(generics.ListAPIView):
|
|
permission_classes = (permissions.AllowAny,)
|
|
serializer_class = AtollSerializer
|
|
queryset = Atoll.objects.all()
|
|
throttle_classes = () # override throttling
|
|
|
|
|
|
class RetrieveUpdateDestroyAtollView(
|
|
StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
|
|
):
|
|
serializer_class = AtollSerializer
|
|
queryset = Atoll.objects.all()
|
|
lookup_field = "pk"
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance, data=request.data, partial=True)
|
|
serializer.is_valid(raise_exception=True)
|
|
name = serializer.validated_data.get("name")
|
|
if name and Atoll.objects.filter(name=name).exclude(pk=instance.pk).exists():
|
|
return Response({"message": "Atoll name already exists."}, status=400)
|
|
return super().update(request, *args, **kwargs)
|
|
|
|
|
|
class ListCreateIslandView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
|
|
serializer_class = IslandSerializer
|
|
queryset = Island.objects.all()
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
name = serializer.validated_data.get("name")
|
|
if Island.objects.filter(name=name).exists():
|
|
return Response({"message": "Island name already exists."}, status=400)
|
|
return super().create(request, *args, **kwargs)
|
|
|
|
|
|
class RetrieveUpdateDestroyIslandView(
|
|
StaffEditorPermissionMixin, generics.RetrieveUpdateDestroyAPIView
|
|
):
|
|
serializer_class = IslandSerializer
|
|
queryset = Island.objects.all()
|
|
lookup_field = "pk"
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance, data=request.data, partial=True)
|
|
serializer.is_valid(raise_exception=True)
|
|
name = serializer.validated_data.get("name")
|
|
if name and Island.objects.filter(name=name).exclude(pk=instance.pk).exists():
|
|
return Response({"message": "Island name already exists."}, status=400)
|
|
return super().update(request, *args, **kwargs)
|