# pyright: reportGeneralTypeIssues=false
import asyncio
import logging
import os
import time

from django.shortcuts import get_object_or_404
from django.utils import timezone
from procrastinate import builtin_tasks
from procrastinate.contrib.django import app

from api.bot import send_telegram_alert, telegram_loop, escape_markdown_v2
from api.models import TemporaryUser
from api.notifications import send_sms

# from api.notifications import send_clean_telegram_markdown
from api.omada import Omada
from apibase.env import env, BASE_DIR
from devices.models import Device

logger = logging.getLogger(__name__)

env.read_env(os.path.join(BASE_DIR, ".env"))

# Pause between consecutive Omada block calls to avoid rate limiting.
OMADA_BLOCK_DELAY_SECONDS = 20
# Upper bound for the Person API HTTP call so the task cannot hang forever.
PERSON_API_TIMEOUT_SECONDS = 30
# Upper bound for waiting on the Telegram alert coroutine.
TELEGRAM_ALERT_TIMEOUT_SECONDS = 30


@app.periodic(cron="0 * * * *")  # every 1 hour
@app.task(
    queueing_lock="remove_old_jobs",
    pass_context=True,
)
async def remove_old_jobs(context, timestamp):
    """Hourly housekeeping: delegate to procrastinate's builtin job cleanup.

    :param context: Job context injected because of ``pass_context=True``.
    :param timestamp: Scheduled run time supplied by the periodic scheduler
        (unused here).
    :return: Whatever ``builtin_tasks.remove_old_jobs`` returns.
    """
    logger.info("Running remove_old_jobs task...")
    return await builtin_tasks.remove_old_jobs(
        context,
        queue="heavy_tasks",
        max_hours=1,
        remove_failed=True,
        remove_cancelled=True,
        remove_aborted=True,
    )


@app.periodic(
    cron="0 22 * * *",
    queue="heavy_tasks",
    periodic_id="deactivate_expired_devices_and_block_in_omada",
)  # type: ignore
@app.task
def deactivate_expired_devices_and_block_in_omada(timestamp=None):
    """Block expired devices in Omada, deactivate them, and notify owners.

    Runs daily at 22:00. For every active device whose ``expiry_date`` has
    passed, the device is blocked in Omada; only devices that were blocked
    successfully are marked ``blocked=True`` / ``is_active=False`` in the
    database. Owners with a mobile number receive one SMS listing all of
    their expired devices (regardless of whether blocking succeeded).

    :param timestamp: Scheduled run time passed by the periodic scheduler.
        Optional so the task can still be called or deferred manually
        without arguments.
    :return: Summary dict. ``{"total_expired_devices": 0}`` when nothing is
        expired; otherwise counts for ``total_expired_devices``,
        ``successfully_blocked``, ``failed_to_block`` and ``sms_sent``.
    """
    # Evaluate the queryset once so the count, the loop and the log all see
    # the same set of rows.
    expired_devices = list(
        Device.objects.filter(
            expiry_date__lte=timezone.localtime(timezone.now()), is_active=True
        ).select_related("user")
    )
    count = len(expired_devices)
    logger.info("Expired devices found: %d", count)
    if not expired_devices:
        return {"total_expired_devices": 0}

    user_devices_map = {}
    devices_successfully_blocked = []
    devices_failed_to_block = []
    omada_client = Omada()

    for index, device in enumerate(expired_devices):
        # Collect device names per owner mobile for the SMS notifications.
        owner = device.user
        if owner and owner.mobile:
            user_devices_map.setdefault(owner.mobile, []).append(device.name)

        try:
            omada_client.block_device(mac_address=device.mac, operation="block")
        except Exception as e:
            # Best effort: leave this device untouched in the DB and move on.
            logger.error(f"Failed to block device [omada] {device.mac}: {e}")
            devices_failed_to_block.append(device)
            continue

        # Only stage the DB update once Omada blocking has succeeded.
        device.blocked = True
        device.is_active = False
        devices_successfully_blocked.append(device)
        logger.info(f"Successfully blocked device {device.mac} in Omada")

        # Throttle between calls; no need to wait after the final device.
        if index < count - 1:
            time.sleep(OMADA_BLOCK_DELAY_SECONDS)

    if devices_successfully_blocked:
        try:
            Device.objects.bulk_update(
                devices_successfully_blocked, ["is_active", "blocked"]
            )
            logger.info(
                f"Successfully updated {len(devices_successfully_blocked)} devices in database"
            )
        except Exception as e:
            # NOTE(review): at this point devices are blocked in Omada but
            # still active in the DB; there is no reconciliation step here.
            logger.error(f"Failed to bulk update devices in database: {e}")

    sms_count = 0
    for mobile, device_names in user_devices_map.items():
        device_list = "\n".join(
            f"{i + 1}. {name}" for i, name in enumerate(device_names)
        )
        logger.debug("device list for %s: %s", mobile, device_list)
        try:
            send_sms(
                mobile,
                f"Dear {mobile}, \n\nThe following devices have expired: \n{device_list}. \n\nPlease make a payment to keep your devices active. \n\n- SAR Link",
            )
            sms_count += 1
        except Exception as e:
            logger.error(f"Failed to send SMS to {mobile}: {e}")

    logger.info(
        "Total %d expired devices processed. Successfully blocked: %d. "
        "Failed to block: %d. SMS notifications sent: %d",
        count,
        len(devices_successfully_blocked),
        len(devices_failed_to_block),
        sms_count,
    )

    return {
        "total_expired_devices": count,
        "successfully_blocked": len(devices_successfully_blocked),
        "failed_to_block": len(devices_failed_to_block),
        "sms_sent": sms_count,
    }


@app.task
def add_new_devices_to_omada(new_devices: list[dict]):
    """
    Add new devices to Omada via Omada class.

    :param new_devices: List of device dicts, forwarded unchanged to
        ``Omada.add_new_devices_to_omada`` (the expected keys are defined
        by that method, not here).
    """
    logger.info("Running add new devices to Omada task...")
    omada_client = Omada()
    omada_client.add_new_devices_to_omada(new_devices)


def _build_verification_failed_message(t_user, user_id):
    """Return the Telegram alert text describing a user that failed verification."""
    atoll_name = t_user.t_atoll.name if t_user.t_atoll else "N/A"
    island_name = t_user.t_island.name if t_user.t_island else "N/A"
    return (
        "*The following user verification failed*:\n\n"
        f"*ID Card:* {t_user.t_id_card}\n"
        f"*Name:* {t_user.t_first_name} {t_user.t_last_name}\n"
        f"*House Name:* {t_user.t_address}\n"
        f"*Date of Birth:* {t_user.t_dob}\n"
        f"*Island:* {atoll_name} {island_name}\n"
        f"*Mobile:* {t_user.t_mobile}\n"
        f"Visit [SAR Link Portal](https://portal.sarlink.net/users/{user_id}/details) to manually verify this user.\n"
    )


def _person_record_matches(data, t_user):
    """Return True when every Person API field equals the user's submitted value.

    The caller must have checked that ``t_dob``, ``t_atoll`` and ``t_island``
    are set. Missing API fields are treated as empty strings, so they simply
    fail the comparison instead of raising.
    """
    checks = {
        "nic": (data.get("nic"), t_user.t_id_card),
        "name": (
            data.get("name_en"),
            f"{t_user.t_first_name} {t_user.t_last_name}",
        ),
        "house name": (data.get("house_name_en"), t_user.t_address),
        # The API returns an ISO datetime; only the date part is compared.
        "dob": (
            str(data.get("dob") or "").split("T")[0],
            t_user.t_dob.isoformat(),
        ),
        "atoll": (str(data.get("atoll_en") or "").strip(), t_user.t_atoll.name),
        "island name": (
            str(data.get("island_name_en") or "").strip(),
            t_user.t_island.name,
        ),
    }
    all_match = True
    for field, (api_value, user_value) in checks.items():
        matches = api_value == user_value
        logger.info(
            "Person API check [%s]: api=%r user=%r match=%s",
            field,
            api_value,
            user_value,
            matches,
        )
        all_match = all_match and matches
    return all_match


def _send_verification_failed_alert(message):
    """Best-effort Telegram alert; failures are logged and never raised."""
    try:
        asyncio.run_coroutine_threadsafe(
            send_telegram_alert(markdown_message=escape_markdown_v2(message)),
            telegram_loop,
        ).result(timeout=TELEGRAM_ALERT_TIMEOUT_SECONDS)
    except Exception as e:
        logger.warning("[Registration] TELEGRAM ALERT ERROR: %r", e)


def verify_user_with_person_api_task(user_id: int):
    """
    Verify the user with the Person API.

    Looks up the person by the user's ID card number and compares the
    returned record with the data the user submitted. On a full match the
    user is marked verified and informed by SMS; otherwise the user is
    marked unverified, told that registration is being processed, and a
    Telegram alert is sent for manual review.

    :param user_id: The ID of the user to verify.
    :return: ``True`` if verified, ``False`` if the data did not match,
        ``None`` if ``user_id`` is empty, required user fields are missing,
        or the Person API did not answer with HTTP 200.
    :raises django.http.Http404: If no ``TemporaryUser`` has this ID.
    :raises ValueError: If ``PERSON_VERIFY_BASE_URL`` is not configured.
    """
    if not user_id:
        logger.error("User ID is not provided.")
        return None

    # get_object_or_404 raises Http404 when the user does not exist, so no
    # separate "not found" branch is needed after this line.
    t_user = get_object_or_404(TemporaryUser, id=user_id)

    verification_failed_message = _build_verification_failed_message(
        t_user, user_id
    )

    PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL", default="")  # type: ignore
    if not PERSON_VERIFY_BASE_URL:
        raise ValueError(
            "PERSON_VERIFY_BASE_URL is not set in the environment variables."
        )

    import requests

    response = requests.get(
        f"{PERSON_VERIFY_BASE_URL}/api/person/{t_user.t_id_card}",
        timeout=PERSON_API_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        logger.error(
            "Error verifying user with api: %s - %s",
            response.status_code,
            response.text,
        )
        return None

    data = response.json()

    # Guard exactly the fields that are dereferenced during comparison.
    if not t_user.t_mobile or t_user.t_dob is None:
        logger.error("User mobile or date of birth is not set.")
        return None
    if not t_user.t_island or t_user.t_atoll is None:
        logger.error("User island or atoll is not set.")
        return None

    full_name = f"{t_user.t_first_name} {t_user.t_last_name}"

    if _person_record_matches(data, t_user):
        t_user.t_verified = True
        t_user.save()
        send_sms(
            t_user.t_mobile,
            f"Dear {full_name}, \n\nYour account has been successfully verified. \n\nYou can now manage your devices and make payments through our portal at https://portal.sarlink.net. \n\n - SAR Link",
        )
        return True

    t_user.t_verified = False
    t_user.save()
    send_sms(
        t_user.t_mobile,
        f"Dear {full_name}, \n\nYour account registration is being processed. \n\nWe will notify you once verification is complete. \n\n - SAR Link",
    )
    _send_verification_failed_alert(verification_failed_message)
    return False