Files
sarlink-portal-api/billing/views.py
i701 9c082aedf2
All checks were successful
Build and Push Docker Images / Build and Push Docker Images (push) Successful in 4m38s
feat(telegram): implement asynchronous Telegram alert system and enhance user verification messaging
2025-07-27 21:56:59 +05:00

723 lines
27 KiB
Python

# Create your views here.
# billing/views.py
import os
from datetime import timedelta
import requests
from django.utils import timezone
from django.utils.timezone import localtime
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import generics, status
from rest_framework.response import Response
from api.mixins import StaffEditorPermissionMixin
from api.tasks import add_new_devices_to_omada
from apibase.env import BASE_DIR, env
from django.db.models import Prefetch
import logging
from .utils import calculate_total_new_price
from .models import Device, Payment, Topup, WalletTransaction
from .serializers import (
PaymentSerializer,
UpdatePaymentSerializer,
TopupSerializer,
WalletTransactionSerializer,
)
from .filters import PaymentFilter, TopupFilter, WalletTransactionFilter
from dataclasses import dataclass, asdict
from typing import Optional
from api.models import User
from api.omada import Omada
# from api.bot import send_telegram_alert, telegram_loop, escape_markdown_v2
# import asyncio
env.read_env(os.path.join(BASE_DIR, ".env"))
PAYMENT_BASE_URL = env("PAYMENT_BASE_URL", default="") # type: ignore
logger = logging.getLogger(__name__)
@dataclass
class Transaction:
ref: str
sourceBank: str
trxDate: str
@dataclass
class PaymentVerificationResponse:
message: str
success: bool
transaction: Optional[Transaction] = None
class InsufficientFundsError(Exception):
pass
class ListCreatePaymentView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
serializer_class = PaymentSerializer
queryset = Payment.objects.all().select_related("user")
filter_backends = [DjangoFilterBackend]
filterset_fields = "__all__"
filterset_class = PaymentFilter
def get_queryset(self):
unpaid_qs = Payment.objects.filter(paid=False).order_by("-created_at")
device_qs = Device.objects.prefetch_related(
Prefetch("payments", queryset=unpaid_qs, to_attr="unpaid_payments")
)
queryset = Payment.objects.select_related("user").prefetch_related(
Prefetch("devices", queryset=device_qs)
)
if not self.request.user.is_superuser:
queryset = queryset.filter(user=self.request.user)
return queryset
def create(self, request):
data = request.data
user = request.user
number_of_months = data.get("number_of_months")
number_of_devices = 0
device_ids = data.get("device_ids", [])
print(number_of_months, device_ids)
current_time = timezone.now()
expires_at = current_time + timedelta(minutes=10)
for device_id in device_ids:
device = Device.objects.filter(id=device_id, user=user).first()
print("DEVICE", device)
if not device:
return Response(
{"message": f"Device with id {device_id} not found."},
status=status.HTTP_400_BAD_REQUEST,
)
number_of_devices += 1
if not number_of_months:
return Response(
{"message": "number_of_months is required."},
status=status.HTTP_400_BAD_REQUEST,
)
if not device_ids:
return Response(
{"message": "device_ids are required."},
status=status.HTTP_400_BAD_REQUEST,
)
amount = calculate_total_new_price(
number_of_devices=number_of_devices, number_of_months=number_of_months
)
payment = Payment.objects.create(
amount=amount,
number_of_months=number_of_months,
paid=data.get("paid", False),
user=user,
expires_at=expires_at,
)
# Connect devices to payment
devices = Device.objects.filter(id__in=device_ids, user=user)
for device in devices:
device.has_a_pending_payment = True
device.save()
payment.devices.set(devices)
serializer = PaymentSerializer(payment)
return Response(serializer.data, status=status.HTTP_201_CREATED)
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
all_payments = request.query_params.get("all_payments", "false").lower() in [
"true",
"1",
"yes",
]
if (
request.user.is_authenticated
and getattr(request.user, "is_admin")
and bool(all_payments)
):
pass
else:
queryset = queryset.filter(user=request.user)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class PaymentDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
queryset = Payment.objects.select_related("user").all()
serializer_class = PaymentSerializer
lookup_field = "pk"
class UpdatePaymentAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
queryset = Payment.objects.select_related("user").prefetch_related("devices").all()
serializer_class = UpdatePaymentSerializer
lookup_field = "pk"
def update(self, request, *args, **kwargs):
instance = self.get_object()
number_of_months = instance.number_of_months
device_expire_date = timezone.now() + timedelta(days=30 * number_of_months)
devices = instance.devices.all()
serializer = self.get_serializer(instance, data=request.data, partial=False)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
devices.update(
is_active=False, expiry_date=device_expire_date, has_a_pending_payment=False
)
return Response(serializer.data)
class VerifyPaymentView(StaffEditorPermissionMixin, generics.UpdateAPIView):
serializer_class = PaymentSerializer
queryset = Payment.objects.select_related("user").prefetch_related("devices").all()
lookup_field = "pk"
def update(self, request, *args, **kwargs):
payment = self.get_object()
devices = payment.devices.all()
data = request.data
user = request.user
user_details = f"{user.first_name.capitalize() if user.first_name else ''} {user.last_name.capitalize() if user.last_name else ''} {user.mobile}" # type: ignore
omada_client = Omada()
if payment.paid:
return Response(
{"message": "Payment has already been verified."},
status=status.HTTP_400_BAD_REQUEST,
)
if payment.user != user and not user.is_superuser:
return Response(
{"message": "You are not authorized to verify this payment."},
status=status.HTTP_403_FORBIDDEN,
)
method = data.get("method")
if not method:
return Response(
{"message": "method is required. 'WALLET' or 'TRANSFER'"},
status=status.HTTP_400_BAD_REQUEST,
)
if method == "WALLET":
if user.wallet_balance < payment.amount: # type: ignore
return Response(
{"message": "Insufficient funds in wallet."},
status=status.HTTP_400_BAD_REQUEST,
)
else:
self.process_wallet_payment(
user, # type: ignore
payment,
devices,
)
device_list = []
for device in devices:
device_list.append(
{
"mac": device.mac,
"name": f"{user_details} - {device.name}",
}
)
if device.registered:
omada_client.block_device(
mac_address=device.mac, operation="unblock"
)
if not device.registered:
# Add to omada
add_new_devices_to_omada.defer(new_devices=device_list)
device.registered = True
device.save()
return Response(
{
"status": True,
"message": "Payment verified successfully using wallet.",
},
status=status.HTTP_200_OK,
)
if method == "TRANSFER":
data = {
"benefName": f"{user.first_name} {user.last_name}", # type: ignore
"accountNo": user.acc_no, # type: ignore
"absAmount": "{:.2f}".format(payment.amount),
"time": localtime(payment.created_at).strftime("%Y-%m-%d %H:%M"),
}
payment_verification_response = self.verify_transfer_payment(data, payment)
if payment_verification_response.success:
expiry_date = timezone.now() + timedelta(days=30 * payment.number_of_months)
devices.update(
is_active=True,
expiry_date=expiry_date,
has_a_pending_payment=False,
)
payment.status = "PAID"
payment.save()
# add to omada if its a new device and not an existing device
device_list = []
for device in devices:
device_list.append(
{
"mac": device.mac,
"name": f"{user_details} - {device.name}",
}
)
if device.registered:
omada_client.block_device(
mac_address=device.mac, operation="unblock"
)
if not device.registered:
# Add to omada
add_new_devices_to_omada.defer(new_devices=device_list)
device.registered = True
device.save()
return Response(
{
"status": payment_verification_response.success,
"message": payment_verification_response.message,
"transaction": asdict(payment_verification_response.transaction)
if payment_verification_response.transaction
else None,
},
status=status.HTTP_200_OK,
)
else:
return Response(
{
"status": payment_verification_response.success,
"message": payment_verification_response.message
or "Topup payment verification failed.",
},
status=status.HTTP_400_BAD_REQUEST,
)
def process_wallet_payment(self, user: User, payment: Payment, devices=None):
print("processing wallet payment...")
print(user, payment.amount)
# Use passed devices or fetch if not provided
if devices is None:
devices = payment.devices.all()
payment.paid = True
payment.paid_at = timezone.now()
payment.method = "WALLET"
payment.status = "PAID"
expiry_date = timezone.now() + timedelta(days=30 * payment.number_of_months)
devices.update(
is_active=True,
expiry_date=expiry_date,
has_a_pending_payment=False,
)
payment.save()
user.deduct_wallet_funds(
payment.amount, "Wallet payment for devices", payment.id
)
user.save()
return True
def verify_transfer_payment(self, data, payment) -> PaymentVerificationResponse:
if not PAYMENT_BASE_URL:
raise ValueError(
"PAYMENT_BASE_URL is not set. Please set it in your environment variables."
)
response = requests.post(
f"{PAYMENT_BASE_URL}/verify-payment",
json=data,
headers={"Content-Type": "application/json"},
)
logger.info("MIB Verification Response -> ", response)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error(f"HTTPError: {e}")
return PaymentVerificationResponse(
message="Payment verification failed.", success=False, transaction=None
)
mib_resp = response.json()
logger.info("MIB Verification Response ->", mib_resp)
if not response.json().get("success"):
return PaymentVerificationResponse(
message=mib_resp["message"],
success=mib_resp["success"],
transaction=None,
)
else:
payment.paid = True
payment.paid_at = timezone.now()
payment.method = "TRANSFER"
payment.mib_reference = mib_resp["transaction"]["ref"] or ""
payment.save()
return PaymentVerificationResponse(
message=mib_resp["message"],
success=mib_resp["success"],
transaction=Transaction(
ref=payment.mib_reference,
sourceBank=mib_resp["transaction"]["sourceBank"],
trxDate=mib_resp["transaction"]["trxDate"],
),
)
class CancelPaymentView(StaffEditorPermissionMixin, generics.UpdateAPIView):
queryset = Payment.objects.select_related("user").all()
serializer_class = PaymentSerializer
lookup_field = "pk"
def update(self, request, *args, **kwargs):
instance = self.get_object()
user = request.user
if instance.status == "CANCELLED":
return Response(
{"message": "Payment has already been cancelled."},
status=status.HTTP_400_BAD_REQUEST,
)
if instance.user != user and not user.is_superuser:
return Response(
{"message": "You are not authorized to cancel this payment."},
status=status.HTTP_403_FORBIDDEN,
)
if instance.paid:
return Response(
{"message": "Paid payments cannot be cancelled."},
status=status.HTTP_400_BAD_REQUEST,
)
devices = instance.devices.all()
instance.status = "CANCELLED"
instance.save()
devices.update(is_active=False, expiry_date=None, has_a_pending_payment=False)
return super().update(request, *args, **kwargs)
class ListCreateTopupView(StaffEditorPermissionMixin, generics.ListCreateAPIView):
queryset = Topup.objects.all().prefetch_related("user")
serializer_class = TopupSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = "__all__"
filterset_class = TopupFilter
def create(self, request, *args, **kwargs):
data = request.data
user = request.user
current_time = timezone.now()
expires_at = current_time + timedelta(minutes=10) # Topup expires in 10 minutes
amount = data.get("amount")
if not amount:
return Response(
{"message": "amount is required."},
status=status.HTTP_400_BAD_REQUEST,
)
topup = Topup.objects.create(amount=amount, user=user, expires_at=expires_at)
serializer = TopupSerializer(topup)
return Response(serializer.data, status=status.HTTP_201_CREATED)
def get_queryset(self):
queryset = super().get_queryset()
if getattr(self.request.user, "is_admin") or self.request.user.is_superuser:
return queryset
return queryset.filter(user=self.request.user)
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
all_topups = request.query_params.get("all_topups", "false").lower() in [
"true",
"1",
"yes",
]
if (
request.user.is_authenticated
and getattr(request.user, "is_admin")
and bool(all_topups)
):
pass
else:
queryset = queryset.filter(user=request.user)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class TopupDetailAPIView(StaffEditorPermissionMixin, generics.RetrieveAPIView):
queryset = Topup.objects.all()
serializer_class = TopupSerializer
lookup_field = "pk"
def get_queryset(self):
queryset = super().get_queryset()
if getattr(self.request.user, "is_admin") or self.request.user.is_superuser:
return queryset
return queryset.filter(user=self.request.user)
class VerifyTopupPaymentAPIView(StaffEditorPermissionMixin, generics.UpdateAPIView):
queryset = Topup.objects.all()
serializer_class = TopupSerializer
lookup_field = "pk"
def verify_transfer_topup(self, data, topup) -> PaymentVerificationResponse:
if not PAYMENT_BASE_URL:
raise ValueError(
"PAYMENT_BASE_URL is not set. Please set it in your environment variables."
)
logger.info(data)
response = requests.post(
f"{PAYMENT_BASE_URL}/verify-payment",
json=data,
headers={"Content-Type": "application/json"},
)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error(f"HTTPError: {e}")
return PaymentVerificationResponse(
message="Payment verification failed.", success=False, transaction=None
)
mib_resp = response.json()
print(mib_resp)
if not response.json().get("success"):
return PaymentVerificationResponse(
message=mib_resp["message"],
success=mib_resp["success"],
transaction=None,
)
else:
topup.paid = True
topup.mib_reference = mib_resp["transaction"]["ref"] or ""
topup.paid_at = mib_resp["transaction"]["trxDate"]
topup.save()
return PaymentVerificationResponse(
message=mib_resp["message"],
success=mib_resp["success"],
transaction=Transaction(
ref=topup.mib_reference,
sourceBank=mib_resp["transaction"]["sourceBank"],
trxDate=mib_resp["transaction"]["trxDate"],
),
)
def update(self, request, *args, **kwargs):
topup_instance = self.get_object()
user = request.user
if topup_instance.paid:
return Response(
{"message": "Payment has already been verified."},
status=status.HTTP_400_BAD_REQUEST,
)
if topup_instance.user != user and not user.is_superuser:
return Response(
{"message": "You are not allowed to pay for this topup."},
status=status.HTTP_403_FORBIDDEN,
)
data = {
"benefName": f"{user.first_name} {user.last_name}", # type: ignore
"accountNo": user.acc_no, # type: ignore
"absAmount": "{:.2f}".format(topup_instance.amount),
"time": localtime(topup_instance.created_at).strftime("%Y-%m-%d %H:%M"),
}
logger.info(
f"Verifying topup payment created at {localtime(topup_instance.created_at)} with data: {data}"
)
topup_verification_response = self.verify_transfer_topup(data, topup_instance)
print("Topup verification response:", topup_verification_response)
if topup_verification_response.success:
user.add_wallet_funds( # type: ignore
topup_instance.amount,
f"Topup of {topup_instance.amount} MVR",
topup_instance.id,
)
user.save()
topup_instance.status = "PAID"
topup_instance.save()
return Response(
{
"status": topup_verification_response.success,
"message": topup_verification_response.message,
"transaction": asdict(topup_verification_response.transaction)
if topup_verification_response.transaction
else None,
},
status=status.HTTP_200_OK,
)
else:
return Response(
{
"status": topup_verification_response.success,
"message": topup_verification_response.message
or "Topup payment verification failed.",
},
status=status.HTTP_400_BAD_REQUEST,
)
class CancelTopupView(StaffEditorPermissionMixin, generics.UpdateAPIView):
queryset = Topup.objects.all().select_related("user")
serializer_class = TopupSerializer
lookup_field = "pk"
def update(self, request, *args, **kwargs):
instance = self.get_object()
user = request.user
if instance.status == "CANCELLED":
return Response(
{"message": "Topup has already been cancelled."},
status=status.HTTP_400_BAD_REQUEST,
)
if instance.is_expired:
return Response(
{"message": "Expired topups cannot be cancelled."},
status=status.HTTP_400_BAD_REQUEST,
)
if (
instance.user != user
and getattr(user, "is_admin")
and not user.is_superuser
):
return Response(
{"message": "You are not authorized to delete this topup."},
status=status.HTTP_403_FORBIDDEN,
)
if instance.paid:
return Response(
{"message": "Paid topups cannot be deleted."},
status=status.HTTP_400_BAD_REQUEST,
)
instance.status = "CANCELLED"
instance.save()
return super().update(request, *args, **kwargs)
class AdminTopupCreateView(StaffEditorPermissionMixin, generics.CreateAPIView):
queryset = Topup.objects.all().select_related("user")
serializer_class = TopupSerializer
def create(self, request, *args, **kwargs):
data = request.data
user_id = data.get("user_id")
amount = data.get("amount")
topup_description = ""
admin_description = data.get("description", "")
if not getattr(request.user, "is_admin", False):
return Response(
{"message": "You are not authorized to perform this action."},
status=status.HTTP_403_FORBIDDEN,
)
if not user_id:
return Response(
{"message": "user_id is required."},
status=status.HTTP_400_BAD_REQUEST,
)
if not amount:
return Response(
{"message": "amount is required."},
status=status.HTTP_400_BAD_REQUEST,
)
user = User.objects.filter(id=user_id).first()
if not user:
return Response(
{"message": "User not found."},
status=status.HTTP_404_NOT_FOUND,
)
topup = Topup.objects.create(
amount=amount,
user=user,
paid=True,
paid_at=timezone.now(),
payment_type="CASH",
status="PAID",
)
default_description = f"Topup of {amount} MVR (Cash)"
if admin_description and admin_description.strip() != "":
topup_description = admin_description.strip()
else:
topup_description = default_description
user.add_wallet_funds(amount, topup_description, topup.id)
serializer = TopupSerializer(topup)
return Response(serializer.data, status=status.HTTP_201_CREATED)
class ListWalletTransactionView(StaffEditorPermissionMixin, generics.ListAPIView):
serializer_class = WalletTransactionSerializer
queryset = WalletTransaction.objects.all().select_related("user")
filter_backends = [DjangoFilterBackend]
filterset_fields = "__all__"
filterset_class = WalletTransactionFilter
def get_queryset(self):
queryset = super().get_queryset()
if getattr(self.request.user, "is_admin") or self.request.user.is_superuser:
return queryset
return queryset.filter(user=self.request.user)
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
all_transations = request.query_params.get(
"all_transations", "false"
).lower() in [
"true",
"1",
"yes",
]
if (
request.user.is_authenticated
and getattr(request.user, "is_admin")
and bool(all_transations)
):
pass
else:
queryset = queryset.filter(user=request.user)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
# class AlertTestView(generics.GenericAPIView):
# def get(self, request, *args, **kwargs):
# msg = """*ID Card:* A265117\n*Name:* Abdulla Aidhaan\n*House Name:* Nooree Villa\n*Date of Birth:* 1997-08-24\n*Island:* Sh Funadhoo\n*Mobile:* 9697404\nVisit [SAR Link Portal](https://portal.sarlink.net) to manually verify this user."""
# print(msg)
# print("escaped:", escape_markdown_v2(msg))
# user = request.user
# print(user)
# global telegram_loop # Access the global loop
# if telegram_loop is None:
# return Response(
# {"message": "Telegram worker not initialized."},
# status=status.HTTP_503_SERVICE_UNAVAILABLE,
# )
# try:
# asyncio.run_coroutine_threadsafe(
# send_telegram_alert(markdown_message=escape_markdown_v2(msg)),
# telegram_loop,
# ).result()
# return Response(
# {"message": "Alert sent successfully."}, status=status.HTTP_200_OK
# )
# except Exception as e:
# logger.warning("[alert test] TELEGRAM ALERT ERROR", e)
# return Response(
# {"message": "Alert failed to send."}, status=status.HTTP_400_BAD_REQUEST
# )