Refactor task management: replace Celery with Procrastinate for background tasks and update related configurations

This commit is contained in:
2025-06-28 10:25:33 +05:00
parent 28315c59cf
commit e3b39478eb
8 changed files with 37 additions and 66 deletions

View File

@ -47,8 +47,7 @@ curl -X POST http://localhost:4000/api/auth/login/ \
``` ```
expected response: `{"message":"Unable to log in with provided credentials."}` expected response: `{"message":"Unable to log in with provided credentials."}`
5. For Celery to work run the worker and the beat 5. For procrastinate (postgres background tasks) to work run the worker
``` ```
celery -A apibase worker --loglevel=info python manage.py procrastinate worker
celery -A apibase beat --loglevel=info
``` ```

View File

@ -4,11 +4,12 @@ from devices.models import Device
from api.notifications import send_sms from api.notifications import send_sms
import os import os
import logging import logging
from celery import shared_task
from django.utils import timezone from django.utils import timezone
from api.notifications import send_clean_telegram_markdown from api.notifications import send_clean_telegram_markdown
from api.omada import Omada from api.omada import Omada
from apibase.env import env, BASE_DIR from apibase.env import env, BASE_DIR
from procrastinate.contrib.django import app
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,13 +18,14 @@ env.read_env(os.path.join(BASE_DIR, ".env"))
omada_client = Omada() omada_client = Omada()
@shared_task @app.task
def add(x, y): def add(x, y):
print(f"Adding {x} and {y}") print(f"Adding {x} and {y}")
return x + y return x + y
@shared_task @app.periodic(cron="0 0 */28 * *") # type: ignore
@app.task
def deactivate_expired_devices(): def deactivate_expired_devices():
expired_devices = Device.objects.filter( expired_devices = Device.objects.filter(
expiry_date__lte=timezone.localtime(timezone.now()), is_active=True expiry_date__lte=timezone.localtime(timezone.now()), is_active=True
@ -55,12 +57,13 @@ def deactivate_expired_devices():
} }
@shared_task @app.task
def add_new_devices_to_omada(new_devices: list[dict]): def add_new_devices_to_omada(new_devices: list[dict]):
""" """
Add new devices to Omada via Omada class. Add new devices to Omada via Omada class.
:param new_devices: List of new device names to add. :param new_devices: List of new device names to add.
""" """
logger.info("Running add new devices to Omada task...")
omada_client.add_new_devices_to_omada(new_devices) omada_client.add_new_devices_to_omada(new_devices)

View File

@ -60,7 +60,7 @@ class ErrorMessages:
@api_view(["GET"]) @api_view(["GET"])
def healthcheck(request): def healthcheck(request):
add.delay(1, 2) add.defer(1, 2)
return Response({"status": "Good"}, status=status.HTTP_200_OK) return Response({"status": "Good"}, status=status.HTTP_200_OK)

View File

@ -1,4 +0,0 @@
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)

View File

@ -1,32 +0,0 @@
import os
from celery import Celery
from celery.schedules import crontab
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "apibase.settings")
app = Celery("apibase")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks(["api", "devices"])
# Add periodic task scheduler
app.conf.beat_schedule = {
"deactivate-expired-devices-every-day": {
"task": "api.tasks.deactivate_expired_devices",
"schedule": crontab(hour=0, minute=0), # Runs daily at midnight
},
}
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f"Request: {self.request!r}")

View File

@ -68,7 +68,7 @@ INSTALLED_APPS = [
# third party # third party
"django_filters", "django_filters",
"corsheaders", "corsheaders",
"django_celery_beat", "procrastinate.contrib.django",
] ]
if DEBUG: if DEBUG:
@ -134,14 +134,6 @@ if not DEBUG:
SECURE_CONTENT_TYPE_NOSNIFF = True SECURE_CONTENT_TYPE_NOSNIFF = True
# DATABASES # DATABASES
if DEBUG:
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db.sqlite3",
}
}
else:
DATABASES = { DATABASES = {
"default": { "default": {
"ENGINE": "django.db.backends.postgresql", "ENGINE": "django.db.backends.postgresql",
@ -150,7 +142,7 @@ else:
"PASSWORD": env("POSTGRES_PASSWORD"), "PASSWORD": env("POSTGRES_PASSWORD"),
"HOST": env("POSTGRES_HOST"), "HOST": env("POSTGRES_HOST"),
"PORT": env("POSTGRES_PORT"), "PORT": env("POSTGRES_PORT"),
}, }
} }
@ -307,6 +299,9 @@ logging.config.dictConfig(
"format": "%(asctime)s %(levelname)s %(message)s", "format": "%(asctime)s %(levelname)s %(message)s",
}, },
"django.server": DEFAULT_LOGGING["formatters"]["django.server"], "django.server": DEFAULT_LOGGING["formatters"]["django.server"],
"procrastinate": {
"format": "%(asctime)s %(levelname)-7s %(name)s %(message)s"
},
}, },
"handlers": { "handlers": {
"console": { "console": {
@ -318,6 +313,11 @@ logging.config.dictConfig(
"formatter": "request", "formatter": "request",
}, },
"django.server": DEFAULT_LOGGING["handlers"]["django.server"], "django.server": DEFAULT_LOGGING["handlers"]["django.server"],
"procrastinate": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "procrastinate",
},
}, },
"loggers": { "loggers": {
"": { "": {
@ -340,6 +340,11 @@ logging.config.dictConfig(
"handlers": ["console"], "handlers": ["console"],
"propagate": False, "propagate": False,
}, },
"procrastinate": {
"handlers": ["procrastinate"],
"level": "DEBUG",
"propagate": False,
},
}, },
} }
) )

View File

@ -181,7 +181,7 @@ class VerifyPaymentView(StaffEditorPermissionMixin, generics.UpdateAPIView):
) )
if not device.registered: if not device.registered:
# Add to omada # Add to omada
add_new_devices_to_omada.delay(new_devices=device_list) add_new_devices_to_omada.defer(new_devices=device_list)
device.registered = True device.registered = True
device.save() device.save()

View File

@ -13,7 +13,6 @@ billiard==4.2.1
black==23.12.1 black==23.12.1
boto3==1.35.49 boto3==1.35.49
botocore==1.35.49 botocore==1.35.49
celery==5.5.1
certifi==2024.2.2 certifi==2024.2.2
cffi==1.16.0 cffi==1.16.0
chardet==5.2.0 chardet==5.2.0
@ -24,11 +23,11 @@ click-plugins==1.1.1
click-repl==0.3.0 click-repl==0.3.0
colorama==0.4.6 colorama==0.4.6
cron-descriptor==1.4.5 cron-descriptor==1.4.5
croniter==6.0.0
cryptography==41.0.7 cryptography==41.0.7
cssselect2==0.7.0 cssselect2==0.7.0
dj-database-url==2.1.0 dj-database-url==2.1.0
django==5.2 django==5.2
django-celery-beat==2.8.0
django-cors-headers==4.3.1 django-cors-headers==4.3.1
django-debug-toolbar==4.2.0 django-debug-toolbar==4.2.0
django-easy-audit==1.3.7 django-easy-audit==1.3.7
@ -70,12 +69,13 @@ packaging==23.2
pathspec==0.12.1 pathspec==0.12.1
pillow==10.2.0 pillow==10.2.0
platformdirs==4.1.0 platformdirs==4.1.0
procrastinate==3.2.2
prompt-toolkit==3.0.50 prompt-toolkit==3.0.50
propcache==0.2.0 propcache==0.2.0
psycopg==3.2.3 psycopg==3.2.3
psycopg-binary==3.2.3 psycopg-binary==3.2.3
psycopg-pool==3.2.4 psycopg-pool==3.2.4
psycopg2-binary==2.9.9 psycopg2-binary==2.9.10
pycparser==2.21 pycparser==2.21
pyhanko==0.21.0 pyhanko==0.21.0
pyhanko-certvalidator==0.26.3 pyhanko-certvalidator==0.26.3