mirror of
https://github.com/i701/sarlink-portal-api.git
synced 2025-10-05 13:35:23 +00:00
All checks were successful
Build and Push Docker Images / Build and Push Docker Images (push) Successful in 4m52s
259 lines
9.9 KiB
Python
259 lines
9.9 KiB
Python
# pyright: reportGeneralTypeIssues=false
|
|
from django.shortcuts import get_object_or_404
|
|
from api.models import TemporaryUser
|
|
from devices.models import Device
|
|
from api.notifications import send_sms
|
|
import os
|
|
import logging
|
|
from django.utils import timezone
|
|
|
|
# from api.notifications import send_clean_telegram_markdown
|
|
from api.omada import Omada
|
|
from api.bot import send_telegram_alert, telegram_loop, escape_markdown_v2
|
|
import asyncio
|
|
from apibase.env import env, BASE_DIR
|
|
from procrastinate.contrib.django import app
|
|
from procrastinate import builtin_tasks
|
|
import time
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
env.read_env(os.path.join(BASE_DIR, ".env"))
|
|
|
|
|
|
@app.periodic(cron="0 * * * *") # every 1 hour
|
|
@app.task(
|
|
queueing_lock="remove_old_jobs",
|
|
pass_context=True,
|
|
)
|
|
async def remove_old_jobs(context, timestamp):
|
|
logger.info("Running remove_old_jobs task...")
|
|
return await builtin_tasks.remove_old_jobs(
|
|
context,
|
|
queue="heavy_tasks",
|
|
max_hours=1,
|
|
remove_failed=True,
|
|
remove_cancelled=True,
|
|
remove_aborted=True,
|
|
)
|
|
|
|
|
|
@app.periodic(
|
|
cron="0 22 * * *",
|
|
queue="heavy_tasks",
|
|
periodic_id="deactivate_expired_devices_and_block_in_omada",
|
|
) # type: ignore
|
|
@app.task
|
|
def deactivate_expired_devices_and_block_in_omada():
|
|
expired_devices = Device.objects.filter(
|
|
expiry_date__lte=timezone.localtime(timezone.now()), is_active=True
|
|
).select_related("user")
|
|
|
|
print("Expired Devices: ", expired_devices)
|
|
count = expired_devices.count()
|
|
|
|
if count == 0:
|
|
return {"total_expired_devices": 0}
|
|
|
|
user_devices_map = {}
|
|
devices_successfully_blocked = []
|
|
devices_failed_to_block = []
|
|
omada_client = Omada()
|
|
|
|
# Single loop to collect data and block devices
|
|
for device in expired_devices:
|
|
# Collect devices for SMS notifications
|
|
if device.user and device.user.mobile:
|
|
if device.user.mobile not in user_devices_map:
|
|
user_devices_map[device.user.mobile] = []
|
|
user_devices_map[device.user.mobile].append(device.name)
|
|
|
|
# Try to block device in Omada
|
|
try:
|
|
omada_client.block_device(mac_address=device.mac, operation="block")
|
|
# Only prepare for update if Omada blocking succeeded
|
|
device.blocked = True
|
|
device.is_active = False
|
|
devices_successfully_blocked.append(device)
|
|
logger.info(f"Successfully blocked device {device.mac} in Omada")
|
|
time.sleep(20) # Sleep to avoid rate limiting
|
|
except Exception as e:
|
|
logger.error(f"Failed to block device [omada] {device.mac}: {e}")
|
|
devices_failed_to_block.append(device)
|
|
# Continue to next device without updating this one
|
|
|
|
# Bulk update only successfully blocked devices
|
|
if devices_successfully_blocked:
|
|
try:
|
|
Device.objects.bulk_update(
|
|
devices_successfully_blocked, ["is_active", "blocked"]
|
|
)
|
|
logger.info(
|
|
f"Successfully updated {len(devices_successfully_blocked)} devices in database"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to bulk update devices in database: {e}")
|
|
# You might want to handle this case - devices are blocked in Omada but not updated in DB
|
|
|
|
# Send SMS notifications
|
|
sms_count = 0
|
|
for mobile, device_names in user_devices_map.items():
|
|
if not mobile:
|
|
continue
|
|
device_list = "\n".join(
|
|
[f"{i + 1}. {name}" for i, name in enumerate(device_names)]
|
|
)
|
|
print("device list: ", device_list)
|
|
try:
|
|
send_sms(
|
|
mobile,
|
|
f"Dear {mobile}, \n\nThe following devices have expired: \n{device_list}. \n\nPlease make a payment to keep your devices active. \n\n- SAR Link",
|
|
)
|
|
sms_count += 1
|
|
except Exception as e:
|
|
logger.error(f"Failed to send SMS to {mobile}: {e}")
|
|
|
|
print(f"Total {count} expired devices processed.")
|
|
print(f"Successfully blocked: {len(devices_successfully_blocked)}")
|
|
print(f"Failed to block: {len(devices_failed_to_block)}")
|
|
print(f"SMS notifications sent: {sms_count}")
|
|
|
|
return {
|
|
"total_expired_devices": count,
|
|
"successfully_blocked": len(devices_successfully_blocked),
|
|
"failed_to_block": len(devices_failed_to_block),
|
|
"sms_sent": sms_count,
|
|
}
|
|
|
|
|
|
@app.task
|
|
def add_new_devices_to_omada(new_devices: list[dict]):
|
|
"""
|
|
Add new devices to Omada via Omada class.
|
|
:param new_devices: List of new device names to add.
|
|
"""
|
|
logger.info("Running add new devices to Omada task...")
|
|
omada_client = Omada()
|
|
omada_client.add_new_devices_to_omada(new_devices)
|
|
|
|
|
|
def verify_user_with_person_api_task(user_id: int):
|
|
"""
|
|
Verify the user with the Person API.
|
|
:param user_id: The ID of the user to verify.
|
|
"""
|
|
print(f"Verifying user with ID: {user_id}")
|
|
if not user_id:
|
|
logger.error("User ID is not provided.")
|
|
return None
|
|
t_user = get_object_or_404(TemporaryUser, t_id=user_id)
|
|
if not t_user:
|
|
logger.error(f"User with ID {user_id} not found.")
|
|
return None
|
|
print(t_user)
|
|
return
|
|
|
|
verification_failed_message = f"""*The following user verification failed*:\n\n*ID Card:* {t_user.t_id_card}\n*Name:* {t_user.t_first_name} {t_user.t_last_name}\n*House Name:* {t_user.t_address}\n*Date of Birth:* {t_user.t_dob}\n*Island:* {(t_user.t_atoll.name if t_user.t_atoll else "N/A")} {(t_user.t_island.name if t_user.t_island else "N/A")}\n*Mobile:* {t_user.t_mobile}\nVisit [SAR Link Portal](https://portal.sarlink.net/users/{user_id}/details) to manually verify this user.
|
|
"""
|
|
|
|
# logger.info(verification_failed_message)
|
|
PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="") # type: ignore
|
|
|
|
if not PERSON_VERIFY_BASE_URL:
|
|
raise ValueError(
|
|
"PERSON_VERIFY_BASE_URL is not set in the environment variables."
|
|
)
|
|
import requests
|
|
|
|
response = requests.get(f"{PERSON_VERIFY_BASE_URL}/api/person/{t_user.t_id_card}")
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
api_nic = data.get("nic")
|
|
api_name = data.get("name_en")
|
|
api_house_name = data.get("house_name_en")
|
|
api_dob = data.get("dob")
|
|
api_atoll = data.get("atoll_en")
|
|
api_island_name = data.get("island_name_en")
|
|
|
|
if not t_user.mobile or t_user.dob is None:
|
|
logger.error("User mobile or date of birth is not set.")
|
|
return None
|
|
if not t_user.island or t_user.atoll is None:
|
|
logger.error("User island or atoll is not set.")
|
|
return None
|
|
|
|
logger.info(f"API nic: {api_nic}")
|
|
logger.info(f"API name: {api_name}")
|
|
logger.info(f"API house name: {api_house_name}")
|
|
logger.info(f"API dob: {api_dob}")
|
|
logger.info(f"API atoll: {api_atoll}")
|
|
logger.info(f"API island name: {api_island_name}")
|
|
|
|
user_nic = t_user.t_id_card
|
|
user_name = f"{t_user.t_first_name} {t_user.t_last_name}"
|
|
user_house_name = t_user.t_address
|
|
user_dob = t_user.t_dob.isoformat()
|
|
|
|
logger.info(f"User nic: {user_nic}")
|
|
logger.info(f"User name: {user_name}")
|
|
logger.info(f"User house name: {user_house_name}")
|
|
logger.info(f"User dob: {user_dob}")
|
|
logger.info(f"User atoll: {t_user.t_atoll.name if t_user.t_atoll else 'N/A'}")
|
|
logger.info(
|
|
f"User island name: {t_user.t_island.name if t_user.t_island else 'N/A'}"
|
|
)
|
|
|
|
logger.info(
|
|
f"case User atoll: {t_user.t_atoll.name == api_atoll.strip() if api_atoll else False}"
|
|
) # Defensive check for api_atoll
|
|
logger.info(f"api atoll type: {type(api_atoll)}")
|
|
logger.info(f"user atoll type: {type(t_user.t_atoll.name)}")
|
|
logger.info(
|
|
f"case User island name: {t_user.t_island.name == api_island_name.strip() if api_island_name else False}"
|
|
) # Defensive check for api_island_name
|
|
logger.info(f"api island name type: {type(api_island_name)}")
|
|
logger.info(f"user island name type: {type(t_user.t_island.name)}")
|
|
|
|
if (
|
|
data.get("nic") == t_user.t_id_card
|
|
and data.get("name_en") == f"{t_user.t_first_name} {t_user.t_last_name}"
|
|
and data.get("house_name_en") == t_user.t_address
|
|
and data.get("dob").split("T")[0] == t_user.t_dob.isoformat()
|
|
and data.get("atoll_en").strip() == t_user.t_atoll.name
|
|
and data.get("island_name_en").strip() == t_user.t_island.name
|
|
):
|
|
t_user.t_verified = True
|
|
t_user.save()
|
|
send_sms(
|
|
t_user.t_mobile,
|
|
f"Dear {t_user.t_first_name} {t_user.t_last_name}, \n\nYour account has been successfully verified. \n\nYou can now manage your devices and make payments through our portal at https://portal.sarlink.net. \n\n - SAR Link",
|
|
)
|
|
return True
|
|
else:
|
|
t_user.t_verified = False
|
|
t_user.save()
|
|
|
|
send_sms(
|
|
t_user.t_mobile,
|
|
f"Dear {t_user.t_first_name} {t_user.t_last_name}, \n\nYour account registration is being processed. \n\nWe will notify you once verification is complete. \n\n - SAR Link",
|
|
)
|
|
# send_clean_telegram_markdown(message=verification_failed_message)
|
|
|
|
try:
|
|
asyncio.run_coroutine_threadsafe(
|
|
send_telegram_alert(
|
|
markdown_message=escape_markdown_v2(verification_failed_message)
|
|
),
|
|
telegram_loop,
|
|
).result()
|
|
except Exception as e:
|
|
logger.warning("[Registration] TELEGRAM ALERT ERROR", e)
|
|
return False
|
|
else:
|
|
# Handle the error case
|
|
print(
|
|
f"Error verifying user with api: {response.status_code} - {response.text}"
|
|
)
|
|
return
|