import logging

from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.core.validators import RegexValidator
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

from djangopasswordlessknox.models import CallbackToken
from djangopasswordlessknox.settings import api_settings
from djangopasswordlessknox.utils import (
    authenticate_by_token,
    verify_user_alias,
    validate_token_age,
)

logger = logging.getLogger(__name__)
User = get_user_model()


class TokenField(serializers.CharField):
    """CharField whose error messages are worded for callback tokens."""

    default_error_messages = {
        "required": _("Invalid Token"),
        "invalid": _("Invalid Token"),
        "blank": _("Invalid Token"),
        "max_length": _("Tokens are {max_length} digits long."),
        "min_length": _("Tokens are {min_length} digits long."),
    }


class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer):
    """
    Abstract serializer that resolves a submitted alias to an existing user.

    Subclasses provide ``alias_type`` and declare a serializer field with the
    same name. On success ``validate`` stores the matching, active user under
    ``attrs["user"]``.
    """

    @property
    def alias_type(self):
        # The alias type, either email or mobile
        raise NotImplementedError

    def _unknown_alias_message(self):
        """
        Return the error message used when no user matches the alias.

        PASSWORDLESS_REGISTER_NEW_USERS does not create users in this
        serializer; it only selects between the alias-specific wording and
        the generic one.
        """
        if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True:
            specific = {
                "email": _("No user found with this email."),
                "mobile": _("No user found with this mobile number."),
            }
            if self.alias_type in specific:
                return specific[self.alias_type]
        # Also covers unexpected alias types, so the error is never empty.
        return _("No account is associated with this alias.")

    def validate(self, attrs):
        """
        Look up the user identified by the alias and attach it to ``attrs``.

        Raises:
            serializers.ValidationError: if the alias is missing, no user
                matches it, or the matching user is not active.
        """
        alias = attrs.get(self.alias_type)
        if not alias:
            msg = _("Missing %s.") % self.alias_type
            raise serializers.ValidationError(msg)

        try:
            user = User.objects.get(**{self.alias_type: alias})
        except User.DoesNotExist:
            user = None

        if not user:
            raise serializers.ValidationError(self._unknown_alias_message())

        if not user.is_active:
            msg = _("User account is disabled.")
            raise serializers.ValidationError(msg)

        # Valid: hand the user back so the caller can create a token for it.
        attrs["user"] = user
        return attrs


class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``email`` field."""

    @property
    def alias_type(self):
        return "email"

    email = serializers.EmailField()


class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``mobile`` field."""

    @property
    def alias_type(self):
        return "mobile"

    # Seven digits, the first being 7 or 9. The character class must be
    # ``[79]``: inside brackets ``|`` is a literal pipe, not alternation.
    phone_regex = RegexValidator(
        regex=r"^[79][0-9]{6}$",
        message="Mobile number must be entered in the format:"
        " '7xxxxxx' or '9xxxxxx'.",
    )
    mobile = serializers.CharField(validators=[phone_regex], max_length=15)


# ---------------------------------------------------------------------------
# Verification
# ---------------------------------------------------------------------------


class AbstractBaseAliasVerificationSerializer(serializers.Serializer):
    """
    Abstract serializer that checks the requesting user can verify an alias.

    Reads the user from ``self.context["request"]`` and, when that user is
    active and has an attribute named after ``alias_type``, stores it under
    ``attrs["user"]``.
    """

    @property
    def alias_type(self):
        # The alias type, either email or mobile
        raise NotImplementedError

    def validate(self, attrs):
        """
        Attach the request's user to ``attrs`` if it may verify this alias.

        Raises:
            serializers.ValidationError: if ``alias_type`` is empty, the
                context carries no usable request/user, the user is not
                active, or the user lacks the alias attribute.
        """
        if not self.alias_type:
            msg = _("Missing %s.") % self.alias_type
            raise serializers.ValidationError(msg)

        # Generic failure, replaced below when a more specific cause is known.
        msg = _("There was a problem with your request.")

        # .get(): a context without a request is reported as a validation
        # error instead of escaping as a KeyError.
        request = self.context.get("request")
        if request and hasattr(request, "user"):
            user = request.user
            if user:
                if not user.is_active:
                    msg = _("User account is disabled.")
                elif hasattr(user, self.alias_type):
                    # Has the appropriate alias type
                    attrs["user"] = user
                    return attrs
                else:
                    # Interpolate after translating so the catalog lookup
                    # sees the untouched message id.
                    msg = _("This user doesn't have an %s.") % self.alias_type

        raise serializers.ValidationError(msg)


class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the ``email`` alias."""

    @property
    def alias_type(self):
        return "email"


class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the ``mobile`` alias."""

    @property
    def alias_type(self):
        return "mobile"


# ---------------------------------------------------------------------------
# Callback Token
# ---------------------------------------------------------------------------


def token_age_validator(value):
    """
    Check token age.

    Delegates to ``validate_token_age`` and rejects the value when that
    helper reports the token as not valid.

    Raises:
        serializers.ValidationError: if ``validate_token_age`` returns a
            falsy result.
    """
    if not validate_token_age(value):
        raise serializers.ValidationError("The token you entered isn't valid.")
    return value


class AbstractBaseCallbackTokenSerializer(serializers.Serializer):
    """
    Base serializer declaring the ``token`` field shared by token serializers.

    The token must be exactly six characters long and pass
    ``token_age_validator``.
    """

    token = TokenField(min_length=6, max_length=6, validators=[token_age_validator])


class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer):
    """Authenticates a user from a submitted callback token."""

    def validate(self, attrs):
        """
        Resolve the callback token to an active user and attach that user.

        Raises:
            serializers.ValidationError: if no active token matches, the
                token does not authenticate a user, the user is not active,
                or alias verification reports failure.
        """
        callback_token = attrs.get("token", None)

        try:
            token = CallbackToken.objects.get(key=callback_token, is_active=True)
        except CallbackToken.DoesNotExist:
            # objects.get() never returns None; an unknown or inactive token
            # must be reported as a validation error, not a server error.
            msg = _("Missing authentication token.")
            raise serializers.ValidationError(msg)

        # Check the token type for our uni-auth method.
        # authenticates and checks the expiry of the callback token.
        user = authenticate_by_token(token)
        if not user:
            msg = _("Invalid Token")
            raise serializers.ValidationError(msg)

        if not user.is_active:
            msg = _("User account is disabled.")
            raise serializers.ValidationError(msg)

        if (
            api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED
            or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED
        ):
            # Mark this alias as verified
            user = User.objects.get(pk=token.user.pk)
            success = verify_user_alias(user, token)
            if success is False:
                msg = _("Error validating user alias.")
                raise serializers.ValidationError(msg)

        attrs["user"] = user
        return attrs


class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer):
    """
    Takes a user and a token, verifies the token belongs to the user and
    validates the alias that the token was sent from.
    """

    def validate(self, attrs):
        """
        Check the token belongs to the user given by ``context["user_id"]``.

        Raises:
            serializers.ValidationError: if the token or user cannot be
                found, the token belongs to a different user, or a
                PermissionDenied is raised while verifying.
        """
        try:
            user_id = self.context.get("user_id")
            callback_token = attrs.get("token", None)

            token = CallbackToken.objects.get(key=callback_token, is_active=True)
            user = User.objects.get(pk=user_id)

            if token.user == user:
                # Check that the token.user is the request.user
                # Mark this alias as verified
                success = verify_user_alias(user, token)
                if success is False:
                    # NOTE(review): a failed verification is only logged and
                    # the user is still returned - confirm this is intended.
                    logger.debug("djangopasswordlessknox: Error verifying alias.")

                attrs["user"] = user
                return attrs

            msg = _("This token is invalid. Try again later.")
            logger.debug(
                "djangopasswordlessknox: User token mismatch when verifying alias."
            )
        except CallbackToken.DoesNotExist:
            msg = _("Missing authentication token.")
            logger.debug(
                "djangopasswordlessknox: Tried to validate alias with bad token."
            )
        except User.DoesNotExist:
            msg = _("Missing user.")
            logger.debug(
                "djangopasswordlessknox: Tried to validate alias with bad user."
            )
        except PermissionDenied:
            msg = _("Insufficient permissions.")
            logger.debug(
                "djangopasswordlessknox: Permission denied while validating alias."
            )

        raise serializers.ValidationError(msg)