# Create your views here.
# billing/views.py
"""API views for device payments, wallet top-ups and wallet transactions."""
import logging
import os
from dataclasses import asdict, dataclass
from datetime import timedelta
from typing import Optional

import requests
from django.db import transaction
from django.db.models import Prefetch
from django.utils import timezone
from django.utils.timezone import localtime
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import generics, status
from rest_framework.response import Response

from api.mixins import StaffEditorPermissionMixin
from api.models import User
from api.omada import Omada
from api.tasks import add_new_devices_to_omada
from apibase.env import BASE_DIR, env

from .filters import PaymentFilter, TopupFilter, WalletTransactionFilter
from .models import Device, Payment, Topup, WalletTransaction
from .serializers import (
    PaymentSerializer,
    UpdatePaymentSerializer,
    TopupSerializer,
    WalletTransactionSerializer,
)
from .utils import calculate_total_new_price

env.read_env(os.path.join(BASE_DIR, ".env"))
PAYMENT_BASE_URL = env("PAYMENT_BASE_URL", default="")  # type: ignore

logger = logging.getLogger(__name__)

# Upper bound, in seconds, for one call to the payment gateway. Without a
# timeout a stalled gateway would block the request worker indefinitely.
_GATEWAY_TIMEOUT_SECONDS = 30

# Query-string values that switch an "all_*" listing flag on.
_TRUTHY_QUERY_VALUES = ("true", "1", "yes")


@dataclass
class Transaction:
    """Transaction details echoed back from a successful gateway verification.

    Field names mirror the keys read from the gateway response and are emitted
    unchanged to API clients via ``asdict``.
    """

    ref: str
    sourceBank: str
    trxDate: str


@dataclass
class PaymentVerificationResponse:
    """Outcome of a transfer verification attempt."""

    message: str
    success: bool
    transaction: Optional[Transaction] = None


class InsufficientFundsError(Exception):
    pass


def _is_admin(user) -> bool:
    """Return the user's ``is_admin`` flag, treating a missing attribute as False."""
    return bool(getattr(user, "is_admin", False))


def _restrict_to_owner(queryset, user):
    """Return *queryset* unchanged for admins/superusers, else only *user*'s rows."""
    if _is_admin(user) or user.is_superuser:
        return queryset
    return queryset.filter(user=user)


def _query_flag(request, *names: str) -> bool:
    """Return True when any of the query parameters *names* holds a truthy value."""
    params = request.query_params
    return any(
        params.get(name, "false").lower() in _TRUTHY_QUERY_VALUES for name in names
    )


def _list_response(view, request, *flag_names: str):
    """Build the list response shared by the payment, top-up and wallet views.

    The filtered queryset is narrowed to the requesting user's own records
    unless the user is an authenticated admin AND one of *flag_names* is set
    to a truthy value in the query string. The result is paginated when the
    view has pagination configured.
    """
    queryset = view.filter_queryset(view.get_queryset())
    user = request.user
    wants_everything = (
        user.is_authenticated and _is_admin(user) and _query_flag(request, *flag_names)
    )
    if not wants_everything:
        queryset = queryset.filter(user=user)

    page = view.paginate_queryset(queryset)
    if page is not None:
        serializer = view.get_serializer(page, many=True)
        return view.get_paginated_response(serializer.data)
    serializer = view.get_serializer(queryset, many=True)
    return Response(serializer.data)


def _transfer_payload(user, amount, created_at) -> dict:
    """Build the JSON body sent to the gateway's ``/verify-payment`` endpoint."""
    return {
        "benefName": f"{user.first_name} {user.last_name}",  # type: ignore
        "accountNo": user.acc_no,  # type: ignore
        "absAmount": "{:.2f}".format(amount),
        "time": localtime(created_at).strftime("%Y-%m-%d %H:%M"),
    }


def _call_payment_gateway(data) -> Optional[dict]:
    """POST *data* to the payment gateway and return the decoded JSON object.

    Returns ``None`` when the request fails (connection error, timeout, HTTP
    error status) or the body is not a JSON object, so callers can answer
    with a clean "verification failed" response instead of a server error.

    Raises:
        ValueError: if ``PAYMENT_BASE_URL`` is not configured.
    """
    if not PAYMENT_BASE_URL:
        raise ValueError(
            "PAYMENT_BASE_URL is not set. Please set it in your environment variables."
        )

    try:
        response = requests.post(
            f"{PAYMENT_BASE_URL}/verify-payment",
            json=data,
            headers={"Content-Type": "application/json"},
            timeout=_GATEWAY_TIMEOUT_SECONDS,
        )
        logger.info("MIB Verification Response -> %s", response)
        response.raise_for_status()
        body = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers an undecodable body on requests versions where the
        # JSON error is not a RequestException subclass.
        logger.error("Payment gateway request failed: %s", exc)
        return None

    logger.info("MIB Verification Response -> %s", body)
    if not isinstance(body, dict):
        logger.error("Unexpected payment gateway payload: %r", body)
        return None
    return body


def _failed_verification(mib_resp: Optional[dict]) -> PaymentVerificationResponse:
    """Translate a missing or unsuccessful gateway reply into a failure result."""
    if mib_resp is None:
        return PaymentVerificationResponse(
            message="Payment verification failed.", success=False, transaction=None
        )
    return PaymentVerificationResponse(
        message=mib_resp.get("message") or "", success=False, transaction=None
    )


class ListCreatePaymentView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
    """List payments and create a new pending payment for a set of devices."""

    serializer_class = PaymentSerializer
    queryset = Payment.objects.all().select_related("user")
    filter_backends = [DjangoFilterBackend]
    filterset_fields = "__all__"
    filterset_class = PaymentFilter

    def get_queryset(self):
        """Return payments with their devices and each device's unpaid payments.

        Non-superusers only ever see their own payments.
        """
        unpaid_qs = Payment.objects.filter(paid=False).order_by("-created_at")
        device_qs = Device.objects.prefetch_related(
            Prefetch("payments", queryset=unpaid_qs, to_attr="unpaid_payments")
        )
        queryset = Payment.objects.select_related("user").prefetch_related(
            Prefetch("devices", queryset=device_qs)
        )
        if not self.request.user.is_superuser:
            queryset = queryset.filter(user=self.request.user)
        return queryset

    def create(self, request, *args, **kwargs):
        """Create a payment covering ``device_ids`` for ``number_of_months``.

        Responds with 400 when a required field is missing or when any
        requested device does not belong to the requesting user. On success
        the devices are flagged as having a pending payment and the
        serialized payment is returned with status 201.
        """
        data = request.data
        user = request.user
        number_of_months = data.get("number_of_months")
        device_ids = data.get("device_ids", [])

        # Validate required fields before touching the database.
        if not number_of_months:
            return Response(
                {"message": "number_of_months is required."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if not device_ids:
            return Response(
                {"message": "device_ids are required."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # One query for all devices instead of one per id. Ids are compared as
        # strings because the payload may carry them as either str or int.
        devices = Device.objects.filter(id__in=device_ids, user=user)
        found_ids = {str(pk) for pk in devices.values_list("id", flat=True)}
        missing_id = next(
            (pk for pk in device_ids if str(pk) not in found_ids), None
        )
        if missing_id is not None:
            return Response(
                {"message": f"Device with id {missing_id} not found."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Count distinct devices so a repeated id is not billed twice.
        amount = calculate_total_new_price(
            number_of_devices=len(found_ids), number_of_months=number_of_months
        )
        expires_at = timezone.now() + timedelta(minutes=10)

        # Keep the payment row, its device links and the pending flags
        # consistent with each other if any step fails.
        with transaction.atomic():
            payment = Payment.objects.create(
                amount=amount,
                number_of_months=number_of_months,
                # NOTE(review): ``paid`` is taken from the request body, so a
                # client can create a payment already marked as paid - confirm
                # this is intended for non-admin users.
                paid=data.get("paid", False),
                user=user,
                expires_at=expires_at,
            )
            payment.devices.set(devices)
            devices.update(has_a_pending_payment=True)

        serializer = PaymentSerializer(payment)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def list(self, request, *args, **kwargs):
        """List the user's payments; admins may pass ``all_payments=true``.

        NOTE(review): ``get_queryset`` already limits non-superusers to their
        own rows, so the flag only widens the result for users who are both
        admin and superuser - confirm that is the intended rule.
        """
        return _list_response(self, request, "all_payments")


class PaymentDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
    """Retrieve a single payment by primary key."""

    queryset = Payment.objects.select_related("user").all()
    serializer_class = PaymentSerializer
    lookup_field = "pk"


class UpdatePaymentAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Update a payment and reset the state of its devices."""

    queryset = Payment.objects.select_related("user").prefetch_related("devices").all()
    serializer_class = UpdatePaymentSerializer
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Apply a full (non-partial) update, then refresh the payment's devices.

        The device expiry is derived from the payment's ``number_of_months``
        as it was before the update is applied.
        """
        instance = self.get_object()
        number_of_months = instance.number_of_months
        device_expire_date = timezone.now() + timedelta(days=30 * number_of_months)
        devices = instance.devices.all()

        serializer = self.get_serializer(instance, data=request.data, partial=False)
        serializer.is_valid(raise_exception=True)
        self.perform_update(serializer)

        # NOTE(review): devices receive a fresh expiry date but are marked
        # inactive - confirm ``is_active=False`` is intended here.
        devices.update(
            is_active=False, expiry_date=device_expire_date, has_a_pending_payment=False
        )
        return Response(serializer.data)


class VerifyPaymentView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Settle a pending payment from the wallet or via a verified bank transfer."""

    serializer_class = PaymentSerializer
    queryset = Payment.objects.select_related("user").prefetch_related("devices").all()
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Verify the payment using the ``method`` given in the request body.

        ``method`` must be ``"WALLET"`` or ``"TRANSFER"``. Responds with 400
        for an already paid payment, a missing/unknown method, insufficient
        wallet funds or a failed transfer verification, and with 403 when the
        requester is neither the payment owner nor a superuser.
        """
        payment = self.get_object()
        data = request.data
        user = request.user

        if payment.paid:
            return Response(
                {"message": "Payment has already been verified."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if payment.user != user and not user.is_superuser:
            return Response(
                {"message": "You are not authorized to verify this payment."},
                status=status.HTTP_403_FORBIDDEN,
            )

        method = data.get("method")
        if not method:
            return Response(
                {"message": "method is required. 'WALLET' or 'TRANSFER'"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if method not in ("WALLET", "TRANSFER"):
            # Previously an unknown method fell through and returned None,
            # which the framework turns into a server error.
            return Response(
                {"message": "Invalid method. Use 'WALLET' or 'TRANSFER'."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        devices = payment.devices.all()
        # NOTE(review): the label, the wallet and the transfer payload all use
        # the requesting user, not ``payment.user``; these differ when a
        # superuser verifies someone else's payment - confirm.
        user_details = f"{user.first_name.capitalize() if user.first_name else ''} {user.last_name.capitalize() if user.last_name else ''} {user.mobile}"  # type: ignore

        if method == "WALLET":
            if user.wallet_balance < payment.amount:  # type: ignore
                return Response(
                    {"message": "Insufficient funds in wallet."},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            self.process_wallet_payment(
                user,  # type: ignore
                payment,
                devices,
            )
            self._sync_devices_with_omada(devices, user_details)
            return Response(
                {
                    "status": True,
                    "message": "Payment verified successfully using wallet.",
                },
                status=status.HTTP_200_OK,
            )

        # method == "TRANSFER"
        payload = _transfer_payload(user, payment.amount, payment.created_at)
        verification = self.verify_transfer_payment(payload, payment)
        if not verification.success:
            return Response(
                {
                    "status": verification.success,
                    "message": verification.message
                    or "Payment verification failed.",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        expiry_date = timezone.now() + timedelta(days=30 * payment.number_of_months)
        devices.update(
            is_active=True,
            expiry_date=expiry_date,
            has_a_pending_payment=False,
        )
        payment.status = "PAID"
        payment.save()
        self._sync_devices_with_omada(devices, user_details)
        return Response(
            {
                "status": verification.success,
                "message": verification.message,
                "transaction": asdict(verification.transaction)
                if verification.transaction
                else None,
            },
            status=status.HTTP_200_OK,
        )

    def _sync_devices_with_omada(self, devices, user_details):
        """Unblock already registered devices and queue new ones for Omada.

        New devices are submitted in a single deferred task containing only
        the devices that are not registered yet, and are marked as registered
        once the task has been queued.
        """
        omada_client = None
        unregistered = []
        for device in devices:
            if device.registered:
                # Create the client only when there is something to unblock.
                if omada_client is None:
                    omada_client = Omada()
                omada_client.block_device(mac_address=device.mac, operation="unblock")
            else:
                unregistered.append(device)

        if not unregistered:
            return

        add_new_devices_to_omada.defer(
            new_devices=[
                {"mac": device.mac, "name": f"{user_details} - {device.name}"}
                for device in unregistered
            ]
        )
        for device in unregistered:
            device.registered = True
            device.save()

    def process_wallet_payment(self, user: User, payment: Payment, devices=None):
        """Mark *payment* as paid from *user*'s wallet and activate its devices.

        The payment update, device activation and wallet deduction run in one
        database transaction so a failure in any step rolls back the others.

        Args:
            user: The user whose wallet is charged.
            payment: The payment being settled.
            devices: Queryset of the payment's devices; fetched from the
                payment when omitted.

        Returns:
            True once all changes have been written.
        """
        logger.debug("Processing wallet payment of %s for %s", payment.amount, user)
        if devices is None:
            devices = payment.devices.all()

        now = timezone.now()
        expiry_date = now + timedelta(days=30 * payment.number_of_months)

        with transaction.atomic():
            payment.paid = True
            payment.paid_at = now
            payment.method = "WALLET"
            payment.status = "PAID"
            devices.update(
                is_active=True,
                expiry_date=expiry_date,
                has_a_pending_payment=False,
            )
            payment.save()
            user.deduct_wallet_funds(
                payment.amount, "Wallet payment for devices", payment.id
            )
            user.save()
        return True

    def verify_transfer_payment(self, data, payment) -> PaymentVerificationResponse:
        """Ask the payment gateway whether the transfer for *payment* arrived.

        On success the payment is marked as paid by ``"TRANSFER"``, stamped
        with the current time and the gateway reference, and saved.

        Raises:
            ValueError: if ``PAYMENT_BASE_URL`` is not configured.
        """
        mib_resp = _call_payment_gateway(data)
        if mib_resp is None or not mib_resp.get("success"):
            return _failed_verification(mib_resp)

        # A successful reply must carry the transaction details; index directly
        # so a malformed reply surfaces loudly instead of recording a payment
        # without a reference.
        gateway_txn = mib_resp["transaction"]
        payment.paid = True
        payment.paid_at = timezone.now()
        payment.method = "TRANSFER"
        payment.mib_reference = gateway_txn["ref"] or ""
        payment.save()
        return PaymentVerificationResponse(
            message=mib_resp["message"],
            success=mib_resp["success"],
            transaction=Transaction(
                ref=payment.mib_reference,
                sourceBank=gateway_txn["sourceBank"],
                trxDate=gateway_txn["trxDate"],
            ),
        )


class CancelPaymentView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Cancel an unpaid payment and release its devices."""

    queryset = Payment.objects.select_related("user").all()
    serializer_class = PaymentSerializer
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Mark the payment as cancelled and deactivate its devices.

        Responds with 400 for an already cancelled or already paid payment
        and with 403 when the requester is neither the owner nor a superuser.
        After cancelling, the generic update is still run with the request
        payload.
        """
        instance = self.get_object()
        user = request.user

        if instance.status == "CANCELLED":
            return Response(
                {"message": "Payment has already been cancelled."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if instance.user != user and not user.is_superuser:
            return Response(
                {"message": "You are not authorized to cancel this payment."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if instance.paid:
            return Response(
                {"message": "Paid payments cannot be cancelled."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        devices = instance.devices.all()
        instance.status = "CANCELLED"
        instance.save()
        devices.update(is_active=False, expiry_date=None, has_a_pending_payment=False)
        return super().update(request, *args, **kwargs)


class ListCreateTopupView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
    """List wallet top-ups and create a new pending top-up."""

    # ``user`` is a forward relation, so a join is cheaper than a second query.
    queryset = Topup.objects.all().select_related("user")
    serializer_class = TopupSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_fields = "__all__"
    filterset_class = TopupFilter

    def create(self, request, *args, **kwargs):
        """Create a top-up of ``amount`` for the requester, expiring in 10 minutes."""
        amount = request.data.get("amount")
        if not amount:
            return Response(
                {"message": "amount is required."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        expires_at = timezone.now() + timedelta(minutes=10)
        topup = Topup.objects.create(
            amount=amount, user=request.user, expires_at=expires_at
        )
        serializer = TopupSerializer(topup)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def get_queryset(self):
        """Return all top-ups for admins/superusers, otherwise only the user's own."""
        return _restrict_to_owner(super().get_queryset(), self.request.user)

    def list(self, request, *args, **kwargs):
        """List the user's top-ups; admins may pass ``all_topups=true``."""
        return _list_response(self, request, "all_topups")


class TopupDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
    """Retrieve a single top-up by primary key."""

    queryset = Topup.objects.all()
    serializer_class = TopupSerializer
    lookup_field = "pk"

    def get_queryset(self):
        """Return all top-ups for admins/superusers, otherwise only the user's own."""
        return _restrict_to_owner(super().get_queryset(), self.request.user)


class VerifyTopupPaymentAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Verify the bank transfer behind a top-up and credit the wallet."""

    queryset = Topup.objects.all()
    serializer_class = TopupSerializer
    lookup_field = "pk"

    def verify_transfer_topup(self, data, topup) -> PaymentVerificationResponse:
        """Ask the payment gateway whether the transfer for *topup* arrived.

        On success the top-up is marked as paid, stamped with the gateway's
        reference and transaction date, and saved.

        Raises:
            ValueError: if ``PAYMENT_BASE_URL`` is not configured.
        """
        mib_resp = _call_payment_gateway(data)
        if mib_resp is None or not mib_resp.get("success"):
            return _failed_verification(mib_resp)

        # A successful reply must carry the transaction details; index directly
        # so a malformed reply surfaces loudly.
        gateway_txn = mib_resp["transaction"]
        topup.paid = True
        topup.mib_reference = gateway_txn["ref"] or ""
        topup.paid_at = gateway_txn["trxDate"]
        topup.save()
        return PaymentVerificationResponse(
            message=mib_resp["message"],
            success=mib_resp["success"],
            transaction=Transaction(
                ref=topup.mib_reference,
                sourceBank=gateway_txn["sourceBank"],
                trxDate=gateway_txn["trxDate"],
            ),
        )

    def update(self, request, *args, **kwargs):
        """Verify the top-up's transfer and, on success, credit the wallet.

        Responds with 400 when the top-up is already paid or verification
        fails, and with 403 when the requester is neither the owner nor a
        superuser.
        """
        topup_instance = self.get_object()
        user = request.user

        if topup_instance.paid:
            return Response(
                {"message": "Payment has already been verified."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if topup_instance.user != user and not user.is_superuser:
            return Response(
                {"message": "You are not allowed to pay for this topup."},
                status=status.HTTP_403_FORBIDDEN,
            )

        data = _transfer_payload(
            user, topup_instance.amount, topup_instance.created_at
        )
        logger.info(
            "Verifying topup payment created at %s with data: %s",
            localtime(topup_instance.created_at),
            data,
        )
        verification = self.verify_transfer_topup(data, topup_instance)
        logger.debug("Topup verification response: %s", verification)

        if not verification.success:
            return Response(
                {
                    "status": verification.success,
                    "message": verification.message
                    or "Topup payment verification failed.",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Credit the wallet and flip the status together so the top-up is
        # never marked PAID without the funds having been added.
        with transaction.atomic():
            user.add_wallet_funds(  # type: ignore
                topup_instance.amount,
                f"Topup of {topup_instance.amount} MVR",
                topup_instance.id,
            )
            user.save()
            topup_instance.status = "PAID"
            topup_instance.save()

        return Response(
            {
                "status": verification.success,
                "message": verification.message,
                "transaction": asdict(verification.transaction)
                if verification.transaction
                else None,
            },
            status=status.HTTP_200_OK,
        )


class CancelTopupView(StaffEditorPermissionMixin, generics.UpdateAPIView):
    """Cancel a pending, unexpired top-up."""

    queryset = Topup.objects.all().select_related("user")
    serializer_class = TopupSerializer
    lookup_field = "pk"

    def update(self, request, *args, **kwargs):
        """Mark the top-up as cancelled.

        Responds with 400 for an already cancelled, expired or paid top-up
        and with 403 when the requester is not the owner, an admin or a
        superuser. After cancelling, the generic update is still run with the
        request payload.
        """
        instance = self.get_object()
        user = request.user

        if instance.status == "CANCELLED":
            return Response(
                {"message": "Topup has already been cancelled."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if instance.is_expired:
            return Response(
                {"message": "Expired topups cannot be cancelled."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        # The previous condition was inverted: it rejected non-owner admins
        # while letting any other non-owner through.
        if instance.user != user and not (_is_admin(user) or user.is_superuser):
            return Response(
                {"message": "You are not authorized to delete this topup."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if instance.paid:
            return Response(
                {"message": "Paid topups cannot be deleted."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        instance.status = "CANCELLED"
        instance.save()
        return super().update(request, *args, **kwargs)


class AdminTopupCreateView(StaffEditorPermissionMixin, generics.CreateAPIView):
    """Let an admin record a cash top-up directly against a user's wallet."""

    queryset = Topup.objects.all().select_related("user")
    serializer_class = TopupSerializer

    def create(self, request, *args, **kwargs):
        """Create an already-paid cash top-up for ``user_id`` and credit the wallet.

        Responds with 403 for non-admins, 400 when ``user_id`` or ``amount``
        is missing and 404 when the user does not exist. An optional
        ``description`` replaces the default wallet transaction description.
        """
        data = request.data
        user_id = data.get("user_id")
        amount = data.get("amount")
        admin_description = data.get("description", "")

        if not _is_admin(request.user):
            return Response(
                {"message": "You are not authorized to perform this action."},
                status=status.HTTP_403_FORBIDDEN,
            )
        if not user_id:
            return Response(
                {"message": "user_id is required."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if not amount:
            return Response(
                {"message": "amount is required."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = User.objects.filter(id=user_id).first()
        if not user:
            return Response(
                {"message": "User not found."},
                status=status.HTTP_404_NOT_FOUND,
            )

        if admin_description and admin_description.strip() != "":
            topup_description = admin_description.strip()
        else:
            topup_description = f"Topup of {amount} MVR (Cash)"

        # Record the top-up and credit the wallet together so a failed credit
        # does not leave a PAID top-up behind.
        with transaction.atomic():
            topup = Topup.objects.create(
                amount=amount,
                user=user,
                paid=True,
                paid_at=timezone.now(),
                payment_type="CASH",
                status="PAID",
            )
            user.add_wallet_funds(amount, topup_description, topup.id)

        serializer = TopupSerializer(topup)
        return Response(serializer.data, status=status.HTTP_201_CREATED)


class ListWalletTransactionView(StaffEditorPermissionMixin, generics.ListAPIView):
    """List wallet transactions."""

    serializer_class = WalletTransactionSerializer
    queryset = WalletTransaction.objects.all().select_related("user")
    filter_backends = [DjangoFilterBackend]
    filterset_fields = "__all__"
    filterset_class = WalletTransactionFilter

    def get_queryset(self):
        """Return all transactions for admins/superusers, otherwise the user's own."""
        return _restrict_to_owner(super().get_queryset(), self.request.user)

    def list(self, request, *args, **kwargs):
        """List the user's transactions; admins may request all of them.

        The original (misspelt) ``all_transations`` flag keeps working and the
        correctly spelt ``all_transactions`` is accepted as an alias.
        """
        return _list_response(self, request, "all_transations", "all_transactions")