# Source listing metadata: 120 lines, 4.6 KiB
import os
import asyncio
import logging
from aiosmtpd.controller import Controller
from email.parser import BytesParser
from email.policy import default
import telegram
import re
import httpx
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration (read from the environment once, at import time)
# Token handed to telegram.Bot() when sending; None if the variable is unset.
# NOTE(review): the environment variable name mirrors the constant name used
# by send_telegram -- confirm it matches the deployment configuration.
TELEGRAM_API_KEY = os.getenv('TELEGRAM_API_KEY')
# Bearer token and base URL used when posting to ntfy; None if unset.
NTFY_TOKEN = os.getenv('NTFY_TOKEN')
NTFY_URL = os.getenv('NTFY_URL')
# Recipient domains that select the delivery service: <destination>@<domain>.
TELEGRAM_DOMAIN = os.getenv('TELEGRAM_DOMAIN', 'telegram.local')
NTFY_DOMAIN = os.getenv('NTFY_DOMAIN', 'ntfy.local')

# SMTP configuration
SMTP_HOST = ''  # Listen on all interfaces
SMTP_PORT = 2525  # Alternative SMTP port
class SMTPHandler:
    """aiosmtpd handler that forwards received emails to Telegram or ntfy.

    The delivery service is selected by the recipient's domain
    (``TELEGRAM_DOMAIN`` or ``NTFY_DOMAIN``); the part before the ``@`` is
    used as the Telegram chat ID or the ntfy topic respectively.
    """

    async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
        """Accept every recipient and record it on the envelope.

        aiosmtpd only appends to ``envelope.rcpt_tos`` itself when the handler
        defines no RCPT hook, so this hook has to do it; otherwise the DATA
        phase has no recipient to route on.

        Returns:
            The SMTP reply string ``'250 OK'``.
        """
        logger.info(f"Received RCPT command for address: {address}")
        envelope.rcpt_tos.append(address)
        envelope.rcpt_options.extend(rcpt_options)
        return '250 OK'

    async def handle_DATA(self, server, session, envelope):
        """Parse the received message and forward it to every recipient.

        The subject and the preferred body part (plain text, then HTML) are
        extracted from ``envelope.content``. Each address in
        ``envelope.rcpt_tos`` is routed independently, so one unroutable
        recipient does not stop delivery to the others.

        Returns:
            The SMTP reply string ``'250 Message accepted for delivery'``;
            delivery failures are logged, not reported to the SMTP client.
        """
        logger.info("Handling DATA command")
        message = BytesParser(policy=default).parsebytes(envelope.content)
        subject = message.get('subject', 'No subject')
        # get_body() returns None when no plain/html part exists (for example
        # an attachment-only message); fall back to an empty body.
        body_part = message.get_body(preferencelist=('plain', 'html'))
        body = body_part.get_content() if body_part is not None else ''
        full_message = f"Subject: {subject}\n\n{body}"
        logger.info(f"Parsed email - Subject: {subject}")
        logger.debug(f"Email body: {body[:100]}...")  # Log first 100 chars of body

        for recipient in envelope.rcpt_tos:
            # Extract destination from the recipient email address
            destination, service = self.extract_destination(recipient)
            logger.info(f"Extracted destination: {destination}, service: {service}")
            if service == 'telegram':
                await self.send_telegram(destination, full_message)
            elif service == 'ntfy':
                await self.send_ntfy(destination, subject, body)
            else:
                logger.error(f"Invalid service: {service}. Message not sent.")
        return '250 Message accepted for delivery'

    def extract_destination(self, email):
        """Split a recipient address into ``(destination, service)``.

        Args:
            email: Recipient address of the form ``<destination>@<domain>``.

        Returns:
            ``(local_part, 'telegram')`` or ``(local_part, 'ntfy')`` when the
            domain equals ``TELEGRAM_DOMAIN`` / ``NTFY_DOMAIN`` (Telegram is
            checked first), otherwise ``(None, None)``.
        """
        logger.info(f"Extracting destination from email: {email}")
        routes = (
            (TELEGRAM_DOMAIN, 'telegram', 'Telegram'),
            (NTFY_DOMAIN, 'ntfy', 'ntfy'),
        )
        for domain, service, label in routes:
            # Domain names are case-insensitive, so match them that way; the
            # captured local part keeps its original case.
            match = re.match(rf'^(.+)@{re.escape(domain)}$', email, re.IGNORECASE)
            if match:
                logger.info(f"Matched {label} pattern: {match.group(1)}")
                return match.group(1), service
        logger.error(f"Invalid email format: {email}. Unable to determine service.")
        return None, None

    async def send_telegram(self, chat_id, message):
        """Send ``message`` to the Telegram chat ``chat_id``.

        Nothing is sent when ``chat_id`` is empty. Any Telegram error
        (``telegram.error.TelegramError`` and its subclasses, including
        ``BadRequest``) is logged and swallowed so that a failed delivery
        does not abort the SMTP transaction.
        """
        if not chat_id:
            logger.error("No valid Telegram chat ID provided. Message not sent.")
            return
        logger.info(f"Sending message to Telegram chat ID: {chat_id}")
        try:
            bot = telegram.Bot(TELEGRAM_API_KEY)
            await bot.send_message(chat_id=chat_id, text=message)
        except telegram.error.TelegramError as e:
            logger.error(f"Failed to send message to Telegram chat ID: {chat_id}. Error: {str(e)}")
        else:
            logger.info(f"Message sent successfully to Telegram chat ID: {chat_id}")

    async def send_ntfy(self, topic, subject, body):
        """POST ``body`` to ``{NTFY_URL}/{topic}`` with ``subject`` as the title.

        Nothing is sent when ``topic`` is empty. Transport errors and non-2xx
        responses (``httpx.HTTPError`` and its subclasses) are logged and
        swallowed so that a failed delivery does not abort the SMTP
        transaction.
        """
        if not topic:
            logger.error("No valid ntfy topic provided. Message not sent.")
            return
        logger.info(f"Sending message to ntfy topic: {topic}")
        headers = {
            "Title": subject,
            "Content-Type": "text/plain",
        }
        # Only authenticate when a token is configured; otherwise the header
        # would carry the literal string "Bearer None".
        if NTFY_TOKEN:
            headers["Authorization"] = f"Bearer {NTFY_TOKEN}"
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(f"{NTFY_URL}/{topic}", content=body, headers=headers)
                # Turn 4xx/5xx responses into HTTPStatusError so they are
                # reported as failures instead of being logged as success.
                response.raise_for_status()
        except httpx.HTTPError as e:
            logger.error(f"Failed to send message to ntfy topic: {topic}. Error: {str(e)}")
        else:
            logger.info(f"Message sent successfully to ntfy topic: {topic}")
async def start_smtp_server():
    """Create the SMTP controller for ``SMTPHandler`` and start it.

    Returns:
        The started ``Controller`` bound to ``SMTP_HOST``:``SMTP_PORT``, so
        the caller can ``stop()`` it on shutdown.
    """
    controller = Controller(SMTPHandler(), hostname=SMTP_HOST, port=SMTP_PORT)
    # Constructing the controller does not open the socket; nothing listens
    # until start() is called.
    controller.start()
    return controller
if __name__ == '__main__':
    # Script entry point: start the SMTP server, then block until interrupted.
    logger.info(f"Starting SMTP server on {SMTP_HOST}:{SMTP_PORT}")
    logger.info(f"Telegram domain: {TELEGRAM_DOMAIN}")
    logger.info(f"ntfy domain: {NTFY_DOMAIN}")
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    controller = loop.run_until_complete(start_smtp_server())
    logger.info("SMTP server started. Waiting for incoming messages...")
    try:
        # Keep the process alive; without this the script exits right after
        # logging that the server started.
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down SMTP server")
    finally:
        # start_smtp_server() may not hand back a controller; only stop it
        # when one was returned.
        if controller is not None:
            controller.stop()
        loop.close()