"""Telegram alerting helpers.

Importing this module starts a daemon thread that owns a dedicated asyncio
event loop (``telegram_loop``) and creates a shared ``Bot`` instance.
"""

import asyncio
import logging
import re
import threading
import time

from decouple import config
from telegram import Bot
from telegram.constants import ParseMode

logger = logging.getLogger(__name__)

# Event loop owned by the worker thread; stays None until the worker has
# created it.
telegram_loop = None

# NOTE(review): both settings have non-empty defaults, so the checks below
# only fire when the variable is explicitly set to an empty value.
BOT_TOKEN = config("TG_BOT_TOKEN", default="killme", cast=str)
CHAT_ID = config("TG_CHAT_ID", default="drake", cast=str)

if not BOT_TOKEN or not isinstance(BOT_TOKEN, str):
    raise ValueError(
        "TG_BOT_TOKEN environment variable must be set and must be a string."
    )

if not CHAT_ID:
    raise ValueError(
        "TG_CHAT_ID environment variable must be set and must be a string."
    )

bot = Bot(token=BOT_TOKEN)

# Set by the worker thread as soon as ``telegram_loop`` has been published,
# so the importing thread can block on it instead of polling the global.
_loop_ready = threading.Event()


def telegram_worker():
    """
    Run the event loop for Telegram in a separate daemon thread.

    Creates a new event loop, publishes it as the module-level
    ``telegram_loop``, signals readiness, and then runs the loop forever.
    Any exception escaping ``run_forever`` is logged; the loop is always
    closed on exit.
    """
    global telegram_loop
    loop = asyncio.new_event_loop()
    telegram_loop = loop
    asyncio.set_event_loop(loop)
    _loop_ready.set()
    try:
        logger.info("Telegram loop started.")
        loop.run_forever()
    except Exception as e:
        # logger.exception already attaches the traceback; pass the error
        # lazily instead of pre-formatting the message.
        logger.exception("Telegram worker crashed! %s", e)
    finally:
        loop.close()


# Start the Telegram worker thread when the module is loaded
telegram_thread = threading.Thread(target=telegram_worker, daemon=True)
telegram_thread.start()

# Wait (up to `timeout` seconds) until telegram_loop is ready
timeout = 5
if not _loop_ready.wait(timeout):
    logger.error("Telegram loop failed to initialize in time.")


async def send_telegram_alert(markdown_message: str) -> None:
    """
    Send ``markdown_message`` to the configured chat using MarkdownV2.

    The text is passed to Telegram unchanged, so it must already be valid
    MarkdownV2. Errors raised by ``bot.send_message`` propagate to the
    caller.

    NOTE(review): presumably meant to be scheduled on ``telegram_loop``
    (e.g. via ``asyncio.run_coroutine_threadsafe``) - confirm with callers.
    """
    logger.info("[TELEGRAM] Preparing to send alert...")
    await bot.send_message(
        chat_id=str(CHAT_ID),
        text=markdown_message,
        parse_mode=ParseMode.MARKDOWN_V2,
    )


# Characters that escape_markdown_v2 prefixes with a backslash. The raw
# string ends with two backslashes because a raw literal cannot end with a
# single one; the duplicate is harmless inside a character class.
# NOTE(review): '*', '[', ']', '(' and ')' are not in this set although
# Telegram's MarkdownV2 rules list them as reserved - confirm whether that
# is deliberate (to let bold/link markup through) before extending it.
_MD_V2_ESCAPE_CHARS = r"_~`>#+-=|{}.!\\"

# Compiled once at import time instead of on every call.
_MD_V2_ESCAPE_RE = re.compile(f"([{re.escape(_MD_V2_ESCAPE_CHARS)}])")


def escape_markdown_v2(text: str) -> str:
    """Return ``text`` with each character in the escape set backslash-escaped."""
    return _MD_V2_ESCAPE_RE.sub(r"\\\1", text)