Merge pull request #8 from i701/feat/task-queues
All checks were successful
Build and Push Docker Images / Build and Push Docker Images (push) Successful in 5m25s

feat/task queues
This commit is contained in:
Abdulla Aidhaan
2025-07-05 14:35:41 +05:00
committed by GitHub
7 changed files with 112 additions and 23 deletions

View File

@ -7,7 +7,6 @@ from django.db.models.signals import post_save
from api.models import User from api.models import User
from django.contrib.auth.models import Permission from django.contrib.auth.models import Permission
from api.tasks import verify_user_with_person_api_task from api.tasks import verify_user_with_person_api_task
from asgiref.sync import sync_to_async
@receiver(post_save, sender=User) @receiver(post_save, sender=User)
@ -30,10 +29,9 @@ def assign_device_permissions(sender, instance, created, **kwargs):
@receiver(post_save, sender=User) @receiver(post_save, sender=User)
@sync_to_async async def verify_user_with_person_api(sender, instance, created, **kwargs):
def verify_user_with_person_api(sender, instance, created, **kwargs):
if created: if created:
verify_user_with_person_api_task(instance.id) await verify_user_with_person_api_task.defer_async(instance.id)
@receiver(reset_password_token_created) @receiver(reset_password_token_created)

View File

@ -18,11 +18,13 @@ env.read_env(os.path.join(BASE_DIR, ".env"))
@app.task @app.task
def add(x, y): def add(x, y):
print(f"Adding {x} and {y}") logger.info(f"Executing test background task with {x} and {y}")
return x + y return x + y
@app.periodic(cron="0 0 */28 * *") # type: ignore @app.periodic(
cron="0 0 */28 * *", queue="heavy_tasks", periodic_id="deactivate_expired_devices"
) # type: ignore
@app.task @app.task
def deactivate_expired_devices(): def deactivate_expired_devices():
expired_devices = Device.objects.filter( expired_devices = Device.objects.filter(
@ -66,7 +68,8 @@ def add_new_devices_to_omada(new_devices: list[dict]):
omada_client.add_new_devices_to_omada(new_devices) omada_client.add_new_devices_to_omada(new_devices)
def verify_user_with_person_api_task(user_id: int): @app.task
async def verify_user_with_person_api_task(user_id: int):
""" """
Verify the user with the Person API. Verify the user with the Person API.
:param user_id: The ID of the user to verify. :param user_id: The ID of the user to verify.
@ -80,16 +83,16 @@ def verify_user_with_person_api_task(user_id: int):
return None return None
# Call the Person API to verify the user # Call the Person API to verify the user
verification_failed_message = f""" # verification_failed_message = f"""
_The following user verification failed_: # _The following user verification failed_:
*ID Card:* {user.id_card} # *ID Card:* {user.id_card}
*Name:* {user.first_name} {user.last_name} # *Name:* {user.first_name} {user.last_name}
*House Name:* {user.address} # *House Name:* {user.address}
*Date of Birth:* {user.dob} # *Date of Birth:* {user.dob}
*Island:* {(user.atoll.name if user.atoll else "N/A")} {(user.island.name if user.island else "N/A")} # *Island:* {(user.atoll.name if user.atoll else "N/A")} {(user.island.name if user.island else "N/A")}
*Mobile:* {user.mobile} # *Mobile:* {user.mobile}
Visit [SAR Link Portal](https://portal.sarlink.net) to manually verify this user. # Visit [SAR Link Portal](https://portal.sarlink.net) to manually verify this user.
""" # """
# logger.info(verification_failed_message) # logger.info(verification_failed_message)
PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="") # type: ignore PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="") # type: ignore
@ -170,7 +173,7 @@ def verify_user_with_person_api_task(user_id: int):
user.mobile, user.mobile,
f"Dear {user.first_name} {user.last_name}, \n\nYour account registration is being processed. \n\nWe will notify you once verification is complete. \n\n - SAR Link", f"Dear {user.first_name} {user.last_name}, \n\nYour account registration is being processed. \n\nWe will notify you once verification is complete. \n\n - SAR Link",
) )
send_clean_telegram_markdown(message=verification_failed_message) # send_clean_telegram_markdown(message=verification_failed_message)
return False return False
else: else:
# Handle the error case # Handle the error case

14
api/tasks_app.py Normal file
View File

@ -0,0 +1,14 @@
# projects/tasks_app.py
from procrastinate import App
from procrastinate.contrib.django import django_connector
app = App(
connector=django_connector.DjangoConnector(),
periodic_defaults={"max_delay": 86400}, # accept up to 24h delay
)
def on_app_ready(app):
app.periodic_defaults = {"max_delay": 86400}
app.import_paths.append("api.tasks")
app.import_paths.append("billing.tasks")

View File

@ -371,8 +371,5 @@ PASSWORDLESS_AUTH = {
} }
# CELERY CONFIGURATION PROCRASTINATE_ON_APP_READY = "api.tasks_app.on_app_ready"
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:6379/0" PROCRASTINATE_APP = "api.tasks_app.app"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:6379/0"

View File

@ -0,0 +1,17 @@
# Generated by Django 5.2 on 2025-07-05 09:26
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("billing", "0009_remove_topup_expired"),
]
operations = [
migrations.AddField(
model_name="topup",
name="expiry_notification_sent",
field=models.BooleanField(default=False),
),
]

View File

@ -54,6 +54,7 @@ class Topup(models.Model):
paid_at = models.DateTimeField(null=True, blank=True) paid_at = models.DateTimeField(null=True, blank=True)
mib_reference = models.CharField(default="", null=True, blank=True) mib_reference = models.CharField(default="", null=True, blank=True)
expires_at = models.DateTimeField(null=True, blank=True) expires_at = models.DateTimeField(null=True, blank=True)
expiry_notification_sent = models.BooleanField(default=False)
created_at = models.DateTimeField(default=timezone.now) created_at = models.DateTimeField(default=timezone.now)
updated_at = models.DateTimeField(auto_now=True) updated_at = models.DateTimeField(auto_now=True)

59
billing/tasks.py Normal file
View File

@ -0,0 +1,59 @@
import logging
from django.db import transaction
from django.utils import timezone
from procrastinate.contrib.django import app
from api.notifications import send_sms
from billing.models import Topup
logger = logging.getLogger(__name__)
@app.periodic(
cron="*/30 * * * * *", periodic_id="notify_expired_topups", queue="heavy_tasks"
) # every 30 seconds
@app.task
def update_expired_topups(timestamp: int):
expired_topups_qs = Topup.objects.filter(
expires_at__lte=timezone.now(), expiry_notification_sent=False
).select_related("user")
with transaction.atomic():
count = expired_topups_qs.count()
logger.info(f"Found {count} topups to expire.")
for topup in expired_topups_qs:
if topup.user and topup.user.mobile and not topup.expiry_notification_sent:
send_sms_task.defer(
mobile=topup.user.mobile,
amount=topup.amount,
topup_id=str(topup.id),
)
else:
# Mark as notified even if we can't send SMS (no mobile number)
topup.expiry_notification_sent = True
topup.save()
return {
"total_expired_topups": count,
}
# Assuming you have a separate task for sending SMS if you go that route
@app.task
def send_sms_task(mobile: str, amount: float, topup_id: str):
message = (
f"Dear {mobile}, \n\nYour topup of {amount} MVR has expired. "
"Please make a new topup to update your wallet. \n\n- SAR Link"
)
send_sms(mobile, message)
logger.info(f"SMS sent to {mobile} for expired topup of {amount} MVR.")
# Mark the topup as notified after successful SMS sending
try:
topup = Topup.objects.get(id=topup_id)
topup.expiry_notification_sent = True
topup.save()
logger.info(f"Marked topup {topup_id} as notified.")
except Topup.DoesNotExist:
logger.error(f"Topup {topup_id} not found when trying to mark as notified.")