Merge pull request #10 from i701/feat/task-queues
All checks were successful
Build and Push Docker Images / Build and Push Docker Images (push) Successful in 4m39s

feature / task queues cleanup 
This commit is contained in:
Abdulla Aidhaan
2025-07-09 20:16:48 +05:00
committed by GitHub
4 changed files with 128 additions and 21 deletions

View File

@ -10,6 +10,7 @@ from django.utils import timezone
from api.omada import Omada
from apibase.env import env, BASE_DIR
from procrastinate.contrib.django import app
from procrastinate import builtin_tasks
logger = logging.getLogger(__name__)
@ -17,6 +18,23 @@ logger = logging.getLogger(__name__)
env.read_env(os.path.join(BASE_DIR, ".env"))
@app.periodic(cron="0 4 * * *")
@app.task(
queueing_lock="remove_old_jobs",
pass_context=True,
)
async def remove_old_jobs(context, timestamp):
logger.info("Running remove_old_jobs task...")
return await builtin_tasks.remove_old_jobs(
context,
queue="heavy_tasks",
max_hours=24,
remove_failed=True,
remove_cancelled=True,
remove_aborted=True,
)
@app.task
def add(x, y):
logger.info(f"Executing test background task with {x} and {y}")

View File

@ -0,0 +1,17 @@
# Generated by Django 5.2 on 2025-07-09 14:50
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("billing", "0012_payment_status"),
]
operations = [
migrations.AddField(
model_name="payment",
name="expiry_notification_sent",
field=models.BooleanField(default=False),
),
]

View File

@ -23,6 +23,7 @@ class Payment(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="payments")
paid_at = models.DateTimeField(null=True, blank=True)
method = models.CharField(max_length=255, choices=PAYMENT_TYPES, default="TRANSFER")
expiry_notification_sent = models.BooleanField(default=False)
expires_at = models.DateTimeField(null=True, blank=True)
created_at = models.DateTimeField(default=timezone.now)
updated_at = models.DateTimeField(auto_now=True)

View File

@ -4,16 +4,22 @@ from django.db import transaction
from django.utils import timezone
from procrastinate.contrib.django import app
from api.notifications import send_sms
from billing.models import Topup
from billing.models import Topup, Payment
from django.utils.timezone import localtime
from datetime import datetime
from enum import Enum
logger = logging.getLogger(__name__)
class NotificationType(Enum):
TOPUP = "TOPUP"
PAYMENT = "PAYMENT"
@app.periodic(
cron="*/1 * * * * *", periodic_id="notify_expired_topups", queue="heavy_tasks"
) # every 1 minute
)
@app.task
def update_expired_topups(timestamp: int):
expired_topups_qs = Topup.objects.filter(
@ -33,8 +39,9 @@ def update_expired_topups(timestamp: int):
if topup.user and topup.user.mobile and not topup.expiry_notification_sent:
send_sms_task.defer(
mobile=topup.user.mobile,
type=NotificationType.TOPUP,
amount=topup.amount,
topup_id=str(topup.id),
model_id=str(topup.id),
created_at=localtime(topup.created_at).isoformat(),
user=f"{topup.user.first_name + ' ' + topup.user.last_name}"
if topup.user.last_name and topup.user.first_name
@ -43,7 +50,6 @@ def update_expired_topups(timestamp: int):
topup.expiry_notification_sent = True
topup.save()
else:
# Mark as notified even if we can't send SMS (no mobile number)
topup.expiry_notification_sent = True
topup.save()
return
@ -53,29 +59,94 @@ def update_expired_topups(timestamp: int):
}
@app.periodic(
cron="*/1 * * * * *", periodic_id="notify_expired_payments", queue="heavy_tasks"
)
@app.task
def update_expired_payments(timestamp: int):
expired_payments_qs = Payment.objects.filter(
expires_at__lte=timezone.now(),
expiry_notification_sent=False,
paid=False,
).select_related("user")
if not expired_payments_qs.exists():
logger.info("No expired payments found.")
return {"total_expired_payments": 0}
with transaction.atomic():
count = expired_payments_qs.count()
logger.info(f"Found {count} payments to expire.")
for payment in expired_payments_qs:
if (
payment.user
and payment.user.mobile
and not payment.expiry_notification_sent
):
send_sms_task.defer(
mobile=payment.user.mobile,
type=NotificationType.PAYMENT,
amount=payment.amount,
model_id=str(payment.id),
created_at=localtime(payment.created_at).isoformat(),
user=f"{payment.user.first_name + ' ' + payment.user.last_name}"
if payment.user.last_name and payment.user.first_name
else "User",
)
payment.expiry_notification_sent = True
payment.save()
else:
payment.expiry_notification_sent = True
payment.save()
return
return {
"total_expired_payments": count,
}
@app.task
def send_sms_task(
user: str, mobile: str, amount: float, topup_id: str, created_at: str
user: str,
mobile: str,
amount: float,
model_id: str,
created_at: str,
type: NotificationType = NotificationType.TOPUP,
):
# Parse the ISO formatted date string
try:
dt = datetime.fromisoformat(created_at)
formatted_date = dt.strftime("%d %b %Y, %I:%M %p")
except Exception:
formatted_date = created_at # fallback to original if parsing fails
message = (
f"Dear {user}, \n\nYour topup of {amount} MVR [created at {formatted_date}] has expired. "
"Please make a new topup to update your wallet. \n\n- SAR Link"
)
formatted_date = created_at
message: str = ""
if type == NotificationType.TOPUP:
message = (
f"Dear {user}, \n\nYour topup of {amount} MVR [created at {formatted_date}] has expired. "
"Please make a new topup to update your wallet. \n\n- SAR Link"
)
elif type == NotificationType.PAYMENT:
message = f"Dear {user}, \n\nYour payment of {amount} MVR [created at {formatted_date}] has expired. \n\n- SAR Link"
send_sms(mobile, message)
logger.info(f"SMS sent to {mobile} for expired topup of {amount} MVR.")
logger.info(f"SMS sent to {mobile} for expired {type.value} of {amount} MVR.")
# Mark the topup as notified after successful SMS sending
try:
topup = Topup.objects.get(id=topup_id)
topup.expiry_notification_sent = True
topup.save()
logger.info(f"Marked topup {topup_id} as notified.")
except Topup.DoesNotExist:
logger.error(f"Topup {topup_id} not found when trying to mark as notified.")
if type == NotificationType.TOPUP:
try:
topup = Topup.objects.get(id=model_id)
topup.expiry_notification_sent = True
topup.save()
logger.info(f"Marked topup {model_id} as notified.")
except Topup.DoesNotExist:
logger.error(
f"Topup id: {model_id} not found when trying to mark as notified."
)
else:
try:
topup = Payment.objects.get(id=model_id)
topup.expiry_notification_sent = True
topup.save()
logger.info(f"Marked payment {model_id} as notified.")
except Payment.DoesNotExist:
logger.error(
f"Payment id: {model_id} not found when trying to mark as notified."
)