import logging from django.utils.translation import gettext_lazy as _ from django.contrib.auth import get_user_model from django.core.exceptions import PermissionDenied from django.core.validators import RegexValidator from rest_framework import serializers from djangopasswordlessknox.models import CallbackToken from djangopasswordlessknox.settings import api_settings from djangopasswordlessknox.utils import authenticate_by_token, verify_user_alias, validate_token_age logger = logging.getLogger(__name__) User = get_user_model() class TokenField(serializers.CharField): default_error_messages = { 'required': _('Invalid Token'), 'invalid': _('Invalid Token'), 'blank': _('Invalid Token'), 'max_length': _('Tokens are {max_length} digits long.'), 'min_length': _('Tokens are {min_length} digits long.') } class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer): """ Abstract class that returns a callback token based on the field given Returns a token if valid, None or a message if not. """ @property def alias_type(self): # The alias type, either email or mobile raise NotImplementedError def validate(self, attrs): alias = attrs.get(self.alias_type) print("ALIAS", alias) print("ALIAS TYPE", self.alias_type) if alias: # Create or authenticate a user # Return THem if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True: # If new aliases should register new users. try: user = User.objects.get(**{self.alias_type: alias}) user_created = False except User.DoesNotExist: # If no user is found, raise an error msg = "" if self.alias_type == 'email': msg = _('No user found with this email.') elif self.alias_type == 'mobile': msg = _('No user found with this mobile number.') raise serializers.ValidationError(msg) else: # If new aliases should not register new users. try: user = User.objects.get(**{self.alias_type: alias}) except User.DoesNotExist: user = None if user: if not user.is_active: # If valid, return attrs so we can create a token in our logic controller msg = _('User account is disabled.') raise serializers.ValidationError(msg) else: msg = _('No account is associated with this alias.') raise serializers.ValidationError(msg) else: msg = _('Missing %s.') % self.alias_type raise serializers.ValidationError(msg) attrs['user'] = user return attrs class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer): @property def alias_type(self): return 'email' email = serializers.EmailField() class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer): @property def alias_type(self): return 'mobile' phone_regex = RegexValidator(regex=r'^[7|9][0-9]{6}$', message="Mobile number must be entered in the format:" " '7xxxxxx' or '9xxxxxx'.") mobile = serializers.CharField(validators=[phone_regex], max_length=15) """ Verification """ class AbstractBaseAliasVerificationSerializer(serializers.Serializer): """ Abstract class that returns a callback token based on the field given Returns a token if valid, None or a message if not. """ @property def alias_type(self): # The alias type, either email or mobile raise NotImplementedError def validate(self, attrs): msg = _('There was a problem with your request.') if self.alias_type: # Get request.user # Get their specified valid endpoint # Validate request = self.context["request"] if request and hasattr(request, "user"): user = request.user if user: if not user.is_active: # If valid, return attrs so we can create a token in our logic controller msg = _('User account is disabled.') else: if hasattr(user, self.alias_type): # Has the appropriate alias type attrs['user'] = user return attrs else: msg = _('This user doesn\'t have an %s.' % self.alias_type) raise serializers.ValidationError(msg) else: msg = _('Missing %s.') % self.alias_type raise serializers.ValidationError(msg) class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer): @property def alias_type(self): return 'email' class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer): @property def alias_type(self): return 'mobile' """ Callback Token """ def token_age_validator(value): """ Check token age Makes sure a token is within the proper expiration datetime window. """ valid_token = validate_token_age(value) if not valid_token: raise serializers.ValidationError("The token you entered isn't valid.") return value class AbstractBaseCallbackTokenSerializer(serializers.Serializer): """ Abstract class inspired by DRF's own token serializer. Returns a user if valid, None or a message if not. """ token = TokenField(min_length=6, max_length=6, validators=[token_age_validator]) class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer): def validate(self, attrs): callback_token = attrs.get('token', None) token = CallbackToken.objects.get(key=callback_token, is_active=True) if token: # Check the token type for our uni-auth method. # authenticates and checks the expiry of the callback token. user = authenticate_by_token(token) if user: if not user.is_active: msg = _('User account is disabled.') raise serializers.ValidationError(msg) if api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED \ or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED: # Mark this alias as verified user = User.objects.get(pk=token.user.pk) success = verify_user_alias(user, token) if success is False: msg = _('Error validating user alias.') raise serializers.ValidationError(msg) attrs['user'] = user return attrs else: msg = _('Invalid Token') raise serializers.ValidationError(msg) else: msg = _('Missing authentication token.') raise serializers.ValidationError(msg) class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer): """ Takes a user and a token, verifies the token belongs to the user and validates the alias that the token was sent from. """ def validate(self, attrs): try: user_id = self.context.get("user_id") callback_token = attrs.get('token', None) token = CallbackToken.objects.get(key=callback_token, is_active=True) user = User.objects.get(pk=user_id) if token.user == user: # Check that the token.user is the request.user # Mark this alias as verified success = verify_user_alias(user, token) if success is False: logger.debug("djangopasswordlessknox: Error verifying alias.") attrs['user'] = user return attrs else: msg = _('This token is invalid. Try again later.') logger.debug("djangopasswordlessknox: User token mismatch when verifying alias.") except CallbackToken.DoesNotExist: msg = _('Missing authentication token.') logger.debug("djangopasswordlessknox: Tried to validate alias with bad token.") pass except User.DoesNotExist: msg = _('Missing user.') logger.debug("djangopasswordlessknox: Tried to validate alias with bad user.") pass except PermissionDenied: msg = _('Insufficient permissions.') logger.debug("djangopasswordlessknox: Permission denied while validating alias.") pass raise serializers.ValidationError(msg)