diff --git a/README.md b/README.md index 3c141a2..6ae9c29 100644 --- a/README.md +++ b/README.md @@ -47,3 +47,8 @@ curl -X POST http://localhost:4000/api/auth/login/ \ ``` expected response: `{"message":"Unable to log in with provided credentials."}` +5. For Celery to work run the worker and the beat +``` +celery -A apibase worker --loglevel=info +celery -A apibase beat --loglevel=info +``` \ No newline at end of file diff --git a/api/permissions.py b/api/permissions.py index 2d1a8c2..c600ff9 100644 --- a/api/permissions.py +++ b/api/permissions.py @@ -27,6 +27,8 @@ class IsStaffEditorPermission(permissions.DjangoModelPermissions): # Check permissions based on the request method perms = self.perms_map.get(request.method, []) - perms = [perm % {'app_label': app_label, 'model_name': model_name} for perm in perms] + perms = [ + perm % {"app_label": app_label, "model_name": model_name} for perm in perms + ] return request.user.has_perms(perms) diff --git a/api/sms.py b/api/sms.py index 71c83b9..c0473c9 100644 --- a/api/sms.py +++ b/api/sms.py @@ -13,7 +13,7 @@ if not api_url or not api_key: ) -def send_otp(mobile: str, otp: int, message: str): +def send_otp(mobile: str, otp: str, message: str): if not api_url or not api_key: logger.debug("Failed to send SMS. Missing SMS_API_URL or SMS_API_KEY.") return False diff --git a/api/tasks.py b/api/tasks.py index 461e01f..503311d 100644 --- a/api/tasks.py +++ b/api/tasks.py @@ -14,6 +14,27 @@ logger = logging.getLogger(__name__) env.read_env(os.path.join(BASE_DIR, ".env")) PERSON_VERIFY_BASE_URL = env.str("PERSON_VERIFY_BASE_URL") +OMADA_PROXY_API_KEY = env.str("OMADA_PROXY_API_KEY") +OMADA_PROXY_URL = env("OMADA_PROXY_URL") +OMADA_SITE_ID = env( + "OMADA_SITE_ID", +) +OMADA_GROUP_ID = env( + "OMADA_GROUP_ID", +) + +if not OMADA_SITE_ID: + raise ValueError( + "OMADA_SITE_ID is not set. Please set it in your environment variables." + ) +if not OMADA_GROUP_ID: + raise ValueError( + "OMADA_GROUP_ID is not set. Please set it in your environment variables." + ) +if not OMADA_PROXY_URL: + raise ValueError( + "OMADA_PROXY_URL is not set. Please set it in your environment variables." + ) @shared_task @@ -54,6 +75,81 @@ def deactivate_expired_devices(): } +def get_existing_omada_devices(): + """ + Get existing Omada devices from the database. + :return: List of existing device names. + """ + try: + response = requests.get( + f"{OMADA_PROXY_URL}/9fd0cffa3475a74ae4e4d37de0d12414/api/v2/sites/66dcddb804aa0d2978cf145f/setting/profiles/groups", + headers={"X-API-Key": str(OMADA_PROXY_API_KEY)}, + ) + print("Response: ", response.status_code) + data = response.json() + existing_devices = [] + if "result" in data and len(data["result"]["data"]) > 0: + last_entry = data["result"]["data"][-1] + print("Last Entry: ", last_entry) + if "macAddressList" in last_entry: + existing_devices = last_entry["macAddressList"] + print(existing_devices) + return existing_devices + except requests.RequestException as e: + print(f"Error fetching existing devices: {e}") + return [] + + +@shared_task +def add_new_devices_to_omada(new_devices: list[dict]): + """ + Add new devices to Omada. + :param new_devices: List of new device names to add. + """ + try: + PAYLOAD = { + "name": "REGISTERED_DEVICES", + "type": 2, + "resource": 0, + "ipList": None, + "ipv6List": None, + "macAddressList": None, + "portList": None, + "countryList": None, + "portType": None, + "portMaskList": None, + "domainNamePort": None, + } + existing_devices = get_existing_omada_devices() + PAYLOAD["macAddressList"] = existing_devices + print("Payload with existing devices: ", PAYLOAD) + for device in new_devices: + print("Device in loop: ", device) + PAYLOAD["macAddressList"].append( + { + "macAddress": device["mac"], + "name": device["name"], + } + ) + print("New Payload: ", PAYLOAD) + print( + f"{OMADA_PROXY_URL}/9fd0cffa3475a74ae4e4d37de0d12414/api/v2/sites/{OMADA_SITE_ID}/setting/profiles/groups/2/{OMADA_GROUP_ID}" + ) + response = requests.patch( + f"{OMADA_PROXY_URL}/9fd0cffa3475a74ae4e4d37de0d12414/api/v2/sites/{OMADA_SITE_ID}/setting/profiles/groups/2/{OMADA_GROUP_ID}", + headers={"X-API-Key": str(OMADA_PROXY_API_KEY)}, + json=PAYLOAD, + ) + print("Response: ", response.status_code) + if response.status_code == 200: + print("Devices successfully added.") + print(response.json()) + else: + print(f"Failed to add devices: {response.text}") + except requests.RequestException as e: + print(f"Error adding devices: {e}") + + def verify_user_with_person_api_task(user_id: int): """ Verify the user with the Person API. diff --git a/api/utils.py b/api/utils.py index 3f2cef7..6242a6d 100644 --- a/api/utils.py +++ b/api/utils.py @@ -18,7 +18,7 @@ def reverse_dhivehi_string(input_str): i = 0 while i < len(reversed_str): # Check if current character is a combining character - if i + 1 < len(reversed_str) and "\u0300" <= reversed_str[i + 1] <= "\u036F": + if i + 1 < len(reversed_str) and "\u0300" <= reversed_str[i + 1] <= "\u036f": # If next character is a combining mark, add it before the base character corrected_chars.append(reversed_str[i + 1] + reversed_str[i]) i += 2 diff --git a/api/views.py b/api/views.py index 5763152..295a199 100644 --- a/api/views.py +++ b/api/views.py @@ -1,4 +1,5 @@ # django imports +import pprint from django.contrib.auth import login # rest_framework imports @@ -31,7 +32,8 @@ from typing import cast, Dict, Any from django.core.mail import send_mail from django.db.models import Q from api.sms import send_otp -from .tasks import add, deactivate_expired_devices +from .tasks import add, add_new_devices_to_omada +from devices.models import Device # local apps import from .serializers import ( @@ -60,7 +62,8 @@ class ErrorMessages: @api_view(["GET"]) def healthcheck(request): add.delay(1, 2) - deactivate_expired_devices.delay() + # devices = Device.objects.filter(is_active=False).values() + # add_new_devices_to_omada.delay(new_devices=list(devices)) return Response({"status": "Good"}, status=status.HTTP_200_OK) @@ -117,7 +120,7 @@ class CreateTemporaryUserView(generics.CreateAPIView): current_date = timezone.now() try: - dob = timezone.datetime.strptime(dob, "%Y-%m-%d").date() + dob = timezone.datetime.strptime(str(dob), "%Y-%m-%d").date() except ValueError: return Response( {"message": "Invalid date format for DOB. Use YYYY-MM-DD."}, status=400 @@ -193,7 +196,7 @@ class CreateTemporaryUserView(generics.CreateAPIView): formatted_time = otp_expiry.strftime("%d/%m/%Y %H:%M:%S") otp = temp_user.generate_otp() send_otp( - temp_user.t_mobile, + str(temp_user.t_mobile), otp, f"Your Registration SARLink OTP: {otp}. \nExpires at {formatted_time}. \n\n- SAR Link", ) @@ -263,7 +266,7 @@ class VerifyOTPView(generics.GenericAPIView): User.objects.create_user( first_name=temp_user.t_first_name, last_name=temp_user.t_last_name, - username=temp_user.t_username, + username=str(temp_user.t_username), password="", address=temp_user.t_address, mobile=temp_user.t_mobile, diff --git a/billing/views.py b/billing/views.py index 646e796..21efb7b 100644 --- a/billing/views.py +++ b/billing/views.py @@ -1,32 +1,25 @@ # Create your views here. # billing/views.py +import os from datetime import timedelta -from django.utils import timezone import requests - +from django.utils import timezone +from django.utils.timezone import localtime from rest_framework import generics, status from rest_framework.response import Response from api.mixins import StaffEditorPermissionMixin +from api.tasks import add_new_devices_to_omada +from apibase.env import BASE_DIR, env from .models import Device, Payment from .serializers import PaymentSerializer, UpdatePaymentSerializer -from apibase.env import env, BASE_DIR -from django.utils.timezone import localtime -import os - env.read_env(os.path.join(BASE_DIR, ".env")) PAYMENT_BASE_URL = env("PAYMENT_BASE_URL", default=None) -OMADA_PROXY_URL = env("OMADA_PROXY_URL", default=None) - -if not OMADA_PROXY_URL: - raise ValueError( - "OMADA_PROXY_URL is not set. Please set it in your environment variables." - ) if not PAYMENT_BASE_URL: raise ValueError( "PAYMENT_BASE_URL is not set. Please set it in your environment variables." @@ -180,10 +173,19 @@ class VerifyPaymentView(StaffEditorPermissionMixin, generics.UpdateAPIView): registered=True, ) # Need to add to omada if its a new device and not an existing device + device_list = [] for device in devices: + device_list.append( + { + "mac": device.mac, + "name": device.name, + } + ) if not device.registered: # Add to omada - pass + add_new_devices_to_omada.delay(new_devices=device_list) + device.registered = True + device.save() return Response( {"message": f"Payment verified successfully using [{method}]."} @@ -242,12 +244,3 @@ class DeletePaymentView(StaffEditorPermissionMixin, generics.DestroyAPIView): devices = instance.devices.all() devices.update(is_active=False, expiry_date=None, has_a_pending_payment=False) return super().delete(request, *args, **kwargs) - - -def get_existing_devices(OMADA_SITE_ID: str, OMADA_GROUP_ID: str): - # Get existing devices from omada - response = requests.get( - f"{OMADA_PROXY_URL}/{OMADA_GROUP_ID}/api/v2/sites/{OMADA_SITE_ID}/setting/profiles/groups", - ) - response.raise_for_status() - return response.json() diff --git a/devices/filters.py b/devices/filters.py index 2e96677..f412a2b 100644 --- a/devices/filters.py +++ b/devices/filters.py @@ -5,8 +5,10 @@ from .models import Device class DeviceFilter(django_filters.FilterSet): name = django_filters.CharFilter(lookup_expr="icontains") mac = django_filters.CharFilter(lookup_expr="icontains") - user = django_filters.CharFilter(field_name='user__last_name', lookup_expr="icontains") + user = django_filters.CharFilter( + field_name="user__last_name", lookup_expr="icontains" + ) class Meta: model = Device - fields = "__all__" \ No newline at end of file + fields = "__all__" diff --git a/devices/migrations/0006_alter_device_mac.py b/devices/migrations/0006_alter_device_mac.py new file mode 100644 index 0000000..dfed27a --- /dev/null +++ b/devices/migrations/0006_alter_device_mac.py @@ -0,0 +1,20 @@ +# Generated by Django 5.2 on 2025-04-25 08:42 + +import devices.models +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("devices", "0005_device_has_a_pending_payment"), + ] + + operations = [ + migrations.AlterField( + model_name="device", + name="mac", + field=models.CharField( + max_length=255, validators=[devices.models.validate_mac_address] + ), + ), + ] diff --git a/devices/models.py b/devices/models.py index fde5e6d..336cf82 100644 --- a/devices/models.py +++ b/devices/models.py @@ -1,11 +1,26 @@ from django.db import models from django.utils import timezone from api.models import User +import regex +from django.core.exceptions import ValidationError + + +def validate_mac_address(value): + if not regex.match(r"^([0-9A-Fa-f]{2}-){5}[0-9A-Fa-f]{2}$", value): + raise ValidationError( + "This field accepts a valid MAC address in the format XX-XX-XX-XX-XX-XX using '-' as the separator." + ) + return value class Device(models.Model): name = models.CharField(max_length=255) - mac = models.CharField(max_length=255) + mac = models.CharField( + max_length=255, + validators=[ + validate_mac_address, + ], + ) has_a_pending_payment = models.BooleanField(default=False) reason_for_blocking = models.CharField(max_length=255, null=True, blank=True) is_active = models.BooleanField(default=False) diff --git a/devices/serializers.py b/devices/serializers.py index 7fec72e..9240886 100644 --- a/devices/serializers.py +++ b/devices/serializers.py @@ -7,14 +7,12 @@ from billing.models import Payment # Import the Payment model class CreateDeviceSerializer(serializers.ModelSerializer): name = serializers.CharField(required=True) mac = serializers.CharField(required=True) - registered = serializers.BooleanField(required=True) class Meta: model = Device fields = [ "name", "mac", - "registered", "blocked_by", ] depth = 2 diff --git a/devices/views.py b/devices/views.py index f19f148..675006f 100644 --- a/devices/views.py +++ b/devices/views.py @@ -52,6 +52,11 @@ class DeviceListCreateAPIView( return Response( {"message": "Device with this mac address already exists."}, status=400 ) + + # Normalize MAC address to use "-" as separators + mac = re.sub(r"[^0-9A-Fa-f]", "-", mac).upper() + request.data["mac"] = mac + return super().create(request, *args, **kwargs) def perform_create(self, serializer): diff --git a/djangopasswordlessknox/__init__.py b/djangopasswordlessknox/__init__.py index 156fb8d..dc75e56 100644 --- a/djangopasswordlessknox/__init__.py +++ b/djangopasswordlessknox/__init__.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- -__title__ = 'djangopasswordlessknox' -__version__ = '1.4.0' -__author__ = 'Lijo' -__license__ = 'MIT' -__copyright__ = 'Copyright 2019 lijo' +__title__ = "djangopasswordlessknox" +__version__ = "1.4.0" +__author__ = "Lijo" +__license__ = "MIT" +__copyright__ = "Copyright 2019 lijo" # Version synonym VERSION = __version__ -default_app_config = 'djangopasswordlessknox.apps.DrfpasswordlessConfig' +default_app_config = "djangopasswordlessknox.apps.DrfpasswordlessConfig" diff --git a/djangopasswordlessknox/__version__.py b/djangopasswordlessknox/__version__.py index d9e5b37..6fbb7f6 100644 --- a/djangopasswordlessknox/__version__.py +++ b/djangopasswordlessknox/__version__.py @@ -1,3 +1,3 @@ VERSION = (1, 4, 0) -__version__ = '.'.join(map(str, VERSION)) +__version__ = ".".join(map(str, VERSION)) diff --git a/djangopasswordlessknox/admin.py b/djangopasswordlessknox/admin.py index 860ad83..0f36766 100644 --- a/djangopasswordlessknox/admin.py +++ b/djangopasswordlessknox/admin.py @@ -7,20 +7,22 @@ class UserLinkMixin(object): """ A mixin to add a linkable list_display user field. """ - LINK_TO_USER_FIELD = 'link_to_user' + + LINK_TO_USER_FIELD = "link_to_user" def link_to_user(self, obj): - link = reverse('admin:users_user_change', args=[obj.user.id]) - return u'{}'.format(link, obj.user.username) + link = reverse("admin:users_user_change", args=[obj.user.id]) + return "{}".format(link, obj.user.username) + link_to_user.allow_tags = True - link_to_user.short_description = 'User' + link_to_user.short_description = "User" class AbstractCallbackTokenInline(admin.StackedInline): max_num = 0 extra = 0 - readonly_fields = ('created_at', 'key', 'is_active') - fields = ('created_at', 'user', 'key', 'is_active') + readonly_fields = ("created_at", "key", "is_active") + fields = ("created_at", "user", "key", "is_active") class CallbackInline(AbstractCallbackTokenInline): @@ -28,7 +30,7 @@ class CallbackInline(AbstractCallbackTokenInline): class AbstractCallbackTokenAdmin(UserLinkMixin, admin.ModelAdmin): - readonly_fields = ('created_at', 'user', 'key') - list_display = ('created_at', UserLinkMixin.LINK_TO_USER_FIELD, 'key', 'is_active') - fields = ('created_at', 'user', 'key', 'is_active') + readonly_fields = ("created_at", "user", "key") + list_display = ("created_at", UserLinkMixin.LINK_TO_USER_FIELD, "key", "is_active") + fields = ("created_at", "user", "key", "is_active") extra = 0 diff --git a/djangopasswordlessknox/apps.py b/djangopasswordlessknox/apps.py index a0451c7..5452187 100644 --- a/djangopasswordlessknox/apps.py +++ b/djangopasswordlessknox/apps.py @@ -1,8 +1,9 @@ from django.apps import AppConfig from django.utils.translation import gettext_lazy as _ + class DrfpasswordlessConfig(AppConfig): - name = 'djangopasswordlessknox' + name = "djangopasswordlessknox" verbose = _("DRF Passwordless") def ready(self): diff --git a/djangopasswordlessknox/migrations/0001_initial.py b/djangopasswordlessknox/migrations/0001_initial.py index a09553e..4f4b2ac 100644 --- a/djangopasswordlessknox/migrations/0001_initial.py +++ b/djangopasswordlessknox/migrations/0001_initial.py @@ -10,7 +10,6 @@ import uuid class Migration(migrations.Migration): - initial = True dependencies = [ @@ -19,25 +18,47 @@ class Migration(migrations.Migration): operations = [ migrations.CreateModel( - name='CallbackToken', + name="CallbackToken", fields=[ - ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)), - ('created_at', models.DateTimeField(auto_now_add=True)), - ('is_active', models.BooleanField(default=True)), - ('to_alias', models.CharField(blank=True, max_length=40)), - ('to_alias_type', models.CharField(blank=True, max_length=20)), - ('key', models.CharField(default=djangopasswordlessknox.models.generate_numeric_token, max_length=6, unique=True)), - ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)), + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + unique=True, + ), + ), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("is_active", models.BooleanField(default=True)), + ("to_alias", models.CharField(blank=True, max_length=40)), + ("to_alias_type", models.CharField(blank=True, max_length=20)), + ( + "key", + models.CharField( + default=djangopasswordlessknox.models.generate_numeric_token, + max_length=6, + unique=True, + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, + ), + ), ], options={ - 'verbose_name': 'Callback Token', - 'abstract': False, - 'ordering': ['-id'], - 'get_latest_by': 'created_at', + "verbose_name": "Callback Token", + "abstract": False, + "ordering": ["-id"], + "get_latest_by": "created_at", }, ), migrations.AlterUniqueTogether( - name='callbacktoken', - unique_together=set([('key', 'is_active')]), + name="callbacktoken", + unique_together=set([("key", "is_active")]), ), ] diff --git a/djangopasswordlessknox/models.py b/djangopasswordlessknox/models.py index d3bcbc7..66ac3ca 100644 --- a/djangopasswordlessknox/models.py +++ b/djangopasswordlessknox/models.py @@ -33,9 +33,14 @@ class AbstractBaseCallbackToken(models.Model): When a new token is created, older ones of the same type are invalidated via the pre_save signal in signals.py. """ - id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True) + + id = models.UUIDField( + primary_key=True, default=uuid.uuid4, editable=False, unique=True + ) created_at = models.DateTimeField(auto_now_add=True) - user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name=None, on_delete=models.CASCADE) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, related_name=None, on_delete=models.CASCADE + ) is_active = models.BooleanField(default=True) to_alias = models.CharField(blank=True, max_length=40) to_alias_type = models.CharField(blank=True, max_length=20) @@ -44,9 +49,9 @@ class AbstractBaseCallbackToken(models.Model): class Meta: abstract = True - get_latest_by = 'created_at' - ordering = ['-id'] - unique_together = (('key', 'is_active'),) + get_latest_by = "created_at" + ordering = ["-id"] + unique_together = (("key", "is_active"),) def __str__(self): return str(self.key) @@ -56,7 +61,8 @@ class CallbackToken(AbstractBaseCallbackToken): """ Generates a random six digit number to be returned. """ + key = models.CharField(default=generate_numeric_token, max_length=6, unique=True) class Meta(AbstractBaseCallbackToken.Meta): - verbose_name = 'Callback Token' + verbose_name = "Callback Token" diff --git a/djangopasswordlessknox/serializers.py b/djangopasswordlessknox/serializers.py index 73bf96a..e5e363b 100644 --- a/djangopasswordlessknox/serializers.py +++ b/djangopasswordlessknox/serializers.py @@ -6,7 +6,11 @@ from django.core.validators import RegexValidator from rest_framework import serializers from djangopasswordlessknox.models import CallbackToken from djangopasswordlessknox.settings import api_settings -from djangopasswordlessknox.utils import authenticate_by_token, verify_user_alias, validate_token_age +from djangopasswordlessknox.utils import ( + authenticate_by_token, + verify_user_alias, + validate_token_age, +) logger = logging.getLogger(__name__) User = get_user_model() @@ -14,11 +18,11 @@ User = get_user_model() class TokenField(serializers.CharField): default_error_messages = { - 'required': _('Invalid Token'), - 'invalid': _('Invalid Token'), - 'blank': _('Invalid Token'), - 'max_length': _('Tokens are {max_length} digits long.'), - 'min_length': _('Tokens are {min_length} digits long.') + "required": _("Invalid Token"), + "invalid": _("Invalid Token"), + "blank": _("Invalid Token"), + "max_length": _("Tokens are {max_length} digits long."), + "min_length": _("Tokens are {min_length} digits long."), } @@ -48,10 +52,10 @@ class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer): except User.DoesNotExist: # If no user is found, raise an error msg = "" - if self.alias_type == 'email': - msg = _('No user found with this email.') - elif self.alias_type == 'mobile': - msg = _('No user found with this mobile number.') + if self.alias_type == "email": + msg = _("No user found with this email.") + elif self.alias_type == "mobile": + msg = _("No user found with this mobile number.") raise serializers.ValidationError(msg) else: # If new aliases should not register new users. @@ -63,23 +67,23 @@ class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer): if user: if not user.is_active: # If valid, return attrs so we can create a token in our logic controller - msg = _('User account is disabled.') + msg = _("User account is disabled.") raise serializers.ValidationError(msg) else: - msg = _('No account is associated with this alias.') + msg = _("No account is associated with this alias.") raise serializers.ValidationError(msg) else: - msg = _('Missing %s.') % self.alias_type + msg = _("Missing %s.") % self.alias_type raise serializers.ValidationError(msg) - attrs['user'] = user + attrs["user"] = user return attrs class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer): @property def alias_type(self): - return 'email' + return "email" email = serializers.EmailField() @@ -87,11 +91,13 @@ class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer): class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer): @property def alias_type(self): - return 'mobile' + return "mobile" - phone_regex = RegexValidator(regex=r'^[7|9][0-9]{6}$', - message="Mobile number must be entered in the format:" - " '7xxxxxx' or '9xxxxxx'.") + phone_regex = RegexValidator( + regex=r"^[7|9][0-9]{6}$", + message="Mobile number must be entered in the format:" + " '7xxxxxx' or '9xxxxxx'.", + ) mobile = serializers.CharField(validators=[phone_regex], max_length=15) @@ -105,14 +111,14 @@ class AbstractBaseAliasVerificationSerializer(serializers.Serializer): Abstract class that returns a callback token based on the field given Returns a token if valid, None or a message if not. """ + @property def alias_type(self): # The alias type, either email or mobile raise NotImplementedError def validate(self, attrs): - - msg = _('There was a problem with your request.') + msg = _("There was a problem with your request.") if self.alias_type: # Get request.user @@ -125,31 +131,31 @@ class AbstractBaseAliasVerificationSerializer(serializers.Serializer): if user: if not user.is_active: # If valid, return attrs so we can create a token in our logic controller - msg = _('User account is disabled.') + msg = _("User account is disabled.") else: if hasattr(user, self.alias_type): # Has the appropriate alias type - attrs['user'] = user + attrs["user"] = user return attrs else: - msg = _('This user doesn\'t have an %s.' % self.alias_type) + msg = _("This user doesn't have an %s." % self.alias_type) raise serializers.ValidationError(msg) else: - msg = _('Missing %s.') % self.alias_type + msg = _("Missing %s.") % self.alias_type raise serializers.ValidationError(msg) class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer): @property def alias_type(self): - return 'email' + return "email" class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer): @property def alias_type(self): - return 'mobile' + return "mobile" """ @@ -173,13 +179,13 @@ class AbstractBaseCallbackTokenSerializer(serializers.Serializer): Abstract class inspired by DRF's own token serializer. Returns a user if valid, None or a message if not. """ + token = TokenField(min_length=6, max_length=6, validators=[token_age_validator]) class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer): - def validate(self, attrs): - callback_token = attrs.get('token', None) + callback_token = attrs.get("token", None) token = CallbackToken.objects.get(key=callback_token, is_active=True) @@ -189,27 +195,29 @@ class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer): user = authenticate_by_token(token) if user: if not user.is_active: - msg = _('User account is disabled.') + msg = _("User account is disabled.") raise serializers.ValidationError(msg) - if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED \ - or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED: + if ( + api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED + or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED + ): # Mark this alias as verified user = User.objects.get(pk=token.user.pk) success = verify_user_alias(user, token) if success is False: - msg = _('Error validating user alias.') + msg = _("Error validating user alias.") raise serializers.ValidationError(msg) - attrs['user'] = user + attrs["user"] = user return attrs else: - msg = _('Invalid Token') + msg = _("Invalid Token") raise serializers.ValidationError(msg) else: - msg = _('Missing authentication token.') + msg = _("Missing authentication token.") raise serializers.ValidationError(msg) @@ -222,7 +230,7 @@ class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer): def validate(self, attrs): try: user_id = self.context.get("user_id") - callback_token = attrs.get('token', None) + callback_token = attrs.get("token", None) token = CallbackToken.objects.get(key=callback_token, is_active=True) user = User.objects.get(pk=user_id) @@ -235,23 +243,31 @@ class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer): if success is False: logger.debug("djangopasswordlessknox: Error verifying alias.") - attrs['user'] = user + attrs["user"] = user return attrs else: - msg = _('This token is invalid. Try again later.') - logger.debug("djangopasswordlessknox: User token mismatch when verifying alias.") + msg = _("This token is invalid. Try again later.") + logger.debug( + "djangopasswordlessknox: User token mismatch when verifying alias." + ) except CallbackToken.DoesNotExist: - msg = _('Missing authentication token.') - logger.debug("djangopasswordlessknox: Tried to validate alias with bad token.") + msg = _("Missing authentication token.") + logger.debug( + "djangopasswordlessknox: Tried to validate alias with bad token." + ) pass except User.DoesNotExist: - msg = _('Missing user.') - logger.debug("djangopasswordlessknox: Tried to validate alias with bad user.") + msg = _("Missing user.") + logger.debug( + "djangopasswordlessknox: Tried to validate alias with bad user." + ) pass except PermissionDenied: - msg = _('Insufficient permissions.') - logger.debug("djangopasswordlessknox: Permission denied while validating alias.") + msg = _("Insufficient permissions.") + logger.debug( + "djangopasswordlessknox: Permission denied while validating alias." + ) pass raise serializers.ValidationError(msg) diff --git a/djangopasswordlessknox/services.py b/djangopasswordlessknox/services.py index ed06a12..3b12326 100644 --- a/djangopasswordlessknox/services.py +++ b/djangopasswordlessknox/services.py @@ -1,23 +1,23 @@ from djangopasswordlessknox.utils import ( create_callback_token_for_user, send_email_with_callback_token, - send_sms_with_callback_token + send_sms_with_callback_token, ) class TokenService(object): @staticmethod def send_token(user, alias_type, **message_payload): - token = create_callback_token_for_user(user, alias_type) - send_action = None - if alias_type == 'email': - send_action = send_email_with_callback_token - elif alias_type == 'mobile': - send_action = send_sms_with_callback_token + token = create_callback_token_for_user(user, alias_type) + send_action = None + if alias_type == "email": + send_action = send_email_with_callback_token + elif alias_type == "mobile": + send_action = send_sms_with_callback_token - if send_action is None: - raise ValueError(f"Invalid alias_type: {alias_type}") + if send_action is None: + raise ValueError(f"Invalid alias_type: {alias_type}") - # Send to alias - success = send_action(user, token, **message_payload) - return success + # Send to alias + success = send_action(user, token, **message_payload) + return success diff --git a/djangopasswordlessknox/signals.py b/djangopasswordlessknox/signals.py index f4c9fcf..381dfdb 100644 --- a/djangopasswordlessknox/signals.py +++ b/djangopasswordlessknox/signals.py @@ -17,7 +17,11 @@ def invalidate_previous_tokens(sender, instance, **kwargs): """ active_tokens = None if isinstance(instance, CallbackToken): - active_tokens = CallbackToken.objects.active().filter(user=instance.user).exclude(id=instance.id) + active_tokens = ( + CallbackToken.objects.active() + .filter(user=instance.user) + .exclude(id=instance.id) + ) # Invalidate tokens if active_tokens: @@ -46,15 +50,15 @@ def update_alias_verification(sender, instance, **kwargs): Optionally sends a verification token to the new endpoint. """ if isinstance(instance, User): - if instance.id: - if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED is True: """ For marking email aliases as not verified when a user changes it. """ email_field = api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME - email_verified_field = api_settings.PASSWORDLESS_USER_EMAIL_VERIFIED_FIELD_NAME + email_verified_field = ( + api_settings.PASSWORDLESS_USER_EMAIL_VERIFIED_FIELD_NAME + ) # Verify that this is an existing instance and not a new one. try: @@ -62,24 +66,41 @@ def update_alias_verification(sender, instance, **kwargs): instance_email = getattr(instance, email_field) # Incoming Email old_email = getattr(user_old, email_field) # Pre-save object email - if instance_email != old_email and instance_email != "" and instance_email is not None: + if ( + instance_email != old_email + and instance_email != "" + and instance_email is not None + ): # Email changed, verification should be flagged setattr(instance, email_verified_field, False) - if api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True: - email_subject = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT + if ( + api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN + is True + ): + email_subject = ( + api_settings.PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT + ) email_plaintext = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_PLAINTEXT_MESSAGE email_html = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_TOKEN_HTML_TEMPLATE_NAME - message_payload = {'email_subject': email_subject, - 'email_plaintext': email_plaintext, - 'email_html': email_html} - success = TokenService.send_token(instance, 'email', **message_payload) + message_payload = { + "email_subject": email_subject, + "email_plaintext": email_plaintext, + "email_html": email_html, + } + success = TokenService.send_token( + instance, "email", **message_payload + ) if success: - logger.info('djangopasswordlessknox: Successfully sent email on updated address: %s' - % instance_email) + logger.info( + "djangopasswordlessknox: Successfully sent email on updated address: %s" + % instance_email + ) else: - logger.info('djangopasswordlessknox: Failed to send email to updated address: %s' - % instance_email) + logger.info( + "djangopasswordlessknox: Failed to send email to updated address: %s" + % instance_email + ) except User.DoesNotExist: # User probably is just initially being created @@ -90,28 +111,45 @@ def update_alias_verification(sender, instance, **kwargs): For marking mobile aliases as not verified when a user changes it. """ mobile_field = api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME - mobile_verified_field = api_settings.PASSWORDLESS_USER_MOBILE_VERIFIED_FIELD_NAME + mobile_verified_field = ( + api_settings.PASSWORDLESS_USER_MOBILE_VERIFIED_FIELD_NAME + ) # Verify that this is an existing instance and not a new one. try: user_old = User.objects.get(id=instance.id) # Pre-save object instance_mobile = getattr(instance, mobile_field) # Incoming mobile - old_mobile = getattr(user_old, mobile_field) # Pre-save object mobile + old_mobile = getattr( + user_old, mobile_field + ) # Pre-save object mobile - if instance_mobile != old_mobile and instance_mobile != "" and instance_mobile is not None: + if ( + instance_mobile != old_mobile + and instance_mobile != "" + and instance_mobile is not None + ): # Mobile changed, verification should be flagged setattr(instance, mobile_verified_field, False) - if api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True: + if ( + api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN + is True + ): mobile_message = api_settings.PASSWORDLESS_MOBILE_MESSAGE - message_payload = {'mobile_message': mobile_message} - success = TokenService.send_token(instance, 'mobile', **message_payload) + message_payload = {"mobile_message": mobile_message} + success = TokenService.send_token( + instance, "mobile", **message_payload + ) if success: - logger.info('djangopasswordlessknox: Successfully sent SMS on updated mobile: %s' - % instance_mobile) + logger.info( + "djangopasswordlessknox: Successfully sent SMS on updated mobile: %s" + % instance_mobile + ) else: - logger.info('djangopasswordlessknox: Failed to send SMS to updated mobile: %s' - % instance_mobile) + logger.info( + "djangopasswordlessknox: Failed to send SMS to updated mobile: %s" + % instance_mobile + ) except User.DoesNotExist: # User probably is just initially being created diff --git a/djangopasswordlessknox/urls.py b/djangopasswordlessknox/urls.py index 4a5f8b0..7ebc511 100644 --- a/djangopasswordlessknox/urls.py +++ b/djangopasswordlessknox/urls.py @@ -1,18 +1,34 @@ from django.urls import path from djangopasswordlessknox.views import ( - ObtainEmailCallbackToken, - ObtainMobileCallbackToken, - ObtainAuthTokenFromCallbackToken, - VerifyAliasFromCallbackToken, - ObtainEmailVerificationCallbackToken, - ObtainMobileVerificationCallbackToken, + ObtainEmailCallbackToken, + ObtainMobileCallbackToken, + ObtainAuthTokenFromCallbackToken, + VerifyAliasFromCallbackToken, + ObtainEmailVerificationCallbackToken, + ObtainMobileVerificationCallbackToken, ) urlpatterns = [ - path('callback/auth/', ObtainAuthTokenFromCallbackToken.as_view(), name='auth_callback'), - path('auth/email/', ObtainEmailCallbackToken.as_view(), name='auth_email'), - path('auth/mobile/', ObtainMobileCallbackToken.as_view(), name='auth_mobile'), - path('callback/verify/', VerifyAliasFromCallbackToken.as_view(), name='verify_callback'), - path('verify/email/', ObtainEmailVerificationCallbackToken.as_view(), name='verify_email'), - path('verify/mobile/', ObtainMobileVerificationCallbackToken.as_view(), name='verify_mobile'), + path( + "callback/auth/", + ObtainAuthTokenFromCallbackToken.as_view(), + name="auth_callback", + ), + path("auth/email/", ObtainEmailCallbackToken.as_view(), name="auth_email"), + path("auth/mobile/", ObtainMobileCallbackToken.as_view(), name="auth_mobile"), + path( + "callback/verify/", + VerifyAliasFromCallbackToken.as_view(), + name="verify_callback", + ), + path( + "verify/email/", + ObtainEmailVerificationCallbackToken.as_view(), + name="verify_email", + ), + path( + "verify/mobile/", + ObtainMobileVerificationCallbackToken.as_view(), + name="verify_mobile", + ), ] diff --git a/djangopasswordlessknox/views.py b/djangopasswordlessknox/views.py index 407adf2..450cdc6 100644 --- a/djangopasswordlessknox/views.py +++ b/djangopasswordlessknox/views.py @@ -2,7 +2,7 @@ import logging from rest_framework import parsers, renderers, status from rest_framework.authtoken.models import Token from rest_framework.response import Response -from rest_framework.permissions import AllowAny, IsAuthenticated +from rest_framework.permissions import AllowAny, IsAuthenticated from rest_framework.views import APIView from rest_framework import generics from djangopasswordlessknox.settings import api_settings @@ -29,6 +29,7 @@ class AbstractBaseObtainCallbackToken(APIView): """ This returns a 6-digit callback token we can trade for a user's Auth Token. """ + success_response = "A login token has been sent to you." failure_response = "Unable to send you a login code. Try again later." @@ -49,12 +50,16 @@ class AbstractBaseObtainCallbackToken(APIView): # Only allow auth types allowed in settings. return Response(status=status.HTTP_404_NOT_FOUND) - serializer = self.serializer_class(data=request.data, context={'request': request}) + serializer = self.serializer_class( + data=request.data, context={"request": request} + ) if serializer.is_valid(raise_exception=True): # Validate - - user = serializer.validated_data['user'] + user = serializer.validated_data["user"] # Create and send callback token - success = TokenService.send_token(user, self.alias_type, **self.message_payload) + success = TokenService.send_token( + user, self.alias_type, **self.message_payload + ) # Respond With Success Or Failure of Sent if success: @@ -63,67 +68,79 @@ class AbstractBaseObtainCallbackToken(APIView): else: status_code = status.HTTP_400_BAD_REQUEST response_detail = self.failure_response - return Response({'detail': response_detail}, status=status_code) + return Response({"detail": response_detail}, status=status_code) else: - return Response(serializer.error_messages, status=status.HTTP_400_BAD_REQUEST) + return Response( + serializer.error_messages, status=status.HTTP_400_BAD_REQUEST + ) -class ObtainEmailCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView): +class ObtainEmailCallbackToken( + AbstractBaseObtainCallbackToken, generics.GenericAPIView +): permission_classes = (AllowAny,) serializer_class = EmailAuthSerializer success_response = "A login token has been sent to your email." failure_response = "Unable to email you a login code. Try again later." - alias_type = 'email' + alias_type = "email" email_subject = api_settings.PASSWORDLESS_EMAIL_SUBJECT email_plaintext = api_settings.PASSWORDLESS_EMAIL_PLAINTEXT_MESSAGE email_html = api_settings.PASSWORDLESS_EMAIL_TOKEN_HTML_TEMPLATE_NAME - message_payload = {'email_subject': email_subject, - 'email_plaintext': email_plaintext, - 'email_html': email_html} + message_payload = { + "email_subject": email_subject, + "email_plaintext": email_plaintext, + "email_html": email_html, + } -class ObtainMobileCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView): +class ObtainMobileCallbackToken( + AbstractBaseObtainCallbackToken, generics.GenericAPIView +): permission_classes = (AllowAny,) serializer_class = MobileAuthSerializer success_response = "We texted you a login code." failure_response = "Unable to send you a login code. Try again later." - alias_type = 'mobile' + alias_type = "mobile" mobile_message = api_settings.PASSWORDLESS_MOBILE_MESSAGE - message_payload = {'mobile_message': mobile_message} + message_payload = {"mobile_message": mobile_message} -class ObtainEmailVerificationCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView): +class ObtainEmailVerificationCallbackToken( + AbstractBaseObtainCallbackToken, generics.GenericAPIView +): permission_classes = (IsAuthenticated,) serializer_class = EmailVerificationSerializer success_response = "A verification token has been sent to your email." failure_response = "Unable to email you a verification code. Try again later." - alias_type = 'email' + alias_type = "email" email_subject = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT email_plaintext = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_PLAINTEXT_MESSAGE email_html = api_settings.PASSWORDLESS_EMAIL_VERIFICATION_TOKEN_HTML_TEMPLATE_NAME message_payload = { - 'email_subject': email_subject, - 'email_plaintext': email_plaintext, - 'email_html': email_html + "email_subject": email_subject, + "email_plaintext": email_plaintext, + "email_html": email_html, } -class ObtainMobileVerificationCallbackToken(AbstractBaseObtainCallbackToken, generics.GenericAPIView): +class ObtainMobileVerificationCallbackToken( + AbstractBaseObtainCallbackToken, generics.GenericAPIView +): permission_classes = (IsAuthenticated,) serializer_class = MobileVerificationSerializer success_response = "We texted you a verification code." failure_response = "Unable to send you a verification code. Try again later." - alias_type = 'mobile' + alias_type = "mobile" mobile_message = api_settings.PASSWORDLESS_MOBILE_MESSAGE - message_payload = {'mobile_message': mobile_message} + message_payload = {"mobile_message": mobile_message} class AbstractBaseObtainAuthToken(APIView): @@ -131,10 +148,11 @@ class AbstractBaseObtainAuthToken(APIView): This is a duplicate of rest_framework's own ObtainAuthToken method. Instead, this returns an Auth Token based on our 6 digit callback token and source. """ + serializer_class = None def get_context(self): - return {'request': self.request, 'format': self.format_kwarg, 'view': self} + return {"request": self.request, "format": self.format_kwarg, "view": self} def get_token_ttl(self): return knox_settings.TOKEN_TTL @@ -155,43 +173,41 @@ class AbstractBaseObtainAuthToken(APIView): def get_post_response_data(self, user, token, instance): UserSerializer = self.get_user_serializer_class() - data = { - 'expiry': self.format_expiry_datetime(instance.expiry), - 'token': token - } + data = {"expiry": self.format_expiry_datetime(instance.expiry), "token": token} if UserSerializer is not None: - data["user"] = UserSerializer( - user, - context=self.get_context() - ).data + data["user"] = UserSerializer(user, context=self.get_context()).data return data def post(self, request, format=None): token_limit_per_user = self.get_token_limit_per_user() serializer = self.serializer_class(data=request.data) if serializer.is_valid(raise_exception=True): - user = serializer.validated_data['user'] + user = serializer.validated_data["user"] if token_limit_per_user is not None: now = timezone.now() token = user.auth_token_set.filter(expiry__gt=now) if token.count() >= token_limit_per_user: return Response( - {"error": "Maximum amount of tokens allowed per user exceeded."}, - status=status.HTTP_403_FORBIDDEN + { + "error": "Maximum amount of tokens allowed per user exceeded." + }, + status=status.HTTP_403_FORBIDDEN, ) token_ttl = self.get_token_ttl() instance, token = AuthToken.objects.create(user, token_ttl) - user_logged_in.send(sender=user.__class__, - request=request, user=user) + user_logged_in.send(sender=user.__class__, request=request, user=user) data = self.get_post_response_data(user, token, instance) return Response(data) -class ObtainAuthTokenFromCallbackToken(AbstractBaseObtainAuthToken, generics.GenericAPIView): +class ObtainAuthTokenFromCallbackToken( + AbstractBaseObtainAuthToken, generics.GenericAPIView +): """ This is a duplicate of rest_framework's own ObtainAuthToken method. Instead, this returns an Auth Token based on our callback token and source. """ + permission_classes = (AllowAny,) serializer_class = CallbackTokenAuthSerializer @@ -201,13 +217,23 @@ class VerifyAliasFromCallbackToken(APIView): This verifies an alias on correct callback token entry using the same logic as auth. Should be refactored at some point. """ + serializer_class = CallbackTokenVerificationSerializer def post(self, request, *args, **kwargs): - serializer = self.serializer_class(data=request.data, context={'user_id': self.request.user.id}) + serializer = self.serializer_class( + data=request.data, context={"user_id": self.request.user.id} + ) if serializer.is_valid(raise_exception=True): - return Response({'detail': 'Alias verified.'}, status=status.HTTP_200_OK) + return Response({"detail": "Alias verified."}, status=status.HTTP_200_OK) else: - logger.error("Couldn't verify unknown user. Errors on serializer: {}".format(serializer.error_messages)) + logger.error( + "Couldn't verify unknown user. Errors on serializer: {}".format( + serializer.error_messages + ) + ) - return Response({'detail': 'We couldn\'t verify this alias. Try again later.'}, status.HTTP_400_BAD_REQUEST) + return Response( + {"detail": "We couldn't verify this alias. Try again later."}, + status.HTTP_400_BAD_REQUEST, + ) diff --git a/pyrightconfig.json b/pyrightconfig.json index 6ee0185..caf7dda 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -6,6 +6,7 @@ "typeCheckingMode": "standard", "reportArgumentType": "warning", "reportUnusedVariable": "warning", + "reportFunctionMemberAccess": "none", "exclude": [ "council-api/**/migrations", "**/__pycache__",