feat(telegram): implement asynchronous Telegram alert system and enhance user verification messaging
All checks were successful
Build and Push Docker Images / Build and Push Docker Images (push) Successful in 4m38s

This commit is contained in:
2025-07-27 21:56:59 +05:00
parent 2bc594da9c
commit 9c082aedf2
4 changed files with 123 additions and 12 deletions

69
api/bot.py Normal file
View File

@@ -0,0 +1,69 @@
import asyncio
import threading
import logging
import time
from telegram import Bot
from telegram.constants import ParseMode
from decouple import config
import re
logger = logging.getLogger(__name__)
telegram_loop = None
BOT_TOKEN = config("TG_BOT_TOKEN", default="killme", cast=str)
CHAT_ID = config("TG_CHAT_ID", default="drake", cast=str)
if not BOT_TOKEN or not isinstance(BOT_TOKEN, str):
raise ValueError(
"TG_BOT_TOKEN environment variable must be set and must be a string."
)
if not CHAT_ID:
raise ValueError(
"TG_CHAT_ID environment variable must be set and must be a string."
)
bot = Bot(token=BOT_TOKEN)
def telegram_worker():
"""
Run the event loop for Telegram in a separate daemon thread.
"""
global telegram_loop
telegram_loop = asyncio.new_event_loop()
asyncio.set_event_loop(telegram_loop)
try:
logger.info("Telegram loop started.")
telegram_loop.run_forever()
except Exception as e:
logger.exception(f"Telegram worker crashed! {e}", exc_info=True)
finally:
telegram_loop.close()
# Start the Telegram worker thread when the module is loaded
telegram_thread = threading.Thread(target=telegram_worker, daemon=True)
telegram_thread.start()
# Wait until telegram_loop is ready
timeout = 5
for _ in range(timeout * 10): # up to 5 seconds
if telegram_loop is not None:
break
time.sleep(0.1)
else:
logger.error("Telegram loop failed to initialize in time.")
async def send_telegram_alert(markdown_message: str):
logger.info("[TELEGRAM] Preparing to send alert...")
await bot.send_message(
chat_id=str(CHAT_ID),
text=markdown_message,
parse_mode=ParseMode.MARKDOWN_V2,
)
def escape_markdown_v2(text: str) -> str:
escape_chars = r"_~`>#+-=|{}.!\\"
return re.sub(f"([{re.escape(escape_chars)}])", r"\\\1", text)

View File

@@ -1,3 +1,4 @@
# pyright: reportGeneralTypeIssues=false
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from api.models import User from api.models import User
from devices.models import Device from devices.models import Device
@@ -8,6 +9,8 @@ from django.utils import timezone
# from api.notifications import send_clean_telegram_markdown # from api.notifications import send_clean_telegram_markdown
from api.omada import Omada from api.omada import Omada
from api.bot import send_telegram_alert, telegram_loop, escape_markdown_v2
import asyncio
from apibase.env import env, BASE_DIR from apibase.env import env, BASE_DIR
from procrastinate.contrib.django import app from procrastinate.contrib.django import app
from procrastinate import builtin_tasks from procrastinate import builtin_tasks
@@ -147,18 +150,9 @@ def verify_user_with_person_api_task(user_id: int):
if not user: if not user:
logger.error(f"User with ID {user_id} not found.") logger.error(f"User with ID {user_id} not found.")
return None return None
# Call the Person API to verify the user
# verification_failed_message = f""" verification_failed_message = f"""*The following user verification failed*:\n\n*ID Card:* {user.id_card}\n*Name:* {user.first_name} {user.last_name}\n*House Name:* {user.address}\n*Date of Birth:* {user.dob}\n*Island:* {(user.atoll.name if user.atoll else "N/A")} {(user.island.name if user.island else "N/A")}\n*Mobile:* {user.mobile}\nVisit [SAR Link Portal](https://portal.sarlink.net/users/{user_id}/details) to manually verify this user.
# _The following user verification failed_: """
# *ID Card:* {user.id_card}
# *Name:* {user.first_name} {user.last_name}
# *House Name:* {user.address}
# *Date of Birth:* {user.dob}
# *Island:* {(user.atoll.name if user.atoll else "N/A")} {(user.island.name if user.island else "N/A")}
# *Mobile:* {user.mobile}
# Visit [SAR Link Portal](https://portal.sarlink.net) to manually verify this user.
# """
# logger.info(verification_failed_message) # logger.info(verification_failed_message)
PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="") # type: ignore PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="") # type: ignore
@@ -228,7 +222,7 @@ def verify_user_with_person_api_task(user_id: int):
user.save() user.save()
send_sms( send_sms(
user.mobile, user.mobile,
f"Dear {user.first_name} {user.last_name}, \n\nYour account has been successfully and verified. \n\nYou can now manage your devices and make payments through our portal at https://portal.sarlink.net. \n\n - SAR Link", f"Dear {user.first_name} {user.last_name}, \n\nYour account has been successfully verified. \n\nYou can now manage your devices and make payments through our portal at https://portal.sarlink.net. \n\n - SAR Link",
) )
return True return True
else: else:
@@ -240,6 +234,16 @@ def verify_user_with_person_api_task(user_id: int):
f"Dear {user.first_name} {user.last_name}, \n\nYour account registration is being processed. \n\nWe will notify you once verification is complete. \n\n - SAR Link", f"Dear {user.first_name} {user.last_name}, \n\nYour account registration is being processed. \n\nWe will notify you once verification is complete. \n\n - SAR Link",
) )
# send_clean_telegram_markdown(message=verification_failed_message) # send_clean_telegram_markdown(message=verification_failed_message)
try:
asyncio.run_coroutine_threadsafe(
send_telegram_alert(
markdown_message=escape_markdown_v2(verification_failed_message)
),
telegram_loop,
).result()
except Exception as e:
logger.warning("[Registration] TELEGRAM ALERT ERROR", e)
return False return False
else: else:
# Handle the error case # Handle the error case

View File

@@ -12,6 +12,7 @@ from .views import (
CancelTopupView, CancelTopupView,
ListWalletTransactionView, ListWalletTransactionView,
AdminTopupCreateView, AdminTopupCreateView,
# AlertTestView,
) )
urlpatterns = [ urlpatterns = [
@@ -50,4 +51,6 @@ urlpatterns = [
ListWalletTransactionView.as_view(), ListWalletTransactionView.as_view(),
name="list-wallet-transactions", name="list-wallet-transactions",
), ),
# Test tg notification
# path("test-alert/", AlertTestView.as_view(), name="test-alert"),
] ]

View File

@@ -30,6 +30,9 @@ from typing import Optional
from api.models import User from api.models import User
from api.omada import Omada from api.omada import Omada
# from api.bot import send_telegram_alert, telegram_loop, escape_markdown_v2
# import asyncio
env.read_env(os.path.join(BASE_DIR, ".env")) env.read_env(os.path.join(BASE_DIR, ".env"))
PAYMENT_BASE_URL = env("PAYMENT_BASE_URL", default="") # type: ignore PAYMENT_BASE_URL = env("PAYMENT_BASE_URL", default="") # type: ignore
@@ -685,3 +688,35 @@ class ListWalletTransactionView(StaffEditorPermissionMixin, generics.ListAPIView
serializer = self.get_serializer(queryset, many=True) serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data) return Response(serializer.data)
# class AlertTestView(generics.GenericAPIView):
# def get(self, request, *args, **kwargs):
# msg = """*ID Card:* A265117\n*Name:* Abdulla Aidhaan\n*House Name:* Nooree Villa\n*Date of Birth:* 1997-08-24\n*Island:* Sh Funadhoo\n*Mobile:* 9697404\nVisit [SAR Link Portal](https://portal.sarlink.net) to manually verify this user."""
# print(msg)
# print("escaped:", escape_markdown_v2(msg))
# user = request.user
# print(user)
# global telegram_loop # Access the global loop
# if telegram_loop is None:
# return Response(
# {"message": "Telegram worker not initialized."},
# status=status.HTTP_503_SERVICE_UNAVAILABLE,
# )
# try:
# asyncio.run_coroutine_threadsafe(
# send_telegram_alert(markdown_message=escape_markdown_v2(msg)),
# telegram_loop,
# ).result()
# return Response(
# {"message": "Alert sent successfully."}, status=status.HTTP_200_OK
# )
# except Exception as e:
# logger.warning("[alert test] TELEGRAM ALERT ERROR", e)
# return Response(
# {"message": "Alert failed to send."}, status=status.HTTP_400_BAD_REQUEST
# )