Files
sarlink-portal-api/api/tasks.py
i701 4714b6ec15
All checks were successful
Build and Push Docker Images / Build and Push Docker Images (push) Successful in 4m2s
move registration sms to view from signals and fix include User verified
attribute 🐛
2025-09-15 21:33:59 +05:00

256 lines
9.4 KiB
Python

# pyright: reportGeneralTypeIssues=false
from django.shortcuts import get_object_or_404
from api.models import TemporaryUser
from devices.models import Device
from api.notifications import send_sms
import os
import logging
from django.utils import timezone
# from api.notifications import send_clean_telegram_markdown
from api.omada import Omada
from api.bot import send_telegram_alert, telegram_loop, escape_markdown_v2
import asyncio
from apibase.env import env, BASE_DIR
from procrastinate.contrib.django import app
from procrastinate import builtin_tasks
import time
import requests
logger = logging.getLogger(__name__)
env.read_env(os.path.join(BASE_DIR, ".env"))
@app.periodic(cron="0 * * * *") # every 1 hour
@app.task(
queueing_lock="remove_old_jobs",
pass_context=True,
)
async def remove_old_jobs(context, timestamp):
logger.info("Running remove_old_jobs task...")
return await builtin_tasks.remove_old_jobs(
context,
queue="heavy_tasks",
max_hours=1,
remove_failed=True,
remove_cancelled=True,
remove_aborted=True,
)
@app.periodic(
cron="0 22 * * *",
queue="heavy_tasks",
periodic_id="deactivate_expired_devices_and_block_in_omada",
) # type: ignore
@app.task
def deactivate_expired_devices_and_block_in_omada():
expired_devices = Device.objects.filter(
expiry_date__lte=timezone.localtime(timezone.now()), is_active=True
).select_related("user")
print("Expired Devices: ", expired_devices)
count = expired_devices.count()
if count == 0:
return {"total_expired_devices": 0}
user_devices_map = {}
devices_successfully_blocked = []
devices_failed_to_block = []
omada_client = Omada()
# Single loop to collect data and block devices
for device in expired_devices:
# Collect devices for SMS notifications
if device.user and device.user.mobile:
if device.user.mobile not in user_devices_map:
user_devices_map[device.user.mobile] = []
user_devices_map[device.user.mobile].append(device.name)
# Try to block device in Omada
try:
omada_client.block_device(mac_address=device.mac, operation="block")
# Only prepare for update if Omada blocking succeeded
device.blocked = True
device.is_active = False
devices_successfully_blocked.append(device)
logger.info(f"Successfully blocked device {device.mac} in Omada")
time.sleep(20) # Sleep to avoid rate limiting
except Exception as e:
logger.error(f"Failed to block device [omada] {device.mac}: {e}")
devices_failed_to_block.append(device)
# Continue to next device without updating this one
# Bulk update only successfully blocked devices
if devices_successfully_blocked:
try:
Device.objects.bulk_update(
devices_successfully_blocked, ["is_active", "blocked"]
)
logger.info(
f"Successfully updated {len(devices_successfully_blocked)} devices in database"
)
except Exception as e:
logger.error(f"Failed to bulk update devices in database: {e}")
# You might want to handle this case - devices are blocked in Omada but not updated in DB
# Send SMS notifications
sms_count = 0
for mobile, device_names in user_devices_map.items():
if not mobile:
continue
device_list = "\n".join(
[f"{i + 1}. {name}" for i, name in enumerate(device_names)]
)
print("device list: ", device_list)
try:
send_sms(
mobile,
f"Dear {mobile}, \n\nThe following devices have expired: \n{device_list}. \n\nPlease make a payment to keep your devices active. \n\n- SAR Link",
)
sms_count += 1
except Exception as e:
logger.error(f"Failed to send SMS to {mobile}: {e}")
print(f"Total {count} expired devices processed.")
print(f"Successfully blocked: {len(devices_successfully_blocked)}")
print(f"Failed to block: {len(devices_failed_to_block)}")
print(f"SMS notifications sent: {sms_count}")
return {
"total_expired_devices": count,
"successfully_blocked": len(devices_successfully_blocked),
"failed_to_block": len(devices_failed_to_block),
"sms_sent": sms_count,
}
@app.task
def add_new_devices_to_omada(new_devices: list[dict]):
"""
Add new devices to Omada via Omada class.
:param new_devices: List of new device names to add.
"""
logger.info("Running add new devices to Omada task...")
omada_client = Omada()
omada_client.add_new_devices_to_omada(new_devices)
def verify_user_with_person_api_task(user_id: int):
"""
Verify the user with the Person API.
:param user_id: The ID of the user to verify.
"""
PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="") # type: ignore
if not PERSON_VERIFY_BASE_URL:
raise ValueError(
"PERSON_VERIFY_BASE_URL is not set in the environment variables."
)
print(f"Verifying user with ID: {user_id}")
if not user_id:
logger.error("User ID is not provided.")
return None
t_user = get_object_or_404(TemporaryUser, t_id=user_id)
if not t_user:
logger.error(f"User with ID {user_id} not found.")
return None
print("t_user:", t_user)
response = requests.get(f"{PERSON_VERIFY_BASE_URL}/api/person/{t_user.t_id_card}")
verification_failed_message = f"""*The following user verification failed*:\n\n*ID Card:* {t_user.t_id_card}\n*Name:* {t_user.t_first_name} {t_user.t_last_name}\n*House Name:* {t_user.t_address}\n*Date of Birth:* {t_user.t_dob}\n*Island:* {(t_user.t_atoll.name if t_user.t_atoll else "N/A")} {(t_user.t_island.name if t_user.t_island else "N/A")}\n*Mobile:* {t_user.t_mobile}\nVisit [SAR Link Portal](https://portal.sarlink.net/users/{user_id}/details) to manually verify this user.
"""
logger.info(verification_failed_message)
if response.status_code == 200:
data = response.json()
api_nic = data.get("nic")
api_name = data.get("name_en")
api_house_name = data.get("house_name_en")
api_dob = data.get("dob")
api_atoll = data.get("atoll_en")
api_island_name = data.get("island_name_en")
if not t_user.t_mobile or t_user.t_dob is None:
logger.error("User mobile or date of birth is not set.")
return None
if not t_user.t_island or t_user.t_atoll is None:
logger.error("User island or atoll is not set.")
return None
logger.info(f"API nic: {api_nic}")
logger.info(f"API name: {api_name}")
logger.info(f"API house name: {api_house_name}")
logger.info(f"API dob: {api_dob}")
logger.info(f"API atoll: {api_atoll}")
logger.info(f"API island name: {api_island_name}")
user_nic = t_user.t_id_card
user_name = f"{t_user.t_first_name} {t_user.t_last_name}"
user_house_name = t_user.t_address
user_dob = t_user.t_dob.isoformat()
logger.info(f"User nic: {user_nic}")
logger.info(f"User name: {user_name}")
logger.info(f"User house name: {user_house_name}")
logger.info(f"User dob: {user_dob}")
logger.info(f"User atoll: {t_user.t_atoll.name if t_user.t_atoll else 'N/A'}")
logger.info(
f"User island name: {t_user.t_island.name if t_user.t_island else 'N/A'}"
)
logger.info(
f"case User atoll: {t_user.t_atoll.name == api_atoll.strip() if api_atoll else False}"
) # Defensive check for api_atoll
logger.info(f"api atoll type: {type(api_atoll)}")
logger.info(f"user atoll type: {type(t_user.t_atoll.name)}")
logger.info(
f"case User island name: {t_user.t_island.name == api_island_name.strip() if api_island_name else False}"
) # Defensive check for api_island_name
logger.info(f"api island name type: {type(api_island_name)}")
logger.info(f"user island name type: {type(t_user.t_island.name)}")
print("CHECKING USER FIELDS AGAINST API DATA")
if (
data.get("nic") == t_user.t_id_card
and data.get("name_en") == f"{t_user.t_first_name} {t_user.t_last_name}"
and data.get("house_name_en") == t_user.t_address
and data.get("dob").split("T")[0] == t_user.t_dob.isoformat()
and data.get("atoll_en").strip() == t_user.t_atoll.name
and data.get("island_name_en").strip() == t_user.t_island.name
):
t_user.t_verified = True
t_user.save()
return True
else:
t_user.t_verified = False
t_user.save()
# send_clean_telegram_markdown(message=verification_failed_message)
try:
asyncio.run_coroutine_threadsafe(
send_telegram_alert(
markdown_message=escape_markdown_v2(verification_failed_message)
),
telegram_loop,
).result()
except Exception as e:
logger.warning("[Registration] TELEGRAM ALERT ERROR", e)
return False
else:
# Handle the error case
print(
f"Error verifying user with api: {response.status_code} - {response.text}"
)
return